]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PrimaryLogPG.cc
update sources to v12.2.3
[ceph.git] / ceph / src / osd / PrimaryLogPG.cc
index 8c4f08c7090dd738ee3fcbbe61684e8593814e77..7aba1df5d2d9852b45dbc6ebc807c2273dd40fd6 100644 (file)
@@ -207,8 +207,6 @@ struct OnReadComplete : public Context {
     PrimaryLogPG *pg,
     PrimaryLogPG::OpContext *ctx) : pg(pg), opcontext(ctx) {}
   void finish(int r) override {
-    if (r < 0)
-      opcontext->async_read_result = r;
     opcontext->finish_read(pg);
   }
   ~OnReadComplete() override {}
@@ -269,34 +267,34 @@ void PrimaryLogPG::OpContext::finish_read(PrimaryLogPG *pg)
     assert(pg->in_progress_async_reads.size());
     assert(pg->in_progress_async_reads.front().second == this);
     pg->in_progress_async_reads.pop_front();
-    pg->complete_read_ctx(async_read_result, this);
+
+    // Restart the op context now that all reads have been
+    // completed. Read failures will be handled by the op finisher
+    pg->execute_ctx(this);
   }
 }
 
-class CopyFromCallback: public PrimaryLogPG::CopyCallback {
+class CopyFromCallback : public PrimaryLogPG::CopyCallback {
 public:
-  PrimaryLogPG::CopyResults *results;
-  int retval;
+  PrimaryLogPG::CopyResults *results = nullptr;
   PrimaryLogPG::OpContext *ctx;
-  explicit CopyFromCallback(PrimaryLogPG::OpContext *ctx_)
-    : results(NULL),
-      retval(0),
-      ctx(ctx_) {}
+  OSDOp &osd_op;
+
+  CopyFromCallback(PrimaryLogPG::OpContext *ctx, OSDOp &osd_op)
+    : ctx(ctx), osd_op(osd_op) {
+  }
   ~CopyFromCallback() override {}
 
   void finish(PrimaryLogPG::CopyCallbackResults results_) override {
     results = results_.get<1>();
     int r = results_.get<0>();
-    retval = r;
 
     // for finish_copyfrom
     ctx->user_at_version = results->user_version;
 
     if (r >= 0) {
       ctx->pg->execute_ctx(ctx);
-    }
-    ctx->copy_cb = NULL;
-    if (r < 0) {
+    } else {
       if (r != -ECANCELED) { // on cancel just toss it out; client resends
        if (ctx->op)
          ctx->pg->osd->reply_op_error(ctx->op, r);
@@ -314,8 +312,19 @@ public:
   uint64_t get_data_size() {
     return results->object_size;
   }
-  int get_result() {
-    return retval;
+};
+
+struct CopyFromFinisher : public PrimaryLogPG::OpFinisher {
+  CopyFromCallback *copy_from_callback;
+
+  CopyFromFinisher(CopyFromCallback *copy_from_callback)
+    : copy_from_callback(copy_from_callback) {
+  }
+
+  int execute() override {
+    // instance will be destructed after this method completes
+    copy_from_callback->ctx->pg->finish_copyfrom(copy_from_callback);
+    return 0;
   }
 };
 
@@ -326,6 +335,7 @@ void PrimaryLogPG::on_local_recover(
   const hobject_t &hoid,
   const ObjectRecoveryInfo &_recovery_info,
   ObjectContextRef obc,
+  bool is_delete,
   ObjectStore::Transaction *t
   )
 {
@@ -333,7 +343,7 @@ void PrimaryLogPG::on_local_recover(
 
   ObjectRecoveryInfo recovery_info(_recovery_info);
   clear_object_snap_mapping(t, hoid);
-  if (recovery_info.soid.is_snap()) {
+  if (!is_delete && recovery_info.soid.is_snap()) {
     OSDriver::OSTransaction _t(osdriver.get_transaction(t));
     set<snapid_t> snaps;
     dout(20) << " snapset " << recovery_info.ss
@@ -354,7 +364,7 @@ void PrimaryLogPG::on_local_recover(
       snaps,
       &_t);
   }
-  if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+  if (!is_delete && pg_log.get_missing().is_missing(recovery_info.soid) &&
       pg_log.get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
     assert(is_primary());
     const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
@@ -391,24 +401,25 @@ void PrimaryLogPG::on_local_recover(
   recover_got(recovery_info.soid, recovery_info.version);
 
   if (is_primary()) {
-    assert(obc);
-    obc->obs.exists = true;
-    obc->ondisk_write_lock();
-
-    bool got = obc->get_recovery_read();
-    assert(got);
+    if (!is_delete) {
+      obc->obs.exists = true;
+      obc->ondisk_write_lock();
 
-    assert(recovering.count(obc->obs.oi.soid));
-    recovering[obc->obs.oi.soid] = obc;
-    obc->obs.oi = recovery_info.oi;  // may have been updated above
+      bool got = obc->get_recovery_read();
+      assert(got);
 
+      assert(recovering.count(obc->obs.oi.soid));
+      recovering[obc->obs.oi.soid] = obc;
+      obc->obs.oi = recovery_info.oi;  // may have been updated above
+      t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+    }
 
     t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
-    t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
 
     publish_stats_to_osd();
     assert(missing_loc.needs_recovery(hoid));
-    missing_loc.add_location(hoid, pg_whoami);
+    if (!is_delete)
+      missing_loc.add_location(hoid, pg_whoami);
     release_backoffs(hoid);
     if (!is_unreadable_object(hoid)) {
       auto unreadable_object_entry = waiting_for_unreadable_object.find(hoid);
@@ -418,10 +429,6 @@ void PrimaryLogPG::on_local_recover(
        waiting_for_unreadable_object.erase(unreadable_object_entry);
       }
     }
-    if (pg_log.get_missing().get_items().size() == 0) {
-      requeue_ops(waiting_for_all_missing);
-      waiting_for_all_missing.clear();
-    }
   } else {
     t->register_on_applied(
       new C_OSD_AppliedRecoveredObjectReplica(this));
@@ -441,7 +448,8 @@ void PrimaryLogPG::on_local_recover(
 
 void PrimaryLogPG::on_global_recover(
   const hobject_t &soid,
-  const object_stat_sum_t &stat_diff)
+  const object_stat_sum_t &stat_diff,
+  bool is_delete)
 {
   info.stats.stats.sum.add(stat_diff);
   missing_loc.recovered(soid);
@@ -450,12 +458,14 @@ void PrimaryLogPG::on_global_recover(
   map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
   assert(i != recovering.end());
 
-  // recover missing won't have had an obc, but it gets filled in
-  // during on_local_recover
-  assert(i->second);
-  list<OpRequestRef> requeue_list;
-  i->second->drop_recovery_read(&requeue_list);
-  requeue_ops(requeue_list);
+  if (!is_delete) {
+    // recover missing won't have had an obc, but it gets filled in
+    // during on_local_recover
+    assert(i->second);
+    list<OpRequestRef> requeue_list;
+    i->second->drop_recovery_read(&requeue_list);
+    requeue_ops(requeue_list);
+  }
 
   backfills_in_flight.erase(soid);
 
@@ -518,6 +528,25 @@ void PrimaryLogPG::send_message_osd_cluster(
   osd->send_message_osd_cluster(m, con);
 }
 
+void PrimaryLogPG::on_primary_error(
+  const hobject_t &oid,
+  eversion_t v)
+{
+  dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
+  primary_failed(oid);
+  primary_error(oid, v);
+  backfill_add_missing(oid, v);
+}
+
+void PrimaryLogPG::backfill_add_missing(
+  const hobject_t &oid,
+  eversion_t v)
+{
+  dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
+  backfills_in_flight.erase(oid);
+  missing_loc.add_missing(oid, v, eversion_t());
+}
+
 ConnectionRef PrimaryLogPG::get_con_osd_cluster(
   int peer, epoch_t from_epoch)
 {
@@ -555,6 +584,8 @@ void PrimaryLogPG::maybe_kick_recovery(
     PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
     if (is_missing_object(soid)) {
       recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
+    } else if (missing_loc.is_deleted(soid)) {
+      prep_object_replica_deletes(soid, v, h);
     } else {
       prep_object_replica_pushes(soid, v, h);
     }
@@ -571,12 +602,6 @@ void PrimaryLogPG::wait_for_unreadable_object(
   op->mark_delayed("waiting for missing object");
 }
 
-void PrimaryLogPG::wait_for_all_missing(OpRequestRef op)
-{
-  waiting_for_all_missing.push_back(op);
-  op->mark_delayed("waiting for all missing");
-}
-
 bool PrimaryLogPG::is_degraded_or_backfilling_object(const hobject_t& soid)
 {
   /* The conditions below may clear (on_local_recover, before we queue
@@ -629,6 +654,15 @@ void PrimaryLogPG::block_write_on_full_cache(
   op->mark_delayed("waiting for cache not full");
 }
 
+void PrimaryLogPG::block_for_clean(
+  const hobject_t& oid, OpRequestRef op)
+{
+  dout(20) << __func__ << ": blocking object " << oid
+          << " on primary repair" << dendl;
+  waiting_for_clean_to_primary_repair.push_back(op);
+  op->mark_delayed("waiting for clean to repair");
+}
+
 void PrimaryLogPG::block_write_on_snap_rollback(
   const hobject_t& oid, ObjectContextRef obc, OpRequestRef op)
 {
@@ -687,11 +721,11 @@ void PrimaryLogPG::wait_for_blocked_object(const hobject_t& soid, OpRequestRef o
 
 void PrimaryLogPG::maybe_force_recovery()
 {
-  // no force if not in degraded/recovery/backfill stats
+  // no force if not in degraded/recovery/backfill states
   if (!is_degraded() &&
       !state_test(PG_STATE_RECOVERING |
                   PG_STATE_RECOVERY_WAIT |
-                 PG_STATE_BACKFILL |
+                 PG_STATE_BACKFILLING |
                  PG_STATE_BACKFILL_WAIT |
                  PG_STATE_BACKFILL_TOOFULL))
     return;
@@ -901,7 +935,7 @@ int PrimaryLogPG::do_command(
   ConnectionRef con,
   ceph_tid_t tid)
 {
-  const pg_missing_t &missing = pg_log.get_missing();
+  const auto &missing = pg_log.get_missing();
   string prefix;
   string format;
 
@@ -914,6 +948,7 @@ int PrimaryLogPG::do_command(
     f->open_object_section("pg");
     f->dump_string("state", pg_state_string(get_state()));
     f->dump_stream("snap_trimq") << snap_trimq;
+    f->dump_unsigned("snap_trimq_len", snap_trimq.size());
     f->dump_unsigned("epoch", get_osdmap()->get_epoch());
     f->open_array_section("up");
     for (vector<int>::iterator p = up.begin(); p != up.end(); ++p)
@@ -1225,6 +1260,9 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
          if (candidate.get_namespace() == cct->_conf->osd_hit_set_namespace)
            continue;
 
+         if (missing_loc.is_deleted(candidate))
+           continue;
+
          // skip wrong namespace
          if (m->get_hobj().nspace != librados::all_nspaces &&
                candidate.get_namespace() != m->get_hobj().nspace)
@@ -1381,6 +1419,9 @@ void PrimaryLogPG::do_pg_op(OpRequestRef op)
          if (candidate.get_namespace() != m->get_hobj().nspace)
            continue;
 
+         if (missing_loc.is_deleted(candidate))
+           continue;
+
          if (filter && !pgls_filter(filter, candidate, filter_out))
            continue;
 
@@ -1521,7 +1562,7 @@ void PrimaryLogPG::calc_trim_to()
   if (is_degraded() ||
       state_test(PG_STATE_RECOVERING |
                 PG_STATE_RECOVERY_WAIT |
-                PG_STATE_BACKFILL |
+                PG_STATE_BACKFILLING |
                 PG_STATE_BACKFILL_WAIT |
                 PG_STATE_BACKFILL_TOOFULL)) {
     target = cct->_conf->osd_max_pg_log_entries;
@@ -1623,6 +1664,7 @@ void PrimaryLogPG::do_request(
             << ", queue on waiting_for_map " << op->get_source() << dendl;
     waiting_for_map[op->get_source()].push_back(op);
     op->mark_delayed("op must wait for map");
+    osd->request_osdmap_update(op->min_epoch);
     return;
   }
 
@@ -1668,15 +1710,6 @@ void PrimaryLogPG::do_request(
     }
   }
 
-  if (flushes_in_progress > 0) {
-    dout(20) << flushes_in_progress
-            << " flushes_in_progress pending "
-            << "waiting for active on " << op << dendl;
-    waiting_for_peered.push_back(op);
-    op->mark_delayed("waiting for peered");
-    return;
-  }
-
   if (!is_peered()) {
     // Delay unless PGBackend says it's ok
     if (pgbackend->can_handle_while_inactive(op)) {
@@ -1690,6 +1723,15 @@ void PrimaryLogPG::do_request(
     }
   }
 
+  if (flushes_in_progress > 0) {
+    dout(20) << flushes_in_progress
+            << " flushes_in_progress pending "
+            << "waiting for flush on " << op << dendl;
+    waiting_for_flush.push_back(op);
+    op->mark_delayed("waiting for flush");
+    return;
+  }
+
   assert(is_peered() && flushes_in_progress == 0);
   if (pgbackend->handle_message(op))
     return;
@@ -1877,15 +1919,15 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
   }
 
-  if (op->includes_pg_op()) {
-    return do_pg_op(op);
-  }
-
   if (!op_has_sufficient_caps(op)) {
     osd->reply_op_error(op, -EPERM);
     return;
   }
 
+  if (op->includes_pg_op()) {
+    return do_pg_op(op);
+  }
+
   // object name too long?
   if (m->get_oid().name.size() > cct->_conf->osd_max_object_name_len) {
     dout(4) << "do_op name is longer than "
@@ -1989,6 +2031,10 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
 
   // missing object?
   if (is_unreadable_object(head)) {
+    if (!is_primary()) {
+      osd->reply_op_error(op, -EAGAIN);
+      return;
+    }
     if (can_backoff &&
        (g_conf->osd_backoff_on_degraded ||
         (g_conf->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) {
@@ -2004,6 +2050,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
   if (write_ordered && is_degraded_or_backfilling_object(head)) {
     if (can_backoff && g_conf->osd_backoff_on_degraded) {
       add_backoff(session, head, head);
+      maybe_kick_recovery(head);
     } else {
       wait_for_degraded_object(head, op);
     }
@@ -2150,6 +2197,13 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
       return;
   }
 
+  if (obc.get() && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    if (maybe_handle_manifest(op,
+                              write_ordered,
+                              obc))
+    return;
+  }
+
   if (maybe_handle_cache(op,
                         write_ordered,
                         obc,
@@ -2166,9 +2220,9 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
       fill_in_copy_get_noent(op, oid, m->ops[0]);
       return;
     }
-    dout(20) << __func__ << "find_object_context got error " << r << dendl;
+    dout(20) << __func__ << "find_object_context got error " << r << dendl;
     if (op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       record_write_error(op, oid, nullptr, r);
     } else {
       osd->reply_op_error(op, r);
@@ -2207,7 +2261,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
   }
 
-  OpContext *ctx = new OpContext(op, m->get_reqid(), m->ops, obc, this);
+  OpContext *ctx = new OpContext(op, m->get_reqid(), &m->ops, obc, this);
 
   if (!obc->obs.exists)
     ctx->snapset_obc = get_object_context(obc->obs.oi.soid.get_snapdir(), false);
@@ -2245,7 +2299,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     dout(20) << __func__ << " returned an error: " << r << dendl;
     close_op_ctx(ctx);
     if (op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       record_write_error(op, oid, nullptr, r);
     } else {
       osd->reply_op_error(op, r);
@@ -2297,14 +2351,61 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
   maybe_force_recovery();
 }
 
+PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
+  OpRequestRef op,
+  bool write_ordered,
+  ObjectContextRef obc)
+{
+  if (static_cast<const MOSDOp *>(op->get_req())->get_flags() &
+      CEPH_OSD_FLAG_IGNORE_REDIRECT) {
+    dout(20) << __func__ << ": ignoring redirect due to flag" << dendl;
+    return cache_result_t::NOOP;
+  }
+
+  if (obc)
+    dout(10) << __func__ << " " << obc->obs.oi << " "
+       << (obc->obs.exists ? "exists" : "DNE")
+       << dendl;
+
+  // if it is write-ordered and blocked, stop now
+  if (obc.get() && obc->is_blocked() && write_ordered) {
+    // we're already doing something with this object
+    dout(20) << __func__ << " blocked on " << obc->obs.oi.soid << dendl;
+    return cache_result_t::NOOP;
+  }
+
+  vector<OSDOp> ops = static_cast<const MOSDOp*>(op->get_req())->ops;
+  for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
+    OSDOp& osd_op = *p;
+    ceph_osd_op& op = osd_op.op;
+    if (op.op == CEPH_OSD_OP_SET_REDIRECT) {
+      return cache_result_t::NOOP;
+    }
+  }
+
+  switch (obc->obs.oi.manifest.type) {
+  case object_manifest_t::TYPE_REDIRECT:
+    if (op->may_write() || write_ordered) {
+      do_proxy_write(op, obc->obs.oi.soid, obc);
+    } else {
+      do_proxy_read(op, obc);
+    }
+    return cache_result_t::HANDLED_PROXY;
+  case object_manifest_t::TYPE_CHUNKED:
+  default:
+    assert(0 == "unrecognized manifest type");
+  }
+
+  return cache_result_t::NOOP;
+}
+
 void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
                                      MOSDOpReply *orig_reply, int r)
 {
   dout(20) << __func__ << " r=" << r << dendl;
   assert(op->may_write());
   const osd_reqid_t &reqid = static_cast<const MOSDOp*>(op->get_req())->get_reqid();
-  ObjectContextRef obc;
-  mempool::osd::list<pg_log_entry_t> entries;
+  mempool::osd_pglog::list<pg_log_entry_t> entries;
   entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
                                   get_next_version(), eversion_t(), 0,
                                   reqid, utime_t(), r));
@@ -2355,6 +2456,10 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
   bool in_hit_set,
   ObjectContextRef *promote_obc)
 {
+  // return quickly if caching is not enabled
+  if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE)
+    return cache_result_t::NOOP;
+
   if (op &&
       op->get_req() &&
       op->get_req()->get_type() == CEPH_MSG_OSD_OP &&
@@ -2363,9 +2468,6 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
     dout(20) << __func__ << ": ignoring cache due to flag" << dendl;
     return cache_result_t::NOOP;
   }
-  // return quickly if caching is not enabled
-  if (pool.info.cache_mode == pg_pool_t::CACHEMODE_NONE)
-    return cache_result_t::NOOP;
 
   must_promote = must_promote || op->need_promote();
 
@@ -2399,6 +2501,11 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
     osd->logger->inc(l_osd_op_cache_hit);
     return cache_result_t::NOOP;
   }
+  if (!is_primary()) {
+    dout(20) << __func__ << " cache miss; ask the primary" << dendl;
+    osd->reply_op_error(op, -EAGAIN);
+    return cache_result_t::REPLIED_WITH_EAGAIN;
+  }
 
   if (missing_oid == hobject_t() && obc.get()) {
     missing_oid = obc->obs.oi.soid;
@@ -2657,15 +2764,30 @@ struct C_ProxyRead : public Context {
   }
 };
 
-void PrimaryLogPG::do_proxy_read(OpRequestRef op)
+void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc)
 {
   // NOTE: non-const here because the ProxyReadOp needs mutable refs to
   // stash the result in the request's OSDOp vector
   MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
-  object_locator_t oloc(m->get_object_locator());
-  oloc.pool = pool.info.tier_of;
-
-  const hobject_t& soid = m->get_hobj();
+  object_locator_t oloc;
+  hobject_t soid;
+  /* extensible tier */
+  if (obc && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    switch (obc->obs.oi.manifest.type) {
+      case object_manifest_t::TYPE_REDIRECT:
+         oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
+         soid = obc->obs.oi.manifest.redirect_target;  
+         break;
+      case object_manifest_t::TYPE_CHUNKED:
+      default:
+       assert(0 == "unrecognized manifest type");
+    }
+  } else {
+  /* proxy */
+    soid = m->get_hobj();
+    oloc = object_locator_t(m->get_object_locator());
+    oloc.pool = pool.info.tier_of;
+  }
   unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
 
   // pass through some original flags that make sense.
@@ -2693,6 +2815,7 @@ void PrimaryLogPG::do_proxy_read(OpRequestRef op)
        case CEPH_OSD_OP_SYNC_READ:
        case CEPH_OSD_OP_SPARSE_READ:
        case CEPH_OSD_OP_CHECKSUM:
+       case CEPH_OSD_OP_CMPEXT:
          op.flags = (op.flags | CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL) &
                       ~(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_FADVISE_NOCACHE);
       }
@@ -2756,7 +2879,7 @@ void PrimaryLogPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r)
   osd->logger->inc(l_osd_tier_proxy_read);
 
   const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
-  OpContext *ctx = new OpContext(op, m->get_reqid(), prdop->ops, this);
+  OpContext *ctx = new OpContext(op, m->get_reqid(), &prdop->ops, this);
   ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
   ctx->user_at_version = prdop->user_version;
   ctx->data_off = prdop->data_offset;
@@ -2849,20 +2972,39 @@ struct C_ProxyWrite_Commit : public Context {
   }
 };
 
-void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid)
+void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc)
 {
   // NOTE: non-const because ProxyWriteOp takes a mutable ref
   MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
-  object_locator_t oloc(m->get_object_locator());
-  oloc.pool = pool.info.tier_of;
+  object_locator_t oloc;
   SnapContext snapc(m->get_snap_seq(), m->get_snaps());
+  hobject_t soid;
+  /* extensible tier */
+  if (obc && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    switch (obc->obs.oi.manifest.type) {
+      case object_manifest_t::TYPE_REDIRECT:
+         oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
+         soid = obc->obs.oi.manifest.redirect_target;  
+         break;
+      case object_manifest_t::TYPE_CHUNKED:
+      default:
+       assert(0 == "unrecognized manifest type");
+    }
+  } else {
+  /* proxy */
+    soid = m->get_hobj();
+    oloc = object_locator_t(m->get_object_locator());
+    oloc.pool = pool.info.tier_of;
+  }
 
-  const hobject_t& soid = m->get_hobj();
   unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
+  if (!(op->may_write() || op->may_cache())) {
+    flags |= CEPH_OSD_FLAG_RWORDERED;
+  }
   dout(10) << __func__ << " Start proxy write for " << *m << dendl;
 
   ProxyWriteOpRef pwop(std::make_shared<ProxyWriteOp>(op, soid, m->ops, m->get_reqid()));
-  pwop->ctx = new OpContext(op, m->get_reqid(), pwop->ops, this);
+  pwop->ctx = new OpContext(op, m->get_reqid(), &pwop->ops, this);
   pwop->mtime = m->get_mtime();
 
   ObjectOperation obj_op;
@@ -3071,13 +3213,13 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     ctx->at_version = get_next_version();
     ctx->mtime = m->get_mtime();
 
-    dout(10) << __func__ << " " << soid << " " << ctx->ops
+    dout(10) << __func__ << " " << soid << " " << *ctx->ops
             << " ov " << obc->obs.oi.version << " av " << ctx->at_version 
             << " snapc " << ctx->snapc
             << " snapset " << obc->ssc->snapset
             << dendl;  
   } else {
-    dout(10) << __func__ << " " << soid << " " << ctx->ops
+    dout(10) << __func__ << " " << soid << " " << *ctx->ops
             << " ov " << obc->obs.oi.version
             << dendl;  
   }
@@ -3114,8 +3256,13 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     obc->ondisk_read_unlock();
   }
 
-  if (result == -EINPROGRESS) {
+  bool pending_async_reads = !ctx->pending_async_reads.empty();
+  if (result == -EINPROGRESS || pending_async_reads) {
     // come back later.
+    if (pending_async_reads) {
+      in_progress_async_reads.push_back(make_pair(op, ctx));
+      ctx->start_async_reads(this);
+    }
     return;
   }
 
@@ -3150,13 +3297,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     if (result >= 0)
       do_osd_op_effects(ctx, m->get_connection());
 
-    if (ctx->pending_async_reads.empty()) {
-      complete_read_ctx(result, ctx);
-    } else {
-      in_progress_async_reads.push_back(make_pair(op, ctx));
-      ctx->start_async_reads(this);
-    }
-
+    complete_read_ctx(result, ctx);
     return;
   }
 
@@ -3195,8 +3336,8 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
     // save just what we need from ctx
     MOSDOpReply *reply = ctx->reply;
     ctx->reply = nullptr;
-    reply->claim_op_out_data(ctx->ops);
-    reply->get_header().data_off = ctx->data_off;
+    reply->claim_op_out_data(*ctx->ops);
+    reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
     close_op_ctx(ctx);
 
     if (result == -ENOENT) {
@@ -3255,6 +3396,18 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
   repop->put();
 }
 
+void PrimaryLogPG::close_op_ctx(OpContext *ctx) {
+  release_object_locks(ctx->lock_manager);
+
+  ctx->op_t.reset();
+
+  for (auto p = ctx->on_finish.begin(); p != ctx->on_finish.end();
+       ctx->on_finish.erase(p++)) {
+    (*p)();
+  }
+  delete ctx;
+}
+
 void PrimaryLogPG::reply_ctx(OpContext *ctx, int r)
 {
   if (ctx->op)
@@ -3524,29 +3677,34 @@ void PrimaryLogPG::do_backfill_remove(OpRequestRef op)
   assert(r == 0);
 }
 
-PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
-  bool first, const hobject_t &coid)
+int PrimaryLogPG::trim_object(
+  bool first, const hobject_t &coid, PrimaryLogPG::OpContextUPtr *ctxp)
 {
+  *ctxp = NULL;
   // load clone info
   bufferlist bl;
   ObjectContextRef obc = get_object_context(coid, false, NULL);
-  if (!obc) {
-    derr << __func__ << " could not find coid " << coid << dendl;
-    ceph_abort();
+  if (!obc || !obc->ssc || !obc->ssc->exists) {
+    osd->clog->error() << __func__ << ": Can not trim " << coid
+      << " repair needed " << (obc ? "(no obc->ssc or !exists)" : "(no obc)");
+    return -ENOENT;
   }
-  assert(obc->ssc);
 
   hobject_t snapoid(
     coid.oid, coid.get_key(),
     obc->ssc->snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.get_hash(),
     info.pgid.pool(), coid.get_namespace());
   ObjectContextRef snapset_obc = get_object_context(snapoid, false);
-  assert(snapset_obc);
+  if (!snapset_obc) {
+    osd->clog->error() << __func__ << ": Can not trim " << coid
+      << " repair needed, no snapset obc for " << snapoid;
+    return -ENOENT;
+  }
 
   SnapSet& snapset = obc->ssc->snapset;
 
   bool legacy = snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   object_info_t &coi = obc->obs.oi;
   set<snapid_t> old_snaps;
@@ -3555,23 +3713,23 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
   } else {
     auto p = snapset.clone_snaps.find(coid.snap);
     if (p == snapset.clone_snaps.end()) {
-      osd->clog->error() << __func__ << " No clone_snaps in snapset " << snapset
-                        << " for " << coid << "\n";
-      return NULL;
+      osd->clog->error() << "No clone_snaps in snapset " << snapset
+                        << " for object " << coid << "\n";
+      return -ENOENT;
     }
     old_snaps.insert(snapset.clone_snaps[coid.snap].begin(),
                     snapset.clone_snaps[coid.snap].end());
   }
   if (old_snaps.empty()) {
-    osd->clog->error() << __func__ << " No object info snaps for " << coid;
-    return NULL;
+    osd->clog->error() << "No object info snaps for object " << coid;
+    return -ENOENT;
   }
 
   dout(10) << coid << " old_snaps " << old_snaps
           << " old snapset " << snapset << dendl;
   if (snapset.seq == 0) {
-    osd->clog->error() << __func__ << " No snapset.seq for " << coid;
-    return NULL;
+    osd->clog->error() << "No snapset.seq for object " << coid;
+    return -ENOENT;
   }
 
   set<snapid_t> new_snaps;
@@ -3587,8 +3745,8 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
   if (new_snaps.empty()) {
     p = std::find(snapset.clones.begin(), snapset.clones.end(), coid.snap);
     if (p == snapset.clones.end()) {
-      osd->clog->error() << __func__ << " Snap " << coid.snap << " not in clones";
-      return NULL;
+      osd->clog->error() << "Snap " << coid.snap << " not in clones";
+      return -ENOENT;
     }
   }
 
@@ -3601,7 +3759,7 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
        first)) {
     close_op_ctx(ctx.release());
     dout(10) << __func__ << ": Unable to get a wlock on " << coid << dendl;
-    return NULL;
+    return -ENOLCK;
   }
 
   if (!ctx->lock_manager.get_snaptrimmer_write(
@@ -3610,7 +3768,7 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
        first)) {
     close_op_ctx(ctx.release());
     dout(10) << __func__ << ": Unable to get a wlock on " << snapoid << dendl;
-    return NULL;
+    return -ENOLCK;
   }
 
   ctx->at_version = get_next_version();
@@ -3679,6 +3837,9 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
       coid,
       old_snaps,
       new_snaps);
+
+    coi = object_info_t(coid);
+
     ctx->at_version.version++;
   } else {
     // save adjusted snaps for this object
@@ -3747,20 +3908,19 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
       ctx->delta_stats.num_objects--;
       if (oi.is_dirty()) {
        ctx->delta_stats.num_objects_dirty--;
-       oi.clear_flag(object_info_t::FLAG_DIRTY);
       }
       if (oi.is_omap())
        ctx->delta_stats.num_objects_omap--;
       if (oi.is_whiteout()) {
        dout(20) << __func__ << " trimming whiteout on " << oi.soid << dendl;
        ctx->delta_stats.num_whiteouts--;
-       oi.clear_flag(object_info_t::FLAG_WHITEOUT);
       }
-      if (oi.is_cache_pinned())
+      if (oi.is_cache_pinned()) {
        ctx->delta_stats.num_objects_pinned--;
+      }
     }
     ctx->snapset_obc->obs.exists = false;
-    
+    ctx->snapset_obc->obs.oi = object_info_t(snapoid);
     t->remove(snapoid);
   } else {
     dout(10) << coid << " filtering snapset on " << snapoid << dendl;
@@ -3795,7 +3955,8 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
     t->setattrs(snapoid, attrs);
   }
 
-  return ctx;
+  *ctxp = std::move(ctx);
+  return 0;
 }
 
 void PrimaryLogPG::kick_snap_trim()
@@ -3884,37 +4045,6 @@ int PrimaryLogPG::do_xattr_cmp_str(int op, string& v1s, bufferlist& xattr)
   }
 }
 
-int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
-{
-  ceph_osd_op& op = osd_op.op;
-  vector<OSDOp> read_ops(1);
-  OSDOp& read_op = read_ops[0];
-  int result = 0;
-
-  read_op.op.op = CEPH_OSD_OP_SYNC_READ;
-  read_op.op.extent.offset = op.extent.offset;
-  read_op.op.extent.length = op.extent.length;
-  read_op.op.extent.truncate_seq = op.extent.truncate_seq;
-  read_op.op.extent.truncate_size = op.extent.truncate_size;
-
-  result = do_osd_ops(ctx, read_ops);
-  if (result < 0) {
-    derr << "do_extent_cmp do_osd_ops failed " << result << dendl;
-    return result;
-  }
-
-  if (read_op.outdata.length() != osd_op.indata.length())
-    return -EINVAL;
-
-  for (uint64_t p = 0; p < osd_op.indata.length(); p++) {
-    if (read_op.outdata[p] != osd_op.indata[p]) {
-      return (-MAX_ERRNO - p);
-    }
-  }
-
-  return result;
-}
-
 int PrimaryLogPG::do_writesame(OpContext *ctx, OSDOp& osd_op)
 {
   ceph_osd_op& op = osd_op.op;
@@ -4259,10 +4389,13 @@ struct FillInVerifyExtent : public Context {
     r(r), rval(rv), outdatap(blp), maybe_crc(mc),
     size(size), osd(osd), soid(soid), flags(flags) {}
   void finish(int len) override {
-    *rval = len;
     *r = len;
-    if (len < 0)
+    if (len < 0) {
+      *rval = len;
       return;
+    }
+    *rval = 0;
+
     // whole object?  can we verify the checksum?
     if (maybe_crc && *r == size) {
       uint32_t crc = outdatap->crc32c(-1);
@@ -4280,19 +4413,25 @@ struct FillInVerifyExtent : public Context {
 };
 
 struct ToSparseReadResult : public Context {
-  bufferlist& data_bl;
+  int* result;
+  bufferlist* data_bl;
   uint64_t data_offset;
-  ceph_le64& len;
-  ToSparseReadResult(bufferlist& bl, uint64_t offset, ceph_le64& len):
-    data_bl(bl), data_offset(offset),len(len) {}
+  ceph_le64* len;
+  ToSparseReadResult(int* result, bufferlist* bl, uint64_t offset,
+                    ceph_le64* len)
+    : result(result), data_bl(bl), data_offset(offset),len(len) {}
   void finish(int r) override {
-    if (r < 0) return;
-    len = r;
+    if (r < 0) {
+      *result = r;
+      return;
+    }
+    *result = 0;
+    *len = r;
     bufferlist outdata;
     map<uint64_t, uint64_t> extents = {{data_offset, r}};
     ::encode(extents, outdata);
-    ::encode_destructively(data_bl, outdata);
-    data_bl.swap(outdata);
+    ::encode_destructively(*data_bl, outdata);
+    data_bl->swap(outdata);
   }
 };
 
@@ -4339,6 +4478,17 @@ void PrimaryLogPG::maybe_create_new_object(
   }
 }
 
+struct ReadFinisher : public PrimaryLogPG::OpFinisher {
+  OSDOp& osd_op;
+
+  ReadFinisher(OSDOp& osd_op) : osd_op(osd_op) {
+  }
+
+  int execute() override {
+    return osd_op.rval;
+  }
+};
+
 struct C_ChecksumRead : public Context {
   PrimaryLogPG *primary_log_pg;
   OSDOp &osd_op;
@@ -4358,21 +4508,24 @@ struct C_ChecksumRead : public Context {
                                             &read_bl, maybe_crc, size,
                                             osd, soid, flags)) {
   }
+  ~C_ChecksumRead() override {
+    delete fill_extent_ctx;
+  }
 
   void finish(int r) override {
     fill_extent_ctx->complete(r);
+    fill_extent_ctx = nullptr;
 
     if (osd_op.rval >= 0) {
       bufferlist::iterator init_value_bl_it = init_value_bl.begin();
       osd_op.rval = primary_log_pg->finish_checksum(osd_op, csum_type,
-                                                   &init_value_bl_it,
-                                                   read_bl);
+                                                   &init_value_bl_it, read_bl);
     }
   }
 };
 
 int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
-                             bufferlist::iterator *bl_it, bool *async_read)
+                             bufferlist::iterator *bl_it)
 {
   dout(20) << __func__ << dendl;
 
@@ -4439,17 +4592,18 @@ int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
     auto checksum_ctx = new C_ChecksumRead(this, osd_op, csum_type,
                                           std::move(init_value_bl), maybe_crc,
                                           oi.size, osd, soid, op.flags);
+
     ctx->pending_async_reads.push_back({
       {op.checksum.offset, op.checksum.length, op.flags},
       {&checksum_ctx->read_bl, checksum_ctx}});
 
     dout(10) << __func__ << ": async_read noted for " << soid << dendl;
-    *async_read = true;
-    return 0;
+    ctx->op_finishers[ctx->current_osd_subop_num].reset(
+      new ReadFinisher(osd_op));
+    return -EINPROGRESS;
   }
 
   // sync read
-  *async_read = false;
   std::vector<OSDOp> read_ops(1);
   auto& read_op = read_ops[0];
   if (op.checksum.length > 0) {
@@ -4537,6 +4691,349 @@ int PrimaryLogPG::finish_checksum(OSDOp& osd_op,
   return 0;
 }
 
+struct C_ExtentCmpRead : public Context {
+  PrimaryLogPG *primary_log_pg;
+  OSDOp &osd_op;
+  ceph_le64 read_length;
+  bufferlist read_bl;
+  Context *fill_extent_ctx;
+
+  C_ExtentCmpRead(PrimaryLogPG *primary_log_pg, OSDOp &osd_op,
+                 boost::optional<uint32_t> maybe_crc, uint64_t size,
+                 OSDService *osd, hobject_t soid, __le32 flags)
+    : primary_log_pg(primary_log_pg), osd_op(osd_op),
+      fill_extent_ctx(new FillInVerifyExtent(&read_length, &osd_op.rval,
+                                            &read_bl, maybe_crc, size,
+                                            osd, soid, flags)) {
+  }
+  ~C_ExtentCmpRead() override {
+    delete fill_extent_ctx;
+  }
+
+  void finish(int r) override {
+    if (r == -ENOENT) {
+      osd_op.rval = 0;
+      read_bl.clear();
+      delete fill_extent_ctx;
+    } else {
+      fill_extent_ctx->complete(r);
+    }
+    fill_extent_ctx = nullptr;
+
+    if (osd_op.rval >= 0) {
+      osd_op.rval = primary_log_pg->finish_extent_cmp(osd_op, read_bl);
+    }
+  }
+};
+
+int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
+{
+  dout(20) << __func__ << dendl;
+  ceph_osd_op& op = osd_op.op;
+
+  auto& oi = ctx->new_obs.oi;
+  uint64_t size = oi.size;
+  if ((oi.truncate_seq < op.extent.truncate_seq) &&
+      (op.extent.offset + op.extent.length > op.extent.truncate_size)) {
+    size = op.extent.truncate_size;
+  }
+
+  if (op.extent.offset >= size) {
+    op.extent.length = 0;
+  } else if (op.extent.offset + op.extent.length > size) {
+    op.extent.length = size - op.extent.offset;
+  }
+
+  if (op.extent.length == 0) {
+    dout(20) << __func__ << " zero length extent" << dendl;
+    return finish_extent_cmp(osd_op, bufferlist{});
+  } else if (!ctx->obs->exists || ctx->obs->oi.is_whiteout()) {
+    dout(20) << __func__ << " object DNE" << dendl;
+    return finish_extent_cmp(osd_op, {});
+  } else if (pool.info.require_rollback()) {
+    // If there is a data digest and it is possible we are reading
+    // entire object, pass the digest.
+    boost::optional<uint32_t> maybe_crc;
+    if (oi.is_data_digest() && op.checksum.offset == 0 &&
+        op.checksum.length >= oi.size) {
+      maybe_crc = oi.data_digest;
+    }
+
+    // async read
+    auto& soid = oi.soid;
+    auto extent_cmp_ctx = new C_ExtentCmpRead(this, osd_op, maybe_crc, oi.size,
+                                             osd, soid, op.flags);
+    ctx->pending_async_reads.push_back({
+      {op.extent.offset, op.extent.length, op.flags},
+      {&extent_cmp_ctx->read_bl, extent_cmp_ctx}});
+
+    dout(10) << __func__ << ": async_read noted for " << soid << dendl;
+
+    ctx->op_finishers[ctx->current_osd_subop_num].reset(
+      new ReadFinisher(osd_op));
+    return -EINPROGRESS;
+  }
+
+  // sync read
+  vector<OSDOp> read_ops(1);
+  OSDOp& read_op = read_ops[0];
+
+  read_op.op.op = CEPH_OSD_OP_SYNC_READ;
+  read_op.op.extent.offset = op.extent.offset;
+  read_op.op.extent.length = op.extent.length;
+  read_op.op.extent.truncate_seq = op.extent.truncate_seq;
+  read_op.op.extent.truncate_size = op.extent.truncate_size;
+
+  int result = do_osd_ops(ctx, read_ops);
+  if (result < 0) {
+    derr << __func__ << " failed " << result << dendl;
+    return result;
+  }
+  return finish_extent_cmp(osd_op, read_op.outdata);
+}
+
+int PrimaryLogPG::finish_extent_cmp(OSDOp& osd_op, const bufferlist &read_bl)
+{
+  for (uint64_t idx = 0; idx < osd_op.indata.length(); ++idx) {
+    char read_byte = (idx < read_bl.length() ? read_bl[idx] : 0);
+    if (osd_op.indata[idx] != read_byte) {
+        return (-MAX_ERRNO - idx);
+    }
+  }
+
+  return 0;
+}
+
+int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
+  dout(20) << __func__ << dendl;
+  auto& op = osd_op.op;
+  auto& oi = ctx->new_obs.oi;
+  auto& soid = oi.soid;
+  __u32 seq = oi.truncate_seq;
+  uint64_t size = oi.size;
+  bool trimmed_read = false;
+
+  // are we beyond truncate_size?
+  if ( (seq < op.extent.truncate_seq) &&
+       (op.extent.offset + op.extent.length > op.extent.truncate_size) )
+    size = op.extent.truncate_size;
+
+  if (op.extent.length == 0) //length is zero mean read the whole object
+    op.extent.length = size;
+
+  if (op.extent.offset >= size) {
+    op.extent.length = 0;
+    trimmed_read = true;
+  } else if (op.extent.offset + op.extent.length > size) {
+    op.extent.length = size - op.extent.offset;
+    trimmed_read = true;
+  }
+
+  // read into a buffer
+  int result = 0;
+  if (trimmed_read && op.extent.length == 0) {
+    // read size was trimmed to zero and it is expected to do nothing
+    // a read operation of 0 bytes does *not* do nothing, this is why
+    // the trimmed_read boolean is needed
+  } else if (pool.info.require_rollback()) {
+    boost::optional<uint32_t> maybe_crc;
+    // If there is a data digest and it is possible we are reading
+    // entire object, pass the digest.  FillInVerifyExtent will
+    // will check the oi.size again.
+    if (oi.is_data_digest() && op.extent.offset == 0 &&
+        op.extent.length >= oi.size)
+      maybe_crc = oi.data_digest;
+    ctx->pending_async_reads.push_back(
+      make_pair(
+        boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
+        make_pair(&osd_op.outdata,
+                 new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
+                                        &osd_op.outdata, maybe_crc, oi.size,
+                                        osd, soid, op.flags))));
+    dout(10) << " async_read noted for " << soid << dendl;
+
+    ctx->op_finishers[ctx->current_osd_subop_num].reset(
+      new ReadFinisher(osd_op));
+  } else {
+    int r = pgbackend->objects_read_sync(
+      soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
+    if (r == -EIO) {
+      r = rep_repair_primary_object(soid, ctx->op);
+    }
+    if (r >= 0)
+      op.extent.length = r;
+    else {
+      result = r;
+      op.extent.length = 0;
+    }
+    dout(10) << " read got " << r << " / " << op.extent.length
+            << " bytes from obj " << soid << dendl;
+
+    // whole object?  can we verify the checksum?
+    if (op.extent.length == oi.size && oi.is_data_digest()) {
+      uint32_t crc = osd_op.outdata.crc32c(-1);
+      if (oi.data_digest != crc) {
+        osd->clog->error() << info.pgid << std::hex
+                          << " full-object read crc 0x" << crc
+                          << " != expected 0x" << oi.data_digest
+                          << std::dec << " on " << soid;
+        // FIXME fall back to replica or something?
+        result = -EIO;
+      }
+    }
+  }
+
+  // XXX the op.extent.length is the requested length for async read
+  // On error this length is changed to 0 after the error comes back.
+  ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+  ctx->delta_stats.num_rd++;
+  return result;
+}
+
+int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
+  dout(20) << __func__ << dendl;
+  auto& op = osd_op.op;
+  auto& oi = ctx->new_obs.oi;
+  auto& soid = oi.soid;
+
+  if (op.extent.truncate_seq) {
+    dout(0) << "sparse_read does not support truncation sequence " << dendl;
+    return -EINVAL;
+  }
+
+  ++ctx->num_read;
+  if (pool.info.ec_pool()) {
+    // translate sparse read to a normal one if not supported
+    uint64_t offset = op.extent.offset;
+    uint64_t length = op.extent.length;
+    if (offset > oi.size) {
+      length = 0;
+    } else if (offset + length > oi.size) {
+      length = oi.size - offset;
+    }
+
+    if (length > 0) {
+      ctx->pending_async_reads.push_back(
+        make_pair(
+          boost::make_tuple(offset, length, op.flags),
+          make_pair(
+           &osd_op.outdata,
+           new ToSparseReadResult(&osd_op.rval, &osd_op.outdata, offset,
+                                  &op.extent.length))));
+      dout(10) << " async_read (was sparse_read) noted for " << soid << dendl;
+
+      ctx->op_finishers[ctx->current_osd_subop_num].reset(
+        new ReadFinisher(osd_op));
+    } else {
+      dout(10) << " sparse read ended up empty for " << soid << dendl;
+      map<uint64_t, uint64_t> extents;
+      ::encode(extents, osd_op.outdata);
+    }
+  } else {
+    // read into a buffer
+    map<uint64_t, uint64_t> m;
+    uint32_t total_read = 0;
+    int r = osd->store->fiemap(ch, ghobject_t(soid, ghobject_t::NO_GEN,
+                                             info.pgid.shard),
+                              op.extent.offset, op.extent.length, m);
+    if (r < 0)  {
+      return r;
+    }
+
+    map<uint64_t, uint64_t>::iterator miter;
+    bufferlist data_bl;
+    uint64_t last = op.extent.offset;
+    for (miter = m.begin(); miter != m.end(); ++miter) {
+      // verify hole?
+      if (cct->_conf->osd_verify_sparse_read_holes &&
+          last < miter->first) {
+        bufferlist t;
+        uint64_t len = miter->first - last;
+        r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+        if (r < 0) {
+          osd->clog->error() << coll << " " << soid
+                            << " sparse-read failed to read: "
+                            << r;
+        } else if (!t.is_zero()) {
+          osd->clog->error() << coll << " " << soid
+                            << " sparse-read found data in hole "
+                            << last << "~" << len;
+        }
+      }
+
+      bufferlist tmpbl;
+      r = pgbackend->objects_read_sync(soid, miter->first, miter->second,
+                                      op.flags, &tmpbl);
+      if (r == -EIO) {
+        r = rep_repair_primary_object(soid, ctx->op);
+      }
+      if (r < 0) {
+       return r;
+      }
+
+      // this is usually happen when we get extent that exceeds the actual file
+      // size
+      if (r < (int)miter->second)
+        miter->second = r;
+      total_read += r;
+      dout(10) << "sparse-read " << miter->first << "@" << miter->second
+              << dendl;
+      data_bl.claim_append(tmpbl);
+      last = miter->first + r;
+    }
+
+    if (r < 0) {
+      return r;
+    }
+
+    // verify trailing hole?
+    if (cct->_conf->osd_verify_sparse_read_holes) {
+      uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
+      if (last < end) {
+        bufferlist t;
+        uint64_t len = end - last;
+        r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+        if (r < 0) {
+          osd->clog->error() << coll << " " << soid
+                            << " sparse-read failed to read: " << r;
+        } else if (!t.is_zero()) {
+          osd->clog->error() << coll << " " << soid
+                            << " sparse-read found data in hole "
+                            << last << "~" << len;
+        }
+      }
+    }
+
+    // Why SPARSE_READ need checksum? In fact, librbd always use sparse-read.
+    // Maybe at first, there is no much whole objects. With continued use, more
+    // and more whole object exist. So from this point, for spare-read add
+    // checksum make sense.
+    if (total_read == oi.size && oi.is_data_digest()) {
+      uint32_t crc = data_bl.crc32c(-1);
+      if (oi.data_digest != crc) {
+        osd->clog->error() << info.pgid << std::hex
+          << " full-object read crc 0x" << crc
+          << " != expected 0x" << oi.data_digest
+          << std::dec << " on " << soid;
+        // FIXME fall back to replica or something?
+        return -EIO;
+      }
+    }
+
+    op.extent.length = total_read;
+
+    ::encode(m, osd_op.outdata); // re-encode since it might be modified
+    ::encode_destructively(data_bl, osd_op.outdata);
+
+    dout(10) << " sparse_read got " << total_read << " bytes from object "
+            << soid << dendl;
+  }
+
+  ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
+  ctx->delta_stats.num_rd++;
+  return 0;
+}
+
 int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 {
   int result = 0;
@@ -4545,16 +5042,23 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
   object_info_t& oi = obs.oi;
   const hobject_t& soid = oi.soid;
 
-  bool first_read = true;
-
   PGTransaction* t = ctx->op_t.get();
 
   dout(10) << "do_osd_op " << soid << " " << ops << dendl;
 
-  for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++) {
+  ctx->current_osd_subop_num = 0;
+  for (auto p = ops.begin(); p != ops.end(); ++p, ctx->current_osd_subop_num++, ctx->processed_subop_count++) {
     OSDOp& osd_op = *p;
     ceph_osd_op& op = osd_op.op;
 
+    OpFinisher* op_finisher = nullptr;
+    {
+      auto op_finisher_it = ctx->op_finishers.find(ctx->current_osd_subop_num);
+      if (op_finisher_it != ctx->op_finishers.end()) {
+        op_finisher = op_finisher_it->second.get();
+      }
+    }
+
     // TODO: check endianness (__le32 vs uint32_t, etc.)
     // The fields in ceph_osd_op are little-endian (according to the definition in rados.h),
     // but the code in this function seems to treat them as native-endian.  What should the
@@ -4576,6 +5080,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     case CEPH_OSD_OP_COPY_FROM:  // we handle user_version update explicitly
     case CEPH_OSD_OP_CACHE_PIN:
     case CEPH_OSD_OP_CACHE_UNPIN:
+    case CEPH_OSD_OP_SET_REDIRECT:
       break;
     default:
       if (op.op & CEPH_OSD_OP_MODE_WR)
@@ -4612,8 +5117,16 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_CMPEXT:
       ++ctx->num_read;
-      tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
-      result = do_extent_cmp(ctx, osd_op);
+      tracepoint(osd, do_osd_op_pre_extent_cmp, soid.oid.name.c_str(),
+                soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+                op.extent.length, op.extent.truncate_size,
+                op.extent.truncate_seq);
+
+      if (op_finisher == nullptr) {
+       result = do_extent_cmp(ctx, osd_op);
+      } else {
+       result = op_finisher->execute();
+      }
       break;
 
     case CEPH_OSD_OP_SYNC_READ:
@@ -4621,91 +5134,20 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        result = -EOPNOTSUPP;
        break;
       }
-      // fall through
-    case CEPH_OSD_OP_READ:
-      ++ctx->num_read;
-      {
-       __u32 seq = oi.truncate_seq;
-       uint64_t size = oi.size;
-       tracepoint(osd, do_osd_op_pre_read, soid.oid.name.c_str(), soid.snap.val, size, seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
-       bool trimmed_read = false;
-       // are we beyond truncate_size?
-       if ( (seq < op.extent.truncate_seq) &&
-            (op.extent.offset + op.extent.length > op.extent.truncate_size) )
-         size = op.extent.truncate_size;
-
-       if (op.extent.length == 0) //length is zero mean read the whole object
-         op.extent.length = size;
-
-       if (op.extent.offset >= size) {
-         op.extent.length = 0;
-         trimmed_read = true;
-       } else if (op.extent.offset + op.extent.length > size) {
-         op.extent.length = size - op.extent.offset;
-         trimmed_read = true;
-       }
-
-       // read into a buffer
-       bool async = false;
-       if (trimmed_read && op.extent.length == 0) {
-         // read size was trimmed to zero and it is expected to do nothing
-         // a read operation of 0 bytes does *not* do nothing, this is why
-         // the trimmed_read boolean is needed
-       } else if (pool.info.require_rollback()) {
-         async = true;
-         boost::optional<uint32_t> maybe_crc;
-         // If there is a data digest and it is possible we are reading
-         // entire object, pass the digest.  FillInVerifyExtent will
-         // will check the oi.size again.
-         if (oi.is_data_digest() && op.extent.offset == 0 &&
-             op.extent.length >= oi.size)
-           maybe_crc = oi.data_digest;
-         ctx->pending_async_reads.push_back(
-           make_pair(
-             boost::make_tuple(op.extent.offset, op.extent.length, op.flags),
-             make_pair(&osd_op.outdata,
-                       new FillInVerifyExtent(&op.extent.length, &osd_op.rval,
-                               &osd_op.outdata, maybe_crc, oi.size, osd,
-                               soid, op.flags))));
-         dout(10) << " async_read noted for " << soid << dendl;
-       } else {
-         int r = pgbackend->objects_read_sync(
-           soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
-         if (r >= 0)
-           op.extent.length = r;
-         else {
-           result = r;
-           op.extent.length = 0;
-         }
-         dout(10) << " read got " << r << " / " << op.extent.length
-                  << " bytes from obj " << soid << dendl;
-
-         // whole object?  can we verify the checksum?
-         if (op.extent.length == oi.size && oi.is_data_digest()) {
-           uint32_t crc = osd_op.outdata.crc32c(-1);
-           if (oi.data_digest != crc) {
-             osd->clog->error() << info.pgid << std::hex
-                                << " full-object read crc 0x" << crc
-                                << " != expected 0x" << oi.data_digest
-                                << std::dec << " on " << soid;
-             // FIXME fall back to replica or something?
-             result = -EIO;
-           }
-         }
-       }
-       if (first_read) {
-         first_read = false;
+      // fall through
+    case CEPH_OSD_OP_READ:
+      ++ctx->num_read;
+      tracepoint(osd, do_osd_op_pre_read, soid.oid.name.c_str(),
+                soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+                op.extent.length, op.extent.truncate_size,
+                op.extent.truncate_seq);
+      if (op_finisher == nullptr) {
+       if (!ctx->data_off) {
          ctx->data_off = op.extent.offset;
        }
-       // XXX the op.extent.length is the requested length for async read
-       // On error this length is changed to 0 after the error comes back.
-       ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
-       ctx->delta_stats.num_rd++;
-
-       // Skip checking the result and just proceed to the next operation
-       if (async)
-         continue;
-
+       result = do_read(ctx, osd_op);
+      } else {
+       result = op_finisher->execute();
       }
       break;
 
@@ -4717,10 +5159,10 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
                   op.checksum.offset, op.checksum.length,
                   op.checksum.chunk_size);
 
-       bool async_read;
-       result = do_checksum(ctx, osd_op, &bp, &async_read);
-       if (result == 0 && async_read) {
-         continue;
+       if (op_finisher == nullptr) {
+         result = do_checksum(ctx, osd_op, &bp);
+       } else {
+         result = op_finisher->execute();
        }
       }
       break;
@@ -4751,131 +5193,15 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     /* map extents */
     case CEPH_OSD_OP_SPARSE_READ:
-      tracepoint(osd, do_osd_op_pre_sparse_read, soid.oid.name.c_str(), soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset, op.extent.length, op.extent.truncate_size, op.extent.truncate_seq);
-      if (op.extent.truncate_seq) {
-       dout(0) << "sparse_read does not support truncation sequence " << dendl;
-       result = -EINVAL;
-       break;
-      }
-      ++ctx->num_read;
-      if (pool.info.ec_pool()) {
-       // translate sparse read to a normal one if not supported
-       uint64_t offset = op.extent.offset;
-       uint64_t length = op.extent.length;
-       if (offset > oi.size) {
-         length = 0;
-       } else if (offset + length > oi.size) {
-         length = oi.size - offset;
-       }
-       if (length > 0) {
-         ctx->pending_async_reads.push_back(
-           make_pair(
-             boost::make_tuple(offset, length, op.flags),
-             make_pair(
-               &osd_op.outdata,
-               new ToSparseReadResult(
-                 osd_op.outdata, offset,
-                 op.extent.length /* updated by the callback */))));
-         dout(10) << " async_read (was sparse_read) noted for " << soid << dendl;
-       } else {
-         dout(10) << " sparse read ended up empty for " << soid << dendl;
-         map<uint64_t, uint64_t> extents;
-         ::encode(extents, osd_op.outdata);
-       }
+      tracepoint(osd, do_osd_op_pre_sparse_read, soid.oid.name.c_str(),
+                soid.snap.val, oi.size, oi.truncate_seq, op.extent.offset,
+                op.extent.length, op.extent.truncate_size,
+                op.extent.truncate_seq);
+      if (op_finisher == nullptr) {
+       result = do_sparse_read(ctx, osd_op);
       } else {
-       // read into a buffer
-        map<uint64_t, uint64_t> m;
-        uint32_t total_read = 0;
-       int r = osd->store->fiemap(ch, ghobject_t(soid, ghobject_t::NO_GEN,
-                                                 info.pgid.shard),
-                                  op.extent.offset, op.extent.length, m);
-       if (r < 0)  {
-         result = r;
-          break;
-       }
-        map<uint64_t, uint64_t>::iterator miter;
-        bufferlist data_bl;
-       uint64_t last = op.extent.offset;
-        for (miter = m.begin(); miter != m.end(); ++miter) {
-         // verify hole?
-         if (cct->_conf->osd_verify_sparse_read_holes &&
-             last < miter->first) {
-           bufferlist t;
-           uint64_t len = miter->first - last;
-           r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
-           if (r < 0) {
-             osd->clog->error() << coll << " " << soid
-                                << " sparse-read failed to read: "
-                                << r;
-           } else if (!t.is_zero()) {
-             osd->clog->error() << coll << " " << soid << " sparse-read found data in hole "
-                                << last << "~" << len;
-           }
-         }
-
-          bufferlist tmpbl;
-         r = pgbackend->objects_read_sync(soid, miter->first, miter->second, op.flags, &tmpbl);
-          if (r < 0) {
-            result = r;
-            break;
-          }
-
-          if (r < (int)miter->second) /* this is usually happen when we get extent that exceeds the actual file size */
-            miter->second = r;
-          total_read += r;
-          dout(10) << "sparse-read " << miter->first << "@" << miter->second << dendl;
-         data_bl.claim_append(tmpbl);
-         last = miter->first + r;
-        }
-        
-        if (r < 0) {
-          result = r;
-          break;
-        }
-
-       // verify trailing hole?
-       if (cct->_conf->osd_verify_sparse_read_holes) {
-         uint64_t end = MIN(op.extent.offset + op.extent.length, oi.size);
-         if (last < end) {
-           bufferlist t;
-           uint64_t len = end - last;
-           r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
-           if (r < 0) {
-             osd->clog->error() << coll << " " << soid
-                                << " sparse-read failed to read: "
-                                << r;
-           } else if (!t.is_zero()) {
-             osd->clog->error() << coll << " " << soid << " sparse-read found data in hole "
-                                << last << "~" << len;
-           }
-         }
-       }
-
-       // Why SPARSE_READ need checksum? In fact, librbd always use sparse-read. 
-       // Maybe at first, there is no much whole objects. With continued use, more and more whole object exist.
-       // So from this point, for spare-read add checksum make sense.
-       if (total_read == oi.size && oi.is_data_digest()) {
-         uint32_t crc = data_bl.crc32c(-1);
-         if (oi.data_digest != crc) {
-           osd->clog->error() << info.pgid << std::hex
-             << " full-object read crc 0x" << crc
-             << " != expected 0x" << oi.data_digest
-             << std::dec << " on " << soid;
-           // FIXME fall back to replica or something?
-           result = -EIO;
-           break;
-         }
-       }
-
-        op.extent.length = total_read;
-
-        ::encode(m, osd_op.outdata); // re-encode since it might be modified
-        ::encode_destructively(data_bl, osd_op.outdata);
-
-       dout(10) << " sparse_read got " << total_read << " bytes from object " << soid << dendl;
+       result = op_finisher->execute();
       }
-      ctx->delta_stats.num_rd_kb += SHIFT_ROUND_UP(op.extent.length, 10);
-      ctx->delta_stats.num_rd++;
       break;
 
     case CEPH_OSD_OP_CALL:
@@ -5122,8 +5448,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        map<string, bufferlist> out;
        result = getattrs_maybe_cache(
          ctx->obc,
-         &out,
-         true);
+         &out);
         
         bufferlist bl;
         ::encode(out, bl);
@@ -5831,6 +6156,74 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
 
+    case CEPH_OSD_OP_SET_REDIRECT:
+      ++ctx->num_write;
+      {
+       if (pool.info.is_tier()) {
+         result = -EINVAL;
+         break;
+       }
+       if (!obs.exists) {
+         result = -ENOENT;
+         break;
+       }
+       if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+         result = -EOPNOTSUPP;
+         break;
+       }
+
+       object_t target_name;
+       object_locator_t target_oloc;
+       snapid_t target_snapid = (uint64_t)op.copy_from.snapid;
+       version_t target_version = op.copy_from.src_version;
+       try {
+         ::decode(target_name, bp);
+         ::decode(target_oloc, bp);
+       }
+       catch (buffer::error& e) {
+         result = -EINVAL;
+         goto fail;
+       }
+       pg_t raw_pg;
+       get_osdmap()->object_locator_to_pg(target_name, target_oloc, raw_pg);
+       hobject_t target(target_name, target_oloc.key, target_snapid,
+               raw_pg.ps(), raw_pg.pool(),
+               target_oloc.nspace);
+       if (target == soid) {
+         dout(20) << " set-redirect self is invalid" << dendl;
+         result = -EINVAL;
+         break;
+       }
+        oi.set_flag(object_info_t::FLAG_MANIFEST);
+       oi.manifest.redirect_target = target;
+       oi.manifest.type = object_manifest_t::TYPE_REDIRECT;
+       t->truncate(soid, 0);
+       if (oi.is_omap() && pool.info.supports_omap()) {
+         t->omap_clear(soid);
+         obs.oi.clear_omap_digest();
+         obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+       }
+       ctx->delta_stats.num_bytes -= oi.size;
+       oi.size = 0;
+       oi.new_object();
+       oi.user_version = target_version;
+       ctx->user_at_version = target_version;
+       /* rm_attrs */
+       map<string,bufferlist> rmattrs;
+       result = getattrs_maybe_cache(ctx->obc,
+                   &rmattrs);
+       if (result < 0) {
+         return result;
+       }
+       map<string, bufferlist>::iterator iter;
+       for (iter = rmattrs.begin(); iter != rmattrs.end(); ++iter) {
+         const string& name = iter->first;
+         t->rmattr(soid, name);
+       }
+       dout(10) << "set-redirect oid:" << oi.soid << " user_version: " << oi.user_version << dendl;
+      }
+
+      break;
 
       // -- object attrs --
       
@@ -6315,8 +6708,13 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     case CEPH_OSD_OP_COPY_GET:
       ++ctx->num_read;
-      tracepoint(osd, do_osd_op_pre_copy_get, soid.oid.name.c_str(), soid.snap.val);
-      result = fill_in_copy_get(ctx, bp, osd_op, ctx->obc);
+      tracepoint(osd, do_osd_op_pre_copy_get, soid.oid.name.c_str(),
+                soid.snap.val);
+      if (op_finisher == nullptr) {
+       result = do_copy_get(ctx, bp, osd_op, ctx->obc);
+      } else {
+       result = op_finisher->execute();
+      }
       break;
 
     case CEPH_OSD_OP_COPY_FROM:
@@ -6356,7 +6754,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
                   src_oloc.hash,
                   src_snapid,
                   src_version);
-       if (!ctx->copy_cb) {
+       if (op_finisher == nullptr) {
          // start
          pg_t raw_pg;
          get_osdmap()->object_locator_to_pg(src_name, src_oloc, raw_pg);
@@ -6368,8 +6766,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            result = -EINVAL;
            break;
          }
-         CopyFromCallback *cb = new CopyFromCallback(ctx);
-         ctx->copy_cb = cb;
+         CopyFromCallback *cb = new CopyFromCallback(ctx, osd_op);
+          ctx->op_finishers[ctx->current_osd_subop_num].reset(
+            new CopyFromFinisher(cb));
          start_copy(cb, ctx->obc, src, src_oloc, src_version,
                     op.copy_from.flags,
                     false,
@@ -6378,9 +6777,11 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
          result = -EINPROGRESS;
        } else {
          // finish
-         assert(ctx->copy_cb->get_result() >= 0);
-         finish_copyfrom(ctx);
-         result = 0;
+         result = op_finisher->execute();
+         assert(result == 0);
+
+          // COPY_FROM cannot be executed multiple times -- it must restart
+          ctx->op_finishers.erase(ctx->current_osd_subop_num);
        }
       }
       break;
@@ -6476,7 +6877,7 @@ inline int PrimaryLogPG::_delete_oid(
     whiteout = true;
   }
   bool legacy;
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
     legacy = false;
     // in luminous or later, we can't delete the head if there are
     // clones. we trust the caller passing no_whiteout has already
@@ -6493,7 +6894,7 @@ inline int PrimaryLogPG::_delete_oid(
       }
     }
   } else {
-    legacy = false;
+    legacy = true;
   }
   dout(20) << __func__ << " " << soid << " whiteout=" << (int)whiteout
           << " no_whiteout=" << (int)no_whiteout
@@ -6578,24 +6979,34 @@ int PrimaryLogPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
     &rollback_to, false, false, &missing_oid);
   if (ret == -EAGAIN) {
     /* clone must be missing */
-    assert(is_missing_object(missing_oid));
-    dout(20) << "_rollback_to attempted to roll back to a missing object "
+    assert(is_degraded_or_backfilling_object(missing_oid));
+    dout(20) << "_rollback_to attempted to roll back to a missing or backfilling clone "
             << missing_oid << " (requested snapid: ) " << snapid << dendl;
     block_write_on_degraded_snap(missing_oid, ctx->op);
     return ret;
   }
   {
     ObjectContextRef promote_obc;
-    switch (
-      maybe_handle_cache_detail(
-       ctx->op,
-       true,
-       rollback_to,
-       ret,
-       missing_oid,
-       true,
-       false,
-       &promote_obc)) {
+    cache_result_t tier_mode_result;
+    if (obs.exists && obs.oi.has_manifest()) { 
+      tier_mode_result = 
+       maybe_handle_manifest_detail(
+         ctx->op,
+         true,
+         rollback_to);
+    } else {
+      tier_mode_result = 
+       maybe_handle_cache_detail(
+         ctx->op,
+         true,
+         rollback_to,
+         ret,
+         missing_oid,
+         true,
+         false,
+         &promote_obc);
+    }
+    switch (tier_mode_result) {
     case cache_result_t::NOOP:
       break;
     case cache_result_t::BLOCKED_PROMOTE:
@@ -6605,6 +7016,8 @@ int PrimaryLogPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
     case cache_result_t::BLOCKED_FULL:
       block_write_on_full_cache(soid, ctx->op);
       return -EAGAIN;
+    case cache_result_t::REPLIED_WITH_EAGAIN:
+      assert(0 == "this can't happen, no rollback on replica");
     default:
       assert(0 == "must promote was set, other values are not valid");
       return -EAGAIN;
@@ -6807,7 +7220,7 @@ void PrimaryLogPG::make_writeable(OpContext *ctx)
     snap_oi->copy_user_bits(ctx->obs->oi);
 
     bool legacy = ctx->new_snapset.is_legacy() ||
-      !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+      get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
     if (legacy) {
       snap_oi->legacy_snaps = snaps;
     }
@@ -6868,7 +7281,7 @@ void PrimaryLogPG::make_writeable(OpContext *ctx)
   // update snapset with latest snap context
   ctx->new_snapset.seq = snapc.seq;
   ctx->new_snapset.snaps = snapc.snaps;
-  if (!get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+  if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
     // pessimistic assumption that this is a net-new legacy SnapSet
     ctx->delta_stats.num_legacy_snapsets++;
     ctx->new_snapset.head_exists = ctx->new_obs.exists;
@@ -7042,8 +7455,8 @@ hobject_t PrimaryLogPG::get_temp_recovery_object(
 
 int PrimaryLogPG::prepare_transaction(OpContext *ctx)
 {
-  assert(!ctx->ops.empty());
-  
+  assert(!ctx->ops->empty());
+
   const hobject_t& soid = ctx->obs->oi.soid;
 
   // valid snap context?
@@ -7053,10 +7466,10 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   }
 
   // prepare the actual mutation
-  int result = do_osd_ops(ctx, ctx->ops);
+  int result = do_osd_ops(ctx, *ctx->ops);
   if (result < 0) {
     if (ctx->op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       // need to save the error code in the pg log, to detect dup ops,
       // but do nothing else
       ctx->update_log_only = true;
@@ -7068,7 +7481,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   if (ctx->op_t->empty() && !ctx->modify) {
     unstable_stats.add(ctx->delta_stats);
     if (ctx->op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       ctx->update_log_only = true;
     }
     return result;
@@ -7147,7 +7560,7 @@ void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
                        info.pgid.pool(), soid.get_namespace());
       dout(10) << " final snapset " << ctx->new_snapset
               << " in " << snapoid << dendl;
-      assert(!get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS));
+      assert(get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
       ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, snapoid,
                                        ctx->at_version,
                                        eversion_t(),
@@ -7235,7 +7648,7 @@ void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
   }
 
   bool legacy_snapset = ctx->new_snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   // append to log
   ctx->log.push_back(pg_log_entry_t(log_op_type, soid, ctx->at_version,
@@ -7315,16 +7728,16 @@ void PrimaryLogPG::complete_read_ctx(int result, OpContext *ctx)
   const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
   assert(ctx->async_reads_complete());
 
-  for (vector<OSDOp>::iterator p = ctx->ops.begin();
-    p != ctx->ops.end() && result >= 0; ++p) {
+  for (vector<OSDOp>::iterator p = ctx->ops->begin();
+    p != ctx->ops->end() && result >= 0; ++p) {
     if (p->rval < 0 && !(p->op.flags & CEPH_OSD_OP_FLAG_FAILOK)) {
       result = p->rval;
       break;
     }
     ctx->bytes_read += p->outdata.length();
   }
-  ctx->reply->claim_op_out_data(ctx->ops);
-  ctx->reply->get_header().data_off = ctx->data_off;
+  ctx->reply->claim_op_out_data(*ctx->ops);
+  ctx->reply->get_header().data_off = (ctx->data_off ? *ctx->data_off : 0);
 
   MOSDOpReply *reply = ctx->reply;
   ctx->reply = nullptr;
@@ -7385,6 +7798,11 @@ struct C_CopyFrom_AsyncReadCb : public Context {
   C_CopyFrom_AsyncReadCb(OSDOp *osd_op, uint64_t features) :
     osd_op(osd_op), features(features), len(0) {}
   void finish(int r) override {
+    osd_op->rval = r;
+    if (r < 0) {
+      return;
+    }
+
     assert(len > 0);
     assert(len <= reply_obj.data.length());
     bufferlist bl;
@@ -7394,11 +7812,8 @@ struct C_CopyFrom_AsyncReadCb : public Context {
   }
 };
 
-int PrimaryLogPG::fill_in_copy_get(
-  OpContext *ctx,
-  bufferlist::iterator& bp,
-  OSDOp& osd_op,
-  ObjectContextRef &obc)
+int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp,
+                             OSDOp& osd_op, ObjectContextRef &obc)
 {
   object_info_t& oi = obc->obs.oi;
   hobject_t& soid = oi.soid;
@@ -7455,8 +7870,7 @@ int PrimaryLogPG::fill_in_copy_get(
   if (!cursor.attr_complete) {
     result = getattrs_maybe_cache(
       ctx->obc,
-      &out_attrs,
-      true);
+      &out_attrs);
     if (result < 0) {
       if (cb) {
         delete cb;
@@ -7480,17 +7894,21 @@ int PrimaryLogPG::fill_in_copy_get(
          make_pair(
            boost::make_tuple(cursor.data_offset, max_read, osd_op.op.flags),
            make_pair(&bl, cb)));
-        result = max_read;
-       cb->len = result;
+       cb->len = max_read;
+
+        ctx->op_finishers[ctx->current_osd_subop_num].reset(
+          new ReadFinisher(osd_op));
+       result = -EINPROGRESS;
+
+       dout(10) << __func__ << ": async_read noted for " << soid << dendl;
       } else {
        result = pgbackend->objects_read_sync(
-         oi.soid, cursor.data_offset, left, osd_op.op.flags, &bl);
+         oi.soid, cursor.data_offset, max_read, osd_op.op.flags, &bl);
        if (result < 0)
          return result;
       }
-      assert(result <= left);
-      left -= result;
-      cursor.data_offset += result;
+      left -= max_read;
+      cursor.data_offset += max_read;
     }
     if (cursor.data_offset == oi.size) {
       cursor.data_complete = true;
@@ -7558,7 +7976,10 @@ int PrimaryLogPG::fill_in_copy_get(
   if (cb && !async_read_started) {
     delete cb;
   }
-  result = 0;
+
+  if (result > 0) {
+    result = 0;
+  }
   return result;
 }
 
@@ -7956,12 +8377,12 @@ void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
   cop->temp_cursor = cop->cursor;
 }
 
-void PrimaryLogPG::finish_copyfrom(OpContext *ctx)
+void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
 {
+  OpContext *ctx = cb->ctx;
   dout(20) << "finish_copyfrom on " << ctx->obs->oi.soid << dendl;
-  ObjectState& obs = ctx->new_obs;
-  CopyFromCallback *cb = static_cast<CopyFromCallback*>(ctx->copy_cb);
 
+  ObjectState& obs = ctx->new_obs;
   if (obs.exists) {
     dout(20) << __func__ << ": exists, removing" << dendl;
     ctx->op_t->remove(obs.oi.soid);
@@ -8147,7 +8568,7 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
   tctx->extra_reqids = results->reqids;
 
   bool legacy_snapset = tctx->new_snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   if (whiteout) {
     // create a whiteout
@@ -8200,7 +8621,7 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     assert(tctx->new_obs.oi.soid.snap == CEPH_NOSNAP);
     tctx->new_snapset.from_snap_set(
       results->snapset,
-      !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS));
+      get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
   }
   tctx->new_snapset.head_exists = true;
   dout(20) << __func__ << " new_snapset " << tctx->new_snapset << dendl;
@@ -8784,15 +9205,20 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
   last_update_applied = applied_version;
   if (is_primary()) {
     if (scrubber.active) {
-      if (last_update_applied == scrubber.subset_last_update) {
-        requeue_scrub();
+      if (last_update_applied >= scrubber.subset_last_update) {
+        if (ops_blocked_by_scrub()) {
+          requeue_scrub(true);
+        } else {
+          requeue_scrub(false);
+        }
+
       }
     } else {
       assert(scrubber.start == scrubber.end);
     }
   } else {
     if (scrubber.active_rep_scrub) {
-      if (last_update_applied == static_cast<const MOSDRepScrub*>(
+      if (last_update_applied >= static_cast<const MOSDRepScrub*>(
            scrubber.active_rep_scrub->get_req())->scrub_to) {
        osd->enqueue_back(
          info.pgid,
@@ -9025,10 +9451,9 @@ void PrimaryLogPG::remove_repop(RepGather *repop)
 PrimaryLogPG::OpContextUPtr PrimaryLogPG::simple_opc_create(ObjectContextRef obc)
 {
   dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
-  vector<OSDOp> ops;
   ceph_tid_t rep_tid = osd->get_tid();
   osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
-  OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
+  OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, nullptr, obc, this));
   ctx->op_t.reset(new PGTransaction());
   ctx->mtime = ceph_clock_now();
   return ctx;
@@ -9040,12 +9465,13 @@ void PrimaryLogPG::simple_opc_submit(OpContextUPtr ctx)
   dout(20) << __func__ << " " << repop << dendl;
   issue_repop(repop, ctx.get());
   eval_repop(repop);
+  calc_trim_to();
   repop->put();
 }
 
 
 void PrimaryLogPG::submit_log_entries(
-  const mempool::osd::list<pg_log_entry_t> &entries,
+  const mempool::osd_pglog::list<pg_log_entry_t> &entries,
   ObcLockManager &&manager,
   boost::optional<std::function<void(void)> > &&_on_complete,
   OpRequestRef op,
@@ -9062,7 +9488,7 @@ void PrimaryLogPG::submit_log_entries(
 
   boost::intrusive_ptr<RepGather> repop;
   boost::optional<std::function<void(void)> > on_complete;
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
     repop = new_repop(
       version,
       r,
@@ -9088,7 +9514,7 @@ void PrimaryLogPG::submit_log_entries(
        if (peer == pg_whoami) continue;
        assert(peer_missing.count(peer));
        assert(peer_info.count(peer));
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
          assert(repop);
          MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
            entries,
@@ -9112,7 +9538,7 @@ void PrimaryLogPG::submit_log_entries(
            peer.osd, m, get_osdmap()->get_epoch());
        }
       }
-      if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+      if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
        ceph_tid_t rep_tid = repop->rep_tid;
        waiting_on.insert(pg_whoami);
        log_entry_update_waiting_on.insert(
@@ -9406,6 +9832,7 @@ ObjectContextRef PrimaryLogPG::get_object_context(
        object_info_t oi(soid);
        SnapSetContext *ssc = get_snapset_context(
          soid, true, 0, false);
+        assert(ssc);
        obc = create_object_context(oi, ssc);
        dout(10) << __func__ << ": " << obc << " " << soid
                 << " " << obc->rwstate
@@ -9453,7 +9880,13 @@ ObjectContextRef PrimaryLogPG::get_object_context(
     dout(10) << __func__ << ": creating obc from disk: " << obc
             << dendl;
   }
-  assert(obc->ssc);
+
+  // XXX: Caller doesn't expect this
+  if (obc->ssc == NULL) {
+    derr << __func__ << ": obc->ssc not available, not returning context" << dendl;
+    return ObjectContextRef();   // -ENOENT!
+  }
+
   dout(10) << __func__ << ": " << obc << " " << soid
           << " " << obc->rwstate
           << " oi: " << obc->obs.oi
@@ -9674,11 +10107,16 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
 
   ObjectContextRef obc = get_object_context(soid, false);
   if (!obc || !obc->obs.exists) {
-    dout(20) << __func__ << " missing clone " << soid << dendl;
     if (pmissing)
       *pmissing = soid;
     put_snapset_context(ssc);
-    return -ENOENT;
+    if (is_degraded_or_backfilling_object(soid)) {
+      dout(20) << __func__ << " clone is degraded or backfilling " << soid << dendl;
+      return -EAGAIN;
+    } else {
+      dout(20) << __func__ << " missing clone " << soid << dendl;
+      return -ENOENT;
+    }
   }
 
   if (!obc->ssc) {
@@ -9830,7 +10268,12 @@ SnapSetContext *PrimaryLogPG::get_snapset_context(
     _register_snapset_context(ssc);
     if (bv.length()) {
       bufferlist::iterator bvp = bv.begin();
-      ssc->snapset.decode(bvp);
+      try {
+       ssc->snapset.decode(bvp);
+      } catch (buffer::error& e) {
+        dout(0) << __func__ << " Can't decode snapset: " << e << dendl;
+       return NULL;
+      }
       ssc->exists = true;
     } else {
       ssc->exists = false;
@@ -9875,6 +10318,40 @@ int PrimaryLogPG::recover_missing(
     return PULL_NONE;
   }
 
+  if (missing_loc.is_deleted(soid)) {
+    start_recovery_op(soid);
+    assert(!recovering.count(soid));
+    recovering.insert(make_pair(soid, ObjectContextRef()));
+    epoch_t cur_epoch = get_osdmap()->get_epoch();
+    remove_missing_object(soid, v, new FunctionContext(
+     [=](int) {
+       lock();
+       if (!pg_has_reset_since(cur_epoch)) {
+        bool object_missing = false;
+        for (const auto& shard : actingbackfill) {
+          if (shard == pg_whoami)
+            continue;
+          if (peer_missing[shard].is_missing(soid)) {
+            dout(20) << __func__ << ": soid " << soid << " needs to be deleted from replica " << shard << dendl;
+            object_missing = true;
+            break;
+          }
+        }
+        if (!object_missing) {
+          object_stat_sum_t stat_diff;
+          stat_diff.num_objects_recovered = 1;
+          on_global_recover(soid, stat_diff, true);
+        } else {
+          auto recovery_handle = pgbackend->open_recovery_op();
+          pgbackend->recover_delete_object(soid, v, recovery_handle);
+          pgbackend->run_recovery_op(recovery_handle, priority);
+        }
+       }
+       unlock();
+     }));
+    return PULL_YES;
+  }
+
   // is this a snapped object?  if so, consult the snapset.. we may not need the entire object!
   ObjectContextRef obc;
   ObjectContextRef head_obc;
@@ -9924,12 +10401,14 @@ int PrimaryLogPG::recover_missing(
   start_recovery_op(soid);
   assert(!recovering.count(soid));
   recovering.insert(make_pair(soid, obc));
-  pgbackend->recover_object(
+  int r = pgbackend->recover_object(
     soid,
     v,
     head_obc,
     obc,
     h);
+  // This is only a pull which shouldn't return an error
+  assert(r >= 0);
   return PULL_YES;
 }
 
@@ -9952,11 +10431,42 @@ void PrimaryLogPG::send_remove_op(
   osd->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
 }
 
+void PrimaryLogPG::remove_missing_object(const hobject_t &soid,
+                                        eversion_t v, Context *on_complete)
+{
+  dout(20) << __func__ << " " << soid << " " << v << dendl;
+  assert(on_complete != nullptr);
+  // delete locally
+  ObjectStore::Transaction t;
+  remove_snap_mapped_object(t, soid);
+
+  ObjectRecoveryInfo recovery_info;
+  recovery_info.soid = soid;
+  recovery_info.version = v;
+
+  epoch_t cur_epoch = get_osdmap()->get_epoch();
+  t.register_on_complete(new FunctionContext(
+     [=](int) {
+       lock();
+       if (!pg_has_reset_since(cur_epoch)) {
+        ObjectStore::Transaction t2;
+        on_local_recover(soid, recovery_info, ObjectContextRef(), true, &t2);
+        t2.register_on_complete(on_complete);
+        int r = osd->store->queue_transaction(osr.get(), std::move(t2), nullptr);
+        assert(r == 0);
+        unlock();
+       } else {
+        unlock();
+        on_complete->complete(-EAGAIN);
+       }
+     }));
+  int r = osd->store->queue_transaction(osr.get(), std::move(t), nullptr);
+  assert(r == 0);
+}
 
 void PrimaryLogPG::finish_degraded_object(const hobject_t& oid)
 {
   dout(10) << "finish_degraded_object " << oid << dendl;
-  ObjectContextRef obc(object_contexts.lookup(oid));
   if (callbacks_for_degraded_object.count(oid)) {
     list<Context*> contexts;
     contexts.swap(callbacks_for_degraded_object[oid]);
@@ -10008,25 +10518,29 @@ void PrimaryLogPG::_committed_pushed_object(
 void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
 {
   lock();
-  dout(10) << "_applied_recovered_object " << *obc << dendl;
-
+  dout(20) << __func__ << dendl;
+  if (obc) {
+    dout(20) << "obc = " << *obc << dendl;
+  }
   assert(active_pushes >= 1);
   --active_pushes;
 
   // requeue an active chunky scrub waiting on recovery ops
   if (!deleting && active_pushes == 0
       && scrubber.is_chunky_scrub_active()) {
-    requeue_scrub();
+    if (ops_blocked_by_scrub()) {
+      requeue_scrub(true);
+    } else {
+      requeue_scrub(false);
+    }
   }
-
   unlock();
 }
 
 void PrimaryLogPG::_applied_recovered_object_replica()
 {
   lock();
-  dout(10) << "_applied_recovered_object_replica" << dendl;
-
+  dout(20) << __func__ << dendl;
   assert(active_pushes >= 1);
   --active_pushes;
 
@@ -10039,7 +10553,6 @@ void PrimaryLogPG::_applied_recovered_object_replica()
       PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
     scrubber.active_rep_scrub = OpRequestRef();
   }
-
   unlock();
 }
 
@@ -10060,6 +10573,12 @@ void PrimaryLogPG::recover_got(hobject_t oid, eversion_t v)
   }
 }
 
+void PrimaryLogPG::primary_failed(const hobject_t &soid)
+{
+  list<pg_shard_t> fl = { pg_whoami };
+  failed_push(fl, soid);
+}
+
 void PrimaryLogPG::failed_push(const list<pg_shard_t> &from, const hobject_t &soid)
 {
   dout(20) << __func__ << ": " << soid << dendl;
@@ -10109,7 +10628,6 @@ eversion_t PrimaryLogPG::pick_newest_available(const hobject_t& oid)
     if (*i == get_primary()) continue;
     pg_shard_t peer = *i;
     if (!peer_missing[peer].is_missing(oid)) {
-      assert(is_backfill_targets(peer));
       continue;
     }
     eversion_t h = peer_missing[peer].get_items().at(oid).have;
@@ -10149,7 +10667,7 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
       unlock();
     });
 
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
     t.register_on_commit(complete);
   } else {
     /* Hack to work around the fact that ReplicatedBackend sends
@@ -10210,12 +10728,13 @@ void PrimaryLogPG::mark_all_unfound_lost(
   ceph_tid_t tid)
 {
   dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
+  list<hobject_t> oids;
 
   dout(30) << __func__ << ": log before:\n";
   pg_log.get_log().print(*_dout);
   *_dout << dendl;
 
-  mempool::osd::list<pg_log_entry_t> log_entries;
+  mempool::osd_pglog::list<pg_log_entry_t> log_entries;
 
   utime_t mtime = ceph_clock_now();
   map<hobject_t, pg_missing_item>::const_iterator m =
@@ -10265,7 +10784,7 @@ void PrimaryLogPG::mark_all_unfound_lost(
       {
        pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
                         0, osd_reqid_t(), mtime, 0);
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
          if (pool.info.require_rollback()) {
            e.mod_desc.try_rmobject(v.version);
          } else {
@@ -10274,6 +10793,14 @@ void PrimaryLogPG::mark_all_unfound_lost(
        } // otherwise, just do what we used to do
        dout(10) << e << dendl;
        log_entries.push_back(e);
+        oids.push_back(oid);
+
+       // If context found mark object as deleted in case
+       // of racing with new creation.  This can happen if
+       // object lost and EIO at primary.
+       obc = object_contexts.lookup(oid);
+       if (obc)
+         obc->obs.exists = false;
 
        ++v.version;
        ++m;
@@ -10291,14 +10818,33 @@ void PrimaryLogPG::mark_all_unfound_lost(
     log_entries,
     std::move(manager),
     boost::optional<std::function<void(void)> >(
-      [=]() {
-       requeue_ops(waiting_for_all_missing);
-       waiting_for_all_missing.clear();
-       for (auto& p : waiting_for_unreadable_object) {
-         release_backoffs(p.first);
+      [this, oids, con, num_unfound, tid]() {
+       if (perform_deletes_during_peering()) {
+         for (auto oid : oids) {
+           // clear old locations - merge_new_log_entries will have
+           // handled rebuilding missing_loc for each of these
+           // objects if we have the RECOVERY_DELETES flag
+           missing_loc.recovered(oid);
+         }
+       }
+
+       if (is_recovery_unfound()) {
+         queue_peering_event(
+           CephPeeringEvtRef(
+             std::make_shared<CephPeeringEvt>(
+             get_osdmap()->get_epoch(),
+             get_osdmap()->get_epoch(),
+             DoRecovery())));
+       } else if (is_backfill_unfound()) {
+         queue_peering_event(
+           CephPeeringEvtRef(
+             std::make_shared<CephPeeringEvt>(
+             get_osdmap()->get_epoch(),
+             get_osdmap()->get_epoch(),
+             RequestBackfill())));
+       } else {
+         queue_recovery();
        }
-       requeue_object_waiters(waiting_for_unreadable_object);
-       queue_recovery();
 
        stringstream ss;
        ss << "pg has " << num_unfound
@@ -10392,7 +10938,7 @@ void PrimaryLogPG::on_flushed()
   assert(flushes_in_progress > 0);
   flushes_in_progress--;
   if (flushes_in_progress == 0) {
-    requeue_ops(waiting_for_peered);
+    requeue_ops(waiting_for_flush);
   }
   if (!is_peered() || !is_primary()) {
     pair<hobject_t, ObjectContextRef> i;
@@ -10424,6 +10970,18 @@ void PrimaryLogPG::on_removal(ObjectStore::Transaction *t)
     on_shutdown();
 }
 
+void PrimaryLogPG::clear_async_reads()
+{
+  dout(10) << __func__ << dendl;
+  for(auto& i : in_progress_async_reads) {
+    dout(10) << "clear ctx: "
+             << "OpRequestRef " << i.first
+             << " OpContext " << i.second
+             << dendl;
+    close_op_ctx(i.second);
+  }
+}
+
 void PrimaryLogPG::on_shutdown()
 {
   dout(10) << "on_shutdown" << dendl;
@@ -10435,6 +10993,11 @@ void PrimaryLogPG::on_shutdown()
   // handles queue races
   deleting = true;
 
+  if (recovery_queued) {
+    recovery_queued = false;
+    osd->clear_queued_recovery(this);
+  }
+
   clear_scrub_reserved();
   scrub_clear_state();
 
@@ -10444,12 +11007,18 @@ void PrimaryLogPG::on_shutdown()
   cancel_proxy_ops(false);
   apply_and_flush_repops(false);
   cancel_log_updates();
+  // we must remove PGRefs, so do this this prior to release_backoffs() callers
+  clear_backoffs(); 
+  // clean up snap trim references
+  snap_trimmer_machine.process_event(Reset());
 
   pgbackend->on_change();
 
   context_registry_on_change();
   object_contexts.clear();
 
+  clear_async_reads();
+
   osd->remote_reserver.cancel_reservation(info.pgid);
   osd->local_reserver.cancel_reservation(info.pgid);
 
@@ -10478,6 +11047,7 @@ void PrimaryLogPG::on_activate()
          RequestBackfill())));
   } else {
     dout(10) << "activate all replicas clean, no recovery" << dendl;
+    eio_errors_to_process = false;
     queue_peering_event(
       CephPeeringEvtRef(
        std::make_shared<CephPeeringEvt>(
@@ -10509,6 +11079,12 @@ void PrimaryLogPG::on_activate()
 
 void PrimaryLogPG::_on_new_interval()
 {
+  dout(20) << __func__ << " checking missing set deletes flag. missing = " << pg_log.get_missing() << dendl;
+  if (!pg_log.get_missing().may_include_deletes &&
+      get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES)) {
+    pg_log.rebuild_missing_set_with_deletes(osd->store, coll, info);
+  }
+  assert(pg_log.get_missing().may_include_deletes == get_osdmap()->test_flag(CEPH_OSDMAP_RECOVERY_DELETES));
 }
 
 void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
@@ -10528,6 +11104,7 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
   // requeue everything in the reverse order they should be
   // reexamined.
   requeue_ops(waiting_for_peered);
+  requeue_ops(waiting_for_flush);
   requeue_ops(waiting_for_active);
 
   clear_scrub_reserved();
@@ -10576,10 +11153,8 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
 
   if (is_primary()) {
     requeue_ops(waiting_for_cache_not_full);
-    requeue_ops(waiting_for_all_missing);
   } else {
     waiting_for_cache_not_full.clear();
-    waiting_for_all_missing.clear();
   }
   objects_blocked_on_cache_full.clear();
 
@@ -10788,14 +11363,14 @@ bool PrimaryLogPG::start_recovery_ops(
   assert(is_primary());
 
   if (!state_test(PG_STATE_RECOVERING) &&
-      !state_test(PG_STATE_BACKFILL)) {
+      !state_test(PG_STATE_BACKFILLING)) {
     /* TODO: I think this case is broken and will make do_recovery()
      * unhappy since we're returning false */
     dout(10) << "recovery raced and were queued twice, ignoring!" << dendl;
     return false;
   }
 
-  const pg_missing_t &missing = pg_log.get_missing();
+  const auto &missing = pg_log.get_missing();
 
   unsigned int num_missing = missing.num_missing();
   uint64_t num_unfound = get_num_unfound();
@@ -10823,7 +11398,7 @@ bool PrimaryLogPG::start_recovery_ops(
 
   bool deferred_backfill = false;
   if (recovering.empty() &&
-      state_test(PG_STATE_BACKFILL) &&
+      state_test(PG_STATE_BACKFILLING) &&
       !backfill_targets.empty() && started < max &&
       missing.num_missing() == 0 &&
       waiting_on_backfill.empty()) {
@@ -10876,20 +11451,22 @@ bool PrimaryLogPG::start_recovery_ops(
 
   if (missing.num_missing() > 0) {
     // this shouldn't happen!
-    osd->clog->error() << info.pgid << " recovery ending with " << missing.num_missing()
-                      << ": " << missing.get_items();
+    osd->clog->error() << info.pgid << " Unexpected Error: recovery ending with "
+                      << missing.num_missing() << ": " << missing.get_items();
     return work_in_progress;
   }
 
   if (needs_recovery()) {
     // this shouldn't happen!
     // We already checked num_missing() so we must have missing replicas
-    osd->clog->error() << info.pgid << " recovery ending with missing replicas";
+    osd->clog->error() << info.pgid 
+                       << " Unexpected Error: recovery ending with missing replicas";
     return work_in_progress;
   }
 
   if (state_test(PG_STATE_RECOVERING)) {
     state_clear(PG_STATE_RECOVERING);
+    state_clear(PG_STATE_FORCED_RECOVERY);
     if (needs_backfill()) {
       dout(10) << "recovery done, queuing backfill" << dendl;
       queue_peering_event(
@@ -10900,6 +11477,8 @@ bool PrimaryLogPG::start_recovery_ops(
             RequestBackfill())));
     } else {
       dout(10) << "recovery done, no backfill" << dendl;
+      eio_errors_to_process = false;
+      state_clear(PG_STATE_FORCED_BACKFILL);
       queue_peering_event(
         CephPeeringEvtRef(
           std::make_shared<CephPeeringEvt>(
@@ -10908,8 +11487,11 @@ bool PrimaryLogPG::start_recovery_ops(
             AllReplicasRecovered())));
     }
   } else { // backfilling
-    state_clear(PG_STATE_BACKFILL);
+    state_clear(PG_STATE_BACKFILLING);
+    state_clear(PG_STATE_FORCED_BACKFILL);
+    state_clear(PG_STATE_FORCED_RECOVERY);
     dout(10) << "recovery done, backfill done" << dendl;
+    eio_errors_to_process = false;
     queue_peering_event(
       CephPeeringEvtRef(
         std::make_shared<CephPeeringEvt>(
@@ -10929,7 +11511,7 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
 {
   assert(is_primary());
 
-  const pg_missing_t &missing = pg_log.get_missing();
+  const auto &missing = pg_log.get_missing();
 
   dout(10) << "recover_primary recovering " << recovering.size()
           << " in pg" << dendl;
@@ -10951,7 +11533,7 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
 
     if (pg_log.get_log().objects.count(p->second)) {
       latest = pg_log.get_log().objects.find(p->second)->second;
-      assert(latest->is_update());
+      assert(latest->is_update() || latest->is_delete());
       soid = latest->soid;
     } else {
       latest = 0;
@@ -10960,8 +11542,7 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
     const pg_missing_item& item = missing.get_items().find(p->second)->second;
     ++p;
 
-    hobject_t head = soid;
-    head.snap = CEPH_NOSNAP;
+    hobject_t head = soid.get_head();
 
     eversion_t need = item.need;
 
@@ -11081,6 +11662,49 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
   return started;
 }
 
+bool PrimaryLogPG::primary_error(
+  const hobject_t& soid, eversion_t v)
+{
+  pg_log.missing_add(soid, v, eversion_t());
+  pg_log.set_last_requested(0);
+  missing_loc.remove_location(soid, pg_whoami);
+  bool uhoh = true;
+  assert(!actingbackfill.empty());
+  for (set<pg_shard_t>::iterator i = actingbackfill.begin();
+       i != actingbackfill.end();
+       ++i) {
+    if (*i == get_primary()) continue;
+    pg_shard_t peer = *i;
+    if (!peer_missing[peer].is_missing(soid, v)) {
+      missing_loc.add_location(soid, peer);
+      dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
+              << ", there should be a copy on shard " << peer << dendl;
+      uhoh = false;
+    }
+  }
+  if (uhoh)
+    osd->clog->error() << info.pgid << " missing primary copy of " << soid << ", unfound";
+  else
+    osd->clog->error() << info.pgid << " missing primary copy of " << soid
+                        << ", will try copies on " << missing_loc.get_locations(soid);
+  return uhoh;
+}
+
+int PrimaryLogPG::prep_object_replica_deletes(
+  const hobject_t& soid, eversion_t v,
+  PGBackend::RecoveryHandle *h)
+{
+  assert(is_primary());
+  dout(10) << __func__ << ": on " << soid << dendl;
+
+  start_recovery_op(soid);
+  assert(!recovering.count(soid));
+  recovering.insert(make_pair(soid, ObjectContextRef()));
+
+  pgbackend->recover_delete_object(soid, v, h);
+  return 1;
+}
+
 int PrimaryLogPG::prep_object_replica_pushes(
   const hobject_t& soid, eversion_t v,
   PGBackend::RecoveryHandle *h)
@@ -11091,27 +11715,7 @@ int PrimaryLogPG::prep_object_replica_pushes(
   // NOTE: we know we will get a valid oloc off of disk here.
   ObjectContextRef obc = get_object_context(soid, false);
   if (!obc) {
-    pg_log.missing_add(soid, v, eversion_t());
-    missing_loc.remove_location(soid, pg_whoami);
-    bool uhoh = true;
-    assert(!actingbackfill.empty());
-    for (set<pg_shard_t>::iterator i = actingbackfill.begin();
-        i != actingbackfill.end();
-        ++i) {
-      if (*i == get_primary()) continue;
-      pg_shard_t peer = *i;
-      if (!peer_missing[peer].is_missing(soid, v)) {
-       missing_loc.add_location(soid, peer);
-       dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
-                << ", there should be a copy on shard " << peer << dendl;
-       uhoh = false;
-      }
-    }
-    if (uhoh)
-      osd->clog->error() << info.pgid << " missing primary copy of " << soid << ", unfound";
-    else
-      osd->clog->error() << info.pgid << " missing primary copy of " << soid
-                        << ", will try copies on " << missing_loc.get_locations(soid);
+    primary_error(soid, v);
     return 0;
   }
 
@@ -11134,13 +11738,19 @@ int PrimaryLogPG::prep_object_replica_pushes(
    * In almost all cases, therefore, this lock should be uncontended.
    */
   obc->ondisk_read_lock();
-  pgbackend->recover_object(
+  int r = pgbackend->recover_object(
     soid,
     v,
     ObjectContextRef(),
     obc, // has snapset context
     h);
   obc->ondisk_read_unlock();
+  if (r < 0) {
+    dout(0) << __func__ << " Error " << r << " on oid " << soid << dendl;
+    primary_failed(soid);
+    primary_error(soid, v);
+    return 0;
+  }
   return 1;
 }
 
@@ -11175,8 +11785,14 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
       handle.reset_tp_timeout();
       const hobject_t soid(p->second);
 
+      if (missing_loc.is_unfound(soid)) {
+       dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
+       continue;
+      }
+
       if (soid > pi->second.last_backfill) {
        if (!recovering.count(soid)) {
+          derr << __func__ << ": object " << soid << " last_backfill " << pi->second.last_backfill << dendl;
          derr << __func__ << ": object added to missing set for backfill, but "
               << "is not in recovering, error!" << dendl;
          ceph_abort();
@@ -11189,8 +11805,10 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
        continue;
       }
 
-      if (missing_loc.is_unfound(soid)) {
-       dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
+      if (missing_loc.is_deleted(soid)) {
+       dout(10) << __func__ << ": " << soid << " is a delete, removing" << dendl;
+       map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(soid);
+       started += prep_object_replica_deletes(soid, r->second.need, h);
        continue;
       }
 
@@ -11331,8 +11949,6 @@ uint64_t PrimaryLogPG::recover_backfill(
   update_range(&backfill_info, handle);
 
   unsigned ops = 0;
-  vector<boost::tuple<hobject_t, eversion_t,
-                      ObjectContextRef, vector<pg_shard_t> > > to_push;
   vector<boost::tuple<hobject_t, eversion_t, pg_shard_t> > to_remove;
   set<hobject_t> add_to_stat;
 
@@ -11344,6 +11960,7 @@ uint64_t PrimaryLogPG::recover_backfill(
   }
   backfill_info.trim_to(last_backfill_started);
 
+  PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
   while (ops < max) {
     if (backfill_info.begin <= earliest_peer_backfill() &&
        !backfill_info.extends_to_end() && backfill_info.empty()) {
@@ -11523,10 +12140,13 @@ uint64_t PrimaryLogPG::recover_backfill(
          vector<pg_shard_t> all_push = need_ver_targs;
          all_push.insert(all_push.end(), missing_targs.begin(), missing_targs.end());
 
-         to_push.push_back(
-           boost::tuple<hobject_t, eversion_t, ObjectContextRef, vector<pg_shard_t> >
-           (backfill_info.begin, obj_v, obc, all_push));
-         // Count all simultaneous pushes of the same object as a single op
+         handle.reset_tp_timeout();
+         int r = prep_backfill_object_push(backfill_info.begin, obj_v, obc, all_push, h);
+         if (r < 0) {
+           *work_started = true;
+           dout(0) << __func__ << " Error " << r << " trying to backfill " << backfill_info.begin << dendl;
+           break;
+         }
          ops++;
        } else {
          *work_started = true;
@@ -11607,12 +12227,6 @@ uint64_t PrimaryLogPG::recover_backfill(
     }
   }
 
-  PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
-  for (unsigned i = 0; i < to_push.size(); ++i) {
-    handle.reset_tp_timeout();
-    prep_backfill_object_push(to_push[i].get<0>(), to_push[i].get<1>(),
-           to_push[i].get<2>(), to_push[i].get<3>(), h);
-  }
   pgbackend->run_recovery_op(h, get_recovery_op_priority());
 
   dout(5) << "backfill_pos is " << backfill_pos << dendl;
@@ -11703,20 +12317,20 @@ uint64_t PrimaryLogPG::recover_backfill(
   return ops;
 }
 
-void PrimaryLogPG::prep_backfill_object_push(
+int PrimaryLogPG::prep_backfill_object_push(
   hobject_t oid, eversion_t v,
   ObjectContextRef obc,
   vector<pg_shard_t> peers,
   PGBackend::RecoveryHandle *h)
 {
-  dout(10) << "push_backfill_object " << oid << " v " << v << " to peers " << peers << dendl;
+  dout(10) << __func__ << " " << oid << " v " << v << " to peers " << peers << dendl;
   assert(!peers.empty());
 
   backfills_in_flight.insert(oid);
   for (unsigned int i = 0 ; i < peers.size(); ++i) {
     map<pg_shard_t, pg_missing_t>::iterator bpm = peer_missing.find(peers[i]);
     assert(bpm != peer_missing.end());
-    bpm->second.add(oid, eversion_t(), eversion_t());
+    bpm->second.add(oid, eversion_t(), eversion_t(), false);
   }
 
   assert(!recovering.count(oid));
@@ -11726,13 +12340,21 @@ void PrimaryLogPG::prep_backfill_object_push(
 
   // We need to take the read_lock here in order to flush in-progress writes
   obc->ondisk_read_lock();
-  pgbackend->recover_object(
+  int r = pgbackend->recover_object(
     oid,
     v,
     ObjectContextRef(),
     obc,
     h);
   obc->ondisk_read_unlock();
+  if (r < 0) {
+    dout(0) << __func__ << " Error " << r << " on oid " << oid << dendl;
+    primary_failed(oid);
+    primary_error(oid, v);
+    backfills_in_flight.erase(oid);
+    missing_loc.add_missing(oid, v, eversion_t());
+  }
+  return r;
 }
 
 void PrimaryLogPG::update_range(
@@ -11868,7 +12490,7 @@ void PrimaryLogPG::check_local()
       continue;
     did.insert(p->soid);
 
-    if (p->is_delete()) {
+    if (p->is_delete() && !is_missing_object(p->soid)) {
       dout(10) << " checking " << p->soid
               << " at " << p->version << dendl;
       struct stat st;
@@ -12880,6 +13502,7 @@ bool PrimaryLogPG::agent_choose_mode(bool restart, OpRequestRef op)
        is_active()) {
       if (op)
        requeue_op(op);
+      requeue_ops(waiting_for_flush);
       requeue_ops(waiting_for_active);
       requeue_ops(waiting_for_scrub);
       requeue_ops(waiting_for_cache_not_full);
@@ -13074,7 +13697,8 @@ unsigned PrimaryLogPG::process_clones_to(const boost::optional<hobject_t> &head,
     if (!allow_incomplete_clones) {
       next_clone.snap = **curclone;
       clog->error() << mode << " " << pgid << " " << head.get()
-                        << " expected clone " << next_clone;
+                        << " expected clone " << next_clone << " " << missing
+                         << " missing";
       ++scrubber.shallow_errors;
       e.set_clone_missing(next_clone.snap);
     }
@@ -13321,15 +13945,19 @@ void PrimaryLogPG::scrub_snapshot_metadata(
                          << " snapset.head_exists=false, but head exists";
          ++scrubber.shallow_errors;
          head_error.set_head_mismatch();
+         // Fix head_exists locally so is_legacy() returns correctly
+          snapset->head_exists = true;
        }
        if (soid.is_snapdir() && snapset->head_exists) {
          osd->clog->error() << mode << " " << info.pgid << " " << soid
                          << " snapset.head_exists=true, but snapdir exists";
          ++scrubber.shallow_errors;
          head_error.set_head_mismatch();
+         // For symmetry fix this too, but probably doesn't matter
+          snapset->head_exists = false;
        }
 
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
          if (soid.is_snapdir()) {
            dout(10) << " will move snapset to head from " << soid << dendl;
            snapset_to_repair[soid.get_head()] = *snapset;
@@ -13453,7 +14081,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
     ObjectContextRef obc = get_object_context(p->first, false);
     if (!obc) {
       osd->clog->error() << info.pgid << " " << mode
-                        << " cannot get object context for "
+                        << " cannot get object context for object "
                         << p->first;
       continue;
     } else if (obc->obs.oi.soid != p->first) {
@@ -13510,7 +14138,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
     ObjectContextRef obc = get_object_context(p.first, true);
     if (!obc) {
       osd->clog->error() << info.pgid << " " << mode
-                        << " cannot get object context for "
+                        << " cannot get object context for object "
                         << p.first;
       continue;
     } else if (obc->obs.oi.soid != p.first) {
@@ -13645,6 +14273,9 @@ void PrimaryLogPG::_scrub_finish()
     publish_stats_to_osd();
     share_pg_info();
   }
+  // Clear object context cache to get repair information
+  if (repair)
+    object_contexts.clear();
 }
 
 bool PrimaryLogPG::check_osdmap_full(const set<pg_shard_t> &missing_on)
@@ -13652,6 +14283,73 @@ bool PrimaryLogPG::check_osdmap_full(const set<pg_shard_t> &missing_on)
     return osd->check_osdmap_full(missing_on);
 }
 
+int PrimaryLogPG::rep_repair_primary_object(const hobject_t& soid, OpRequestRef op)
+{
+  // Only supports replicated pools
+  assert(!pool.info.require_rollback());
+  assert(is_primary());
+
+  dout(10) << __func__ << " " << soid
+          << " peers osd.{" << actingbackfill << "}" << dendl;
+
+  if (!is_clean()) {
+    block_for_clean(soid, op);
+    return -EAGAIN;
+  }
+
+  assert(!pg_log.get_missing().is_missing(soid));
+  bufferlist bv;
+  object_info_t oi;
+  eversion_t v;
+  int r = get_pgbackend()->objects_get_attr(soid, OI_ATTR, &bv);
+  if (r < 0) {
+    // Leave v and try to repair without a version, getting attr failed
+    dout(0) << __func__ << ": Need version of replica, objects_get_attr failed: "
+           << soid << " error=" << r << dendl;
+  } else try {
+    bufferlist::iterator bliter = bv.begin();
+    ::decode(oi, bliter);
+    v = oi.version;
+  } catch (...) {
+    // Leave v as default constructed. This will fail when sent to older OSDs, but
+    // not much worse than failing here.
+    dout(0) << __func__ << ": Need version of replica, bad object_info_t: " << soid << dendl;
+  }
+
+  missing_loc.add_missing(soid, v, eversion_t());
+  if (primary_error(soid, v)) {
+    dout(0) << __func__ << " No other replicas available for " << soid << dendl;
+    // XXX: If we knew that there is no down osd which could include this
+    // object, it would be nice if we could return EIO here.
+    // If a "never fail" flag was available, that could be used
+    // for rbd to NOT return EIO until object marked lost.
+
+    // Drop through to save this op in case an osd comes up with the object.
+  }
+
+  // Restart the op after object becomes readable again
+  waiting_for_unreadable_object[soid].push_back(op);
+  op->mark_delayed("waiting for missing object");
+
+  if (!eio_errors_to_process) {
+    eio_errors_to_process = true;
+    assert(is_clean());
+    queue_peering_event(
+        CephPeeringEvtRef(
+         std::make_shared<CephPeeringEvt>(
+         get_osdmap()->get_epoch(),
+         get_osdmap()->get_epoch(),
+         DoRecovery())));
+  } else {
+    // A prior error must have already cleared clean state and queued recovery
+    // or a map change has triggered re-peering.
+    // Not inlining the recovery by calling maybe_kick_recovery(soid);
+    dout(5) << __func__<< ": Read error on " << soid << ", but already seen errors" << dendl;
+  }
+
+  return -EAGAIN;
+}
+
 /*---SnapTrimmer Logging---*/
 #undef dout_prefix
 #define dout_prefix *_dout << pg->gen_prefix() 
@@ -13700,7 +14398,6 @@ boost::statechart::result PrimaryLogPG::NotTrimming::react(const KickTrim&)
   }
   if (pg->scrubber.active) {
     ldout(pg->cct, 10) << " scrubbing, will requeue snap_trimmer after" << dendl;
-    pg->scrubber.queue_snap_trim = true;
     return transit< WaitScrub >();
   } else {
     return transit< Trimming >();
@@ -13734,6 +14431,7 @@ PrimaryLogPG::AwaitAsyncWork::AwaitAsyncWork(my_context ctx)
   context< SnapTrimmer >().log_enter(state_name);
   context< SnapTrimmer >().pg->osd->queue_for_snap_trim(pg);
   pg->state_set(PG_STATE_SNAPTRIM);
+  pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
   pg->publish_stats_to_osd();
 }
 
@@ -13792,18 +14490,26 @@ boost::statechart::result PrimaryLogPG::AwaitAsyncWork::react(const DoSnapWork&)
   for (auto &&object: to_trim) {
     // Get next
     ldout(pg->cct, 10) << "AwaitAsyncWork react trimming " << object << dendl;
-    OpContextUPtr ctx = pg->trim_object(in_flight.empty(), object);
-    if (!ctx) {
-      ldout(pg->cct, 10) << "could not get write lock on obj "
-                        << object << dendl;
-      if (in_flight.empty()) {
+    OpContextUPtr ctx;
+    int error = pg->trim_object(in_flight.empty(), object, &ctx);
+    if (error) {
+      if (error == -ENOLCK) {
+       ldout(pg->cct, 10) << "could not get write lock on obj "
+                          << object << dendl;
+      } else {
+       pg->state_set(PG_STATE_SNAPTRIM_ERROR);
+       ldout(pg->cct, 10) << "Snaptrim error=" << error << dendl;
+      }
+      if (!in_flight.empty()) {
+       ldout(pg->cct, 10) << "letting the ones we already started finish" << dendl;
+       return transit< WaitRepops >();
+      }
+      if (error == -ENOLCK) {
        ldout(pg->cct, 10) << "waiting for it to clear"
                           << dendl;
        return transit< WaitRWLock >();
-
       } else {
-       ldout(pg->cct, 10) << "letting the ones we already started finish" << dendl;
-       return transit< WaitRepops >();
+        return transit< NotTrimming >();
       }
     }
 
@@ -13812,8 +14518,13 @@ boost::statechart::result PrimaryLogPG::AwaitAsyncWork::react(const DoSnapWork&)
       [pg, object, &in_flight]() {
        assert(in_flight.find(object) != in_flight.end());
        in_flight.erase(object);
-       if (in_flight.empty())
-         pg->snap_trimmer_machine.process_event(RepopsComplete());
+       if (in_flight.empty()) {
+         if (pg->state_test(PG_STATE_SNAPTRIM_ERROR)) {
+           pg->snap_trimmer_machine.process_event(Reset());
+         } else {
+           pg->snap_trimmer_machine.process_event(RepopsComplete());
+         }
+       }
       });
 
     pg->simple_opc_submit(std::move(ctx));
@@ -13870,26 +14581,23 @@ int PrimaryLogPG::getattr_maybe_cache(
 
 int PrimaryLogPG::getattrs_maybe_cache(
   ObjectContextRef obc,
-  map<string, bufferlist> *out,
-  bool user_only)
+  map<string, bufferlist> *out)
 {
   int r = 0;
+  assert(out);
   if (pool.info.require_rollback()) {
-    if (out)
-      *out = obc->attr_cache;
+    *out = obc->attr_cache;
   } else {
     r = pgbackend->objects_get_attrs(obc->obs.oi.soid, out);
   }
-  if (out && user_only) {
-    map<string, bufferlist> tmp;
-    for (map<string, bufferlist>::iterator i = out->begin();
-        i != out->end();
-        ++i) {
-      if (i->first.size() > 1 && i->first[0] == '_')
-       tmp[i->first.substr(1, i->first.size())].claim(i->second);
-    }
-    tmp.swap(*out);
+  map<string, bufferlist> tmp;
+  for (map<string, bufferlist>::iterator i = out->begin();
+       i != out->end();
+       ++i) {
+    if (i->first.size() > 1 && i->first[0] == '_')
+      tmp[i->first.substr(1, i->first.size())].claim(i->second);
   }
+  tmp.swap(*out);
   return r;
 }