]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/memtable_list_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / memtable_list_test.cc
index 625b86922bc20fb7f2b8ef5e6070ee39333e3ae2..cc6e566ad5733d81b3fed1c28b7fd2ebbbbefbb9 100644 (file)
@@ -1,33 +1,36 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 
 #include "db/memtable_list.h"
 #include <algorithm>
 #include <string>
 #include <vector>
 #include "db/merge_context.h"
-#include "db/range_del_aggregator.h"
 #include "db/version_set.h"
 #include "db/write_controller.h"
 #include "rocksdb/db.h"
 #include "rocksdb/status.h"
 #include "rocksdb/write_buffer_manager.h"
+#include "test_util/testharness.h"
+#include "test_util/testutil.h"
 #include "util/string_util.h"
-#include "util/testharness.h"
-#include "util/testutil.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 class MemTableListTest : public testing::Test {
  public:
   std::string dbname;
   DB* db;
   Options options;
+  std::vector<ColumnFamilyHandle*> handles;
+  std::atomic<uint64_t> file_number;
 
-  MemTableListTest() : db(nullptr) {
-    dbname = test::TmpDir() + "/memtable_list_test";
+  MemTableListTest() : db(nullptr), file_number(1) {
+    dbname = test::PerThreadDBPath("memtable_list_test");
+    options.create_if_missing = true;
+    DestroyDB(dbname, options);
   }
 
   // Create a test db if not yet created
@@ -35,19 +38,51 @@ class MemTableListTest : public testing::Test {
     if (db == nullptr) {
       options.create_if_missing = true;
       DestroyDB(dbname, options);
-      Status s = DB::Open(options, dbname, &db);
+      // Open DB only with default column family
+      ColumnFamilyOptions cf_options;
+      std::vector<ColumnFamilyDescriptor> cf_descs;
+      cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
+      Status s = DB::Open(options, dbname, cf_descs, &handles, &db);
       EXPECT_OK(s);
+
+      ColumnFamilyOptions cf_opt1, cf_opt2;
+      cf_opt1.cf_paths.emplace_back(dbname + "_one_1",
+                                    std::numeric_limits<uint64_t>::max());
+      cf_opt2.cf_paths.emplace_back(dbname + "_two_1",
+                                    std::numeric_limits<uint64_t>::max());
+      int sz = static_cast<int>(handles.size());
+      handles.resize(sz + 2);
+      s = db->CreateColumnFamily(cf_opt1, "one", &handles[1]);
+      EXPECT_OK(s);
+      s = db->CreateColumnFamily(cf_opt2, "two", &handles[2]);
+      EXPECT_OK(s);
+
+      cf_descs.emplace_back("one", cf_options);
+      cf_descs.emplace_back("two", cf_options);
     }
   }
 
-  ~MemTableListTest() {
+  ~MemTableListTest() override {
     if (db) {
+      std::vector<ColumnFamilyDescriptor> cf_descs(handles.size());
+#ifndef ROCKSDB_LITE
+      for (int i = 0; i != static_cast<int>(handles.size()); ++i) {
+        EXPECT_OK(handles[i]->GetDescriptor(&cf_descs[i]));
+      }
+#endif  // !ROCKSDB_LITE
+      for (auto h : handles) {
+        if (h) {
+          EXPECT_OK(db->DestroyColumnFamilyHandle(h));
+        }
+      }
+      handles.clear();
       delete db;
-      DestroyDB(dbname, options);
+      db = nullptr;
+      DestroyDB(dbname, options, cf_descs);
     }
   }
 
-  // Calls MemTableList::InstallMemtableFlushResults() and sets up all
+  // Calls MemTableList::TryInstallMemtableFlushResults() and sets up all
   // structures needed to call this function.
   Status Mock_InstallMemtableFlushResults(
       MemTableList* list, const MutableCFOptions& mutable_cf_options,
@@ -56,49 +91,115 @@ class MemTableListTest : public testing::Test {
     test::NullLogger logger;
     LogBuffer log_buffer(DEBUG_LEVEL, &logger);
 
+    CreateDB();
     // Create a mock VersionSet
     DBOptions db_options;
     ImmutableDBOptions immutable_db_options(db_options);
     EnvOptions env_options;
-    shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
+    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
     WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
     WriteController write_controller(10000000u);
 
+    VersionSet versions(dbname, &immutable_db_options, env_options,
+                        table_cache.get(), &write_buffer_manager,
+                        &write_controller, /*block_cache_tracer=*/nullptr,
+                        /*io_tracer=*/nullptr);
+    std::vector<ColumnFamilyDescriptor> cf_descs;
+    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
+    cf_descs.emplace_back("one", ColumnFamilyOptions());
+    cf_descs.emplace_back("two", ColumnFamilyOptions());
+
+    EXPECT_OK(versions.Recover(cf_descs, false));
+
+    // Create mock default ColumnFamilyData
+    auto column_family_set = versions.GetColumnFamilySet();
+    LogsWithPrepTracker dummy_prep_tracker;
+    auto cfd = column_family_set->GetDefault();
+    EXPECT_TRUE(nullptr != cfd);
+    uint64_t file_num = file_number.fetch_add(1);
+    IOStatus io_s;
+    // Create dummy mutex.
+    InstrumentedMutex mutex;
+    InstrumentedMutexLock l(&mutex);
+    std::list<std::unique_ptr<FlushJobInfo>> flush_jobs_info;
+    Status s = list->TryInstallMemtableFlushResults(
+        cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex,
+        file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info, &io_s);
+    EXPECT_OK(io_s);
+    return s;
+  }
+
+  // Calls MemTableList::InstallMemtableFlushResults() and sets up all
+  // structures needed to call this function.
+  Status Mock_InstallMemtableAtomicFlushResults(
+      autovector<MemTableList*>& lists, const autovector<uint32_t>& cf_ids,
+      const autovector<const MutableCFOptions*>& mutable_cf_options_list,
+      const autovector<const autovector<MemTable*>*>& mems_list,
+      autovector<MemTable*>* to_delete) {
+    // Create a mock Logger
+    test::NullLogger logger;
+    LogBuffer log_buffer(DEBUG_LEVEL, &logger);
+
     CreateDB();
+    // Create a mock VersionSet
+    DBOptions db_options;
+
+    ImmutableDBOptions immutable_db_options(db_options);
+    EnvOptions env_options;
+    std::shared_ptr<Cache> table_cache(NewLRUCache(50000, 16));
+    WriteBufferManager write_buffer_manager(db_options.db_write_buffer_size);
+    WriteController write_controller(10000000u);
+
     VersionSet versions(dbname, &immutable_db_options, env_options,
                         table_cache.get(), &write_buffer_manager,
-                        &write_controller);
+                        &write_controller, /*block_cache_tracer=*/nullptr,
+                        /*io_tracer=*/nullptr);
+    std::vector<ColumnFamilyDescriptor> cf_descs;
+    cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions());
+    cf_descs.emplace_back("one", ColumnFamilyOptions());
+    cf_descs.emplace_back("two", ColumnFamilyOptions());
+    EXPECT_OK(versions.Recover(cf_descs, false));
 
     // Create mock default ColumnFamilyData
-    ColumnFamilyOptions cf_options;
-    std::vector<ColumnFamilyDescriptor> column_families;
-    column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
-    EXPECT_OK(versions.Recover(column_families, false));
 
     auto column_family_set = versions.GetColumnFamilySet();
-    auto cfd = column_family_set->GetColumnFamily(0);
-    EXPECT_TRUE(cfd != nullptr);
 
-    // Create dummy mutex.
+    LogsWithPrepTracker dummy_prep_tracker;
+    autovector<ColumnFamilyData*> cfds;
+    for (int i = 0; i != static_cast<int>(cf_ids.size()); ++i) {
+      cfds.emplace_back(column_family_set->GetColumnFamily(cf_ids[i]));
+      EXPECT_NE(nullptr, cfds[i]);
+    }
+    std::vector<FileMetaData> file_metas;
+    file_metas.reserve(cf_ids.size());
+    for (size_t i = 0; i != cf_ids.size(); ++i) {
+      FileMetaData meta;
+      uint64_t file_num = file_number.fetch_add(1);
+      meta.fd = FileDescriptor(file_num, 0, 0);
+      file_metas.emplace_back(meta);
+    }
+    autovector<FileMetaData*> file_meta_ptrs;
+    for (auto& meta : file_metas) {
+      file_meta_ptrs.push_back(&meta);
+    }
     InstrumentedMutex mutex;
     InstrumentedMutexLock l(&mutex);
-
-    return list->InstallMemtableFlushResults(cfd, mutable_cf_options, m,
-                                             &versions, &mutex, 1, to_delete,
-                                             nullptr, &log_buffer);
+    return InstallMemtableAtomicFlushResults(
+        &lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
+        file_meta_ptrs, to_delete, nullptr, &log_buffer);
   }
 };
 
 TEST_F(MemTableListTest, Empty) {
   // Create an empty MemTableList and validate basic functions.
-  MemTableList list(1, 0);
+  MemTableList list(1, 0, 0);
 
   ASSERT_EQ(0, list.NumNotFlushed());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
   ASSERT_FALSE(list.IsFlushPending());
 
   autovector<MemTable*> mems;
-  list.PickMemtablesToFlush(&mems);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &mems);
   ASSERT_EQ(0, mems.size());
 
   autovector<MemTable*> to_delete;
@@ -110,20 +211,23 @@ TEST_F(MemTableListTest, GetTest) {
   // Create MemTableList
   int min_write_buffer_number_to_merge = 2;
   int max_write_buffer_number_to_maintain = 0;
+  int64_t max_write_buffer_size_to_maintain = 0;
   MemTableList list(min_write_buffer_number_to_merge,
-                    max_write_buffer_number_to_maintain);
+                    max_write_buffer_number_to_maintain,
+                    max_write_buffer_size_to_maintain);
 
   SequenceNumber seq = 1;
   std::string value;
   Status s;
   MergeContext merge_context;
   InternalKeyComparator ikey_cmp(options.comparator);
-  RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */);
+  SequenceNumber max_covering_tombstone_seq = 0;
   autovector<MemTable*> to_delete;
 
   LookupKey lkey("key1", seq);
-  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
-                                   &range_del_agg, ReadOptions());
+  bool found = list.current()->Get(
+      lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
+      &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Create a MemTable
@@ -134,7 +238,7 @@ TEST_F(MemTableListTest, GetTest) {
 
   WriteBufferManager wb(options.db_write_buffer_size);
   MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
-                               kMaxSequenceNumber);
+                               kMaxSequenceNumber, 0 /* column_family_id */);
   mem->Ref();
 
   // Write some keys to this memtable.
@@ -145,20 +249,23 @@ TEST_F(MemTableListTest, GetTest) {
 
   // Fetch the newly written keys
   merge_context.Clear();
-  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
-                   &range_del_agg, ReadOptions());
+  found = mem->Get(LookupKey("key1", seq), &value,
+                   /*timestamp*/nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value1");
 
   merge_context.Clear();
-  found = mem->Get(LookupKey("key1", 2), &value, &s, &merge_context,
-                   &range_del_agg, ReadOptions());
+  found = mem->Get(LookupKey("key1", 2), &value,
+                   /*timestamp*/nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions());
   // MemTable found out that this key is *not* found (at this sequence#)
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
-                   &range_del_agg, ReadOptions());
+  found = mem->Get(LookupKey("key2", seq), &value,
+                   /*timestamp*/nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value2.2");
 
@@ -173,7 +280,7 @@ TEST_F(MemTableListTest, GetTest) {
   // Create another memtable and write some keys to it
   WriteBufferManager wb2(options.db_write_buffer_size);
   MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
-                                kMaxSequenceNumber);
+                                kMaxSequenceNumber, 0 /* column_family_id */);
   mem2->Ref();
 
   mem2->Add(++seq, kTypeDeletion, "key1", "");
@@ -184,25 +291,29 @@ TEST_F(MemTableListTest, GetTest) {
 
   // Fetch keys via MemTableList
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key1", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(
+      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s,
+      &merge_context, &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key1", saved_seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(
+      LookupKey("key1", saved_seq), &value, /*timestamp*/nullptr,
+      &s, &merge_context, &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ("value1", value);
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(
+      LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s,
+      &merge_context, &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value2.3");
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", 1), &value, &s, &merge_context,
-                              &range_del_agg, ReadOptions());
+  found = list.current()->Get(
+      LookupKey("key2", 1), &value, /*timestamp*/nullptr, &s,
+      &merge_context, &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   ASSERT_EQ(2, list.NumNotFlushed());
@@ -217,20 +328,23 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   // Create MemTableList
   int min_write_buffer_number_to_merge = 2;
   int max_write_buffer_number_to_maintain = 2;
+  int64_t max_write_buffer_size_to_maintain = 2000;
   MemTableList list(min_write_buffer_number_to_merge,
-                    max_write_buffer_number_to_maintain);
+                    max_write_buffer_number_to_maintain,
+                    max_write_buffer_size_to_maintain);
 
   SequenceNumber seq = 1;
   std::string value;
   Status s;
   MergeContext merge_context;
   InternalKeyComparator ikey_cmp(options.comparator);
-  RangeDelAggregator range_del_agg(ikey_cmp, {} /* snapshots */);
+  SequenceNumber max_covering_tombstone_seq = 0;
   autovector<MemTable*> to_delete;
 
   LookupKey lkey("key1", seq);
-  bool found = list.current()->Get(lkey, &value, &s, &merge_context,
-                                   &range_del_agg, ReadOptions());
+  bool found = list.current()->Get(
+      lkey, &value, /*timestamp*/nullptr, &s, &merge_context,
+      &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Create a MemTable
@@ -241,7 +355,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 
   WriteBufferManager wb(options.db_write_buffer_size);
   MemTable* mem = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb,
-                               kMaxSequenceNumber);
+                               kMaxSequenceNumber, 0 /* column_family_id */);
   mem->Ref();
 
   // Write some keys to this memtable.
@@ -251,14 +365,16 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 
   // Fetch the newly written keys
   merge_context.Clear();
-  found = mem->Get(LookupKey("key1", seq), &value, &s, &merge_context,
-                   &range_del_agg, ReadOptions());
+  found = mem->Get(LookupKey("key1", seq), &value,
+                   /*timestamp*/nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions());
   // MemTable found out that this key is *not* found (at this sequence#)
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = mem->Get(LookupKey("key2", seq), &value, &s, &merge_context,
-                   &range_del_agg, ReadOptions());
+  found = mem->Get(LookupKey("key2", seq), &value,
+                   /*timestamp*/nullptr, &s, &merge_context,
+                   &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ(value, "value2.2");
 
@@ -268,24 +384,27 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 
   // Fetch keys via MemTableList
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key1", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(LookupKey("key1", seq), &value,
+                              /*timestamp*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(LookupKey("key2", seq), &value,
+                              /*timestamp*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(s.ok() && found);
   ASSERT_EQ("value2.2", value);
 
   // Flush this memtable from the list.
   // (It will then be a part of the memtable history).
   autovector<MemTable*> to_flush;
-  list.PickMemtablesToFlush(&to_flush);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
   ASSERT_EQ(1, to_flush.size());
 
-  s = Mock_InstallMemtableFlushResults(&list, MutableCFOptions(options),
-                                       to_flush, &to_delete);
+  MutableCFOptions mutable_cf_options(options);
+  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
+                                       &to_delete);
   ASSERT_OK(s);
   ASSERT_EQ(0, list.NumNotFlushed());
   ASSERT_EQ(1, list.NumFlushed());
@@ -293,33 +412,35 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 
   // Verify keys are no longer in MemTableList
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key1", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(LookupKey("key1", seq), &value,
+                              /*timestamp*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(LookupKey("key2", seq), &value,
+                              /*timestamp*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Verify keys are present in history
   merge_context.Clear();
-  found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s,
-                                         &merge_context, &range_del_agg,
-                                         ReadOptions());
+  found = list.current()->GetFromHistory(
+      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
+      &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = list.current()->GetFromHistory(LookupKey("key2", seq), &value, &s,
-                                         &merge_context, &range_del_agg,
-                                         ReadOptions());
+  found = list.current()->GetFromHistory(
+      LookupKey("key2", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
+      &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(found);
   ASSERT_EQ("value2.2", value);
 
   // Create another memtable and write some keys to it
   WriteBufferManager wb2(options.db_write_buffer_size);
   MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2,
-                                kMaxSequenceNumber);
+                                kMaxSequenceNumber, 0 /* column_family_id */);
   mem2->Ref();
 
   mem2->Add(++seq, kTypeDeletion, "key1", "");
@@ -330,12 +451,12 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   ASSERT_EQ(0, to_delete.size());
 
   to_flush.clear();
-  list.PickMemtablesToFlush(&to_flush);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
   ASSERT_EQ(1, to_flush.size());
 
   // Flush second memtable
-  s = Mock_InstallMemtableFlushResults(&list, MutableCFOptions(options),
-                                       to_flush, &to_delete);
+  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
+                                       &to_delete);
   ASSERT_OK(s);
   ASSERT_EQ(0, list.NumNotFlushed());
   ASSERT_EQ(2, list.NumFlushed());
@@ -344,7 +465,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
   // Add a third memtable to push the first memtable out of the history
   WriteBufferManager wb3(options.db_write_buffer_size);
   MemTable* mem3 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb3,
-                                kMaxSequenceNumber);
+                                kMaxSequenceNumber, 0 /* column_family_id */);
   mem3->Ref();
   list.Add(mem3, &to_delete);
   ASSERT_EQ(1, list.NumNotFlushed());
@@ -353,38 +474,42 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 
   // Verify keys are no longer in MemTableList
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key1", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(LookupKey("key1", seq), &value,
+                              /*timestamp*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(LookupKey("key2", seq), &value,
+                              /*timestamp*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key3", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(LookupKey("key3", seq), &value,
+                              /*timestamp*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Verify that the second memtable's keys are in the history
   merge_context.Clear();
-  found = list.current()->GetFromHistory(LookupKey("key1", seq), &value, &s,
-                                         &merge_context, &range_del_agg,
-                                         ReadOptions());
+  found = list.current()->GetFromHistory(
+      LookupKey("key1", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
+      &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(found && s.IsNotFound());
 
   merge_context.Clear();
-  found = list.current()->GetFromHistory(LookupKey("key3", seq), &value, &s,
-                                         &merge_context, &range_del_agg,
-                                         ReadOptions());
+  found = list.current()->GetFromHistory(
+      LookupKey("key3", seq), &value, /*timestamp*/nullptr, &s, &merge_context,
+      &max_covering_tombstone_seq, ReadOptions());
   ASSERT_TRUE(found);
   ASSERT_EQ("value3", value);
 
   // Verify that key2 from the first memtable is no longer in the history
   merge_context.Clear();
-  found = list.current()->Get(LookupKey("key2", seq), &value, &s,
-                              &merge_context, &range_del_agg, ReadOptions());
+  found = list.current()->Get(LookupKey("key2", seq), &value,
+                              /*timestamp*/nullptr, &s, &merge_context,
+                              &max_covering_tombstone_seq, ReadOptions());
   ASSERT_FALSE(found);
 
   // Cleanup
@@ -396,7 +521,7 @@ TEST_F(MemTableListTest, GetFromHistoryTest) {
 }
 
 TEST_F(MemTableListTest, FlushPendingTest) {
-  const int num_tables = 5;
+  const int num_tables = 6;
   SequenceNumber seq = 1;
   Status s;
 
@@ -410,15 +535,20 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   // Create MemTableList
   int min_write_buffer_number_to_merge = 3;
   int max_write_buffer_number_to_maintain = 7;
+  int64_t max_write_buffer_size_to_maintain =
+      7 * static_cast<int>(options.write_buffer_size);
   MemTableList list(min_write_buffer_number_to_merge,
-                    max_write_buffer_number_to_maintain);
+                    max_write_buffer_number_to_maintain,
+                    max_write_buffer_size_to_maintain);
 
   // Create some MemTables
+  uint64_t memtable_id = 0;
   std::vector<MemTable*> tables;
   MutableCFOptions mutable_cf_options(options);
   for (int i = 0; i < num_tables; i++) {
     MemTable* mem = new MemTable(cmp, ioptions, mutable_cf_options, &wb,
-                                 kMaxSequenceNumber);
+                                 kMaxSequenceNumber, 0 /* column_family_id */);
+    mem->SetID(memtable_id++);
     mem->Ref();
 
     std::string value;
@@ -437,7 +567,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
   autovector<MemTable*> to_flush;
-  list.PickMemtablesToFlush(&to_flush);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
   ASSERT_EQ(0, to_flush.size());
 
   // Request a flush even though there is nothing to flush
@@ -446,7 +576,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Attempt to 'flush' to clear request for flush
-  list.PickMemtablesToFlush(&to_flush);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
   ASSERT_EQ(0, to_flush.size());
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
@@ -470,7 +600,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Pick tables to flush
-  list.PickMemtablesToFlush(&to_flush);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
   ASSERT_EQ(2, to_flush.size());
   ASSERT_EQ(2, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -491,7 +621,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_EQ(0, to_delete.size());
 
   // Pick tables to flush
-  list.PickMemtablesToFlush(&to_flush);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
   ASSERT_EQ(3, to_flush.size());
   ASSERT_EQ(3, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -499,7 +629,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
 
   // Pick tables to flush again
   autovector<MemTable*> to_flush2;
-  list.PickMemtablesToFlush(&to_flush2);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
   ASSERT_EQ(0, to_flush2.size());
   ASSERT_EQ(3, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -517,7 +647,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Pick tables to flush again
-  list.PickMemtablesToFlush(&to_flush2);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush2);
   ASSERT_EQ(1, to_flush2.size());
   ASSERT_EQ(4, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
@@ -538,7 +668,7 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   ASSERT_EQ(0, to_delete.size());
 
   // Pick tables to flush
-  list.PickMemtablesToFlush(&to_flush);
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush);
   // Should pick 4 of 5 since 1 table has been picked in to_flush2
   ASSERT_EQ(4, to_flush.size());
   ASSERT_EQ(5, list.NumNotFlushed());
@@ -547,23 +677,26 @@ TEST_F(MemTableListTest, FlushPendingTest) {
 
   // Pick tables to flush again
   autovector<MemTable*> to_flush3;
+  list.PickMemtablesToFlush(nullptr /* memtable_id */, &to_flush3);
   ASSERT_EQ(0, to_flush3.size());  // nothing not in progress of being flushed
   ASSERT_EQ(5, list.NumNotFlushed());
   ASSERT_FALSE(list.IsFlushPending());
   ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
 
   // Flush the 4 memtables that were picked in to_flush
-  s = Mock_InstallMemtableFlushResults(&list, MutableCFOptions(options),
-                                       to_flush, &to_delete);
+  s = Mock_InstallMemtableFlushResults(&list, mutable_cf_options, to_flush,
+                                       &to_delete);
   ASSERT_OK(s);
 
   // Note:  now to_flush contains tables[0,1,2,4].  to_flush2 contains
   // tables[3].
   // Current implementation will only commit memtables in the order they were
-  // created.  So InstallMemtableFlushResults will install the first 3 tables
+  // created. So TryInstallMemtableFlushResults will install the first 3 tables
   // in to_flush and stop when it encounters a table not yet flushed.
   ASSERT_EQ(2, list.NumNotFlushed());
-  int num_in_history = std::min(3, max_write_buffer_number_to_maintain);
+  int num_in_history =
+      std::min(3, static_cast<int>(max_write_buffer_size_to_maintain) /
+                      static_cast<int>(options.write_buffer_size));
   ASSERT_EQ(num_in_history, list.NumFlushed());
   ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
 
@@ -574,18 +707,20 @@ TEST_F(MemTableListTest, FlushPendingTest) {
 
   // Flush the 1 memtable that was picked in to_flush2
   s = MemTableListTest::Mock_InstallMemtableFlushResults(
-      &list, MutableCFOptions(options), to_flush2, &to_delete);
+      &list, mutable_cf_options, to_flush2, &to_delete);
   ASSERT_OK(s);
 
   // This will actually install 2 tables.  The 1 we told it to flush, and also
   // tables[4] which has been waiting for tables[3] to commit.
   ASSERT_EQ(0, list.NumNotFlushed());
-  num_in_history = std::min(5, max_write_buffer_number_to_maintain);
+  num_in_history =
+      std::min(5, static_cast<int>(max_write_buffer_size_to_maintain) /
+                      static_cast<int>(options.write_buffer_size));
   ASSERT_EQ(num_in_history, list.NumFlushed());
   ASSERT_EQ(5 - list.NumNotFlushed() - num_in_history, to_delete.size());
 
   for (const auto& m : to_delete) {
-    // Refcount should be 0 after calling InstallMemtableFlushResults.
+    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
     // Verify this, by Ref'ing then UnRef'ing:
     m->Ref();
     ASSERT_EQ(m, m->Unref());
@@ -593,12 +728,42 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   }
   to_delete.clear();
 
+  // Add another table
+  list.Add(tables[5], &to_delete);
+  ASSERT_EQ(1, list.NumNotFlushed());
+  ASSERT_EQ(5, list.GetLatestMemTableID());
+  memtable_id = 4;
+  // Pick tables to flush. The tables to pick must have ID smaller than or
+  // equal to 4. Therefore, no table will be selected in this case.
+  autovector<MemTable*> to_flush4;
+  list.FlushRequested();
+  ASSERT_TRUE(list.HasFlushRequested());
+  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
+  ASSERT_TRUE(to_flush4.empty());
+  ASSERT_EQ(1, list.NumNotFlushed());
+  ASSERT_TRUE(list.imm_flush_needed.load(std::memory_order_acquire));
+  ASSERT_FALSE(list.IsFlushPending());
+  ASSERT_FALSE(list.HasFlushRequested());
+
+  // Pick tables to flush. The tables to pick must have ID smaller than or
+  // equal to 5. Therefore, only tables[5] will be selected.
+  memtable_id = 5;
+  list.FlushRequested();
+  list.PickMemtablesToFlush(&memtable_id, &to_flush4);
+  ASSERT_EQ(1, static_cast<int>(to_flush4.size()));
+  ASSERT_EQ(1, list.NumNotFlushed());
+  ASSERT_FALSE(list.imm_flush_needed.load(std::memory_order_acquire));
+  ASSERT_FALSE(list.IsFlushPending());
+  to_delete.clear();
+
   list.current()->Unref(&to_delete);
-  int to_delete_size = std::min(5, max_write_buffer_number_to_maintain);
+  int to_delete_size =
+      std::min(num_tables, static_cast<int>(max_write_buffer_size_to_maintain) /
+                               static_cast<int>(options.write_buffer_size));
   ASSERT_EQ(to_delete_size, to_delete.size());
 
   for (const auto& m : to_delete) {
-    // Refcount should be 0 after calling InstallMemtableFlushResults.
+    // Refcount should be 0 after calling TryInstallMemtableFlushResults.
     // Verify this, by Ref'ing then UnRef'ing:
     m->Ref();
     ASSERT_EQ(m, m->Unref());
@@ -607,7 +772,161 @@ TEST_F(MemTableListTest, FlushPendingTest) {
   to_delete.clear();
 }
 
-}  // namespace rocksdb
+TEST_F(MemTableListTest, EmptyAtomicFlusTest) {
+  autovector<MemTableList*> lists;
+  autovector<uint32_t> cf_ids;
+  autovector<const MutableCFOptions*> options_list;
+  autovector<const autovector<MemTable*>*> to_flush;
+  autovector<MemTable*> to_delete;
+  Status s = Mock_InstallMemtableAtomicFlushResults(lists, cf_ids, options_list,
+                                                    to_flush, &to_delete);
+  ASSERT_OK(s);
+  ASSERT_TRUE(to_delete.empty());
+}
+
+TEST_F(MemTableListTest, AtomicFlusTest) {
+  const int num_cfs = 3;
+  const int num_tables_per_cf = 2;
+  SequenceNumber seq = 1;
+
+  auto factory = std::make_shared<SkipListFactory>();
+  options.memtable_factory = factory;
+  ImmutableCFOptions ioptions(options);
+  InternalKeyComparator cmp(BytewiseComparator());
+  WriteBufferManager wb(options.db_write_buffer_size);
+
+  // Create MemTableLists
+  int min_write_buffer_number_to_merge = 3;
+  int max_write_buffer_number_to_maintain = 7;
+  int64_t max_write_buffer_size_to_maintain =
+      7 * static_cast<int64_t>(options.write_buffer_size);
+  autovector<MemTableList*> lists;
+  for (int i = 0; i != num_cfs; ++i) {
+    lists.emplace_back(new MemTableList(min_write_buffer_number_to_merge,
+                                        max_write_buffer_number_to_maintain,
+                                        max_write_buffer_size_to_maintain));
+  }
+
+  autovector<uint32_t> cf_ids;
+  std::vector<std::vector<MemTable*>> tables(num_cfs);
+  autovector<const MutableCFOptions*> mutable_cf_options_list;
+  uint32_t cf_id = 0;
+  for (auto& elem : tables) {
+    mutable_cf_options_list.emplace_back(new MutableCFOptions(options));
+    uint64_t memtable_id = 0;
+    for (int i = 0; i != num_tables_per_cf; ++i) {
+      MemTable* mem =
+          new MemTable(cmp, ioptions, *(mutable_cf_options_list.back()), &wb,
+                       kMaxSequenceNumber, cf_id);
+      mem->SetID(memtable_id++);
+      mem->Ref();
+
+      std::string value;
+
+      mem->Add(++seq, kTypeValue, "key1", ToString(i));
+      mem->Add(++seq, kTypeValue, "keyN" + ToString(i), "valueN");
+      mem->Add(++seq, kTypeValue, "keyX" + ToString(i), "value");
+      mem->Add(++seq, kTypeValue, "keyM" + ToString(i), "valueM");
+      mem->Add(++seq, kTypeDeletion, "keyX" + ToString(i), "");
+
+      elem.push_back(mem);
+    }
+    cf_ids.push_back(cf_id++);
+  }
+
+  std::vector<autovector<MemTable*>> flush_candidates(num_cfs);
+
+  // Nothing to flush
+  for (auto i = 0; i != num_cfs; ++i) {
+    auto* list = lists[i];
+    ASSERT_FALSE(list->IsFlushPending());
+    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
+    list->PickMemtablesToFlush(nullptr /* memtable_id */, &flush_candidates[i]);
+    ASSERT_EQ(0, flush_candidates[i].size());
+  }
+  // Request flush even though there is nothing to flush
+  for (auto i = 0; i != num_cfs; ++i) {
+    auto* list = lists[i];
+    list->FlushRequested();
+    ASSERT_FALSE(list->IsFlushPending());
+    ASSERT_FALSE(list->imm_flush_needed.load(std::memory_order_acquire));
+  }
+  autovector<MemTable*> to_delete;
+  // Add tables to the immutable memtalbe lists associated with column families
+  for (auto i = 0; i != num_cfs; ++i) {
+    for (auto j = 0; j != num_tables_per_cf; ++j) {
+      lists[i]->Add(tables[i][j], &to_delete);
+    }
+    ASSERT_EQ(num_tables_per_cf, lists[i]->NumNotFlushed());
+    ASSERT_TRUE(lists[i]->IsFlushPending());
+    ASSERT_TRUE(lists[i]->imm_flush_needed.load(std::memory_order_acquire));
+  }
+  std::vector<uint64_t> flush_memtable_ids = {1, 1, 0};
+  //          +----+
+  // list[0]: |0  1|
+  // list[1]: |0  1|
+  //          | +--+
+  // list[2]: |0| 1
+  //          +-+
+  // Pick memtables to flush
+  for (auto i = 0; i != num_cfs; ++i) {
+    flush_candidates[i].clear();
+    lists[i]->PickMemtablesToFlush(&flush_memtable_ids[i],
+                                   &flush_candidates[i]);
+    ASSERT_EQ(flush_memtable_ids[i] - 0 + 1,
+              static_cast<uint64_t>(flush_candidates[i].size()));
+  }
+  autovector<MemTableList*> tmp_lists;
+  autovector<uint32_t> tmp_cf_ids;
+  autovector<const MutableCFOptions*> tmp_options_list;
+  autovector<const autovector<MemTable*>*> to_flush;
+  for (auto i = 0; i != num_cfs; ++i) {
+    if (!flush_candidates[i].empty()) {
+      to_flush.push_back(&flush_candidates[i]);
+      tmp_lists.push_back(lists[i]);
+      tmp_cf_ids.push_back(i);
+      tmp_options_list.push_back(mutable_cf_options_list[i]);
+    }
+  }
+  Status s = Mock_InstallMemtableAtomicFlushResults(
+      tmp_lists, tmp_cf_ids, tmp_options_list, to_flush, &to_delete);
+  ASSERT_OK(s);
+
+  for (auto i = 0; i != num_cfs; ++i) {
+    for (auto j = 0; j != num_tables_per_cf; ++j) {
+      if (static_cast<uint64_t>(j) <= flush_memtable_ids[i]) {
+        ASSERT_LT(0, tables[i][j]->GetFileNumber());
+      }
+    }
+    ASSERT_EQ(
+        static_cast<size_t>(num_tables_per_cf) - flush_candidates[i].size(),
+        lists[i]->NumNotFlushed());
+  }
+
+  to_delete.clear();
+  for (auto list : lists) {
+    list->current()->Unref(&to_delete);
+    delete list;
+  }
+  for (auto& mutable_cf_options : mutable_cf_options_list) {
+    if (mutable_cf_options != nullptr) {
+      delete mutable_cf_options;
+      mutable_cf_options = nullptr;
+    }
+  }
+  // All memtables in tables array must have been flushed, thus ready to be
+  // deleted.
+  ASSERT_EQ(to_delete.size(), tables.size() * tables.front().size());
+  for (const auto& m : to_delete) {
+    // Refcount should be 0 after calling InstallMemtableFlushResults.
+    // Verify this by Ref'ing and then Unref'ing.
+    m->Ref();
+    ASSERT_EQ(m, m->Unref());
+    delete m;
+  }
+}
+
+}  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);