]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PrimaryLogPG.cc
update sources to v12.1.2
[ceph.git] / ceph / src / osd / PrimaryLogPG.cc
index 33ef908f692089353b699fc8d193aaf41cbb513d..dbacb3c63c0f30bb34772f2d45ccf10138d4a0d6 100644 (file)
@@ -207,8 +207,6 @@ struct OnReadComplete : public Context {
     PrimaryLogPG *pg,
     PrimaryLogPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
   void finish(int r) override {
-    if (r < 0)
-      opcontext->async_read_result = r;
     opcontext->finish_read(pg);
   }
   ~OnReadComplete() override {}
@@ -269,34 +267,34 @@ void PrimaryLogPG::OpContext::finish_read(PrimaryLogPG *pg)
     assert(pg->in_progress_async_reads.size());
     assert(pg->in_progress_async_reads.front().second == this);
     pg->in_progress_async_reads.pop_front();
-    pg->complete_read_ctx(async_read_result, this);
+
+    // Restart the op context now that all reads have been
+    // completed. Read failures will be handled by the op finisher
+    pg->execute_ctx(this);
   }
 }
 
-class CopyFromCallback: public PrimaryLogPG::CopyCallback {
+class CopyFromCallback : public PrimaryLogPG::CopyCallback {
 public:
-  PrimaryLogPG::CopyResults *results;
-  int retval;
+  PrimaryLogPG::CopyResults *results = nullptr;
   PrimaryLogPG::OpContext *ctx;
-  explicit CopyFromCallback(PrimaryLogPG::OpContext *ctx_)
-    : results(NULL),
-      retval(0),
-      ctx(ctx_) {}
+  OSDOp &osd_op;
+
+  CopyFromCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
+    : ctx(ctx), osd_op(osd_op) {
+  }
   ~CopyFromCallback() override {}
 
   void finish(PrimaryLogPG::CopyCallbackResults results_) override {
     results = results_.get<1>();
     int r = results_.get<0>();
-    retval = r;
 
     // for finish_copyfrom
     ctx->user_at_version = results->user_version;
 
     if (r >= 0) {
       ctx->pg->execute_ctx(ctx);
-    }
-    ctx->copy_cb = NULL;
-    if (r < 0) {
+    } else {
       if (r != -ECANCELED) { // on cancel just toss it out; client resends
        if (ctx->op)
          ctx->pg->osd->reply_op_error(ctx->op, r);
@@ -314,8 +312,19 @@ public:
   uint64_t get_data_size() {
     return results->object_size;
   }
-  int get_result() {
-    return retval;
+};
+
+struct CopyFromFinisher : public PrimaryLogPG::OpFinisher {
+  CopyFromCallback *copy_from_callback;
+
+  CopyFromFinisher(CopyFromCallback *copy_from_callback)
+    : copy_from_callback(copy_from_callback) {
+  }
+
+  int execute() override {
+    // instance will be destructed after this method completes
+    copy_from_callback->ctx->pg->finish_copyfrom(copy_from_callback);
+    return 0;
   }
 };
 
@@ -326,6 +335,7 @@ void PrimaryLogPG::on_local_recover(
   const hobject_t &hoid,
   const ObjectRecoveryInfo &_recovery_info,
   ObjectContextRef obc,
+  bool is_delete,
   ObjectStore::Transaction *t
   )
 {
@@ -333,7 +343,7 @@ void PrimaryLogPG::on_local_recover(
 
   ObjectRecoveryInfo recovery_info(_recovery_info);
   clear_object_snap_mapping(t, hoid);
-  if (recovery_info.soid.is_snap()) {
+  if (!is_delete && recovery_info.soid.is_snap()) {
     OSDriver::OSTransaction _t(osdriver.get_transaction(t));
     set<snapid_t> snaps;
     dout(20) << " snapset " << recovery_info.ss
@@ -354,7 +364,7 @@ void PrimaryLogPG::on_local_recover(
       snaps,
       &_t);
   }
-  if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+  if (!is_delete && pg_log.get_missing().is_missing(recovery_info.soid) &&
       pg_log.get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
     assert(is_primary());
     const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
@@ -391,24 +401,25 @@ void PrimaryLogPG::on_local_recover(
   recover_got(recovery_info.soid, recovery_info.version);
 
   if (is_primary()) {
-    assert(obc);
-    obc->obs.exists = true;
-    obc->ondisk_write_lock();
-
-    bool got = obc->get_recovery_read();
-    assert(got);
+    if (!is_delete) {
+      obc->obs.exists = true;
+      obc->ondisk_write_lock();
 
-    assert(recovering.count(obc->obs.oi.soid));
-    recovering[obc->obs.oi.soid] = obc;
-    obc->obs.oi = recovery_info.oi;  // may have been updated above
+      bool got = obc->get_recovery_read();
+      assert(got);
 
+      assert(recovering.count(obc->obs.oi.soid));
+      recovering[obc->obs.oi.soid] = obc;
+      obc->obs.oi = recovery_info.oi;  // may have been updated above
+      t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+    }
 
     t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
-    t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
 
     publish_stats_to_osd();
     assert(missing_loc.needs_recovery(hoid));
-    missing_loc.add_location(hoid, pg_whoami);
+    if (!is_delete)
+      missing_loc.add_location(hoid, pg_whoami);
     release_backoffs(hoid);
     if (!is_unreadable_object(hoid)) {
       auto unreadable_object_entry = waiting_for_unreadable_object.find(hoid);
@@ -437,7 +448,8 @@ void PrimaryLogPG::on_local_recover(
 
 void PrimaryLogPG::on_global_recover(
   const hobject_t &soid,
-  const object_stat_sum_t &stat_diff)
+  const object_stat_sum_t &stat_diff,
+  bool is_delete)
 {
   info.stats.stats.sum.add(stat_diff);
   missing_loc.recovered(soid);
@@ -446,12 +458,14 @@ void PrimaryLogPG::on_global_recover(
   map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
   assert(i != recovering.end());
 
-  // recover missing won't have had an obc, but it gets filled in
-  // during on_local_recover
-  assert(i->second);
-  list<OpRequestRef> requeue_list;
-  i->second->drop_recovery_read(&requeue_list);
-  requeue_ops(requeue_list);
+  if (!is_delete) {
+    // recover missing won't have had an obc, but it gets filled in
+    // during on_local_recover
+    assert(i->second);
+    list<OpRequestRef> requeue_list;
+    i->second->drop_recovery_read(&requeue_list);
+    requeue_ops(requeue_list);
+  }
 
   backfills_in_flight.erase(soid);
 
@@ -562,6 +576,8 @@ void PrimaryLogPG::maybe_kick_recovery(
     PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
     if (is_missing_object(soid)) {
       recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
+    } else if (missing_loc.is_deleted(soid)) {
+      prep_object_replica_deletes(soid, v, h);
     } else {
       prep_object_replica_pushes(soid, v, h);
     }
@@ -911,7 +927,7 @@ int PrimaryLogPG::do_command(
   ConnectionRef con,
   ceph_tid_t tid)
 {
-  const pg_missing_t &missing = pg_log.get_missing();
+  const auto &missing = pg_log.get_missing();
   string prefix;
   string format;
 
@@ -1235,6 +1251,9 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
          if (candidate.get_namespace() == cct->_conf->osd_hit_set_namespace)
            continue;
 
+         if (missing_loc.is_deleted(candidate))
+           continue;
+
          // skip wrong namespace
          if (m->get_hobj().nspace != librados::all_nspaces &&
                candidate.get_namespace() != m->get_hobj().nspace)
@@ -1391,6 +1410,9 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
          if (candidate.get_namespace() != m->get_hobj().nspace)
            continue;
 
+         if (missing_loc.is_deleted(candidate))
+           continue;
+
          if (filter && !pgls_filter(filter, candidate, filter_out))
            continue;
 
@@ -2228,7 +2250,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
   }
 
-  OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, obc, this);
+  OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this);
 
   if (!obc->obs.exists)
     ctx->snapset_obc = get_object_context(obc->obs.oi.soid.get_snapdir(), false);
@@ -2776,6 +2798,7 @@ void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc)
        case CEPH_OSD_OP_SYNC_READ:
        case CEPH_OSD_OP_SPARSE_READ:
        case CEPH_OSD_OP_CHECKSUM:
+       case CEPH_OSD_OP_CMPEXT:
          op.flags = (op.flags | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL) &
                       ~(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_FADVISE_NOCACHE);
       }
@@ -2839,7 +2862,7 @@ void PrimaryLogPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r)
   osd->logger->inc(l_osd_tier_proxy_read);
 
   const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
-  OpContext *ctx = new OpContext(op, m->get_reqid(), prdop->ops, this);
+  OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
   ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
   ctx->user_at_version = prdop->user_version;
   ctx->data_off = prdop->data_offset;
@@ -2964,7 +2987,7 @@ void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid,
   dout(10) << __func__ << " Start proxy write for " << *m << dendl;
 
   ProxyWriteOpRef pwop(std::make_shared<ProxyWriteOp>(op, soid, m->ops, m->get_reqid()));
-  pwop->ctx = new OpContext(op, m->get_reqid(), pwop->ops, this);
+  pwop->ctx = new OpContext(op, m->get_reqid(), &pwop->ops, this);
   pwop->mtime = m->get_mtime();
 
   ObjectOperation obj_op;
@@ -3173,13 +3196,13 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     ctx->at_version = get_next_version();
     ctx->mtime = m->get_mtime();
 
-    dout(10) << __func__ << " " << soid << " " << ctx->ops
+    dout(10) << __func__ << " " << soid << " " << *ctx->ops
             << " ov " << obc->obs.oi.version << " av " << ctx->at_version 
             << " snapc " << ctx->snapc
             << " snapset " << obc->ssc->snapset
             << dendl;  
   } else {
-    dout(10) << __func__ << " " << soid << " " << ctx->ops
+    dout(10) << __func__ << " " << soid << " " << *ctx->ops
             << " ov " << obc->obs.oi.version
             << dendl;  
   }
@@ -3216,8 +3239,13 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     obc->ondisk_read_unlock();
   }
 
-  if (result == -EINPROGRESS) {
+  bool pending_async_reads = !ctx->pending_async_reads.empty();
+  if (result == -EINPROGRESS || pending_async_reads) {
     // come back later.
+    if (pending_async_reads) {
+      in_progress_async_reads.push_back(make_pair(op, ctx));
+      ctx->start_async_reads(this);
+    }
     return;
   }
 
@@ -3252,13 +3280,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     if (result >= 0)
       do_osd_op_effects(ctx, m->get_connection());
 
-    if (ctx->pending_async_reads.empty()) {
-      complete_read_ctx(result, ctx);
-    } else {
-      in_progress_async_reads.push_back(make_pair(op, ctx));
-      ctx->start_async_reads(this);
-    }
-
+    complete_read_ctx(result, ctx);
     return;
   }
 
@@ -3297,8 +3319,8 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     // save just what we need from ctx
     MOSDOpReply *reply = ctx->reply;
     ctx->reply = nullptr;
-    reply->claim_op_out_data(ctx->ops);
-    reply->get_header().data_off = ctx->data_off;
+    reply->claim_op_out_data(*ctx->ops);
+    reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
     close_op_ctx(ctx);
 
     if (result == -ENOENT) {
@@ -3357,6 +3379,18 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
   repop->put();
 }
 
+void PrimaryLogPG::close_op_ctx(OpContext *ctx) {
+  release_object_locks(ctx->lock_manager);
+
+  ctx->op_t.reset();
+
+  for (auto p = ctx->on_finish.begin(); p != ctx->on_finish.end();
+       ctx->on_finish.erase(p++)) {
+    (*p)();
+  }
+  delete ctx;
+}
+
 void PrimaryLogPG::reply_ctx(OpContext *ctx, int r)
 {
   if (ctx->op)
@@ -3662,22 +3696,22 @@ int PrimaryLogPG::trim_object(
   } else {
     auto p = snapset.clone_snaps.find(coid.snap);
     if (p == snapset.clone_snaps.end()) {
-      osd->clog->error() << __func__ << " No clone_snaps in snapset " << snapset
-                        << " for " << coid << "\n";
+      osd->clog->error() << "No clone_snaps in snapset " << snapset
+                        << " for object " << coid << "\n";
       return -ENOENT;
     }
     old_snaps.insert(snapset.clone_snaps[coid.snap].begin(),
                     snapset.clone_snaps[coid.snap].end());
   }
   if (old_snaps.empty()) {
-    osd->clog->error() << __func__ << " No object info snaps for " << coid;
+    osd->clog->error() << "No object info snaps for object " << coid;
     return -ENOENT;
   }
 
   dout(10) << coid << " old_snaps " << old_snaps
           << " old snapset " << snapset << dendl;
   if (snapset.seq == 0) {
-    osd->clog->error() << __func__ << " No snapset.seq for " << coid;
+    osd->clog->error() << "No snapset.seq for object " << coid;
     return -ENOENT;
   }
 
@@ -3694,7 +3728,7 @@ int PrimaryLogPG::trim_object(
   if (new_snaps.empty()) {
     p = std::find(snapset.clones.begin(), snapset.clones.end(), coid.snap);
     if (p == snapset.clones.end()) {
-      osd->clog->error() << __func__ << " Snap " << coid.snap << " not in clones";
+      osd->clog->error() << "Snap " << coid.snap << " not in clones";
       return -ENOENT;
     }
   }
@@ -3994,37 +4028,6 @@ int PrimaryLogPG::do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr)
   }
 }
 
-int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
-{
-  ceph_osd_op& op = osd_op.op;
-  vector<OSDOp> read_ops(1);
-  OSDOp& read_op = read_ops[0];
-  int result = 0;
-
-  read_op.op.op = CEPH_OSD_OP_SYNC_READ;
-  read_op.op.extent.offset = op.extent.offset;
-  read_op.op.extent.length = op.extent.length;
-  read_op.op.extent.truncate_seq = op.extent.truncate_seq;
-  read_op.op.extent.truncate_size = op.extent.truncate_size;
-
-  result = do_osd_ops(ctx, read_ops);
-  if (result < 0) {
-    derr << "do_extent_cmp do_osd_ops failed " << result << dendl;
-    return result;
-  }
-
-  if (read_op.outdata.length() != osd_op.indata.length())
-    return -EINVAL;
-
-  for (uint64_t p = 0; p < osd_op.indata.length(); p++) {
-    if (read_op.outdata[p] != osd_op.indata[p]) {
-      return (-MAX_ERRNO - p);
-    }
-  }
-
-  return result;
-}
-
 int PrimaryLogPG::do_writesame(OpContext *ctx, OSDOp& osd_op)
 {
   ceph_osd_op& op = osd_op.op;
@@ -4369,10 +4372,13 @@ struct FillInVerifyExtent : public Context {
     r(r), rval(rv), outdatap(blp), maybe_crc(mc),
     size(size), osd(osd), soid(soid), flags(flags) {}
   void finish(int len) override {
-    *rval = len;
     *r = len;
-    if (len < 0)
+    if (len < 0) {
+      *rval = len;
       return;
+    }
+    *rval = 0;
+
     // whole object?  can we verify the checksum?
     if (maybe_crc && *r == size) {
       uint32_t crc = outdatap->crc32c(-1);
@@ -4390,19 +4396,25 @@ struct FillInVerifyExtent : public Context {
 };
 
 struct ToSparseReadResult : public Context {
-  bufferlist& data_bl;
+  int* result;
+  bufferlist* data_bl;
   uint64_t data_offset;
-  ceph_le64& len;
-  ToSparseReadResult(bufferlist& bl, uint64_t offset, ceph_le64& len):
-    data_bl(bl), data_offset(offset),len(len) {}
+  ceph_le64* len;
+  ToSparseReadResult(int* result, bufferlist* bl, uint64_t offset,
+                    ceph_le64* len)
+    : result(result), data_bl(bl), data_offset(offset),len(len) {}
   void finish(int r) override {
-    if (r < 0) return;
-    len = r;
+    if (r < 0) {
+      *result = r;
+      return;
+    }
+    *result = 0;
+    *len = r;
     bufferlist outdata;
     map<uint64_t, uint64_t> extents = {{data_offset, r}};
     ::encode(extents, outdata);
-    ::encode_destructively(data_bl, outdata);
-    data_bl.swap(outdata);
+    ::encode_destructively(*data_bl, outdata);
+    data_bl->swap(outdata);
   }
 };
 
@@ -4449,6 +4461,17 @@ void PrimaryLogPG::maybe_create_new_object(
   }
 }
 
+struct ReadFinisher : public PrimaryLogPG::OpFinisher {
+  OSDOp& osd_op;
+
+  ReadFinisher(OSDOp& osd_op) : osd_op(osd_op) {
+  }
+
+  int execute() override {
+    return osd_op.rval;
+  }
+};
+
 struct C_ChecksumRead : public Context {
   PrimaryLogPG *primary_log_pg;
   OSDOp &osd_op;
@@ -4468,21 +4491,24 @@ struct C_ChecksumRead : public Context {
                                             &read_bl, maybe_crc, size,
                                             osd, soid, flags)) {
   }
+  ~C_ChecksumRead() override {
+    delete fill_extent_ctx;
+  }
 
   void finish(int r) override {
     fill_extent_ctx->complete(r);
+    fill_extent_ctx = nullptr;
 
     if (osd_op.rval >= 0) {
       bufferlist::iterator init_value_bl_it = init_value_bl.begin();
       osd_op.rval = primary_log_pg->finish_checksum(osd_op, csum_type,
-                                                   &init_value_bl_it,
-                                                   read_bl);
+                                                   &init_value_bl_it, read_bl);
     }
   }
 };
 
 int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
-                             bufferlist::iterator *bl_it, bool *async_read)
+                             bufferlist::iterator *bl_it)
 {
   dout(20) << __func__ << dendl;
 
@@ -4549,17 +4575,18 @@ int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
     auto checksum_ctx = new C_ChecksumRead(this, osd_op, csum_type,
                                           std::move(init_value_bl), maybe_crc,
                                           oi.size, osd, soid, op.flags);
+
     ctx->pending_async_reads.push_back({
       {op.checksum.offset, op.checksum.length, op.flags},
       {&checksum_ctx->read_bl, checksum_ctx}});
 
     dout(10) << __func__ << ": async_read noted for " << soid << dendl;
-    *async_read = true;
-    return 0;
+    ctx->op_finishers[ctx->current_osd_subop_num].reset(
+      new ReadFinisher(osd_op));
+    return -EINPROGRESS;
   }
 
   // sync read
-  *async_read = false;
   std::vector<OSDOp> read_ops(1);
   auto& read_op = read_ops[0];
   if (op.checksum.length > 0) {
@@ -4647,6 +4674,334 @@ int PrimaryLogPG::finish_checksum(OSDOp& osd_op,
   return 0;
 }
 
+struct C_ExtentCmpRead : public Context {
+  PrimaryLogPG *primary_log_pg;
+  OSDOp &osd_op;
+  ceph_le64 read_length;
+  bufferlist read_bl;
+  Context *fill_extent_ctx;
+
+  C_ExtentCmpRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
+                 boost::optional<uint32_t> maybe_crc, uint64_t size,
+                 OSDService *osd, hobject_t soid, __le32 flags)
+    : primary_log_pg(primary_log_pg), osd_op(osd_op),
+      fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
+                                            &read_bl, maybe_crc, size,
+                                            osd, soid, flags)) {
+  }
+  ~C_ExtentCmpRead() override {
+    delete fill_extent_ctx;
+  }
+
+  void finish(int r) override {
+    if (r == -ENOENT) {
+      osd_op.rval = 0;
+      read_bl.clear();
+      delete fill_extent_ctx;
+    } else {
+      fill_extent_ctx->complete(r);
+    }
+    fill_extent_ctx = nullptr;
+
+    if (osd_op.rval >= 0) {
+      osd_op.rval = primary_log_pg->finish_extent_cmp(osd_op, read_bl);
+    }
+  }
+};
+
+int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
+{
+  dout(20) << __func__ << dendl;
+  ceph_osd_op& op = osd_op.op;
+
+  if (!ctx->obs->exists || ctx->obs->oi.is_whiteout()) {
+    dout(20) << __func__ << " object DNE" << dendl;
+    return finish_extent_cmp(osd_op, {});
+  } else if (pool.info.require_rollback()) {
+    // If there is a data digest and it is possible we are reading
+    // entire object, pass the digest.
+    auto& oi = ctx->new_obs.oi;
+    boost::optional<uint32_t> maybe_crc;
+    if (oi.is_data_digest() && op.checksum.offset == 0 &&
+        op.checksum.length >= oi.size) {
+      maybe_crc = oi.data_digest;
+    }
+
+    // async read
+    auto& soid = oi.soid;
+    auto extent_cmp_ctx = new C_ExtentCmpRead(this, osd_op, maybe_crc, oi.size,
+                                             osd, soid, op.flags);
+    ctx->pending_async_reads.push_back({
+      {op.extent.offset, op.extent.length, op.flags},
+      {&extent_cmp_ctx->read_bl, extent_cmp_ctx}});
+
+    dout(10) << __func__ << ": async_read noted for " << soid << dendl;
+
+    ctx->op_finishers[ctx->current_osd_subop_num].reset(
+      new ReadFinisher(osd_op));
+    return -EINPROGRESS;
+  }
+
+  // sync read
+  vector<OSDOp> read_ops(1);
+  OSDOp& read_op = read_ops[0];
+
+  read_op.op.op = CEPH_OSD_OP_SYNC_READ;
+  read_op.op.extent.offset = op.extent.offset;
+  read_op.op.extent.length = op.extent.length;
+  read_op.op.extent.truncate_seq = op.extent.truncate_seq;
+  read_op.op.extent.truncate_size = op.extent.truncate_size;
+
+  int result = do_osd_ops(ctx, read_ops);
+  if (result < 0) {
+    derr << __func__ << " failed " << result << dendl;
+    return result;
+  }
+  return finish_extent_cmp(osd_op, read_op.outdata);
+}
+
+int PrimaryLogPG::finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl)
+{
+  for (uint64_t idx = 0; idx < osd_op.indata.length(); ++idx) {
+    char read_byte = (idx < read_bl.length() ? read_bl[idx] : 0);
+    if (osd_op.indata[idx] != read_byte) {
+        return (-MAX_ERRNO - idx);
+    }
+  }
+
+  return 0;
+}
+
+int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
+  dout(20) << __func__ << dendl;
+  auto& op = osd_op.op;
+  auto& oi = ctx->new_obs.oi;
+  auto& soid = oi.soid;
+  __u32 seq = oi.truncate_seq;
+  uint64_t size = oi.size;
+  bool trimmed_read = false;
+
+  // are we beyond truncate_size?
+  if ( (seq < op.extent.truncate_seq) &&
+       (op.extent.offset + op.extent.length > op.extent.truncate_size) )
+    size = op.extent.truncate_size;
+
+  if (op.extent.length == 0) //length is zero mean read the whole object
+    op.extent.length = size;
+
+  if (op.extent.offset >= size) {
+    op.extent.length = 0;
+    trimmed_read = true;
+  } else if (op.extent.offset + op.extent.length > size) {
+    op.extent.length = size - op.extent.offset;
+    trimmed_read = true;
+  }
+
+  // read into a buffer
+  int result = 0;
+  if (trimmed_read && op.extent.length == 0) {
+    // read size was trimmed to zero and it is expected to do nothing
+    // a read operation of 0 bytes does *not* do nothing, this is why
+    // the trimmed_read boolean is needed
+  } else if (pool.info.require_rollback()) {
+    boost::optional<uint32_t> maybe_crc;
+    // If there is a data digest and it is possible we are reading
+    // entire object, pass the digest.  FillInVerifyExtent will
+    // will check the oi.size again.
+    if (oi.is_data_digest() && op.extent.offset == 0 &&
+        op.extent.length >= oi.size)
+      maybe_crc = oi.data_digest;
+    ctx->pending_async_reads.push_back(
+      make_pair(
+        boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
+        make_pair(&osd_op.outdata,
+                 new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
+                                        &osd_op.outdata, maybe_crc, oi.size,
+                                        osd, soid, op.flags))));
+    dout(10) << " async_read noted for " << soid << dendl;
+
+    ctx->op_finishers[ctx->current_osd_subop_num].reset(
+      new ReadFinisher(osd_op));
+  } else {
+    int r = pgbackend->objects_read_sync(
+      soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
+    if (r == -EIO) {
+      r = rep_repair_primary_object(soid, ctx->op);
+    }
+    if (r >= 0)
+      op.extent.length = r;
+    else {
+      result = r;
+      op.extent.length = 0;
+    }
+    dout(10) << " read got " << r << " / " << op.extent.length
+            << " bytes from obj " << soid << dendl;
+
+    // whole object?  can we verify the checksum?
+    if (op.extent.length == oi.size && oi.is_data_digest()) {
+      uint32_t crc = osd_op.outdata.crc32c(-1);
+      if (oi.data_digest != crc) {
+        osd->clog->error() << info.pgid << std::hex
+                          << " full-object read crc 0x" << crc
+                          << " != expected 0x" << oi.data_digest
+                          << std::dec << " on " << soid;
+        // FIXME fall back to replica or something?
+        result = -EIO;
+      }
+    }
+  }
+
+  // XXX the op.extent.length is the requested length for async read
+  // On error this length is changed to 0 after the error comes back.
+  ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+  ctx->delta_stats.num_rd++;
+  return result;
+}
+
+int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
+  dout(20) << __func__ << dendl;
+  auto& op = osd_op.op;
+  auto& oi = ctx->new_obs.oi;
+  auto& soid = oi.soid;
+
+  if (op.extent.truncate_seq) {
+    dout(0) << "sparse_read does not support truncation sequence " << dendl;
+    return -EINVAL;
+  }
+
+  ++ctx->num_read;
+  if (pool.info.ec_pool()) {
+    // translate sparse read to a normal one if not supported
+    uint64_t offset = op.extent.offset;
+    uint64_t length = op.extent.length;
+    if (offset > oi.size) {
+      length = 0;
+    } else if (offset + length > oi.size) {
+      length = oi.size - offset;
+    }
+
+    if (length > 0) {
+      ctx->pending_async_reads.push_back(
+        make_pair(
+          boost::make_tuple(offset, length, op.flags),
+          make_pair(
+           &osd_op.outdata,
+           new ToSparseReadResult(&osd_op.rval, &osd_op.outdata, offset,
+                                  &op.extent.length))));
+      dout(10) << " async_read (was sparse_read) noted for " << soid << dendl;
+
+      ctx->op_finishers[ctx->current_osd_subop_num].reset(
+        new ReadFinisher(osd_op));
+    } else {
+      dout(10) << " sparse read ended up empty for " << soid << dendl;
+      map<uint64_t, uint64_t> extents;
+      ::encode(extents, osd_op.outdata);
+    }
+  } else {
+    // read into a buffer
+    map<uint64_t, uint64_t> m;
+    uint32_t total_read = 0;
+    int r = osd->store->fiemap(ch, ghobject_t(soid, ghobject_t::NO_GEN,
+                                             info.pgid.shard),
+                              op.extent.offset, op.extent.length, m);
+    if (r < 0)  {
+      return r;
+    }
+
+    map<uint64_t, uint64_t>::iterator miter;
+    bufferlist data_bl;
+    uint64_t last = op.extent.offset;
+    for (miter = m.begin(); miter != m.end(); ++miter) {
+      // verify hole?
+      if (cct->_conf->osd_verify_sparse_read_holes &&
+          last < miter->first) {
+        bufferlist t;
+        uint64_t len = miter->first - last;
+        r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+       if (r == -EIO) {
+         r = rep_repair_primary_object(soid, ctx->op);
+       }
+        if (r < 0) {
+          osd->clog->error() << coll << " " << soid
+                            << " sparse-read failed to read: "
+                            << r;
+        } else if (!t.is_zero()) {
+          osd->clog->error() << coll << " " << soid
+                            << " sparse-read found data in hole "
+                            << last << "~" << len;
+        }
+      }
+
+      bufferlist tmpbl;
+      r = pgbackend->objects_read_sync(soid, miter->first, miter->second,
+                                      op.flags, &tmpbl);
+      if (r < 0) {
+       return r;
+      }
+
+      // this is usually happen when we get extent that exceeds the actual file
+      // size
+      if (r < (int)miter->second)
+        miter->second = r;
+      total_read += r;
+      dout(10) << "sparse-read " << miter->first << "@" << miter->second
+              << dendl;
+      data_bl.claim_append(tmpbl);
+      last = miter->first + r;
+    }
+
+    if (r < 0) {
+      return r;
+    }
+
+    // verify trailing hole?
+    if (cct->_conf->osd_verify_sparse_read_holes) {
+      uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
+      if (last < end) {
+        bufferlist t;
+        uint64_t len = end - last;
+        r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+        if (r < 0) {
+          osd->clog->error() << coll << " " << soid
+                            << " sparse-read failed to read: " << r;
+        } else if (!t.is_zero()) {
+          osd->clog->error() << coll << " " << soid
+                            << " sparse-read found data in hole "
+                            << last << "~" << len;
+        }
+      }
+    }
+
+    // Why SPARSE_READ need checksum? In fact, librbd always use sparse-read.
+    // Maybe at first, there is no much whole objects. With continued use, more
+    // and more whole object exist. So from this point, for spare-read add
+    // checksum make sense.
+    if (total_read == oi.size && oi.is_data_digest()) {
+      uint32_t crc = data_bl.crc32c(-1);
+      if (oi.data_digest != crc) {
+        osd->clog->error() << info.pgid << std::hex
+          << " full-object read crc 0x" << crc
+          << " != expected 0x" << oi.data_digest
+          << std::dec << " on " << soid;
+        // FIXME fall back to replica or something?
+        return -EIO;
+      }
+    }
+
+    op.extent.length = total_read;
+
+    ::encode(m, osd_op.outdata); // re-encode since it might be modified
+    ::encode_destructively(data_bl, osd_op.outdata);
+
+    dout(10) << " sparse_read got " << total_read << " bytes from object "
+            << soid << dendl;
+  }
+
+  ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+  ctx->delta_stats.num_rd++;
+  return 0;
+}
+
 int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 {
   int result = 0;
@@ -4655,16 +5010,23 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
   object_info_t& oi = obs.oi;
   const hobject_t& soid = oi.soid;
 
-  bool first_read = true;
-
   PGTransaction* t = ctx->op_t.get();
 
   dout(10) << "do_osd_op " << soid << " " << ops << dendl;
 
+  ctx->current_osd_subop_num = 0;
   for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
     OSDOp& osd_op = *p;
     ceph_osd_op& op = osd_op.op;
 
+    OpFinisher* op_finisher = nullptr;
+    {
+      auto op_finisher_it = ctx->op_finishers.find(ctx->current_osd_subop_num);
+      if (op_finisher_it != ctx->op_finishers.end()) {
+        op_finisher = op_finisher_it->second.get();
+      }
+    }
+
     // TODO: check endianness (__le32 vs uint32_t, etc.)
     // The fields in ceph_osd_op are little-endian (according to the definition in rados.h),
     // but the code in this function seems to treat them as native-endian.  What should the
@@ -4723,8 +5085,16 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_CMPEXT:
       ++ctx->num_read;
-      tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
-      result = do_extent_cmp(ctx, osd_op);
+      tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(),
+                soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+                op.extent.length, op.extent.truncate_size,
+                op.extent.truncate_seq);
+
+      if (op_finisher == nullptr) {
+       result = do_extent_cmp(ctx, osd_op);
+      } else {
+       result = op_finisher->execute();
+      }
       break;
 
     case CEPH_OSD_OP_SYNC_READ:
@@ -4735,91 +5105,17 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       // fall through
     case CEPH_OSD_OP_READ:
       ++ctx->num_read;
-      {
-       __u32 seq = oi.truncate_seq;
-       uint64_t size = oi.size;
-       tracepoint(osd, do_osd_op_pre_read, soid.oid.name.c_str(), soid.snap.val, size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
-       bool trimmed_read = false;
-       // are we beyond truncate_size?
-       if ( (seq < op.extent.truncate_seq) &&
-            (op.extent.offset + op.extent.length > op.extent.truncate_size) )
-         size = op.extent.truncate_size;
-
-       if (op.extent.length == 0) //length is zero mean read the whole object
-         op.extent.length = size;
-
-       if (op.extent.offset >= size) {
-         op.extent.length = 0;
-         trimmed_read = true;
-       } else if (op.extent.offset + op.extent.length > size) {
-         op.extent.length = size - op.extent.offset;
-         trimmed_read = true;
-       }
-
-       // read into a buffer
-       bool async = false;
-       if (trimmed_read && op.extent.length == 0) {
-         // read size was trimmed to zero and it is expected to do nothing
-         // a read operation of 0 bytes does *not* do nothing, this is why
-         // the trimmed_read boolean is needed
-       } else if (pool.info.require_rollback()) {
-         async = true;
-         boost::optional<uint32_t> maybe_crc;
-         // If there is a data digest and it is possible we are reading
-         // entire object, pass the digest.  FillInVerifyExtent will
-         // will check the oi.size again.
-         if (oi.is_data_digest() && op.extent.offset == 0 &&
-             op.extent.length >= oi.size)
-           maybe_crc = oi.data_digest;
-         ctx->pending_async_reads.push_back(
-           make_pair(
-             boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
-             make_pair(&osd_op.outdata,
-                       new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
-                               &osd_op.outdata, maybe_crc, oi.size, osd,
-                               soid, op.flags))));
-         dout(10) << " async_read noted for " << soid << dendl;
-       } else {
-         int r = pgbackend->objects_read_sync(
-           soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
-         if (r == -EIO) {
-           r = rep_repair_primary_object(soid, ctx->op);
-         }
-         if (r >= 0)
-           op.extent.length = r;
-         else {
-           result = r;
-           op.extent.length = 0;
-         }
-         dout(10) << " read got " << r << " / " << op.extent.length
-                  << " bytes from obj " << soid << dendl;
-
-         // whole object?  can we verify the checksum?
-         if (op.extent.length == oi.size && oi.is_data_digest()) {
-           uint32_t crc = osd_op.outdata.crc32c(-1);
-           if (oi.data_digest != crc) {
-             osd->clog->error() << info.pgid << std::hex
-                                << " full-object read crc 0x" << crc
-                                << " != expected 0x" << oi.data_digest
-                                << std::dec << " on " << soid;
-             // FIXME fall back to replica or something?
-             result = -EIO;
-           }
-         }
-       }
-       if (first_read) {
-         first_read = false;
+      tracepoint(osd, do_osd_op_pre_read, soid.oid.name.c_str(),
+                soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+                op.extent.length, op.extent.truncate_size,
+                op.extent.truncate_seq);
+      if (op_finisher == nullptr) {
+       if (!ctx->data_off) {
          ctx->data_off = op.extent.offset;
        }
-       // XXX the op.extent.length is the requested length for async read
-       // On error this length is changed to 0 after the error comes back.
-       ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
-       ctx->delta_stats.num_rd++;
-
-       // Skip checking the result and just proceed to the next operation
-       if (async)
-         continue;
-
+       result = do_read(ctx, osd_op);
+      } else {
+       result = op_finisher->execute();
       }
       break;
 
@@ -4831,10 +5127,10 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
                   op.checksum.offset, op.checksum.length,
                   op.checksum.chunk_size);
 
-       bool async_read;
-       result = do_checksum(ctx, osd_op, &bp, &async_read);
-       if (result == 0 && async_read) {
-         continue;
+       if (op_finisher == nullptr) {
+         result = do_checksum(ctx, osd_op, &bp);
+       } else {
+         result = op_finisher->execute();
        }
       }
       break;
@@ -4865,134 +5161,15 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     /* map extents */
     case CEPH_OSD_OP_SPARSE_READ:
-      tracepoint(osd, do_osd_op_pre_sparse_read, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
-      if (op.extent.truncate_seq) {
-       dout(0) << "sparse_read does not support truncation sequence " << dendl;
-       result = -EINVAL;
-       break;
-      }
-      ++ctx->num_read;
-      if (pool.info.ec_pool()) {
-       // translate sparse read to a normal one if not supported
-       uint64_t offset = op.extent.offset;
-       uint64_t length = op.extent.length;
-       if (offset > oi.size) {
-         length = 0;
-       } else if (offset + length > oi.size) {
-         length = oi.size - offset;
-       }
-       if (length > 0) {
-         ctx->pending_async_reads.push_back(
-           make_pair(
-             boost::make_tuple(offset, length, op.flags),
-             make_pair(
-               &osd_op.outdata,
-               new ToSparseReadResult(
-                 osd_op.outdata, offset,
-                 op.extent.length /* updated by the callback */))));
-         dout(10) << " async_read (was sparse_read) noted for " << soid << dendl;
-       } else {
-         dout(10) << " sparse read ended up empty for " << soid << dendl;
-         map<uint64_t, uint64_t> extents;
-         ::encode(extents, osd_op.outdata);
-       }
+      tracepoint(osd, do_osd_op_pre_sparse_read, soid.oid.name.c_str(),
+                soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+                op.extent.length, op.extent.truncate_size,
+                op.extent.truncate_seq);
+      if (op_finisher == nullptr) {
+       result = do_sparse_read(ctx, osd_op);
       } else {
-       // read into a buffer
-        map<uint64_t, uint64_t> m;
-        uint32_t total_read = 0;
-       int r = osd->store->fiemap(ch, ghobject_t(soid, ghobject_t::NO_GEN,
-                                                 info.pgid.shard),
-                                  op.extent.offset, op.extent.length, m);
-       if (r < 0)  {
-         result = r;
-          break;
-       }
-        map<uint64_t, uint64_t>::iterator miter;
-        bufferlist data_bl;
-       uint64_t last = op.extent.offset;
-        for (miter = m.begin(); miter != m.end(); ++miter) {
-         // verify hole?
-         if (cct->_conf->osd_verify_sparse_read_holes &&
-             last < miter->first) {
-           bufferlist t;
-           uint64_t len = miter->first - last;
-           r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
-           if (r == -EIO) {
-             r = rep_repair_primary_object(soid, ctx->op);
-           }
-           if (r < 0) {
-             osd->clog->error() << coll << " " << soid
-                                << " sparse-read failed to read: "
-                                << r;
-           } else if (!t.is_zero()) {
-             osd->clog->error() << coll << " " << soid << " sparse-read found data in hole "
-                                << last << "~" << len;
-           }
-         }
-
-          bufferlist tmpbl;
-         r = pgbackend->objects_read_sync(soid, miter->first, miter->second, op.flags, &tmpbl);
-          if (r < 0) {
-            result = r;
-            break;
-          }
-
-          if (r < (int)miter->second) /* this is usually happen when we get extent that exceeds the actual file size */
-            miter->second = r;
-          total_read += r;
-          dout(10) << "sparse-read " << miter->first << "@" << miter->second << dendl;
-         data_bl.claim_append(tmpbl);
-         last = miter->first + r;
-        }
-        
-        if (r < 0) {
-          result = r;
-          break;
-        }
-
-       // verify trailing hole?
-       if (cct->_conf->osd_verify_sparse_read_holes) {
-         uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
-         if (last < end) {
-           bufferlist t;
-           uint64_t len = end - last;
-           r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
-           if (r < 0) {
-             osd->clog->error() << coll << " " << soid
-                                << " sparse-read failed to read: "
-                                << r;
-           } else if (!t.is_zero()) {
-             osd->clog->error() << coll << " " << soid << " sparse-read found data in hole "
-                                << last << "~" << len;
-           }
-         }
-       }
-
-       // Why SPARSE_READ need checksum? In fact, librbd always use sparse-read. 
-       // Maybe at first, there is no much whole objects. With continued use, more and more whole object exist.
-       // So from this point, for spare-read add checksum make sense.
-       if (total_read == oi.size && oi.is_data_digest()) {
-         uint32_t crc = data_bl.crc32c(-1);
-         if (oi.data_digest != crc) {
-           osd->clog->error() << info.pgid << std::hex
-             << " full-object read crc 0x" << crc
-             << " != expected 0x" << oi.data_digest
-             << std::dec << " on " << soid;
-           // FIXME fall back to replica or something?
-           result = -EIO;
-           break;
-         }
-       }
-
-        op.extent.length = total_read;
-
-        ::encode(m, osd_op.outdata); // re-encode since it might be modified
-        ::encode_destructively(data_bl, osd_op.outdata);
-
-       dout(10) << " sparse_read got " << total_read << " bytes from object " << soid << dendl;
+       result = op_finisher->execute();
       }
-      ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
-      ctx->delta_stats.num_rd++;
       break;
 
     case CEPH_OSD_OP_CALL:
@@ -6501,8 +6678,13 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_COPY_GET:
       ++ctx->num_read;
-      tracepoint(osd, do_osd_op_pre_copy_get, soid.oid.name.c_str(), soid.snap.val);
-      result = fill_in_copy_get(ctx, bp, osd_op, ctx->obc);
+      tracepoint(osd, do_osd_op_pre_copy_get, soid.oid.name.c_str(),
+                soid.snap.val);
+      if (op_finisher == nullptr) {
+       result = do_copy_get(ctx, bp, osd_op, ctx->obc);
+      } else {
+       result = op_finisher->execute();
+      }
       break;
 
     case CEPH_OSD_OP_COPY_FROM:
@@ -6542,7 +6724,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
                   src_oloc.hash,
                   src_snapid,
                   src_version);
-       if (!ctx->copy_cb) {
+       if (op_finisher == nullptr) {
          // start
          pg_t raw_pg;
          get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
@@ -6554,8 +6736,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            result = -EINVAL;
            break;
          }
-         CopyFromCallback *cb = new CopyFromCallback(ctx);
-         ctx->copy_cb = cb;
+         CopyFromCallback *cb = new CopyFromCallback(ctx, osd_op);
+          ctx->op_finishers[ctx->current_osd_subop_num].reset(
+            new CopyFromFinisher(cb));
          start_copy(cb, ctx->obc, src, src_oloc, src_version,
                     op.copy_from.flags,
                     false,
@@ -6564,9 +6747,11 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -EINPROGRESS;
        } else {
          // finish
-         assert(ctx->copy_cb->get_result() >= 0);
-         finish_copyfrom(ctx);
-         result = 0;
+         result = op_finisher->execute();
+         assert(result == 0);
+
+          // COPY_FROM cannot be executed multiple times -- it must restart
+          ctx->op_finishers.erase(ctx->current_osd_subop_num);
        }
       }
       break;
@@ -6764,8 +6949,8 @@ int PrimaryLogPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
     &rollback_to, false, false, &missing_oid);
   if (ret == -EAGAIN) {
     /* clone must be missing */
-    assert(is_missing_object(missing_oid));
-    dout(20) << "_rollback_to attempted to roll back to a missing object "
+    assert(is_degraded_or_backfilling_object(missing_oid));
+    dout(20) << "_rollback_to attempted to roll back to a missing or backfilling clone "
             << missing_oid << " (requested snapid: ) " << snapid << dendl;
     block_write_on_degraded_snap(missing_oid, ctx->op);
     return ret;
@@ -7238,8 +7423,8 @@ hobject_t PrimaryLogPG::get_temp_recovery_object(
 
 int PrimaryLogPG::prepare_transaction(OpContext *ctx)
 {
-  assert(!ctx->ops.empty());
-  
+  assert(!ctx->ops->empty());
+
   const hobject_t& soid = ctx->obs->oi.soid;
 
   // valid snap context?
@@ -7249,7 +7434,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   }
 
   // prepare the actual mutation
-  int result = do_osd_ops(ctx, ctx->ops);
+  int result = do_osd_ops(ctx, *ctx->ops);
   if (result < 0) {
     if (ctx->op->may_write() &&
        get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
@@ -7511,16 +7696,16 @@ void PrimaryLogPG::complete_read_ctx(int result, OpContext *ctx)
   const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
   assert(ctx->async_reads_complete());
 
-  for (vector<OSDOp>::iterator p = ctx->ops.begin();
-    p != ctx->ops.end() && result >= 0; ++p) {
+  for (vector<OSDOp>::iterator p = ctx->ops->begin();
+    p != ctx->ops->end() && result >= 0; ++p) {
     if (p->rval < 0 && !(p->op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
       result = p->rval;
       break;
     }
     ctx->bytes_read += p->outdata.length();
   }
-  ctx->reply->claim_op_out_data(ctx->ops);
-  ctx->reply->get_header().data_off = ctx->data_off;
+  ctx->reply->claim_op_out_data(*ctx->ops);
+  ctx->reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
 
   MOSDOpReply *reply = ctx->reply;
   ctx->reply = nullptr;
@@ -7581,6 +7766,11 @@ struct C_CopyFrom_AsyncReadCb : public Context {
   C_CopyFrom_AsyncReadCb(OSDOp *osd_op, uint64_t features) :
     osd_op(osd_op), features(features), len(0) {}
   void finish(int r) override {
+    osd_op->rval = r;
+    if (r < 0) {
+      return;
+    }
+
     assert(len > 0);
     assert(len <= reply_obj.data.length());
     bufferlist bl;
@@ -7590,11 +7780,8 @@ struct C_CopyFrom_AsyncReadCb : public Context {
   }
 };
 
-int PrimaryLogPG::fill_in_copy_get(
-  OpContext *ctx,
-  bufferlist::iterator& bp,
-  OSDOp& osd_op,
-  ObjectContextRef &obc)
+int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp,
+                             OSDOp& osd_op, ObjectContextRef &obc)
 {
   object_info_t& oi = obc->obs.oi;
   hobject_t& soid = oi.soid;
@@ -7676,17 +7863,21 @@ int PrimaryLogPG::fill_in_copy_get(
          make_pair(
            boost::make_tuple(cursor.data_offset, max_read, osd_op.op.flags),
            make_pair(&bl, cb)));
-        result = max_read;
-       cb->len = result;
+       cb->len = max_read;
+
+        ctx->op_finishers[ctx->current_osd_subop_num].reset(
+          new ReadFinisher(osd_op));
+       result = -EINPROGRESS;
+
+       dout(10) << __func__ << ": async_read noted for " << soid << dendl;
       } else {
        result = pgbackend->objects_read_sync(
-         oi.soid, cursor.data_offset, left, osd_op.op.flags, &bl);
+         oi.soid, cursor.data_offset, max_read, osd_op.op.flags, &bl);
        if (result < 0)
          return result;
       }
-      assert(result <= left);
-      left -= result;
-      cursor.data_offset += result;
+      left -= max_read;
+      cursor.data_offset += max_read;
     }
     if (cursor.data_offset == oi.size) {
       cursor.data_complete = true;
@@ -7754,7 +7945,10 @@ int PrimaryLogPG::fill_in_copy_get(
   if (cb && !async_read_started) {
     delete cb;
   }
-  result = 0;
+
+  if (result > 0) {
+    result = 0;
+  }
   return result;
 }
 
@@ -8152,12 +8346,12 @@ void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
   cop->temp_cursor = cop->cursor;
 }
 
-void PrimaryLogPG::finish_copyfrom(OpContext *ctx)
+void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
 {
+  OpContext *ctx = cb->ctx;
   dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl;
-  ObjectState& obs = ctx->new_obs;
-  CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb);
 
+  ObjectState& obs = ctx->new_obs;
   if (obs.exists) {
     dout(20) << __func__ << ": exists, removing" << dendl;
     ctx->op_t->remove(obs.oi.soid);
@@ -8980,7 +9174,7 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
   last_update_applied = applied_version;
   if (is_primary()) {
     if (scrubber.active) {
-      if (last_update_applied == scrubber.subset_last_update) {
+      if (last_update_applied >= scrubber.subset_last_update) {
         if (ops_blocked_by_scrub()) {
           requeue_scrub(true);
         } else {
@@ -8993,7 +9187,7 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
     }
   } else {
     if (scrubber.active_rep_scrub) {
-      if (last_update_applied == static_cast<const MOSDRepScrub*>(
+      if (last_update_applied >= static_cast<const MOSDRepScrub*>(
            scrubber.active_rep_scrub->get_req())->scrub_to) {
        osd->enqueue_back(
          info.pgid,
@@ -9226,10 +9420,9 @@ void PrimaryLogPG::remove_repop(RepGather *repop)
 PrimaryLogPG::OpContextUPtr PrimaryLogPG::simple_opc_create(ObjectContextRef obc)
 {
   dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
-  vector<OSDOp> ops;
   ceph_tid_t rep_tid = osd->get_tid();
   osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
-  OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
+  OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, nullptr, obc, this));
   ctx->op_t.reset(new PGTransaction());
   ctx->mtime = ceph_clock_now();
   return ctx;
@@ -9883,11 +10076,16 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
 
   ObjectContextRef obc = get_object_context(soid, false);
   if (!obc || !obc->obs.exists) {
-    dout(20) << __func__ << " missing clone " << soid << dendl;
     if (pmissing)
       *pmissing = soid;
     put_snapset_context(ssc);
-    return -ENOENT;
+    if (is_degraded_or_backfilling_object(soid)) {
+      dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
+      return -EAGAIN;
+    } else {
+      dout(20) << __func__ << " missing clone " << soid << dendl;
+      return -ENOENT;
+    }
   }
 
   if (!obc->ssc) {
@@ -10089,6 +10287,40 @@ int PrimaryLogPG::recover_missing(
     return PULL_NONE;
   }
 
+  if (missing_loc.is_deleted(soid)) {
+    start_recovery_op(soid);
+    assert(!recovering.count(soid));
+    recovering.insert(make_pair(soid, ObjectContextRef()));
+    epoch_t cur_epoch = get_osdmap()->get_epoch();
+    remove_missing_object(soid, v, new FunctionContext(
+     [=](int) {
+       lock();
+       if (!pg_has_reset_since(cur_epoch)) {
+        bool object_missing = false;
+        for (const auto& shard : actingbackfill) {
+          if (shard == pg_whoami)
+            continue;
+          if (peer_missing[shard].is_missing(soid)) {
+            dout(20) << __func__ << ": soid " << soid << " needs to be deleted from replica " << shard << dendl;
+            object_missing = true;
+            break;
+          }
+        }
+        if (!object_missing) {
+          object_stat_sum_t stat_diff;
+          stat_diff.num_objects_recovered = 1;
+          on_global_recover(soid, stat_diff, true);
+        } else {
+          auto recovery_handle = pgbackend->open_recovery_op();
+          pgbackend->recover_delete_object(soid, v, recovery_handle);
+          pgbackend->run_recovery_op(recovery_handle, priority);
+        }
+       }
+       unlock();
+     }));
+    return PULL_YES;
+  }
+
   // is this a snapped object?  if so, consult the snapset.. we may not need the entire object!
   ObjectContextRef obc;
   ObjectContextRef head_obc;
@@ -10168,11 +10400,42 @@ void PrimaryLogPG::send_remove_op(
   osd->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
 }
 
+void PrimaryLogPG::remove_missing_object(const hobject_t &soid,
+                                        eversion_t v, Context *on_complete)
+{
+  dout(20) << __func__ << " " << soid << " " << v << dendl;
+  assert(on_complete != nullptr);
+  // delete locally
+  ObjectStore::Transaction t;
+  remove_snap_mapped_object(t, soid);
+
+  ObjectRecoveryInfo recovery_info;
+  recovery_info.soid = soid;
+  recovery_info.version = v;
+
+  epoch_t cur_epoch = get_osdmap()->get_epoch();
+  t.register_on_complete(new FunctionContext(
+     [=](int) {
+       lock();
+       if (!pg_has_reset_since(cur_epoch)) {
+        ObjectStore::Transaction t2;
+        on_local_recover(soid, recovery_info, ObjectContextRef(), true, &t2);
+        t2.register_on_complete(on_complete);
+        int r = osd->store->queue_transaction(osr.get(), std::move(t2), nullptr);
+        assert(r == 0);
+        unlock();
+       } else {
+        unlock();
+        on_complete->complete(-EAGAIN);
+       }
+     }));
+  int r = osd->store->queue_transaction(osr.get(), std::move(t), nullptr);
+  assert(r == 0);
+}
 
 void PrimaryLogPG::finish_degraded_object(const hobject_t& oid)
 {
   dout(10) << "finish_degraded_object " << oid << dendl;
-  ObjectContextRef obc(object_contexts.lookup(oid));
   if (callbacks_for_degraded_object.count(oid)) {
     list<Context*> contexts;
     contexts.swap(callbacks_for_degraded_object[oid]);
@@ -10224,8 +10487,10 @@ void PrimaryLogPG::_committed_pushed_object(
 void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
 {
   lock();
-  dout(10) << "_applied_recovered_object " << *obc << dendl;
-
+  dout(20) << __func__ << dendl;
+  if (obc) {
+    dout(20) << "obc = " << *obc << dendl;
+  }
   assert(active_pushes >= 1);
   --active_pushes;
 
@@ -10238,15 +10503,13 @@ void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
       requeue_scrub(false);
     }
   }
-
   unlock();
 }
 
 void PrimaryLogPG::_applied_recovered_object_replica()
 {
   lock();
-  dout(10) << "_applied_recovered_object_replica" << dendl;
-
+  dout(20) << __func__ << dendl;
   assert(active_pushes >= 1);
   --active_pushes;
 
@@ -10259,7 +10522,6 @@ void PrimaryLogPG::_applied_recovered_object_replica()
       PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
     scrubber.active_rep_scrub = OpRequestRef();
   }
-
   unlock();
 }
 
@@ -10519,8 +10781,15 @@ void PrimaryLogPG::mark_all_unfound_lost(
     std::move(manager),
     boost::optional<std::function<void(void)> >(
       [this, oids, con, num_unfound, tid]() {
-         for (auto oid: oids)
+       if (perform_deletes_during_peering()) {
+         for (auto oid : oids) {
+           // clear old locations - merge_new_log_entries will have
+           // handled rebuilding missing_loc for each of these
+           // objects if we have the RECOVERY_DELETES flag
            missing_loc.recovered(oid);
+         }
+       }
+
        for (auto& p : waiting_for_unreadable_object) {
          release_backoffs(p.first);
        }
@@ -10651,6 +10920,18 @@ void PrimaryLogPG::on_removal(ObjectStore::Transaction *t)
     on_shutdown();
 }
 
+void PrimaryLogPG::clear_async_reads()
+{
+  dout(10) << __func__ << dendl;
+  for(auto& i : in_progress_async_reads) {
+    dout(10) << "clear ctx: "
+             << "OpRequestRef " << i.first
+             << " OpContext " << i.second
+             << dendl;
+    close_op_ctx(i.second);
+  }
+}
+
 void PrimaryLogPG::on_shutdown()
 {
   dout(10) << "on_shutdown" << dendl;
@@ -10686,6 +10967,8 @@ void PrimaryLogPG::on_shutdown()
   context_registry_on_change();
   object_contexts.clear();
 
+  clear_async_reads();
+
   osd->remote_reserver.cancel_reservation(info.pgid);
   osd->local_reserver.cancel_reservation(info.pgid);
 
@@ -10746,6 +11029,12 @@ void PrimaryLogPG::on_activate()
 
 void PrimaryLogPG::_on_new_interval()
 {
+  dout(20) << __func__ << "checking missing set deletes flag. missing = " << pg_log.get_missing() << dendl;
+  if (!pg_log.get_missing().may_include_deletes &&
+      get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
+    pg_log.rebuild_missing_set_with_deletes(osd->store, coll, info);
+  }
+  assert(pg_log.get_missing().may_include_deletes == get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES));
 }
 
 void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
@@ -11030,7 +11319,7 @@ bool PrimaryLogPG::start_recovery_ops(
     return false;
   }
 
-  const pg_missing_t &missing = pg_log.get_missing();
+  const auto &missing = pg_log.get_missing();
 
   unsigned int num_missing = missing.num_missing();
   uint64_t num_unfound = get_num_unfound();
@@ -11111,20 +11400,22 @@ bool PrimaryLogPG::start_recovery_ops(
 
   if (missing.num_missing() > 0) {
     // this shouldn't happen!
-    osd->clog->error() << info.pgid << " recovery ending with " << missing.num_missing()
-                      << ": " << missing.get_items();
+    osd->clog->error() << info.pgid << " Unexpected Error: recovery ending with "
+                      << missing.num_missing() << ": " << missing.get_items();
     return work_in_progress;
   }
 
   if (needs_recovery()) {
     // this shouldn't happen!
     // We already checked num_missing() so we must have missing replicas
-    osd->clog->error() << info.pgid << " recovery ending with missing replicas";
+    osd->clog->error() << info.pgid 
+                       << " Unexpected Error: recovery ending with missing replicas";
     return work_in_progress;
   }
 
   if (state_test(PG_STATE_RECOVERING)) {
     state_clear(PG_STATE_RECOVERING);
+    state_clear(PG_STATE_FORCED_RECOVERY);
     if (needs_backfill()) {
       dout(10) << "recovery done, queuing backfill" << dendl;
       queue_peering_event(
@@ -11136,6 +11427,7 @@ bool PrimaryLogPG::start_recovery_ops(
     } else {
       dout(10) << "recovery done, no backfill" << dendl;
       eio_errors_to_process = false;
+      state_clear(PG_STATE_FORCED_BACKFILL);
       queue_peering_event(
         CephPeeringEvtRef(
           std::make_shared<CephPeeringEvt>(
@@ -11145,6 +11437,8 @@ bool PrimaryLogPG::start_recovery_ops(
     }
   } else { // backfilling
     state_clear(PG_STATE_BACKFILL);
+    state_clear(PG_STATE_FORCED_BACKFILL);
+    state_clear(PG_STATE_FORCED_RECOVERY);
     dout(10) << "recovery done, backfill done" << dendl;
     eio_errors_to_process = false;
     queue_peering_event(
@@ -11166,7 +11460,7 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
 {
   assert(is_primary());
 
-  const pg_missing_t &missing = pg_log.get_missing();
+  const auto &missing = pg_log.get_missing();
 
   dout(10) << "recover_primary recovering " << recovering.size()
           << " in pg" << dendl;
@@ -11188,7 +11482,7 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
 
     if (pg_log.get_log().objects.count(p->second)) {
       latest = pg_log.get_log().objects.find(p->second)->second;
-      assert(latest->is_update());
+      assert(latest->is_update() || latest->is_delete());
       soid = latest->soid;
     } else {
       latest = 0;
@@ -11345,6 +11639,21 @@ bool PrimaryLogPG::primary_error(
   return uhoh;
 }
 
+int PrimaryLogPG::prep_object_replica_deletes(
+  const hobject_t& soid, eversion_t v,
+  PGBackend::RecoveryHandle *h)
+{
+  assert(is_primary());
+  dout(10) << __func__ << ": on " << soid << dendl;
+
+  start_recovery_op(soid);
+  assert(!recovering.count(soid));
+  recovering.insert(make_pair(soid, ObjectContextRef()));
+
+  pgbackend->recover_delete_object(soid, v, h);
+  return 1;
+}
+
 int PrimaryLogPG::prep_object_replica_pushes(
   const hobject_t& soid, eversion_t v,
   PGBackend::RecoveryHandle *h)
@@ -11445,6 +11754,13 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
        continue;
       }
 
+      if (missing_loc.is_deleted(soid)) {
+       dout(10) << __func__ << ": " << soid << " is a delete, removing" << dendl;
+       map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(soid);
+       started += prep_object_replica_deletes(soid, r->second.need, h);
+       continue;
+      }
+
       if (soid.is_snap() && pg_log.get_missing().is_missing(soid.get_head())) {
        dout(10) << __func__ << ": " << soid.get_head()
                 << " still missing on primary" << dendl;
@@ -11963,7 +12279,7 @@ int PrimaryLogPG::prep_backfill_object_push(
   for (unsigned int i = 0 ; i < peers.size(); ++i) {
     map<pg_shard_t, pg_missing_t>::iterator bpm = peer_missing.find(peers[i]);
     assert(bpm != peer_missing.end());
-    bpm->second.add(oid, eversion_t(), eversion_t());
+    bpm->second.add(oid, eversion_t(), eversion_t(), false);
   }
 
   assert(!recovering.count(oid));
@@ -12123,7 +12439,7 @@ void PrimaryLogPG::check_local()
       continue;
     did.insert(p->soid);
 
-    if (p->is_delete()) {
+    if (p->is_delete() && !is_missing_object(p->soid)) {
       dout(10) << " checking " << p->soid
               << " at " << p->version << dendl;
       struct stat st;
@@ -13329,7 +13645,8 @@ unsigned PrimaryLogPG::process_clones_to(const boost::optional<hobject_t> &head,
     if (!allow_incomplete_clones) {
       next_clone.snap = **curclone;
       clog->error() << mode << " " << pgid << " " << head.get()
-                        << " expected clone " << next_clone;
+                        << " expected clone " << next_clone << " " << missing
+                         << " missing";
       ++scrubber.shallow_errors;
       e.set_clone_missing(next_clone.snap);
     }
@@ -13708,7 +14025,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
     ObjectContextRef obc = get_object_context(p->first, false);
     if (!obc) {
       osd->clog->error() << info.pgid << " " << mode
-                        << " cannot get object context for "
+                        << " cannot get object context for object "
                         << p->first;
       continue;
     } else if (obc->obs.oi.soid != p->first) {
@@ -13765,7 +14082,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
     ObjectContextRef obc = get_object_context(p.first, true);
     if (!obc) {
       osd->clog->error() << info.pgid << " " << mode
-                        << " cannot get object context for "
+                        << " cannot get object context for object "
                         << p.first;
       continue;
     } else if (obc->obs.oi.soid != p.first) {