]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/memtable_list_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / memtable_list_test.cc
index cc6e566ad5733d81b3fed1c28b7fd2ebbbbefbb9..8242061afb0d874ae7a644a93a954d77daa4c815 100644 (file)
@@ -4,9 +4,11 @@
 //  (found in the LICENSE.Apache file in the root directory).
 
 #include "db/memtable_list.h"
+
 #include <algorithm>
 #include <string>
 #include <vector>
+
 #include "db/merge_context.h"
 #include "db/version_set.h"
 #include "db/write_controller.h"
@@ -30,14 +32,14 @@ class MemTableListTest : public testing::Test {
   MemTableListTest() : db(nullptr), file_number(1) {
     dbname = test::PerThreadDBPath("memtable_list_test");
     options.create_if_missing = true;
-    DestroyDB(dbname, options);
+    EXPECT_OK(DestroyDB(dbname, options));
   }
 
   // Create a test db if not yet created
   void CreateDB() {
     if (db == nullptr) {
       options.create_if_missing = true;
-      DestroyDB(dbname, options);
+      EXPECT_OK(DestroyDB(dbname, options));
       // Open DB only with default column family
       ColumnFamilyOptions cf_options;
       std::vector<ColumnFamilyDescriptor> cf_descs;
@@ -78,7 +80,7 @@ class MemTableListTest : public testing::Test {
       handles.clear();
       delete db;
       db = nullptr;
-      DestroyDB(dbname, options, cf_descs);
+      EXPECT_OK(DestroyDB(dbname, options, cf_descs));
     }
   }
 
@@ -103,7 +105,8 @@ class MemTableListTest : public testing::Test {
     VersionSet versions(dbname, &immutable_db_options, env_options,
                         table_cache.get(), &write_buffer_manager,
                         &write_controller, /*block_cache_tracer=*/nullptr,
-                        /*io_tracer=*/nullptr);
+                        /*io_tracer=*/nullptr, /*db_id*/ "",
+                        /*db_session_id*/ "");
     std::vector<ColumnFamilyDescriptor> cf_descs;
     cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
     cf_descs.emplace_back("one", ColumnFamilyOptions());
@@ -124,7 +127,7 @@ class MemTableListTest : public testing::Test {
     std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
     Status s = list->TryInstallMemtableFlushResults(
         cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
-        file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s);
+        file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info);
     EXPECT_OK(io_s);
     return s;
   }
@@ -153,7 +156,8 @@ class MemTableListTest : public testing::Test {
     VersionSet versions(dbname, &immutable_db_options, env_options,
                         table_cache.get(), &write_buffer_manager,
                         &write_controller, /*block_cache_tracer=*/nullptr,
-                        /*io_tracer=*/nullptr);
+                        /*io_tracer=*/nullptr, /*db_id*/ "",
+                        /*db_session_id*/ "");
     std::vector<ColumnFamilyDescriptor> cf_descs;
     cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
     cf_descs.emplace_back("one", ColumnFamilyOptions());
@@ -182,11 +186,21 @@ class MemTableListTest : public testing::Test {
     for (auto& meta : file_metas) {
       file_meta_ptrs.push_back(&meta);
     }
+    std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
+        committed_flush_jobs_info_storage(cf_ids.size());
+    autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
+        committed_flush_jobs_info;
+    for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
+      committed_flush_jobs_info.push_back(
+          &committed_flush_jobs_info_storage[i]);
+    }
+
     InstrumentedMutex mutex;
     InstrumentedMutexLock l(&mutex);
     return InstallMemtableAtomicFlushResults(
-        &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
-        file_meta_ptrs, to_delete, nullptr, &log_buffer);
+        &lists, cfds, mutable_cf_options_list, mems_list, &versions,
+        nullptr /* prep_tracker */, &mutex, file_meta_ptrs,
+        committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
   }
 };
 
@@ -199,7 +213,8 @@ TEST_F(MemTableListTest, Empty) {
   ASSERT_FALSE(list.IsFlushPending());
 
   autovector<MemTable*> mems;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &mems);
   ASSERT_EQ(0, mems.size());
 
   autovector<MemTable*> to_delete;
@@ -225,16 +240,16 @@ TEST_F(MemTableListTest, GetTest) {
   autovector<MemTable*> to_delete;
 
   LookupKey lkey("key1", seq);
-  bool found = list.current()->Get(
-      lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
-      &max_covering_tombstone_seq, ReadOptions());
+  bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
+                                   /*timestamp=*/nullptr, &s, &merge_context,
+                                   &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Create a MemTable
   InternalKeyComparator cmp(BytewiseComparator());
   auto factory = std::make_shared<SkipListFactory>();
   options.memtable_factory = factory;
-  ImmutableCFOptions ioptions(options);
+  ImmutableOptions ioptions(options);
 
   WriteBufferManager wb(options.db_write_buffer_size);
   MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
@@ -242,30 +257,37 @@ TEST_F(MemTableListTest, GetTest) {
   mem->Ref();
 
   // Write some keys to this memtable.
-  mem->Add(++seq, kTypeDeletion, "key1", "");
-  mem->Add(++seq, kTypeValue, "key2", "value2");
-  mem->Add(++seq, kTypeValue, "key1", "value1");
-  mem->Add(++seq, kTypeValue, "key2", "value2.2");
+  ASSERT_OK(
+      mem->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
+  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2",
+                     nullptr /* kv_prot_info */));
+  ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", "value1",
+                     nullptr /* kv_prot_info */));
+  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2",
+                     nullptr /* kv_prot_info */));
 
   // Fetch the newly written keys
   merge_context.Clear();
-  found = mem->Get(LookupKey("key1", seq), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+  found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr,
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value1");
 
   merge_context.Clear();
-  found = mem->Get(LookupKey("key1", 2), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+  found = mem->Get(LookupKey("key1", 2), &value, /*columns*/ nullptr,
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   // MemTable found out that this key is *not* found (at this sequence#)
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = mem->Get(LookupKey("key2", seq), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+  found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr,
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value2.2");
 
@@ -273,6 +295,9 @@ TEST_F(MemTableListTest, GetTest) {
   ASSERT_EQ(1, mem->num_deletes());
 
   // Add memtable to list
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem->ConstructFragmentedRangeTombstones();
   list.Add(mem, &to_delete);
 
   SequenceNumber saved_seq = seq;
@@ -283,37 +308,45 @@ TEST_F(MemTableListTest, GetTest) {
                                 kMaxSequenceNumber, 0 /* column_family_id */);
   mem2->Ref();
 
-  mem2->Add(++seq, kTypeDeletion, "key1", "");
-  mem2->Add(++seq, kTypeValue, "key2", "value2.3");
+  ASSERT_OK(
+      mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
+  ASSERT_OK(mem2->Add(++seq, kTypeValue, "key2", "value2.3",
+                      nullptr /* kv_prot_info */));
 
   // Add second memtable to list
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem2->ConstructFragmentedRangeTombstones();
   list.Add(mem2, &to_delete);
 
   // Fetch keys via MemTableList
   merge_context.Clear();
-  found = list.current()->Get(
-      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s,
-      &merge_context, &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = list.current()->Get(
-      LookupKey("key1", saved_seq), &value, /*timestamp*/nullptr,
-      &s, &merge_context, &max_covering_tombstone_seq, ReadOptions());
+  found = list.current()->Get(LookupKey("key1", saved_seq), &value,
+                              /*columns=*/nullptr, /*timestamp=*/nullptr, &s,
+                              &merge_context, &max_covering_tombstone_seq,
+                              ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ("value1", value);
 
   merge_context.Clear();
-  found = list.current()->Get(
-      LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s,
-      &merge_context, &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value2.3");
 
   merge_context.Clear();
-  found = list.current()->Get(
-      LookupKey("key2", 1), &value, /*timestamp*/nullptr, &s,
-      &merge_context, &max_covering_tombstone_seq, ReadOptions());
+  found = list.current()->Get(LookupKey("key2", 1), &value, /*columns=*/nullptr,
+                              /*timestamp=*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   ASSERT_EQ(2, list.NumNotFlushed());
@@ -328,7 +361,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   // Create MemTableList
   int min_write_buffer_number_to_merge = 2;
   int max_write_buffer_number_to_maintain = 2;
-  int64_t max_write_buffer_size_to_maintain = 2000;
+  int64_t max_write_buffer_size_to_maintain = 2 * Arena::kInlineSize;
   MemTableList list(min_write_buffer_number_to_merge,
                     max_write_buffer_number_to_maintain,
                     max_write_buffer_size_to_maintain);
@@ -342,16 +375,16 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   autovector<MemTable*> to_delete;
 
   LookupKey lkey("key1", seq);
-  bool found = list.current()->Get(
-      lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
-      &max_covering_tombstone_seq, ReadOptions());
+  bool found = list.current()->Get(lkey, &value, /*columns=*/nullptr,
+                                   /*timestamp=*/nullptr, &s, &merge_context,
+                                   &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Create a MemTable
   InternalKeyComparator cmp(BytewiseComparator());
   auto factory = std::make_shared<SkipListFactory>();
   options.memtable_factory = factory;
-  ImmutableCFOptions ioptions(options);
+  ImmutableOptions ioptions(options);
 
   WriteBufferManager wb(options.db_write_buffer_size);
   MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
@@ -359,47 +392,58 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   mem->Ref();
 
   // Write some keys to this memtable.
-  mem->Add(++seq, kTypeDeletion, "key1", "");
-  mem->Add(++seq, kTypeValue, "key2", "value2");
-  mem->Add(++seq, kTypeValue, "key2", "value2.2");
+  ASSERT_OK(
+      mem->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
+  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2",
+                     nullptr /* kv_prot_info */));
+  ASSERT_OK(mem->Add(++seq, kTypeValue, "key2", "value2.2",
+                     nullptr /* kv_prot_info */));
 
   // Fetch the newly written keys
   merge_context.Clear();
-  found = mem->Get(LookupKey("key1", seq), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+  found = mem->Get(LookupKey("key1", seq), &value, /*columns*/ nullptr,
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   // MemTable found out that this key is *not* found (at this sequence#)
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = mem->Get(LookupKey("key2", seq), &value,
-                   /*timestamp*/nullptr, &s, &merge_context,
-                   &max_covering_tombstone_seq, ReadOptions());
+  found = mem->Get(LookupKey("key2", seq), &value, /*columns*/ nullptr,
+                   /*timestamp*/ nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions(),
+                   false /* immutable_memtable */);
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value2.2");
 
   // Add memtable to list
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem->ConstructFragmentedRangeTombstones();
   list.Add(mem, &to_delete);
   ASSERT_EQ(0, to_delete.size());
 
   // Fetch keys via MemTableList
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key1", seq), &value,
-                              /*timestamp*/nullptr, &s, &merge_context,
-                              &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value,
-                              /*timestamp*/nullptr, &s, &merge_context,
-                              &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ("value2.2", value);
 
   // Flush this memtable from the list.
   // (It will then be a part of the memtable history).
   autovector<MemTable*> to_flush;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
   ASSERT_EQ(1, to_flush.size());
 
   MutableCFOptions mutable_cf_options(options);
@@ -412,28 +456,32 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 
   // Verify keys are no longer in MemTableList
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key1", seq), &value,
-                              /*timestamp*/nullptr, &s, &merge_context,
-                              &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value,
-                              /*timestamp*/nullptr, &s, &merge_context,
-                              &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Verify keys are present in history
   merge_context.Clear();
   found = list.current()->GetFromHistory(
-      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
-      &max_covering_tombstone_seq, ReadOptions());
+      LookupKey("key1", seq), &value, /*columns=*/nullptr,
+      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
+      ReadOptions());
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
   found = list.current()->GetFromHistory(
-      LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
-      &max_covering_tombstone_seq, ReadOptions());
+      LookupKey("key2", seq), &value, /*columns=*/nullptr,
+      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
+      ReadOptions());
   ASSERT_TRUE(found);
   ASSERT_EQ("value2.2", value);
 
@@ -443,15 +491,21 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
                                 kMaxSequenceNumber, 0 /* column_family_id */);
   mem2->Ref();
 
-  mem2->Add(++seq, kTypeDeletion, "key1", "");
-  mem2->Add(++seq, kTypeValue, "key3", "value3");
+  ASSERT_OK(
+      mem2->Add(++seq, kTypeDeletion, "key1", "", nullptr /* kv_prot_info */));
+  ASSERT_OK(mem2->Add(++seq, kTypeValue, "key3", "value3",
+                      nullptr /* kv_prot_info */));
 
   // Add second memtable to list
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem2->ConstructFragmentedRangeTombstones();
   list.Add(mem2, &to_delete);
   ASSERT_EQ(0, to_delete.size());
 
   to_flush.clear();
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
   ASSERT_EQ(1, to_flush.size());
 
   // Flush second memtable
@@ -467,6 +521,9 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
                                 kMaxSequenceNumber, 0 /* column_family_id */);
   mem3->Ref();
+  // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed())
+  // in MemTableListVersion::GetFromList work.
+  mem3->ConstructFragmentedRangeTombstones();
   list.Add(mem3, &to_delete);
   ASSERT_EQ(1, list.NumNotFlushed());
   ASSERT_EQ(1, list.NumFlushed());
@@ -474,42 +531,48 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 
   // Verify keys are no longer in MemTableList
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key1", seq), &value,
-                              /*timestamp*/nullptr, &s, &merge_context,
-                              &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key1", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value,
-                              /*timestamp*/nullptr, &s, &merge_context,
-                              &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key3", seq), &value,
-                              /*timestamp*/nullptr, &s, &merge_context,
-                              &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key3", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Verify that the second memtable's keys are in the history
   merge_context.Clear();
   found = list.current()->GetFromHistory(
-      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
-      &max_covering_tombstone_seq, ReadOptions());
+      LookupKey("key1", seq), &value, /*columns=*/nullptr,
+      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
+      ReadOptions());
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
   found = list.current()->GetFromHistory(
-      LookupKey("key3", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
-      &max_covering_tombstone_seq, ReadOptions());
+      LookupKey("key3", seq), &value, /*columns=*/nullptr,
+      /*timestamp=*/nullptr, &s, &merge_context, &max_covering_tombstone_seq,
+      ReadOptions());
   ASSERT_TRUE(found);
   ASSERT_EQ("value3", value);
 
   // Verify that key2 from the first memtable is no longer in the history
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value,
-                              /*timestamp*/nullptr, &s, &merge_context,
-                              &max_covering_tombstone_seq, ReadOptions());
+  found =
+      list.current()->Get(LookupKey("key2", seq), &value, /*columns=*/nullptr,
+                          /*timestamp=*/nullptr, &s, &merge_context,
+                          &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Cleanup
@@ -527,7 +590,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
 
   auto factory = std::make_shared<SkipListFactory>();
   options.memtable_factory = factory;
-  ImmutableCFOptions ioptions(options);
+  ImmutableOptions ioptions(options);
   InternalKeyComparator cmp(BytewiseComparator());
   WriteBufferManager wb(options.db_write_buffer_size);
   autovector<MemTable*> to_delete;
@@ -554,11 +617,16 @@ TEST_F(MemTableListTest, FlushPendingTest) {
     std::string value;
     MergeContext merge_context;
 
-    mem->Add(++seq, kTypeValue, "key1", ToString(i));
-    mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
-    mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
-    mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
-    mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
+    ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", std::to_string(i),
+                       nullptr /* kv_prot_info */));
+    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + std::to_string(i), "valueN",
+                       nullptr /* kv_prot_info */));
+    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + std::to_string(i), "value",
+                       nullptr /* kv_prot_info */));
+    ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + std::to_string(i), "valueM",
+                       nullptr /* kv_prot_info */));
+    ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + std::to_string(i), "",
+                       nullptr /* kv_prot_info */));
 
     tables.push_back(mem);
   }
@@ -567,7 +635,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
   autovector<MemTable*> to_flush;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
   ASSERT_EQ(0, to_flush.size());
 
   // Request a flush even though there is nothing to flush
@@ -576,7 +645,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Attempt to 'flush' to clear request for flush
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
   ASSERT_EQ(0, to_flush.size());
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
@@ -600,7 +670,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Pick tables to flush
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
   ASSERT_EQ(2, to_flush.size());
   ASSERT_EQ(2, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -621,7 +692,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_EQ(0, to_delete.size());
 
   // Pick tables to flush
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
   ASSERT_EQ(3, to_flush.size());
   ASSERT_EQ(3, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -629,7 +701,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
 
   // Pick tables to flush again
   autovector<MemTable*> to_flush2;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
   ASSERT_EQ(0, to_flush2.size());
   ASSERT_EQ(3, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -647,7 +720,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Pick tables to flush again
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush2);
   ASSERT_EQ(1, to_flush2.size());
   ASSERT_EQ(4, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -668,28 +742,42 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_EQ(0, to_delete.size());
 
   // Pick tables to flush
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
-  // Should pick 4 of 5 since 1 table has been picked in to_flush2
-  ASSERT_EQ(4, to_flush.size());
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush);
+  // Picks three oldest memtables. The fourth oldest is picked in `to_flush2` so
+  // must be excluded. The newest (fifth oldest) is non-consecutive with the
+  // three oldest due to omitting the fourth oldest so must not be picked.
+  ASSERT_EQ(3, to_flush.size());
   ASSERT_EQ(5, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
-  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
+  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Pick tables to flush again
   autovector<MemTable*> to_flush3;
-  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
-  ASSERT_EQ(0, to_flush3.size());  // nothing not in progress of being flushed
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush3);
+  // Picks newest (fifth oldest)
+  ASSERT_EQ(1, to_flush3.size());
+  ASSERT_EQ(5, list.NumNotFlushed());
+  ASSERT_FALSE(list.IsFlushPending());
+  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
+
+  // Nothing left to flush
+  autovector<MemTable*> to_flush4;
+  list.PickMemtablesToFlush(
+      std::numeric_limits<uint64_t>::max() /* memtable_id */, &to_flush4);
+  ASSERT_EQ(0, to_flush4.size());
   ASSERT_EQ(5, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
 
-  // Flush the 4 memtables that were picked in to_flush
+  // Flush the 3 memtables that were picked in to_flush
   s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
                                        &to_delete);
   ASSERT_OK(s);
 
-  // Note:  now to_flush contains tables[0,1,2,4].  to_flush2 contains
-  // tables[3].
+  // Note:  now to_flush contains tables[0,1,2].  to_flush2 contains
+  // tables[3]. to_flush3 contains tables[4].
   // Current implementation will only commit memtables in the order they were
   // created. So TryInstallMemtableFlushResults will install the first 3 tables
   // in to_flush and stop when it encounters a table not yet flushed.
@@ -705,7 +793,17 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
 
-  // Flush the 1 memtable that was picked in to_flush2
+  // Flush the 1 memtable (tables[4]) that was picked in to_flush3
+  s = MemTableListTest::Mock_InstallMemtableFlushResults(
+      &list, mutable_cf_options, to_flush3, &to_delete);
+  ASSERT_OK(s);
+
+  // This will install 0 tables since tables[4] flushed while tables[3] has not
+  // yet flushed.
+  ASSERT_EQ(2, list.NumNotFlushed());
+  ASSERT_EQ(0, to_delete.size());
+
+  // Flush the 1 memtable (tables[3]) that was picked in to_flush2
   s = MemTableListTest::Mock_InstallMemtableFlushResults(
       &list, mutable_cf_options, to_flush2, &to_delete);
   ASSERT_OK(s);
@@ -735,11 +833,11 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   memtable_id = 4;
   // Pick tables to flush. The tables to pick must have ID smaller than or
   // equal to 4. Therefore, no table will be selected in this case.
-  autovector<MemTable*> to_flush4;
+  autovector<MemTable*> to_flush5;
   list.FlushRequested();
   ASSERT_TRUE(list.HasFlushRequested());
-  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
-  ASSERT_TRUE(to_flush4.empty());
+  list.PickMemtablesToFlush(memtable_id, &to_flush5);
+  ASSERT_TRUE(to_flush5.empty());
   ASSERT_EQ(1, list.NumNotFlushed());
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
   ASSERT_FALSE(list.IsFlushPending());
@@ -749,8 +847,8 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   // equal to 5. Therefore, only tables[5] will be selected.
   memtable_id = 5;
   list.FlushRequested();
-  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
-  ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
+  list.PickMemtablesToFlush(memtable_id, &to_flush5);
+  ASSERT_EQ(1, static_cast<int>(to_flush5.size()));
   ASSERT_EQ(1, list.NumNotFlushed());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
   ASSERT_FALSE(list.IsFlushPending());
@@ -791,7 +889,7 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
 
   auto factory = std::make_shared<SkipListFactory>();
   options.memtable_factory = factory;
-  ImmutableCFOptions ioptions(options);
+  ImmutableOptions ioptions(options);
   InternalKeyComparator cmp(BytewiseComparator());
   WriteBufferManager wb(options.db_write_buffer_size);
 
@@ -823,11 +921,16 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
 
       std::string value;
 
-      mem->Add(++seq, kTypeValue, "key1", ToString(i));
-      mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
-      mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
-      mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
-      mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
+      ASSERT_OK(mem->Add(++seq, kTypeValue, "key1", std::to_string(i),
+                         nullptr /* kv_prot_info */));
+      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyN" + std::to_string(i),
+                         "valueN", nullptr /* kv_prot_info */));
+      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyX" + std::to_string(i), "value",
+                         nullptr /* kv_prot_info */));
+      ASSERT_OK(mem->Add(++seq, kTypeValue, "keyM" + std::to_string(i),
+                         "valueM", nullptr /* kv_prot_info */));
+      ASSERT_OK(mem->Add(++seq, kTypeDeletion, "keyX" + std::to_string(i), "",
+                         nullptr /* kv_prot_info */));
 
       elem.push_back(mem);
     }
@@ -841,7 +944,9 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
     auto* list = lists[i];
     ASSERT_FALSE(list->IsFlushPending());
     ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
-    list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
+    list->PickMemtablesToFlush(
+        std::numeric_limits<uint64_t>::max() /* memtable_id */,
+        &flush_candidates[i]);
     ASSERT_EQ(0, flush_candidates[i].size());
   }
   // Request flush even though there is nothing to flush
@@ -871,8 +976,7 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
   // Pick memtables to flush
   for (auto i = 0; i != num_cfs; ++i) {
     flush_candidates[i].clear();
-    lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
-                                   &flush_candidates[i]);
+    lists[i]->PickMemtablesToFlush(flush_memtable_ids[i], &flush_candidates[i]);
     ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
               static_cast<uint64_t>(flush_candidates[i].size()));
   }
@@ -929,6 +1033,7 @@ TEST_F(MemTableListTest, AtomicFlusTest) {
 }  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }