]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/compaction/compaction_iterator_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_iterator_test.cc
index 0885da5c5de0ce00a4ead79783be91160a4de8d6..81362d792159c65be0aa66e4d7c44318662ece51 100644 (file)
@@ -3,15 +3,17 @@
 //  COPYING file in the root directory) and Apache 2.0 License
 //  (found in the LICENSE.Apache file in the root directory).
 
+#include "db/compaction/compaction_iterator.h"
 
 #include <string>
 #include <vector>
 
-#include "db/compaction/compaction_iterator.h"
+#include "db/dbformat.h"
 #include "port/port.h"
 #include "test_util/testharness.h"
 #include "test_util/testutil.h"
 #include "util/string_util.h"
+#include "util/vector_iterator.h"
 #include "utilities/merge_operators.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -38,7 +40,7 @@ class NoMergingMergeOp : public MergeOperator {
 
 // Compaction filter that gets stuck when it sees a particular key,
 // then gets unstuck when told to.
-// Always returns Decition::kRemove.
+// Always returns Decision::kRemove.
 class StallingFilter : public CompactionFilter {
  public:
   Decision FilterV2(int /*level*/, const Slice& key, ValueType /*type*/,
@@ -86,7 +88,7 @@ class FilterAllKeysCompactionFilter : public CompactionFilter {
   const char* Name() const override { return "AllKeysCompactionFilter"; }
 };
 
-class LoggingForwardVectorIterator : public InternalIterator {
+class LoggingForwardVectorIterator : public VectorIterator {
  public:
   struct Action {
     enum class Type {
@@ -108,22 +110,19 @@ class LoggingForwardVectorIterator : public InternalIterator {
 
   LoggingForwardVectorIterator(const std::vector<std::string>& keys,
                                const std::vector<std::string>& values)
-      : keys_(keys), values_(values), current_(keys.size()) {
-    assert(keys_.size() == values_.size());
+      : VectorIterator(keys, values) {
+    current_ = keys_.size();
   }
 
-  bool Valid() const override { return current_ < keys_.size(); }
-
   void SeekToFirst() override {
     log.emplace_back(Action::Type::SEEK_TO_FIRST);
-    current_ = 0;
+    VectorIterator::SeekToFirst();
   }
   void SeekToLast() override { assert(false); }
 
   void Seek(const Slice& target) override {
     log.emplace_back(Action::Type::SEEK, target.ToString());
-    current_ = std::lower_bound(keys_.begin(), keys_.end(), target.ToString()) -
-               keys_.begin();
+    VectorIterator::Seek(target);
   }
 
   void SeekForPrev(const Slice& /*target*/) override { assert(false); }
@@ -131,27 +130,20 @@ class LoggingForwardVectorIterator : public InternalIterator {
   void Next() override {
     assert(Valid());
     log.emplace_back(Action::Type::NEXT);
-    current_++;
+    VectorIterator::Next();
   }
   void Prev() override { assert(false); }
 
   Slice key() const override {
     assert(Valid());
-    return Slice(keys_[current_]);
+    return VectorIterator::key();
   }
   Slice value() const override {
     assert(Valid());
-    return Slice(values_[current_]);
+    return VectorIterator::value();
   }
 
-  Status status() const override { return Status::OK(); }
-
   std::vector<Action> log;
-
- private:
-  std::vector<std::string> keys_;
-  std::vector<std::string> values_;
-  size_t current_;
 };
 
 class FakeCompaction : public CompactionIterator::CompactionProxy {
@@ -174,22 +166,45 @@ class FakeCompaction : public CompactionIterator::CompactionProxy {
 
   bool allow_ingest_behind() const override { return is_allow_ingest_behind; }
 
-  bool preserve_deletes() const override { return false; }
+  bool allow_mmap_reads() const override { return false; }
+
+  bool enable_blob_garbage_collection() const override { return false; }
+
+  double blob_garbage_collection_age_cutoff() const override { return 0.0; }
+
+  uint64_t blob_compaction_readahead_size() const override { return 0; }
+
+  const Version* input_version() const override { return nullptr; }
+
+  bool DoesInputReferenceBlobFiles() const override { return false; }
+
+  const Compaction* real_compaction() const override { return nullptr; }
+
+  bool SupportsPerKeyPlacement() const override {
+    return supports_per_key_placement;
+  }
+
+  bool WithinPenultimateLevelOutputRange(const Slice& key) const override {
+    return (!key.starts_with("unsafe_pb"));
+  }
 
   bool key_not_exists_beyond_output_level = false;
 
   bool is_bottommost_level = false;
 
   bool is_allow_ingest_behind = false;
+
+  bool supports_per_key_placement = false;
 };
 
-// A simplifed snapshot checker which assumes each snapshot has a global
+// A simplified snapshot checker which assumes each snapshot has a global
 // last visible sequence.
 class TestSnapshotChecker : public SnapshotChecker {
  public:
   explicit TestSnapshotChecker(
       SequenceNumber last_committed_sequence,
-      const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots = {{}})
+      const std::unordered_map<SequenceNumber, SequenceNumber>& snapshots =
+          {{}})
       : last_committed_sequence_(last_committed_sequence),
         snapshots_(snapshots) {}
 
@@ -234,7 +249,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
       bool key_not_exists_beyond_output_level = false,
       const std::string* full_history_ts_low = nullptr) {
     std::unique_ptr<InternalIterator> unfragmented_range_del_iter(
-        new test::VectorIterator(range_del_ks, range_del_vs));
+        new VectorIterator(range_del_ks, range_del_vs, &icmp_));
     auto tombstone_list = std::make_shared<FragmentedRangeTombstoneList>(
         std::move(unfragmented_range_del_iter), icmp_);
     std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
@@ -250,6 +265,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
       compaction_proxy_->is_allow_ingest_behind = AllowIngestBehind();
       compaction_proxy_->key_not_exists_beyond_output_level =
           key_not_exists_beyond_output_level;
+      compaction_proxy_->supports_per_key_placement = SupportsPerKeyPlacement();
       compaction.reset(compaction_proxy_);
     }
     bool use_snapshot_checker = UseSnapshotChecker() || GetParam();
@@ -271,12 +287,13 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
     iter_->SeekToFirst();
     c_iter_.reset(new CompactionIterator(
         iter_.get(), cmp_, merge_helper_.get(), last_sequence, &snapshots_,
-        earliest_write_conflict_snapshot, snapshot_checker_.get(),
-        Env::Default(), false /* report_detailed_time */, false,
-        range_del_agg_.get(), nullptr /* blob_file_builder */,
-        true /*allow_data_in_errors*/, std::move(compaction), filter,
-        &shutting_down_, /*preserve_deletes_seqnum=*/0,
-        /*manual_compaction_paused=*/nullptr, /*info_log=*/nullptr,
+        earliest_write_conflict_snapshot, kMaxSequenceNumber,
+        snapshot_checker_.get(), Env::Default(),
+        false /* report_detailed_time */, false, range_del_agg_.get(),
+        nullptr /* blob_file_builder */, true /*allow_data_in_errors*/,
+        true /*enforce_single_del_contracts*/,
+        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse_,
+        std::move(compaction), filter, &shutting_down_, /*info_log=*/nullptr,
         full_history_ts_low));
   }
 
@@ -290,6 +307,8 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
 
   virtual bool AllowIngestBehind() const { return false; }
 
+  virtual bool SupportsPerKeyPlacement() const { return false; }
+
   void RunTest(
       const std::vector<std::string>& input_keys,
       const std::vector<std::string>& input_values,
@@ -308,7 +327,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
                   key_not_exists_beyond_output_level, full_history_ts_low);
     c_iter_->SeekToFirst();
     for (size_t i = 0; i < expected_keys.size(); i++) {
-      std::string info = "i = " + ToString(i);
+      std::string info = "i = " + std::to_string(i);
       ASSERT_TRUE(c_iter_->Valid()) << info;
       ASSERT_OK(c_iter_->status()) << info;
       ASSERT_EQ(expected_keys[i], c_iter_->key().ToString()) << info;
@@ -335,6 +354,7 @@ class CompactionIteratorTest : public testing::TestWithParam<bool> {
   std::unique_ptr<CompactionRangeDelAggregator> range_del_agg_;
   std::unique_ptr<SnapshotChecker> snapshot_checker_;
   std::atomic<bool> shutting_down_{false};
+  const std::atomic<bool> kManualCompactionCanceledFalse_{false};
   FakeCompaction* compaction_proxy_;
 };
 
@@ -705,7 +725,7 @@ TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
   RunTest({test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
           {"v1", "v2"},
           {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue)},
-          {"v1", "v2"}, kMaxSequenceNumber /*last_commited_seq*/,
+          {"v1", "v2"}, kMaxSequenceNumber /*last_committed_seq*/,
           nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
           true /*bottommost_level*/);
 }
@@ -714,15 +734,14 @@ TEST_P(CompactionIteratorTest, ZeroOutSequenceAtBottomLevel) {
 // permanently.
 TEST_P(CompactionIteratorTest, RemoveDeletionAtBottomLevel) {
   AddSnapshot(1);
-  RunTest({test::KeyStr("a", 1, kTypeDeletion),
-           test::KeyStr("b", 3, kTypeDeletion),
-           test::KeyStr("b", 1, kTypeValue)},
-          {"", "", ""},
-          {test::KeyStr("b", 3, kTypeDeletion),
-           test::KeyStr("b", 0, kTypeValue)},
-          {"", ""},
-          kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
-          nullptr /*compaction_filter*/, true /*bottommost_level*/);
+  RunTest(
+      {test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 3, kTypeDeletion),
+       test::KeyStr("b", 1, kTypeValue)},
+      {"", "", ""},
+      {test::KeyStr("b", 3, kTypeDeletion), test::KeyStr("b", 0, kTypeValue)},
+      {"", ""}, kMaxSequenceNumber /*last_committed_seq*/,
+      nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
+      true /*bottommost_level*/);
 }
 
 // In bottommost level, single deletions earlier than earliest snapshot can be
@@ -732,7 +751,7 @@ TEST_P(CompactionIteratorTest, RemoveSingleDeletionAtBottomLevel) {
   RunTest({test::KeyStr("a", 1, kTypeSingleDeletion),
            test::KeyStr("b", 2, kTypeSingleDeletion)},
           {"", ""}, {test::KeyStr("b", 2, kTypeSingleDeletion)}, {""},
-          kMaxSequenceNumber /*last_commited_seq*/, nullptr /*merge_operator*/,
+          kMaxSequenceNumber /*last_committed_seq*/, nullptr /*merge_operator*/,
           nullptr /*compaction_filter*/, true /*bottommost_level*/);
 }
 
@@ -751,6 +770,123 @@ TEST_P(CompactionIteratorTest, ConvertToPutAtBottom) {
 INSTANTIATE_TEST_CASE_P(CompactionIteratorTestInstance, CompactionIteratorTest,
                         testing::Values(true, false));
 
+class PerKeyPlacementCompIteratorTest : public CompactionIteratorTest {
+ public:
+  bool SupportsPerKeyPlacement() const override { return true; }
+};
+
+TEST_P(PerKeyPlacementCompIteratorTest, SplitLastLevelData) {
+  std::atomic_uint64_t latest_cold_seq = 0;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::PrepareOutput.context", [&](void* arg) {
+        auto context = static_cast<PerKeyPlacementContext*>(arg);
+        context->output_to_penultimate_level =
+            context->seq_num > latest_cold_seq;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  latest_cold_seq = 5;
+
+  InitIterators(
+      {test::KeyStr("a", 7, kTypeValue), test::KeyStr("b", 6, kTypeValue),
+       test::KeyStr("c", 5, kTypeValue)},
+      {"vala", "valb", "valc"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
+      nullptr, nullptr, true);
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+
+  // the first 2 keys are hot, which should has
+  // `output_to_penultimate_level()==true` and seq num not zeroed out
+  ASSERT_EQ(test::KeyStr("a", 7, kTypeValue), c_iter_->key().ToString());
+  ASSERT_TRUE(c_iter_->output_to_penultimate_level());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("b", 6, kTypeValue), c_iter_->key().ToString());
+  ASSERT_TRUE(c_iter_->output_to_penultimate_level());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  // `a` is cold data, which should be output to bottommost
+  ASSERT_EQ(test::KeyStr("c", 0, kTypeValue), c_iter_->key().ToString());
+  ASSERT_FALSE(c_iter_->output_to_penultimate_level());
+  c_iter_->Next();
+  ASSERT_OK(c_iter_->status());
+  ASSERT_FALSE(c_iter_->Valid());
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(PerKeyPlacementCompIteratorTest, SnapshotData) {
+  AddSnapshot(5);
+
+  InitIterators(
+      {test::KeyStr("a", 7, kTypeValue), test::KeyStr("b", 6, kTypeDeletion),
+       test::KeyStr("b", 5, kTypeValue)},
+      {"vala", "", "valb"}, {}, {}, kMaxSequenceNumber, kMaxSequenceNumber,
+      nullptr, nullptr, true);
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+
+  // The first key and the tombstone are within snapshot, which should output
+  // to the penultimate level (and seq num cannot be zeroed out).
+  ASSERT_EQ(test::KeyStr("a", 7, kTypeValue), c_iter_->key().ToString());
+  ASSERT_TRUE(c_iter_->output_to_penultimate_level());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  ASSERT_EQ(test::KeyStr("b", 6, kTypeDeletion), c_iter_->key().ToString());
+  ASSERT_TRUE(c_iter_->output_to_penultimate_level());
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->Valid());
+  // `a` is not protected by the snapshot, the sequence number is zero out and
+  // should output bottommost
+  ASSERT_EQ(test::KeyStr("b", 0, kTypeValue), c_iter_->key().ToString());
+  ASSERT_FALSE(c_iter_->output_to_penultimate_level());
+  c_iter_->Next();
+  ASSERT_OK(c_iter_->status());
+  ASSERT_FALSE(c_iter_->Valid());
+}
+
+TEST_P(PerKeyPlacementCompIteratorTest, ConflictWithSnapshot) {
+  std::atomic_uint64_t latest_cold_seq = 0;
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "CompactionIterator::PrepareOutput.context", [&](void* arg) {
+        auto context = static_cast<PerKeyPlacementContext*>(arg);
+        context->output_to_penultimate_level =
+            context->seq_num > latest_cold_seq;
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  latest_cold_seq = 6;
+
+  AddSnapshot(5);
+
+  InitIterators({test::KeyStr("a", 7, kTypeValue),
+                 test::KeyStr("unsafe_pb", 6, kTypeValue),
+                 test::KeyStr("c", 5, kTypeValue)},
+                {"vala", "valb", "valc"}, {}, {}, kMaxSequenceNumber,
+                kMaxSequenceNumber, nullptr, nullptr, true);
+  c_iter_->SeekToFirst();
+  ASSERT_TRUE(c_iter_->Valid());
+
+  ASSERT_EQ(test::KeyStr("a", 7, kTypeValue), c_iter_->key().ToString());
+  ASSERT_TRUE(c_iter_->output_to_penultimate_level());
+  // the 2nd key is unsafe to output_to_penultimate_level, but it's within
+  // snapshot so for per_key_placement feature it has to be outputted to the
+  // penultimate level. which is a corruption. We should never see
+  // such case as the data with seq num (within snapshot) should always come
+  // from higher compaction input level, which makes it safe to
+  // output_to_penultimate_level.
+  c_iter_->Next();
+  ASSERT_TRUE(c_iter_->status().IsCorruption());
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+INSTANTIATE_TEST_CASE_P(PerKeyPlacementCompIteratorTest,
+                        PerKeyPlacementCompIteratorTest,
+                        testing::Values(true, false));
+
 // Tests how CompactionIterator work together with SnapshotChecker.
 class CompactionIteratorWithSnapshotCheckerTest
     : public CompactionIteratorTest {
@@ -889,7 +1025,7 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
           {"v1", "v2", "v3"},
           {test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 2, kTypeValue),
            test::KeyStr("c", 3, kTypeValue)},
-          {"v1", "v2", "v3"}, kMaxSequenceNumber /*last_commited_seq*/,
+          {"v1", "v2", "v3"}, kMaxSequenceNumber /*last_committed_seq*/,
           nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
           true /*bottommost_level*/);
 }
@@ -900,25 +1036,22 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
   RunTest(
       {test::KeyStr("a", 1, kTypeDeletion), test::KeyStr("b", 2, kTypeDeletion),
        test::KeyStr("c", 3, kTypeDeletion)},
-      {"", "", ""},
-      {},
-      {"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
+      {"", "", ""}, {}, {"", ""}, kMaxSequenceNumber /*last_committed_seq*/,
       nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
       true /*bottommost_level*/);
 }
 
 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
        NotRemoveDeletionIfValuePresentToEarlierSnapshot) {
-  AddSnapshot(2,1);
-  RunTest(
-      {test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 1, kTypeValue),
-          test::KeyStr("b", 3, kTypeValue)},
-      {"", "", ""},
-      {test::KeyStr("a", 4, kTypeDeletion), test::KeyStr("a", 0, kTypeValue),
-            test::KeyStr("b", 3, kTypeValue)},
-      {"", "", ""}, kMaxSequenceNumber /*last_commited_seq*/,
-      nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
-      true /*bottommost_level*/);
+  AddSnapshot(2, 1);
+  RunTest({test::KeyStr("a", 4, kTypeDeletion),
+           test::KeyStr("a", 1, kTypeValue), test::KeyStr("b", 3, kTypeValue)},
+          {"", "", ""},
+          {test::KeyStr("a", 4, kTypeDeletion),
+           test::KeyStr("a", 0, kTypeValue), test::KeyStr("b", 3, kTypeValue)},
+          {"", "", ""}, kMaxSequenceNumber /*last_committed_seq*/,
+          nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
+          true /*bottommost_level*/);
 }
 
 TEST_F(CompactionIteratorWithSnapshotCheckerTest,
@@ -930,7 +1063,7 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
           {"", "", ""},
           {test::KeyStr("b", 2, kTypeSingleDeletion),
            test::KeyStr("c", 3, kTypeSingleDeletion)},
-          {"", ""}, kMaxSequenceNumber /*last_commited_seq*/,
+          {"", ""}, kMaxSequenceNumber /*last_committed_seq*/,
           nullptr /*merge_operator*/, nullptr /*compaction_filter*/,
           true /*bottommost_level*/);
 }
@@ -964,9 +1097,39 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
           2 /*earliest_write_conflict_snapshot*/);
 }
 
+// Same as above but with a blob index. In addition to the value getting
+// trimmed, the type of the KV is changed to kTypeValue.
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       KeepSingleDeletionForWriteConflictChecking_BlobIndex) {
+  AddSnapshot(2, 0);
+  RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
+           test::KeyStr("a", 1, kTypeBlobIndex)},
+          {"", "fake_blob_index"},
+          {test::KeyStr("a", 2, kTypeSingleDeletion),
+           test::KeyStr("a", 1, kTypeValue)},
+          {"", ""}, 2 /*last_committed_seq*/, nullptr /*merge_operator*/,
+          nullptr /*compaction_filter*/, false /*bottommost_level*/,
+          2 /*earliest_write_conflict_snapshot*/);
+}
+
+// Same as above but with a wide-column entity. In addition to the value getting
+// trimmed, the type of the KV is changed to kTypeValue.
+TEST_F(CompactionIteratorWithSnapshotCheckerTest,
+       KeepSingleDeletionForWriteConflictChecking_WideColumnEntity) {
+  AddSnapshot(2, 0);
+  RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
+           test::KeyStr("a", 1, kTypeWideColumnEntity)},
+          {"", "fake_entity"},
+          {test::KeyStr("a", 2, kTypeSingleDeletion),
+           test::KeyStr("a", 1, kTypeValue)},
+          {"", ""}, 2 /* last_committed_seq */, nullptr /* merge_operator */,
+          nullptr /* compaction_filter */, false /* bottommost_level */,
+          2 /* earliest_write_conflict_snapshot */);
+}
+
 // Compaction filter should keep uncommitted key as-is, and
-//   * Convert the latest velue to deletion, and/or
-//   * if latest value is a merge, apply filter to all suequent merges.
+//   * Convert the latest value to deletion, and/or
+//   * if latest value is a merge, apply filter to all subsequent merges.
 
 TEST_F(CompactionIteratorWithSnapshotCheckerTest, CompactionFilter_Value) {
   std::unique_ptr<CompactionFilter> compaction_filter(
@@ -1061,7 +1224,7 @@ INSTANTIATE_TEST_CASE_P(CompactionIteratorWithAllowIngestBehindTestInstance,
 class CompactionIteratorTsGcTest : public CompactionIteratorTest {
  public:
   CompactionIteratorTsGcTest()
-      : CompactionIteratorTest(test::ComparatorWithU64Ts()) {}
+      : CompactionIteratorTest(test::BytewiseComparatorWithU64TsWrapper()) {}
 };
 
 TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
@@ -1092,6 +1255,31 @@ TEST_P(CompactionIteratorTsGcTest, NoKeyEligibleForGC) {
   }
 }
 
+TEST_P(CompactionIteratorTsGcTest, NoMergeEligibleForGc) {
+  constexpr char user_key[] = "a";
+  const std::vector<std::string> input_keys = {
+      test::KeyStr(10002, user_key, 102, kTypeMerge),
+      test::KeyStr(10001, user_key, 101, kTypeMerge),
+      test::KeyStr(10000, user_key, 100, kTypeValue)};
+  const std::vector<std::string> input_values = {"2", "1", "a0"};
+  std::shared_ptr<MergeOperator> merge_op =
+      MergeOperators::CreateStringAppendTESTOperator();
+  const std::vector<std::string>& expected_keys = input_keys;
+  const std::vector<std::string>& expected_values = input_values;
+  const std::vector<std::pair<bool, bool>> params = {
+      {false, false}, {false, true}, {true, true}};
+  for (const auto& param : params) {
+    const bool bottommost_level = param.first;
+    const bool key_not_exists_beyond_output_level = param.second;
+    RunTest(input_keys, input_values, expected_keys, expected_values,
+            /*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(),
+            /*compaction_filter=*/nullptr, bottommost_level,
+            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+            key_not_exists_beyond_output_level,
+            /*full_history_ts_low=*/nullptr);
+  }
+}
+
 TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
   constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
   const std::vector<std::string> input_keys = {
@@ -1145,6 +1333,91 @@ TEST_P(CompactionIteratorTsGcTest, AllKeysOlderThanThreshold) {
   }
 }
 
+TEST_P(CompactionIteratorTsGcTest, SomeMergesOlderThanThreshold) {
+  constexpr char user_key[][2] = {"a", "f"};
+  const std::vector<std::string> input_keys = {
+      test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeMerge),
+      test::KeyStr(/*ts=*/19000, user_key[0], /*seq=*/2300, kTypeMerge),
+      test::KeyStr(/*ts=*/18000, user_key[0], /*seq=*/1800, kTypeMerge),
+      test::KeyStr(/*ts=*/16000, user_key[0], /*seq=*/1600, kTypeValue),
+      test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeMerge),
+      test::KeyStr(/*ts=*/17000, user_key[1], /*seq=*/1700, kTypeMerge),
+      test::KeyStr(/*ts=*/15000, user_key[1], /*seq=*/1600,
+                   kTypeDeletionWithTimestamp)};
+  const std::vector<std::string> input_values = {"25", "19", "18", "16",
+                                                 "19", "17", ""};
+  std::shared_ptr<MergeOperator> merge_op =
+      MergeOperators::CreateStringAppendTESTOperator();
+  std::string full_history_ts_low;
+  PutFixed64(&full_history_ts_low, 20000);
+
+  const std::vector<std::pair<bool, bool>> params = {
+      {false, false}, {false, true}, {true, true}};
+
+  {
+    AddSnapshot(1600);
+    AddSnapshot(1900);
+    const std::vector<std::string> expected_keys = {
+        test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeMerge),
+        test::KeyStr(/*ts=*/19000, user_key[0], /*seq=*/2300, kTypeMerge),
+        test::KeyStr(/*ts=*/18000, user_key[0], /*seq=*/1800, kTypeMerge),
+        test::KeyStr(/*ts=*/16000, user_key[0], /*seq=*/1600, kTypeValue),
+        test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeMerge),
+        test::KeyStr(/*ts=*/17000, user_key[1], /*seq=*/1700, kTypeMerge),
+        test::KeyStr(/*ts=*/15000, user_key[1], /*seq=*/1600,
+                     kTypeDeletionWithTimestamp)};
+    const std::vector<std::string> expected_values = {"25", "19", "18", "16",
+                                                      "19", "17", ""};
+    for (const auto& param : params) {
+      const bool bottommost_level = param.first;
+      const bool key_not_exists_beyond_output_level = param.second;
+      auto expected_keys_copy = expected_keys;
+      auto expected_values_copy = expected_values;
+      if (bottommost_level || key_not_exists_beyond_output_level) {
+        // the kTypeDeletionWithTimestamp will be dropped
+        expected_keys_copy.pop_back();
+        expected_values_copy.pop_back();
+        if (bottommost_level) {
+          // seq zero
+          expected_keys_copy[3] =
+              test::KeyStr(/*ts=*/0, user_key[0], /*seq=*/0, kTypeValue);
+        }
+      }
+      RunTest(input_keys, input_values, expected_keys_copy,
+              expected_values_copy,
+              /*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(),
+              /*compaction_filter=*/nullptr, bottommost_level,
+              /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+              key_not_exists_beyond_output_level, &full_history_ts_low);
+    }
+    ClearSnapshots();
+  }
+
+  // No snapshots
+  {
+    const std::vector<std::string> expected_keys = {
+        test::KeyStr(/*ts=*/25000, user_key[0], /*seq=*/2500, kTypeValue),
+        test::KeyStr(/*ts=*/19000, user_key[1], /*seq=*/2000, kTypeValue)};
+    const std::vector<std::string> expected_values = {"16,18,19,25", "17,19"};
+    for (const auto& param : params) {
+      const bool bottommost_level = param.first;
+      const bool key_not_exists_beyond_output_level = param.second;
+      auto expected_keys_copy = expected_keys;
+      auto expected_values_copy = expected_values;
+      if (bottommost_level) {
+        expected_keys_copy[1] =
+            test::KeyStr(/*ts=*/0, user_key[1], /*seq=*/0, kTypeValue);
+      }
+      RunTest(input_keys, input_values, expected_keys_copy,
+              expected_values_copy,
+              /*last_committed_seq=*/kMaxSequenceNumber, merge_op.get(),
+              /*compaction_filter=*/nullptr, bottommost_level,
+              /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+              key_not_exists_beyond_output_level, &full_history_ts_low);
+    }
+  }
+}
+
 TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) {
   constexpr char user_key[] = "a";
   const std::vector<std::string> input_keys = {
@@ -1157,9 +1430,10 @@ TEST_P(CompactionIteratorTsGcTest, NewHidesOldSameSnapshot) {
     std::string full_history_ts_low;
     // Keys whose timestamps larger than or equal to 102 will be preserved.
     PutFixed64(&full_history_ts_low, 102);
-    const std::vector<std::string> expected_keys = {input_keys[0],
-                                                    input_keys[1]};
-    const std::vector<std::string> expected_values = {"", "a2"};
+    const std::vector<std::string> expected_keys = {
+        input_keys[0], input_keys[1], input_keys[2]};
+    const std::vector<std::string> expected_values = {"", input_values[1],
+                                                      input_values[2]};
     RunTest(input_keys, input_values, expected_keys, expected_values,
             /*last_committed_seq=*/kMaxSequenceNumber,
             /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
@@ -1236,6 +1510,101 @@ TEST_P(CompactionIteratorTsGcTest, RewriteTs) {
   }
 }
 
+TEST_P(CompactionIteratorTsGcTest, SingleDeleteNoKeyEligibleForGC) {
+  constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
+  const std::vector<std::string> input_keys = {
+      test::KeyStr(/*ts=*/104, user_key[0], /*seq=*/4, kTypeSingleDeletion),
+      test::KeyStr(/*ts=*/103, user_key[0], /*seq=*/3, kTypeValue),
+      test::KeyStr(/*ts=*/102, user_key[1], /*seq=*/2, kTypeValue)};
+  const std::vector<std::string> input_values = {"", "a3", "b2"};
+  std::string full_history_ts_low;
+  // All keys' timestamps are newer than or equal to 102, thus none of them
+  // will be eligible for GC.
+  PutFixed64(&full_history_ts_low, 102);
+  const std::vector<std::string>& expected_keys = input_keys;
+  const std::vector<std::string>& expected_values = input_values;
+  const std::vector<std::pair<bool, bool>> params = {
+      {false, false}, {false, true}, {true, true}};
+  for (const std::pair<bool, bool>& param : params) {
+    const bool bottommost_level = param.first;
+    const bool key_not_exists_beyond_output_level = param.second;
+    RunTest(input_keys, input_values, expected_keys, expected_values,
+            /*last_committed_seq=*/kMaxSequenceNumber,
+            /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
+            bottommost_level,
+            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+            key_not_exists_beyond_output_level, &full_history_ts_low);
+  }
+}
+
+TEST_P(CompactionIteratorTsGcTest, SingleDeleteDropTombstones) {
+  constexpr char user_key[] = "a";
+  const std::vector<std::string> input_keys = {
+      test::KeyStr(/*ts=*/103, user_key, /*seq=*/4, kTypeSingleDeletion),
+      test::KeyStr(/*ts=*/102, user_key, /*seq=*/3, kTypeValue),
+      test::KeyStr(/*ts=*/101, user_key, /*seq=*/2, kTypeSingleDeletion),
+      test::KeyStr(/*ts=*/100, user_key, /*seq=*/1, kTypeValue)};
+  const std::vector<std::string> input_values = {"", "a2", "", "a0"};
+  const std::vector<std::string> expected_keys = {input_keys[0], input_keys[1]};
+  const std::vector<std::string> expected_values = {"", "a2"};
+
+  // Take a snapshot at seq 2.
+  AddSnapshot(2);
+  {
+    const std::vector<std::pair<bool, bool>> params = {
+        {false, false}, {false, true}, {true, true}};
+    for (const std::pair<bool, bool>& param : params) {
+      const bool bottommost_level = param.first;
+      const bool key_not_exists_beyond_output_level = param.second;
+      std::string full_history_ts_low;
+      PutFixed64(&full_history_ts_low, 102);
+      RunTest(input_keys, input_values, expected_keys, expected_values,
+              /*last_committed_seq=*/kMaxSequenceNumber,
+              /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
+              bottommost_level,
+              /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+              key_not_exists_beyond_output_level, &full_history_ts_low);
+    }
+  }
+}
+
+TEST_P(CompactionIteratorTsGcTest, SingleDeleteAllKeysOlderThanThreshold) {
+  constexpr char user_key[][2] = {{'a', '\0'}, {'b', '\0'}};
+  const std::vector<std::string> input_keys = {
+      test::KeyStr(/*ts=*/103, user_key[0], /*seq=*/4, kTypeSingleDeletion),
+      test::KeyStr(/*ts=*/102, user_key[0], /*seq=*/3, kTypeValue),
+      test::KeyStr(/*ts=*/104, user_key[1], /*seq=*/5, kTypeValue)};
+  const std::vector<std::string> input_values = {"", "a2", "b5"};
+  std::string full_history_ts_low;
+  PutFixed64(&full_history_ts_low, std::numeric_limits<uint64_t>::max());
+  {
+    // With a snapshot at seq 3, both the deletion marker and the key at 3 must
+    // be preserved.
+    AddSnapshot(3);
+    const std::vector<std::string> expected_keys = {
+        input_keys[0], input_keys[1], input_keys[2]};
+    const std::vector<std::string> expected_values = {"", "a2", "b5"};
+    RunTest(input_keys, input_values, expected_keys, expected_values,
+            /*last_committed_seq=*/kMaxSequenceNumber,
+            /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
+            /*bottommost_level=*/false,
+            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+            /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
+    ClearSnapshots();
+  }
+  {
+    // No snapshot.
+    const std::vector<std::string> expected_keys = {input_keys[2]};
+    const std::vector<std::string> expected_values = {"b5"};
+    RunTest(input_keys, input_values, expected_keys, expected_values,
+            /*last_committed_seq=*/kMaxSequenceNumber,
+            /*merge_operator=*/nullptr, /*compaction_filter=*/nullptr,
+            /*bottommost_level=*/false,
+            /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
+            /*key_not_exists_beyond_output_level=*/false, &full_history_ts_low);
+  }
+}
+
 INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance,
                         CompactionIteratorTsGcTest,
                         testing::Values(true, false));
@@ -1243,6 +1612,7 @@ INSTANTIATE_TEST_CASE_P(CompactionIteratorTsGcTestInstance,
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }