git.proxmox.com Git - ceph.git/blobdiff - ceph/src/crimson/osd/replicated_recovery_backend.cc
import quincy beta 17.1.0
index 0812003bb311a2362bc19435be2d628264f83452..dae4c8ef853f85980faa674233f2ae3fb7b5f2f0 100644 (file)
@@ -1,5 +1,5 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
+// vim: ts=8 sw=2 smarttab expandtab
 
 #include <fmt/format.h>
 #include <fmt/ostream.h>
@@ -18,7 +18,12 @@ namespace {
   }
 }
 
-seastar::future<> ReplicatedRecoveryBackend::recover_object(
+using std::less;
+using std::map;
+using std::string;
+
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::recover_object(
   const hobject_t& soid,
   eversion_t need)
 {
@@ -26,7 +31,7 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
   // always add_recovering(soid) before recover_object(soid)
   assert(is_recovering(soid));
   // start tracking the recovery of soid
-  return maybe_pull_missing_obj(soid, need).then([this, soid, need] {
+  return maybe_pull_missing_obj(soid, need).then_interruptible([this, soid, need] {
     logger().debug("recover_object: loading obc: {}", soid);
     return pg.with_head_obc<RWState::RWREAD>(soid,
       [this, soid, need](auto obc) {
@@ -35,7 +40,7 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
       recovery_waiter.obc = obc;
       recovery_waiter.obc->wait_recovery_read();
       return maybe_push_shards(soid, need);
-    }).handle_error(
+    }).handle_error_interruptible(
       crimson::osd::PG::load_obc_ertr::all_same_way([soid](auto& code) {
       // TODO: may need eio handling?
       logger().error("recover_object saw error code {}, ignoring object {}",
@@ -44,42 +49,47 @@ seastar::future<> ReplicatedRecoveryBackend::recover_object(
   });
 }
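
The bulk of this commit is mechanical: every continuation on the recovery path moves from plain seastar::future chains to crimson's interruptible futures, so in-flight recovery can be aborted when the PG's interval changes. The recurring shape, reduced to a minimal sketch (interruptor and interruptible_future are the aliases this file inherits from RecoveryBackend; plain_fut() is a hypothetical stand-in for any raw seastar call such as send_to_osd()):

    seastar::future<> plain_fut();  // hypothetical raw-future source
    RecoveryBackend::interruptible_future<> sketch() {
      // lift the raw future into the interruptible domain...
      return interruptor::make_interruptible(plain_fut())
        // ...then chain with the *_interruptible variants instead of then()
        .then_interruptible([] {
          return seastar::make_ready_future<>();
        });
    }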
 
-seastar::future<>
+RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::maybe_push_shards(
   const hobject_t& soid,
   eversion_t need)
 {
-  return seastar::parallel_for_each(get_shards_to_push(soid),
+  return interruptor::parallel_for_each(get_shards_to_push(soid),
     [this, need, soid](auto shard) {
-    return prep_push(soid, need, shard).then([this, soid, shard](auto push) {
-      auto msg = make_message<MOSDPGPush>();
+    return prep_push(soid, need, shard).then_interruptible([this, soid, shard](auto push) {
+      auto msg = crimson::make_message<MOSDPGPush>();
       msg->from = pg.get_pg_whoami();
       msg->pgid = pg.get_pgid();
       msg->map_epoch = pg.get_osdmap_epoch();
       msg->min_epoch = pg.get_last_peering_reset();
       msg->pushes.push_back(std::move(push));
       msg->set_priority(pg.get_recovery_op_priority());
-      return shard_services.send_to_osd(shard.osd,
-                                        std::move(msg),
-                                        pg.get_osdmap_epoch()).then(
+      return interruptor::make_interruptible(
+         shard_services.send_to_osd(shard.osd,
+                                    std::move(msg),
+                                    pg.get_osdmap_epoch()))
+      .then_interruptible(
         [this, soid, shard] {
         return recovering.at(soid).wait_for_pushes(shard);
       });
     });
-  }).then([this, soid] {
+  }).then_interruptible([this, soid] {
     auto &recovery = recovering.at(soid);
-    auto push_info = recovery.pushing.begin();
-    object_stat_sum_t stat = {};
-    if (push_info != recovery.pushing.end()) {
-      stat = push_info->second.stat;
+    if (auto push_info = recovery.pushing.begin();
+        push_info != recovery.pushing.end()) {
+      pg.get_recovery_handler()->on_global_recover(soid,
+                                                   push_info->second.stat,
+                                                   false);
+    } else if (recovery.pi) {
+      // no push happened (get_shards_to_push() was empty) but a pull did
+      pg.get_recovery_handler()->on_global_recover(soid,
+                                                   recovery.pi->stat,
+                                                   false);
     } else {
-      // no push happened, take pull_info's stat
-      assert(recovery.pi);
-      stat = recovery.pi->stat;
+      // no pulls, no pushes
     }
-    pg.get_recovery_handler()->on_global_recover(soid, stat, false);
     return seastar::make_ready_future<>();
-  }).handle_exception([this, soid](auto e) {
+  }).handle_exception_interruptible([this, soid](auto e) {
     auto &recovery = recovering.at(soid);
     if (recovery.obc) {
       recovery.obc->drop_recovery_read();
@@ -89,7 +99,7 @@ ReplicatedRecoveryBackend::maybe_push_shards(
   });
 }
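
maybe_push_shards() fans out one MOSDPGPush per shard returned by get_shards_to_push() and only resolves once every recipient has acked via wait_for_pushes(). The fan-out/join shape in plain seastar, as a self-contained toy (not the OSD messaging layer):

    #include <seastar/core/future.hh>
    #include <seastar/core/loop.hh>
    #include <vector>

    // Toy fan-out: start one "push" per peer concurrently; the returned
    // future resolves once all per-peer futures have resolved.
    seastar::future<> push_to_all(const std::vector<int>& peers) {
      return seastar::parallel_for_each(peers, [](int peer) {
        // stand-in for send_to_osd(peer, ...) + wait_for_pushes(peer)
        return seastar::make_ready_future<>();
      });
    }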
 
-seastar::future<>
+RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::maybe_pull_missing_obj(
   const hobject_t& soid,
   eversion_t need)
@@ -103,33 +113,35 @@ ReplicatedRecoveryBackend::maybe_pull_missing_obj(
   recovery_waiter.pi = std::make_optional<RecoveryBackend::PullInfo>();
   auto& pi = *recovery_waiter.pi;
   prepare_pull(po, pi, soid, need);
-  auto msg = make_message<MOSDPGPull>();
+  auto msg = crimson::make_message<MOSDPGPull>();
   msg->from = pg.get_pg_whoami();
   msg->set_priority(pg.get_recovery_op_priority());
   msg->pgid = pg.get_pgid();
   msg->map_epoch = pg.get_osdmap_epoch();
   msg->min_epoch = pg.get_last_peering_reset();
   msg->set_pulls({std::move(po)});
-  return shard_services.send_to_osd(
-    pi.from.osd,
-    std::move(msg),
-    pg.get_osdmap_epoch()
-  ).then([&recovery_waiter] {
+  return interruptor::make_interruptible(
+    shard_services.send_to_osd(
+      pi.from.osd,
+      std::move(msg),
+      pg.get_osdmap_epoch()
+  )).then_interruptible([&recovery_waiter] {
     return recovery_waiter.wait_for_pull();
   });
 }
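
The pull side is the mirror image: maybe_pull_missing_obj() sends a single MOSDPGPull and then parks on wait_for_pull() until handle_pull_response() marks the object pulled. A toy model of that rendezvous, assuming seastar's shared_promise (the real waiter behind recovering.at(soid) also tracks pushes and the obc):

    #include <seastar/core/shared_future.hh>

    // Toy rendezvous: the pull path waits on a shared promise that the
    // pull-response handler fulfills exactly once.
    struct pull_waiter {
      seastar::shared_promise<> pulled;
      seastar::future<> wait_for_pull() {
        return pulled.get_shared_future();
      }
      void set_pulled() {
        pulled.set_value();
      }
    };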
 
-seastar::future<> ReplicatedRecoveryBackend::push_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::push_delete(
   const hobject_t& soid,
   eversion_t need)
 {
   logger().debug("{}: {}, {}", __func__, soid, need);
-  recovering[soid];
   epoch_t min_epoch = pg.get_last_peering_reset();
 
   assert(pg.get_acting_recovery_backfill().size() > 0);
-  return seastar::parallel_for_each(pg.get_acting_recovery_backfill(),
-    [this, soid, need, min_epoch](pg_shard_t shard) {
+  return interruptor::parallel_for_each(pg.get_acting_recovery_backfill(),
+    [this, soid, need, min_epoch](pg_shard_t shard)
+    -> interruptible_future<> {
     if (shard == pg.get_pg_whoami())
       return seastar::make_ready_future<>();
     auto iter = pg.get_shard_missing().find(shard);
@@ -139,12 +151,13 @@ seastar::future<> ReplicatedRecoveryBackend::push_delete(
       logger().debug("push_delete: will remove {} from {}", soid, shard);
       pg.begin_peer_recover(shard, soid);
       spg_t target_pg(pg.get_info().pgid.pgid, shard.shard);
-      auto msg = make_message<MOSDPGRecoveryDelete>(
+      auto msg = crimson::make_message<MOSDPGRecoveryDelete>(
          pg.get_pg_whoami(), target_pg, pg.get_osdmap_epoch(), min_epoch);
       msg->set_priority(pg.get_recovery_op_priority());
       msg->objects.push_back(std::make_pair(soid, need));
-      return shard_services.send_to_osd(shard.osd, std::move(msg),
-                                       pg.get_osdmap_epoch()).then(
+      return interruptor::make_interruptible(
+         shard_services.send_to_osd(shard.osd, std::move(msg),
+                                    pg.get_osdmap_epoch())).then_interruptible(
        [this, soid, shard] {
        return recovering.at(soid).wait_for_pushes(shard);
       });
@@ -153,15 +166,17 @@ seastar::future<> ReplicatedRecoveryBackend::push_delete(
   });
 }
 
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_delete(
   Ref<MOSDPGRecoveryDelete> m)
 {
   logger().debug("{}: {}", __func__, *m);
 
   auto& p = m->objects.front(); //TODO: only one delete per message for now.
-  return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch()).then(
+  return local_recover_delete(p.first, p.second, pg.get_osdmap_epoch())
+  .then_interruptible(
     [this, m] {
-    auto reply = make_message<MOSDPGRecoveryDeleteReply>();
+    auto reply = crimson::make_message<MOSDPGRecoveryDeleteReply>();
     reply->from = pg.get_pg_whoami();
     reply->set_priority(m->get_priority());
     reply->pgid = spg_t(pg.get_info().pgid.pgid, m->from.shard);
@@ -172,7 +187,8 @@ seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete(
   });
 }
 
-seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::on_local_recover_persist(
   const hobject_t& soid,
   const ObjectRecoveryInfo& _recovery_info,
   bool is_delete,
@@ -181,32 +197,38 @@ seastar::future<> ReplicatedRecoveryBackend::on_local_recover_persist(
   logger().debug("{}", __func__);
   ceph::os::Transaction t;
   pg.get_recovery_handler()->on_local_recover(soid, _recovery_info, is_delete, t);
-  return shard_services.get_store().do_transaction(coll, std::move(t)).then(
+  logger().debug("ReplicatedRecoveryBackend::on_local_recover_persist: do_transaction...");
+  return interruptor::make_interruptible(
+      shard_services.get_store().do_transaction(coll, std::move(t)))
+  .then_interruptible(
     [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
     pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
     return seastar::make_ready_future<>();
   });
 }
 
-seastar::future<> ReplicatedRecoveryBackend::local_recover_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::local_recover_delete(
   const hobject_t& soid,
   eversion_t need,
   epoch_t epoch_to_freeze)
 {
   logger().debug("{}: {}, {}", __func__, soid, need);
-  return backend->load_metadata(soid).safe_then([this]
-    (auto lomt) {
+  return backend->load_metadata(soid).safe_then_interruptible([this]
+    (auto lomt) -> interruptible_future<> {
     if (lomt->os.exists) {
       return seastar::do_with(ceph::os::Transaction(),
        [this, lomt = std::move(lomt)](auto& txn) {
-       return backend->remove(lomt->os, txn).then([this, &txn]() mutable {
+       return backend->remove(lomt->os, txn).then_interruptible(
+         [this, &txn]() mutable {
+         logger().debug("ReplicatedRecoveryBackend::local_recover_delete: do_transaction...");
          return shard_services.get_store().do_transaction(coll,
                                                           std::move(txn));
        });
       });
     }
     return seastar::make_ready_future<>();
-  }).safe_then([this, soid, epoch_to_freeze, need] {
+  }).safe_then_interruptible([this, soid, epoch_to_freeze, need] {
     ObjectRecoveryInfo recovery_info;
     recovery_info.soid = soid;
     recovery_info.version = need;
@@ -223,7 +245,8 @@ seastar::future<> ReplicatedRecoveryBackend::local_recover_delete(
   );
 }
 
-seastar::future<> ReplicatedRecoveryBackend::recover_delete(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::recover_delete(
   const hobject_t &soid, eversion_t need)
 {
   logger().debug("{}: {}, {}", __func__, soid, need);
@@ -231,8 +254,9 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete(
   epoch_t cur_epoch = pg.get_osdmap_epoch();
   return seastar::do_with(object_stat_sum_t(),
     [this, soid, need, cur_epoch](auto& stat_diff) {
-    return local_recover_delete(soid, need, cur_epoch).then(
-      [this, &stat_diff, cur_epoch, soid, need] {
+    return local_recover_delete(soid, need, cur_epoch).then_interruptible(
+      [this, &stat_diff, cur_epoch, soid, need]()
+      -> interruptible_future<> {
       if (!pg.has_reset_since(cur_epoch)) {
        bool object_missing = false;
        for (const auto& shard : pg.get_acting_recovery_backfill()) {
@@ -254,14 +278,14 @@ seastar::future<> ReplicatedRecoveryBackend::recover_delete(
        }
       }
       return seastar::make_ready_future<>();
-    }).then([this, soid, &stat_diff] {
+    }).then_interruptible([this, soid, &stat_diff] {
       pg.get_recovery_handler()->on_global_recover(soid, stat_diff, true);
       return seastar::make_ready_future<>();
     });
   });
 }
 
-seastar::future<PushOp>
+RecoveryBackend::interruptible_future<PushOp>
 ReplicatedRecoveryBackend::prep_push(
   const hobject_t& soid,
   eversion_t need,
@@ -276,14 +300,12 @@ ReplicatedRecoveryBackend::prep_push(
     data_subset.insert(0, obc->obs.oi.size);
   }
   const auto& missing = pg.get_shard_missing().find(pg_shard)->second;
-  if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS)) {
-    const auto it = missing.get_items().find(soid);
-    assert(it != missing.get_items().end());
-    data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
-    logger().debug("prep_push: {} data_subset {}", soid, data_subset);
-  }
+  const auto it = missing.get_items().find(soid);
+  assert(it != missing.get_items().end());
+  data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
+  logger().debug("prep_push: {} data_subset {} to {}",
+                 soid, data_subset, pg_shard);
 
-  logger().debug("prep_push: {} to {}", soid, pg_shard);
   auto& pi = recovery_waiter.pushing[pg_shard];
   pg.begin_peer_recover(pg_shard, soid);
   const auto pmissing_iter = pg.get_shard_missing().find(pg_shard);
@@ -299,10 +321,9 @@ ReplicatedRecoveryBackend::prep_push(
   pi.recovery_info.object_exist =
     missing_iter->second.clean_regions.object_is_exist();
   pi.recovery_progress.omap_complete =
-    (!missing_iter->second.clean_regions.omap_is_dirty() &&
-     HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS));
+    !missing_iter->second.clean_regions.omap_is_dirty();
 
-  return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then(
+  return build_push_op(pi.recovery_info, pi.recovery_progress, &pi.stat).then_interruptible(
     [this, soid, pg_shard](auto pop) {
     auto& recovery_waiter = recovering.at(soid);
     auto& pi = recovery_waiter.pushing[pg_shard];
@@ -323,9 +344,8 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi,
 
   //TODO: skipped snap objects case for now
   po.recovery_info.copy_subset.insert(0, (uint64_t) -1);
-  if (HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS))
-    po.recovery_info.copy_subset.intersection_of(
-       missing_iter->second.clean_regions.get_dirty_regions());
+  po.recovery_info.copy_subset.intersection_of(
+    missing_iter->second.clean_regions.get_dirty_regions());
   po.recovery_info.size = ((uint64_t) -1);
   po.recovery_info.object_exist =
     missing_iter->second.clean_regions.object_is_exist();
@@ -333,8 +353,7 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi,
   po.soid = soid;
   po.recovery_progress.data_complete = false;
   po.recovery_progress.omap_complete =
-    !missing_iter->second.clean_regions.omap_is_dirty() &&
-    HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS);
+    !missing_iter->second.clean_regions.omap_is_dirty();
   po.recovery_progress.data_recovered_to = 0;
   po.recovery_progress.first = true;
 
@@ -344,7 +363,8 @@ void ReplicatedRecoveryBackend::prepare_pull(PullOp& po, PullInfo& pi,
   pi.recovery_progress = po.recovery_progress;
 }
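
Quincy peers are guaranteed to speak at least Octopus, so the HAVE_FEATURE(pg.min_peer_features(), SERVER_OCTOPUS) guards are dropped and copy_subset is now always intersected with the missing object's dirty regions. The interval arithmetic in isolation (assuming ceph's interval_set semantics, where insert() takes a start offset and a length):

    #include "include/interval_set.h"

    // Start from "the whole object", then narrow to what is actually dirty.
    interval_set<uint64_t> pull_subset() {
      interval_set<uint64_t> copy_subset;
      copy_subset.insert(0, (uint64_t)-1);   // everything
      interval_set<uint64_t> dirty;
      dirty.insert(4096, 8192);              // e.g. one dirty extent
      copy_subset.intersection_of(dirty);    // pull only 4096~8192
      return copy_subset;
    }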
 
-seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
+RecoveryBackend::interruptible_future<PushOp>
+ReplicatedRecoveryBackend::build_push_op(
     const ObjectRecoveryInfo& recovery_info,
     const ObjectRecoveryProgress& progress,
     object_stat_sum_t* stat)
@@ -357,10 +377,10 @@ seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
                          recovery_info.version,
                          PushOp(),
     [this, &recovery_info, &progress, stat]
-    (auto new_progress, auto available, auto v, auto pop) {
+    (auto& new_progress, auto& available, auto& v, auto& pop) {
     return read_metadata_for_push_op(recovery_info.soid,
                                      progress, new_progress,
-                                     v, &pop).then([&](eversion_t local_ver) mutable {
+                                     v, &pop).then_interruptible([&](eversion_t local_ver) mutable {
       // If requestor didn't know the version, use ours
       if (v == eversion_t()) {
         v = local_ver;
@@ -373,14 +393,14 @@ seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
                                    progress,
                                    new_progress,
                                    &available, &pop);
-    }).then([this, &recovery_info, &progress, &available, &pop]() mutable {
+    }).then_interruptible([this, &recovery_info, &progress, &available, &pop]() mutable {
       logger().debug("build_push_op: available: {}, copy_subset: {}",
                     available, recovery_info.copy_subset);
       return read_object_for_push_op(recovery_info.soid,
                                     recovery_info.copy_subset,
                                     progress.data_recovered_to,
                                     available, &pop);
-    }).then([&recovery_info, &v, &progress, &new_progress, stat, &pop]
+    }).then_interruptible([&recovery_info, &v, &progress, &new_progress, stat, &pop]
             (uint64_t recovered_to) mutable {
       new_progress.data_recovered_to = recovered_to;
       if (new_progress.is_complete(recovery_info)) {
@@ -408,7 +428,7 @@ seastar::future<PushOp> ReplicatedRecoveryBackend::build_push_op(
   });
 }
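
Beyond the interruptible conversion, build_push_op() carries a subtle fix: the do_with lambda now takes its parameters by reference. seastar::do_with stores the values next to the returned future and hands out references; taking them by value would make the inner continuations, which capture by reference, write to copies that die when the outer lambda returns. The idiom in isolation:

    #include <seastar/core/do_with.hh>
    #include <seastar/core/future.hh>

    // do_with() keeps `c` alive until the returned future resolves, so the
    // continuation may capture it by reference; a by-value lambda parameter
    // would be a copy that dies before the continuation runs.
    seastar::future<int> count_up() {
      return seastar::do_with(0, [](int& c) {
        return seastar::make_ready_future<>().then([&c] {
          return seastar::make_ready_future<int>(++c);
        });
      });
    }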
 
-seastar::future<eversion_t>
+RecoveryBackend::interruptible_future<eversion_t>
 ReplicatedRecoveryBackend::read_metadata_for_push_op(
     const hobject_t& oid,
     const ObjectRecoveryProgress& progress,
@@ -416,41 +436,44 @@ ReplicatedRecoveryBackend::read_metadata_for_push_op(
     eversion_t ver,
     PushOp* push_op)
 {
+  logger().debug("{}, {}", __func__, oid);
   if (!progress.first) {
     return seastar::make_ready_future<eversion_t>(ver);
   }
-  return seastar::when_all_succeed(
-    backend->omap_get_header(coll, ghobject_t(oid)).handle_error(
-      crimson::os::FuturizedStore::read_errorator::all_same_way(
-        [] (const std::error_code& e) {
-        return seastar::make_ready_future<bufferlist>();
-      })),
-    store->get_attrs(coll, ghobject_t(oid)).handle_error(
-      crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
-        [] (const std::error_code& e) {
-        return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
-      }))
-  ).then_unpack([&new_progress, push_op](auto bl, auto attrs) {
+  return interruptor::make_interruptible(interruptor::when_all_succeed(
+      backend->omap_get_header(coll, ghobject_t(oid)).handle_error_interruptible<false>(
+       crimson::os::FuturizedStore::read_errorator::all_same_way(
+         [oid] (const std::error_code& e) {
+         logger().debug("read_metadata_for_push_op, error {} when getting omap header: {}", e, oid);
+         return seastar::make_ready_future<bufferlist>();
+       })),
+      interruptor::make_interruptible(store->get_attrs(coll, ghobject_t(oid)))
+      .handle_error_interruptible<false>(
+       crimson::os::FuturizedStore::get_attrs_ertr::all_same_way(
+         [oid] (const std::error_code& e) {
+         logger().debug("read_metadata_for_push_op, error {} when getting attrs: {}", e, oid);
+         return seastar::make_ready_future<crimson::os::FuturizedStore::attrs_t>();
+       }))
+  )).then_unpack_interruptible([&new_progress, push_op](auto bl, auto attrs) {
     if (bl.length() == 0) {
-      logger().error("read_metadata_for_push_op: fail to read omap header");
-      return eversion_t{};
+      logger().warn("read_metadata_for_push_op: fail to read omap header");
     } else if (attrs.empty()) {
       logger().error("read_metadata_for_push_op: fail to read attrs");
       return eversion_t{};
     }
     push_op->omap_header.claim_append(std::move(bl));
-    for (auto&& [key, val] : std::move(attrs)) {
-      push_op->attrset[key].push_back(val);
+    for (auto&& [key, val] : attrs) {
+      push_op->attrset.emplace(std::move(key), std::move(val));
     }
     logger().debug("read_metadata_for_push_op: {}", push_op->attrset[OI_ATTR]);
     object_info_t oi;
-    oi.decode(push_op->attrset[OI_ATTR]);
+    oi.decode_no_oid(push_op->attrset[OI_ATTR]);
     new_progress.first = false;
     return oi.version;
   });
 }
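
read_metadata_for_push_op() issues the omap-header read and the attrs read concurrently and joins them with when_all_succeed(); the interruptible variant still delivers both results to a single continuation. The plain-seastar shape of that join, as a self-contained toy:

    #include <seastar/core/future.hh>
    #include <seastar/core/when_all.hh>

    // Toy join: run two reads concurrently, then consume both results at
    // once; when_all_succeed short-circuits if either side fails.
    seastar::future<int> read_both() {
      return seastar::when_all_succeed(
          seastar::make_ready_future<int>(1),   // stand-in for omap_get_header()
          seastar::make_ready_future<int>(2))   // stand-in for get_attrs()
        .then_unpack([](int hdr, int attrs) {
          return seastar::make_ready_future<int>(hdr + attrs);
        });
    }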
 
-seastar::future<uint64_t>
+RecoveryBackend::interruptible_future<uint64_t>
 ReplicatedRecoveryBackend::read_object_for_push_op(
     const hobject_t& oid,
     const interval_set<uint64_t>& copy_subset,
@@ -464,7 +487,7 @@ ReplicatedRecoveryBackend::read_object_for_push_op(
   }
   // 1. get the extents in the interested range
   return backend->fiemap(coll, ghobject_t{oid},
-                         0, copy_subset.range_end()).then_wrapped(
+                         0, copy_subset.range_end()).then_wrapped_interruptible(
     [=](auto&& fiemap_included) mutable {
     interval_set<uint64_t> extents;
     try {
@@ -481,7 +504,7 @@ ReplicatedRecoveryBackend::read_object_for_push_op(
     // 3. read the truncated extents
     // TODO: check if the returned extents are pruned
     return store->readv(coll, ghobject_t{oid}, push_op->data_included, 0);
-  }).safe_then([push_op, range_end=copy_subset.range_end()](auto &&bl) {
+  }).safe_then_interruptible([push_op, range_end=copy_subset.range_end()](auto &&bl) {
     push_op->data.claim_append(std::move(bl));
     uint64_t recovered_to = 0;
     if (push_op->data_included.empty()) {
@@ -498,7 +521,7 @@ ReplicatedRecoveryBackend::read_object_for_push_op(
   }));
 }
 
-seastar::future<>
+RecoveryBackend::interruptible_future<>
 ReplicatedRecoveryBackend::read_omap_for_push_op(
     const hobject_t& oid,
     const ObjectRecoveryProgress& progress,
@@ -567,47 +590,55 @@ ReplicatedRecoveryBackend::get_shards_to_push(const hobject_t& soid) const
   return shards;
 }
 
-seastar::future<> ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_pull(Ref<MOSDPGPull> m)
 {
   logger().debug("{}: {}", __func__, *m);
-  return seastar::parallel_for_each(m->take_pulls(),
-                                   [this, from=m->from](auto& pull_op) {
-    const hobject_t& soid = pull_op.soid;
-    logger().debug("handle_pull: {}", soid);
-    return backend->stat(coll, ghobject_t(soid)).then(
-      [this, &pull_op](auto st) {
-      ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
-      ObjectRecoveryProgress &progress = pull_op.recovery_progress;
-      if (progress.first && recovery_info.size == ((uint64_t) -1)) {
-        // Adjust size and copy_subset
-        recovery_info.size = st.st_size;
-        if (st.st_size) {
-          interval_set<uint64_t> object_range;
-          object_range.insert(0, st.st_size);
-          recovery_info.copy_subset.intersection_of(object_range);
-        } else {
-          recovery_info.copy_subset.clear();
+  if (pg.can_discard_replica_op(*m)) {
+    logger().debug("{}: discarding {}", __func__, *m);
+    return seastar::now();
+  }
+  return seastar::do_with(m->take_pulls(), [this, from=m->from](auto& pulls) {
+    return interruptor::parallel_for_each(pulls,
+                                      [this, from](auto& pull_op) {
+      const hobject_t& soid = pull_op.soid;
+      logger().debug("handle_pull: {}", soid);
+      return backend->stat(coll, ghobject_t(soid)).then_interruptible(
+        [this, &pull_op](auto st) {
+        ObjectRecoveryInfo &recovery_info = pull_op.recovery_info;
+        ObjectRecoveryProgress &progress = pull_op.recovery_progress;
+        if (progress.first && recovery_info.size == ((uint64_t) -1)) {
+          // Adjust size and copy_subset
+          recovery_info.size = st.st_size;
+          if (st.st_size) {
+            interval_set<uint64_t> object_range;
+            object_range.insert(0, st.st_size);
+            recovery_info.copy_subset.intersection_of(object_range);
+          } else {
+            recovery_info.copy_subset.clear();
+          }
+          assert(recovery_info.clone_subset.empty());
         }
-        assert(recovery_info.clone_subset.empty());
-      }
-      return build_push_op(recovery_info, progress, 0);
-    }).then([this, from](auto pop) {
-      auto msg = make_message<MOSDPGPush>();
-      msg->from = pg.get_pg_whoami();
-      msg->pgid = pg.get_pgid();
-      msg->map_epoch = pg.get_osdmap_epoch();
-      msg->min_epoch = pg.get_last_peering_reset();
-      msg->set_priority(pg.get_recovery_op_priority());
-      msg->pushes.push_back(std::move(pop));
-      return shard_services.send_to_osd(from.osd, std::move(msg),
-                                        pg.get_osdmap_epoch());
+        return build_push_op(recovery_info, progress, 0);
+      }).then_interruptible([this, from](auto pop) {
+        auto msg = crimson::make_message<MOSDPGPush>();
+        msg->from = pg.get_pg_whoami();
+        msg->pgid = pg.get_pgid();
+        msg->map_epoch = pg.get_osdmap_epoch();
+        msg->min_epoch = pg.get_last_peering_reset();
+        msg->set_priority(pg.get_recovery_op_priority());
+        msg->pushes.push_back(std::move(pop));
+        return shard_services.send_to_osd(from.osd, std::move(msg),
+                                          pg.get_osdmap_epoch());
+      });
     });
   });
 }
 
-seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
+RecoveryBackend::interruptible_future<bool>
+ReplicatedRecoveryBackend::_handle_pull_response(
   pg_shard_t from,
-  const PushOp& pop,
+  PushOp& pop,
   PullOp* response,
   ceph::os::Transaction* t)
 {
@@ -627,18 +658,19 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
   if (pi.recovery_info.version == eversion_t())
     pi.recovery_info.version = pop.version;
 
-  auto prepare_waiter = seastar::make_ready_future<>();
+  auto prepare_waiter = interruptor::make_interruptible(
+      seastar::make_ready_future<>());
   if (pi.recovery_progress.first) {
     prepare_waiter = pg.with_head_obc<RWState::RWNONE>(
       pi.recovery_info.soid, [&pi, &recovery_waiter, &pop](auto obc) {
         pi.obc = obc;
         recovery_waiter.obc = obc;
-        obc->obs.oi.decode(pop.attrset.at(OI_ATTR));
+        obc->obs.oi.decode_no_oid(pop.attrset.at(OI_ATTR), pop.soid);
         pi.recovery_info.oi = obc->obs.oi;
         return crimson::osd::PG::load_obc_ertr::now();
-      }).handle_error(crimson::ct_error::assert_all{});
+      }).handle_error_interruptible(crimson::ct_error::assert_all{});
   };
-  return prepare_waiter.then([this, &pi, &pop, t, response]() mutable {
+  return prepare_waiter.then_interruptible([this, &pi, &pop, t, response]() mutable {
     const bool first = pi.recovery_progress.first;
     pi.recovery_progress = pop.after_progress;
     logger().debug("new recovery_info {}, new progress {}",
@@ -658,8 +690,10 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
     bool complete = pi.is_complete();
     bool clear_omap = !pop.before_progress.omap_complete;
     return submit_push_data(pi.recovery_info, first, complete, clear_omap,
-         std::move(data_zeros), usable_intervals, data, pop.omap_header,
-         pop.attrset, pop.omap_entries, t).then(
+                            std::move(data_zeros), std::move(usable_intervals),
+                            std::move(data), std::move(pop.omap_header),
+                            pop.attrset, std::move(pop.omap_entries), t)
+    .then_interruptible(
       [this, response, &pi, &pop, complete, t, bytes_recovered=data.length()] {
       pi.stat.num_keys_recovered += pop.omap_entries.size();
       pi.stat.num_bytes_recovered += bytes_recovered;
@@ -680,9 +714,14 @@ seastar::future<bool> ReplicatedRecoveryBackend::_handle_pull_response(
   });
 }
 
-seastar::future<> ReplicatedRecoveryBackend::handle_pull_response(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_pull_response(
   Ref<MOSDPGPush> m)
 {
+  if (pg.can_discard_replica_op(*m)) {
+    logger().debug("{}: discarding {}", __func__, *m);
+    return seastar::now();
+  }
   const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now.
   if (pop.version == eversion_t()) {
     // replica doesn't have it!
@@ -700,9 +739,10 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull_response(
       [this, &response](auto& t, auto& m) {
       pg_shard_t from = m->from;
       PushOp& pop = m->pushes[0]; // only one push per message for now
-      return _handle_pull_response(from, pop, &response, &t).then(
+      return _handle_pull_response(from, pop, &response, &t).then_interruptible(
        [this, &t](bool complete) {
        epoch_t epoch_frozen = pg.get_osdmap_epoch();
+       logger().debug("ReplicatedRecoveryBackend::handle_pull_response: do_transaction...");
        return shard_services.get_store().do_transaction(coll, std::move(t))
          .then([this, epoch_frozen, complete,
          last_complete = pg.get_info().last_complete] {
@@ -710,13 +750,13 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull_response(
          return seastar::make_ready_future<bool>(complete);
        });
       });
-    }).then([this, m, &response](bool complete) {
+    }).then_interruptible([this, m, &response](bool complete) {
       if (complete) {
        auto& pop = m->pushes[0];
        recovering.at(pop.soid).set_pulled();
        return seastar::make_ready_future<>();
       } else {
-       auto reply = make_message<MOSDPGPull>();
+       auto reply = crimson::make_message<MOSDPGPull>();
        reply->from = pg.get_pg_whoami();
        reply->set_priority(m->get_priority());
        reply->pgid = pg.get_info().pgid;
@@ -729,9 +769,10 @@ seastar::future<> ReplicatedRecoveryBackend::handle_pull_response(
   });
 }
 
-seastar::future<> ReplicatedRecoveryBackend::_handle_push(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::_handle_push(
   pg_shard_t from,
-  const PushOp &pop,
+  PushOp &pop,
   PushReplyOp *response,
   ceph::os::Transaction *t)
 {
@@ -753,8 +794,11 @@ seastar::future<> ReplicatedRecoveryBackend::_handle_push(
   response->soid = pop.recovery_info.soid;
 
   return submit_push_data(pop.recovery_info, first, complete, clear_omap,
-        std::move(data_zeros), pop.data_included, pop.data, pop.omap_header,
-        pop.attrset, pop.omap_entries, t).then([this, complete, &pop, t] {
+                          std::move(data_zeros), std::move(pop.data_included),
+                          std::move(pop.data), std::move(pop.omap_header),
+                          pop.attrset, std::move(pop.omap_entries), t)
+  .then_interruptible(
+    [this, complete, &pop, t] {
     if (complete) {
       pg.get_recovery_handler()->on_local_recover(
         pop.recovery_info.soid, pop.recovery_info,
@@ -763,29 +807,36 @@ seastar::future<> ReplicatedRecoveryBackend::_handle_push(
   });
 }
 
-seastar::future<> ReplicatedRecoveryBackend::handle_push(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_push(
   Ref<MOSDPGPush> m)
 {
+  if (pg.can_discard_replica_op(*m)) {
+    logger().debug("{}: discarding {}", __func__, *m);
+    return seastar::now();
+  }
   if (pg.is_primary()) {
     return handle_pull_response(m);
   }
 
   logger().debug("{}: {}", __func__, *m);
   return seastar::do_with(PushReplyOp(), [this, m](auto& response) {
-    const PushOp& pop = m->pushes[0]; //TODO: only one push per message for now
+    PushOp& pop = m->pushes[0]; // TODO: only one push per message for now
     return seastar::do_with(ceph::os::Transaction(),
       [this, m, &pop, &response](auto& t) {
-      return _handle_push(m->from, pop, &response, &t).then(
+      return _handle_push(m->from, pop, &response, &t).then_interruptible(
        [this, &t] {
        epoch_t epoch_frozen = pg.get_osdmap_epoch();
-       return shard_services.get_store().do_transaction(coll, std::move(t)).then(
+       logger().debug("ReplicatedRecoveryBackend::handle_push: do_transaction...");
+       return interruptor::make_interruptible(
+           shard_services.get_store().do_transaction(coll, std::move(t))).then_interruptible(
          [this, epoch_frozen, last_complete = pg.get_info().last_complete] {
          //TODO: this should be grouped with pg.on_local_recover somehow.
          pg.get_recovery_handler()->_committed_pushed_object(epoch_frozen, last_complete);
        });
       });
-    }).then([this, m, &response]() mutable {
-      auto reply = make_message<MOSDPGPushReply>();
+    }).then_interruptible([this, m, &response]() mutable {
+      auto reply = crimson::make_message<MOSDPGPushReply>();
       reply->from = pg.get_pg_whoami();
       reply->set_priority(m->get_priority());
       reply->pgid = pg.get_info().pgid;
@@ -799,7 +850,7 @@ seastar::future<> ReplicatedRecoveryBackend::handle_push(
   });
 }
 
-seastar::future<std::optional<PushOp>>
+RecoveryBackend::interruptible_future<std::optional<PushOp>>
 ReplicatedRecoveryBackend::_handle_push_reply(
   pg_shard_t peer,
   const PushReplyOp &op)
@@ -816,10 +867,10 @@ ReplicatedRecoveryBackend::_handle_push_reply(
     bool error = pi.recovery_progress.error;
     if (!pi.recovery_progress.data_complete && !error) {
       return build_push_op(pi.recovery_info, pi.recovery_progress,
-                          &pi.stat).then([&pi] (auto pop) {
+                          &pi.stat).then_interruptible([&pi] (auto pop) {
         pi.recovery_progress = pop.after_progress;
        return seastar::make_ready_future<std::optional<PushOp>>(std::move(pop));
-      }).handle_exception([recovering_iter, &pi, peer] (auto e) {
+      }).handle_exception_interruptible([recovering_iter, &pi, peer] (auto e) {
         pi.recovery_progress.error = true;
         recovering_iter->second.set_push_failed(peer, e);
         return seastar::make_ready_future<std::optional<PushOp>>();
@@ -833,17 +884,18 @@ ReplicatedRecoveryBackend::_handle_push_reply(
   }
 }
 
-seastar::future<> ReplicatedRecoveryBackend::handle_push_reply(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_push_reply(
   Ref<MOSDPGPushReply> m)
 {
   logger().debug("{}: {}", __func__, *m);
   auto from = m->from;
   auto& push_reply = m->replies[0]; //TODO: only one reply per message
 
-  return _handle_push_reply(from, push_reply).then(
+  return _handle_push_reply(from, push_reply).then_interruptible(
     [this, from](std::optional<PushOp> push_op) {
     if (push_op) {
-      auto msg = make_message<MOSDPGPush>();
+      auto msg = crimson::make_message<MOSDPGPush>();
       msg->from = pg.get_pg_whoami();
       msg->pgid = pg.get_pgid();
       msg->map_epoch = pg.get_osdmap_epoch();
@@ -891,96 +943,107 @@ ReplicatedRecoveryBackend::trim_pushed_data(
   return {intervals_usable, data_usable};
 }
 
-seastar::future<> ReplicatedRecoveryBackend::submit_push_data(
-  const ObjectRecoveryInfo &recovery_info,
+RecoveryBackend::interruptible_future<hobject_t>
+ReplicatedRecoveryBackend::prep_push_target(
+  const ObjectRecoveryInfo& recovery_info,
   bool first,
   bool complete,
   bool clear_omap,
-  interval_set<uint64_t> data_zeros,
-  const interval_set<uint64_t> &intervals_included,
-  bufferlist data_included,
-  bufferlist omap_header,
-  const map<string, bufferlist> &attrs,
-  const map<string, bufferlist> &omap_entries,
-  ObjectStore::Transaction *t)
+  ObjectStore::Transaction* t,
+  const map<string, bufferlist, less<>>& attrs,
+  bufferlist&& omap_header)
 {
-  logger().debug("{}", __func__);
-  hobject_t target_oid;
-  if (first && complete) {
-    target_oid = recovery_info.soid;
+  if (!first) {
+    return seastar::make_ready_future<hobject_t>(
+      get_temp_recovery_object(recovery_info.soid,
+                               recovery_info.version));
+  }
+
+  ghobject_t target_oid;
+  if (complete) {
+    // overwrite the original object
+    target_oid = ghobject_t(recovery_info.soid);
   } else {
-    target_oid = get_temp_recovery_object(recovery_info.soid,
-                                         recovery_info.version);
-    if (first) {
-      logger().debug("{}: Adding oid {} in the temp collection",
-         __func__, target_oid);
-      add_temp_obj(target_oid);
+    target_oid = ghobject_t(get_temp_recovery_object(recovery_info.soid,
+                                                     recovery_info.version));
+    logger().debug("{}: Adding oid {} in the temp collection",
+                   __func__, target_oid);
+    add_temp_obj(target_oid.hobj);
+  }
+  // create a new object
+  if (!complete || !recovery_info.object_exist) {
+    t->remove(coll->get_cid(), target_oid);
+    t->touch(coll->get_cid(), target_oid);
+    object_info_t oi;
+    oi.decode_no_oid(attrs.at(OI_ATTR));
+    t->set_alloc_hint(coll->get_cid(), target_oid,
+                      oi.expected_object_size,
+                      oi.expected_write_size,
+                      oi.alloc_hint_flags);
+  }
+  if (complete) {
+    // remove xattr and update later if overwrite on original object
+    t->rmattrs(coll->get_cid(), target_oid);
+    // if need update omap, clear the previous content first
+    if (clear_omap) {
+      t->omap_clear(coll->get_cid(), target_oid);
     }
   }
-
-  return [this, &recovery_info, first, complete, t,
-    &omap_header, &attrs, target_oid, clear_omap] {
-    if (first) {
-      if (!complete) {
-       t->remove(coll->get_cid(), ghobject_t(target_oid));
-       t->touch(coll->get_cid(), ghobject_t(target_oid));
-       bufferlist bv = attrs.at(OI_ATTR);
-       object_info_t oi(bv);
-       t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid),
-                         oi.expected_object_size,
-                         oi.expected_write_size,
-                         oi.alloc_hint_flags);
-      } else {
-        if (!recovery_info.object_exist) {
-         t->remove(coll->get_cid(), ghobject_t(target_oid));
-          t->touch(coll->get_cid(), ghobject_t(target_oid));
-          bufferlist bv = attrs.at(OI_ATTR);
-          object_info_t oi(bv);
-          t->set_alloc_hint(coll->get_cid(), ghobject_t(target_oid),
-                            oi.expected_object_size,
-                            oi.expected_write_size,
-                            oi.alloc_hint_flags);
-        }
-        //remove xattr and update later if overwrite on original object
-        t->rmattrs(coll->get_cid(), ghobject_t(target_oid));
-        //if need update omap, clear the previous content first
-        if (clear_omap)
-          t->omap_clear(coll->get_cid(), ghobject_t(target_oid));
-      }
-
-      t->truncate(coll->get_cid(), ghobject_t(target_oid), recovery_info.size);
-      if (omap_header.length())
-       t->omap_setheader(coll->get_cid(), ghobject_t(target_oid), omap_header);
-
-      return store->stat(coll, ghobject_t(recovery_info.soid)).then(
-       [this, &recovery_info, complete, t, target_oid,
-       omap_header = std::move(omap_header)] (auto st) {
-       //TODO: pg num bytes counting
-       if (!complete) {
-         //clone overlap content in local object
-         if (recovery_info.object_exist) {
-           uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
-           interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
-           if (local_size) {
-             local_intervals_included.insert(0, local_size);
-             local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
-             local_intervals_included.subtract(local_intervals_excluded);
-           }
-           for (auto [off, len] : local_intervals_included) {
-             logger().debug(" clone_range {} {}~{}",
-                 recovery_info.soid, off, len);
-             t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid),
-                            ghobject_t(target_oid), off, len, off);
-           }
-         }
-       }
-       return seastar::make_ready_future<>();
-      });
+  t->truncate(coll->get_cid(), target_oid, recovery_info.size);
+  if (omap_header.length()) {
+    t->omap_setheader(coll->get_cid(), target_oid, omap_header);
+  }
+  if (complete || !recovery_info.object_exist) {
+    return seastar::make_ready_future<hobject_t>(target_oid.hobj);
+  }
+  // clone overlap content in local object if using a new object
+  return interruptor::make_interruptible(store->stat(coll, ghobject_t(recovery_info.soid)))
+  .then_interruptible(
+    [this, &recovery_info, t, target_oid] (auto st) {
+    // TODO: pg num bytes counting
+    uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
+    interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
+    if (local_size) {
+      local_intervals_included.insert(0, local_size);
+      local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
+      local_intervals_included.subtract(local_intervals_excluded);
     }
-    return seastar::make_ready_future<>();
-  }().then([this, data_zeros=std::move(data_zeros),
-           &recovery_info, &intervals_included, t, target_oid,
-           &omap_entries, &attrs, data_included, complete, first]() mutable {
+    for (auto [off, len] : local_intervals_included) {
+      logger().debug(" clone_range {} {}~{}",
+                     recovery_info.soid, off, len);
+      t->clone_range(coll->get_cid(), ghobject_t(recovery_info.soid),
+                     target_oid, off, len, off);
+    }
+    return seastar::make_ready_future<hobject_t>(target_oid.hobj);
+  });
+}
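
The new prep_push_target() helper centralizes the choice of write target: only a push that is both first and complete writes the final object in place; every other case stages into a temp recovery object that is moved into place once recovery of the object completes. The decision, reduced to a sketch (not the helper's exact code; get_temp_recovery_object() is the helper this file already uses):

    // first && complete -> overwrite the final object directly
    // otherwise         -> stage into a temp object, renamed on completion
    hobject_t choose_target(const hobject_t& soid, eversion_t v,
                            bool first, bool complete) {
      if (first && complete) {
        return soid;
      }
      return get_temp_recovery_object(soid, v);
    }
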
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::submit_push_data(
+  const ObjectRecoveryInfo &recovery_info,
+  bool first,
+  bool complete,
+  bool clear_omap,
+  interval_set<uint64_t>&& data_zeros,
+  interval_set<uint64_t>&& intervals_included,
+  bufferlist&& data_included,
+  bufferlist&& omap_header,
+  const map<string, bufferlist, less<>> &attrs,
+  map<string, bufferlist>&& omap_entries,
+  ObjectStore::Transaction *t)
+{
+  logger().debug("{}", __func__);
+  return prep_push_target(recovery_info, first, complete,
+                          clear_omap, t, attrs,
+                          std::move(omap_header)).then_interruptible(
+    [this,
+     &recovery_info, t,
+     first, complete,
+     data_zeros=std::move(data_zeros),
+     intervals_included=std::move(intervals_included),
+     data_included=std::move(data_included),
+     omap_entries=std::move(omap_entries),
+     &attrs](auto target_oid) mutable {
+
     uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
     // Punch zeros for data, if fiemap indicates nothing but it is marked dirty
     if (!data_zeros.empty()) {
@@ -1040,7 +1103,8 @@ void ReplicatedRecoveryBackend::submit_push_complete(
   }
 }
 
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply(
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_delete_reply(
   Ref<MOSDPGRecoveryDeleteReply> m)
 {
   auto& p = m->objects.front();
@@ -1052,7 +1116,8 @@ seastar::future<> ReplicatedRecoveryBackend::handle_recovery_delete_reply(
   return seastar::now();
 }
 
-seastar::future<> ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
+RecoveryBackend::interruptible_future<>
+ReplicatedRecoveryBackend::handle_recovery_op(Ref<MOSDFastDispatchOp> m)
 {
   switch (m->get_header().type) {
   case MSG_OSD_PG_PULL: