Import ceph 15.2.8

diff --git a/ceph/src/osd/PGBackend.cc b/ceph/src/osd/PGBackend.cc
index f77b8bfad933203c9c3d3e6520f988f90c3f069e..be93941196734c73a10a99aa78061dd5763f361a 100644
--- a/ceph/src/osd/PGBackend.cc
+++ b/ceph/src/osd/PGBackend.cc
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
 static ostream& _prefix(std::ostream *_dout, PGBackend *pgb) {
-  return *_dout << pgb->get_parent()->gen_dbg_prefix();
+  return pgb->get_parent()->gen_dbg_prefix(*_dout);
 }
 
 void PGBackend::recover_delete_object(const hobject_t &oid, eversion_t v,
                                      RecoveryHandle *h)
 {
-  assert(get_parent()->get_actingbackfill_shards().size() > 0);
-  for (const auto& shard : get_parent()->get_actingbackfill_shards()) {
+  ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
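+  // ask every other shard in the acting/recovery/backfill set that still
+  // lists this object as missing to delete it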
+  for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
     if (shard == get_parent()->whoami_shard())
       continue;
     if (get_parent()->get_shard_missing(shard).is_missing(oid)) {
@@ -64,7 +64,7 @@ void PGBackend::send_recovery_deletes(int prio,
     const auto& objects = p.second;
     ConnectionRef con = get_parent()->get_con_osd_cluster(
       shard.osd,
-      get_osdmap()->get_epoch());
+      get_osdmap_epoch());
     if (!con)
       continue;
     auto it = objects.begin();
@@ -75,7 +75,7 @@ void PGBackend::send_recovery_deletes(int prio,
       MOSDPGRecoveryDelete *msg =
        new MOSDPGRecoveryDelete(get_parent()->whoami_shard(),
                                 target_pg,
-                                get_osdmap()->get_epoch(),
+                                get_osdmap_epoch(),
                                 min_epoch);
       msg->set_priority(prio);
 
@@ -116,8 +116,8 @@ bool PGBackend::handle_message(OpRequestRef op)
 
 void PGBackend::handle_recovery_delete(OpRequestRef op)
 {
-  const MOSDPGRecoveryDelete *m = static_cast<const MOSDPGRecoveryDelete *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE);
+  auto m = op->get_req<MOSDPGRecoveryDelete>();
+  ceph_assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE);
   dout(20) << __func__ << " " << op << dendl;
 
   op->mark_started();
@@ -136,7 +136,7 @@ void PGBackend::handle_recovery_delete(OpRequestRef op)
   reply->objects = m->objects;
   ConnectionRef conn = m->get_connection();
 
-  gather.set_finisher(new FunctionContext(
+  gather.set_finisher(new LambdaContext(
     [=](int r) {
       if (r != -EAGAIN) {
        get_parent()->send_message_osd_cluster(reply, conn.get());
@@ -149,8 +149,8 @@ void PGBackend::handle_recovery_delete(OpRequestRef op)
 
 void PGBackend::handle_recovery_delete_reply(OpRequestRef op)
 {
-  const MOSDPGRecoveryDeleteReply *m = static_cast<const MOSDPGRecoveryDeleteReply *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE_REPLY);
+  auto m = op->get_req<MOSDPGRecoveryDeleteReply>();
+  ceph_assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE_REPLY);
   dout(20) << __func__ << " " << op << dendl;
 
   for (const auto &p : m->objects) {
@@ -159,7 +159,7 @@ void PGBackend::handle_recovery_delete_reply(OpRequestRef op)
     recovery_info.version = p.second;
     get_parent()->on_peer_recover(m->from, oid, recovery_info);
     bool peers_recovered = true;
-    for (const auto& shard : get_parent()->get_actingbackfill_shards()) {
+    for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
       if (shard == get_parent()->whoami_shard())
        continue;
       if (get_parent()->get_shard_missing(shard).is_missing(oid)) {
@@ -197,7 +197,7 @@ void PGBackend::rollback(
       temp.append(t);
       temp.swap(t);
     }
-    void setattrs(map<string, boost::optional<bufferlist> > &attrs) override {
+    void setattrs(map<string, std::optional<bufferlist> > &attrs) override {
       ObjectStore::Transaction temp;
       pg->rollback_setattrs(hoid, attrs, &temp);
       temp.append(t);
@@ -237,7 +237,7 @@ void PGBackend::rollback(
     }
   };
 
-  assert(entry.mod_desc.can_rollback());
+  ceph_assert(entry.mod_desc.can_rollback());
   RollbackVisitor vis(entry.soid, this);
   entry.mod_desc.visit(&vis);
   t->append(vis.t);
@@ -305,7 +305,7 @@ void PGBackend::try_stash(
 void PGBackend::remove(
   const hobject_t &hoid,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->remove(
     coll,
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
@@ -335,7 +335,7 @@ int PGBackend::objects_list_partial(
   vector<hobject_t> *ls,
   hobject_t *next)
 {
-  assert(ls);
+  ceph_assert(ls);
   // Starts with the smallest generation to make sure the result list
   // has the marker object (it might have multiple generations
   // though, which would be filtered).
@@ -350,13 +350,24 @@ int PGBackend::objects_list_partial(
 
   while (!_next.is_max() && ls->size() < (unsigned)min) {
     vector<ghobject_t> objects;
-    r = store->collection_list(
-      ch,
-      _next,
-      ghobject_t::get_max(),
-      max - ls->size(),
-      &objects,
-      &_next);
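+    // prefer the fixed collection listing, but fall back to the legacy
+    // ordering while any up/acting OSD lacks OSD_FIXED_COLLECTION_LIST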
+    if (HAVE_FEATURE(parent->min_upacting_features(),
+                     OSD_FIXED_COLLECTION_LIST)) {
+      r = store->collection_list(
+        ch,
+        _next,
+        ghobject_t::get_max(),
+        max - ls->size(),
+        &objects,
+        &_next);
+    } else {
+      r = store->collection_list_legacy(
+        ch,
+        _next,
+        ghobject_t::get_max(),
+        max - ls->size(),
+        &objects,
+        &_next);
+    }
     if (r != 0) {
       derr << __func__ << " list collection " << ch << " got: " << cpp_strerror(r) << dendl;
       break;
@@ -380,19 +391,30 @@ int PGBackend::objects_list_partial(
 int PGBackend::objects_list_range(
   const hobject_t &start,
   const hobject_t &end,
-  snapid_t seq,
   vector<hobject_t> *ls,
   vector<ghobject_t> *gen_obs)
 {
-  assert(ls);
+  ceph_assert(ls);
   vector<ghobject_t> objects;
-  int r = store->collection_list(
-    ch,
-    ghobject_t(start, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-    ghobject_t(end, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-    INT_MAX,
-    &objects,
-    NULL);
+  int r;
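+  // same feature gate as objects_list_partial(): use the legacy listing
+  // while any up/acting OSD predates OSD_FIXED_COLLECTION_LIST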
+  if (HAVE_FEATURE(parent->min_upacting_features(),
+                   OSD_FIXED_COLLECTION_LIST)) {
+    r = store->collection_list(
+      ch,
+      ghobject_t(start, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      ghobject_t(end, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      INT_MAX,
+      &objects,
+      NULL);
+  } else {
+    r = store->collection_list_legacy(
+      ch,
+      ghobject_t(start, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      ghobject_t(end, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      INT_MAX,
+      &objects,
+      NULL);
+  }
   ls->reserve(objects.size());
   for (vector<ghobject_t>::iterator i = objects.begin();
        i != objects.end();
@@ -439,15 +461,15 @@ int PGBackend::objects_get_attrs(
 
 void PGBackend::rollback_setattrs(
   const hobject_t &hoid,
-  map<string, boost::optional<bufferlist> > &old_attrs,
+  map<string, std::optional<bufferlist> > &old_attrs,
   ObjectStore::Transaction *t) {
   map<string, bufferlist> to_set;
-  assert(!hoid.is_temp());
-  for (map<string, boost::optional<bufferlist> >::iterator i = old_attrs.begin();
+  ceph_assert(!hoid.is_temp());
+  for (map<string, std::optional<bufferlist> >::iterator i = old_attrs.begin();
        i != old_attrs.end();
        ++i) {
     if (i->second) {
-      to_set[i->first] = i->second.get();
+      to_set[i->first] = *(i->second);
     } else {
       t->rmattr(
        coll,
@@ -465,7 +487,7 @@ void PGBackend::rollback_append(
   const hobject_t &hoid,
   uint64_t old_size,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->truncate(
     coll,
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
@@ -476,7 +498,7 @@ void PGBackend::rollback_stash(
   const hobject_t &hoid,
   version_t old_version,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->remove(
     coll,
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
@@ -491,7 +513,7 @@ void PGBackend::rollback_try_stash(
   const hobject_t &hoid,
   version_t old_version,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->remove(
     coll,
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
@@ -525,36 +547,35 @@ void PGBackend::trim_rollback_object(
   const hobject_t &hoid,
   version_t old_version,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->remove(
     coll, ghobject_t(hoid, old_version, get_parent()->whoami_shard().shard));
 }
 
 PGBackend *PGBackend::build_pg_backend(
   const pg_pool_t &pool,
-  const OSDMapRef curmap,
+  const map<string,string>& profile,
   Listener *l,
   coll_t coll,
   ObjectStore::CollectionHandle &ch,
   ObjectStore *store,
   CephContext *cct)
 {
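+  // the erasure-code plugin factory takes a mutable profile, so work on a
+  // copy of the caller's const map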
+  ErasureCodeProfile ec_profile = profile;
   switch (pool.type) {
   case pg_pool_t::TYPE_REPLICATED: {
     return new ReplicatedBackend(l, coll, ch, store, cct);
   }
   case pg_pool_t::TYPE_ERASURE: {
     ErasureCodeInterfaceRef ec_impl;
-    ErasureCodeProfile profile = curmap->get_erasure_code_profile(pool.erasure_code_profile);
-    assert(profile.count("plugin"));
     stringstream ss;
     ceph::ErasureCodePluginRegistry::instance().factory(
       profile.find("plugin")->second,
-      cct->_conf->get_val<std::string>("erasure_code_dir"),
-      profile,
+      cct->_conf.get_val<std::string>("erasure_code_dir"),
+      ec_profile,
       &ec_impl,
       &ss);
-    assert(ec_impl);
+    ceph_assert(ec_impl);
     return new ECBackend(
       l,
       coll,
@@ -575,8 +596,8 @@ int PGBackend::be_scan_list(
   ScrubMapBuilder &pos)
 {
   dout(10) << __func__ << " " << pos << dendl;
-  assert(!pos.done());
-  assert(pos.pos < pos.ls.size());
+  ceph_assert(!pos.done());
+  ceph_assert(pos.pos < pos.ls.size());
   hobject_t& poid = pos.ls[pos.pos];
 
   struct stat st;
@@ -589,7 +610,7 @@ int PGBackend::be_scan_list(
   if (r == 0) {
     ScrubMap::object &o = map.objects[poid];
     o.size = st.st_size;
-    assert(!o.negative);
+    ceph_assert(!o.negative);
     store->getattrs(
       ch,
       ghobject_t(
@@ -626,18 +647,10 @@ bool PGBackend::be_compare_scrub_objects(
   const ScrubMap::object &candidate,
   shard_info_wrapper &shard_result,
   inconsistent_obj_wrapper &obj_result,
-  ostream &errorstream)
+  ostream &errorstream,
+  bool has_snapset)
 {
   enum { CLEAN, FOUND_ERROR } error = CLEAN;
-  if (candidate.stat_error) {
-    assert(shard_result.has_stat_error());
-    error = FOUND_ERROR;
-    errorstream << "candidate had a stat error";
-  }
-  if (candidate.read_error || candidate.ec_hash_mismatch || candidate.ec_size_mismatch) {
-    error = FOUND_ERROR;
-    errorstream << "candidate had a read error";
-  }
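+  // stat/read errors are now reported per shard in be_select_auth_object()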
   if (auth.digest_present && candidate.digest_present) {
     if (auth.digest != candidate.digest) {
       if (error != CLEAN)
@@ -686,6 +699,67 @@ bool PGBackend::be_compare_scrub_objects(
   }
   if (candidate.stat_error)
     return error == FOUND_ERROR;
+  if (!shard_result.has_info_missing()
+      && !shard_result.has_info_corrupted()) {
+    bufferlist can_bl, auth_bl;
+    auto can_attr = candidate.attrs.find(OI_ATTR);
+    auto auth_attr = auth.attrs.find(OI_ATTR);
+
+    ceph_assert(auth_attr != auth.attrs.end());
+    ceph_assert(can_attr != candidate.attrs.end());
+
+    can_bl.push_back(can_attr->second);
+    auth_bl.push_back(auth_attr->second);
+    if (!can_bl.contents_equal(auth_bl)) {
+      if (error != CLEAN)
+        errorstream << ", ";
+      error = FOUND_ERROR;
+      obj_result.set_object_info_inconsistency();
+      errorstream << "object info inconsistent ";
+    }
+  }
+  if (has_snapset) {
+    if (!shard_result.has_snapset_missing()
+        && !shard_result.has_snapset_corrupted()) {
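+      // only objects with a snapset (head objects) carry SS_ATTR; compare
+      // the raw bytes against the auth copy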
+      bufferlist can_bl, auth_bl;
+      auto can_attr = candidate.attrs.find(SS_ATTR);
+      auto auth_attr = auth.attrs.find(SS_ATTR);
+
+      ceph_assert(auth_attr != auth.attrs.end());
+      ceph_assert(can_attr != candidate.attrs.end());
+
+      can_bl.push_back(can_attr->second);
+      auth_bl.push_back(auth_attr->second);
+      if (!can_bl.contents_equal(auth_bl)) {
+         if (error != CLEAN)
+           errorstream << ", ";
+         error = FOUND_ERROR;
+         obj_result.set_snapset_inconsistency();
+         errorstream << "snapset inconsistent ";
+      }
+    }
+  }
+  if (parent->get_pool().is_erasure()) {
+    if (!shard_result.has_hinfo_missing()
+        && !shard_result.has_hinfo_corrupted()) {
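+      // erasure-coded shards carry ECUtil hash-info; compare the raw bytes
+      // against the auth shard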
+      bufferlist can_bl, auth_bl;
+      auto can_hi = candidate.attrs.find(ECUtil::get_hinfo_key());
+      auto auth_hi = auth.attrs.find(ECUtil::get_hinfo_key());
+
+      ceph_assert(auth_hi != auth.attrs.end());
+      ceph_assert(can_hi != candidate.attrs.end());
+
+      can_bl.push_back(can_hi->second);
+      auth_bl.push_back(auth_hi->second);
+      if (!can_bl.contents_equal(auth_bl)) {
+        if (error != CLEAN)
+         errorstream << ", ";
+       error = FOUND_ERROR;
+       obj_result.set_hinfo_inconsistency();
+       errorstream << "hinfo inconsistent ";
+      }
+    }
+  }
   uint64_t oi_size = be_get_ondisk_size(auth_oi.size);
   if (oi_size != candidate.size) {
     if (error != CLEAN)
@@ -705,6 +779,18 @@ bool PGBackend::be_compare_scrub_objects(
                << " from shard " << auth_shard;
     obj_result.set_size_mismatch();
   }
+  // If the replica is too large and we didn't already count it for this object
+  //
+  if (candidate.size > cct->_conf->osd_max_object_size
+      && !obj_result.has_size_too_large()) {
+    if (error != CLEAN)
+      errorstream << ", ";
+    error = FOUND_ERROR;
+    errorstream << "size " << candidate.size
+               << " > " << cct->_conf->osd_max_object_size
+               << " is too large";
+    obj_result.set_size_too_large();
+  }
   for (map<string,bufferptr>::const_iterator i = auth.attrs.begin();
        i != auth.attrs.end();
        ++i) {
@@ -742,12 +828,9 @@ bool PGBackend::be_compare_scrub_objects(
   return error == FOUND_ERROR;
 }
 
-static int dcount(const object_info_t &oi, bool prioritize)
+static int dcount(const object_info_t &oi)
 {
   int count = 0;
-  // Prioritize bluestore objects when osd_distrust_data_digest is set
-  if (prioritize)
-    count += 1000;
   if (oi.is_data_digest())
     count++;
   if (oi.is_omap_digest())
@@ -761,12 +844,11 @@ map<pg_shard_t, ScrubMap *>::const_iterator
   const map<pg_shard_t,ScrubMap*> &maps,
   object_info_t *auth_oi,
   map<pg_shard_t, shard_info_wrapper> &shard_map,
-  inconsistent_obj_wrapper &object_error,
-  bool &digest_match)
+  bool &digest_match,
+  spg_t pgid,
+  ostream &errorstream)
 {
   eversion_t auth_version;
-  bool auth_prio = false;
-  bufferlist first_oi_bl, first_ss_bl, first_hk_bl;
 
   // Create list of shards with primary first so it will be auth copy all
   // other things being equal.
@@ -783,28 +865,37 @@ map<pg_shard_t, ScrubMap *>::const_iterator
   map<pg_shard_t, ScrubMap *>::const_iterator auth = maps.end();
   digest_match = true;
   for (auto &l : shards) {
-    bool oi_prio = false;
+    ostringstream shard_errorstream;
+    bool error = false;
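+    // per-shard problems accumulate in shard_errorstream and are flushed
+    // to errorstream at the out: label below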
     map<pg_shard_t, ScrubMap *>::const_iterator j = maps.find(l);
     map<hobject_t, ScrubMap::object>::iterator i =
       j->second->objects.find(obj);
     if (i == j->second->objects.end()) {
       continue;
     }
-    string error_string;
     auto& shard_info = shard_map[j->first];
     if (j->first == get_parent()->whoami_shard())
       shard_info.primary = true;
     if (i->second.read_error) {
       shard_info.set_read_error();
-      error_string += " read_error";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had a read error";
     }
     if (i->second.ec_hash_mismatch) {
       shard_info.set_ec_hash_mismatch();
-      error_string += " ec_hash_mismatch";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had an ec hash mismatch";
     }
     if (i->second.ec_size_mismatch) {
       shard_info.set_ec_size_mismatch();
-      error_string += " ec_size_mismatch";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had an ec size mismatch";
     }
 
     object_info_t oi;
@@ -815,33 +906,37 @@ map<pg_shard_t, ScrubMap *>::const_iterator
 
     if (i->second.stat_error) {
       shard_info.set_stat_error();
-      error_string += " stat_error";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had a stat error";
       // With stat_error no further checking
       // We don't need to also see a missing_object_info_attr
       goto out;
     }
 
     // We won't pick an auth copy if the snapset is missing or won't decode.
-    if (obj.is_head() || obj.is_snapdir()) {
+    ceph_assert(!obj.is_snapdir());
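+    // snapdir objects no longer exist; the snapset lives on the head object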
+    if (obj.is_head()) {
       k = i->second.attrs.find(SS_ATTR);
       if (k == i->second.attrs.end()) {
        shard_info.set_snapset_missing();
-       error_string += " snapset_missing";
+        if (error)
+          shard_errorstream << ", ";
+        error = true;
+        shard_errorstream << "candidate had a missing snapset key";
       } else {
         ss_bl.push_back(k->second);
         try {
-         bufferlist::iterator bliter = ss_bl.begin();
-         ::decode(ss, bliter);
-         if (first_ss_bl.length() == 0) {
-           first_ss_bl.append(ss_bl);
-         } else if (!object_error.has_snapset_inconsistency() && !ss_bl.contents_equal(first_ss_bl)) {
-           object_error.set_snapset_inconsistency();
-           error_string += " snapset_inconsistency";
-         }
+         auto bliter = ss_bl.cbegin();
+         decode(ss, bliter);
         } catch (...) {
          // invalid snapset, probably corrupt
          shard_info.set_snapset_corrupted();
-         error_string += " snapset_corrupted";
+          if (error)
+            shard_errorstream << ", ";
+          error = true;
+          shard_errorstream << "candidate had a corrupt snapset";
         }
       }
     }
@@ -851,22 +946,22 @@ map<pg_shard_t, ScrubMap *>::const_iterator
       k = i->second.attrs.find(ECUtil::get_hinfo_key());
       if (k == i->second.attrs.end()) {
        shard_info.set_hinfo_missing();
-       error_string += " hinfo_key_missing";
+        if (error)
+          shard_errorstream << ", ";
+        error = true;
+        shard_errorstream << "candidate had a missing hinfo key";
       } else {
        hk_bl.push_back(k->second);
         try {
-         bufferlist::iterator bliter = hk_bl.begin();
+         auto bliter = hk_bl.cbegin();
          decode(hi, bliter);
-         if (first_hk_bl.length() == 0) {
-           first_hk_bl.append(hk_bl);
-         } else if (!object_error.has_hinfo_inconsistency() && !hk_bl.contents_equal(first_hk_bl)) {
-           object_error.set_hinfo_inconsistency();
-           error_string += " hinfo_inconsistency";
-         }
         } catch (...) {
	 // invalid hinfo, probably corrupt
          shard_info.set_hinfo_corrupted();
-         error_string += " hinfo_corrupted";
+          if (error)
+            shard_errorstream << ", ";
+          error = true;
+          shard_errorstream << "candidate had a corrupt hinfo";
         }
       }
     }
@@ -875,34 +970,36 @@ map<pg_shard_t, ScrubMap *>::const_iterator
     if (k == i->second.attrs.end()) {
       // no object info on object, probably corrupt
       shard_info.set_info_missing();
-      error_string += " info_missing";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had a missing info key";
       goto out;
     }
     bl.push_back(k->second);
     try {
-      bufferlist::iterator bliter = bl.begin();
-      ::decode(oi, bliter);
+      auto bliter = bl.cbegin();
+      decode(oi, bliter);
     } catch (...) {
       // invalid object info, probably corrupt
       shard_info.set_info_corrupted();
-      error_string += " info_corrupted";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had a corrupt info";
       goto out;
     }
 
     // This is automatically corrected in PG::_repair_oinfo_oid()
-    assert(oi.soid == obj);
-
-    if (first_oi_bl.length() == 0) {
-      first_oi_bl.append(bl);
-    } else if (!object_error.has_object_info_inconsistency() && !bl.contents_equal(first_oi_bl)) {
-      object_error.set_object_info_inconsistency();
-      error_string += " object_info_inconsistency";
-    }
+    ceph_assert(oi.soid == obj);
 
     if (i->second.size != be_get_ondisk_size(oi.size)) {
-      dout(5) << __func__ << " size " << i->second.size << " oi size " << oi.size << dendl;
       shard_info.set_obj_size_info_mismatch();
-      error_string += " obj_size_info_mismatch";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate size " << i->second.size << " info size "
+                       << oi.size << " mismatch";
     }
 
     // digest_match will only be true if computed digests are the same
@@ -921,31 +1018,17 @@ map<pg_shard_t, ScrubMap *>::const_iterator
     if (shard_info.errors)
       goto out;
 
-    // XXX: Do I want replicated only?
-    if (parent->get_pool().is_replicated() && cct->_conf->osd_distrust_data_digest) {
-      // This is a boost::optional<bool> so see if option set AND it has the value true
-      // We give priority to a replica where the ObjectStore like BlueStore has builtin checksum
-      if (j->second->has_builtin_csum && j->second->has_builtin_csum == true) {
-        oi_prio = true;
-      }
-    }
-
     if (auth_version == eversion_t() || oi.version > auth_version ||
-        (oi.version == auth_version && dcount(oi, oi_prio) > dcount(*auth_oi, auth_prio))) {
+        (oi.version == auth_version && dcount(oi) > dcount(*auth_oi))) {
       auth = j;
       *auth_oi = oi;
       auth_version = oi.version;
-      auth_prio = oi_prio;
     }
 
 out:
-    // Check error_string because some errors already generated messages
-    if (error_string != "") {
-      dout(10) << __func__ << ": error(s) osd " << j->first
-              << " for obj " << obj
-              << "," << error_string
-              << dendl;
-    }
+    if (error)
+        errorstream << pgid.pgid << " shard " << l << " soid " << obj
+                   << " : " << shard_errorstream.str() << "\n";
     // Keep scanning other shards
   }
   dout(10) << __func__ << ": selecting osd " << auth->first
@@ -962,8 +1045,8 @@ void PGBackend::be_compare_scrubmaps(
   map<hobject_t, set<pg_shard_t>> &missing,
   map<hobject_t, set<pg_shard_t>> &inconsistent,
   map<hobject_t, list<pg_shard_t>> &authoritative,
-  map<hobject_t, pair<boost::optional<uint32_t>,
-                      boost::optional<uint32_t>>> &missing_digest,
+  map<hobject_t, pair<std::optional<uint32_t>,
+                      std::optional<uint32_t>>> &missing_digest,
   int &shallow_errors, int &deep_errors,
   Scrub::Store *store,
   const spg_t& pgid,
@@ -983,8 +1066,8 @@ void PGBackend::be_compare_scrubmaps(
 
     bool digest_match;
     map<pg_shard_t, ScrubMap *>::const_iterator auth =
-      be_select_auth_object(*k, maps, &auth_oi, shard_map, object_error,
-                            digest_match);
+      be_select_auth_object(*k, maps, &auth_oi, shard_map, digest_match,
+                           pgid, errorstream);
 
     list<pg_shard_t> auth_list;
     set<pg_shard_t> object_errors;
@@ -998,7 +1081,7 @@ void PGBackend::be_compare_scrubmaps(
        ++shallow_errors;
       store->add_object_error(k->pool, object_error);
       errorstream << pgid.pgid << " soid " << *k
-                 << ": failed to pick suitable object info\n";
+                 << " : failed to pick suitable object info\n";
       continue;
     }
     object_error.set_version(auth_oi.user_version);
@@ -1007,7 +1090,7 @@ void PGBackend::be_compare_scrubmaps(
     set<pg_shard_t> cur_inconsistent;
     bool fix_digest = false;
 
-    for (auto  j = maps.cbegin(); j != maps.cend(); ++j) {
+    for (auto j = maps.cbegin(); j != maps.cend(); ++j) {
       if (j == auth)
        shard_map[auth->first].selected_oi = true;
       if (j->second->objects.count(*k)) {
@@ -1020,30 +1103,23 @@ void PGBackend::be_compare_scrubmaps(
                                   j->second->objects[*k],
                                   shard_map[j->first],
                                   object_error,
-                                  ss);
+                                  ss,
+                                  k->has_snapset());
 
        dout(20) << __func__ << (repair ? " repair " : " ") << (parent->get_pool().is_replicated() ? "replicated " : "")
-         << (j == auth ? "auth " : "") << "shards " << shard_map.size() << (digest_match ? " digest_match " : " ")
-         << (shard_map[j->first].has_data_digest_mismatch_info() ? "info_mismatch " : "")
-         << (shard_map[j->first].only_data_digest_mismatch_info() ? "only" : "")
-         << dendl;
-
-        if (cct->_conf->osd_distrust_data_digest) {
-         if (digest_match && parent->get_pool().is_replicated()
-              && shard_map[j->first].has_data_digest_mismatch_info()) {
-           fix_digest = true;
-         }
-         shard_map[j->first].clear_data_digest_mismatch_info();
+        << (j == auth ? "auth" : "") << "shards " << shard_map.size() << (digest_match ? " digest_match " : " ")
+        << (shard_map[j->first].only_data_digest_mismatch_info() ? "'info mismatch info'" : "")
+        << dendl;
        // If all replicas match, but they don't match object_info we can
        // repair it by using missing_digest mechanism
-       } else if (repair && parent->get_pool().is_replicated() && j == auth && shard_map.size() > 1
+       if (repair && parent->get_pool().is_replicated() && j == auth && shard_map.size() > 1
            && digest_match && shard_map[j->first].only_data_digest_mismatch_info()
            && auth_object.digest_present) {
          // Set in missing_digests
          fix_digest = true;
          // Clear the error
          shard_map[j->first].clear_data_digest_mismatch_info();
-         errorstream << pgid << " : soid " << *k << " repairing object info data_digest" << "\n";
+         errorstream << pgid << " soid " << *k << " : repairing object info data_digest" << "\n";
        }
        // Some errors might have already been set in be_select_auth_object()
        if (shard_map[j->first].errors != 0) {
@@ -1055,14 +1131,13 @@ void PGBackend::be_compare_scrubmaps(
          // Only true if be_compare_scrub_objects() found errors and put something
          // in ss.
          if (found)
-           errorstream << pgid << " shard " << j->first << ": soid " << *k
-                     << " " << ss.str() << "\n";
-       } else if (object_error.errors != 0) {
+           errorstream << pgid << " shard " << j->first << " soid " << *k
+                     << " " << ss.str() << "\n";
+       } else if (found) {
          // Track possible shard to use as authoritative, if needed
          // There are errors, without identifying the shard
          object_errors.insert(j->first);
-         if (found)
-           errorstream << pgid << " : soid " << *k << " " << ss.str() << "\n";
+         errorstream << pgid << " soid " << *k << " : " << ss.str() << "\n";
        } else {
          // XXX: The auth shard might get here that we don't know
          // that it has the "correct" data.
@@ -1074,8 +1149,7 @@ void PGBackend::be_compare_scrubmaps(
         shard_map[j->first].primary = (j->first == get_parent()->whoami_shard());
        // Can't have any other errors if there is no information available
        ++shallow_errors;
-       errorstream << pgid << " shard " << j->first << " missing " << *k
-                   << "\n";
+       errorstream << pgid << " shard " << j->first << " " << *k << " : missing\n";
       }
       object_error.add_shard(j->first, shard_map[j->first]);
     }
@@ -1083,7 +1157,7 @@ void PGBackend::be_compare_scrubmaps(
     if (auth_list.empty()) {
       if (object_errors.empty()) {
         errorstream << pgid.pgid << " soid " << *k
-                 << ": failed to pick suitable auth object\n";
+                 << " : failed to pick suitable auth object\n";
         goto out;
       }
       // Object errors exist and nothing in auth_list
@@ -1108,17 +1182,14 @@ void PGBackend::be_compare_scrubmaps(
     }
 
     if (fix_digest) {
-      boost::optional<uint32_t> data_digest, omap_digest;
-      assert(auth_object.digest_present);
+      std::optional<uint32_t> data_digest, omap_digest;
+      ceph_assert(auth_object.digest_present);
       data_digest = auth_object.digest;
       if (auth_object.omap_digest_present) {
         omap_digest = auth_object.omap_digest;
       }
       missing_digest[*k] = make_pair(data_digest, omap_digest);
     }
-    // Special handling of this particular type of inconsistency
-    // This can over-ride a data_digest or set an omap_digest
-    // when all replicas match but the object info is wrong.
     if (!cur_inconsistent.empty() || !cur_missing.empty()) {
       authoritative[*k] = auth_list;
     } else if (!fix_digest && parent->get_pool().is_replicated()) {
@@ -1140,8 +1211,7 @@ void PGBackend::be_compare_scrubmaps(
       // recorded digest != actual digest?
       if (auth_oi.is_data_digest() && auth_object.digest_present &&
          auth_oi.data_digest != auth_object.digest) {
-       assert(cct->_conf->osd_distrust_data_digest
-              || shard_map[auth->first].has_data_digest_mismatch_info());
+        ceph_assert(shard_map[auth->first].has_data_digest_mismatch_info());
        errorstream << pgid << " recorded data digest 0x"
                    << std::hex << auth_oi.data_digest << " != on disk 0x"
                    << auth_object.digest << std::dec << " on " << auth_oi.soid
@@ -1151,7 +1221,7 @@ void PGBackend::be_compare_scrubmaps(
       }
       if (auth_oi.is_omap_digest() && auth_object.omap_digest_present &&
          auth_oi.omap_digest != auth_object.omap_digest) {
-        assert(shard_map[auth->first].has_omap_digest_mismatch_info());
+        ceph_assert(shard_map[auth->first].has_omap_digest_mismatch_info());
        errorstream << pgid << " recorded omap digest 0x"
                    << std::hex << auth_oi.omap_digest << " != on disk 0x"
                    << auth_object.omap_digest << std::dec
@@ -1164,7 +1234,7 @@ void PGBackend::be_compare_scrubmaps(
        utime_t age = now - auth_oi.local_mtime;
        if (update == FORCE ||
            age > cct->_conf->osd_deep_scrub_update_digest_min_age) {
-          boost::optional<uint32_t> data_digest, omap_digest;
+          std::optional<uint32_t> data_digest, omap_digest;
           if (auth_object.digest_present) {
             data_digest = auth_object.digest;
            dout(20) << __func__ << " will update data digest on " << *k << dendl;
@@ -1192,32 +1262,47 @@ out:
   }
 }
 
-void PGBackend::be_large_omap_check(const map<pg_shard_t,ScrubMap*> &maps,
+void PGBackend::be_omap_checks(const map<pg_shard_t,ScrubMap*> &maps,
   const set<hobject_t> &master_set,
-  int& large_omap_objects,
+  omap_stat_t& omap_stats,
   ostream &warnstream) const
 {
-  bool needs_check = false;
+  bool needs_omap_check = false;
   for (const auto& map : maps) {
-    if (map.second->has_large_omap_object_errors) {
-      needs_check = true;
+    if (map.second->has_large_omap_object_errors || map.second->has_omap_keys) {
+      needs_omap_check = true;
       break;
     }
   }
 
-  if (!needs_check) {
-    return;
+  if (!needs_omap_check) {
+    return; // Nothing to do
   }
 
-  // Iterate through objects and check large omap object flag
+  // Iterate through objects and update omap stats
   for (const auto& k : master_set) {
     for (const auto& map : maps) {
-      ScrubMap::object& obj = map.second->objects[k];
+      if (map.first != get_parent()->primary_shard()) {
+        // Only set omap stats for the primary
+        continue;
+      }
+      auto it = map.second->objects.find(k);
+      if (it == map.second->objects.end())
+        continue;
+      ScrubMap::object& obj = it->second;
+      omap_stats.omap_bytes += obj.object_omap_bytes;
+      omap_stats.omap_keys += obj.object_omap_keys;
       if (obj.large_omap_object_found) {
-        large_omap_objects++;
-        warnstream << "Large omap object found. Object: " << k << " Key count: "
-                   << obj.large_omap_object_key_count << " Size (bytes): "
-                   << obj.large_omap_object_value_size << '\n';
+        pg_t pg;
+        auto osdmap = get_osdmap();
+        osdmap->map_to_pg(k.pool, k.oid.name, k.get_key(), k.nspace, &pg);
+        pg_t mpg = osdmap->raw_pg_to_pg(pg);
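+        // mpg is the actual pg the raw hash-based pg maps to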
+        omap_stats.large_omap_objects++;
+        warnstream << "Large omap object found. Object: " << k
+                   << " PG: " << pg << " (" << mpg << ")"
+                   << " Key count: " << obj.large_omap_object_key_count
+                   << " Size (bytes): " << obj.large_omap_object_value_size
+                   << '\n';
         break;
       }
     }