git.proxmox.com Git - ceph.git/blobdiff - ceph/src/osd/PGBackend.cc
Import ceph 15.2.8
[ceph.git] / ceph / src / osd / PGBackend.cc
index 9663729d5471318c20b0b99625e216b33892626c..be93941196734c73a10a99aa78061dd5763f361a 100644 (file)
 #undef dout_prefix
 #define dout_prefix _prefix(_dout, this)
 static ostream& _prefix(std::ostream *_dout, PGBackend *pgb) {
-  return *_dout << pgb->get_parent()->gen_dbg_prefix();
+  return pgb->get_parent()->gen_dbg_prefix(*_dout);
 }
 
 void PGBackend::recover_delete_object(const hobject_t &oid, eversion_t v,
                                      RecoveryHandle *h)
 {
-  assert(get_parent()->get_actingbackfill_shards().size() > 0);
-  for (const auto& shard : get_parent()->get_actingbackfill_shards()) {
+  ceph_assert(get_parent()->get_acting_recovery_backfill_shards().size() > 0);
+  for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
     if (shard == get_parent()->whoami_shard())
       continue;
     if (get_parent()->get_shard_missing(shard).is_missing(oid)) {
@@ -64,7 +64,7 @@ void PGBackend::send_recovery_deletes(int prio,
     const auto& objects = p.second;
     ConnectionRef con = get_parent()->get_con_osd_cluster(
       shard.osd,
-      get_osdmap()->get_epoch());
+      get_osdmap_epoch());
     if (!con)
       continue;
     auto it = objects.begin();
@@ -75,7 +75,7 @@ void PGBackend::send_recovery_deletes(int prio,
       MOSDPGRecoveryDelete *msg =
        new MOSDPGRecoveryDelete(get_parent()->whoami_shard(),
                                 target_pg,
-                                get_osdmap()->get_epoch(),
+                                get_osdmap_epoch(),
                                 min_epoch);
       msg->set_priority(prio);
 
@@ -116,8 +116,8 @@ bool PGBackend::handle_message(OpRequestRef op)
 
 void PGBackend::handle_recovery_delete(OpRequestRef op)
 {
-  const MOSDPGRecoveryDelete *m = static_cast<const MOSDPGRecoveryDelete *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE);
+  auto m = op->get_req<MOSDPGRecoveryDelete>();
+  ceph_assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE);
   dout(20) << __func__ << " " << op << dendl;
 
   op->mark_started();
@@ -136,7 +136,7 @@ void PGBackend::handle_recovery_delete(OpRequestRef op)
   reply->objects = m->objects;
   ConnectionRef conn = m->get_connection();
 
-  gather.set_finisher(new FunctionContext(
+  gather.set_finisher(new LambdaContext(
     [=](int r) {
       if (r != -EAGAIN) {
        get_parent()->send_message_osd_cluster(reply, conn.get());
@@ -149,8 +149,8 @@ void PGBackend::handle_recovery_delete(OpRequestRef op)
 
 void PGBackend::handle_recovery_delete_reply(OpRequestRef op)
 {
-  const MOSDPGRecoveryDeleteReply *m = static_cast<const MOSDPGRecoveryDeleteReply *>(op->get_req());
-  assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE_REPLY);
+  auto m = op->get_req<MOSDPGRecoveryDeleteReply>();
+  ceph_assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE_REPLY);
   dout(20) << __func__ << " " << op << dendl;
 
   for (const auto &p : m->objects) {
@@ -159,7 +159,7 @@ void PGBackend::handle_recovery_delete_reply(OpRequestRef op)
     recovery_info.version = p.second;
     get_parent()->on_peer_recover(m->from, oid, recovery_info);
     bool peers_recovered = true;
-    for (const auto& shard : get_parent()->get_actingbackfill_shards()) {
+    for (const auto& shard : get_parent()->get_acting_recovery_backfill_shards()) {
       if (shard == get_parent()->whoami_shard())
        continue;
       if (get_parent()->get_shard_missing(shard).is_missing(oid)) {
@@ -197,7 +197,7 @@ void PGBackend::rollback(
       temp.append(t);
       temp.swap(t);
     }
-    void setattrs(map<string, boost::optional<bufferlist> > &attrs) override {
+    void setattrs(map<string, std::optional<bufferlist> > &attrs) override {
       ObjectStore::Transaction temp;
       pg->rollback_setattrs(hoid, attrs, &temp);
       temp.append(t);
@@ -237,7 +237,7 @@ void PGBackend::rollback(
     }
   };
 
-  assert(entry.mod_desc.can_rollback());
+  ceph_assert(entry.mod_desc.can_rollback());
   RollbackVisitor vis(entry.soid, this);
   entry.mod_desc.visit(&vis);
   t->append(vis.t);
@@ -305,7 +305,7 @@ void PGBackend::try_stash(
 void PGBackend::remove(
   const hobject_t &hoid,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->remove(
     coll,
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
@@ -335,7 +335,7 @@ int PGBackend::objects_list_partial(
   vector<hobject_t> *ls,
   hobject_t *next)
 {
-  assert(ls);
+  ceph_assert(ls);
   // Starts with the smallest generation to make sure the result list
   // has the marker object (it might have multiple generations
   // though, which would be filtered).
@@ -350,13 +350,24 @@ int PGBackend::objects_list_partial(
 
   while (!_next.is_max() && ls->size() < (unsigned)min) {
     vector<ghobject_t> objects;
-    r = store->collection_list(
-      ch,
-      _next,
-      ghobject_t::get_max(),
-      max - ls->size(),
-      &objects,
-      &_next);
+    if (HAVE_FEATURE(parent->min_upacting_features(),
+                     OSD_FIXED_COLLECTION_LIST)) {
+      r = store->collection_list(
+        ch,
+        _next,
+        ghobject_t::get_max(),
+        max - ls->size(),
+        &objects,
+        &_next);
+    } else {
+      r = store->collection_list_legacy(
+        ch,
+        _next,
+        ghobject_t::get_max(),
+        max - ls->size(),
+        &objects,
+        &_next);
+    }
     if (r != 0) {
       derr << __func__ << " list collection " << ch << " got: " << cpp_strerror(r) << dendl;
       break;
@@ -380,19 +391,30 @@ int PGBackend::objects_list_partial(
 int PGBackend::objects_list_range(
   const hobject_t &start,
   const hobject_t &end,
-  snapid_t seq,
   vector<hobject_t> *ls,
   vector<ghobject_t> *gen_obs)
 {
-  assert(ls);
+  ceph_assert(ls);
   vector<ghobject_t> objects;
-  int r = store->collection_list(
-    ch,
-    ghobject_t(start, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-    ghobject_t(end, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-    INT_MAX,
-    &objects,
-    NULL);
+  int r;
+  if (HAVE_FEATURE(parent->min_upacting_features(),
+                   OSD_FIXED_COLLECTION_LIST)) {
+    r = store->collection_list(
+      ch,
+      ghobject_t(start, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      ghobject_t(end, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      INT_MAX,
+      &objects,
+      NULL);
+  } else {
+    r = store->collection_list_legacy(
+      ch,
+      ghobject_t(start, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      ghobject_t(end, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+      INT_MAX,
+      &objects,
+      NULL);
+  }
   ls->reserve(objects.size());
   for (vector<ghobject_t>::iterator i = objects.begin();
        i != objects.end();
@@ -439,15 +461,15 @@ int PGBackend::objects_get_attrs(
 
 void PGBackend::rollback_setattrs(
   const hobject_t &hoid,
-  map<string, boost::optional<bufferlist> > &old_attrs,
+  map<string, std::optional<bufferlist> > &old_attrs,
   ObjectStore::Transaction *t) {
   map<string, bufferlist> to_set;
-  assert(!hoid.is_temp());
-  for (map<string, boost::optional<bufferlist> >::iterator i = old_attrs.begin();
+  ceph_assert(!hoid.is_temp());
+  for (map<string, std::optional<bufferlist> >::iterator i = old_attrs.begin();
        i != old_attrs.end();
        ++i) {
     if (i->second) {
-      to_set[i->first] = i->second.get();
+      to_set[i->first] = *(i->second);
     } else {
       t->rmattr(
        coll,
@@ -465,7 +487,7 @@ void PGBackend::rollback_append(
   const hobject_t &hoid,
   uint64_t old_size,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->truncate(
     coll,
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
@@ -476,7 +498,7 @@ void PGBackend::rollback_stash(
   const hobject_t &hoid,
   version_t old_version,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->remove(
     coll,
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
@@ -491,7 +513,7 @@ void PGBackend::rollback_try_stash(
   const hobject_t &hoid,
   version_t old_version,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->remove(
     coll,
     ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
@@ -525,36 +547,35 @@ void PGBackend::trim_rollback_object(
   const hobject_t &hoid,
   version_t old_version,
   ObjectStore::Transaction *t) {
-  assert(!hoid.is_temp());
+  ceph_assert(!hoid.is_temp());
   t->remove(
     coll, ghobject_t(hoid, old_version, get_parent()->whoami_shard().shard));
 }
 
 PGBackend *PGBackend::build_pg_backend(
   const pg_pool_t &pool,
-  const OSDMapRef curmap,
+  const map<string,string>& profile,
   Listener *l,
   coll_t coll,
   ObjectStore::CollectionHandle &ch,
   ObjectStore *store,
   CephContext *cct)
 {
+  ErasureCodeProfile ec_profile = profile;
   switch (pool.type) {
   case pg_pool_t::TYPE_REPLICATED: {
     return new ReplicatedBackend(l, coll, ch, store, cct);
   }
   case pg_pool_t::TYPE_ERASURE: {
     ErasureCodeInterfaceRef ec_impl;
-    ErasureCodeProfile profile = curmap->get_erasure_code_profile(pool.erasure_code_profile);
-    assert(profile.count("plugin"));
     stringstream ss;
     ceph::ErasureCodePluginRegistry::instance().factory(
       profile.find("plugin")->second,
-      cct->_conf->get_val<std::string>("erasure_code_dir"),
-      profile,
+      cct->_conf.get_val<std::string>("erasure_code_dir"),
+      ec_profile,
       &ec_impl,
       &ss);
-    assert(ec_impl);
+    ceph_assert(ec_impl);
     return new ECBackend(
       l,
       coll,
@@ -570,58 +591,53 @@ PGBackend *PGBackend::build_pg_backend(
   }
 }
 
-/*
- * pg lock may or may not be held
- */
-void PGBackend::be_scan_list(
-  ScrubMap &map, const vector<hobject_t> &ls, bool deep, uint32_t seed,
-  ThreadPool::TPHandle &handle)
+int PGBackend::be_scan_list(
+  ScrubMap &map,
+  ScrubMapBuilder &pos)
 {
-  dout(10) << __func__ << " scanning " << ls.size() << " objects"
-           << (deep ? " deeply" : "") << dendl;
-  int i = 0;
-  for (vector<hobject_t>::const_iterator p = ls.begin();
-       p != ls.end();
-       ++p, i++) {
-    handle.reset_tp_timeout();
-    hobject_t poid = *p;
-
-    struct stat st;
-    int r = store->stat(
+  dout(10) << __func__ << " " << pos << dendl;
+  ceph_assert(!pos.done());
+  ceph_assert(pos.pos < pos.ls.size());
+  hobject_t& poid = pos.ls[pos.pos];
+
+  struct stat st;
+  int r = store->stat(
+    ch,
+    ghobject_t(
+      poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+    &st,
+    true);
+  if (r == 0) {
+    ScrubMap::object &o = map.objects[poid];
+    o.size = st.st_size;
+    ceph_assert(!o.negative);
+    store->getattrs(
       ch,
       ghobject_t(
        poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-      &st,
-      true);
-    if (r == 0) {
-      ScrubMap::object &o = map.objects[poid];
-      o.size = st.st_size;
-      assert(!o.negative);
-      store->getattrs(
-       ch,
-       ghobject_t(
-         poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
-       o.attrs);
-
-      // calculate the CRC32 on deep scrubs
-      if (deep) {
-       be_deep_scrub(*p, seed, o, handle);
-      }
+      o.attrs);
 
-      dout(25) << __func__ << "  " << poid << dendl;
-    } else if (r == -ENOENT) {
-      dout(25) << __func__ << "  " << poid << " got " << r
-              << ", skipping" << dendl;
-    } else if (r == -EIO) {
-      dout(25) << __func__ << "  " << poid << " got " << r
-              << ", stat_error" << dendl;
-      ScrubMap::object &o = map.objects[poid];
-      o.stat_error = true;
-    } else {
-      derr << __func__ << " got: " << cpp_strerror(r) << dendl;
-      ceph_abort();
+    if (pos.deep) {
+      r = be_deep_scrub(poid, map, pos, o);
     }
+    dout(25) << __func__ << "  " << poid << dendl;
+  } else if (r == -ENOENT) {
+    dout(25) << __func__ << "  " << poid << " got " << r
+            << ", skipping" << dendl;
+  } else if (r == -EIO) {
+    dout(25) << __func__ << "  " << poid << " got " << r
+            << ", stat_error" << dendl;
+    ScrubMap::object &o = map.objects[poid];
+    o.stat_error = true;
+  } else {
+    derr << __func__ << " got: " << cpp_strerror(r) << dendl;
+    ceph_abort();
   }
+  if (r == -EINPROGRESS) {
+    return -EINPROGRESS;
+  }
+  pos.next_object();
+  return 0;
 }
 
 bool PGBackend::be_compare_scrub_objects(
@@ -631,18 +647,10 @@ bool PGBackend::be_compare_scrub_objects(
   const ScrubMap::object &candidate,
   shard_info_wrapper &shard_result,
   inconsistent_obj_wrapper &obj_result,
-  ostream &errorstream)
+  ostream &errorstream,
+  bool has_snapset)
 {
   enum { CLEAN, FOUND_ERROR } error = CLEAN;
-  if (candidate.stat_error) {
-    assert(shard_result.has_stat_error());
-    error = FOUND_ERROR;
-    errorstream << "candidate had a stat error";
-  }
-  if (candidate.read_error || candidate.ec_hash_mismatch || candidate.ec_size_mismatch) {
-    error = FOUND_ERROR;
-    errorstream << "candidate had a read error";
-  }
   if (auth.digest_present && candidate.digest_present) {
     if (auth.digest != candidate.digest) {
       if (error != CLEAN)
@@ -674,7 +682,7 @@ bool PGBackend::be_compare_scrub_objects(
         errorstream << "data_digest 0x" << std::hex << candidate.digest
                    << " != data_digest 0x" << auth_oi.data_digest << std::dec
                    << " from auth oi " << auth_oi;
-        shard_result.set_data_digest_mismatch_oi();
+        shard_result.set_data_digest_mismatch_info();
       }
     }
     if (auth_oi.is_omap_digest() && candidate.omap_digest_present) {
@@ -685,12 +693,73 @@ bool PGBackend::be_compare_scrub_objects(
         errorstream << "omap_digest 0x" << std::hex << candidate.omap_digest
                    << " != omap_digest 0x" << auth_oi.omap_digest << std::dec
                    << " from auth oi " << auth_oi;
-        shard_result.set_omap_digest_mismatch_oi();
+        shard_result.set_omap_digest_mismatch_info();
       }
     }
   }
   if (candidate.stat_error)
     return error == FOUND_ERROR;
+  if (!shard_result.has_info_missing()
+      && !shard_result.has_info_corrupted()) {
+    bufferlist can_bl, auth_bl;
+    auto can_attr = candidate.attrs.find(OI_ATTR);
+    auto auth_attr = auth.attrs.find(OI_ATTR);
+
+    ceph_assert(auth_attr != auth.attrs.end());
+    ceph_assert(can_attr != candidate.attrs.end());
+
+    can_bl.push_back(can_attr->second);
+    auth_bl.push_back(auth_attr->second);
+    if (!can_bl.contents_equal(auth_bl)) {
+      if (error != CLEAN)
+        errorstream << ", ";
+      error = FOUND_ERROR;
+      obj_result.set_object_info_inconsistency();
+      errorstream << "object info inconsistent ";
+    }
+  }
+  if (has_snapset) {
+    if (!shard_result.has_snapset_missing()
+        && !shard_result.has_snapset_corrupted()) {
+      bufferlist can_bl, auth_bl;
+      auto can_attr = candidate.attrs.find(SS_ATTR);
+      auto auth_attr = auth.attrs.find(SS_ATTR);
+
+      ceph_assert(auth_attr != auth.attrs.end());
+      ceph_assert(can_attr != candidate.attrs.end());
+
+      can_bl.push_back(can_attr->second);
+      auth_bl.push_back(auth_attr->second);
+      if (!can_bl.contents_equal(auth_bl)) {
+         if (error != CLEAN)
+           errorstream << ", ";
+         error = FOUND_ERROR;
+         obj_result.set_snapset_inconsistency();
+         errorstream << "snapset inconsistent ";
+      }
+    }
+  }
+  if (parent->get_pool().is_erasure()) {
+    if (!shard_result.has_hinfo_missing()
+        && !shard_result.has_hinfo_corrupted()) {
+      bufferlist can_bl, auth_bl;
+      auto can_hi = candidate.attrs.find(ECUtil::get_hinfo_key());
+      auto auth_hi = auth.attrs.find(ECUtil::get_hinfo_key());
+
+      ceph_assert(auth_hi != auth.attrs.end());
+      ceph_assert(can_hi != candidate.attrs.end());
+
+      can_bl.push_back(can_hi->second);
+      auth_bl.push_back(auth_hi->second);
+      if (!can_bl.contents_equal(auth_bl)) {
+        if (error != CLEAN)
+         errorstream << ", ";
+       error = FOUND_ERROR;
+       obj_result.set_hinfo_inconsistency();
+       errorstream << "hinfo inconsistent ";
+      }
+    }
+  }
   uint64_t oi_size = be_get_ondisk_size(auth_oi.size);
   if (oi_size != candidate.size) {
     if (error != CLEAN)
@@ -699,7 +768,7 @@ bool PGBackend::be_compare_scrub_objects(
     errorstream << "size " << candidate.size
                << " != size " << oi_size
                << " from auth oi " << auth_oi;
-    shard_result.set_size_mismatch_oi();
+    shard_result.set_size_mismatch_info();
   }
   if (auth.size != candidate.size) {
     if (error != CLEAN)
@@ -710,11 +779,23 @@ bool PGBackend::be_compare_scrub_objects(
                << " from shard " << auth_shard;
     obj_result.set_size_mismatch();
   }
+  // If the replica is too large and we didn't already count it for this object
+  //
+  if (candidate.size > cct->_conf->osd_max_object_size
+      && !obj_result.has_size_too_large()) {
+    if (error != CLEAN)
+      errorstream << ", ";
+    error = FOUND_ERROR;
+    errorstream << "size " << candidate.size
+               << " > " << cct->_conf->osd_max_object_size
+               << " is too large";
+    obj_result.set_size_too_large();
+  }
   for (map<string,bufferptr>::const_iterator i = auth.attrs.begin();
        i != auth.attrs.end();
        ++i) {
     // We check system keys separately
-    if (i->first == OI_ATTR || i->first == SS_ATTR)
+    if (i->first == OI_ATTR || i->first[0] != '_')
       continue;
     if (!candidate.attrs.count(i->first)) {
       if (error != CLEAN)
@@ -734,7 +815,7 @@ bool PGBackend::be_compare_scrub_objects(
        i != candidate.attrs.end();
        ++i) {
     // We check system keys separately
-    if (i->first == OI_ATTR || i->first == SS_ATTR)
+    if (i->first == OI_ATTR || i->first[0] != '_')
       continue;
     if (!auth.attrs.count(i->first)) {
       if (error != CLEAN)
@@ -763,10 +844,11 @@ map<pg_shard_t, ScrubMap *>::const_iterator
   const map<pg_shard_t,ScrubMap*> &maps,
   object_info_t *auth_oi,
   map<pg_shard_t, shard_info_wrapper> &shard_map,
-  inconsistent_obj_wrapper &object_error)
+  bool &digest_match,
+  spg_t pgid,
+  ostream &errorstream)
 {
   eversion_t auth_version;
-  bufferlist first_bl;
 
   // Create list of shards with primary first so it will be auth copy all
   // other things being equal.
@@ -781,59 +863,105 @@ map<pg_shard_t, ScrubMap *>::const_iterator
   shards.push_front(get_parent()->whoami_shard());
 
   map<pg_shard_t, ScrubMap *>::const_iterator auth = maps.end();
+  digest_match = true;
   for (auto &l : shards) {
+    ostringstream shard_errorstream;
+    bool error = false;
     map<pg_shard_t, ScrubMap *>::const_iterator j = maps.find(l);
     map<hobject_t, ScrubMap::object>::iterator i =
       j->second->objects.find(obj);
     if (i == j->second->objects.end()) {
       continue;
     }
-    string error_string;
     auto& shard_info = shard_map[j->first];
     if (j->first == get_parent()->whoami_shard())
       shard_info.primary = true;
     if (i->second.read_error) {
       shard_info.set_read_error();
-      error_string += " read_error";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had a read error";
     }
     if (i->second.ec_hash_mismatch) {
       shard_info.set_ec_hash_mismatch();
-      error_string += " ec_hash_mismatch";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had an ec hash mismatch";
     }
     if (i->second.ec_size_mismatch) {
       shard_info.set_ec_size_mismatch();
-      error_string += " ec_size_mismatch";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had an ec size mismatch";
     }
 
     object_info_t oi;
     bufferlist bl;
     map<string, bufferptr>::iterator k;
     SnapSet ss;
-    bufferlist ss_bl;
+    bufferlist ss_bl, hk_bl;
 
     if (i->second.stat_error) {
       shard_info.set_stat_error();
-      error_string += " stat_error";
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had a stat error";
       // With stat_error no further checking
       // We don't need to also see a missing_object_info_attr
       goto out;
     }
 
     // We won't pick an auth copy if the snapset is missing or won't decode.
-    if (obj.is_head() || obj.is_snapdir()) {
+    ceph_assert(!obj.is_snapdir());
+    if (obj.is_head()) {
       k = i->second.attrs.find(SS_ATTR);
       if (k == i->second.attrs.end()) {
-       shard_info.set_ss_attr_missing();
-       error_string += " ss_attr_missing";
+       shard_info.set_snapset_missing();
+        if (error)
+          shard_errorstream << ", ";
+        error = true;
+        shard_errorstream << "candidate had a missing snapset key";
       } else {
         ss_bl.push_back(k->second);
         try {
-         bufferlist::iterator bliter = ss_bl.begin();
-         ::decode(ss, bliter);
+         auto bliter = ss_bl.cbegin();
+         decode(ss, bliter);
         } catch (...) {
          // invalid snapset, probably corrupt
-         shard_info.set_ss_attr_corrupted();
-         error_string += " ss_attr_corrupted";
+         shard_info.set_snapset_corrupted();
+          if (error)
+            shard_errorstream << ", ";
+          error = true;
+          shard_errorstream << "candidate had a corrupt snapset";
+        }
+      }
+    }
+
+    if (parent->get_pool().is_erasure()) {
+      ECUtil::HashInfo hi;
+      k = i->second.attrs.find(ECUtil::get_hinfo_key());
+      if (k == i->second.attrs.end()) {
+       shard_info.set_hinfo_missing();
+        if (error)
+          shard_errorstream << ", ";
+        error = true;
+        shard_errorstream << "candidate had a missing hinfo key";
+      } else {
+       hk_bl.push_back(k->second);
+        try {
+         auto bliter = hk_bl.cbegin();
+         decode(hi, bliter);
+        } catch (...) {
+         // invalid hinfo, probably corrupt
+         shard_info.set_hinfo_corrupted();
+          if (error)
+            shard_errorstream << ", ";
+          error = true;
+          shard_errorstream << "candidate had a corrupt hinfo";
         }
       }
     }
@@ -841,35 +969,48 @@ map<pg_shard_t, ScrubMap *>::const_iterator
     k = i->second.attrs.find(OI_ATTR);
     if (k == i->second.attrs.end()) {
       // no object info on object, probably corrupt
-      shard_info.set_oi_attr_missing();
-      error_string += " oi_attr_missing";
+      shard_info.set_info_missing();
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had a missing info key";
       goto out;
     }
     bl.push_back(k->second);
     try {
-      bufferlist::iterator bliter = bl.begin();
-      ::decode(oi, bliter);
+      auto bliter = bl.cbegin();
+      decode(oi, bliter);
     } catch (...) {
       // invalid object info, probably corrupt
-      shard_info.set_oi_attr_corrupted();
-      error_string += " oi_attr_corrupted";
+      shard_info.set_info_corrupted();
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate had a corrupt info";
       goto out;
     }
 
     // This is automatically corrected in PG::_repair_oinfo_oid()
-    assert(oi.soid == obj);
+    ceph_assert(oi.soid == obj);
 
-    if (first_bl.length() == 0) {
-      first_bl.append(bl);
-    } else if (!object_error.has_object_info_inconsistency() && !bl.contents_equal(first_bl)) {
-      object_error.set_object_info_inconsistency();
-      error_string += " object_info_inconsistency";
+    if (i->second.size != be_get_ondisk_size(oi.size)) {
+      shard_info.set_obj_size_info_mismatch();
+      if (error)
+        shard_errorstream << ", ";
+      error = true;
+      shard_errorstream << "candidate size " << i->second.size << " info size "
+                       << oi.size << " mismatch";
     }
 
-    if (i->second.size != be_get_ondisk_size(oi.size)) {
-      dout(5) << __func__ << " size " << i->second.size << " oi size " << oi.size << dendl;
-      shard_info.set_obj_size_oi_mismatch();
-      error_string += " obj_size_oi_mismatch";
+    // digest_match will only be true if computed digests are the same
+    if (auth_version != eversion_t()
+        && auth->second->objects[obj].digest_present
+        && i->second.digest_present
+        && auth->second->objects[obj].digest != i->second.digest) {
+      digest_match = false;
+      dout(10) << __func__ << " digest_match = false, " << obj << " data_digest 0x" << std::hex << i->second.digest
+                   << " != data_digest 0x" << auth->second->objects[obj].digest << std::dec
+                   << dendl;
     }
 
     // Don't use this particular shard due to previous errors
@@ -885,13 +1026,9 @@ map<pg_shard_t, ScrubMap *>::const_iterator
     }
 
 out:
-    // Check error_string because some errors already generated messages
-    if (error_string != "") {
-      dout(10) << __func__ << ": error(s) osd " << j->first
-              << " for obj " << obj
-              << "," << error_string
-              << dendl;
-    }
+    if (error)
+        errorstream << pgid.pgid << " shard " << l << " soid " << obj
+                   << " : " << shard_errorstream.str() << "\n";
     // Keep scanning other shards
   }
   dout(10) << __func__ << ": selecting osd " << auth->first
@@ -903,29 +1040,21 @@ out:
 
 void PGBackend::be_compare_scrubmaps(
   const map<pg_shard_t,ScrubMap*> &maps,
+  const set<hobject_t> &master_set,
   bool repair,
   map<hobject_t, set<pg_shard_t>> &missing,
   map<hobject_t, set<pg_shard_t>> &inconsistent,
   map<hobject_t, list<pg_shard_t>> &authoritative,
-  map<hobject_t, pair<uint32_t,uint32_t>> &missing_digest,
+  map<hobject_t, pair<std::optional<uint32_t>,
+                      std::optional<uint32_t>>> &missing_digest,
   int &shallow_errors, int &deep_errors,
   Scrub::Store *store,
   const spg_t& pgid,
   const vector<int> &acting,
   ostream &errorstream)
 {
-  map<hobject_t,ScrubMap::object>::const_iterator i;
-  map<pg_shard_t, ScrubMap *>::const_iterator j;
-  set<hobject_t> master_set;
   utime_t now = ceph_clock_now();
 
-  // Construct master set
-  for (j = maps.begin(); j != maps.end(); ++j) {
-    for (i = j->second->objects.begin(); i != j->second->objects.end(); ++i) {
-      master_set.insert(i->first);
-    }
-  }
-
   // Check maps against master set and each other
   for (set<hobject_t>::const_iterator k = master_set.begin();
        k != master_set.end();
@@ -935,8 +1064,10 @@ void PGBackend::be_compare_scrubmaps(
 
     inconsistent_obj_wrapper object_error{*k};
 
+    bool digest_match;
     map<pg_shard_t, ScrubMap *>::const_iterator auth =
-      be_select_auth_object(*k, maps, &auth_oi, shard_map, object_error);
+      be_select_auth_object(*k, maps, &auth_oi, shard_map, digest_match,
+                           pgid, errorstream);
 
     list<pg_shard_t> auth_list;
     set<pg_shard_t> object_errors;
@@ -950,15 +1081,16 @@ void PGBackend::be_compare_scrubmaps(
        ++shallow_errors;
       store->add_object_error(k->pool, object_error);
       errorstream << pgid.pgid << " soid " << *k
-                 << ": failed to pick suitable object info\n";
+                 << " : failed to pick suitable object info\n";
       continue;
     }
     object_error.set_version(auth_oi.user_version);
     ScrubMap::object& auth_object = auth->second->objects[*k];
     set<pg_shard_t> cur_missing;
     set<pg_shard_t> cur_inconsistent;
+    bool fix_digest = false;
 
-    for (j = maps.begin(); j != maps.end(); ++j) {
+    for (auto j = maps.cbegin(); j != maps.cend(); ++j) {
       if (j == auth)
        shard_map[auth->first].selected_oi = true;
       if (j->second->objects.count(*k)) {
@@ -971,7 +1103,24 @@ void PGBackend::be_compare_scrubmaps(
                                   j->second->objects[*k],
                                   shard_map[j->first],
                                   object_error,
-                                  ss);
+                                  ss,
+                                  k->has_snapset());
+
+       dout(20) << __func__ << (repair ? " repair " : " ") << (parent->get_pool().is_replicated() ? "replicated " : "")
+        << (j == auth ? "auth" : "") << "shards " << shard_map.size() << (digest_match ? " digest_match " : " ")
+        << (shard_map[j->first].only_data_digest_mismatch_info() ? "'info mismatch info'" : "")
+        << dendl;
+       // If all replicas match, but they don't match object_info we can
+       // repair it by using missing_digest mechanism
+       if (repair && parent->get_pool().is_replicated() && j == auth && shard_map.size() > 1
+           && digest_match && shard_map[j->first].only_data_digest_mismatch_info()
+           && auth_object.digest_present) {
+         // Set in missing_digests
+         fix_digest = true;
+         // Clear the error
+         shard_map[j->first].clear_data_digest_mismatch_info();
+         errorstream << pgid << " soid " << *k << " : repairing object info data_digest" << "\n";
+       }
        // Some errors might have already been set in be_select_auth_object()
        if (shard_map[j->first].errors != 0) {
          cur_inconsistent.insert(j->first);
@@ -982,13 +1131,13 @@ void PGBackend::be_compare_scrubmaps(
          // Only true if be_compare_scrub_objects() found errors and put something
          // in ss.
          if (found)
-           errorstream << pgid << " shard " << j->first << ": soid " << *k
-                     << " " << ss.str() << "\n";
+           errorstream << pgid << " shard " << j->first << " soid " << *k
+                     << " " << ss.str() << "\n";
        } else if (found) {
          // Track possible shard to use as authoritative, if needed
          // There are errors, without identifying the shard
          object_errors.insert(j->first);
-         errorstream << pgid << " : soid " << *k << " " << ss.str() << "\n";
+         errorstream << pgid << " soid " << *k << " : " << ss.str() << "\n";
        } else {
          // XXX: The auth shard might get here that we don't know
          // that it has the "correct" data.
@@ -1000,8 +1149,7 @@ void PGBackend::be_compare_scrubmaps(
         shard_map[j->first].primary = (j->first == get_parent()->whoami_shard());
        // Can't have any other errors if there is no information available
        ++shallow_errors;
-       errorstream << pgid << " shard " << j->first << " missing " << *k
-                   << "\n";
+       errorstream << pgid << " shard " << j->first << " " << *k << " : missing\n";
       }
       object_error.add_shard(j->first, shard_map[j->first]);
     }
@@ -1009,7 +1157,7 @@ void PGBackend::be_compare_scrubmaps(
     if (auth_list.empty()) {
       if (object_errors.empty()) {
         errorstream << pgid.pgid << " soid " << *k
-                 << ": failed to pick suitable auth object\n";
+                 << " : failed to pick suitable auth object\n";
         goto out;
       }
       // Object errors exist and nothing in auth_list
@@ -1032,32 +1180,38 @@ void PGBackend::be_compare_scrubmaps(
     if (!cur_inconsistent.empty()) {
       inconsistent[*k] = cur_inconsistent;
     }
+
+    if (fix_digest) {
+      std::optional<uint32_t> data_digest, omap_digest;
+      ceph_assert(auth_object.digest_present);
+      data_digest = auth_object.digest;
+      if (auth_object.omap_digest_present) {
+        omap_digest = auth_object.omap_digest;
+      }
+      missing_digest[*k] = make_pair(data_digest, omap_digest);
+    }
     if (!cur_inconsistent.empty() || !cur_missing.empty()) {
       authoritative[*k] = auth_list;
-    } else if (parent->get_pool().is_replicated()) {
+    } else if (!fix_digest && parent->get_pool().is_replicated()) {
       enum {
        NO = 0,
        MAYBE = 1,
        FORCE = 2,
       } update = NO;
 
-      if (auth_object.digest_present && auth_object.omap_digest_present &&
-         (!auth_oi.is_data_digest() || !auth_oi.is_omap_digest())) {
-       dout(20) << __func__ << " missing digest on " << *k << dendl;
+      if (auth_object.digest_present && !auth_oi.is_data_digest()) {
+       dout(20) << __func__ << " missing data digest on " << *k << dendl;
        update = MAYBE;
       }
-      if (auth_object.digest_present && auth_object.omap_digest_present &&
-         cct->_conf->osd_debug_scrub_chance_rewrite_digest &&
-         (((unsigned)rand() % 100) >
-          cct->_conf->osd_debug_scrub_chance_rewrite_digest)) {
-       dout(20) << __func__ << " randomly updating digest on " << *k << dendl;
+      if (auth_object.omap_digest_present && !auth_oi.is_omap_digest()) {
+       dout(20) << __func__ << " missing omap digest on " << *k << dendl;
        update = MAYBE;
       }
 
       // recorded digest != actual digest?
       if (auth_oi.is_data_digest() && auth_object.digest_present &&
          auth_oi.data_digest != auth_object.digest) {
-        assert(shard_map[auth->first].has_data_digest_mismatch_oi());
+        ceph_assert(shard_map[auth->first].has_data_digest_mismatch_info());
        errorstream << pgid << " recorded data digest 0x"
                    << std::hex << auth_oi.data_digest << " != on disk 0x"
                    << auth_object.digest << std::dec << " on " << auth_oi.soid
@@ -1067,7 +1221,7 @@ void PGBackend::be_compare_scrubmaps(
       }
       if (auth_oi.is_omap_digest() && auth_object.omap_digest_present &&
          auth_oi.omap_digest != auth_object.omap_digest) {
-        assert(shard_map[auth->first].has_omap_digest_mismatch_oi());
+        ceph_assert(shard_map[auth->first].has_omap_digest_mismatch_info());
        errorstream << pgid << " recorded omap digest 0x"
                    << std::hex << auth_oi.omap_digest << " != on disk 0x"
                    << auth_object.omap_digest << std::dec
@@ -1080,9 +1234,16 @@ void PGBackend::be_compare_scrubmaps(
        utime_t age = now - auth_oi.local_mtime;
        if (update == FORCE ||
            age > cct->_conf->osd_deep_scrub_update_digest_min_age) {
-         dout(20) << __func__ << " will update digest on " << *k << dendl;
-         missing_digest[*k] = make_pair(auth_object.digest,
-                                        auth_object.omap_digest);
+          std::optional<uint32_t> data_digest, omap_digest;
+          if (auth_object.digest_present) {
+            data_digest = auth_object.digest;
+           dout(20) << __func__ << " will update data digest on " << *k << dendl;
+          }
+          if (auth_object.omap_digest_present) {
+            omap_digest = auth_object.omap_digest;
+           dout(20) << __func__ << " will update omap digest on " << *k << dendl;
+          }
+         missing_digest[*k] = make_pair(data_digest, omap_digest);
        } else {
          dout(20) << __func__ << " missing digest but age " << age
                   << " < " << cct->_conf->osd_deep_scrub_update_digest_min_age
@@ -1100,3 +1261,50 @@ out:
     }
   }
 }
+
+void PGBackend::be_omap_checks(const map<pg_shard_t,ScrubMap*> &maps, // scrub map pointer keyed by shard
+  const set<hobject_t> &master_set, // objects to visit
+  omap_stat_t& omap_stats, // in/out: omap_bytes, omap_keys and large_omap_objects are incremented
+  ostream &warnstream) const // receives one text line per large omap object reported
+{ // Adds the primary shard's per-object omap counts to omap_stats and reports large omap objects.
+  bool needs_omap_check = false;
+  for (const auto& map : maps) { // flags on any shard's map enable the pass, not only the primary's
+    if (map.second->has_large_omap_object_errors || map.second->has_omap_keys) {
+      needs_omap_check = true;
+      break;
+    }
+  }
+
+  if (!needs_omap_check) {
+    return; // Nothing to do
+  }
+
+  // Iterate through objects and update omap stats
+  for (const auto& k : master_set) {
+    for (const auto& map : maps) { // NOTE(review): scans every entry though only the primary's is used; a direct lookup looks equivalent - confirm
+      if (map.first != get_parent()->primary_shard()) {
+        // Only set omap stats for the primary
+        continue;
+      }
+      auto it = map.second->objects.find(k);
+      if (it == map.second->objects.end())
+        continue; // this scrub map has no entry for k
+      ScrubMap::object& obj = it->second;
+      omap_stats.omap_bytes += obj.object_omap_bytes;
+      omap_stats.omap_keys += obj.object_omap_keys;
+      if (obj.large_omap_object_found) {
+        pg_t pg;
+        auto osdmap = get_osdmap();
+        osdmap->map_to_pg(k.pool, k.oid.name, k.get_key(), k.nspace, &pg); // result is written into pg (passed by address)
+        pg_t mpg = osdmap->raw_pg_to_pg(pg); // presumably the folded/actual PG for the raw pg - verify in OSDMap
+        omap_stats.large_omap_objects++;
+        warnstream << "Large omap object found. Object: " << k
+                   << " PG: " << pg << " (" << mpg << ")"
+                   << " Key count: " << obj.large_omap_object_key_count
+                   << " Size (bytes): " << obj.large_omap_object_value_size
+                   << '\n';
+        break; // leaves the per-shard loop; the outer loop continues with the next object
+      }
+    }
+  }
+}