]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/osd/TestPGLog.cc
update sources to v12.1.3
[ceph.git] / ceph / src / test / osd / TestPGLog.cc
index 8e10c7532f3a2190149299b21e0d03d797723cc2..bf29f87c4e55fda6d174835da9f36b3ade15f24e 100644 (file)
 #include "osd/PGLog.h"
 #include "osd/OSDMap.h"
 #include "include/coredumpctl.h"
+#include "../objectstore/store_test_fixture.h"
 
-class PGLogTest : public ::testing::Test, protected PGLog {
-public:
-  PGLogTest() : PGLog(g_ceph_context) {}
-  void SetUp() override { }
-
-  void TearDown() override {
-    clear();
-  }
 
+struct PGLogTestBase {
   static hobject_t mk_obj(unsigned id) {
     hobject_t hoid;
     stringstream ss;
     ss << "obj_" << id;
     hoid.oid = ss.str();
     hoid.set_hash(id);
+    hoid.pool = 1;
     return hoid;
   }
   static eversion_t mk_evt(unsigned ep, unsigned v) {
     return eversion_t(ep, v);
   }
   static pg_log_entry_t mk_ple_mod(
-    const hobject_t &hoid, eversion_t v, eversion_t pv) {
+    const hobject_t &hoid, eversion_t v, eversion_t pv, osd_reqid_t reqid) {
     pg_log_entry_t e;
     e.mark_unrollbackable();
     e.op = pg_log_entry_t::MODIFY;
     e.soid = hoid;
     e.version = v;
     e.prior_version = pv;
+    e.reqid = reqid;
     return e;
   }
   static pg_log_entry_t mk_ple_dt(
-    const hobject_t &hoid, eversion_t v, eversion_t pv) {
+    const hobject_t &hoid, eversion_t v, eversion_t pv, osd_reqid_t reqid) {
     pg_log_entry_t e;
     e.mark_unrollbackable();
     e.op = pg_log_entry_t::DELETE;
     e.soid = hoid;
     e.version = v;
     e.prior_version = pv;
+    e.reqid = reqid;
     return e;
   }
-  static pg_log_entry_t mk_ple_mod_rb(
+  static pg_log_entry_t mk_ple_ldt(
     const hobject_t &hoid, eversion_t v, eversion_t pv) {
     pg_log_entry_t e;
+    e.mark_unrollbackable();
+    e.op = pg_log_entry_t::LOST_DELETE;
+    e.soid = hoid;
+    e.version = v;
+    e.prior_version = pv;
+    return e;
+  }
+  static pg_log_entry_t mk_ple_mod_rb(
+    const hobject_t &hoid, eversion_t v, eversion_t pv, osd_reqid_t reqid) {
+    pg_log_entry_t e;
     e.op = pg_log_entry_t::MODIFY;
     e.soid = hoid;
     e.version = v;
     e.prior_version = pv;
+    e.reqid = reqid;
     return e;
   }
   static pg_log_entry_t mk_ple_dt_rb(
-    const hobject_t &hoid, eversion_t v, eversion_t pv) {
+    const hobject_t &hoid, eversion_t v, eversion_t pv, osd_reqid_t reqid) {
     pg_log_entry_t e;
     e.op = pg_log_entry_t::DELETE;
     e.soid = hoid;
     e.version = v;
     e.prior_version = pv;
+    e.reqid = reqid;
     return e;
   }
+  static pg_log_entry_t mk_ple_err(
+    const hobject_t &hoid, eversion_t v, osd_reqid_t reqid) {
+    pg_log_entry_t e;
+    e.op = pg_log_entry_t::ERROR;
+    e.soid = hoid;
+    e.version = v;
+    e.prior_version = eversion_t(0, 0);
+    e.reqid = reqid;
+    return e;
+  }
+  static pg_log_entry_t mk_ple_mod(
+    const hobject_t &hoid, eversion_t v, eversion_t pv) {
+    return mk_ple_mod(hoid, v, pv, osd_reqid_t());
+  }
+  static pg_log_entry_t mk_ple_dt(
+    const hobject_t &hoid, eversion_t v, eversion_t pv) {
+    return mk_ple_dt(hoid, v, pv, osd_reqid_t());
+  }
+  static pg_log_entry_t mk_ple_mod_rb(
+    const hobject_t &hoid, eversion_t v, eversion_t pv) {
+    return mk_ple_mod_rb(hoid, v, pv, osd_reqid_t());
+  }
+  static pg_log_entry_t mk_ple_dt_rb(
+    const hobject_t &hoid, eversion_t v, eversion_t pv) {
+    return mk_ple_dt_rb(hoid, v, pv, osd_reqid_t());
+  }
+  static pg_log_entry_t mk_ple_err(
+    const hobject_t &hoid, eversion_t v) {
+    return mk_ple_err(hoid, v, osd_reqid_t());
+  }
+}; // PGLogTestBase
+
+
+class PGLogTest : virtual public ::testing::Test, protected PGLog, public PGLogTestBase  {
+public:
+  PGLogTest() : PGLog(g_ceph_context) {}
+  void SetUp() override {
+    missing.may_include_deletes = true;
+  }
+
+#include "common/ceph_context.h"
+#include "common/config.h"
+
+  void TearDown() override {
+    clear();
+  }
+
 
   struct TestCase {
     list<pg_log_entry_t> base;
@@ -95,6 +151,7 @@ public:
 
     set<hobject_t> toremove;
     list<pg_log_entry_t> torollback;
+    bool deletes_during_peering;
 
   private:
     IndexedLog fullauth;
@@ -102,7 +159,10 @@ public:
     pg_info_t authinfo;
     pg_info_t divinfo;
   public:
+    TestCase() : deletes_during_peering(false) {}
     void setup() {
+      init.may_include_deletes = !deletes_during_peering;
+      final.may_include_deletes = !deletes_during_peering;
       fullauth.log.insert(fullauth.log.end(), base.begin(), base.end());
       fullauth.log.insert(fullauth.log.end(), auth.begin(), auth.end());
       fulldiv.log.insert(fulldiv.log.end(), base.begin(), base.end());
@@ -151,12 +211,12 @@ public:
     const IndexedLog &get_fulldiv() const { return fulldiv; }
     const pg_info_t &get_authinfo() const { return authinfo; }
     const pg_info_t &get_divinfo() const { return divinfo; }
-  };
+  }; // struct TestCase
 
   struct LogHandler : public PGLog::LogEntryHandler {
     set<hobject_t> removed;
     list<pg_log_entry_t> rolledback;
-    
+
     void rollback(
       const pg_log_entry_t &entry) override {
       rolledback.push_back(entry);
@@ -235,7 +295,8 @@ public:
     ASSERT_EQ(info.last_update, oinfo.last_update);
     verify_missing(tcase, missing);
     verify_sideeffects(tcase, h);
-  };
+  }
+
   void test_proc_replica_log(const TestCase &tcase) {
     clear();
     log = tcase.get_fullauth();
@@ -248,7 +309,7 @@ public:
     pg_info_t oinfo = tcase.get_divinfo();
 
     proc_replica_log(
-       oinfo, olog, omissing, pg_shard_t(1, shard_id_t(0)));
+      oinfo, olog, omissing, pg_shard_t(1, shard_id_t(0)));
 
     assert(oinfo.last_update >= log.tail);
 
@@ -259,16 +320,22 @@ public:
     for (list<pg_log_entry_t>::const_iterator i = tcase.auth.begin();
         i != tcase.auth.end();
         ++i) {
-      if (i->version > oinfo.last_update)
-       omissing.add_next_event(*i);
+      if (i->version > oinfo.last_update) {
+       if (i->is_delete() && tcase.deletes_during_peering) {
+         omissing.rm(i->soid, i->version);
+       } else {
+         omissing.add_next_event(*i);
+       }
+      }
     }
     verify_missing(tcase, omissing);
-  }
+  } // test_proc_replica_log
+
   void run_test_case(const TestCase &tcase) {
     test_merge_log(tcase);
     test_proc_replica_log(tcase);
   }
-};
+}; // class PGLogTest
 
 struct TestHandler : public PGLog::LogEntryHandler {
   list<hobject_t> &removed;
@@ -676,7 +743,7 @@ TEST_F(PGLogTest, merge_old_entry) {
   // the old entry (from the log entry given in argument) is not a CLONE and
   // the old entry (from the log entry given in argument) is not a DELETE and
   // the old entry prior_version is lower than the tail of the log :
-  //   add the old object to the remove_snap list and 
+  //   add the old object to the remove_snap list and
   //   add the old object to divergent priors and
   //   add or update the prior_version of the object to missing and
   //   return false
@@ -769,7 +836,7 @@ TEST_F(PGLogTest, merge_old_entry) {
     oe.op = pg_log_entry_t::MODIFY;
     oe.prior_version = eversion_t();
 
-    missing.add(oe.soid, eversion_t(1,1), eversion_t());
+    missing.add(oe.soid, eversion_t(1,1), eversion_t(), false);
 
     missing.flush();
     EXPECT_FALSE(is_dirty());
@@ -858,6 +925,7 @@ TEST_F(PGLogTest, merge_log) {
     oinfo.stats.reported_epoch = 1;
     log.tail = olog.tail = eversion_t(1, 1);
     log.head = olog.head = eversion_t(2, 1);
+    missing.may_include_deletes = false;
 
     EXPECT_FALSE(missing.have_missing());
     EXPECT_EQ(0U, log.log.size());
@@ -933,6 +1001,7 @@ TEST_F(PGLogTest, merge_log) {
     list<hobject_t> remove_snap;
     bool dirty_info = false;
     bool dirty_big_info = false;
+    missing.may_include_deletes = false;
 
     {
       pg_log_entry_t e;
@@ -1027,6 +1096,7 @@ TEST_F(PGLogTest, merge_log) {
     bool dirty_big_info = false;
 
     hobject_t divergent_object;
+    missing.may_include_deletes = true;
 
     {
       pg_log_entry_t e;
@@ -1096,7 +1166,128 @@ TEST_F(PGLogTest, merge_log) {
     EXPECT_EQ(1U, log.objects.count(divergent_object));
     EXPECT_EQ(4U, log.log.size());
     /* DELETE entries from olog that are appended to the hed of the
-       log are also added to remove_snap.
+       log, and the divergent version of the object is removed (added
+       to remove_snap)
+     */
+    EXPECT_EQ(0x9U, remove_snap.front().get_hash());
+    EXPECT_EQ(log.head, info.last_update);
+    EXPECT_TRUE(info.purged_snaps.contains(purged_snap));
+    EXPECT_TRUE(is_dirty());
+    EXPECT_TRUE(dirty_info);
+    EXPECT_TRUE(dirty_big_info);
+  }
+
+  /*        +--------------------------+
+            |  log              olog   |
+            +--------+-------+---------+
+            |        |object |         |
+            |version | hash  | version |
+            |        |       |         |
+       tail > (1,1)  |  x5   |  (1,1)  < tail
+            |        |       |         |
+            |        |       |         |
+            | (1,2)  |  x3   |  (1,2)  < lower_bound
+            |        |       |         |
+            |        |       |         |
+       head > (1,3)  |  x9   |         |
+            | DELETE |       |         |
+            |        |       |         |
+            |        |  x9   |  (2,3)  |
+            |        |       |  MODIFY |
+            |        |       |         |
+            |        |  x7   |  (2,4)  < head
+            |        |       |  DELETE |
+            +--------+-------+---------+
+
+      The log entry (1,3) deletes the object x9 but the olog entry (2,3) modifies
+      it and is authoritative : the log entry (1,3) is divergent.
+
+  */
+  {
+    clear();
+
+    pg_log_t olog;
+    pg_info_t oinfo;
+    pg_shard_t fromosd;
+    pg_info_t info;
+    list<hobject_t> remove_snap;
+    bool dirty_info = false;
+    bool dirty_big_info = false;
+
+    hobject_t divergent_object;
+
+    {
+      pg_log_entry_t e;
+      e.mark_unrollbackable();
+
+      e.version = eversion_t(1, 1);
+      e.soid.set_hash(0x5);
+      log.tail = e.version;
+      log.log.push_back(e);
+      e.version = eversion_t(1, 2);
+      e.soid.set_hash(0x3);
+      log.log.push_back(e);
+      e.version = eversion_t(1,3);
+      e.soid.set_hash(0x9);
+      divergent_object = e.soid;
+      e.op = pg_log_entry_t::DELETE;
+      log.log.push_back(e);
+      log.head = e.version;
+      log.index();
+
+      info.last_update = log.head;
+
+      e.version = eversion_t(1, 1);
+      e.soid.set_hash(0x5);
+      olog.tail = e.version;
+      olog.log.push_back(e);
+      e.version = eversion_t(1, 2);
+      e.soid.set_hash(0x3);
+      olog.log.push_back(e);
+      e.version = eversion_t(2, 3);
+      e.soid.set_hash(0x9);
+      e.op = pg_log_entry_t::MODIFY;
+      olog.log.push_back(e);
+      e.version = eversion_t(2, 4);
+      e.soid.set_hash(0x7);
+      e.op = pg_log_entry_t::DELETE;
+      olog.log.push_back(e);
+      olog.head = e.version;
+    }
+
+    snapid_t purged_snap(1);
+    {
+      oinfo.last_update = olog.head;
+      oinfo.purged_snaps.insert(purged_snap);
+    }
+
+    EXPECT_FALSE(missing.have_missing());
+    EXPECT_EQ(1U, log.objects.count(divergent_object));
+    EXPECT_EQ(3U, log.log.size());
+    EXPECT_TRUE(remove_snap.empty());
+    EXPECT_EQ(log.head, info.last_update);
+    EXPECT_TRUE(info.purged_snaps.empty());
+    EXPECT_FALSE(is_dirty());
+    EXPECT_FALSE(dirty_info);
+    EXPECT_FALSE(dirty_big_info);
+
+    TestHandler h(remove_snap);
+    missing.may_include_deletes = false;
+    merge_log(oinfo, olog, fromosd, info, &h,
+              dirty_info, dirty_big_info);
+
+    /* When the divergent entry is a DELETE and the authoritative
+       entry is a MODIFY, the object will be added to missing : it is
+       a verifiable side effect proving the entry was identified
+       to be divergent.
+    */
+    EXPECT_TRUE(missing.is_missing(divergent_object));
+    EXPECT_EQ(1U, log.objects.count(divergent_object));
+    EXPECT_EQ(4U, log.log.size());
+    /* DELETE entries from olog that are appended to the hed of the
+       log, and the divergent version of the object is removed (added
+       to remove_snap). When peering handles deletes, it is the earlier
+       version that is in the removed list.
      */
     EXPECT_EQ(0x7U, remove_snap.front().get_hash());
     EXPECT_EQ(log.head, info.last_update);
@@ -1183,6 +1374,7 @@ TEST_F(PGLogTest, merge_log) {
     EXPECT_FALSE(dirty_big_info);
 
     TestHandler h(remove_snap);
+    missing.may_include_deletes = false;
     merge_log(oinfo, olog, fromosd, info, &h,
               dirty_info, dirty_big_info);
 
@@ -1238,6 +1430,7 @@ TEST_F(PGLogTest, proc_replica_log) {
     EXPECT_EQ(last_update, oinfo.last_update);
     EXPECT_EQ(last_complete, oinfo.last_complete);
 
+    missing.may_include_deletes = false;
     proc_replica_log(oinfo, olog, omissing, from);
 
     EXPECT_FALSE(omissing.have_missing());
@@ -1264,11 +1457,11 @@ TEST_F(PGLogTest, proc_replica_log) {
             |        |       |  DELETE |
             |        |       |         |
             +--------+-------+---------+
-           
+
       The log entry (1,3) deletes the object x9 and the olog entry
       (2,3) also deletes it : do nothing. The olog tail is ignored
       because it is before the log tail.
-      
+
   */
   {
     clear();
@@ -1311,6 +1504,7 @@ TEST_F(PGLogTest, proc_replica_log) {
     EXPECT_EQ(olog.head, oinfo.last_update);
     EXPECT_EQ(olog.head, oinfo.last_complete);
 
+    missing.may_include_deletes = false;
     proc_replica_log(oinfo, olog, omissing, from);
 
     EXPECT_FALSE(omissing.have_missing());
@@ -1412,6 +1606,7 @@ TEST_F(PGLogTest, proc_replica_log) {
     EXPECT_EQ(olog.head, oinfo.last_update);
     EXPECT_EQ(olog.head, oinfo.last_complete);
 
+    missing.may_include_deletes = false;
     proc_replica_log(oinfo, olog, omissing, from);
 
     EXPECT_TRUE(omissing.have_missing());
@@ -1498,6 +1693,7 @@ TEST_F(PGLogTest, proc_replica_log) {
     EXPECT_EQ(olog.head, oinfo.last_update);
     EXPECT_EQ(olog.head, oinfo.last_complete);
 
+    missing.may_include_deletes = false;
     proc_replica_log(oinfo, olog, omissing, from);
 
     EXPECT_TRUE(omissing.have_missing());
@@ -1572,7 +1768,7 @@ TEST_F(PGLogTest, proc_replica_log) {
       e.prior_version = eversion_t(1, 1);
       e.soid = divergent_object;
       divergent_object = e.soid;
-      omissing.add(divergent_object, e.version, eversion_t());
+      omissing.add(divergent_object, e.version, eversion_t(), false);
       e.op = pg_log_entry_t::MODIFY;
       olog.log.push_back(e);
       olog.head = e.version;
@@ -1587,6 +1783,7 @@ TEST_F(PGLogTest, proc_replica_log) {
     EXPECT_EQ(olog.head, oinfo.last_update);
     EXPECT_EQ(olog.head, oinfo.last_complete);
 
+    missing.may_include_deletes = false;
     proc_replica_log(oinfo, olog, omissing, from);
 
     EXPECT_TRUE(omissing.have_missing());
@@ -1665,7 +1862,7 @@ TEST_F(PGLogTest, proc_replica_log) {
       e.prior_version = eversion_t(1, 1);
       e.soid.set_hash(0x9);
       divergent_object = e.soid;
-      omissing.add(divergent_object, e.version, eversion_t());
+      omissing.add(divergent_object, e.version, eversion_t(), false);
       e.op = pg_log_entry_t::MODIFY;
       olog.log.push_back(e);
       olog.head = e.version;
@@ -1680,6 +1877,7 @@ TEST_F(PGLogTest, proc_replica_log) {
     EXPECT_EQ(olog.head, oinfo.last_update);
     EXPECT_EQ(olog.head, oinfo.last_complete);
 
+    missing.may_include_deletes = false;
     proc_replica_log(oinfo, olog, omissing, from);
 
     EXPECT_TRUE(omissing.have_missing());
@@ -1696,7 +1894,7 @@ TEST_F(PGLogTest, merge_log_1) {
 
   t.div.push_back(mk_ple_mod(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100)));
 
-  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0));
+  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0), false);
 
   t.toremove.insert(mk_obj(1));
 
@@ -1725,7 +1923,7 @@ TEST_F(PGLogTest, merge_log_3) {
   t.div.push_back(mk_ple_mod(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100)));
   t.div.push_back(mk_ple_mod_rb(mk_obj(1), mk_evt(10, 102), mk_evt(10, 101)));
 
-  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0));
+  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0), false);
 
   t.toremove.insert(mk_obj(1));
 
@@ -1740,8 +1938,8 @@ TEST_F(PGLogTest, merge_log_4) {
   t.div.push_back(mk_ple_mod_rb(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100)));
   t.div.push_back(mk_ple_mod_rb(mk_obj(1), mk_evt(10, 102), mk_evt(10, 101)));
 
-  t.init.add(mk_obj(1), mk_evt(10, 102), mk_evt(0, 0));
-  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0));
+  t.init.add(mk_obj(1), mk_evt(10, 102), mk_evt(0, 0), false);
+  t.final.add(mk_obj(1), mk_evt(10, 100), mk_evt(0, 0), false);
 
   t.setup();
   run_test_case(t);
@@ -1756,7 +1954,7 @@ TEST_F(PGLogTest, merge_log_5) {
 
   t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
 
-  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(0, 0));
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(0, 0), false);
 
   t.toremove.insert(mk_obj(1));
 
@@ -1770,7 +1968,7 @@ TEST_F(PGLogTest, merge_log_6) {
 
   t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
 
-  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100));
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100), false);
 
   t.setup();
   run_test_case(t);
@@ -1782,8 +1980,8 @@ TEST_F(PGLogTest, merge_log_7) {
 
   t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
 
-  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80));
-  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(8, 80));
+  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80), false);
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(8, 80), false);
 
   t.setup();
   run_test_case(t);
@@ -1795,9 +1993,35 @@ TEST_F(PGLogTest, merge_log_8) {
 
   t.auth.push_back(mk_ple_dt(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
 
-  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80));
+  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80), false);
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(8, 80), true);
 
+  t.setup();
+  run_test_case(t);
+}
+
+TEST_F(PGLogTest, merge_log_9) {
+  TestCase t;
+  t.base.push_back(mk_ple_mod_rb(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80)));
+
+  t.auth.push_back(mk_ple_dt(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
+
+  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80), false);
   t.toremove.insert(mk_obj(1));
+  t.deletes_during_peering = true;
+
+  t.setup();
+  run_test_case(t);
+}
+
+TEST_F(PGLogTest, merge_log_10) {
+  TestCase t;
+  t.base.push_back(mk_ple_mod_rb(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80)));
+
+  t.auth.push_back(mk_ple_ldt(mk_obj(1), mk_evt(11, 101), mk_evt(10, 100)));
+
+  t.init.add(mk_obj(1), mk_evt(10, 100), mk_evt(8, 80), false);
+  t.final.add(mk_obj(1), mk_evt(11, 101), mk_evt(8, 80), true);
 
   t.setup();
   run_test_case(t);
@@ -1809,7 +2033,7 @@ TEST_F(PGLogTest, merge_log_prior_version_have) {
 
   t.div.push_back(mk_ple_mod(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100)));
 
-  t.init.add(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100));
+  t.init.add(mk_obj(1), mk_evt(10, 101), mk_evt(10, 100), false);
 
   t.setup();
   run_test_case(t);
@@ -1825,7 +2049,7 @@ TEST_F(PGLogTest, merge_log_split_missing_entries_at_head) {
   t.setup();
   t.set_div_bounds(mk_evt(9, 79), mk_evt(8, 69));
   t.set_auth_bounds(mk_evt(15, 160), mk_evt(9, 77));
-  t.final.add(mk_obj(1), mk_evt(15, 150), mk_evt(8, 70));
+  t.final.add(mk_obj(1), mk_evt(15, 150), mk_evt(8, 70), false);
   run_test_case(t);
 }
 
@@ -1838,7 +2062,7 @@ TEST_F(PGLogTest, olog_tail_gt_log_tail_split) {
   t.setup();
   t.set_div_bounds(mk_evt(15, 153), mk_evt(15, 151));
   t.set_auth_bounds(mk_evt(15, 156), mk_evt(10, 99));
-  t.final.add(mk_obj(1), mk_evt(15, 155), mk_evt(15, 150));
+  t.final.add(mk_obj(1), mk_evt(15, 155), mk_evt(15, 150), false);
   run_test_case(t);
 }
 
@@ -1852,7 +2076,7 @@ TEST_F(PGLogTest, olog_tail_gt_log_tail_split2) {
   t.setup();
   t.set_div_bounds(mk_evt(15, 153), mk_evt(15, 151));
   t.set_auth_bounds(mk_evt(16, 156), mk_evt(10, 99));
-  t.final.add(mk_obj(1), mk_evt(16, 155), mk_evt(0, 0));
+  t.final.add(mk_obj(1), mk_evt(16, 155), mk_evt(0, 0), false);
   t.toremove.insert(mk_obj(1));
   run_test_case(t);
 }
@@ -1863,7 +2087,7 @@ TEST_F(PGLogTest, filter_log_1) {
 
     int osd_id = 1;
     epoch_t epoch = 40;
-    int64_t pool_id = 0;
+    int64_t pool_id = 1;
     int bits = 2;
     int max_osd = 4;
     int pg_num = max_osd << bits;
@@ -1874,7 +2098,7 @@ TEST_F(PGLogTest, filter_log_1) {
     OSDMap *osdmap = new OSDMap;
     uuid_d test_uuid;
     test_uuid.generate_random();
-    osdmap->build_simple(g_ceph_context, epoch, test_uuid, max_osd, bits, bits);
+    osdmap->build_simple_with_pool(g_ceph_context, epoch, test_uuid, max_osd, bits, bits);
     osdmap->set_state(osd_id, CEPH_OSD_EXISTS);
 
     const string hit_set_namespace("internal");
@@ -2064,6 +2288,698 @@ TEST_F(PGLogTest, ErrorNotIndexedByObject) {
   EXPECT_EQ(del.reqid, entry->reqid);
 }
 
+TEST_F(PGLogTest, split_into_preserves_may_include_deletes) {
+  clear();
+
+  {
+    rebuilt_missing_with_deletes = false;
+    missing.may_include_deletes = true;
+    PGLog child_log(cct, prefix_provider);
+    pg_t child_pg;
+    split_into(child_pg, 6, &child_log);
+    ASSERT_TRUE(child_log.get_missing().may_include_deletes);
+    ASSERT_TRUE(child_log.get_rebuilt_missing_with_deletes());
+  }
+
+  {
+    rebuilt_missing_with_deletes = false;
+    missing.may_include_deletes = false;
+    PGLog child_log(cct, prefix_provider);
+    pg_t child_pg;
+    split_into(child_pg, 6, &child_log);
+    ASSERT_FALSE(child_log.get_missing().may_include_deletes);
+    ASSERT_FALSE(child_log.get_rebuilt_missing_with_deletes());
+  }
+}
+
+class PGLogTestRebuildMissing : public PGLogTest, public StoreTestFixture {
+public:
+  PGLogTestRebuildMissing() : PGLogTest(), StoreTestFixture("memstore") {}
+  void SetUp() override {
+    StoreTestFixture::SetUp();
+    ObjectStore::Sequencer osr(__func__);
+    ObjectStore::Transaction t;
+    test_coll = coll_t(spg_t(pg_t(1, 1)));
+    t.create_collection(test_coll, 0);
+    store->apply_transaction(&osr, std::move(t));
+    existing_oid = mk_obj(0);
+    nonexistent_oid = mk_obj(1);
+    ghobject_t existing_ghobj(existing_oid);
+    object_info_t existing_info;
+    existing_info.version = eversion_t(6, 2);
+    bufferlist enc_oi;
+    ::encode(existing_info, enc_oi, 0);
+    ObjectStore::Transaction t2;
+    t2.touch(test_coll, ghobject_t(existing_oid));
+    t2.setattr(test_coll, ghobject_t(existing_oid), OI_ATTR, enc_oi);
+    ASSERT_EQ(0u, store->apply_transaction(&osr, std::move(t2)));
+    info.last_backfill = hobject_t::get_max();
+    info.last_complete = eversion_t();
+  }
+
+  void TearDown() override {
+    clear();
+    missing.may_include_deletes = false;
+    StoreTestFixture::TearDown();
+  }
+
+  pg_info_t info;
+  coll_t test_coll;
+  hobject_t existing_oid, nonexistent_oid;
+
+  void run_rebuild_missing_test(const map<hobject_t, pg_missing_item> &expected_missing_items) {
+    rebuild_missing_set_with_deletes(store.get(), test_coll, info);
+    ASSERT_EQ(expected_missing_items, missing.get_items());
+  }
+};
+
+TEST_F(PGLogTestRebuildMissing, EmptyLog) {
+  missing.add(existing_oid, mk_evt(6, 2), mk_evt(6, 3), false);
+  missing.add(nonexistent_oid, mk_evt(7, 4), mk_evt(0, 0), false);
+  map<hobject_t, pg_missing_item> orig_missing = missing.get_items();
+  run_rebuild_missing_test(orig_missing);
+}
+
+TEST_F(PGLogTestRebuildMissing, SameVersionMod) {
+  missing.add(existing_oid, mk_evt(6, 2), mk_evt(6, 1), false);
+  log.add(mk_ple_mod(existing_oid, mk_evt(6, 2), mk_evt(6, 1)));
+  map<hobject_t, pg_missing_item> empty_missing;
+  run_rebuild_missing_test(empty_missing);
+}
+
+TEST_F(PGLogTestRebuildMissing, DelExisting) {
+  missing.add(existing_oid, mk_evt(6, 3), mk_evt(6, 2), false);
+  log.add(mk_ple_dt(existing_oid, mk_evt(7, 5), mk_evt(7, 4)));
+  map<hobject_t, pg_missing_item> expected;
+  expected[existing_oid] = pg_missing_item(mk_evt(7, 5), mk_evt(6, 2), true);
+  run_rebuild_missing_test(expected);
+}
+
+TEST_F(PGLogTestRebuildMissing, DelNonexistent) {
+  log.add(mk_ple_dt(nonexistent_oid, mk_evt(7, 5), mk_evt(7, 4)));
+  map<hobject_t, pg_missing_item> expected;
+  expected[nonexistent_oid] = pg_missing_item(mk_evt(7, 5), mk_evt(0, 0), true);
+  run_rebuild_missing_test(expected);
+}
+
+TEST_F(PGLogTestRebuildMissing, MissingNotInLog) {
+  missing.add(mk_obj(10), mk_evt(8, 12), mk_evt(8, 10), false);
+  log.add(mk_ple_dt(nonexistent_oid, mk_evt(7, 5), mk_evt(7, 4)));
+  map<hobject_t, pg_missing_item> expected;
+  expected[nonexistent_oid] = pg_missing_item(mk_evt(7, 5), mk_evt(0, 0), true);
+  expected[mk_obj(10)] = pg_missing_item(mk_evt(8, 12), mk_evt(8, 10), false);
+  run_rebuild_missing_test(expected);
+}
+
+
+class PGLogMergeDupsTest : public ::testing::Test, protected PGLog {
+
+public:
+
+  PGLogMergeDupsTest() : PGLog(g_ceph_context) { }
+
+  void SetUp() override { }
+
+  void TearDown() override {
+    clear();
+  }
+
+  static pg_log_dup_t create_dup_entry(uint a, uint b) {
+    // make each dup_entry unique by using different client id's
+    static uint client_id = 777;
+    return pg_log_dup_t(eversion_t(a, b),
+                       a,
+                       osd_reqid_t(entity_name_t::CLIENT(client_id++), 8, 1),
+                       0);
+  }
+
+  static std::vector<pg_log_dup_t> example_dups_1() {
+    std::vector<pg_log_dup_t> result = {
+      create_dup_entry(10, 11),
+      create_dup_entry(10, 12),
+      create_dup_entry(11, 1),
+      create_dup_entry(12, 3),
+      create_dup_entry(13, 99)
+    };
+    return result;
+  }
+
+  static std::vector<pg_log_dup_t> example_dups_2() {
+    std::vector<pg_log_dup_t> result = {
+      create_dup_entry(12, 3),
+      create_dup_entry(13, 99),
+      create_dup_entry(15, 11),
+      create_dup_entry(16, 14),
+      create_dup_entry(16, 32)
+    };
+    return result;
+  }
+
+  void add_dups(uint a, uint b) {
+    log.dups.push_back(create_dup_entry(a, b));
+  }
+
+  void add_dups(const std::vector<pg_log_dup_t>& l) {
+    for (auto& i : l) {
+      log.dups.push_back(i);
+    }
+  }
+
+  static void add_dups(IndexedLog& log, const std::vector<pg_log_dup_t>& dups) {
+    for (auto& i : dups) {
+      log.dups.push_back(i);
+    }
+  }
+
+  void check_order() {
+    eversion_t prev(0, 0);
+
+    for (auto& i : log.dups) {
+      EXPECT_LT(prev, i.version) << "verify versions monotonically increase";
+      prev = i.version;
+    }
+  }
+
+  void check_index() {
+    EXPECT_EQ(log.dups.size(), log.dup_index.size());
+    for (auto& i : log.dups) {
+      EXPECT_EQ(1u, log.dup_index.count(i.reqid));
+    }
+  }
+};
+
+TEST_F(PGLogMergeDupsTest, OtherEmpty) {
+  log.tail = eversion_t(14, 5);
+
+  IndexedLog olog;
+
+  add_dups(example_dups_1());
+  index();
+
+  bool changed = merge_log_dups(olog);
+
+  EXPECT_FALSE(changed);
+  EXPECT_EQ(5u, log.dups.size());
+
+  if (5 == log.dups.size()) {
+    EXPECT_EQ(10u, log.dups.front().version.epoch);
+    EXPECT_EQ(11u, log.dups.front().version.version);
+    EXPECT_EQ(13u, log.dups.back().version.epoch);
+    EXPECT_EQ(99u, log.dups.back().version.version);
+  }
+
+  check_order();
+  check_index();
+}
+
+TEST_F(PGLogMergeDupsTest, AmEmpty) {
+  log.tail = eversion_t(14, 5);
+  index();
+
+  IndexedLog olog;
+
+  add_dups(olog, example_dups_1());
+
+  bool changed = merge_log_dups(olog);
+
+  EXPECT_TRUE(changed);
+  EXPECT_EQ(5u, log.dups.size());
+
+  if (5 == log.dups.size()) {
+    EXPECT_EQ(10u, log.dups.front().version.epoch);
+    EXPECT_EQ(11u, log.dups.front().version.version);
+
+    EXPECT_EQ(13u, log.dups.back().version.epoch);
+    EXPECT_EQ(99u, log.dups.back().version.version);
+  }
+
+  check_order();
+  check_index();
+}
+
+TEST_F(PGLogMergeDupsTest, AmEmptyOverlap) {
+  log.tail = eversion_t(12, 3);
+  index();
+
+  IndexedLog olog;
+
+  add_dups(olog, example_dups_1());
+
+  bool changed = merge_log_dups(olog);
+
+  EXPECT_TRUE(changed);
+  EXPECT_EQ(3u, log.dups.size());
+
+  if (3 == log.dups.size()) {
+    EXPECT_EQ(10u, log.dups.front().version.epoch);
+    EXPECT_EQ(11u, log.dups.front().version.version);
+
+    EXPECT_EQ(11u, log.dups.back().version.epoch);
+    EXPECT_EQ(1u, log.dups.back().version.version);
+  }
+
+  check_order();
+  check_index();
+}
+
+TEST_F(PGLogMergeDupsTest, Same) {
+  log.tail = eversion_t(14, 1);
+
+  IndexedLog olog;
+
+  add_dups(example_dups_1());
+  index();
+  add_dups(olog, example_dups_1());
+
+  bool changed = merge_log_dups(olog);
+
+  EXPECT_FALSE(changed);
+  EXPECT_EQ(5u, log.dups.size());
+
+  if (5 == log.dups.size()) {
+    EXPECT_EQ(10u, log.dups.front().version.epoch);
+    EXPECT_EQ(11u, log.dups.front().version.version);
+
+    EXPECT_EQ(13u, log.dups.back().version.epoch);
+    EXPECT_EQ(99u, log.dups.back().version.version);
+  }
+
+  check_order();
+  check_index();
+}
+
+
+TEST_F(PGLogMergeDupsTest, Later) {
+  log.tail = eversion_t(16, 14);
+
+  IndexedLog olog;
+
+  add_dups(example_dups_1());
+  index();
+  add_dups(olog, example_dups_2());
+
+  bool changed = merge_log_dups(olog);
+
+  EXPECT_TRUE(changed);
+  EXPECT_EQ(6u, log.dups.size());
+
+  if (6 == log.dups.size()) {
+    EXPECT_EQ(10u, log.dups.front().version.epoch);
+    EXPECT_EQ(11u, log.dups.front().version.version);
+
+    EXPECT_EQ(15u, log.dups.back().version.epoch);
+    EXPECT_EQ(11u, log.dups.back().version.version);
+  }
+
+  check_order();
+  check_index();
+}
+
+
+TEST_F(PGLogMergeDupsTest, Earlier) {
+  log.tail = eversion_t(17, 2);
+
+  IndexedLog olog;
+
+  add_dups(example_dups_2());
+  index();
+  add_dups(olog, example_dups_1());
+
+  bool changed = merge_log_dups(olog);
+
+  EXPECT_TRUE(changed);
+  EXPECT_EQ(8u, log.dups.size());
+
+  if (6 == log.dups.size()) {
+    EXPECT_EQ(10u, log.dups.front().version.epoch);
+    EXPECT_EQ(11u, log.dups.front().version.version);
+
+    EXPECT_EQ(16u, log.dups.back().version.epoch);
+    EXPECT_EQ(32u, log.dups.back().version.version);
+  }
+
+  check_order();
+  check_index();
+}
+
+
+TEST_F(PGLogMergeDupsTest, Superset) {
+  log.tail = eversion_t(17, 2);
+
+  IndexedLog olog;
+
+  add_dups(example_dups_1());
+  index();
+
+  olog.dups.push_back(create_dup_entry(9, 5));
+  olog.dups.push_back(create_dup_entry(15, 11));
+
+  bool changed = merge_log_dups(olog);
+
+  EXPECT_TRUE(changed);
+  EXPECT_EQ(7u, log.dups.size());
+
+  if (7 == log.dups.size()) {
+    EXPECT_EQ(9u, log.dups.front().version.epoch);
+    EXPECT_EQ(5u, log.dups.front().version.version);
+
+    EXPECT_EQ(15u, log.dups.back().version.epoch);
+    EXPECT_EQ(11u, log.dups.back().version.version);
+  }
+
+  check_order();
+  check_index();
+}
+
+
+struct PGLogTrimTest :
+  public ::testing::Test,
+  public PGLogTestBase,
+  public PGLog::IndexedLog
+{
+  std::list<hobject_t*> test_hobjects;
+  CephContext *cct;
+
+  void SetUp() override {
+    cct = (new CephContext(CEPH_ENTITY_TYPE_OSD))->get();
+
+    hobject_t::generate_test_instances(test_hobjects);
+  }
+
+  void SetUp(unsigned min_entries, unsigned max_entries, unsigned dup_track) {
+    constexpr size_t size = 10;
+
+    char min_entries_s[size];
+    char max_entries_s[size];
+    char dup_track_s[size];
+
+    snprintf(min_entries_s, size, "%u", min_entries);
+    snprintf(max_entries_s, size, "%u", max_entries);
+    snprintf(dup_track_s, size, "%u", dup_track);
+
+    cct->_conf->set_val_or_die("osd_min_pg_log_entries", min_entries_s);
+    cct->_conf->set_val_or_die("osd_max_pg_log_entries", max_entries_s);
+    cct->_conf->set_val_or_die("osd_pg_log_dups_tracked", dup_track_s);
+}
+
+  void TearDown() override {
+    while (!test_hobjects.empty()) {
+      delete test_hobjects.front();
+      test_hobjects.pop_front();
+    }
+
+    cct->put();
+  }
+}; // struct PGLogTrimTest
+
+
+# if 0
+TEST_F(PGLogTest, Trim1) {
+  TestCase t;
+
+  t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(10, 100), mk_evt(8, 70)));
+  t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(15, 150), mk_evt(10, 100)));
+  t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(15, 155), mk_evt(15, 150)));
+  t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(20, 160), mk_evt(25, 152)));
+  t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(21, 165), mk_evt(26, 160)));
+  t.auth.push_back(mk_ple_mod(mk_obj(1), mk_evt(21, 165), mk_evt(31, 171)));
+
+  t.setup();
+}
+#endif
+
+
+TEST_F(PGLogTrimTest, TestMakingCephContext)
+{
+  SetUp(1, 2, 5);
+
+  EXPECT_EQ(1u, cct->_conf->osd_min_pg_log_entries);
+  EXPECT_EQ(2u, cct->_conf->osd_max_pg_log_entries);
+  EXPECT_EQ(5u, cct->_conf->osd_pg_log_dups_tracked);
+}
+
+
+TEST_F(PGLogTrimTest, TestPartialTrim)
+{
+  SetUp(1, 2, 20);
+  PGLog::IndexedLog log;
+  log.head = mk_evt(24, 0);
+  log.skip_can_rollback_to_to_head();
+  log.head = mk_evt(9, 0);
+
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(10, 100), mk_evt(8, 70)));
+  log.add(mk_ple_dt(mk_obj(2), mk_evt(15, 150), mk_evt(10, 100)));
+  log.add(mk_ple_mod_rb(mk_obj(3), mk_evt(15, 155), mk_evt(15, 150)));
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(19, 160), mk_evt(25, 152)));
+  log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
+  log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
+
+  std::set<eversion_t> trimmed;
+  std::set<std::string> trimmed_dups;
+  bool dirty_dups = false;
+
+  log.trim(cct, mk_evt(19, 157), &trimmed, &trimmed_dups, &dirty_dups);
+
+  EXPECT_EQ(true, dirty_dups);
+  EXPECT_EQ(3u, log.log.size());
+  EXPECT_EQ(3u, trimmed.size());
+  EXPECT_EQ(2u, log.dups.size());
+  EXPECT_EQ(0u, trimmed_dups.size());
+
+  SetUp(1, 2, 15);
+
+  std::set<eversion_t> trimmed2;
+  std::set<std::string> trimmed_dups2;
+  bool dirty_dups2 = false;
+  
+  log.trim(cct, mk_evt(20, 164), &trimmed2, &trimmed_dups2, &dirty_dups2);
+
+  EXPECT_EQ(true, dirty_dups2);
+  EXPECT_EQ(2u, log.log.size());
+  EXPECT_EQ(1u, trimmed2.size());
+  EXPECT_EQ(2u, log.dups.size());
+  EXPECT_EQ(1u, trimmed_dups2.size());
+}
+
+
+TEST_F(PGLogTrimTest, TestTrimNoTrimmed) {
+  SetUp(1, 2, 20);
+  PGLog::IndexedLog log;
+  log.head = mk_evt(20, 0);
+  log.skip_can_rollback_to_to_head();
+  log.head = mk_evt(9, 0);
+
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(10, 100), mk_evt(8, 70)));
+  log.add(mk_ple_dt(mk_obj(2), mk_evt(15, 150), mk_evt(10, 100)));
+  log.add(mk_ple_mod_rb(mk_obj(3), mk_evt(15, 155), mk_evt(15, 150)));
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(20, 160), mk_evt(25, 152)));
+  log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
+  log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
+
+  bool dirty_dups = false;
+
+  log.trim(cct, mk_evt(19, 157), nullptr, nullptr, &dirty_dups);
+
+  EXPECT_EQ(true, dirty_dups);
+  EXPECT_EQ(3u, log.log.size());
+  EXPECT_EQ(2u, log.dups.size());
+}
+
+
+TEST_F(PGLogTrimTest, TestTrimNoDups)
+{
+  SetUp(1, 2, 10);
+  PGLog::IndexedLog log;
+  log.head = mk_evt(20, 0);
+  log.skip_can_rollback_to_to_head();
+  log.head = mk_evt(9, 0);
+
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(10, 100), mk_evt(8, 70)));
+  log.add(mk_ple_dt(mk_obj(2), mk_evt(15, 150), mk_evt(10, 100)));
+  log.add(mk_ple_mod_rb(mk_obj(3), mk_evt(15, 155), mk_evt(15, 150)));
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(20, 160), mk_evt(25, 152)));
+  log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
+  log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
+
+  std::set<eversion_t> trimmed;
+  std::set<std::string> trimmed_dups;
+  bool dirty_dups = false;
+
+  log.trim(cct, mk_evt(19, 157), &trimmed, &trimmed_dups, &dirty_dups);
+
+  EXPECT_FALSE(dirty_dups);
+  EXPECT_EQ(3u, log.log.size());
+  EXPECT_EQ(3u, trimmed.size());
+  EXPECT_EQ(0u, log.dups.size());
+  EXPECT_EQ(0u, trimmed_dups.size());
+}
+
+TEST_F(PGLogTrimTest, TestNoTrim)
+{
+  SetUp(1, 2, 20);
+  PGLog::IndexedLog log;
+  log.head = mk_evt(24, 0);
+  log.skip_can_rollback_to_to_head();
+  log.head = mk_evt(9, 0);
+
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(10, 100), mk_evt(8, 70)));
+  log.add(mk_ple_dt(mk_obj(2), mk_evt(15, 150), mk_evt(10, 100)));
+  log.add(mk_ple_mod_rb(mk_obj(3), mk_evt(15, 155), mk_evt(15, 150)));
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(19, 160), mk_evt(25, 152)));
+  log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
+  log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
+
+  std::set<eversion_t> trimmed;
+  std::set<std::string> trimmed_dups;
+  bool dirty_dups = false;
+
+  log.trim(cct, mk_evt(9, 99), &trimmed, &trimmed_dups, &dirty_dups);
+
+  EXPECT_FALSE(dirty_dups);
+  EXPECT_EQ(6u, log.log.size());
+  EXPECT_EQ(0u, trimmed.size());
+  EXPECT_EQ(0u, log.dups.size());
+  EXPECT_EQ(0u, trimmed_dups.size());
+}
+
+TEST_F(PGLogTrimTest, TestTrimAll)
+{
+  SetUp(1, 2, 20);
+  PGLog::IndexedLog log;
+  log.head = mk_evt(24, 0);
+  log.skip_can_rollback_to_to_head();
+  log.head = mk_evt(9, 0);
+
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(10, 100), mk_evt(8, 70)));
+  log.add(mk_ple_dt(mk_obj(2), mk_evt(15, 150), mk_evt(10, 100)));
+  log.add(mk_ple_mod_rb(mk_obj(3), mk_evt(15, 155), mk_evt(15, 150)));
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(19, 160), mk_evt(25, 152)));
+  log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
+  log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
+
+  std::set<eversion_t> trimmed;
+  std::set<std::string> trimmed_dups;
+  bool dirty_dups = false;
+
+  log.trim(cct, mk_evt(22, 180), &trimmed, &trimmed_dups, &dirty_dups);
+
+  EXPECT_EQ(true, dirty_dups);
+  EXPECT_EQ(0u, log.log.size());
+  EXPECT_EQ(6u, trimmed.size());
+  EXPECT_EQ(5u, log.dups.size());
+  EXPECT_EQ(0u, trimmed_dups.size());
+}
+
+
+TEST_F(PGLogTrimTest, TestGetRequest) {
+  SetUp(1, 2, 20);
+  PGLog::IndexedLog log;
+  log.head = mk_evt(20, 0);
+  log.skip_can_rollback_to_to_head();
+  log.head = mk_evt(9, 0);
+
+  entity_name_t client = entity_name_t::CLIENT(777);
+
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(10, 100), mk_evt(8, 70),
+                    osd_reqid_t(client, 8, 1)));
+  log.add(mk_ple_dt(mk_obj(2), mk_evt(15, 150), mk_evt(10, 100),
+                   osd_reqid_t(client, 8, 2)));
+  log.add(mk_ple_mod_rb(mk_obj(3), mk_evt(15, 155), mk_evt(15, 150),
+                       osd_reqid_t(client, 8, 3)));
+  log.add(mk_ple_mod(mk_obj(1), mk_evt(20, 160), mk_evt(25, 152),
+                    osd_reqid_t(client, 8, 4)));
+  log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160),
+                    osd_reqid_t(client, 8, 5)));
+  log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166),
+                      osd_reqid_t(client, 8, 6)));
+
+  bool dirty_dups = false;
+
+  log.trim(cct, mk_evt(19, 157), nullptr, nullptr, &dirty_dups);
+
+  EXPECT_EQ(true, dirty_dups);
+  EXPECT_EQ(3u, log.log.size());
+  EXPECT_EQ(2u, log.dups.size());
+
+  eversion_t version;
+  version_t user_version;
+  int return_code;
+
+  osd_reqid_t log_reqid = osd_reqid_t(client, 8, 5);
+  osd_reqid_t dup_reqid = osd_reqid_t(client, 8, 3);
+  osd_reqid_t bad_reqid = osd_reqid_t(client, 8, 1);
+
+  bool result;
+
+  result = log.get_request(log_reqid, &version, &user_version, &return_code);
+  EXPECT_EQ(true, result);
+  EXPECT_EQ(mk_evt(21, 165), version);
+
+  result = log.get_request(dup_reqid, &version, &user_version, &return_code);
+  EXPECT_EQ(true, result);
+  EXPECT_EQ(mk_evt(15, 155), version);
+
+  result = log.get_request(bad_reqid, &version, &user_version, &return_code);
+  EXPECT_FALSE(result);
+}
+
+TEST_F(PGLogTest, _merge_object_divergent_entries) {
+  {
+    // Test for issue 20843
+    clear();
+    hobject_t hoid(object_t(/*name*/"notify.7"),
+                   /*key*/string(""),
+                   /*snap*/7,
+                   /*hash*/77,
+                   /*pool*/5,
+                   /*nspace*/string(""));
+    mempool::osd_pglog::list<pg_log_entry_t> orig_entries;
+    orig_entries.push_back(mk_ple_mod(hoid, eversion_t(8336, 957), eversion_t(8336, 952)));
+    orig_entries.push_back(mk_ple_err(hoid, eversion_t(8336, 958)));
+    orig_entries.push_back(mk_ple_err(hoid, eversion_t(8336, 959)));
+    orig_entries.push_back(mk_ple_mod(hoid, eversion_t(8336, 960), eversion_t(8336, 957)));
+    log.add(mk_ple_mod(hoid, eversion_t(8973, 1075), eversion_t(8971, 1070)));
+    missing.add(hoid,
+                /*need*/eversion_t(8971, 1070),
+                /*have*/eversion_t(8336, 952),
+                false);
+    pg_info_t oinfo;
+    LogHandler rollbacker;
+    _merge_object_divergent_entries(log, hoid,
+                                    orig_entries, oinfo,
+                                    log.get_can_rollback_to(),
+                                    missing, &rollbacker,
+                                    this);
+    // No core dump
+  }
+  {
+    // skip leading error entries
+    clear();
+    hobject_t hoid(object_t(/*name*/"notify.7"),
+                   /*key*/string(""),
+                   /*snap*/7,
+                   /*hash*/77,
+                   /*pool*/5,
+                   /*nspace*/string(""));
+    mempool::osd_pglog::list<pg_log_entry_t> orig_entries;
+    orig_entries.push_back(mk_ple_err(hoid, eversion_t(8336, 956)));
+    orig_entries.push_back(mk_ple_mod(hoid, eversion_t(8336, 957), eversion_t(8336, 952)));
+    log.add(mk_ple_mod(hoid, eversion_t(8973, 1075), eversion_t(8971, 1070)));
+    missing.add(hoid,
+                /*need*/eversion_t(8971, 1070),
+                /*have*/eversion_t(8336, 952),
+                false);
+    pg_info_t oinfo;
+    LogHandler rollbacker;
+    _merge_object_divergent_entries(log, hoid,
+                                    orig_entries, oinfo,
+                                    log.get_can_rollback_to(),
+                                    missing, &rollbacker,
+                                    this);
+    // No core dump
+  }
+}
+
 // Local Variables:
 // compile-command: "cd ../.. ; make unittest_pglog ; ./unittest_pglog --log-to-stderr=true  --debug-osd=20 # --gtest_filter=*.* "
 // End: