]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PrimaryLogPG.cc
update sources to v12.1.1
[ceph.git] / ceph / src / osd / PrimaryLogPG.cc
index 8c4f08c7090dd738ee3fcbbe61684e8593814e77..33ef908f692089353b699fc8d193aaf41cbb513d 100644 (file)
@@ -418,10 +418,6 @@ void PrimaryLogPG::on_local_recover(
        waiting_for_unreadable_object.erase(unreadable_object_entry);
       }
     }
-    if (pg_log.get_missing().get_items().size() == 0) {
-      requeue_ops(waiting_for_all_missing);
-      waiting_for_all_missing.clear();
-    }
   } else {
     t->register_on_applied(
       new C_OSD_AppliedRecoveredObjectReplica(this));
@@ -518,6 +514,17 @@ void PrimaryLogPG::send_message_osd_cluster(
   osd->send_message_osd_cluster(m, con);
 }
 
+void PrimaryLogPG::on_primary_error(
+  const hobject_t &oid,
+  eversion_t v)
+{
+  dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
+  primary_failed(oid);
+  primary_error(oid, v);
+  backfills_in_flight.erase(oid);
+  missing_loc.add_missing(oid, v, eversion_t());
+}
+
 ConnectionRef PrimaryLogPG::get_con_osd_cluster(
   int peer, epoch_t from_epoch)
 {
@@ -571,12 +578,6 @@ void PrimaryLogPG::wait_for_unreadable_object(
   op->mark_delayed("waiting for missing object");
 }
 
-void PrimaryLogPG::wait_for_all_missing(OpRequestRef op)
-{
-  waiting_for_all_missing.push_back(op);
-  op->mark_delayed("waiting for all missing");
-}
-
 bool PrimaryLogPG::is_degraded_or_backfilling_object(const hobject_t& soid)
 {
   /* The conditions below may clear (on_local_recover, before we queue
@@ -629,6 +630,15 @@ void PrimaryLogPG::block_write_on_full_cache(
   op->mark_delayed("waiting for cache not full");
 }
 
+void PrimaryLogPG::block_for_clean(
+  const hobject_t& oid, OpRequestRef op)
+{
+  dout(20) << __func__ << ": blocking object " << oid
+          << " on primary repair" << dendl;
+  waiting_for_clean_to_primary_repair.push_back(op);
+  op->mark_delayed("waiting for clean to repair");
+}
+
 void PrimaryLogPG::block_write_on_snap_rollback(
   const hobject_t& oid, ObjectContextRef obc, OpRequestRef op)
 {
@@ -1877,15 +1887,15 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     }
   }
 
-  if (op->includes_pg_op()) {
-    return do_pg_op(op);
-  }
-
   if (!op_has_sufficient_caps(op)) {
     osd->reply_op_error(op, -EPERM);
     return;
   }
 
+  if (op->includes_pg_op()) {
+    return do_pg_op(op);
+  }
+
   // object name too long?
   if (m->get_oid().name.size() > cct->_conf->osd_max_object_name_len) {
     dout(4) << "do_op name is longer than "
@@ -1989,6 +1999,10 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
 
   // missing object?
   if (is_unreadable_object(head)) {
+    if (!is_primary()) {
+      osd->reply_op_error(op, -EAGAIN);
+      return;
+    }
     if (can_backoff &&
        (g_conf->osd_backoff_on_degraded ||
         (g_conf->osd_backoff_on_unfound && missing_loc.is_unfound(head)))) {
@@ -2150,6 +2164,13 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
       return;
   }
 
+  if (obc.get() && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    if (maybe_handle_manifest(op,
+                              write_ordered,
+                              obc))
+    return;
+  }
+
   if (maybe_handle_cache(op,
                         write_ordered,
                         obc,
@@ -2166,9 +2187,9 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
       fill_in_copy_get_noent(op, oid, m->ops[0]);
       return;
     }
-    dout(20) << __func__ << "find_object_context got error " << r << dendl;
+    dout(20) << __func__ << "find_object_context got error " << r << dendl;
     if (op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       record_write_error(op, oid, nullptr, r);
     } else {
       osd->reply_op_error(op, r);
@@ -2245,7 +2266,7 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     dout(20) << __func__ << " returned an error: " << r << dendl;
     close_op_ctx(ctx);
     if (op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       record_write_error(op, oid, nullptr, r);
     } else {
       osd->reply_op_error(op, r);
@@ -2296,6 +2317,53 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
   // force recovery of the oldest missing object if too many logs
   maybe_force_recovery();
 }
+PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_manifest_detail(
+  OpRequestRef op,
+  bool write_ordered,
+  ObjectContextRef obc)
+{
+  if (static_cast<const MOSDOp *>(op->get_req())->get_flags() &
+      CEPH_OSD_FLAG_IGNORE_REDIRECT) {
+    dout(20) << __func__ << ": ignoring redirect due to flag" << dendl;
+    return cache_result_t::NOOP;
+  }
+
+  if (obc)
+    dout(10) << __func__ << " " << obc->obs.oi << " "
+       << (obc->obs.exists ? "exists" : "DNE")
+       << dendl;
+
+  // if it is write-ordered and blocked, stop now
+  if (obc.get() && obc->is_blocked() && write_ordered) {
+    // we're already doing something with this object
+    dout(20) << __func__ << " blocked on " << obc->obs.oi.soid << dendl;
+    return cache_result_t::NOOP;
+  }
+
+  vector<OSDOp> ops = static_cast<const MOSDOp*>(op->get_req())->ops;
+  for (vector<OSDOp>::iterator p = ops.begin(); p != ops.end(); ++p) {
+    OSDOp& osd_op = *p;
+    ceph_osd_op& op = osd_op.op;
+    if (op.op == CEPH_OSD_OP_SET_REDIRECT) {
+      return cache_result_t::NOOP;
+    }
+  }
+
+  switch (obc->obs.oi.manifest.type) {
+  case object_manifest_t::TYPE_REDIRECT:
+    if (op->may_write() || write_ordered) {
+      do_proxy_write(op, obc->obs.oi.soid, obc);
+    } else {
+      do_proxy_read(op, obc);
+    }
+    return cache_result_t::HANDLED_PROXY;
+  case object_manifest_t::TYPE_CHUNKED:
+  default:
+    assert(0 == "unrecognized manifest type");
+  }
+
+  return cache_result_t::NOOP;
+}
 
 void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
                                      MOSDOpReply *orig_reply, int r)
@@ -2304,7 +2372,7 @@ void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
   assert(op->may_write());
   const osd_reqid_t &reqid = static_cast<const MOSDOp*>(op->get_req())->get_reqid();
   ObjectContextRef obc;
-  mempool::osd::list<pg_log_entry_t> entries;
+  mempool::osd_pglog::list<pg_log_entry_t> entries;
   entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
                                   get_next_version(), eversion_t(), 0,
                                   reqid, utime_t(), r));
@@ -2657,15 +2725,30 @@ struct C_ProxyRead : public Context {
   }
 };
 
-void PrimaryLogPG::do_proxy_read(OpRequestRef op)
+void PrimaryLogPG::do_proxy_read(OpRequestRef op, ObjectContextRef obc)
 {
   // NOTE: non-const here because the ProxyReadOp needs mutable refs to
   // stash the result in the request's OSDOp vector
   MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
-  object_locator_t oloc(m->get_object_locator());
-  oloc.pool = pool.info.tier_of;
-
-  const hobject_t& soid = m->get_hobj();
+  object_locator_t oloc;
+  hobject_t soid;
+  /* extensible tier */
+  if (obc && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    switch (obc->obs.oi.manifest.type) {
+      case object_manifest_t::TYPE_REDIRECT:
+         oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
+         soid = obc->obs.oi.manifest.redirect_target;  
+         break;
+      case object_manifest_t::TYPE_CHUNKED:
+      default:
+       assert(0 == "unrecognized manifest type");
+    }
+  } else {
+  /* proxy */
+    soid = m->get_hobj();
+    oloc = object_locator_t(m->get_object_locator());
+    oloc.pool = pool.info.tier_of;
+  }
   unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
 
   // pass through some original flags that make sense.
@@ -2849,16 +2932,35 @@ struct C_ProxyWrite_Commit : public Context {
   }
 };
 
-void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid)
+void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid, ObjectContextRef obc)
 {
   // NOTE: non-const because ProxyWriteOp takes a mutable ref
   MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
-  object_locator_t oloc(m->get_object_locator());
-  oloc.pool = pool.info.tier_of;
+  object_locator_t oloc;
   SnapContext snapc(m->get_snap_seq(), m->get_snaps());
+  hobject_t soid;
+  /* extensible tier */
+  if (obc && obc->obs.exists && obc->obs.oi.has_manifest()) {
+    switch (obc->obs.oi.manifest.type) {
+      case object_manifest_t::TYPE_REDIRECT:
+         oloc = object_locator_t(obc->obs.oi.manifest.redirect_target);
+         soid = obc->obs.oi.manifest.redirect_target;  
+         break;
+      case object_manifest_t::TYPE_CHUNKED:
+      default:
+       assert(0 == "unrecognized manifest type");
+    }
+  } else {
+  /* proxy */
+    soid = m->get_hobj();
+    oloc = object_locator_t(m->get_object_locator());
+    oloc.pool = pool.info.tier_of;
+  }
 
-  const hobject_t& soid = m->get_hobj();
   unsigned flags = CEPH_OSD_FLAG_IGNORE_CACHE | CEPH_OSD_FLAG_IGNORE_OVERLAY;
+  if (!(op->may_write() || op->may_cache())) {
+    flags |= CEPH_OSD_FLAG_RWORDERED;
+  }
   dout(10) << __func__ << " Start proxy write for " << *m << dendl;
 
   ProxyWriteOpRef pwop(std::make_shared<ProxyWriteOp>(op, soid, m->ops, m->get_reqid()));
@@ -3524,29 +3626,34 @@ void PrimaryLogPG::do_backfill_remove(OpRequestRef op)
   assert(r == 0);
 }
 
-PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
-  bool first, const hobject_t &coid)
+int PrimaryLogPG::trim_object(
+  bool first, const hobject_t &coid, PrimaryLogPG::OpContextUPtr *ctxp)
 {
+  *ctxp = NULL;
   // load clone info
   bufferlist bl;
   ObjectContextRef obc = get_object_context(coid, false, NULL);
-  if (!obc) {
-    derr << __func__ << " could not find coid " << coid << dendl;
-    ceph_abort();
+  if (!obc || !obc->ssc || !obc->ssc->exists) {
+    osd->clog->error() << __func__ << ": Can not trim " << coid
+      << " repair needed " << (obc ? "(no obc->ssc or !exists)" : "(no obc)");
+    return -ENOENT;
   }
-  assert(obc->ssc);
 
   hobject_t snapoid(
     coid.oid, coid.get_key(),
     obc->ssc->snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.get_hash(),
     info.pgid.pool(), coid.get_namespace());
   ObjectContextRef snapset_obc = get_object_context(snapoid, false);
-  assert(snapset_obc);
+  if (!snapset_obc) {
+    osd->clog->error() << __func__ << ": Can not trim " << coid
+      << " repair needed, no snapset obc for " << snapoid;
+    return -ENOENT;
+  }
 
   SnapSet& snapset = obc->ssc->snapset;
 
   bool legacy = snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   object_info_t &coi = obc->obs.oi;
   set<snapid_t> old_snaps;
@@ -3557,21 +3664,21 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
     if (p == snapset.clone_snaps.end()) {
       osd->clog->error() << __func__ << " No clone_snaps in snapset " << snapset
                         << " for " << coid << "\n";
-      return NULL;
+      return -ENOENT;
     }
     old_snaps.insert(snapset.clone_snaps[coid.snap].begin(),
                     snapset.clone_snaps[coid.snap].end());
   }
   if (old_snaps.empty()) {
     osd->clog->error() << __func__ << " No object info snaps for " << coid;
-    return NULL;
+    return -ENOENT;
   }
 
   dout(10) << coid << " old_snaps " << old_snaps
           << " old snapset " << snapset << dendl;
   if (snapset.seq == 0) {
     osd->clog->error() << __func__ << " No snapset.seq for " << coid;
-    return NULL;
+    return -ENOENT;
   }
 
   set<snapid_t> new_snaps;
@@ -3588,7 +3695,7 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
     p = std::find(snapset.clones.begin(), snapset.clones.end(), coid.snap);
     if (p == snapset.clones.end()) {
       osd->clog->error() << __func__ << " Snap " << coid.snap << " not in clones";
-      return NULL;
+      return -ENOENT;
     }
   }
 
@@ -3601,7 +3708,7 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
        first)) {
     close_op_ctx(ctx.release());
     dout(10) << __func__ << ": Unable to get a wlock on " << coid << dendl;
-    return NULL;
+    return -ENOLCK;
   }
 
   if (!ctx->lock_manager.get_snaptrimmer_write(
@@ -3610,7 +3717,7 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
        first)) {
     close_op_ctx(ctx.release());
     dout(10) << __func__ << ": Unable to get a wlock on " << snapoid << dendl;
-    return NULL;
+    return -ENOLCK;
   }
 
   ctx->at_version = get_next_version();
@@ -3679,6 +3786,9 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
       coid,
       old_snaps,
       new_snaps);
+
+    coi = object_info_t(coid);
+
     ctx->at_version.version++;
   } else {
     // save adjusted snaps for this object
@@ -3747,20 +3857,19 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
       ctx->delta_stats.num_objects--;
       if (oi.is_dirty()) {
        ctx->delta_stats.num_objects_dirty--;
-       oi.clear_flag(object_info_t::FLAG_DIRTY);
       }
       if (oi.is_omap())
        ctx->delta_stats.num_objects_omap--;
       if (oi.is_whiteout()) {
        dout(20) << __func__ << " trimming whiteout on " << oi.soid << dendl;
        ctx->delta_stats.num_whiteouts--;
-       oi.clear_flag(object_info_t::FLAG_WHITEOUT);
       }
-      if (oi.is_cache_pinned())
+      if (oi.is_cache_pinned()) {
        ctx->delta_stats.num_objects_pinned--;
+      }
     }
     ctx->snapset_obc->obs.exists = false;
-    
+    ctx->snapset_obc->obs.oi = object_info_t(snapoid);
     t->remove(snapoid);
   } else {
     dout(10) << coid << " filtering snapset on " << snapoid << dendl;
@@ -3795,7 +3904,8 @@ PrimaryLogPG::OpContextUPtr PrimaryLogPG::trim_object(
     t->setattrs(snapoid, attrs);
   }
 
-  return ctx;
+  *ctxp = std::move(ctx);
+  return 0;
 }
 
 void PrimaryLogPG::kick_snap_trim()
@@ -4576,6 +4686,7 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
     case CEPH_OSD_OP_COPY_FROM:  // we handle user_version update explicitly
     case CEPH_OSD_OP_CACHE_PIN:
     case CEPH_OSD_OP_CACHE_UNPIN:
+    case CEPH_OSD_OP_SET_REDIRECT:
       break;
     default:
       if (op.op & CEPH_OSD_OP_MODE_WR)
@@ -4671,6 +4782,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        } else {
          int r = pgbackend->objects_read_sync(
            soid, op.extent.offset, op.extent.length, op.flags, &osd_op.outdata);
+         if (r == -EIO) {
+           r = rep_repair_primary_object(soid, ctx->op);
+         }
          if (r >= 0)
            op.extent.length = r;
          else {
@@ -4803,6 +4917,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            bufferlist t;
            uint64_t len = miter->first - last;
            r = pgbackend->objects_read_sync(soid, last, len, op.flags, &t);
+           if (r == -EIO) {
+             r = rep_repair_primary_object(soid, ctx->op);
+           }
            if (r < 0) {
              osd->clog->error() << coll << " " << soid
                                 << " sparse-read failed to read: "
@@ -5831,6 +5948,75 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
       }
       break;
 
+    case CEPH_OSD_OP_SET_REDIRECT:
+      ++ctx->num_write;
+      {
+       if (pool.info.is_tier()) {
+         result = -EINVAL;
+         break;
+       }
+       if (!obs.exists) {
+         result = -ENOENT;
+         break;
+       }
+       if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
+         result = -EOPNOTSUPP;
+         break;
+       }
+
+       object_t target_name;
+       object_locator_t target_oloc;
+       snapid_t target_snapid = (uint64_t)op.copy_from.snapid;
+       version_t target_version = op.copy_from.src_version;
+       try {
+         ::decode(target_name, bp);
+         ::decode(target_oloc, bp);
+       }
+       catch (buffer::error& e) {
+         result = -EINVAL;
+         goto fail;
+       }
+       pg_t raw_pg;
+       get_osdmap()->object_locator_to_pg(target_name, target_oloc, raw_pg);
+       hobject_t target(target_name, target_oloc.key, target_snapid,
+               raw_pg.ps(), raw_pg.pool(),
+               target_oloc.nspace);
+       if (target == soid) {
+         dout(20) << " set-redirect self is invalid" << dendl;
+         result = -EINVAL;
+         break;
+       }
+        oi.set_flag(object_info_t::FLAG_MANIFEST);
+       oi.manifest.redirect_target = target;
+       oi.manifest.type = object_manifest_t::TYPE_REDIRECT;
+       t->truncate(soid, 0);
+       if (oi.is_omap() && pool.info.supports_omap()) {
+         t->omap_clear(soid);
+         obs.oi.clear_omap_digest();
+         obs.oi.clear_flag(object_info_t::FLAG_OMAP);
+       }
+       ctx->delta_stats.num_bytes -= oi.size;
+       oi.size = 0;
+       oi.new_object();
+       oi.user_version = target_version;
+       ctx->user_at_version = target_version;
+       /* rm_attrs */
+       map<string,bufferlist> rmattrs;
+       result = getattrs_maybe_cache(ctx->obc,
+                   &rmattrs,
+                   true);
+       if (result < 0) {
+         return result;
+       }
+       map<string, bufferlist>::iterator iter;
+       for (iter = rmattrs.begin(); iter != rmattrs.end(); ++iter) {
+         const string& name = iter->first;
+         t->rmattr(soid, name);
+       }
+       dout(10) << "set-redirect oid:" << oi.soid << " user_version: " << oi.user_version << dendl;
+      }
+
+      break;
 
       // -- object attrs --
       
@@ -6476,7 +6662,7 @@ inline int PrimaryLogPG::_delete_oid(
     whiteout = true;
   }
   bool legacy;
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
     legacy = false;
     // in luminous or later, we can't delete the head if there are
     // clones. we trust the caller passing no_whiteout has already
@@ -6586,16 +6772,26 @@ int PrimaryLogPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
   }
   {
     ObjectContextRef promote_obc;
-    switch (
-      maybe_handle_cache_detail(
-       ctx->op,
-       true,
-       rollback_to,
-       ret,
-       missing_oid,
-       true,
-       false,
-       &promote_obc)) {
+    cache_result_t tier_mode_result;
+    if (obs.exists && obs.oi.has_manifest()) { 
+      tier_mode_result = 
+       maybe_handle_manifest_detail(
+         ctx->op,
+         true,
+         rollback_to);
+    } else {
+      tier_mode_result = 
+       maybe_handle_cache_detail(
+         ctx->op,
+         true,
+         rollback_to,
+         ret,
+         missing_oid,
+         true,
+         false,
+         &promote_obc);
+    }
+    switch (tier_mode_result) {
     case cache_result_t::NOOP:
       break;
     case cache_result_t::BLOCKED_PROMOTE:
@@ -6807,7 +7003,7 @@ void PrimaryLogPG::make_writeable(OpContext *ctx)
     snap_oi->copy_user_bits(ctx->obs->oi);
 
     bool legacy = ctx->new_snapset.is_legacy() ||
-      !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+      get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
     if (legacy) {
       snap_oi->legacy_snaps = snaps;
     }
@@ -6868,7 +7064,7 @@ void PrimaryLogPG::make_writeable(OpContext *ctx)
   // update snapset with latest snap context
   ctx->new_snapset.seq = snapc.seq;
   ctx->new_snapset.snaps = snapc.snaps;
-  if (!get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+  if (get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS) {
     // pessimistic assumption that this is a net-new legacy SnapSet
     ctx->delta_stats.num_legacy_snapsets++;
     ctx->new_snapset.head_exists = ctx->new_obs.exists;
@@ -7056,7 +7252,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   int result = do_osd_ops(ctx, ctx->ops);
   if (result < 0) {
     if (ctx->op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       // need to save the error code in the pg log, to detect dup ops,
       // but do nothing else
       ctx->update_log_only = true;
@@ -7068,7 +7264,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
   if (ctx->op_t->empty() && !ctx->modify) {
     unstable_stats.add(ctx->delta_stats);
     if (ctx->op->may_write() &&
-       get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+       get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
       ctx->update_log_only = true;
     }
     return result;
@@ -7147,7 +7343,7 @@ void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
                        info.pgid.pool(), soid.get_namespace());
       dout(10) << " final snapset " << ctx->new_snapset
               << " in " << snapoid << dendl;
-      assert(!get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS));
+      assert(get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
       ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, snapoid,
                                        ctx->at_version,
                                        eversion_t(),
@@ -7235,7 +7431,7 @@ void PrimaryLogPG::finish_ctx(OpContext *ctx, int log_op_type, bool maintain_ssc
   }
 
   bool legacy_snapset = ctx->new_snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   // append to log
   ctx->log.push_back(pg_log_entry_t(log_op_type, soid, ctx->at_version,
@@ -8147,7 +8343,7 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
   tctx->extra_reqids = results->reqids;
 
   bool legacy_snapset = tctx->new_snapset.is_legacy() ||
-    !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS);
+    get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS;
 
   if (whiteout) {
     // create a whiteout
@@ -8200,7 +8396,7 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     assert(tctx->new_obs.oi.soid.snap == CEPH_NOSNAP);
     tctx->new_snapset.from_snap_set(
       results->snapset,
-      !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS));
+      get_osdmap()->require_osd_release < CEPH_RELEASE_LUMINOUS);
   }
   tctx->new_snapset.head_exists = true;
   dout(20) << __func__ << " new_snapset " << tctx->new_snapset << dendl;
@@ -8785,7 +8981,12 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
   if (is_primary()) {
     if (scrubber.active) {
       if (last_update_applied == scrubber.subset_last_update) {
-        requeue_scrub();
+        if (ops_blocked_by_scrub()) {
+          requeue_scrub(true);
+        } else {
+          requeue_scrub(false);
+        }
+
       }
     } else {
       assert(scrubber.start == scrubber.end);
@@ -9040,12 +9241,13 @@ void PrimaryLogPG::simple_opc_submit(OpContextUPtr ctx)
   dout(20) << __func__ << " " << repop << dendl;
   issue_repop(repop, ctx.get());
   eval_repop(repop);
+  calc_trim_to();
   repop->put();
 }
 
 
 void PrimaryLogPG::submit_log_entries(
-  const mempool::osd::list<pg_log_entry_t> &entries,
+  const mempool::osd_pglog::list<pg_log_entry_t> &entries,
   ObcLockManager &&manager,
   boost::optional<std::function<void(void)> > &&_on_complete,
   OpRequestRef op,
@@ -9062,7 +9264,7 @@ void PrimaryLogPG::submit_log_entries(
 
   boost::intrusive_ptr<RepGather> repop;
   boost::optional<std::function<void(void)> > on_complete;
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
     repop = new_repop(
       version,
       r,
@@ -9088,7 +9290,7 @@ void PrimaryLogPG::submit_log_entries(
        if (peer == pg_whoami) continue;
        assert(peer_missing.count(peer));
        assert(peer_info.count(peer));
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
          assert(repop);
          MOSDPGUpdateLogMissing *m = new MOSDPGUpdateLogMissing(
            entries,
@@ -9112,7 +9314,7 @@ void PrimaryLogPG::submit_log_entries(
            peer.osd, m, get_osdmap()->get_epoch());
        }
       }
-      if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+      if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
        ceph_tid_t rep_tid = repop->rep_tid;
        waiting_on.insert(pg_whoami);
        log_entry_update_waiting_on.insert(
@@ -9406,6 +9608,7 @@ ObjectContextRef PrimaryLogPG::get_object_context(
        object_info_t oi(soid);
        SnapSetContext *ssc = get_snapset_context(
          soid, true, 0, false);
+        assert(ssc);
        obc = create_object_context(oi, ssc);
        dout(10) << __func__ << ": " << obc << " " << soid
                 << " " << obc->rwstate
@@ -9453,7 +9656,13 @@ ObjectContextRef PrimaryLogPG::get_object_context(
     dout(10) << __func__ << ": creating obc from disk: " << obc
             << dendl;
   }
-  assert(obc->ssc);
+
+  // XXX: Caller doesn't expect this
+  if (obc->ssc == NULL) {
+    derr << __func__ << ": obc->ssc not available, not returning context" << dendl;
+    return ObjectContextRef();   // -ENOENT!
+  }
+
   dout(10) << __func__ << ": " << obc << " " << soid
           << " " << obc->rwstate
           << " oi: " << obc->obs.oi
@@ -9830,7 +10039,12 @@ SnapSetContext *PrimaryLogPG::get_snapset_context(
     _register_snapset_context(ssc);
     if (bv.length()) {
       bufferlist::iterator bvp = bv.begin();
-      ssc->snapset.decode(bvp);
+      try {
+       ssc->snapset.decode(bvp);
+      } catch (buffer::error& e) {
+        dout(0) << __func__ << " Can't decode snapset: " << e << dendl;
+       return NULL;
+      }
       ssc->exists = true;
     } else {
       ssc->exists = false;
@@ -9924,12 +10138,14 @@ int PrimaryLogPG::recover_missing(
   start_recovery_op(soid);
   assert(!recovering.count(soid));
   recovering.insert(make_pair(soid, obc));
-  pgbackend->recover_object(
+  int r = pgbackend->recover_object(
     soid,
     v,
     head_obc,
     obc,
     h);
+  // This is only a pull which shouldn't return an error
+  assert(r >= 0);
   return PULL_YES;
 }
 
@@ -10016,7 +10232,11 @@ void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
   // requeue an active chunky scrub waiting on recovery ops
   if (!deleting && active_pushes == 0
       && scrubber.is_chunky_scrub_active()) {
-    requeue_scrub();
+    if (ops_blocked_by_scrub()) {
+      requeue_scrub(true);
+    } else {
+      requeue_scrub(false);
+    }
   }
 
   unlock();
@@ -10060,6 +10280,12 @@ void PrimaryLogPG::recover_got(hobject_t oid, eversion_t v)
   }
 }
 
+void PrimaryLogPG::primary_failed(const hobject_t &soid)
+{
+  list<pg_shard_t> fl = { pg_whoami };
+  failed_push(fl, soid);
+}
+
 void PrimaryLogPG::failed_push(const list<pg_shard_t> &from, const hobject_t &soid)
 {
   dout(20) << __func__ << ": " << soid << dendl;
@@ -10109,7 +10335,6 @@ eversion_t PrimaryLogPG::pick_newest_available(const hobject_t& oid)
     if (*i == get_primary()) continue;
     pg_shard_t peer = *i;
     if (!peer_missing[peer].is_missing(oid)) {
-      assert(is_backfill_targets(peer));
       continue;
     }
     eversion_t h = peer_missing[peer].get_items().at(oid).have;
@@ -10149,7 +10374,7 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
       unlock();
     });
 
-  if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+  if (get_osdmap()->require_osd_release >= CEPH_RELEASE_KRAKEN) {
     t.register_on_commit(complete);
   } else {
     /* Hack to work around the fact that ReplicatedBackend sends
@@ -10210,12 +10435,13 @@ void PrimaryLogPG::mark_all_unfound_lost(
   ceph_tid_t tid)
 {
   dout(3) << __func__ << " " << pg_log_entry_t::get_op_name(what) << dendl;
+  list<hobject_t> oids;
 
   dout(30) << __func__ << ": log before:\n";
   pg_log.get_log().print(*_dout);
   *_dout << dendl;
 
-  mempool::osd::list<pg_log_entry_t> log_entries;
+  mempool::osd_pglog::list<pg_log_entry_t> log_entries;
 
   utime_t mtime = ceph_clock_now();
   map<hobject_t, pg_missing_item>::const_iterator m =
@@ -10265,7 +10491,7 @@ void PrimaryLogPG::mark_all_unfound_lost(
       {
        pg_log_entry_t e(pg_log_entry_t::LOST_DELETE, oid, v, m->second.need,
                         0, osd_reqid_t(), mtime, 0);
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_JEWEL) {
          if (pool.info.require_rollback()) {
            e.mod_desc.try_rmobject(v.version);
          } else {
@@ -10274,6 +10500,7 @@ void PrimaryLogPG::mark_all_unfound_lost(
        } // otherwise, just do what we used to do
        dout(10) << e << dendl;
        log_entries.push_back(e);
+        oids.push_back(oid);
 
        ++v.version;
        ++m;
@@ -10291,9 +10518,9 @@ void PrimaryLogPG::mark_all_unfound_lost(
     log_entries,
     std::move(manager),
     boost::optional<std::function<void(void)> >(
-      [=]() {
-       requeue_ops(waiting_for_all_missing);
-       waiting_for_all_missing.clear();
+      [this, oids, con, num_unfound, tid]() {
+         for (auto oid: oids)
+           missing_loc.recovered(oid);
        for (auto& p : waiting_for_unreadable_object) {
          release_backoffs(p.first);
        }
@@ -10435,6 +10662,11 @@ void PrimaryLogPG::on_shutdown()
   // handles queue races
   deleting = true;
 
+  if (recovery_queued) {
+    recovery_queued = false;
+    osd->clear_queued_recovery(this);
+  }
+
   clear_scrub_reserved();
   scrub_clear_state();
 
@@ -10444,6 +10676,10 @@ void PrimaryLogPG::on_shutdown()
   cancel_proxy_ops(false);
   apply_and_flush_repops(false);
   cancel_log_updates();
+  // we must remove PGRefs, so do this this prior to release_backoffs() callers
+  clear_backoffs(); 
+  // clean up snap trim references
+  snap_trimmer_machine.process_event(Reset());
 
   pgbackend->on_change();
 
@@ -10478,6 +10714,7 @@ void PrimaryLogPG::on_activate()
          RequestBackfill())));
   } else {
     dout(10) << "activate all replicas clean, no recovery" << dendl;
+    eio_errors_to_process = false;
     queue_peering_event(
       CephPeeringEvtRef(
        std::make_shared<CephPeeringEvt>(
@@ -10576,10 +10813,8 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
 
   if (is_primary()) {
     requeue_ops(waiting_for_cache_not_full);
-    requeue_ops(waiting_for_all_missing);
   } else {
     waiting_for_cache_not_full.clear();
-    waiting_for_all_missing.clear();
   }
   objects_blocked_on_cache_full.clear();
 
@@ -10900,6 +11135,7 @@ bool PrimaryLogPG::start_recovery_ops(
             RequestBackfill())));
     } else {
       dout(10) << "recovery done, no backfill" << dendl;
+      eio_errors_to_process = false;
       queue_peering_event(
         CephPeeringEvtRef(
           std::make_shared<CephPeeringEvt>(
@@ -10910,6 +11146,7 @@ bool PrimaryLogPG::start_recovery_ops(
   } else { // backfilling
     state_clear(PG_STATE_BACKFILL);
     dout(10) << "recovery done, backfill done" << dendl;
+    eio_errors_to_process = false;
     queue_peering_event(
       CephPeeringEvtRef(
         std::make_shared<CephPeeringEvt>(
@@ -10960,8 +11197,7 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
     const pg_missing_item& item = missing.get_items().find(p->second)->second;
     ++p;
 
-    hobject_t head = soid;
-    head.snap = CEPH_NOSNAP;
+    hobject_t head = soid.get_head();
 
     eversion_t need = item.need;
 
@@ -11081,6 +11317,34 @@ uint64_t PrimaryLogPG::recover_primary(uint64_t max, ThreadPool::TPHandle &handl
   return started;
 }
 
+bool PrimaryLogPG::primary_error(
+  const hobject_t& soid, eversion_t v)
+{
+  pg_log.missing_add(soid, v, eversion_t());
+  pg_log.set_last_requested(0);
+  missing_loc.remove_location(soid, pg_whoami);
+  bool uhoh = true;
+  assert(!actingbackfill.empty());
+  for (set<pg_shard_t>::iterator i = actingbackfill.begin();
+       i != actingbackfill.end();
+       ++i) {
+    if (*i == get_primary()) continue;
+    pg_shard_t peer = *i;
+    if (!peer_missing[peer].is_missing(soid, v)) {
+      missing_loc.add_location(soid, peer);
+      dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
+              << ", there should be a copy on shard " << peer << dendl;
+      uhoh = false;
+    }
+  }
+  if (uhoh)
+    osd->clog->error() << info.pgid << " missing primary copy of " << soid << ", unfound";
+  else
+    osd->clog->error() << info.pgid << " missing primary copy of " << soid
+                        << ", will try copies on " << missing_loc.get_locations(soid);
+  return uhoh;
+}
+
 int PrimaryLogPG::prep_object_replica_pushes(
   const hobject_t& soid, eversion_t v,
   PGBackend::RecoveryHandle *h)
@@ -11091,27 +11355,7 @@ int PrimaryLogPG::prep_object_replica_pushes(
   // NOTE: we know we will get a valid oloc off of disk here.
   ObjectContextRef obc = get_object_context(soid, false);
   if (!obc) {
-    pg_log.missing_add(soid, v, eversion_t());
-    missing_loc.remove_location(soid, pg_whoami);
-    bool uhoh = true;
-    assert(!actingbackfill.empty());
-    for (set<pg_shard_t>::iterator i = actingbackfill.begin();
-        i != actingbackfill.end();
-        ++i) {
-      if (*i == get_primary()) continue;
-      pg_shard_t peer = *i;
-      if (!peer_missing[peer].is_missing(soid, v)) {
-       missing_loc.add_location(soid, peer);
-       dout(10) << info.pgid << " unexpectedly missing " << soid << " v" << v
-                << ", there should be a copy on shard " << peer << dendl;
-       uhoh = false;
-      }
-    }
-    if (uhoh)
-      osd->clog->error() << info.pgid << " missing primary copy of " << soid << ", unfound";
-    else
-      osd->clog->error() << info.pgid << " missing primary copy of " << soid
-                        << ", will try copies on " << missing_loc.get_locations(soid);
+    primary_error(soid, v);
     return 0;
   }
 
@@ -11134,13 +11378,19 @@ int PrimaryLogPG::prep_object_replica_pushes(
    * In almost all cases, therefore, this lock should be uncontended.
    */
   obc->ondisk_read_lock();
-  pgbackend->recover_object(
+  int r = pgbackend->recover_object(
     soid,
     v,
     ObjectContextRef(),
     obc, // has snapset context
     h);
   obc->ondisk_read_unlock();
+  if (r < 0) {
+    dout(0) << __func__ << " Error " << r << " on oid " << soid << dendl;
+    primary_failed(soid);
+    primary_error(soid, v);
+    return 0;
+  }
   return 1;
 }
 
@@ -11175,8 +11425,14 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
       handle.reset_tp_timeout();
       const hobject_t soid(p->second);
 
+      if (missing_loc.is_unfound(soid)) {
+       dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
+       continue;
+      }
+
       if (soid > pi->second.last_backfill) {
        if (!recovering.count(soid)) {
+          derr << __func__ << ": object " << soid << " last_backfill " << pi->second.last_backfill << dendl;
          derr << __func__ << ": object added to missing set for backfill, but "
               << "is not in recovering, error!" << dendl;
          ceph_abort();
@@ -11189,11 +11445,6 @@ uint64_t PrimaryLogPG::recover_replicas(uint64_t max, ThreadPool::TPHandle &hand
        continue;
       }
 
-      if (missing_loc.is_unfound(soid)) {
-       dout(10) << __func__ << ": " << soid << " still unfound" << dendl;
-       continue;
-      }
-
       if (soid.is_snap() && pg_log.get_missing().is_missing(soid.get_head())) {
        dout(10) << __func__ << ": " << soid.get_head()
                 << " still missing on primary" << dendl;
@@ -11331,8 +11582,6 @@ uint64_t PrimaryLogPG::recover_backfill(
   update_range(&backfill_info, handle);
 
   unsigned ops = 0;
-  vector<boost::tuple<hobject_t, eversion_t,
-                      ObjectContextRef, vector<pg_shard_t> > > to_push;
   vector<boost::tuple<hobject_t, eversion_t, pg_shard_t> > to_remove;
   set<hobject_t> add_to_stat;
 
@@ -11344,6 +11593,7 @@ uint64_t PrimaryLogPG::recover_backfill(
   }
   backfill_info.trim_to(last_backfill_started);
 
+  PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
   while (ops < max) {
     if (backfill_info.begin <= earliest_peer_backfill() &&
        !backfill_info.extends_to_end() && backfill_info.empty()) {
@@ -11523,10 +11773,13 @@ uint64_t PrimaryLogPG::recover_backfill(
          vector<pg_shard_t> all_push = need_ver_targs;
          all_push.insert(all_push.end(), missing_targs.begin(), missing_targs.end());
 
-         to_push.push_back(
-           boost::tuple<hobject_t, eversion_t, ObjectContextRef, vector<pg_shard_t> >
-           (backfill_info.begin, obj_v, obc, all_push));
-         // Count all simultaneous pushes of the same object as a single op
+         handle.reset_tp_timeout();
+         int r = prep_backfill_object_push(backfill_info.begin, obj_v, obc, all_push, h);
+         if (r < 0) {
+           *work_started = true;
+           dout(0) << __func__ << " Error " << r << " trying to backfill " << backfill_info.begin << dendl;
+           break;
+         }
          ops++;
        } else {
          *work_started = true;
@@ -11607,12 +11860,6 @@ uint64_t PrimaryLogPG::recover_backfill(
     }
   }
 
-  PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
-  for (unsigned i = 0; i < to_push.size(); ++i) {
-    handle.reset_tp_timeout();
-    prep_backfill_object_push(to_push[i].get<0>(), to_push[i].get<1>(),
-           to_push[i].get<2>(), to_push[i].get<3>(), h);
-  }
   pgbackend->run_recovery_op(h, get_recovery_op_priority());
 
   dout(5) << "backfill_pos is " << backfill_pos << dendl;
@@ -11703,13 +11950,13 @@ uint64_t PrimaryLogPG::recover_backfill(
   return ops;
 }
 
-void PrimaryLogPG::prep_backfill_object_push(
+int PrimaryLogPG::prep_backfill_object_push(
   hobject_t oid, eversion_t v,
   ObjectContextRef obc,
   vector<pg_shard_t> peers,
   PGBackend::RecoveryHandle *h)
 {
-  dout(10) << "push_backfill_object " << oid << " v " << v << " to peers " << peers << dendl;
+  dout(10) << __func__ << " " << oid << " v " << v << " to peers " << peers << dendl;
   assert(!peers.empty());
 
   backfills_in_flight.insert(oid);
@@ -11726,13 +11973,21 @@ void PrimaryLogPG::prep_backfill_object_push(
 
   // We need to take the read_lock here in order to flush in-progress writes
   obc->ondisk_read_lock();
-  pgbackend->recover_object(
+  int r = pgbackend->recover_object(
     oid,
     v,
     ObjectContextRef(),
     obc,
     h);
   obc->ondisk_read_unlock();
+  if (r < 0) {
+    dout(0) << __func__ << " Error " << r << " on oid " << oid << dendl;
+    primary_failed(oid);
+    primary_error(oid, v);
+    backfills_in_flight.erase(oid);
+    missing_loc.add_missing(oid, v, eversion_t());
+  }
+  return r;
 }
 
 void PrimaryLogPG::update_range(
@@ -13329,7 +13584,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
          head_error.set_head_mismatch();
        }
 
-       if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_LUMINOUS)) {
+       if (get_osdmap()->require_osd_release >= CEPH_RELEASE_LUMINOUS) {
          if (soid.is_snapdir()) {
            dout(10) << " will move snapset to head from " << soid << dendl;
            snapset_to_repair[soid.get_head()] = *snapset;
@@ -13645,6 +13900,9 @@ void PrimaryLogPG::_scrub_finish()
     publish_stats_to_osd();
     share_pg_info();
   }
+  // Clear object context cache to get repair information
+  if (repair)
+    object_contexts.clear();
 }
 
 bool PrimaryLogPG::check_osdmap_full(const set<pg_shard_t> &missing_on)
@@ -13652,6 +13910,73 @@ bool PrimaryLogPG::check_osdmap_full(const set<pg_shard_t> &missing_on)
     return osd->check_osdmap_full(missing_on);
 }
 
+int PrimaryLogPG::rep_repair_primary_object(const hobject_t& soid, OpRequestRef op)
+{
+  // Only supports replicated pools
+  assert(!pool.info.require_rollback());
+  assert(is_primary());
+
+  dout(10) << __func__ << " " << soid
+          << " peers osd.{" << actingbackfill << "}" << dendl;
+
+  if (!is_clean()) {
+    block_for_clean(soid, op);
+    return -EAGAIN;
+  }
+
+  assert(!pg_log.get_missing().is_missing(soid));
+  bufferlist bv;
+  object_info_t oi;
+  eversion_t v;
+  int r = get_pgbackend()->objects_get_attr(soid, OI_ATTR, &bv);
+  if (r < 0) {
+    // Leave v and try to repair without a version, getting attr failed
+    dout(0) << __func__ << ": Need version of replica, objects_get_attr failed: "
+           << soid << " error=" << r << dendl;
+  } else try {
+    bufferlist::iterator bliter = bv.begin();
+    ::decode(oi, bliter);
+    v = oi.version;
+  } catch (...) {
+    // Leave v as default constructed. This will fail when sent to older OSDs, but
+    // not much worse than failing here.
+    dout(0) << __func__ << ": Need version of replica, bad object_info_t: " << soid << dendl;
+  }
+
+  missing_loc.add_missing(soid, v, eversion_t());
+  if (primary_error(soid, v)) {
+    dout(0) << __func__ << " No other replicas available for " << soid << dendl;
+    // XXX: If we knew that there is no down osd which could include this
+    // object, it would be nice if we could return EIO here.
+    // If a "never fail" flag was available, that could be used
+    // for rbd to NOT return EIO until object marked lost.
+
+    // Drop through to save this op in case an osd comes up with the object.
+  }
+
+  // Restart the op after object becomes readable again
+  waiting_for_unreadable_object[soid].push_back(op);
+  op->mark_delayed("waiting for missing object");
+
+  if (!eio_errors_to_process) {
+    eio_errors_to_process = true;
+    assert(is_clean());
+    queue_peering_event(
+        CephPeeringEvtRef(
+         std::make_shared<CephPeeringEvt>(
+         get_osdmap()->get_epoch(),
+         get_osdmap()->get_epoch(),
+         DoRecovery())));
+  } else {
+    // A prior error must have already cleared clean state and queued recovery
+    // or a map change has triggered re-peering.
+    // Not inlining the recovery by calling maybe_kick_recovery(soid);
+    dout(5) << __func__<< ": Read error on " << soid << ", but already seen errors" << dendl;
+  }
+
+  return -EAGAIN;
+}
+
 /*---SnapTrimmer Logging---*/
 #undef dout_prefix
 #define dout_prefix *_dout << pg->gen_prefix() 
@@ -13700,7 +14025,6 @@ boost::statechart::result PrimaryLogPG::NotTrimming::react(const KickTrim&)
   }
   if (pg->scrubber.active) {
     ldout(pg->cct, 10) << " scrubbing, will requeue snap_trimmer after" << dendl;
-    pg->scrubber.queue_snap_trim = true;
     return transit< WaitScrub >();
   } else {
     return transit< Trimming >();
@@ -13734,6 +14058,7 @@ PrimaryLogPG::AwaitAsyncWork::AwaitAsyncWork(my_context ctx)
   context< SnapTrimmer >().log_enter(state_name);
   context< SnapTrimmer >().pg->osd->queue_for_snap_trim(pg);
   pg->state_set(PG_STATE_SNAPTRIM);
+  pg->state_clear(PG_STATE_SNAPTRIM_ERROR);
   pg->publish_stats_to_osd();
 }
 
@@ -13792,18 +14117,26 @@ boost::statechart::result PrimaryLogPG::AwaitAsyncWork::react(const DoSnapWork&)
   for (auto &&object: to_trim) {
     // Get next
     ldout(pg->cct, 10) << "AwaitAsyncWork react trimming " << object << dendl;
-    OpContextUPtr ctx = pg->trim_object(in_flight.empty(), object);
-    if (!ctx) {
-      ldout(pg->cct, 10) << "could not get write lock on obj "
-                        << object << dendl;
-      if (in_flight.empty()) {
+    OpContextUPtr ctx;
+    int error = pg->trim_object(in_flight.empty(), object, &ctx);
+    if (error) {
+      if (error == -ENOLCK) {
+       ldout(pg->cct, 10) << "could not get write lock on obj "
+                          << object << dendl;
+      } else {
+       pg->state_set(PG_STATE_SNAPTRIM_ERROR);
+       ldout(pg->cct, 10) << "Snaptrim error=" << error << dendl;
+      }
+      if (!in_flight.empty()) {
+       ldout(pg->cct, 10) << "letting the ones we already started finish" << dendl;
+       return transit< WaitRepops >();
+      }
+      if (error == -ENOLCK) {
        ldout(pg->cct, 10) << "waiting for it to clear"
                           << dendl;
        return transit< WaitRWLock >();
-
       } else {
-       ldout(pg->cct, 10) << "letting the ones we already started finish" << dendl;
-       return transit< WaitRepops >();
+        return transit< NotTrimming >();
       }
     }
 
@@ -13812,8 +14145,13 @@ boost::statechart::result PrimaryLogPG::AwaitAsyncWork::react(const DoSnapWork&)
       [pg, object, &in_flight]() {
        assert(in_flight.find(object) != in_flight.end());
        in_flight.erase(object);
-       if (in_flight.empty())
-         pg->snap_trimmer_machine.process_event(RepopsComplete());
+       if (in_flight.empty()) {
+         if (pg->state_test(PG_STATE_SNAPTRIM_ERROR)) {
+           pg->snap_trimmer_machine.process_event(Reset());
+         } else {
+           pg->snap_trimmer_machine.process_event(RepopsComplete());
+         }
+       }
       });
 
     pg->simple_opc_submit(std::move(ctx));