]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PrimaryLogPG.cc
update sources to 12.2.7
[ceph.git] / ceph / src / osd / PrimaryLogPG.cc
index 7aba1df5d2d9852b45dbc6ebc807c2273dd40fd6..aaf9136a45e672b91acdbfcc2dc6f7c3ed3e3f28 100644 (file)
@@ -1574,8 +1574,10 @@ void PrimaryLogPG::calc_trim_to()
   if (limit != eversion_t() &&
       limit != pg_trim_to &&
       pg_log.get_log().approx_size() > target) {
-    size_t num_to_trim = pg_log.get_log().approx_size() - target;
-    if (num_to_trim < cct->_conf->osd_pg_log_trim_min) {
+    size_t num_to_trim = MIN(pg_log.get_log().approx_size() - target,
+                            cct->_conf->osd_pg_log_trim_max);
+    if (num_to_trim < cct->_conf->osd_pg_log_trim_min &&
+       cct->_conf->osd_pg_log_trim_max >= cct->_conf->osd_pg_log_trim_min) {
       return;
     }
     list<pg_log_entry_t>::const_iterator it = pg_log.get_log().log.begin();
@@ -2057,8 +2059,8 @@ void PrimaryLogPG::do_op(OpRequestRef& op)
     return;
   }
 
-  if (write_ordered &&
-      scrubber.write_blocked_by_scrub(head)) {
+  if (write_ordered && scrubber.is_chunky_scrub_active() &&
+      write_blocked_by_scrub(head)) {
     dout(20) << __func__ << ": waiting for scrub" << dendl;
     waiting_for_scrub.push_back(op);
     op->mark_delayed("waiting for scrub");
@@ -2899,14 +2901,15 @@ void PrimaryLogPG::kick_proxy_ops_blocked(hobject_t& soid)
   in_progress_proxy_ops.erase(p);
 }
 
-void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
+void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop,
+                                    vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << prdop->soid << dendl;
   prdop->canceled = true;
 
   // cancel objecter op, if we can
   if (prdop->objecter_tid) {
-    osd->objecter->op_cancel(prdop->objecter_tid, -ECANCELED);
+    tids->push_back(prdop->objecter_tid);
     for (uint32_t i = 0; i < prdop->ops.size(); i++) {
       prdop->ops[i].outdata.clear();
     }
@@ -2915,20 +2918,20 @@ void PrimaryLogPG::cancel_proxy_read(ProxyReadOpRef prdop)
   }
 }
 
-void PrimaryLogPG::cancel_proxy_ops(bool requeue)
+void PrimaryLogPG::cancel_proxy_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
 
   // cancel proxy reads
   map<ceph_tid_t, ProxyReadOpRef>::iterator p = proxyread_ops.begin();
   while (p != proxyread_ops.end()) {
-    cancel_proxy_read((p++)->second);
+    cancel_proxy_read((p++)->second, tids);
   }
 
   // cancel proxy writes
   map<ceph_tid_t, ProxyWriteOpRef>::iterator q = proxywrite_ops.begin();
   while (q != proxywrite_ops.end()) {
-    cancel_proxy_write((q++)->second);
+    cancel_proxy_write((q++)->second, tids);
   }
 
   if (requeue) {
@@ -3082,14 +3085,15 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
   pwop->ctx = NULL;
 }
 
-void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop)
+void PrimaryLogPG::cancel_proxy_write(ProxyWriteOpRef pwop,
+                                     vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << pwop->soid << dendl;
   pwop->canceled = true;
 
   // cancel objecter op, if we can
   if (pwop->objecter_tid) {
-    osd->objecter->op_cancel(pwop->objecter_tid, -ECANCELED);
+    tids->push_back(pwop->objecter_tid);
     delete pwop->ctx;
     pwop->ctx = NULL;
     proxywrite_ops.erase(pwop->objecter_tid);
@@ -3123,7 +3127,7 @@ void PrimaryLogPG::promote_object(ObjectContextRef obc,
 {
   hobject_t hoid = obc ? obc->obs.oi.soid : missing_oid;
   assert(hoid != hobject_t());
-  if (scrubber.write_blocked_by_scrub(hoid)) {
+  if (write_blocked_by_scrub(hoid)) {
     dout(10) << __func__ << " " << hoid
             << " blocked by scrub" << dendl;
     if (op) {
@@ -4528,6 +4532,9 @@ int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
                              bufferlist::iterator *bl_it)
 {
   dout(20) << __func__ << dendl;
+  bool skip_data_digest =
+    (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+    g_conf->osd_distrust_data_digest;
 
   auto& op = osd_op.op;
   if (op.checksum.chunk_size > 0) {
@@ -4582,7 +4589,8 @@ int PrimaryLogPG::do_checksum(OpContext *ctx, OSDOp& osd_op,
     // If there is a data digest and it is possible we are reading
     // entire object, pass the digest.
     boost::optional<uint32_t> maybe_crc;
-    if (oi.is_data_digest() && op.checksum.offset == 0 &&
+    if (!skip_data_digest &&
+       oi.is_data_digest() && op.checksum.offset == 0 &&
         op.checksum.length >= oi.size) {
       maybe_crc = oi.data_digest;
     }
@@ -4730,6 +4738,9 @@ int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
 {
   dout(20) << __func__ << dendl;
   ceph_osd_op& op = osd_op.op;
+  bool skip_data_digest =
+    (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+    g_conf->osd_distrust_data_digest;
 
   auto& oi = ctx->new_obs.oi;
   uint64_t size = oi.size;
@@ -4754,7 +4765,8 @@ int PrimaryLogPG::do_extent_cmp(OpContext *ctx, OSDOp& osd_op)
     // If there is a data digest and it is possible we are reading
     // entire object, pass the digest.
     boost::optional<uint32_t> maybe_crc;
-    if (oi.is_data_digest() && op.checksum.offset == 0 &&
+    if (!skip_data_digest &&
+       oi.is_data_digest() && op.checksum.offset == 0 &&
         op.checksum.length >= oi.size) {
       maybe_crc = oi.data_digest;
     }
@@ -4812,6 +4824,9 @@ int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
   __u32 seq = oi.truncate_seq;
   uint64_t size = oi.size;
   bool trimmed_read = false;
+  bool skip_data_digest =
+    (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+    g_conf->osd_distrust_data_digest;
 
   // are we beyond truncate_size?
   if ( (seq < op.extent.truncate_seq) &&
@@ -4840,7 +4855,8 @@ int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
     // If there is a data digest and it is possible we are reading
     // entire object, pass the digest.  FillInVerifyExtent will
     // will check the oi.size again.
-    if (oi.is_data_digest() && op.extent.offset == 0 &&
+    if (!skip_data_digest &&
+       oi.is_data_digest() && op.extent.offset == 0 &&
         op.extent.length >= oi.size)
       maybe_crc = oi.data_digest;
     ctx->pending_async_reads.push_back(
@@ -4870,7 +4886,8 @@ int PrimaryLogPG::do_read(OpContext *ctx, OSDOp& osd_op) {
             << " bytes from obj " << soid << dendl;
 
     // whole object?  can we verify the checksum?
-    if (op.extent.length == oi.size && oi.is_data_digest()) {
+    if (!skip_data_digest &&
+       op.extent.length == oi.size && oi.is_data_digest()) {
       uint32_t crc = osd_op.outdata.crc32c(-1);
       if (oi.data_digest != crc) {
         osd->clog->error() << info.pgid << std::hex
@@ -4895,6 +4912,9 @@ int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
   auto& op = osd_op.op;
   auto& oi = ctx->new_obs.oi;
   auto& soid = oi.soid;
+  bool skip_data_digest =
+    (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+    g_conf->osd_distrust_data_digest;
 
   if (op.extent.truncate_seq) {
     dout(0) << "sparse_read does not support truncation sequence " << dendl;
@@ -5008,7 +5028,8 @@ int PrimaryLogPG::do_sparse_read(OpContext *ctx, OSDOp& osd_op) {
     // Maybe at first, there is no much whole objects. With continued use, more
     // and more whole object exist. So from this point, for spare-read add
     // checksum make sense.
-    if (total_read == oi.size && oi.is_data_digest()) {
+    if (!skip_data_digest &&
+       total_read == oi.size && oi.is_data_digest()) {
       uint32_t crc = data_bl.crc32c(-1);
       if (oi.data_digest != crc) {
         osd->clog->error() << info.pgid << std::hex
@@ -5041,6 +5062,9 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
   ObjectState& obs = ctx->new_obs;
   object_info_t& oi = obs.oi;
   const hobject_t& soid = oi.soid;
+  bool skip_data_digest =
+    (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+    g_conf->osd_distrust_data_digest;
 
   PGTransaction* t = ctx->op_t.get();
 
@@ -5853,12 +5877,18 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
            soid, op.extent.offset, op.extent.length, osd_op.indata, op.flags);
        }
 
-       if (op.extent.offset == 0 && op.extent.length >= oi.size)
+       if (op.extent.offset == 0 && op.extent.length >= oi.size
+            && !skip_data_digest) {
          obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
-       else if (op.extent.offset == oi.size && obs.oi.is_data_digest())
-         obs.oi.set_data_digest(osd_op.indata.crc32c(obs.oi.data_digest));
-       else
+       } else if (op.extent.offset == oi.size && obs.oi.is_data_digest()) {
+          if (skip_data_digest) {
+            obs.oi.clear_data_digest();
+          } else {
+           obs.oi.set_data_digest(osd_op.indata.crc32c(obs.oi.data_digest));
+          }
+       } else {
          obs.oi.clear_data_digest();
+        }
        write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
                                    op.extent.offset, op.extent.length);
 
@@ -5890,7 +5920,11 @@ int PrimaryLogPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
        if (op.extent.length) {
          t->write(soid, 0, op.extent.length, osd_op.indata, op.flags);
        }
-       obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
+        if (!skip_data_digest) {
+         obs.oi.set_data_digest(osd_op.indata.crc32c(-1));
+        } else {
+         obs.oi.clear_data_digest();
+       }
 
        write_update_size_and_usage(ctx->delta_stats, oi, ctx->modified_ranges,
            0, op.extent.length, true);
@@ -7820,6 +7854,10 @@ int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp,
   int result = 0;
   object_copy_cursor_t cursor;
   uint64_t out_max;
+  bool skip_data_digest =
+    (osd->store->has_builtin_csum() && g_conf->osd_skip_data_digest) ||
+    g_conf->osd_distrust_data_digest;
+
   try {
     ::decode(cursor, bp);
     ::decode(out_max, bp);
@@ -7854,7 +7892,7 @@ int PrimaryLogPG::do_copy_get(OpContext *ctx, bufferlist::iterator& bp,
   } else {
     reply_obj.snap_seq = obc->ssc->snapset.seq;
   }
-  if (oi.is_data_digest()) {
+  if (!skip_data_digest && oi.is_data_digest()) {
     reply_obj.flags |= object_copy_data_t::FLAG_DATA_DIGEST;
     reply_obj.data_digest = oi.data_digest;
   }
@@ -8026,7 +8064,9 @@ void PrimaryLogPG::start_copy(CopyCallback *cb, ObjectContextRef obc,
     // FIXME: if the src etc match, we could avoid restarting from the
     // beginning.
     CopyOpRef cop = copy_ops[dest];
-    cancel_copy(cop, false);
+    vector<ceph_tid_t> tids;
+    cancel_copy(cop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
   CopyOpRef cop(std::make_shared<CopyOp>(cb, obc, src, oloc, version, flags,
@@ -8106,6 +8146,7 @@ void PrimaryLogPG::_copy_some(ObjectContextRef obc, CopyOpRef cop)
 
 void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
 {
+  vector<ceph_tid_t> tids;
   dout(10) << __func__ << " " << oid << " tid " << tid
           << " " << cpp_strerror(r) << dendl;
   map<hobject_t,CopyOpRef>::iterator p = copy_ops.find(oid);
@@ -8292,7 +8333,7 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
        it != proxyread_ops.end();) {
       if (it->second->soid == cobc->obs.oi.soid) {
-       cancel_proxy_read((it++)->second);
+       cancel_proxy_read((it++)->second, &tids);
       } else {
        ++it;
       }
@@ -8300,17 +8341,40 @@ void PrimaryLogPG::process_copy_chunk(hobject_t oid, ceph_tid_t tid, int r)
     for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
         it != proxywrite_ops.end();) {
       if (it->second->soid == cobc->obs.oi.soid) {
-       cancel_proxy_write((it++)->second);
+       cancel_proxy_write((it++)->second, &tids);
       } else {
        ++it;
       }
     }
+    osd->objecter->op_cancel(tids, -ECANCELED);
     kick_proxy_ops_blocked(cobc->obs.oi.soid);
   }
 
   kick_object_context_blocked(cobc);
 }
 
+void PrimaryLogPG::cancel_and_requeue_proxy_ops(hobject_t oid) {
+  vector<ceph_tid_t> tids;
+  for (map<ceph_tid_t, ProxyReadOpRef>::iterator it = proxyread_ops.begin();
+      it != proxyread_ops.end();) {
+    if (it->second->soid == oid) {
+      cancel_proxy_read((it++)->second, &tids);
+    } else {
+      ++it;
+    }
+  }
+  for (map<ceph_tid_t, ProxyWriteOpRef>::iterator it = proxywrite_ops.begin();
+       it != proxywrite_ops.end();) {
+    if (it->second->soid == oid) {
+      cancel_proxy_write((it++)->second, &tids);
+    } else {
+      ++it;
+    }
+  }
+  osd->objecter->op_cancel(tids, -ECANCELED);
+  kick_proxy_ops_blocked(oid);
+}
+
 void PrimaryLogPG::_write_copy_chunk(CopyOpRef cop, PGTransaction *t)
 {
   dout(20) << __func__ << " " << cop
@@ -8398,8 +8462,16 @@ void PrimaryLogPG::finish_copyfrom(CopyFromCallback *cb)
   // CopyFromCallback fills this in for us
   obs.oi.user_version = ctx->user_at_version;
 
-  obs.oi.set_data_digest(cb->results->data_digest);
-  obs.oi.set_omap_digest(cb->results->omap_digest);
+  if (cb->results->is_data_digest()) {
+    obs.oi.set_data_digest(cb->results->data_digest);
+  } else {
+    obs.oi.clear_data_digest();
+  }
+  if (cb->results->is_omap_digest()) {
+    obs.oi.set_omap_digest(cb->results->omap_digest);
+  } else {
+    obs.oi.clear_omap_digest();
+  }
 
   obs.oi.truncate_seq = cb->results->truncate_seq;
   obs.oi.truncate_size = cb->results->truncate_size;
@@ -8590,11 +8662,16 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     }
     tctx->new_obs.oi.size = results->object_size;
     tctx->new_obs.oi.user_version = results->user_version;
-    // Don't care src object whether have data or omap digest
-    if (results->object_size)
+    if (results->is_data_digest()) {
       tctx->new_obs.oi.set_data_digest(results->data_digest);
-    if (results->has_omap)
+    } else {
+      tctx->new_obs.oi.clear_data_digest();
+    }
+    if (results->is_omap_digest()) {
       tctx->new_obs.oi.set_omap_digest(results->omap_digest);
+    } else {
+      tctx->new_obs.oi.clear_omap_digest();
+    }
     tctx->new_obs.oi.truncate_seq = results->truncate_seq;
     tctx->new_obs.oi.truncate_size = results->truncate_size;
 
@@ -8645,7 +8722,8 @@ void PrimaryLogPG::finish_promote(int r, CopyResults *results,
     agent_choose_mode();
 }
 
-void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
+void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue,
+                              vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << cop->obc->obs.oi.soid
           << " from " << cop->src << " " << cop->oloc
@@ -8653,10 +8731,10 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
 
   // cancel objecter op, if we can
   if (cop->objecter_tid) {
-    osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
+    tids->push_back(cop->objecter_tid);
     cop->objecter_tid = 0;
     if (cop->objecter_tid2) {
-      osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED);
+      tids->push_back(cop->objecter_tid2);
       cop->objecter_tid2 = 0;
     }
   }
@@ -8675,13 +8753,13 @@ void PrimaryLogPG::cancel_copy(CopyOpRef cop, bool requeue)
   cop->obc = ObjectContextRef();
 }
 
-void PrimaryLogPG::cancel_copy_ops(bool requeue)
+void PrimaryLogPG::cancel_copy_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
   map<hobject_t,CopyOpRef>::iterator p = copy_ops.begin();
   while (p != copy_ops.end()) {
     // requeue this op? can I queue up all of them?
-    cancel_copy((p++)->second, requeue);
+    cancel_copy((p++)->second, requeue, tids);
   }
 }
 
@@ -8817,7 +8895,9 @@ int PrimaryLogPG::start_flush(
       osd->reply_op_error(fop->dup_ops.front(), -EBUSY);
       fop->dup_ops.pop_front();
     }
-    cancel_flush(fop, false);
+    vector<ceph_tid_t> tids;
+    cancel_flush(fop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
   }
 
   /**
@@ -9009,7 +9089,7 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
   }
 
   if (!fop->blocking &&
-      scrubber.write_blocked_by_scrub(oid)) {
+      write_blocked_by_scrub(oid)) {
     if (fop->op) {
       dout(10) << __func__ << " blocked by scrub" << dendl;
       requeue_op(fop->op);
@@ -9017,7 +9097,9 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
       return -EAGAIN;    // will retry
     } else {
       osd->logger->inc(l_osd_tier_try_flush_fail);
-      cancel_flush(fop, false);
+      vector<ceph_tid_t> tids;
+      cancel_flush(fop, false, &tids);
+      osd->objecter->op_cancel(tids, -ECANCELED);
       return -ECANCELED;
     }
   }
@@ -9047,16 +9129,26 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
        fop->op)) {
     dout(20) << __func__ << " took write lock" << dendl;
   } else if (fop->op) {
-    dout(10) << __func__ << " waiting on write lock" << dendl;
+    dout(10) << __func__ << " waiting on write lock " << fop->op << " "
+            << fop->dup_ops << dendl;
     close_op_ctx(ctx.release());
-    requeue_op(fop->op);
-    requeue_ops(fop->dup_ops);
+    // fop->op is now waiting on the lock; get fop->dup_ops to wait too.
+    for (auto op : fop->dup_ops) {
+      bool locked = ctx->lock_manager.get_lock_type(
+       ObjectContext::RWState::RWWRITE,
+       oid,
+       obc,
+       op);
+      assert(!locked);
+    }
     return -EAGAIN;    // will retry
   } else {
     dout(10) << __func__ << " failed write lock, no op; failing" << dendl;
     close_op_ctx(ctx.release());
     osd->logger->inc(l_osd_tier_try_flush_fail);
-    cancel_flush(fop, false);
+    vector<ceph_tid_t> tids;
+    cancel_flush(fop, false, &tids);
+    osd->objecter->op_cancel(tids, -ECANCELED);
     return -ECANCELED;
   }
 
@@ -9096,15 +9188,22 @@ int PrimaryLogPG::try_flush_mark_clean(FlushOpRef fop)
   return -EINPROGRESS;
 }
 
-void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
+void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue,
+                               vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << " " << fop->obc->obs.oi.soid << " tid "
           << fop->objecter_tid << dendl;
   if (fop->objecter_tid) {
-    osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
+    tids->push_back(fop->objecter_tid);
     fop->objecter_tid = 0;
   }
-  if (fop->blocking) {
+  if (fop->io_tids.size()) {
+    for (auto &p : fop->io_tids) {
+      tids->push_back(p.second);
+      p.second = 0;
+    } 
+  }
+  if (fop->blocking && fop->obc->is_blocked()) {
     fop->obc->stop_block();
     kick_object_context_blocked(fop->obc);
   }
@@ -9120,12 +9219,12 @@ void PrimaryLogPG::cancel_flush(FlushOpRef fop, bool requeue)
   flush_ops.erase(fop->obc->obs.oi.soid);
 }
 
-void PrimaryLogPG::cancel_flush_ops(bool requeue)
+void PrimaryLogPG::cancel_flush_ops(bool requeue, vector<ceph_tid_t> *tids)
 {
   dout(10) << __func__ << dendl;
   map<hobject_t,FlushOpRef>::iterator p = flush_ops.begin();
   while (p != flush_ops.end()) {
-    cancel_flush((p++)->second, requeue);
+    cancel_flush((p++)->second, requeue, tids);
   }
 }
 
@@ -9503,7 +9602,7 @@ void PrimaryLogPG::submit_log_entries(
     [this, entries, repop, on_complete]() {
       ObjectStore::Transaction t;
       eversion_t old_last_update = info.last_update;
-      merge_new_log_entries(entries, t);
+      merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
 
 
       set<pg_shard_t> waiting_on;
@@ -9522,7 +9621,9 @@ void PrimaryLogPG::submit_log_entries(
            pg_whoami.shard,
            get_osdmap()->get_epoch(),
            last_peering_reset,
-           repop->rep_tid);
+           repop->rep_tid,
+           pg_trim_to,
+           min_last_complete_ondisk);
          osd->send_message_osd_cluster(
            peer.osd, m, get_osdmap()->get_epoch());
          waiting_on.insert(peer);
@@ -9604,6 +9705,8 @@ void PrimaryLogPG::submit_log_entries(
       int r = osd->store->queue_transaction(osr.get(), std::move(t), NULL);
       assert(r == 0);
     });
+
+  calc_trim_to();
 }
 
 void PrimaryLogPG::cancel_log_updates()
@@ -9728,7 +9831,7 @@ void PrimaryLogPG::handle_watch_timeout(WatchRef watch)
     return;
   }
 
-  if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+  if (write_blocked_by_scrub(obc->obs.oi.soid)) {
     dout(10) << "handle_watch_timeout waiting for scrub on obj "
             << obc->obs.oi.soid
             << dendl;
@@ -10139,6 +10242,11 @@ int PrimaryLogPG::find_object_context(const hobject_t& oid,
   } else {
     auto p = obc->ssc->snapset.clone_snaps.find(soid.snap);
     assert(p != obc->ssc->snapset.clone_snaps.end());
+    if (p->second.empty()) {
+      dout(1) << __func__ << " " << soid << " empty snapset -- DNE" << dendl;
+      assert(!cct->_conf->osd_debug_verify_snaps);
+      return -ENOENT;
+    }
     first = p->second.back();
     last = p->second.front();
   }
@@ -10646,7 +10754,16 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
     op->get_req());
   assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
   ObjectStore::Transaction t;
-  append_log_entries_update_missing(m->entries, t);
+  boost::optional<eversion_t> op_trim_to, op_roll_forward_to;
+  if (m->pg_trim_to != eversion_t())
+    op_trim_to = m->pg_trim_to;
+  if (m->pg_roll_forward_to != eversion_t())
+    op_roll_forward_to = m->pg_roll_forward_to;
+
+  dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
+
+  append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+  eversion_t new_lcod = info.last_complete;
 
   Context *complete = new FunctionContext(
     [=](int) {
@@ -10654,13 +10771,15 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
        op->get_req());
       lock();
       if (!pg_has_reset_since(msg->get_epoch())) {
+       update_last_complete_ondisk(new_lcod);
        MOSDPGUpdateLogMissingReply *reply =
          new MOSDPGUpdateLogMissingReply(
            spg_t(info.pgid.pgid, primary_shard().shard),
            pg_whoami.shard,
            msg->get_epoch(),
            msg->min_epoch,
-           msg->get_tid());
+           msg->get_tid(),
+           new_lcod);
        reply->set_priority(CEPH_MSG_PRIO_HIGH);
        msg->get_connection()->send_message(reply);
       }
@@ -10702,6 +10821,9 @@ void PrimaryLogPG::do_update_log_missing_reply(OpRequestRef &op)
   if (it != log_entry_update_waiting_on.end()) {
     if (it->second.waiting_on.count(m->get_from())) {
       it->second.waiting_on.erase(m->get_from());
+      if (m->last_complete_ondisk != eversion_t()) {
+       update_peer_last_complete_ondisk(m->get_from(), m->last_complete_ondisk);
+      }
     } else {
       osd->clog->error()
        << info.pgid << " got reply "
@@ -11002,9 +11124,13 @@ void PrimaryLogPG::on_shutdown()
   scrub_clear_state();
 
   unreg_next_scrub();
-  cancel_copy_ops(false);
-  cancel_flush_ops(false);
-  cancel_proxy_ops(false);
+
+  vector<ceph_tid_t> tids;
+  cancel_copy_ops(false, &tids);
+  cancel_flush_ops(false, &tids);
+  cancel_proxy_ops(false, &tids);
+  osd->objecter->op_cancel(tids, -ECANCELED);
+
   apply_and_flush_repops(false);
   cancel_log_updates();
   // we must remove PGRefs, so do this this prior to release_backoffs() callers
@@ -11109,9 +11235,11 @@ void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
 
   clear_scrub_reserved();
 
-  cancel_copy_ops(is_primary());
-  cancel_flush_ops(is_primary());
-  cancel_proxy_ops(is_primary());
+  vector<ceph_tid_t> tids;
+  cancel_copy_ops(is_primary(), &tids);
+  cancel_flush_ops(is_primary(), &tids);
+  cancel_proxy_ops(is_primary(), &tids);
+  osd->objecter->op_cancel(tids, -ECANCELED);
 
   // requeue object waiters
   for (auto& p : waiting_for_unreadable_object) {
@@ -12367,10 +12495,10 @@ void PrimaryLogPG::update_range(
   if (bi->version < info.log_tail) {
     dout(10) << __func__<< ": bi is old, rescanning local backfill_info"
             << dendl;
+    osr->flush();
     if (last_update_applied >= info.log_tail) {
       bi->version = last_update_applied;
     } else {
-      osr->flush();
       bi->version = info.last_update;
     }
     scan_range(local_min, local_max, bi, handle);
@@ -12590,7 +12718,7 @@ void PrimaryLogPG::hit_set_remove_all()
     // Once we hit a degraded object just skip
     if (is_degraded_or_backfilling_object(aoid))
       return;
-    if (scrubber.write_blocked_by_scrub(aoid))
+    if (write_blocked_by_scrub(aoid))
       return;
   }
 
@@ -12709,7 +12837,7 @@ void PrimaryLogPG::hit_set_persist()
     // Once we hit a degraded object just skip further trim
     if (is_degraded_or_backfilling_object(aoid))
       return;
-    if (scrubber.write_blocked_by_scrub(aoid))
+    if (write_blocked_by_scrub(aoid))
       return;
   }
 
@@ -12743,7 +12871,7 @@ void PrimaryLogPG::hit_set_persist()
     new_hset.using_gmt);
 
   // If the current object is degraded we skip this persist request
-  if (scrubber.write_blocked_by_scrub(oid))
+  if (write_blocked_by_scrub(oid))
     return;
 
   hit_set->seal();
@@ -12987,7 +13115,8 @@ bool PrimaryLogPG::agent_work(int start_max, int agent_flush_quota)
       osd->logger->inc(l_osd_agent_skip);
       continue;
     }
-    if (scrubber.write_blocked_by_scrub(obc->obs.oi.soid)) {
+    if (range_intersects_scrub(obc->obs.oi.soid,
+                              obc->obs.oi.soid.get_head())) {
       dout(20) << __func__ << " skip (scrubbing) " << obc->obs.oi << dendl;
       osd->logger->inc(l_osd_agent_skip);
       continue;
@@ -13735,7 +13864,9 @@ unsigned PrimaryLogPG::process_clones_to(const boost::optional<hobject_t> &head,
  */
 void PrimaryLogPG::scrub_snapshot_metadata(
   ScrubMap &scrubmap,
-  const map<hobject_t, pair<uint32_t, uint32_t>> &missing_digest)
+  const map<hobject_t,
+            pair<boost::optional<uint32_t>,
+                 boost::optional<uint32_t>>> &missing_digest)
 {
   dout(10) << __func__ << dendl;
 
@@ -13754,6 +13885,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
   vector<snapid_t>::reverse_iterator curclone; // Defined only if snapset initialized
   unsigned missing = 0;
   inconsistent_snapset_wrapper soid_error, head_error;
+  unsigned soid_error_count = 0;
 
   bufferlist last_data;
 
@@ -13781,7 +13913,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
       osd->clog->error() << mode << " " << info.pgid << " " << soid
                        << " no '" << OI_ATTR << "' attr";
       ++scrubber.shallow_errors;
-      soid_error.set_oi_attr_missing();
+      soid_error.set_info_missing();
     } else {
       bufferlist bv;
       bv.push_back(p->second.attrs[OI_ATTR]);
@@ -13793,8 +13925,8 @@ void PrimaryLogPG::scrub_snapshot_metadata(
        osd->clog->error() << mode << " " << info.pgid << " " << soid
                << " can't decode '" << OI_ATTR << "' attr " << e.what();
        ++scrubber.shallow_errors;
-       soid_error.set_oi_attr_corrupted();
-        soid_error.set_oi_attr_missing(); // Not available too
+       soid_error.set_info_corrupted();
+        soid_error.set_info_missing(); // Not available too
       }
     }
 
@@ -13881,6 +14013,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
       ++scrubber.shallow_errors;
       soid_error.set_headless();
       scrubber.store->add_snap_error(pool.id, soid_error);
+      ++soid_error_count;
       if (head && soid.get_head() == head->get_head())
        head_error.set_clone(soid.snap);
       continue;
@@ -13895,12 +14028,13 @@ void PrimaryLogPG::scrub_snapshot_metadata(
       }
 
       // Save previous head error information
-      if (head && head_error.errors)
+      if (head && (head_error.errors || soid_error_count))
        scrubber.store->add_snap_error(pool.id, head_error);
       // Set this as a new head object
       head = soid;
       missing = 0;
       head_error = soid_error;
+      soid_error_count = 0;
 
       dout(20) << __func__ << " " << mode << " new head " << head << dendl;
 
@@ -13909,7 +14043,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
                          << " no '" << SS_ATTR << "' attr";
         ++scrubber.shallow_errors;
        snapset = boost::none;
-       head_error.set_ss_attr_missing();
+       head_error.set_snapset_missing();
       } else {
        bufferlist bl;
        bl.push_back(p->second.attrs[SS_ATTR]);
@@ -13917,12 +14051,13 @@ void PrimaryLogPG::scrub_snapshot_metadata(
         try {
          snapset = SnapSet(); // Initialize optional<> before decoding into it
          ::decode(snapset.get(), blp);
+          head_error.ss_bl.push_back(p->second.attrs[SS_ATTR]);
         } catch (buffer::error& e) {
          snapset = boost::none;
           osd->clog->error() << mode << " " << info.pgid << " " << soid
                << " can't decode '" << SS_ATTR << "' attr " << e.what();
          ++scrubber.shallow_errors;
-         head_error.set_ss_attr_corrupted();
+         head_error.set_snapset_corrupted();
         }
       }
 
@@ -13936,7 +14071,7 @@ void PrimaryLogPG::scrub_snapshot_metadata(
            osd->clog->error() << mode << " " << info.pgid << " " << soid
                               << " snaps.seq not set";
            ++scrubber.shallow_errors;
-           head_error.set_snapset_mismatch();
+           head_error.set_snapset_error();
           }
        }
 
@@ -14047,8 +14182,10 @@ void PrimaryLogPG::scrub_snapshot_metadata(
 
       // what's next?
       ++curclone;
-      if (soid_error.errors)
+      if (soid_error.errors) {
         scrubber.store->add_snap_error(pool.id, soid_error);
+       ++soid_error_count;
+      }
     }
 
     scrub_cstat.add(stat);
@@ -14068,13 +14205,10 @@ void PrimaryLogPG::scrub_snapshot_metadata(
     log_missing(missing, head, osd->clog, info.pgid, __func__,
                mode, pool.info.allow_incomplete_clones());
   }
-  if (head && head_error.errors)
+  if (head && (head_error.errors || soid_error_count))
     scrubber.store->add_snap_error(pool.id, head_error);
 
-  for (map<hobject_t,pair<uint32_t,uint32_t>>::const_iterator p =
-        missing_digest.begin();
-       p != missing_digest.end();
-       ++p) {
+  for (auto p = missing_digest.begin(); p != missing_digest.end(); ++p) {
     if (p->first.is_snapdir())
       continue;
     dout(10) << __func__ << " recording digests for " << p->first << dendl;
@@ -14094,8 +14228,16 @@ void PrimaryLogPG::scrub_snapshot_metadata(
     OpContextUPtr ctx = simple_opc_create(obc);
     ctx->at_version = get_next_version();
     ctx->mtime = utime_t();      // do not update mtime
-    ctx->new_obs.oi.set_data_digest(p->second.first);
-    ctx->new_obs.oi.set_omap_digest(p->second.second);
+    if (p->second.first) {
+      ctx->new_obs.oi.set_data_digest(*p->second.first);
+    } else {
+      ctx->new_obs.oi.clear_data_digest();
+    }
+    if (p->second.second) {
+      ctx->new_obs.oi.set_omap_digest(*p->second.second);
+    } else {
+      ctx->new_obs.oi.clear_omap_digest();
+    }
     finish_ctx(ctx.get(), pg_log_entry_t::MODIFY);
 
     ctx->register_on_success(