]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/ReplicatedBackend.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / osd / ReplicatedBackend.cc
index f8d67af325ba4243c20905ae2016355b9dcaa4bd..9a243f795a7a9ac5b5a7434bbe92c6b4f74149a0 100644 (file)
@@ -255,6 +255,20 @@ int ReplicatedBackend::objects_read_sync(
   return store->read(ch, ghobject_t(hoid), off, len, *bl, op_flags);
 }
 
+int ReplicatedBackend::objects_readv_sync(  // synchronous multi-extent read of hoid through store->readv()
+  const hobject_t &hoid,  // object to read; wrapped in a ghobject_t for the store call
+  map<uint64_t, uint64_t>&& m,  // in/out: requested extents (presumably offset -> length, TODO confirm); reassigned on success
+  uint32_t op_flags,  // forwarded unchanged to store->readv()
+  bufferlist *bl)  // output buffer; dereferenced without a null check
+{
+  interval_set<uint64_t> im(std::move(m));  // hand the caller's map over to an interval_set for readv()
+  auto r = store->readv(ch, ghobject_t(hoid), im, *bl, op_flags);
+  if (r >= 0) {  // a negative r is treated as failure
+    m = std::move(im).detach();  // give the (possibly adjusted) extents back to the caller
+  }
+  return r;  // on failure m is not restored after the move above
+}
+
 void ReplicatedBackend::objects_read_async(
   const hobject_t &hoid,
   const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
@@ -267,10 +281,10 @@ void ReplicatedBackend::objects_read_async(
 
 class C_OSD_OnOpCommit : public Context {
   ReplicatedBackend *pg;
-  ReplicatedBackend::InProgressOpRef op;
+  ceph::ref_t<ReplicatedBackend::InProgressOp> op;
 public:
-  C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op) 
-    : pg(pg), op(op) {}
+  C_OSD_OnOpCommit(ReplicatedBackend *pg, ceph::ref_t<ReplicatedBackend::InProgressOp> op)
+    : pg(pg), op(std::move(op)) {}
   void finish(int) override {
     pg->op_commit(op);
   }
@@ -282,7 +296,8 @@ void generate_transaction(
   vector<pg_log_entry_t> &log_entries,
   ObjectStore::Transaction *t,
   set<hobject_t> *added,
-  set<hobject_t> *removed)
+  set<hobject_t> *removed,
+  const ceph_release_t require_osd_release = ceph_release_t::unknown )
 {
   ceph_assert(t);
   ceph_assert(added);
@@ -323,7 +338,11 @@ void generate_transaction(
        [&](const PGTransaction::ObjectOperation::Init::None &) {
        },
        [&](const PGTransaction::ObjectOperation::Init::Create &op) {
-         t->touch(coll, goid);
+         if (require_osd_release >= ceph_release_t::octopus) {
+           t->create(coll, goid);
+         } else {
+           t->touch(coll, goid);
+         }
        },
        [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
          t->clone(
@@ -373,6 +392,9 @@ void generate_transaction(
        case UpdateType::Insert:
          t->omap_setkeys(coll, goid, up.second);
          break;
+       case UpdateType::RemoveRange:
+         t->omap_rmkeyrange(coll, goid, up.second);
+         break;
        }
       }
 
@@ -398,7 +420,8 @@ void generate_transaction(
              goid,
              extent.get_off(),
              extent.get_len(),
-             op.buffer);
+             op.buffer,
+             op.fadvise_flags);
          },
          [&](const BufferUpdate::Zero &op) {
            t->zero(
@@ -427,9 +450,9 @@ void ReplicatedBackend::submit_transaction(
   const eversion_t &at_version,
   PGTransactionUPtr &&_t,
   const eversion_t &trim_to,
-  const eversion_t &roll_forward_to,
+  const eversion_t &min_last_complete_ondisk,
   const vector<pg_log_entry_t> &_log_entries,
-  boost::optional<pg_hit_set_history_t> &hset_history,
+  std::optional<pg_hit_set_history_t> &hset_history,
   Context *on_all_commit,
   ceph_tid_t tid,
   osd_reqid_t reqid,
@@ -449,14 +472,15 @@ void ReplicatedBackend::submit_transaction(
     log_entries,
     &op_t,
     &added,
-    &removed);
+    &removed,
+    get_osdmap()->require_osd_release);
   ceph_assert(added.size() <= 1);
   ceph_assert(removed.size() <= 1);
 
   auto insert_res = in_progress_ops.insert(
     make_pair(
       tid,
-      new InProgressOp(
+      ceph::make_ref<InProgressOp>(
        tid, on_all_commit,
        orig_op, at_version)
       )
@@ -474,7 +498,7 @@ void ReplicatedBackend::submit_transaction(
     tid,
     reqid,
     trim_to,
-    at_version,
+    min_last_complete_ondisk,
     added.size() ? *(added.begin()) : hobject_t(),
     removed.size() ? *(removed.begin()) : hobject_t(),
     log_entries,
@@ -490,6 +514,7 @@ void ReplicatedBackend::submit_transaction(
     hset_history,
     trim_to,
     at_version,
+    min_last_complete_ondisk,
     true,
     op_t);
   
@@ -506,8 +531,7 @@ void ReplicatedBackend::submit_transaction(
   }
 }
 
-void ReplicatedBackend::op_commit(
-  InProgressOpRef& op)
+void ReplicatedBackend::op_commit(const ceph::ref_t<InProgressOp>& op)
 {
   if (op->on_commit == nullptr) {
     // aborted
@@ -534,7 +558,7 @@ void ReplicatedBackend::op_commit(
 void ReplicatedBackend::do_repop_reply(OpRequestRef op)
 {
   static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
-  const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
+  auto r = op->get_req<MOSDRepOpReply>();
   ceph_assert(r->get_header().type == MSG_OSD_REPOPREPLY);
 
   op->mark_started();
@@ -546,9 +570,9 @@ void ReplicatedBackend::do_repop_reply(OpRequestRef op)
   auto iter = in_progress_ops.find(rep_tid);
   if (iter != in_progress_ops.end()) {
     InProgressOp &ip_op = *iter->second;
-    const MOSDOp *m = NULL;
+    const MOSDOp *m = nullptr;
     if (ip_op.op)
-      m = static_cast<const MOSDOp *>(ip_op.op->get_req());
+      m = ip_op.op->get_req<MOSDOp>();
 
     if (m)
       dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
@@ -735,7 +759,7 @@ int ReplicatedBackend::be_deep_scrub(
 
 void ReplicatedBackend::_do_push(OpRequestRef op)
 {
-  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
+  auto m = op->get_req<MOSDPGPush>();
   ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
   pg_shard_t from = m->from;
 
@@ -787,8 +811,9 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
       int started = bc->start_pushes(i.hoid, obc, h);
       if (started < 0) {
        bc->pushing[i.hoid].clear();
-       bc->get_parent()->primary_failed(i.hoid);
-       bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
+       bc->get_parent()->on_failed_pull(
+         { bc->get_parent()->whoami_shard() },
+         i.hoid, obc->obs.oi.version);
       } else if (!started) {
        bc->get_parent()->on_global_recover(
          i.hoid, i.stat, false);
@@ -801,7 +826,7 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
 
 void ReplicatedBackend::_do_pull_response(OpRequestRef op)
 {
-  const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
+  auto m = op->get_req<MOSDPGPush>();
   ceph_assert(m->get_type() == MSG_OSD_PG_PUSH);
   pg_shard_t from = m->from;
 
@@ -871,7 +896,7 @@ void ReplicatedBackend::do_pull(OpRequestRef op)
 
 void ReplicatedBackend::do_push_reply(OpRequestRef op)
 {
-  const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
+  auto m = op->get_req<MOSDPGPushReply>();
   ceph_assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
   pg_shard_t from = m->from;
 
@@ -896,11 +921,11 @@ Message * ReplicatedBackend::generate_subop(
   ceph_tid_t tid,
   osd_reqid_t reqid,
   eversion_t pg_trim_to,
-  eversion_t pg_roll_forward_to,
+  eversion_t min_last_complete_ondisk,
   hobject_t new_temp_oid,
   hobject_t discard_temp_oid,
   const bufferlist &log_entries,
-  boost::optional<pg_hit_set_history_t> &hset_hist,
+  std::optional<pg_hit_set_history_t> &hset_hist,
   ObjectStore::Transaction &op_t,
   pg_shard_t peer,
   const pg_info_t &pinfo)
@@ -932,7 +957,14 @@ Message * ReplicatedBackend::generate_subop(
     wr->pg_stats = get_info().stats;
 
   wr->pg_trim_to = pg_trim_to;
-  wr->pg_roll_forward_to = pg_roll_forward_to;
+
+  if (HAVE_FEATURE(parent->min_peer_features(), OSD_REPOP_MLCOD)) {
+    wr->min_last_complete_ondisk = min_last_complete_ondisk;
+  } else {
+    /* Some replicas need this field to be at_version.  New replicas
+     * will ignore it */
+    wr->set_rollback_to(at_version);
+  }
 
   wr->new_temp_oid = new_temp_oid;
   wr->discard_temp_oid = discard_temp_oid;
@@ -946,11 +978,11 @@ void ReplicatedBackend::issue_op(
   ceph_tid_t tid,
   osd_reqid_t reqid,
   eversion_t pg_trim_to,
-  eversion_t pg_roll_forward_to,
+  eversion_t min_last_complete_ondisk,
   hobject_t new_temp_oid,
   hobject_t discard_temp_oid,
   const vector<pg_log_entry_t> &log_entries,
-  boost::optional<pg_hit_set_history_t> &hset_hist,
+  std::optional<pg_hit_set_history_t> &hset_hist,
   InProgressOp *op,
   ObjectStore::Transaction &op_t)
 {
@@ -979,7 +1011,7 @@ void ReplicatedBackend::issue_op(
          tid,
          reqid,
          pg_trim_to,
-         pg_roll_forward_to,
+         min_last_complete_ondisk,
          new_temp_oid,
          discard_temp_oid,
          logs,
@@ -999,7 +1031,7 @@ void ReplicatedBackend::issue_op(
 void ReplicatedBackend::do_repop(OpRequestRef op)
 {
   static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
-  const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
+  auto m = op->get_req<MOSDRepOp>();
   int msg_type = m->get_type();
   ceph_assert(MSG_OSD_REPOP == msg_type);
 
@@ -1079,7 +1111,8 @@ void ReplicatedBackend::do_repop(OpRequestRef op)
     log,
     m->updated_hit_set_history,
     m->pg_trim_to,
-    m->pg_roll_forward_to,
+    m->version, /* Replicated PGs don't have rollback info */
+    m->min_last_complete_ondisk,
     update_snaps,
     rm->localt,
     async);
@@ -1103,7 +1136,7 @@ void ReplicatedBackend::repop_commit(RepModifyRef rm)
   rm->committed = true;
 
   // send commit.
-  const MOSDRepOp *m = static_cast<const MOSDRepOp*>(rm->op->get_req());
+  auto m = rm->op->get_req<MOSDRepOp>();
   ceph_assert(m->get_type() == MSG_OSD_REPOP);
   dout(10) << __func__ << " on op " << *m
           << ", sending commit to osd." << rm->ackerosd
@@ -1143,6 +1176,14 @@ void ReplicatedBackend::calc_head_subsets(
   if (size)
     data_subset.insert(0, size);
 
+  if (HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS)) {
+    const auto it = missing.get_items().find(head);
+    assert(it != missing.get_items().end());
+    data_subset.intersection_of(it->second.clean_regions.get_dirty_regions());
+    dout(10) << "calc_head_subsets " << head
+             << " data_subset " << data_subset << dendl;
+  }
+
   if (get_parent()->get_pool().allow_incomplete_clones()) {
     dout(10) << __func__ << ": caching (was) enabled, skipping clone subsets" << dendl;
     return;
@@ -1156,11 +1197,11 @@ void ReplicatedBackend::calc_head_subsets(
 
   interval_set<uint64_t> cloning;
   interval_set<uint64_t> prev;
+  hobject_t c = head;
   if (size)
     prev.insert(0, size);
 
   for (int j=snapset.clones.size()-1; j>=0; j--) {
-    hobject_t c = head;
     c.snap = snapset.clones[j];
     prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
     if (!missing.is_missing(c) &&
@@ -1168,23 +1209,29 @@ void ReplicatedBackend::calc_head_subsets(
        get_parent()->try_lock_for_read(c, manager)) {
       dout(10) << "calc_head_subsets " << head << " has prev " << c
               << " overlap " << prev << dendl;
-      clone_subsets[c] = prev;
-      cloning.union_of(prev);
+      cloning = prev;
       break;
     }
     dout(10) << "calc_head_subsets " << head << " does not have prev " << c
             << " overlap " << prev << dendl;
   }
 
+  cloning.intersection_of(data_subset);
+  if (cloning.empty()) {
+    dout(10) << "skipping clone, nothing needs to clone" << dendl;
+    return;
+  }
 
-  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
+  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
     dout(10) << "skipping clone, too many holes" << dendl;
     get_parent()->release_locks(manager);
     clone_subsets.clear();
     cloning.clear();
+    return;
   }
 
   // what's left for us to push?
+  clone_subsets[c] = cloning;
   data_subset.subtract(cloning);
 
   dout(10) << "calc_head_subsets " << head
@@ -1265,7 +1312,7 @@ void ReplicatedBackend::calc_clone_subsets(
             << " overlap " << next << dendl;
   }
 
-  if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
+  if (cloning.num_intervals() > g_conf().get_val<uint64_t>("osd_recover_clone_overlap_limit")) {
     dout(10) << "skipping clone, too many holes" << dendl;
     get_parent()->release_locks(manager);
     clone_subsets.clear();
@@ -1287,9 +1334,9 @@ void ReplicatedBackend::prepare_pull(
   ObjectContextRef headctx,
   RPGHandle *h)
 {
-  ceph_assert(get_parent()->get_local_missing().get_items().count(soid));
-  eversion_t _v = get_parent()->get_local_missing().get_items().find(
-    soid)->second.need;
+  const auto missing_iter = get_parent()->get_local_missing().get_items().find(soid);
+  ceph_assert(missing_iter != get_parent()->get_local_missing().get_items().end());
+  eversion_t _v = missing_iter->second.need;
   ceph_assert(_v == v);
   const map<hobject_t, set<pg_shard_t>> &missing_loc(
     get_parent()->get_missing_loc_shards());
@@ -1363,11 +1410,15 @@ void ReplicatedBackend::prepare_pull(
 
     ceph_assert(ssc->snapset.clone_size.count(soid.snap));
     recovery_info.size = ssc->snapset.clone_size[soid.snap];
+    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
   } else {
     // pulling head or unversioned object.
     // always pull the whole thing.
     recovery_info.copy_subset.insert(0, (uint64_t)-1);
+    if (HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS))
+      recovery_info.copy_subset.intersection_of(missing_iter->second.clean_regions.get_dirty_regions());
     recovery_info.size = ((uint64_t)-1);
+    recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
   }
 
   h->pulls[fromshard].push_back(PullOp());
@@ -1378,7 +1429,8 @@ void ReplicatedBackend::prepare_pull(
   op.recovery_info.soid = soid;
   op.recovery_info.version = v;
   op.recovery_progress.data_complete = false;
-  op.recovery_progress.omap_complete = false;
+  op.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty() 
+                                && HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS);
   op.recovery_progress.data_recovered_to = 0;
   op.recovery_progress.first = true;
 
@@ -1491,6 +1543,9 @@ int ReplicatedBackend::prep_push(
   ObcLockManager &&lock_manager)
 {
   get_parent()->begin_peer_recover(peer, soid);
+  const auto pmissing_iter = get_parent()->get_shard_missing().find(peer);
+  const auto missing_iter = pmissing_iter->second.get_items().find(soid);
+  assert(missing_iter != pmissing_iter->second.get_items().end());
   // take note.
   PushInfo &pi = pushing[soid][peer];
   pi.obc = obc;
@@ -1501,6 +1556,9 @@ int ReplicatedBackend::prep_push(
   pi.recovery_info.oi = obc->obs.oi;
   pi.recovery_info.ss = pop->recovery_info.ss;
   pi.recovery_info.version = version;
+  pi.recovery_info.object_exist = missing_iter->second.clean_regions.object_is_exist();
+  pi.recovery_progress.omap_complete = !missing_iter->second.clean_regions.omap_is_dirty() &&
+    HAVE_FEATURE(parent->min_peer_features(), SERVER_OCTOPUS);
   pi.lock_manager = std::move(lock_manager);
 
   ObjectRecoveryProgress new_progress;
@@ -1519,7 +1577,9 @@ void ReplicatedBackend::submit_push_data(
   const ObjectRecoveryInfo &recovery_info,
   bool first,
   bool complete,
+  bool clear_omap,
   bool cache_dont_need,
+  interval_set<uint64_t> &data_zeros,
   const interval_set<uint64_t> &intervals_included,
   bufferlist data_included,
   bufferlist omap_header,
@@ -1541,25 +1601,43 @@ void ReplicatedBackend::submit_push_data(
   }
 
   if (first) {
-    t->remove(coll, ghobject_t(target_oid));
-    t->touch(coll, ghobject_t(target_oid));
+    if (!complete) {
+      t->remove(coll, ghobject_t(target_oid));
+      t->touch(coll, ghobject_t(target_oid));
+      bufferlist bv = attrs.at(OI_ATTR);
+      object_info_t oi(bv);
+      t->set_alloc_hint(coll, ghobject_t(target_oid),
+                       oi.expected_object_size,
+                       oi.expected_write_size,
+                       oi.alloc_hint_flags);
+      } else {
+        if (!recovery_info.object_exist) {
+         t->remove(coll, ghobject_t(target_oid));
+          t->touch(coll, ghobject_t(target_oid));
+          bufferlist bv = attrs.at(OI_ATTR);
+          object_info_t oi(bv);
+          t->set_alloc_hint(coll, ghobject_t(target_oid),
+                            oi.expected_object_size,
+                            oi.expected_write_size,
+                            oi.alloc_hint_flags);
+        }
+        // remove the xattrs now and set them again later when overwriting the original object
+        t->rmattrs(coll, ghobject_t(target_oid));
+        // if the omap needs to be updated, clear its previous content first
+        if (clear_omap)
+          t->omap_clear(coll, ghobject_t(target_oid));
+      }
+
     t->truncate(coll, ghobject_t(target_oid), recovery_info.size);
-    if (omap_header.length()) 
+    if (omap_header.length())
       t->omap_setheader(coll, ghobject_t(target_oid), omap_header);
 
-    bufferlist bv = attrs.at(OI_ATTR);
-    object_info_t oi(bv);
-    t->set_alloc_hint(coll, ghobject_t(target_oid),
-                     oi.expected_object_size,
-                     oi.expected_write_size,
-                     oi.alloc_hint_flags);
+    struct stat st;
+    int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
     if (get_parent()->pg_is_remote_backfilling()) {
-      struct stat st;
       uint64_t size = 0;
-      int r = store->stat(ch, ghobject_t(recovery_info.soid), &st);
-      if (r == 0) {
+      if (r == 0)
         size = st.st_size;
-      }
       // Don't need to do anything if object is still the same size
       if (size != recovery_info.oi.size) {
         get_parent()->pg_add_local_num_bytes((int64_t)recovery_info.oi.size - (int64_t)size);
@@ -1571,11 +1649,46 @@ void ReplicatedBackend::submit_push_data(
                << dendl;
       }
     }
+    if (!complete) {
+      //clone overlap content in local object
+      if (recovery_info.object_exist) {
+        assert(r == 0);
+        uint64_t local_size = std::min(recovery_info.size, (uint64_t)st.st_size);
+        interval_set<uint64_t> local_intervals_included, local_intervals_excluded;
+        if (local_size) {
+          local_intervals_included.insert(0, local_size);
+          local_intervals_excluded.intersection_of(local_intervals_included, recovery_info.copy_subset);
+          local_intervals_included.subtract(local_intervals_excluded);
+        }
+       for (interval_set<uint64_t>::const_iterator q = local_intervals_included.begin();
+          q != local_intervals_included.end();
+         ++q) {
+         dout(15) << " clone_range " << recovery_info.soid << " "
+                  << q.get_start() << "~" << q.get_len() << dendl;
+         t->clone_range(coll, ghobject_t(recovery_info.soid), ghobject_t(target_oid),
+             q.get_start(), q.get_len(), q.get_start());
+        }
+      }
+    }
   }
   uint64_t off = 0;
   uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL;
   if (cache_dont_need)
     fadvise_flags |= CEPH_OSD_OP_FLAG_FADVISE_DONTNEED;
+  // Punch zeros over ranges that are marked dirty but for which fiemap reported no data
+  if (data_zeros.size() > 0) {
+    data_zeros.intersection_of(recovery_info.copy_subset);
+    assert(intervals_included.subset_of(data_zeros));
+    data_zeros.subtract(intervals_included);
+
+    dout(20) << __func__ <<" recovering object " << recovery_info.soid
+             << " copy_subset: " << recovery_info.copy_subset
+             << " intervals_included: " << intervals_included
+             << " data_zeros: " << data_zeros << dendl;
+
+    for (auto p = data_zeros.begin(); p != data_zeros.end(); ++p)
+      t->zero(coll, ghobject_t(target_oid), p.get_start(), p.get_len());
+  }
   for (interval_set<uint64_t>::const_iterator p = intervals_included.begin();
        p != intervals_included.end();
        ++p) {
@@ -1594,14 +1707,15 @@ void ReplicatedBackend::submit_push_data(
   if (complete) {
     if (!first) {
       dout(10) << __func__ << ": Removing oid "
-              << target_oid << " from the temp collection" << dendl;
+               << target_oid << " from the temp collection" << dendl;
       clear_temp_obj(target_oid);
       t->remove(coll, ghobject_t(recovery_info.soid));
       t->collection_move_rename(coll, ghobject_t(target_oid),
-                               coll, ghobject_t(recovery_info.soid));
+                                coll, ghobject_t(recovery_info.soid));
     }
 
     submit_push_complete(recovery_info, t);
+
   }
 }
 
@@ -1666,7 +1780,7 @@ bool ReplicatedBackend::handle_pull_response(
 
   const hobject_t &hoid = pop.soid;
   ceph_assert((data_included.empty() && data.length() == 0) ||
-        (!data_included.empty() && data.length() > 0));
+         (!data_included.empty() && data.length() > 0));
 
   auto piter = pulling.find(hoid);
   if (piter == pulling.end()) {
@@ -1697,6 +1811,11 @@ bool ReplicatedBackend::handle_pull_response(
       a.second.rebuild();
     }
     pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
+    if (attrset.find(SS_ATTR) != attrset.end()) {
+      bufferlist ssbv = attrset.at(SS_ATTR);
+      SnapSet ss(ssbv);
+      assert(!pi.obc->ssc->exists || ss.seq  == pi.obc->ssc->snapset.seq);
+    }
     pi.recovery_info.oi = pi.obc->obs.oi;
     pi.recovery_info = recalc_subsets(
       pi.recovery_info,
@@ -1719,18 +1838,28 @@ bool ReplicatedBackend::handle_pull_response(
   pi.recovery_progress = pop.after_progress;
 
   dout(10) << "new recovery_info " << pi.recovery_info
-          << ", new progress " << pi.recovery_progress
-          << dendl;
-
+           << ", new progress " << pi.recovery_progress
+           << dendl;
+  interval_set<uint64_t> data_zeros;
+  uint64_t z_offset = pop.before_progress.data_recovered_to;
+  uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
+  if(z_length)
+    data_zeros.insert(z_offset, z_length);
   bool complete = pi.is_complete();
-
-  submit_push_data(pi.recovery_info, first,
-                  complete, pi.cache_dont_need,
-                  data_included, data,
-                  pop.omap_header,
-                  pop.attrset,
-                  pop.omap_entries,
-                  t);
+  bool clear_omap = !pop.before_progress.omap_complete;
+
+  submit_push_data(pi.recovery_info,
+                  first,
+                  complete,
+                  clear_omap,
+                  pi.cache_dont_need,
+                  data_zeros,
+                  data_included,
+                  data,
+                  pop.omap_header,
+                  pop.attrset,
+                  pop.omap_entries,
+                  t);
 
   pi.stat.num_keys_recovered += pop.omap_entries.size();
   pi.stat.num_bytes_recovered += data.length();
@@ -1769,12 +1898,20 @@ void ReplicatedBackend::handle_push(
   bool first = pop.before_progress.first;
   bool complete = pop.after_progress.data_complete &&
     pop.after_progress.omap_complete;
-
+  bool clear_omap = !pop.before_progress.omap_complete;
+  interval_set<uint64_t> data_zeros;
+  uint64_t z_offset = pop.before_progress.data_recovered_to;
+  uint64_t z_length = pop.after_progress.data_recovered_to - pop.before_progress.data_recovered_to;
+  if(z_length)
+    data_zeros.insert(z_offset, z_length);
   response->soid = pop.recovery_info.soid;
+
   submit_push_data(pop.recovery_info,
                   first,
                   complete,
+                  clear_omap,
                   true, // must be replicate
+                  data_zeros,
                   pop.data_included,
                   data,
                   pop.omap_header,
@@ -1953,7 +2090,7 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
       int r = store->fiemap(ch, ghobject_t(recovery_info.soid), 0,
                             copy_subset.range_end(), m);
       if (r >= 0)  {
-        interval_set<uint64_t> fiemap_included(m);
+        interval_set<uint64_t> fiemap_included(std::move(m));
         copy_subset.intersection_of(fiemap_included);
       } else {
         // intersection of copy_subset and empty interval_set would be empty anyway
@@ -1971,41 +2108,26 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
     out_op->data_included.clear();
   }
 
-  for (interval_set<uint64_t>::iterator p = out_op->data_included.begin();
-       p != out_op->data_included.end();
-       ++p) {
-    bufferlist bit;
-    int r = store->read(ch, ghobject_t(recovery_info.soid),
-               p.get_start(), p.get_len(), bit,
-                cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
-    if (cct->_conf->osd_debug_random_push_read_error &&
+  auto origin_size = out_op->data_included.size();
+  bufferlist bit;
+  int r = store->readv(ch, ghobject_t(recovery_info.soid),
+                      out_op->data_included, bit,
+                       cache_dont_need ? CEPH_OSD_OP_FLAG_FADVISE_DONTNEED: 0);
+  if (cct->_conf->osd_debug_random_push_read_error &&
         (rand() % (int)(cct->_conf->osd_debug_random_push_read_error * 100.0)) == 0) {
-      dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
-      r = -EIO;
-    }
-    if (r < 0) {
-      return r;
-    }
-    if (p.get_len() != bit.length()) {
-      dout(10) << " extent " << p.get_start() << "~" << p.get_len()
-              << " is actually " << p.get_start() << "~" << bit.length()
-              << dendl;
-      interval_set<uint64_t>::iterator save = p++;
-      if (bit.length() == 0)
-        out_op->data_included.erase(save);     //Remove this empty interval
-      else
-        save.set_len(bit.length());
-      // Remove any other intervals present
-      while (p != out_op->data_included.end()) {
-        interval_set<uint64_t>::iterator save = p++;
-        out_op->data_included.erase(save);
-      }
-      new_progress.data_complete = true;
-      out_op->data.claim_append(bit);
-      break;
-    }
-    out_op->data.claim_append(bit);
+    dout(0) << __func__ << ": inject EIO " << recovery_info.soid << dendl;
+    r = -EIO;
   }
+  if (r < 0) {
+    return r;
+  }
+  if (out_op->data_included.size() != origin_size) {
+    dout(10) << __func__ << " some extents get pruned "
+             << out_op->data_included.size() << "/" << origin_size
+             << dendl;
+    new_progress.data_complete = true;
+  }
+  out_op->data.claim_append(bit);
   if (progress.first && !out_op->data_included.empty() &&
       out_op->data_included.begin().get_start() == 0 &&
       out_op->data.length() == oi.size && oi.is_data_digest()) {
@@ -2026,6 +2148,9 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
       if (get_parent()->pg_is_repair())
         stat->num_objects_repaired++;
     }
+  } else if (progress.first && progress.omap_complete) {
+    // The omap is unchanged, but recovery cannot be completed in one pass, so the omap must be recovered too
+    new_progress.omap_complete = false;
   }
 
   if (stat) {
@@ -2105,7 +2230,10 @@ done:
        if (!error)
          get_parent()->on_global_recover(soid, stat, false);
        else
-         get_parent()->on_primary_error(soid, v);
+         get_parent()->on_failed_pull(
+           std::set<pg_shard_t>{ get_parent()->whoami_shard() },
+           soid,
+           v);
        pushing.erase(soid);
       } else {
        // This looks weird, but we erased the current peer and need to remember
@@ -2136,10 +2264,14 @@ void ReplicatedBackend::handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply)
     if (progress.first && recovery_info.size == ((uint64_t)-1)) {
       // Adjust size and copy_subset
       recovery_info.size = st.st_size;
-      recovery_info.copy_subset.clear();
-      if (st.st_size)
-        recovery_info.copy_subset.insert(0, st.st_size);
-      ceph_assert(recovery_info.clone_subset.empty());
+      if (st.st_size) {
+        interval_set<uint64_t> object_range;
+        object_range.insert(0, st.st_size);
+        recovery_info.copy_subset.intersection_of(object_range);
+      } else {
+        recovery_info.copy_subset.clear();
+      }
+      assert(recovery_info.clone_subset.empty());
     }
 
     r = build_push_op(recovery_info, progress, 0, reply);
@@ -2195,10 +2327,12 @@ void ReplicatedBackend::trim_pushed_data(
 void ReplicatedBackend::_failed_pull(pg_shard_t from, const hobject_t &soid)
 {
   dout(20) << __func__ << ": " << soid << " from " << from << dendl;
-  list<pg_shard_t> fl = { from };
   auto it = pulling.find(soid);
   assert(it != pulling.end());
-  get_parent()->failed_push(fl, soid, it->second.recovery_info.version);
+  get_parent()->on_failed_pull(
+    { from },
+    soid,
+    it->second.recovery_info.version);
 
   clear_pull(it);
 }