]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_flush_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / db_flush_test.cc
index eeaa6912370069ec31bdfadcd3e5758738f2653c..3b3f7e1836c3e6a3c1df9898d7a5d1ee43b1c38a 100644 (file)
@@ -8,22 +8,31 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include <atomic>
+#include <limits>
 
 #include "db/db_impl/db_impl.h"
 #include "db/db_test_util.h"
+#include "env/mock_env.h"
 #include "file/filename.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
+#include "rocksdb/utilities/transaction_db.h"
 #include "test_util/sync_point.h"
+#include "test_util/testutil.h"
 #include "util/cast_util.h"
 #include "util/mutexlock.h"
 #include "utilities/fault_injection_env.h"
+#include "utilities/fault_injection_fs.h"
 
 namespace ROCKSDB_NAMESPACE {
 
+// This is a static filter used for filtering
+// kvs during the compaction process.
+static std::string NEW_VALUE = "NewValue";
+
 class DBFlushTest : public DBTestBase {
  public:
-  DBFlushTest() : DBTestBase("/db_flush_test", /*env_do_fsync=*/true) {}
+  DBFlushTest() : DBTestBase("db_flush_test", /*env_do_fsync=*/true) {}
 };
 
 class DBFlushDirectIOTest : public DBFlushTest,
@@ -48,7 +57,7 @@ TEST_F(DBFlushTest, FlushWhileWritingManifest) {
   Reopen(options);
   FlushOptions no_wait;
   no_wait.wait = false;
-  no_wait.allow_write_stall=true;
+  no_wait.allow_write_stall = true;
 
   SyncPoint::GetInstance()->LoadDependency(
       {{"VersionSet::LogAndApply:WriteManifest",
@@ -79,29 +88,16 @@ TEST_F(DBFlushTest, SyncFail) {
   options.env = fault_injection_env.get();
 
   SyncPoint::GetInstance()->LoadDependency(
-      {{"DBFlushTest::SyncFail:GetVersionRefCount:1",
-        "DBImpl::FlushMemTableToOutputFile:BeforePickMemtables"},
-       {"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
-        "DBFlushTest::SyncFail:GetVersionRefCount:2"},
-       {"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
+      {{"DBFlushTest::SyncFail:1", "DBImpl::SyncClosedLogs:Start"},
        {"DBImpl::SyncClosedLogs:Failed", "DBFlushTest::SyncFail:2"}});
   SyncPoint::GetInstance()->EnableProcessing();
 
   CreateAndReopenWithCF({"pikachu"}, options);
   ASSERT_OK(Put("key", "value"));
-  auto* cfd =
-      static_cast_with_check<ColumnFamilyHandleImpl>(db_->DefaultColumnFamily())
-          ->cfd();
   FlushOptions flush_options;
   flush_options.wait = false;
   ASSERT_OK(dbfull()->Flush(flush_options));
   // Flush installs a new super-version. Get the ref count after that.
-  auto current_before = cfd->current();
-  int refs_before = cfd->current()->TEST_refs();
-  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:1");
-  TEST_SYNC_POINT("DBFlushTest::SyncFail:GetVersionRefCount:2");
-  int refs_after_picking_memtables = cfd->current()->TEST_refs();
-  ASSERT_EQ(refs_before + 1, refs_after_picking_memtables);
   fault_injection_env->SetFilesystemActive(false);
   TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
   TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
@@ -112,9 +108,6 @@ TEST_F(DBFlushTest, SyncFail) {
 #ifndef ROCKSDB_LITE
   ASSERT_EQ("", FilesPerLevel());  // flush failed.
 #endif                             // ROCKSDB_LITE
-  // Backgroun flush job should release ref count to current version.
-  ASSERT_EQ(current_before, cfd->current());
-  ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
   Destroy(options);
 }
 
@@ -147,7 +140,7 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
   // scheduled in the low-pri (compaction) thread pool.
   Options options = CurrentOptions();
   options.level0_file_num_compaction_trigger = 4;
-  options.memtable_factory.reset(new SpecialSkipListFactory(1));
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
   Reopen(options);
   env_->SetBackgroundThreads(0, Env::HIGH);
 
@@ -179,6 +172,66 @@ TEST_F(DBFlushTest, FlushInLowPriThreadPool) {
   ASSERT_EQ(1, num_compactions);
 }
 
+// Test when flush job is submitted to low priority thread pool and when DB is
+// closed in the meanwhile, CloseHelper doesn't hang.
+TEST_F(DBFlushTest, CloseDBWhenFlushInLowPri) {
+  Options options = CurrentOptions();
+  options.max_background_flushes = 1;
+  options.max_total_wal_size = 8192;
+
+  DestroyAndReopen(options);
+  CreateColumnFamilies({"cf1", "cf2"}, options);
+
+  env_->SetBackgroundThreads(0, Env::HIGH);
+  env_->SetBackgroundThreads(1, Env::LOW);
+  test::SleepingBackgroundTask sleeping_task_low;
+  int num_flushes = 0;
+
+  SyncPoint::GetInstance()->SetCallBack("DBImpl::BGWorkFlush",
+                                        [&](void* /*arg*/) { ++num_flushes; });
+
+  int num_low_flush_unscheduled = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::UnscheduleLowFlushCallback", [&](void* /*arg*/) {
+        num_low_flush_unscheduled++;
+        // There should be one flush job in low pool that needs to be
+        // unscheduled
+        ASSERT_EQ(num_low_flush_unscheduled, 1);
+      });
+
+  int num_high_flush_unscheduled = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::UnscheduleHighFlushCallback", [&](void* /*arg*/) {
+        num_high_flush_unscheduled++;
+        // There should be no flush job in high pool
+        ASSERT_EQ(num_high_flush_unscheduled, 0);
+      });
+
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(Put(0, "key1", DummyString(8192)));
+  // Block thread so that flush cannot be run and can be removed from the queue
+  // when called Unschedule.
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
+  sleeping_task_low.WaitUntilSleeping();
+
+  // Trigger flush and flush job will be scheduled to LOW priority thread.
+  ASSERT_OK(Put(0, "key2", DummyString(8192)));
+
+  // Close DB and flush job in low priority queue will be removed without
+  // running.
+  Close();
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+  ASSERT_EQ(0, num_flushes);
+
+  TryReopenWithColumnFamilies({"default", "cf1", "cf2"}, options);
+  ASSERT_OK(Put(0, "key3", DummyString(8192)));
+  ASSERT_OK(Flush(0));
+  ASSERT_EQ(1, num_flushes);
+}
+
 TEST_F(DBFlushTest, ManualFlushWithMinWriteBufferNumberToMerge) {
   Options options = CurrentOptions();
   options.write_buffer_size = 100;
@@ -238,13 +291,1471 @@ TEST_F(DBFlushTest, ScheduleOnlyOneBgThread) {
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
+// The following 3 tests are designed for testing garbage statistics at flush
+// time.
+//
+// ======= General Information ======= (from GitHub Wiki).
+// There are three scenarios where memtable flush can be triggered:
+//
+// 1 - Memtable size exceeds ColumnFamilyOptions::write_buffer_size
+//     after a write.
+// 2 - Total memtable size across all column families exceeds
+// DBOptions::db_write_buffer_size,
+//     or DBOptions::write_buffer_manager signals a flush. In this scenario
+//     the largest memtable will be flushed.
+// 3 - Total WAL file size exceeds DBOptions::max_total_wal_size.
+//     In this scenario the memtable with the oldest data will be flushed,
+//     in order to allow the WAL file with data from this memtable to be
+//     purged.
+//
+// As a result, a memtable can be flushed before it is full. This is one
+// reason the generated SST file can be smaller than the corresponding
+// memtable. Compression is another factor to make SST file smaller than
+// corresponding memtable, since data in memtable is uncompressed.
+
+TEST_F(DBFlushTest, StatisticsGarbageBasic) {
+  Options options = CurrentOptions();
+
+  // The following options are used to enforce several values that
+  // may already exist as default values to make this test resilient
+  // to default value updates in the future.
+  options.statistics = CreateDBStatistics();
+
+  // Record all statistics.
+  options.statistics->set_stats_level(StatsLevel::kAll);
+
+  // create the DB if it's not already present
+  options.create_if_missing = true;
+
+  // Useful for now as we are trying to compare uncompressed data savings on
+  // flush().
+  options.compression = kNoCompression;
+
+  // Prevent memtable in place updates. Should already be disabled
+  // (from Wiki:
+  //  In place updates can be enabled by toggling on the bool
+  //  inplace_update_support flag. However, this flag is by default set to
+  //  false
+  //  because this thread-safe in-place update support is not compatible
+  //  with concurrent memtable writes. Note that the bool
+  //  allow_concurrent_memtable_write is set to true by default )
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+
+  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
+  options.write_buffer_size = 64 << 20;
+
+  ASSERT_OK(TryReopen(options));
+
+  // Put multiple times the same key-values.
+  // The encoded length of a db entry in the memtable is
+  // defined in db/memtable.cc (MemTable::Add) as the variable:
+  // encoded_len=  VarintLength(internal_key_size)  --> =
+  // log_256(internal_key).
+  // Min # of bytes
+  //                                                       necessary to
+  //                                                       store
+  //                                                       internal_key_size.
+  //             + internal_key_size                --> = actual key string,
+  //             (size key_size: w/o term null char)
+  //                                                      + 8 bytes for
+  //                                                      fixed uint64 "seq
+  //                                                      number
+  // +
+  //                                                      insertion type"
+  //             + VarintLength(val_size)           --> = min # of bytes to
+  //             store val_size
+  //             + val_size                         --> = actual value
+  //             string
+  // For example, in our situation, "key1" : size 4, "value1" : size 6
+  // (the terminating null characters are not copied over to the memtable).
+  // And therefore encoded_len = 1 + (4+8) + 1 + 6 = 20 bytes per entry.
+  // However in terms of raw data contained in the memtable, and written
+  // over to the SSTable, we only count internal_key_size and val_size,
+  // because this is the only raw chunk of bytes that contains everything
+  // necessary to reconstruct a user entry: sequence number, insertion type,
+  // key, and value.
+
+  // To test the relevance of our Memtable garbage statistics,
+  // namely MEMTABLE_PAYLOAD_BYTES_AT_FLUSH and MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
+  // we insert K-V pairs with 3 distinct keys (of length 4),
+  // and random values of arbitrary length RAND_VALUES_LENGTH,
+  // and we repeat this step NUM_REPEAT times total.
+  // At the end, we insert 3 final K-V pairs with the same 3 keys
+  // and known values (these will be the final values, of length 6).
+  // I chose NUM_REPEAT=2,000 such that no automatic flush is
+  // triggered (the number of bytes in the memtable is therefore
+  // well below any meaningful heuristic for a memtable of size 64MB).
+  // As a result, since each K-V pair is inserted as a payload
+  // of N meaningful bytes (sequence number, insertion type,
+  // key, and value = 8 + 4 + RAND_VALUE_LENGTH),
+  // MEMTABLE_GARBAGE_BYTES_AT_FLUSH should be equal to 2,000 * N bytes
+  // and MEMTABLE_PAYLAOD_BYTES_AT_FLUSH = MEMTABLE_GARBAGE_BYTES_AT_FLUSH +
+  // (3*(8 + 4 + 6)) bytes. For RAND_VALUE_LENGTH = 172 (arbitrary value), we
+  // expect:
+  //      N = 8 + 4 + 172 = 184 bytes
+  //      MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 2,000 * 184 = 368,000 bytes.
+  //      MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 368,000 + 3*18 = 368,054 bytes.
+
+  const size_t NUM_REPEAT = 2000;
+  const size_t RAND_VALUES_LENGTH = 172;
+  const std::string KEY1 = "key1";
+  const std::string KEY2 = "key2";
+  const std::string KEY3 = "key3";
+  const std::string VALUE1 = "value1";
+  const std::string VALUE2 = "value2";
+  const std::string VALUE3 = "value3";
+  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
+  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;
+
+  Random rnd(301);
+  // Insertion of of K-V pairs, multiple times.
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
+    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
+    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(KEY1, p_v1));
+    ASSERT_OK(Put(KEY2, p_v2));
+    ASSERT_OK(Put(KEY3, p_v3));
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY1.size() + p_v1.size() + sizeof(uint64_t);
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY2.size() + p_v2.size() + sizeof(uint64_t);
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY3.size() + p_v3.size() + sizeof(uint64_t);
+  }
+
+  // The memtable data bytes includes the "garbage"
+  // bytes along with the useful payload.
+  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
+      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;
+
+  ASSERT_OK(Put(KEY1, VALUE1));
+  ASSERT_OK(Put(KEY2, VALUE2));
+  ASSERT_OK(Put(KEY3, VALUE3));
+
+  // Add useful payload to the memtable data bytes:
+  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
+      KEY1.size() + VALUE1.size() + KEY2.size() + VALUE2.size() + KEY3.size() +
+      VALUE3.size() + 3 * sizeof(uint64_t);
+
+  // We assert that the last K-V pairs have been successfully inserted,
+  // and that the valid values are VALUE1, VALUE2, VALUE3.
+  PinnableSlice value;
+  ASSERT_OK(Get(KEY1, &value));
+  ASSERT_EQ(value.ToString(), VALUE1);
+  ASSERT_OK(Get(KEY2, &value));
+  ASSERT_EQ(value.ToString(), VALUE2);
+  ASSERT_OK(Get(KEY3, &value));
+  ASSERT_EQ(value.ToString(), VALUE3);
+
+  // Force flush to SST. Increments the statistics counter.
+  ASSERT_OK(Flush());
+
+  // Collect statistics.
+  uint64_t mem_data_bytes =
+      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
+  uint64_t mem_garbage_bytes =
+      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
+
+  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
+  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
+
+  Close();
+}
+
+TEST_F(DBFlushTest, StatisticsGarbageInsertAndDeletes) {
+  Options options = CurrentOptions();
+  options.statistics = CreateDBStatistics();
+  options.statistics->set_stats_level(StatsLevel::kAll);
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+  options.write_buffer_size = 67108864;
+
+  ASSERT_OK(TryReopen(options));
+
+  const size_t NUM_REPEAT = 2000;
+  const size_t RAND_VALUES_LENGTH = 37;
+  const std::string KEY1 = "key1";
+  const std::string KEY2 = "key2";
+  const std::string KEY3 = "key3";
+  const std::string KEY4 = "key4";
+  const std::string KEY5 = "key5";
+  const std::string KEY6 = "key6";
+
+  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
+  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;
+
+  WriteBatch batch;
+
+  Random rnd(301);
+  // Insertion of of K-V pairs, multiple times.
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
+    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
+    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(KEY1, p_v1));
+    ASSERT_OK(Put(KEY2, p_v2));
+    ASSERT_OK(Put(KEY3, p_v3));
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY1.size() + p_v1.size() + sizeof(uint64_t);
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY2.size() + p_v2.size() + sizeof(uint64_t);
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY3.size() + p_v3.size() + sizeof(uint64_t);
+    ASSERT_OK(Delete(KEY1));
+    ASSERT_OK(Delete(KEY2));
+    ASSERT_OK(Delete(KEY3));
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY1.size() + KEY2.size() + KEY3.size() + 3 * sizeof(uint64_t);
+  }
+
+  // The memtable data bytes includes the "garbage"
+  // bytes along with the useful payload.
+  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
+      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;
+
+  // Note : one set of delete for KEY1, KEY2, KEY3 is written to
+  // SSTable to propagate the delete operations to K-V pairs
+  // that could have been inserted into the database during past Flush
+  // opeartions.
+  EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH -=
+      KEY1.size() + KEY2.size() + KEY3.size() + 3 * sizeof(uint64_t);
+
+  // Additional useful paylaod.
+  ASSERT_OK(Delete(KEY4));
+  ASSERT_OK(Delete(KEY5));
+  ASSERT_OK(Delete(KEY6));
+
+  // // Add useful payload to the memtable data bytes:
+  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
+      KEY4.size() + KEY5.size() + KEY6.size() + 3 * sizeof(uint64_t);
+
+  // We assert that the K-V pairs have been successfully deleted.
+  PinnableSlice value;
+  ASSERT_NOK(Get(KEY1, &value));
+  ASSERT_NOK(Get(KEY2, &value));
+  ASSERT_NOK(Get(KEY3, &value));
+
+  // Force flush to SST. Increments the statistics counter.
+  ASSERT_OK(Flush());
+
+  // Collect statistics.
+  uint64_t mem_data_bytes =
+      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
+  uint64_t mem_garbage_bytes =
+      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
+
+  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
+  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
+
+  Close();
+}
+
+TEST_F(DBFlushTest, StatisticsGarbageRangeDeletes) {
+  Options options = CurrentOptions();
+  options.statistics = CreateDBStatistics();
+  options.statistics->set_stats_level(StatsLevel::kAll);
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+  options.write_buffer_size = 67108864;
+
+  ASSERT_OK(TryReopen(options));
+
+  const size_t NUM_REPEAT = 1000;
+  const size_t RAND_VALUES_LENGTH = 42;
+  const std::string KEY1 = "key1";
+  const std::string KEY2 = "key2";
+  const std::string KEY3 = "key3";
+  const std::string KEY4 = "key4";
+  const std::string KEY5 = "key5";
+  const std::string KEY6 = "key6";
+  const std::string VALUE3 = "value3";
+
+  uint64_t EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH = 0;
+  uint64_t EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH = 0;
+
+  Random rnd(301);
+  // Insertion of of K-V pairs, multiple times.
+  // Also insert DeleteRange
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    std::string p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
+    std::string p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
+    std::string p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(KEY1, p_v1));
+    ASSERT_OK(Put(KEY2, p_v2));
+    ASSERT_OK(Put(KEY3, p_v3));
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY1.size() + p_v1.size() + sizeof(uint64_t);
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY2.size() + p_v2.size() + sizeof(uint64_t);
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        KEY3.size() + p_v3.size() + sizeof(uint64_t);
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1,
+                               KEY2));
+    // Note: DeleteRange have an exclusive upper bound, e.g. here: [KEY2,KEY3)
+    // is deleted.
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2,
+                               KEY3));
+    // Delete ranges are stored as a regular K-V pair, with key=STARTKEY,
+    // value=ENDKEY.
+    EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH +=
+        (KEY1.size() + KEY2.size() + sizeof(uint64_t)) +
+        (KEY2.size() + KEY3.size() + sizeof(uint64_t));
+  }
+
+  // The memtable data bytes includes the "garbage"
+  // bytes along with the useful payload.
+  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH =
+      EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH;
+
+  // Note : one set of deleteRange for (KEY1, KEY2) and (KEY2, KEY3) is written
+  // to SSTable to propagate the deleteRange operations to K-V pairs that could
+  // have been inserted into the database during past Flush opeartions.
+  EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH -=
+      (KEY1.size() + KEY2.size() + sizeof(uint64_t)) +
+      (KEY2.size() + KEY3.size() + sizeof(uint64_t));
+
+  // Overwrite KEY3 with known value (VALUE3)
+  // Note that during the whole time KEY3 has never been deleted
+  // by the RangeDeletes.
+  ASSERT_OK(Put(KEY3, VALUE3));
+  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
+      KEY3.size() + VALUE3.size() + sizeof(uint64_t);
+
+  // Additional useful paylaod.
+  ASSERT_OK(
+      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY4, KEY5));
+  ASSERT_OK(
+      db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY5, KEY6));
+
+  // Add useful payload to the memtable data bytes:
+  EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH +=
+      (KEY4.size() + KEY5.size() + sizeof(uint64_t)) +
+      (KEY5.size() + KEY6.size() + sizeof(uint64_t));
+
+  // We assert that the K-V pairs have been successfully deleted.
+  PinnableSlice value;
+  ASSERT_NOK(Get(KEY1, &value));
+  ASSERT_NOK(Get(KEY2, &value));
+  // And that KEY3's value is correct.
+  ASSERT_OK(Get(KEY3, &value));
+  ASSERT_EQ(value, VALUE3);
+
+  // Force flush to SST. Increments the statistics counter.
+  ASSERT_OK(Flush());
+
+  // Collect statistics.
+  uint64_t mem_data_bytes =
+      TestGetTickerCount(options, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
+  uint64_t mem_garbage_bytes =
+      TestGetTickerCount(options, MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
+
+  EXPECT_EQ(mem_data_bytes, EXPECTED_MEMTABLE_PAYLOAD_BYTES_AT_FLUSH);
+  EXPECT_EQ(mem_garbage_bytes, EXPECTED_MEMTABLE_GARBAGE_BYTES_AT_FLUSH);
+
+  Close();
+}
+
+#ifndef ROCKSDB_LITE
+// This simple Listener can only handle one flush at a time.
+class TestFlushListener : public EventListener {
+ public:
+  TestFlushListener(Env* env, DBFlushTest* test)
+      : slowdown_count(0), stop_count(0), db_closed(), env_(env), test_(test) {
+    db_closed = false;
+  }
+
+  ~TestFlushListener() override {
+    prev_fc_info_.status.PermitUncheckedError();  // Ignore the status
+  }
+
+  void OnTableFileCreated(const TableFileCreationInfo& info) override {
+    // remember the info for later checking the FlushJobInfo.
+    prev_fc_info_ = info;
+    ASSERT_GT(info.db_name.size(), 0U);
+    ASSERT_GT(info.cf_name.size(), 0U);
+    ASSERT_GT(info.file_path.size(), 0U);
+    ASSERT_GT(info.job_id, 0);
+    ASSERT_GT(info.table_properties.data_size, 0U);
+    ASSERT_GT(info.table_properties.raw_key_size, 0U);
+    ASSERT_GT(info.table_properties.raw_value_size, 0U);
+    ASSERT_GT(info.table_properties.num_data_blocks, 0U);
+    ASSERT_GT(info.table_properties.num_entries, 0U);
+    ASSERT_EQ(info.file_checksum, kUnknownFileChecksum);
+    ASSERT_EQ(info.file_checksum_func_name, kUnknownFileChecksumFuncName);
+  }
+
+  void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
+    flushed_dbs_.push_back(db);
+    flushed_column_family_names_.push_back(info.cf_name);
+    if (info.triggered_writes_slowdown) {
+      slowdown_count++;
+    }
+    if (info.triggered_writes_stop) {
+      stop_count++;
+    }
+    // verify whether the previously created file matches the flushed file.
+    ASSERT_EQ(prev_fc_info_.db_name, db->GetName());
+    ASSERT_EQ(prev_fc_info_.cf_name, info.cf_name);
+    ASSERT_EQ(prev_fc_info_.job_id, info.job_id);
+    ASSERT_EQ(prev_fc_info_.file_path, info.file_path);
+    ASSERT_EQ(TableFileNameToNumber(info.file_path), info.file_number);
+
+    // Note: the following chunk relies on the notification pertaining to the
+    // database pointed to by DBTestBase::db_, and is thus bypassed when
+    // that assumption does not hold (see the test case MultiDBMultiListeners
+    // below).
+    ASSERT_TRUE(test_);
+    if (db == test_->db_) {
+      std::vector<std::vector<FileMetaData>> files_by_level;
+      test_->dbfull()->TEST_GetFilesMetaData(db->DefaultColumnFamily(),
+                                             &files_by_level);
+
+      ASSERT_FALSE(files_by_level.empty());
+      auto it = std::find_if(files_by_level[0].begin(), files_by_level[0].end(),
+                             [&](const FileMetaData& meta) {
+                               return meta.fd.GetNumber() == info.file_number;
+                             });
+      ASSERT_NE(it, files_by_level[0].end());
+      ASSERT_EQ(info.oldest_blob_file_number, it->oldest_blob_file_number);
+    }
+
+    ASSERT_EQ(db->GetEnv()->GetThreadID(), info.thread_id);
+    ASSERT_GT(info.thread_id, 0U);
+  }
+
+  std::vector<std::string> flushed_column_family_names_;
+  std::vector<DB*> flushed_dbs_;
+  int slowdown_count;
+  int stop_count;
+  bool db_closing;
+  std::atomic_bool db_closed;
+  TableFileCreationInfo prev_fc_info_;
+
+ protected:
+  Env* env_;
+  DBFlushTest* test_;
+};
+#endif  // !ROCKSDB_LITE
+
+TEST_F(DBFlushTest, MemPurgeBasic) {
+  Options options = CurrentOptions();
+
+  // The following options are used to enforce several values that
+  // may already exist as default values to make this test resilient
+  // to default value updates in the future.
+  options.statistics = CreateDBStatistics();
+
+  // Record all statistics.
+  options.statistics->set_stats_level(StatsLevel::kAll);
+
+  // create the DB if it's not already present
+  options.create_if_missing = true;
+
+  // Useful for now as we are trying to compare uncompressed data savings on
+  // flush().
+  options.compression = kNoCompression;
+
+  // Prevent memtable in place updates. Should already be disabled
+  // (from Wiki:
+  //  In place updates can be enabled by toggling on the bool
+  //  inplace_update_support flag. However, this flag is by default set to
+  //  false
+  //  because this thread-safe in-place update support is not compatible
+  //  with concurrent memtable writes. Note that the bool
+  //  allow_concurrent_memtable_write is set to true by default )
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+
+  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
+  options.write_buffer_size = 1 << 20;
+#ifndef ROCKSDB_LITE
+  // Initially deactivate the MemPurge prototype.
+  options.experimental_mempurge_threshold = 0.0;
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  options.listeners.emplace_back(listener);
+#else
+  // Activate directly the MemPurge prototype.
+  // (RocksDB lite does not support dynamic options)
+  options.experimental_mempurge_threshold = 1.0;
+#endif  // !ROCKSDB_LITE
+  ASSERT_OK(TryReopen(options));
+
+  // RocksDB lite does not support dynamic options
+#ifndef ROCKSDB_LITE
+  // Dynamically activate the MemPurge prototype without restarting the DB.
+  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
+  ASSERT_OK(db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "1.0"}}));
+#endif
+
+  std::atomic<uint32_t> mempurge_count{0};
+  std::atomic<uint32_t> sst_count{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:MemPurgeSuccessful",
+      [&](void* /*arg*/) { mempurge_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  std::string KEY1 = "IamKey1";
+  std::string KEY2 = "IamKey2";
+  std::string KEY3 = "IamKey3";
+  std::string KEY4 = "IamKey4";
+  std::string KEY5 = "IamKey5";
+  std::string KEY6 = "IamKey6";
+  std::string KEY7 = "IamKey7";
+  std::string KEY8 = "IamKey8";
+  std::string KEY9 = "IamKey9";
+  std::string RNDKEY1, RNDKEY2, RNDKEY3;
+  const std::string NOT_FOUND = "NOT_FOUND";
+
+  // Heavy overwrite workload,
+  // more than would fit in maximum allowed memtables.
+  Random rnd(719);
+  const size_t NUM_REPEAT = 100;
+  const size_t RAND_KEYS_LENGTH = 57;
+  const size_t RAND_VALUES_LENGTH = 10240;
+  std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9, p_rv1,
+      p_rv2, p_rv3;
+
+  // Insert a very first set of keys that will be
+  // mempurged at least once.
+  p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
+  p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
+  p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
+  p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
+  ASSERT_OK(Put(KEY1, p_v1));
+  ASSERT_OK(Put(KEY2, p_v2));
+  ASSERT_OK(Put(KEY3, p_v3));
+  ASSERT_OK(Put(KEY4, p_v4));
+  ASSERT_EQ(Get(KEY1), p_v1);
+  ASSERT_EQ(Get(KEY2), p_v2);
+  ASSERT_EQ(Get(KEY3), p_v3);
+  ASSERT_EQ(Get(KEY4), p_v4);
+
+  // Insertion of of K-V pairs, multiple times (overwrites).
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);
+
+    ASSERT_OK(Put(KEY5, p_v5));
+    ASSERT_OK(Put(KEY6, p_v6));
+    ASSERT_OK(Put(KEY7, p_v7));
+    ASSERT_OK(Put(KEY8, p_v8));
+    ASSERT_OK(Put(KEY9, p_v9));
+
+    ASSERT_EQ(Get(KEY1), p_v1);
+    ASSERT_EQ(Get(KEY2), p_v2);
+    ASSERT_EQ(Get(KEY3), p_v3);
+    ASSERT_EQ(Get(KEY4), p_v4);
+    ASSERT_EQ(Get(KEY5), p_v5);
+    ASSERT_EQ(Get(KEY6), p_v6);
+    ASSERT_EQ(Get(KEY7), p_v7);
+    ASSERT_EQ(Get(KEY8), p_v8);
+    ASSERT_EQ(Get(KEY9), p_v9);
+  }
+
+  // Check that there was at least one mempurge
+  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
+  // Check that there was no SST files created during flush.
+  const uint32_t EXPECTED_SST_COUNT = 0;
+
+  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
+  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
+
+  // Insertion of of K-V pairs, no overwrites.
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    RNDKEY1 = rnd.RandomString(RAND_KEYS_LENGTH);
+    RNDKEY2 = rnd.RandomString(RAND_KEYS_LENGTH);
+    RNDKEY3 = rnd.RandomString(RAND_KEYS_LENGTH);
+    p_rv1 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_rv2 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_rv3 = rnd.RandomString(RAND_VALUES_LENGTH);
+
+    ASSERT_OK(Put(RNDKEY1, p_rv1));
+    ASSERT_OK(Put(RNDKEY2, p_rv2));
+    ASSERT_OK(Put(RNDKEY3, p_rv3));
+
+    ASSERT_EQ(Get(KEY1), p_v1);
+    ASSERT_EQ(Get(KEY2), p_v2);
+    ASSERT_EQ(Get(KEY3), p_v3);
+    ASSERT_EQ(Get(KEY4), p_v4);
+    ASSERT_EQ(Get(KEY5), p_v5);
+    ASSERT_EQ(Get(KEY6), p_v6);
+    ASSERT_EQ(Get(KEY7), p_v7);
+    ASSERT_EQ(Get(KEY8), p_v8);
+    ASSERT_EQ(Get(KEY9), p_v9);
+    ASSERT_EQ(Get(RNDKEY1), p_rv1);
+    ASSERT_EQ(Get(RNDKEY2), p_rv2);
+    ASSERT_EQ(Get(RNDKEY3), p_rv3);
+  }
+
+  // Assert that at least one flush to storage has been performed
+  EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
+  // (which will consequently increase the number of mempurges recorded too).
+  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
+
+  // Assert that there is no data corruption, even with
+  // a flush to storage.
+  ASSERT_EQ(Get(KEY1), p_v1);
+  ASSERT_EQ(Get(KEY2), p_v2);
+  ASSERT_EQ(Get(KEY3), p_v3);
+  ASSERT_EQ(Get(KEY4), p_v4);
+  ASSERT_EQ(Get(KEY5), p_v5);
+  ASSERT_EQ(Get(KEY6), p_v6);
+  ASSERT_EQ(Get(KEY7), p_v7);
+  ASSERT_EQ(Get(KEY8), p_v8);
+  ASSERT_EQ(Get(KEY9), p_v9);
+  ASSERT_EQ(Get(RNDKEY1), p_rv1);
+  ASSERT_EQ(Get(RNDKEY2), p_rv2);
+  ASSERT_EQ(Get(RNDKEY3), p_rv3);
+
+  Close();
+}
+
+// RocksDB lite does not support dynamic options
+#ifndef ROCKSDB_LITE
+TEST_F(DBFlushTest, MemPurgeBasicToggle) {
+  Options options = CurrentOptions();
+
+  // The following options are used to enforce several values that
+  // may already exist as default values to make this test resilient
+  // to default value updates in the future.
+  options.statistics = CreateDBStatistics();
+
+  // Record all statistics.
+  options.statistics->set_stats_level(StatsLevel::kAll);
+
+  // create the DB if it's not already present
+  options.create_if_missing = true;
+
+  // Useful for now as we are trying to compare uncompressed data savings on
+  // flush().
+  options.compression = kNoCompression;
+
+  // Prevent memtable in place updates. Should already be disabled
+  // (from Wiki:
+  //  In place updates can be enabled by toggling on the bool
+  //  inplace_update_support flag. However, this flag is by default set to
+  //  false
+  //  because this thread-safe in-place update support is not compatible
+  //  with concurrent memtable writes. Note that the bool
+  //  allow_concurrent_memtable_write is set to true by default )
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+
+  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
+  options.write_buffer_size = 1 << 20;
+  // Initially deactivate the MemPurge prototype.
+  // (negative values are equivalent to 0.0).
+  options.experimental_mempurge_threshold = -25.3;
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  options.listeners.emplace_back(listener);
+
+  ASSERT_OK(TryReopen(options));
+  // Dynamically activate the MemPurge prototype without restarting the DB.
+  ColumnFamilyHandle* cfh = db_->DefaultColumnFamily();
+  // Values greater than 1.0 are equivalent to 1.0
+  ASSERT_OK(
+      db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "3.7898"}}));
+  std::atomic<uint32_t> mempurge_count{0};
+  std::atomic<uint32_t> sst_count{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:MemPurgeSuccessful",
+      [&](void* /*arg*/) { mempurge_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  const size_t KVSIZE = 3;
+  std::vector<std::string> KEYS(KVSIZE);
+  for (size_t k = 0; k < KVSIZE; k++) {
+    KEYS[k] = "IamKey" + std::to_string(k);
+  }
+
+  std::vector<std::string> RNDVALS(KVSIZE);
+  const std::string NOT_FOUND = "NOT_FOUND";
+
+  // Heavy overwrite workload,
+  // more than would fit in maximum allowed memtables.
+  Random rnd(719);
+  const size_t NUM_REPEAT = 100;
+  const size_t RAND_VALUES_LENGTH = 10240;
+
+  // Insertion of of K-V pairs, multiple times (overwrites).
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    for (size_t j = 0; j < KEYS.size(); j++) {
+      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
+      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
+      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
+    }
+    for (size_t j = 0; j < KEYS.size(); j++) {
+      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
+    }
+  }
+
+  // Check that there was at least one mempurge
+  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
+  // Check that there was no SST files created during flush.
+  const uint32_t EXPECTED_SST_COUNT = 0;
+
+  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
+  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
+
+  // Dynamically deactivate MemPurge.
+  ASSERT_OK(
+      db_->SetOptions(cfh, {{"experimental_mempurge_threshold", "-1023.0"}}));
+
+  // Insertion of of K-V pairs, multiple times (overwrites).
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    for (size_t j = 0; j < KEYS.size(); j++) {
+      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
+      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
+      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
+    }
+    for (size_t j = 0; j < KEYS.size(); j++) {
+      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
+    }
+  }
+
+  // Check that there was at least one mempurge
+  const uint32_t ZERO = 0;
+  // Assert that at least one flush to storage has been performed
+  EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
+  // The mempurge count is expected to be set to 0 when the options are updated.
+  // We expect no mempurge at all.
+  EXPECT_EQ(mempurge_count.exchange(0), ZERO);
+
+  Close();
+}
+// Closes the "#ifndef ROCKSDB_LITE"
+// End of MemPurgeBasicToggle, which is not
+// supported with RocksDB LITE because it
+// relies on dynamically changing the option
+// flag experimental_mempurge_threshold.
+#endif
+
+// At the moment, MemPurge feature is deactivated
+// when atomic_flush is enabled. This is because the level
+// of garbage between Column Families is not guaranteed to
+// be consistent, therefore a CF could hypothetically
+// trigger a MemPurge while another CF would trigger
+// a regular Flush.
+TEST_F(DBFlushTest, MemPurgeWithAtomicFlush) {
+  Options options = CurrentOptions();
+
+  // The following options are used to enforce several values that
+  // may already exist as default values to make this test resilient
+  // to default value updates in the future.
+  options.statistics = CreateDBStatistics();
+
+  // Record all statistics.
+  options.statistics->set_stats_level(StatsLevel::kAll);
+
+  // create the DB if it's not already present
+  options.create_if_missing = true;
+
+  // Useful for now as we are trying to compare uncompressed data savings on
+  // flush().
+  options.compression = kNoCompression;
+
+  // Prevent memtable in place updates. Should already be disabled
+  // (from Wiki:
+  //  In place updates can be enabled by toggling on the bool
+  //  inplace_update_support flag. However, this flag is by default set to
+  //  false
+  //  because this thread-safe in-place update support is not compatible
+  //  with concurrent memtable writes. Note that the bool
+  //  allow_concurrent_memtable_write is set to true by default )
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+
+  // Enforce size of a single MemTable to 64KB (64KB = 65,536 bytes).
+  options.write_buffer_size = 1 << 20;
+  // Activate the MemPurge prototype.
+  options.experimental_mempurge_threshold = 153.245;
+  // Activate atomic_flush.
+  options.atomic_flush = true;
+
+  const std::vector<std::string> new_cf_names = {"pikachu", "eevie"};
+  CreateColumnFamilies(new_cf_names, options);
+
+  Close();
+
+  // 3 CFs: default will be filled with overwrites (would normally trigger
+  // mempurge)
+  //        new_cf_names[1] will be filled with random values (would trigger
+  //        flush) new_cf_names[2] not filled with anything.
+  ReopenWithColumnFamilies(
+      {kDefaultColumnFamilyName, new_cf_names[0], new_cf_names[1]}, options);
+  size_t num_cfs = handles_.size();
+  ASSERT_EQ(3, num_cfs);
+  ASSERT_OK(Put(1, "foo", "bar"));
+  ASSERT_OK(Put(2, "bar", "baz"));
+
+  std::atomic<uint32_t> mempurge_count{0};
+  std::atomic<uint32_t> sst_count{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:MemPurgeSuccessful",
+      [&](void* /*arg*/) { mempurge_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  const size_t KVSIZE = 3;
+  std::vector<std::string> KEYS(KVSIZE);
+  for (size_t k = 0; k < KVSIZE; k++) {
+    KEYS[k] = "IamKey" + std::to_string(k);
+  }
+
+  std::string RNDKEY;
+  std::vector<std::string> RNDVALS(KVSIZE);
+  const std::string NOT_FOUND = "NOT_FOUND";
+
+  // Heavy overwrite workload,
+  // more than would fit in maximum allowed memtables.
+  Random rnd(106);
+  const size_t NUM_REPEAT = 100;
+  const size_t RAND_KEY_LENGTH = 128;
+  const size_t RAND_VALUES_LENGTH = 10240;
+
+  // Insertion of of K-V pairs, multiple times (overwrites).
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    for (size_t j = 0; j < KEYS.size(); j++) {
+      RNDKEY = rnd.RandomString(RAND_KEY_LENGTH);
+      RNDVALS[j] = rnd.RandomString(RAND_VALUES_LENGTH);
+      ASSERT_OK(Put(KEYS[j], RNDVALS[j]));
+      ASSERT_OK(Put(1, RNDKEY, RNDVALS[j]));
+      ASSERT_EQ(Get(KEYS[j]), RNDVALS[j]);
+      ASSERT_EQ(Get(1, RNDKEY), RNDVALS[j]);
+    }
+  }
+
+  // Check that there was no mempurge because atomic_flush option is true.
+  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 0;
+  // Check that there was at least one SST files created during flush.
+  const uint32_t EXPECTED_SST_COUNT = 1;
+
+  EXPECT_EQ(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
+  EXPECT_GE(sst_count.exchange(0), EXPECTED_SST_COUNT);
+
+  Close();
+}
+
+TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
+  Options options = CurrentOptions();
+
+  options.statistics = CreateDBStatistics();
+  options.statistics->set_stats_level(StatsLevel::kAll);
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+#ifndef ROCKSDB_LITE
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  options.listeners.emplace_back(listener);
+#endif  // !ROCKSDB_LITE
+  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
+  options.write_buffer_size = 1 << 20;
+  // Activate the MemPurge prototype.
+  options.experimental_mempurge_threshold = 15.0;
+
+  ASSERT_OK(TryReopen(options));
+
+  std::atomic<uint32_t> mempurge_count{0};
+  std::atomic<uint32_t> sst_count{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:MemPurgeSuccessful",
+      [&](void* /*arg*/) { mempurge_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  std::string KEY1 = "ThisIsKey1";
+  std::string KEY2 = "ThisIsKey2";
+  std::string KEY3 = "ThisIsKey3";
+  std::string KEY4 = "ThisIsKey4";
+  std::string KEY5 = "ThisIsKey5";
+  const std::string NOT_FOUND = "NOT_FOUND";
+
+  Random rnd(117);
+  const size_t NUM_REPEAT = 100;
+  const size_t RAND_VALUES_LENGTH = 10240;
+
+  std::string key, value, p_v1, p_v2, p_v3, p_v3b, p_v4, p_v5;
+  int count = 0;
+  const int EXPECTED_COUNT_FORLOOP = 3;
+  const int EXPECTED_COUNT_END = 4;
+
+  ReadOptions ropt;
+  ropt.pin_data = true;
+  ropt.total_order_seek = true;
+  Iterator* iter = nullptr;
+
+  // Insertion of of K-V pairs, multiple times.
+  // Also insert DeleteRange
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v3b = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(KEY1, p_v1));
+    ASSERT_OK(Put(KEY2, p_v2));
+    ASSERT_OK(Put(KEY3, p_v3));
+    ASSERT_OK(Put(KEY4, p_v4));
+    ASSERT_OK(Put(KEY5, p_v5));
+    ASSERT_OK(Delete(KEY2));
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY2,
+                               KEY4));
+    ASSERT_OK(Put(KEY3, p_v3b));
+    ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), KEY1,
+                               KEY3));
+    ASSERT_OK(Delete(KEY1));
+
+    ASSERT_EQ(Get(KEY1), NOT_FOUND);
+    ASSERT_EQ(Get(KEY2), NOT_FOUND);
+    ASSERT_EQ(Get(KEY3), p_v3b);
+    ASSERT_EQ(Get(KEY4), p_v4);
+    ASSERT_EQ(Get(KEY5), p_v5);
+
+    iter = db_->NewIterator(ropt);
+    iter->SeekToFirst();
+    count = 0;
+    for (; iter->Valid(); iter->Next()) {
+      ASSERT_OK(iter->status());
+      key = (iter->key()).ToString(false);
+      value = (iter->value()).ToString(false);
+      if (key.compare(KEY3) == 0)
+        ASSERT_EQ(value, p_v3b);
+      else if (key.compare(KEY4) == 0)
+        ASSERT_EQ(value, p_v4);
+      else if (key.compare(KEY5) == 0)
+        ASSERT_EQ(value, p_v5);
+      else
+        ASSERT_EQ(value, NOT_FOUND);
+      count++;
+    }
+
+    // Expected count here is 3: KEY3, KEY4, KEY5.
+    ASSERT_EQ(count, EXPECTED_COUNT_FORLOOP);
+    if (iter) {
+      delete iter;
+    }
+  }
+
+  // Check that there was at least one mempurge
+  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
+  // Check that there was no SST files created during flush.
+  const uint32_t EXPECTED_SST_COUNT = 0;
+
+  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
+  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
+
+  // Additional test for the iterator+memPurge.
+  ASSERT_OK(Put(KEY2, p_v2));
+  iter = db_->NewIterator(ropt);
+  iter->SeekToFirst();
+  ASSERT_OK(Put(KEY4, p_v4));
+  count = 0;
+  for (; iter->Valid(); iter->Next()) {
+    ASSERT_OK(iter->status());
+    key = (iter->key()).ToString(false);
+    value = (iter->value()).ToString(false);
+    if (key.compare(KEY2) == 0)
+      ASSERT_EQ(value, p_v2);
+    else if (key.compare(KEY3) == 0)
+      ASSERT_EQ(value, p_v3b);
+    else if (key.compare(KEY4) == 0)
+      ASSERT_EQ(value, p_v4);
+    else if (key.compare(KEY5) == 0)
+      ASSERT_EQ(value, p_v5);
+    else
+      ASSERT_EQ(value, NOT_FOUND);
+    count++;
+  }
+
+  // Expected count here is 4: KEY2, KEY3, KEY4, KEY5.
+  ASSERT_EQ(count, EXPECTED_COUNT_END);
+  if (iter) delete iter;
+
+  Close();
+}
+
+// Create a Compaction Fitler that will be invoked
+// at flush time and will update the value of a KV pair
+// if the key string is "lower" than the filter_key_ string.
+class ConditionalUpdateFilter : public CompactionFilter {
+ public:
+  explicit ConditionalUpdateFilter(const std::string* filtered_key)
+      : filtered_key_(filtered_key) {}
+  bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
+              std::string* new_value, bool* value_changed) const override {
+    // If key<filtered_key_, update the value of the KV-pair.
+    if (key.compare(*filtered_key_) < 0) {
+      assert(new_value != nullptr);
+      *new_value = NEW_VALUE;
+      *value_changed = true;
+    }
+    return false /*do not remove this KV-pair*/;
+  }
+
+  const char* Name() const override { return "ConditionalUpdateFilter"; }
+
+ private:
+  const std::string* filtered_key_;
+};
+
+class ConditionalUpdateFilterFactory : public CompactionFilterFactory {
+ public:
+  explicit ConditionalUpdateFilterFactory(const Slice& filtered_key)
+      : filtered_key_(filtered_key.ToString()) {}
+
+  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& /*context*/) override {
+    return std::unique_ptr<CompactionFilter>(
+        new ConditionalUpdateFilter(&filtered_key_));
+  }
+
+  const char* Name() const override { return "ConditionalUpdateFilterFactory"; }
+
+  bool ShouldFilterTableFileCreation(
+      TableFileCreationReason reason) const override {
+    // This compaction filter will be invoked
+    // at flush time (and therefore at MemPurge time).
+    return (reason == TableFileCreationReason::kFlush);
+  }
+
+ private:
+  std::string filtered_key_;
+};
+
+TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
+  Options options = CurrentOptions();
+
+  std::string KEY1 = "ThisIsKey1";
+  std::string KEY2 = "ThisIsKey2";
+  std::string KEY3 = "ThisIsKey3";
+  std::string KEY4 = "ThisIsKey4";
+  std::string KEY5 = "ThisIsKey5";
+  std::string KEY6 = "ThisIsKey6";
+  std::string KEY7 = "ThisIsKey7";
+  std::string KEY8 = "ThisIsKey8";
+  std::string KEY9 = "ThisIsKey9";
+  const std::string NOT_FOUND = "NOT_FOUND";
+
+  options.statistics = CreateDBStatistics();
+  options.statistics->set_stats_level(StatsLevel::kAll);
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+#ifndef ROCKSDB_LITE
+  TestFlushListener* listener = new TestFlushListener(options.env, this);
+  options.listeners.emplace_back(listener);
+#endif  // !ROCKSDB_LITE
+  // Create a ConditionalUpdate compaction filter
+  // that will update all the values of the KV pairs
+  // where the keys are "lower" than KEY4.
+  options.compaction_filter_factory =
+      std::make_shared<ConditionalUpdateFilterFactory>(KEY4);
+
+  // Enforce size of a single MemTable to 64MB (64MB = 67108864 bytes).
+  options.write_buffer_size = 1 << 20;
+  // Activate the MemPurge prototype.
+  options.experimental_mempurge_threshold = 26.55;
+
+  ASSERT_OK(TryReopen(options));
+
+  std::atomic<uint32_t> mempurge_count{0};
+  std::atomic<uint32_t> sst_count{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:MemPurgeSuccessful",
+      [&](void* /*arg*/) { mempurge_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Random rnd(53);
+  const size_t NUM_REPEAT = 1000;
+  const size_t RAND_VALUES_LENGTH = 10240;
+  std::string p_v1, p_v2, p_v3, p_v4, p_v5, p_v6, p_v7, p_v8, p_v9;
+
+  p_v1 = rnd.RandomString(RAND_VALUES_LENGTH);
+  p_v2 = rnd.RandomString(RAND_VALUES_LENGTH);
+  p_v3 = rnd.RandomString(RAND_VALUES_LENGTH);
+  p_v4 = rnd.RandomString(RAND_VALUES_LENGTH);
+  p_v5 = rnd.RandomString(RAND_VALUES_LENGTH);
+  ASSERT_OK(Put(KEY1, p_v1));
+  ASSERT_OK(Put(KEY2, p_v2));
+  ASSERT_OK(Put(KEY3, p_v3));
+  ASSERT_OK(Put(KEY4, p_v4));
+  ASSERT_OK(Put(KEY5, p_v5));
+  ASSERT_OK(Delete(KEY1));
+
+  // Insertion of of K-V pairs, multiple times.
+  for (size_t i = 0; i < NUM_REPEAT; i++) {
+    // Create value strings of arbitrary
+    // length RAND_VALUES_LENGTH bytes.
+    p_v6 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v7 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v8 = rnd.RandomString(RAND_VALUES_LENGTH);
+    p_v9 = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(KEY6, p_v6));
+    ASSERT_OK(Put(KEY7, p_v7));
+    ASSERT_OK(Put(KEY8, p_v8));
+    ASSERT_OK(Put(KEY9, p_v9));
+
+    ASSERT_OK(Delete(KEY7));
+  }
+
+  // Check that there was at least one mempurge
+  const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
+  // Check that there was no SST files created during flush.
+  const uint32_t EXPECTED_SST_COUNT = 0;
+
+  EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
+  EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
+
+  // Verify that the ConditionalUpdateCompactionFilter
+  // updated the values of KEY2 and KEY3, and not KEY4 and KEY5.
+  ASSERT_EQ(Get(KEY1), NOT_FOUND);
+  ASSERT_EQ(Get(KEY2), NEW_VALUE);
+  ASSERT_EQ(Get(KEY3), NEW_VALUE);
+  ASSERT_EQ(Get(KEY4), p_v4);
+  ASSERT_EQ(Get(KEY5), p_v5);
+}
+
+TEST_F(DBFlushTest, DISABLED_MemPurgeWALSupport) {
+  Options options = CurrentOptions();
+
+  options.statistics = CreateDBStatistics();
+  options.statistics->set_stats_level(StatsLevel::kAll);
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+
+  // Enforce size of a single MemTable to 128KB.
+  options.write_buffer_size = 128 << 10;
+  // Activate the MemPurge prototype
+  // (values >1.0 are equivalent to 1.0).
+  options.experimental_mempurge_threshold = 2.5;
+
+  ASSERT_OK(TryReopen(options));
+
+  const size_t KVSIZE = 10;
+
+  do {
+    CreateAndReopenWithCF({"pikachu"}, options);
+    ASSERT_OK(Put(1, "foo", "v1"));
+    ASSERT_OK(Put(1, "baz", "v5"));
+
+    ReopenWithColumnFamilies({"default", "pikachu"}, options);
+    ASSERT_EQ("v1", Get(1, "foo"));
+
+    ASSERT_EQ("v1", Get(1, "foo"));
+    ASSERT_EQ("v5", Get(1, "baz"));
+    ASSERT_OK(Put(0, "bar", "v2"));
+    ASSERT_OK(Put(1, "bar", "v2"));
+    ASSERT_OK(Put(1, "foo", "v3"));
+    std::atomic<uint32_t> mempurge_count{0};
+    std::atomic<uint32_t> sst_count{0};
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::FlushJob:MemPurgeSuccessful",
+        [&](void* /*arg*/) { mempurge_count++; });
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+        "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+    std::vector<std::string> keys;
+    for (size_t k = 0; k < KVSIZE; k++) {
+      keys.push_back("IamKey" + std::to_string(k));
+    }
+
+    std::string RNDKEY, RNDVALUE;
+    const std::string NOT_FOUND = "NOT_FOUND";
+
+    // Heavy overwrite workload,
+    // more than would fit in maximum allowed memtables.
+    Random rnd(719);
+    const size_t NUM_REPEAT = 100;
+    const size_t RAND_KEY_LENGTH = 4096;
+    const size_t RAND_VALUES_LENGTH = 1024;
+    std::vector<std::string> values_default(KVSIZE), values_pikachu(KVSIZE);
+
+    // Insert a very first set of keys that will be
+    // mempurged at least once.
+    for (size_t k = 0; k < KVSIZE / 2; k++) {
+      values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH);
+      values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH);
+    }
+
+    // Insert keys[0:KVSIZE/2] to
+    // both 'default' and 'pikachu' CFs.
+    for (size_t k = 0; k < KVSIZE / 2; k++) {
+      ASSERT_OK(Put(0, keys[k], values_default[k]));
+      ASSERT_OK(Put(1, keys[k], values_pikachu[k]));
+    }
+
+    // Check that the insertion was seamless.
+    for (size_t k = 0; k < KVSIZE / 2; k++) {
+      ASSERT_EQ(Get(0, keys[k]), values_default[k]);
+      ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
+    }
+
+    // Insertion of of K-V pairs, multiple times (overwrites)
+    // into 'default' CF. Will trigger mempurge.
+    for (size_t j = 0; j < NUM_REPEAT; j++) {
+      // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+      for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
+        values_default[k] = rnd.RandomString(RAND_VALUES_LENGTH);
+      }
+
+      // Insert K-V into default CF.
+      for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
+        ASSERT_OK(Put(0, keys[k], values_default[k]));
+      }
+
+      // Check key validity, for all keys, both in
+      // default and pikachu CFs.
+      for (size_t k = 0; k < KVSIZE; k++) {
+        ASSERT_EQ(Get(0, keys[k]), values_default[k]);
+      }
+      // Note that at this point, only keys[0:KVSIZE/2]
+      // have been inserted into Pikachu.
+      for (size_t k = 0; k < KVSIZE / 2; k++) {
+        ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
+      }
+    }
+
+    // Insertion of of K-V pairs, multiple times (overwrites)
+    // into 'pikachu' CF. Will trigger mempurge.
+    // Check that we keep the older logs for 'default' imm().
+    for (size_t j = 0; j < NUM_REPEAT; j++) {
+      // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+      for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
+        values_pikachu[k] = rnd.RandomString(RAND_VALUES_LENGTH);
+      }
+
+      // Insert K-V into pikachu CF.
+      for (size_t k = KVSIZE / 2; k < KVSIZE; k++) {
+        ASSERT_OK(Put(1, keys[k], values_pikachu[k]));
+      }
+
+      // Check key validity, for all keys,
+      // both in default and pikachu.
+      for (size_t k = 0; k < KVSIZE; k++) {
+        ASSERT_EQ(Get(0, keys[k]), values_default[k]);
+        ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
+      }
+    }
+
+    // Check that there was at least one mempurge
+    const uint32_t EXPECTED_MIN_MEMPURGE_COUNT = 1;
+    // Check that there was no SST files created during flush.
+    const uint32_t EXPECTED_SST_COUNT = 0;
+
+    EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
+    if (options.experimental_mempurge_threshold ==
+        std::numeric_limits<double>::max()) {
+      EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
+    }
+
+    ReopenWithColumnFamilies({"default", "pikachu"}, options);
+    // Check that there was no data corruption anywhere,
+    // not in 'default' nor in 'Pikachu' CFs.
+    ASSERT_EQ("v3", Get(1, "foo"));
+    ASSERT_OK(Put(1, "foo", "v4"));
+    ASSERT_EQ("v4", Get(1, "foo"));
+    ASSERT_EQ("v2", Get(1, "bar"));
+    ASSERT_EQ("v5", Get(1, "baz"));
+    // Check keys in 'Default' and 'Pikachu'.
+    // keys[0:KVSIZE/2] were for sure contained
+    // in the imm() at Reopen/recovery time.
+    for (size_t k = 0; k < KVSIZE; k++) {
+      ASSERT_EQ(Get(0, keys[k]), values_default[k]);
+      ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
+    }
+    // Insertion of random K-V pairs to trigger
+    // a flush in the Pikachu CF.
+    for (size_t j = 0; j < NUM_REPEAT; j++) {
+      RNDKEY = rnd.RandomString(RAND_KEY_LENGTH);
+      RNDVALUE = rnd.RandomString(RAND_VALUES_LENGTH);
+      ASSERT_OK(Put(1, RNDKEY, RNDVALUE));
+    }
+    // ASsert than there was at least one flush to storage.
+    EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
+    ReopenWithColumnFamilies({"default", "pikachu"}, options);
+    ASSERT_EQ("v4", Get(1, "foo"));
+    ASSERT_EQ("v2", Get(1, "bar"));
+    ASSERT_EQ("v5", Get(1, "baz"));
+    // Since values in default are held in mutable mem()
+    // and imm(), check if the flush in pikachu didn't
+    // affect these values.
+    for (size_t k = 0; k < KVSIZE; k++) {
+      ASSERT_EQ(Get(0, keys[k]), values_default[k]);
+      ASSERT_EQ(Get(1, keys[k]), values_pikachu[k]);
+    }
+    ASSERT_EQ(Get(1, RNDKEY), RNDVALUE);
+  } while (ChangeWalOptions());
+}
+
+TEST_F(DBFlushTest, MemPurgeCorrectLogNumberAndSSTFileCreation) {
+  // Before our bug fix, we noticed that when 2 memtables were
+  // being flushed (with one memtable being the output of a
+  // previous MemPurge and one memtable being a newly-sealed memtable),
+  // the SST file created was not properly added to the DB version
+  // (via the VersionEdit obj), leading to data loss (the SST file
+  // was later being purged as an obsolete file).
+  // Therefore, we reproduce this scenario to test our fix.
+  Options options = CurrentOptions();
+
+  options.create_if_missing = true;
+  options.compression = kNoCompression;
+  options.inplace_update_support = false;
+  options.allow_concurrent_memtable_write = true;
+
+  // Enforce size of a single MemTable to 1MB (64MB = 1048576 bytes).
+  options.write_buffer_size = 1 << 20;
+  // Activate the MemPurge prototype.
+  options.experimental_mempurge_threshold = 1.0;
+
+  // Force to have more than one memtable to trigger a flush.
+  // For some reason this option does not seem to be enforced,
+  // so the following test is designed to make sure that we
+  // are testing the correct test case.
+  options.min_write_buffer_number_to_merge = 3;
+  options.max_write_buffer_number = 5;
+  options.max_write_buffer_size_to_maintain = 2 * (options.write_buffer_size);
+  options.disable_auto_compactions = true;
+  ASSERT_OK(TryReopen(options));
+
+  std::atomic<uint32_t> mempurge_count{0};
+  std::atomic<uint32_t> sst_count{0};
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:MemPurgeSuccessful",
+      [&](void* /*arg*/) { mempurge_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushJob:SSTFileCreated", [&](void* /*arg*/) { sst_count++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Dummy variable used for the following callback function.
+  uint64_t ZERO = 0;
+  // We will first execute mempurge operations exclusively.
+  // Therefore, when the first flush is triggered, we want to make
+  // sure there is at least 2 memtables being flushed: one output
+  // from a previous mempurge, and one newly sealed memtable.
+  // This is when we observed in the past that some SST files created
+  // were not properly added to the DB version (via the VersionEdit obj).
+  std::atomic<uint64_t> num_memtable_at_first_flush(0);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "FlushJob::WriteLevel0Table:num_memtables", [&](void* arg) {
+        uint64_t* mems_size = reinterpret_cast<uint64_t*>(arg);
+        // atomic_compare_exchange_strong sometimes updates the value
+        // of ZERO (the "expected" object), so we make sure ZERO is indeed...
+        // zero.
+        ZERO = 0;
+        std::atomic_compare_exchange_strong(&num_memtable_at_first_flush, &ZERO,
+                                            *mems_size);
+      });
+
+  const std::vector<std::string> KEYS = {
+      "ThisIsKey1", "ThisIsKey2", "ThisIsKey3", "ThisIsKey4", "ThisIsKey5",
+      "ThisIsKey6", "ThisIsKey7", "ThisIsKey8", "ThisIsKey9"};
+  const std::string NOT_FOUND = "NOT_FOUND";
+
+  Random rnd(117);
+  const uint64_t NUM_REPEAT_OVERWRITES = 100;
+  const uint64_t NUM_RAND_INSERTS = 500;
+  const uint64_t RAND_VALUES_LENGTH = 10240;
+
+  std::string key, value;
+  std::vector<std::string> values(9, "");
+
+  // Keys used to check that no SST file disappeared.
+  for (uint64_t k = 0; k < 5; k++) {
+    values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(KEYS[k], values[k]));
+  }
+
+  // Insertion of of K-V pairs, multiple times.
+  // Trigger at least one mempurge and no SST file creation.
+  for (size_t i = 0; i < NUM_REPEAT_OVERWRITES; i++) {
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    for (uint64_t k = 5; k < values.size(); k++) {
+      values[k] = rnd.RandomString(RAND_VALUES_LENGTH);
+      ASSERT_OK(Put(KEYS[k], values[k]));
+    }
+    // Check database consistency.
+    for (uint64_t k = 0; k < values.size(); k++) {
+      ASSERT_EQ(Get(KEYS[k]), values[k]);
+    }
+  }
+
+  // Check that there was at least one mempurge
+  uint32_t expected_min_mempurge_count = 1;
+  // Check that there was no SST files created during flush.
+  uint32_t expected_sst_count = 0;
+  EXPECT_GE(mempurge_count.load(), expected_min_mempurge_count);
+  EXPECT_EQ(sst_count.load(), expected_sst_count);
+
+  // Trigger an SST file creation and no mempurge.
+  for (size_t i = 0; i < NUM_RAND_INSERTS; i++) {
+    key = rnd.RandomString(RAND_VALUES_LENGTH);
+    // Create value strings of arbitrary length RAND_VALUES_LENGTH bytes.
+    value = rnd.RandomString(RAND_VALUES_LENGTH);
+    ASSERT_OK(Put(key, value));
+    // Check database consistency.
+    for (uint64_t k = 0; k < values.size(); k++) {
+      ASSERT_EQ(Get(KEYS[k]), values[k]);
+    }
+    ASSERT_EQ(Get(key), value);
+  }
+
+  // Check that there was at least one SST files created during flush.
+  expected_sst_count = 1;
+  EXPECT_GE(sst_count.load(), expected_sst_count);
+
+  // Oddly enough, num_memtable_at_first_flush is not enforced to be
+  // equal to min_write_buffer_number_to_merge. So by asserting that
+  // the first SST file creation comes from one output memtable
+  // from a previous mempurge, and one newly sealed memtable. This
+  // is the scenario where we observed that some SST files created
+  // were not properly added to the DB version before our bug fix.
+  ASSERT_GE(num_memtable_at_first_flush.load(), 2);
+
+  // Check that no data was lost after SST file creation.
+  for (uint64_t k = 0; k < values.size(); k++) {
+    ASSERT_EQ(Get(KEYS[k]), values[k]);
+  }
+  // Extra check of database consistency.
+  ASSERT_EQ(Get(key), value);
+
+  Close();
+}
+
 TEST_P(DBFlushDirectIOTest, DirectIO) {
   Options options;
   options.create_if_missing = true;
   options.disable_auto_compactions = true;
   options.max_background_flushes = 2;
   options.use_direct_io_for_flush_and_compaction = GetParam();
-  options.env = new MockEnv(Env::Default());
+  options.env = MockEnv::Create(Env::Default());
   SyncPoint::GetInstance()->SetCallBack(
       "BuildTable:create_file", [&](void* arg) {
         bool* use_direct_writes = static_cast<bool*>(arg);
@@ -311,8 +1822,8 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
   ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
 #ifndef ROCKSDB_LITE
   uint64_t num_bg_errors;
-  ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kBackgroundErrors,
-                                  &num_bg_errors));
+  ASSERT_TRUE(
+      db_->GetIntProperty(DB::Properties::kBackgroundErrors, &num_bg_errors));
   ASSERT_GT(num_bg_errors, 0);
 #endif  // ROCKSDB_LITE
 
@@ -397,7 +1908,7 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
   std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();
 
   SyncPoint::GetInstance()->LoadDependency(
-      {{"DBImpl::BackgroundCallFlush:start",
+      {{"DBImpl::FlushMemTableToOutputFile:AfterPickMemtables",
         "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
        {"DBImpl::FlushMemTableToOutputFile:Finish",
         "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
@@ -439,6 +1950,9 @@ TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
   flush_opts.wait = false;
   ASSERT_OK(db_->Flush(flush_opts));
   t1.join();
+  // Ensure background work is fully finished including listener callbacks
+  // before accessing listener state.
+  ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
   ASSERT_TRUE(listener->completed1);
   ASSERT_TRUE(listener->completed2);
   SyncPoint::GetInstance()->DisableProcessing();
@@ -450,7 +1964,6 @@ TEST_F(DBFlushTest, FlushWithBlob) {
   constexpr uint64_t min_blob_size = 10;
 
   Options options;
-  options.env = CurrentOptions().env;
   options.enable_blob_files = true;
   options.min_blob_size = min_blob_size;
   options.disable_auto_compactions = true;
@@ -474,7 +1987,7 @@ TEST_F(DBFlushTest, FlushWithBlob) {
   ASSERT_EQ(Get("key1"), short_value);
   ASSERT_EQ(Get("key2"), long_value);
 
-  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
+  VersionSet* const versions = dbfull()->GetVersionSet();
   assert(versions);
 
   ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@@ -495,7 +2008,7 @@ TEST_F(DBFlushTest, FlushWithBlob) {
   const auto& blob_files = storage_info->GetBlobFiles();
   ASSERT_EQ(blob_files.size(), 1);
 
-  const auto& blob_file = blob_files.begin()->second;
+  const auto& blob_file = blob_files.front();
   assert(blob_file);
 
   ASSERT_EQ(table_file->smallest.user_key(), "key1");
@@ -511,27 +2024,268 @@ TEST_F(DBFlushTest, FlushWithBlob) {
   const InternalStats* const internal_stats = cfd->internal_stats();
   assert(internal_stats);
 
-  const uint64_t expected_bytes =
-      table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
-
   const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
   ASSERT_FALSE(compaction_stats.empty());
-  ASSERT_EQ(compaction_stats[0].bytes_written, expected_bytes);
-  ASSERT_EQ(compaction_stats[0].num_output_files, 2);
+  ASSERT_EQ(compaction_stats[0].bytes_written, table_file->fd.GetFileSize());
+  ASSERT_EQ(compaction_stats[0].bytes_written_blob,
+            blob_file->GetTotalBlobBytes());
+  ASSERT_EQ(compaction_stats[0].num_output_files, 1);
+  ASSERT_EQ(compaction_stats[0].num_output_files_blob, 1);
 
   const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
-  ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED], expected_bytes);
+  ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
+            compaction_stats[0].bytes_written +
+                compaction_stats[0].bytes_written_blob);
 #endif  // ROCKSDB_LITE
 }
 
+TEST_F(DBFlushTest, FlushWithChecksumHandoff1) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  Options options = CurrentOptions();
+  options.write_buffer_size = 100;
+  options.max_write_buffer_number = 4;
+  options.min_write_buffer_number_to_merge = 3;
+  options.disable_auto_compactions = true;
+  options.env = fault_fs_env.get();
+  options.checksum_handoff_file_types.Add(FileType::kTableFile);
+  Reopen(options);
+
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  ASSERT_OK(Put("key1", "value1"));
+  ASSERT_OK(Put("key2", "value2"));
+  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
+
+  // The hash does not match, write fails
+  // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+  // Since the file system returns IOStatus::Corruption, it is an
+  // unrecoverable error.
+  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
+    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+  });
+  ASSERT_OK(Put("key3", "value3"));
+  ASSERT_OK(Put("key4", "value4"));
+  SyncPoint::GetInstance()->EnableProcessing();
+  Status s = Flush();
+  ASSERT_EQ(s.severity(),
+            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  Destroy(options);
+  Reopen(options);
+
+  // The file system does not support checksum handoff. The check
+  // will be ignored.
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+  ASSERT_OK(Put("key5", "value5"));
+  ASSERT_OK(Put("key6", "value6"));
+  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
+
+  // Each write will be similated as corrupted.
+  // Since the file system returns IOStatus::Corruption, it is an
+  // unrecoverable error.
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
+    fault_fs->IngestDataCorruptionBeforeWrite();
+  });
+  ASSERT_OK(Put("key7", "value7"));
+  ASSERT_OK(Put("key8", "value8"));
+  SyncPoint::GetInstance()->EnableProcessing();
+  s = Flush();
+  ASSERT_EQ(s.severity(),
+            ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
+  SyncPoint::GetInstance()->DisableProcessing();
+
+  Destroy(options);
+}
+
+TEST_F(DBFlushTest, FlushWithChecksumHandoff2) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  Options options = CurrentOptions();
+  options.write_buffer_size = 100;
+  options.max_write_buffer_number = 4;
+  options.min_write_buffer_number_to_merge = 3;
+  options.disable_auto_compactions = true;
+  options.env = fault_fs_env.get();
+  Reopen(options);
+
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  ASSERT_OK(Put("key1", "value1"));
+  ASSERT_OK(Put("key2", "value2"));
+  ASSERT_OK(Flush());
+
+  // options is not set, the checksum handoff will not be triggered
+  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
+    fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+  });
+  ASSERT_OK(Put("key3", "value3"));
+  ASSERT_OK(Put("key4", "value4"));
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Flush());
+  SyncPoint::GetInstance()->DisableProcessing();
+  Destroy(options);
+  Reopen(options);
+
+  // The file system does not support checksum handoff. The check
+  // will be ignored.
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+  ASSERT_OK(Put("key5", "value5"));
+  ASSERT_OK(Put("key6", "value6"));
+  ASSERT_OK(Flush());
+
+  // options is not set, the checksum handoff will not be triggered
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  SyncPoint::GetInstance()->SetCallBack("FlushJob::Start", [&](void*) {
+    fault_fs->IngestDataCorruptionBeforeWrite();
+  });
+  ASSERT_OK(Put("key7", "value7"));
+  ASSERT_OK(Put("key8", "value8"));
+  SyncPoint::GetInstance()->EnableProcessing();
+  ASSERT_OK(Flush());
+  SyncPoint::GetInstance()->DisableProcessing();
+
+  Destroy(options);
+}
+
+TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest1) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  Options options = CurrentOptions();
+  options.write_buffer_size = 100;
+  options.max_write_buffer_number = 4;
+  options.min_write_buffer_number_to_merge = 3;
+  options.disable_auto_compactions = true;
+  options.env = fault_fs_env.get();
+  options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  Reopen(options);
+
+  ASSERT_OK(Put("key1", "value1"));
+  ASSERT_OK(Put("key2", "value2"));
+  ASSERT_OK(Flush());
+
+  // The hash does not match, write fails
+  // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+  // Since the file system returns IOStatus::Corruption, it is mapped to
+  // kFatalError error.
+  ASSERT_OK(Put("key3", "value3"));
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
+        fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+      });
+  ASSERT_OK(Put("key3", "value3"));
+  ASSERT_OK(Put("key4", "value4"));
+  SyncPoint::GetInstance()->EnableProcessing();
+  Status s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  SyncPoint::GetInstance()->DisableProcessing();
+  Destroy(options);
+}
+
+TEST_F(DBFlushTest, FlushWithChecksumHandoffManifest2) {
+  if (mem_env_ || encrypted_env_) {
+    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+    return;
+  }
+  std::shared_ptr<FaultInjectionTestFS> fault_fs(
+      new FaultInjectionTestFS(FileSystem::Default()));
+  std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+  Options options = CurrentOptions();
+  options.write_buffer_size = 100;
+  options.max_write_buffer_number = 4;
+  options.min_write_buffer_number_to_merge = 3;
+  options.disable_auto_compactions = true;
+  options.env = fault_fs_env.get();
+  options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+  Reopen(options);
+  // The file system does not support checksum handoff. The check
+  // will be ignored.
+  ASSERT_OK(Put("key5", "value5"));
+  ASSERT_OK(Put("key6", "value6"));
+  ASSERT_OK(Flush());
+
+  // Each write will be similated as corrupted.
+  // Since the file system returns IOStatus::Corruption, it is mapped to
+  // kFatalError error.
+  fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest",
+      [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
+  ASSERT_OK(Put("key7", "value7"));
+  ASSERT_OK(Put("key8", "value8"));
+  SyncPoint::GetInstance()->EnableProcessing();
+  Status s = Flush();
+  ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+  SyncPoint::GetInstance()->DisableProcessing();
+
+  Destroy(options);
+}
+
+TEST_F(DBFlushTest, PickRightMemtables) {
+  Options options = CurrentOptions();
+  DestroyAndReopen(options);
+  options.create_if_missing = true;
+
+  const std::string test_cf_name = "test_cf";
+  options.max_write_buffer_number = 128;
+  CreateColumnFamilies({test_cf_name}, options);
+
+  Close();
+
+  ReopenWithColumnFamilies({kDefaultColumnFamilyName, test_cf_name}, options);
+
+  ASSERT_OK(db_->Put(WriteOptions(), "key", "value"));
+
+  ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "key", "value"));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::SyncClosedLogs:BeforeReLock", [&](void* /*arg*/) {
+        ASSERT_OK(db_->Put(WriteOptions(), handles_[1], "what", "v"));
+        auto* cfhi =
+            static_cast_with_check<ColumnFamilyHandleImpl>(handles_[1]);
+        assert(cfhi);
+        ASSERT_OK(dbfull()->TEST_SwitchMemtable(cfhi->cfd()));
+      });
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::FlushMemTableToOutputFile:AfterPickMemtables", [&](void* arg) {
+        auto* job = reinterpret_cast<FlushJob*>(arg);
+        assert(job);
+        const auto& mems = job->GetMemTables();
+        assert(mems.size() == 1);
+        assert(mems[0]);
+        ASSERT_EQ(1, mems[0]->GetID());
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(db_->Flush(FlushOptions(), handles_[1]));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
 class DBFlushTestBlobError : public DBFlushTest,
                              public testing::WithParamInterface<std::string> {
  public:
-  DBFlushTestBlobError()
-      : fault_injection_env_(env_), sync_point_(GetParam()) {}
-  ~DBFlushTestBlobError() { Close(); }
+  DBFlushTestBlobError() : sync_point_(GetParam()) {}
 
-  FaultInjectionTestEnv fault_injection_env_;
   std::string sync_point_;
 };
 
@@ -544,20 +2298,18 @@ TEST_P(DBFlushTestBlobError, FlushError) {
   Options options;
   options.enable_blob_files = true;
   options.disable_auto_compactions = true;
-  options.env = &fault_injection_env_;
+  options.env = env_;
 
   Reopen(options);
 
   ASSERT_OK(Put("key", "blob"));
 
-  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
-    fault_injection_env_.SetFilesystemActive(false,
-                                             Status::IOError(sync_point_));
+  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
+    Status* const s = static_cast<Status*>(arg);
+    assert(s);
+
+    (*s) = Status::IOError(sync_point_);
   });
-  SyncPoint::GetInstance()->SetCallBack(
-      "BuildTable:BeforeDeleteFile", [this](void* /* arg */) {
-        fault_injection_env_.SetFilesystemActive(true);
-      });
   SyncPoint::GetInstance()->EnableProcessing();
 
   ASSERT_NOK(Flush());
@@ -565,7 +2317,7 @@ TEST_P(DBFlushTestBlobError, FlushError) {
   SyncPoint::GetInstance()->DisableProcessing();
   SyncPoint::GetInstance()->ClearAllCallBacks();
 
-  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
+  VersionSet* const versions = dbfull()->GetVersionSet();
   assert(versions);
 
   ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
@@ -607,19 +2359,169 @@ TEST_P(DBFlushTestBlobError, FlushError) {
 
   if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
     ASSERT_EQ(compaction_stats[0].bytes_written, 0);
+    ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0);
     ASSERT_EQ(compaction_stats[0].num_output_files, 0);
+    ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0);
   } else {
     // SST file writing succeeded; blob file writing failed (during Finish)
     ASSERT_GT(compaction_stats[0].bytes_written, 0);
+    ASSERT_EQ(compaction_stats[0].bytes_written_blob, 0);
     ASSERT_EQ(compaction_stats[0].num_output_files, 1);
+    ASSERT_EQ(compaction_stats[0].num_output_files_blob, 0);
   }
 
   const uint64_t* const cf_stats_value = internal_stats->TEST_GetCFStatsValue();
   ASSERT_EQ(cf_stats_value[InternalStats::BYTES_FLUSHED],
-            compaction_stats[0].bytes_written);
+            compaction_stats[0].bytes_written +
+                compaction_stats[0].bytes_written_blob);
 #endif  // ROCKSDB_LITE
 }
 
+#ifndef ROCKSDB_LITE
+TEST_F(DBFlushTest, TombstoneVisibleInSnapshot) {
+  class SimpleTestFlushListener : public EventListener {
+   public:
+    explicit SimpleTestFlushListener(DBFlushTest* _test) : test_(_test) {}
+    ~SimpleTestFlushListener() override {}
+
+    void OnFlushBegin(DB* db, const FlushJobInfo& info) override {
+      ASSERT_EQ(static_cast<uint32_t>(0), info.cf_id);
+
+      ASSERT_OK(db->Delete(WriteOptions(), "foo"));
+      snapshot_ = db->GetSnapshot();
+      ASSERT_OK(db->Put(WriteOptions(), "foo", "value"));
+
+      auto* dbimpl = static_cast_with_check<DBImpl>(db);
+      assert(dbimpl);
+
+      ColumnFamilyHandle* cfh = db->DefaultColumnFamily();
+      auto* cfhi = static_cast_with_check<ColumnFamilyHandleImpl>(cfh);
+      assert(cfhi);
+      ASSERT_OK(dbimpl->TEST_SwitchMemtable(cfhi->cfd()));
+    }
+
+    DBFlushTest* test_ = nullptr;
+    const Snapshot* snapshot_ = nullptr;
+  };
+
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  auto* listener = new SimpleTestFlushListener(this);
+  options.listeners.emplace_back(listener);
+  DestroyAndReopen(options);
+
+  ASSERT_OK(db_->Put(WriteOptions(), "foo", "value0"));
+
+  ManagedSnapshot snapshot_guard(db_);
+
+  ColumnFamilyHandle* default_cf = db_->DefaultColumnFamily();
+  ASSERT_OK(db_->Flush(FlushOptions(), default_cf));
+
+  const Snapshot* snapshot = listener->snapshot_;
+  assert(snapshot);
+
+  ReadOptions read_opts;
+  read_opts.snapshot = snapshot;
+
+  // Using snapshot should not see "foo".
+  {
+    std::string value;
+    Status s = db_->Get(read_opts, "foo", &value);
+    ASSERT_TRUE(s.IsNotFound());
+  }
+
+  db_->ReleaseSnapshot(snapshot);
+}
+
+TEST_P(DBAtomicFlushTest, ManualFlushUnder2PC) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.allow_2pc = true;
+  options.atomic_flush = GetParam();
+  // 64MB so that memtable flush won't be trigger by the small writes.
+  options.write_buffer_size = (static_cast<size_t>(64) << 20);
+
+  // Destroy the DB to recreate as a TransactionDB.
+  Close();
+  Destroy(options, true);
+
+  // Create a TransactionDB.
+  TransactionDB* txn_db = nullptr;
+  TransactionDBOptions txn_db_opts;
+  txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;
+  ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db));
+  ASSERT_NE(txn_db, nullptr);
+  db_ = txn_db;
+
+  // Create two more columns other than default CF.
+  std::vector<std::string> cfs = {"puppy", "kitty"};
+  CreateColumnFamilies(cfs, options);
+  ASSERT_EQ(handles_.size(), 2);
+  ASSERT_EQ(handles_[0]->GetName(), cfs[0]);
+  ASSERT_EQ(handles_[1]->GetName(), cfs[1]);
+  const size_t kNumCfToFlush = options.atomic_flush ? 2 : 1;
+
+  WriteOptions wopts;
+  TransactionOptions txn_opts;
+  // txn1 only prepare, but does not commit.
+  // The WAL containing the prepared but uncommitted data must be kept.
+  Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
+  // txn2 not only prepare, but also commit.
+  Transaction* txn2 = txn_db->BeginTransaction(wopts, txn_opts, nullptr);
+  ASSERT_NE(txn1, nullptr);
+  ASSERT_NE(txn2, nullptr);
+  for (size_t i = 0; i < kNumCfToFlush; i++) {
+    ASSERT_OK(txn1->Put(handles_[i], "k1", "v1"));
+    ASSERT_OK(txn2->Put(handles_[i], "k2", "v2"));
+  }
+  // A txn must be named before prepare.
+  ASSERT_OK(txn1->SetName("txn1"));
+  ASSERT_OK(txn2->SetName("txn2"));
+  // Prepare writes to WAL, but not to memtable. (WriteCommitted)
+  ASSERT_OK(txn1->Prepare());
+  ASSERT_OK(txn2->Prepare());
+  // Commit writes to memtable.
+  ASSERT_OK(txn2->Commit());
+  delete txn1;
+  delete txn2;
+
+  // There are still data in memtable not flushed.
+  // But since data is small enough to reside in the active memtable,
+  // there are no immutable memtable.
+  for (size_t i = 0; i < kNumCfToFlush; i++) {
+    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
+    ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
+    ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
+  }
+
+  // Atomic flush memtables,
+  // the min log with prepared data should be written to MANIFEST.
+  std::vector<ColumnFamilyHandle*> cfs_to_flush(kNumCfToFlush);
+  for (size_t i = 0; i < kNumCfToFlush; i++) {
+    cfs_to_flush[i] = handles_[i];
+  }
+  ASSERT_OK(txn_db->Flush(FlushOptions(), cfs_to_flush));
+
+  // There are no remaining data in memtable after flush.
+  for (size_t i = 0; i < kNumCfToFlush; i++) {
+    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
+    ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
+    ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
+    ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
+  }
+
+  // The recovered min log number with prepared data should be non-zero.
+  // In 2pc mode, MinLogNumberToKeep returns the
+  // VersionSet::min_log_number_to_keep recovered from MANIFEST, if it's 0,
+  // it means atomic flush didn't write the min_log_number_to_keep to MANIFEST.
+  cfs.push_back(kDefaultColumnFamilyName);
+  ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
+  DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_);
+  ASSERT_TRUE(db_impl->allow_2pc());
+  ASSERT_NE(db_impl->MinLogNumberToKeep(), 0);
+}
+#endif  // ROCKSDB_LITE
+
 TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
@@ -634,18 +2536,84 @@ TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
   for (size_t i = 0; i != num_cfs; ++i) {
     ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
   }
+
+  for (size_t i = 0; i != num_cfs; ++i) {
+    auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
+    ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
+    ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
+  }
+
   std::vector<int> cf_ids;
   for (size_t i = 0; i != num_cfs; ++i) {
     cf_ids.emplace_back(static_cast<int>(i));
   }
   ASSERT_OK(Flush(cf_ids));
+
   for (size_t i = 0; i != num_cfs; ++i) {
     auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
+    ASSERT_EQ(cfh->cfd()->GetFlushReason(), FlushReason::kManualFlush);
     ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
     ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
   }
 }
 
+TEST_P(DBAtomicFlushTest, PrecomputeMinLogNumberToKeepNon2PC) {
+  Options options = CurrentOptions();
+  options.create_if_missing = true;
+  options.atomic_flush = GetParam();
+  options.write_buffer_size = (static_cast<size_t>(64) << 20);
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  const size_t num_cfs = handles_.size();
+  ASSERT_EQ(num_cfs, 2);
+  WriteOptions wopts;
+  for (size_t i = 0; i != num_cfs; ++i) {
+    ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
+  }
+
+  {
+    // Flush the default CF only.
+    std::vector<int> cf_ids{0};
+    ASSERT_OK(Flush(cf_ids));
+
+    autovector<ColumnFamilyData*> flushed_cfds;
+    autovector<autovector<VersionEdit*>> flush_edits;
+    auto flushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[0]);
+    flushed_cfds.push_back(flushed_cfh->cfd());
+    flush_edits.push_back({});
+    auto unflushed_cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[1]);
+
+    ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
+                                                 flushed_cfds, flush_edits),
+              unflushed_cfh->cfd()->GetLogNumber());
+  }
+
+  {
+    // Flush all CFs.
+    std::vector<int> cf_ids;
+    for (size_t i = 0; i != num_cfs; ++i) {
+      cf_ids.emplace_back(static_cast<int>(i));
+    }
+    ASSERT_OK(Flush(cf_ids));
+    uint64_t log_num_after_flush = dbfull()->TEST_GetCurrentLogNumber();
+
+    uint64_t min_log_number_to_keep = std::numeric_limits<uint64_t>::max();
+    autovector<ColumnFamilyData*> flushed_cfds;
+    autovector<autovector<VersionEdit*>> flush_edits;
+    for (size_t i = 0; i != num_cfs; ++i) {
+      auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
+      flushed_cfds.push_back(cfh->cfd());
+      flush_edits.push_back({});
+      min_log_number_to_keep =
+          std::min(min_log_number_to_keep, cfh->cfd()->GetLogNumber());
+    }
+    ASSERT_EQ(min_log_number_to_keep, log_num_after_flush);
+    ASSERT_EQ(PrecomputeMinLogNumberToKeepNon2PC(dbfull()->GetVersionSet(),
+                                                 flushed_cfds, flush_edits),
+              min_log_number_to_keep);
+  }
+}
+
 TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
   Options options = CurrentOptions();
   options.create_if_missing = true;
@@ -829,7 +2797,7 @@ TEST_P(DBAtomicFlushTest, TriggerFlushAndClose) {
   options.create_if_missing = true;
   options.atomic_flush = atomic_flush;
   options.memtable_factory.reset(
-      new SpecialSkipListFactory(kNumKeysTriggerFlush));
+      test::NewSpecialSkipListFactory(kNumKeysTriggerFlush));
   CreateAndReopenWithCF({"pikachu"}, options);
 
   for (int i = 0; i != kNumKeysTriggerFlush; ++i) {
@@ -948,6 +2916,160 @@ TEST_P(DBAtomicFlushTest, RollbackAfterFailToInstallResults) {
   SyncPoint::GetInstance()->ClearAllCallBacks();
 }
 
+// In atomic flush, concurrent bg flush threads commit to the MANIFEST in
+// serial, in the order of their picked memtables for each column family.
+// Only when a bg flush thread finds out that its memtables are the earliest
+// unflushed ones for all the included column families will this bg flush
+// thread continue to commit to MANIFEST.
+// This unit test uses sync point to coordinate the execution of two bg threads
+// executing the same sequence of functions. The interleaving are as follows.
+// time            bg1                            bg2
+//  |   pick memtables to flush
+//  |   flush memtables cf1_m1, cf2_m1
+//  |   join MANIFEST write queue
+//  |                                     pick memtabls to flush
+//  |                                     flush memtables cf1_(m1+1)
+//  |                                     join MANIFEST write queue
+//  |                                     wait to write MANIFEST
+//  |   write MANIFEST
+//  |   IO error
+//  |                                     detect IO error and stop waiting
+//  V
+TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
+  bool atomic_flush = GetParam();
+  if (!atomic_flush) {
+    return;
+  }
+  auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.atomic_flush = true;
+  options.env = fault_injection_env.get();
+  // Set a larger value than default so that RocksDB can schedule concurrent
+  // background flush threads.
+  options.max_background_jobs = 8;
+  options.max_write_buffer_number = 8;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  assert(2 == handles_.size());
+
+  WriteOptions write_opts;
+  write_opts.disableWAL = true;
+
+  ASSERT_OK(Put(0, "a", "v_0_a", write_opts));
+  ASSERT_OK(Put(1, "a", "v_1_a", write_opts));
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+
+  SyncPoint::GetInstance()->LoadDependency({
+      {"BgFlushThr2:WaitToCommit", "BgFlushThr1:BeforeWriteManifest"},
+  });
+
+  std::thread::id bg_flush_thr1, bg_flush_thr2;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCallFlush:start", [&](void*) {
+        if (bg_flush_thr1 == std::thread::id()) {
+          bg_flush_thr1 = std::this_thread::get_id();
+        } else if (bg_flush_thr2 == std::thread::id()) {
+          bg_flush_thr2 = std::this_thread::get_id();
+        }
+      });
+
+  int called = 0;
+  SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg) {
+        if (std::this_thread::get_id() == bg_flush_thr2) {
+          const auto* ptr = reinterpret_cast<std::pair<Status, bool>*>(arg);
+          assert(ptr);
+          if (0 == called) {
+            // When bg flush thread 2 reaches here for the first time.
+            ASSERT_OK(ptr->first);
+            ASSERT_TRUE(ptr->second);
+          } else if (1 == called) {
+            // When bg flush thread 2 reaches here for the second time.
+            ASSERT_TRUE(ptr->first.IsIOError());
+            ASSERT_FALSE(ptr->second);
+          }
+          ++called;
+          TEST_SYNC_POINT("BgFlushThr2:WaitToCommit");
+        }
+      });
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
+      [&](void*) {
+        if (std::this_thread::get_id() == bg_flush_thr1) {
+          TEST_SYNC_POINT("BgFlushThr1:BeforeWriteManifest");
+        }
+      });
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::LogAndApply:WriteManifest", [&](void*) {
+        if (std::this_thread::get_id() != bg_flush_thr1) {
+          return;
+        }
+        ASSERT_OK(db_->Put(write_opts, "b", "v_1_b"));
+
+        FlushOptions flush_opts;
+        flush_opts.wait = false;
+        std::vector<ColumnFamilyHandle*> cfhs(1, db_->DefaultColumnFamily());
+        ASSERT_OK(dbfull()->Flush(flush_opts, cfhs));
+      });
+
+  SyncPoint::GetInstance()->SetCallBack(
+      "VersionSet::ProcessManifestWrites:AfterSyncManifest", [&](void* arg) {
+        auto* ptr = reinterpret_cast<IOStatus*>(arg);
+        assert(ptr);
+        *ptr = IOStatus::IOError("Injected failure");
+      });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError());
+
+  Close();
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
+  Options options = GetDefaultOptions();
+  options.create_if_missing = true;
+  options.atomic_flush = GetParam();
+  options.max_write_buffer_number = 2;
+  options.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
+
+  Reopen(options);
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::DelayWrite:Start",
+        "DBAtomicFlushTest::NoWaitWhenWritesStopped:0"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  ASSERT_OK(dbfull()->PauseBackgroundWork());
+  for (int i = 0; i < options.max_write_buffer_number; ++i) {
+    ASSERT_OK(Put("k" + std::to_string(i), "v" + std::to_string(i)));
+  }
+  std::thread stalled_writer([&]() { ASSERT_OK(Put("k", "v")); });
+
+  TEST_SYNC_POINT("DBAtomicFlushTest::NoWaitWhenWritesStopped:0");
+
+  {
+    FlushOptions flush_opts;
+    flush_opts.wait = false;
+    flush_opts.allow_write_stall = true;
+    ASSERT_TRUE(db_->Flush(flush_opts).IsTryAgain());
+  }
+
+  ASSERT_OK(dbfull()->ContinueBackgroundWork());
+  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+  stalled_writer.join();
+
+  SyncPoint::GetInstance()->DisableProcessing();
+}
+
 INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
                         testing::Bool());