]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_test.cc
index 87f1494aebc9fec63c87f3e7683c3e8dea73da64..682a382d7a7a9faf300b09bcecd6e4ba2d2dae50 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
@@ -23,6 +23,7 @@
 #include <alloca.h>
 #endif
 
+#include "cache/lru_cache.h"
 #include "db/db_impl.h"
 #include "db/db_test_util.h"
 #include "db/dbformat.h"
@@ -59,8 +60,6 @@
 #include "util/compression.h"
 #include "util/file_reader_writer.h"
 #include "util/filename.h"
-#include "util/hash.h"
-#include "util/logging.h"
 #include "util/mutexlock.h"
 #include "util/rate_limiter.h"
 #include "util/string_util.h"
@@ -223,6 +222,10 @@ TEST_F(DBTest, SkipDelay) {
 
   for (bool sync : {true, false}) {
     for (bool disableWAL : {true, false}) {
+      if (sync && disableWAL) {
+        // sync and disableWAL is incompatible.
+        continue;
+      }
       // Use a small number to ensure a large delay that is still effective
       // when we do Put
       // TODO(myabandeh): this is time dependent and could potentially make
@@ -231,11 +234,11 @@ TEST_F(DBTest, SkipDelay) {
       std::atomic<int> sleep_count(0);
       rocksdb::SyncPoint::GetInstance()->SetCallBack(
           "DBImpl::DelayWrite:Sleep",
-          [&](void* arg) { sleep_count.fetch_add(1); });
+          [&](void* /*arg*/) { sleep_count.fetch_add(1); });
       std::atomic<int> wait_count(0);
       rocksdb::SyncPoint::GetInstance()->SetCallBack(
           "DBImpl::DelayWrite:Wait",
-          [&](void* arg) { wait_count.fetch_add(1); });
+          [&](void* /*arg*/) { wait_count.fetch_add(1); });
       rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
       WriteOptions wo;
@@ -259,6 +262,196 @@ TEST_F(DBTest, SkipDelay) {
   }
 }
 
+TEST_F(DBTest, MixedSlowdownOptions) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:BeginWriteStallDone",
+      [&](void* /*arg*/) {
+        sleep_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_slowdown_func);
+          }
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+        }
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+          token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_GE(sleep_count.load(), 1);
+
+  wo.no_slowdown = true;
+  ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsInQueue) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetDelayToken(1);
+  std::atomic<int> sleep_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Sleep",
+      [&](void* /*arg*/) {
+        sleep_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+          // Sleep for 2s to allow the threads to insert themselves into the
+          // write queue
+          env_->SleepForMicroseconds(3000000ULL);
+        }
+      });
+  std::atomic<int> wait_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Wait",
+      [&](void* /*arg*/) { wait_count.fetch_add(1); });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+          token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_EQ(sleep_count.load(), 1);
+  ASSERT_GE(wait_count.load(), 0);
+}
+
+TEST_F(DBTest, MixedSlowdownOptionsStop) {
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.write_buffer_size = 100000;
+  CreateAndReopenWithCF({"pikachu"}, options);
+  std::vector<port::Thread> threads;
+  std::atomic<int> thread_num(0);
+
+  std::function<void()> write_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = false;
+    ASSERT_OK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> write_no_slowdown_func = [&]() {
+    int a = thread_num.fetch_add(1);
+    std::string key = "foo" + std::to_string(a);
+    WriteOptions wo;
+    wo.no_slowdown = true;
+    ASSERT_NOK(dbfull()->Put(wo, key, "bar"));
+  };
+  std::function<void()> wakeup_writer = [&]() {
+    dbfull()->mutex_.Lock();
+    dbfull()->bg_cv_.SignalAll();
+    dbfull()->mutex_.Unlock();
+  };
+  // Use a small number to ensure a large delay that is still effective
+  // when we do Put
+  // TODO(myabandeh): this is time dependent and could potentially make
+  // the test flaky
+  auto token = dbfull()->TEST_write_controler().GetStopToken();
+  std::atomic<int> wait_count(0);
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::DelayWrite:Wait",
+      [&](void* /*arg*/) {
+        wait_count.fetch_add(1);
+        if (threads.empty()) {
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_slowdown_func);
+          }
+          for (int i = 0; i < 2; ++i) {
+            threads.emplace_back(write_no_slowdown_func);
+          }
+          // Sleep for 2s to allow the threads to insert themselves into the
+          // write queue
+          env_->SleepForMicroseconds(3000000ULL);
+        }
+        token.reset();
+        threads.emplace_back(wakeup_writer);
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.sync = false;
+  wo.disableWAL = false;
+  wo.no_slowdown = false;
+  dbfull()->Put(wo, "foo", "bar");
+  // We need the 2nd write to trigger delay. This is because delay is
+  // estimated based on the last write size which is 0 for the first write.
+  ASSERT_OK(dbfull()->Put(wo, "foo2", "bar2"));
+          token.reset();
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_GE(wait_count.load(), 1);
+
+  wo.no_slowdown = true;
+  ASSERT_OK(dbfull()->Put(wo, "foo3", "bar"));
+}
 #ifndef ROCKSDB_LITE
 
 TEST_F(DBTest, LevelLimitReopen) {
@@ -477,6 +670,36 @@ TEST_F(DBTest, SingleDeletePutFlush) {
                          kSkipUniversalCompaction | kSkipMergePut));
 }
 
+// Disable because not all platform can run it.
+// It requires more than 9GB memory to run it, With single allocation
+// of more than 3GB.
+TEST_F(DBTest, DISABLED_SanitizeVeryVeryLargeValue) {
+  const size_t kValueSize = 4 * size_t{1024 * 1024 * 1024};  // 4GB value
+  std::string raw(kValueSize, 'v');
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.merge_operator = MergeOperators::CreatePutOperator();
+  options.write_buffer_size = 100000;  // Small write buffer
+  options.paranoid_checks = true;
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("boo", "v1"));
+  ASSERT_TRUE(Put("foo", raw).IsInvalidArgument());
+  ASSERT_TRUE(Merge("foo", raw).IsInvalidArgument());
+
+  WriteBatch wb;
+  ASSERT_TRUE(wb.Put("foo", raw).IsInvalidArgument());
+  ASSERT_TRUE(wb.Merge("foo", raw).IsInvalidArgument());
+
+  Slice value_slice = raw;
+  Slice key_slice = "foo";
+  SliceParts sp_key(&key_slice, 1);
+  SliceParts sp_value(&value_slice, 1);
+
+  ASSERT_TRUE(wb.Put(sp_key, sp_value).IsInvalidArgument());
+  ASSERT_TRUE(wb.Merge(sp_key, sp_value).IsInvalidArgument());
+}
+
 // Disable because not all platform can run it.
 // It requires more than 9GB memory to run it, With single allocation
 // of more than 3GB.
@@ -500,7 +723,9 @@ TEST_F(DBTest, DISABLED_VeryLargeValue) {
   ASSERT_OK(Put(key2, raw));
   dbfull()->TEST_WaitForFlushMemTable();
 
+#ifndef ROCKSDB_LITE
   ASSERT_EQ(1, NumTableFilesAtLevel(0));
+#endif  // !ROCKSDB_LITE
 
   std::string value;
   Status s = db_->Get(ReadOptions(), key1, &value);
@@ -715,9 +940,9 @@ TEST_F(DBTest, FlushSchedule) {
 namespace {
 class KeepFilter : public CompactionFilter {
  public:
-  virtual bool Filter(int level, const Slice& key, const Slice& value,
-                      std::string* new_value,
-                      bool* value_changed) const override {
+  virtual bool Filter(int /*level*/, const Slice& /*key*/,
+                      const Slice& /*value*/, std::string* /*new_value*/,
+                      bool* /*value_changed*/) const override {
     return false;
   }
 
@@ -747,9 +972,9 @@ class KeepFilterFactory : public CompactionFilterFactory {
 class DelayFilter : public CompactionFilter {
  public:
   explicit DelayFilter(DBTestBase* d) : db_test(d) {}
-  virtual bool Filter(int level, const Slice& key, const Slice& value,
-                      std::string* new_value,
-                      bool* value_changed) const override {
+  virtual bool Filter(int /*level*/, const Slice& /*key*/,
+                      const Slice& /*value*/, std::string* /*new_value*/,
+                      bool* /*value_changed*/) const override {
     db_test->env_->addon_time_.fetch_add(1000);
     return true;
   }
@@ -764,7 +989,7 @@ class DelayFilterFactory : public CompactionFilterFactory {
  public:
   explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
   virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
-      const CompactionFilter::Context& context) override {
+      const CompactionFilter::Context& /*context*/) override {
     return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
   }
 
@@ -1439,7 +1664,6 @@ TEST_F(DBTest, UnremovableSingleDelete) {
 #ifndef ROCKSDB_LITE
 TEST_F(DBTest, DeletionMarkers1) {
   Options options = CurrentOptions();
-  options.max_background_flushes = 0;
   CreateAndReopenWithCF({"pikachu"}, options);
   Put(1, "foo", "v1");
   ASSERT_OK(Flush(1));
@@ -1648,7 +1872,7 @@ TEST_F(DBTest, CustomComparator) {
 
 TEST_F(DBTest, DBOpen_Options) {
   Options options = CurrentOptions();
-  std::string dbname = test::TmpDir(env_) + "/db_options_test";
+  std::string dbname = test::PerThreadDBPath("db_options_test");
   ASSERT_OK(DestroyDB(dbname, options));
 
   // Does not exist, and create_if_missing == false: error
@@ -1706,7 +1930,7 @@ TEST_F(DBTest, DBOpen_Change_NumLevels) {
 }
 
 TEST_F(DBTest, DestroyDBMetaDatabase) {
-  std::string dbname = test::TmpDir(env_) + "/db_meta";
+  std::string dbname = test::PerThreadDBPath("db_meta");
   ASSERT_OK(env_->CreateDirIfMissing(dbname));
   std::string metadbname = MetaDatabaseName(dbname, 0);
   ASSERT_OK(env_->CreateDirIfMissing(metadbname));
@@ -2115,14 +2339,21 @@ static void GCThreadBody(void* arg) {
 
 }  // namespace
 
+#ifndef TRAVIS
+// Disable this test temporarily on Travis as it fails intermittently.
+// Github issue: #4151
 TEST_F(DBTest, GroupCommitTest) {
   do {
     Options options = CurrentOptions();
     options.env = env_;
-    env_->log_write_slowdown_.store(100);
     options.statistics = rocksdb::CreateDBStatistics();
     Reopen(options);
 
+    rocksdb::SyncPoint::GetInstance()->LoadDependency(
+        {{"WriteThread::JoinBatchGroup:BeganWaiting",
+          "DBImpl::WriteImpl:BeforeLeaderEnters"}});
+    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
     // Start threads
     GCThread thread[kGCNumThreads];
     for (int id = 0; id < kGCNumThreads; id++) {
@@ -2131,13 +2362,7 @@ TEST_F(DBTest, GroupCommitTest) {
       thread[id].done = false;
       env_->StartThread(GCThreadBody, &thread[id]);
     }
-
-    for (int id = 0; id < kGCNumThreads; id++) {
-      while (thread[id].done == false) {
-        env_->SleepForMicroseconds(100000);
-      }
-    }
-    env_->log_write_slowdown_.store(0);
+    env_->WaitForJoin();
 
     ASSERT_GT(TestGetTickerCount(options, WRITE_DONE_BY_OTHER), 0);
 
@@ -2163,6 +2388,7 @@ TEST_F(DBTest, GroupCommitTest) {
     ASSERT_GT(hist_data.average, 0.0);
   } while (ChangeOptions(kSkipNoSeekToLast));
 }
+#endif  // TRAVIS
 
 namespace {
 typedef std::map<std::string, std::string> KVMap;
@@ -2189,6 +2415,8 @@ class ModelDB : public DB {
     batch.Put(cf, k, v);
     return Write(o, &batch);
   }
+  using DB::Close;
+  virtual Status Close() override { return Status::OK(); }
   using DB::Delete;
   virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
                         const Slice& key) override {
@@ -2211,17 +2439,17 @@ class ModelDB : public DB {
     return Write(o, &batch);
   }
   using DB::Get;
-  virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf,
-                     const Slice& key, PinnableSlice* value) override {
+  virtual Status Get(const ReadOptions& /*options*/, ColumnFamilyHandle* /*cf*/,
+                     const Slice& key, PinnableSlice* /*value*/) override {
     return Status::NotSupported(key);
   }
 
   using DB::MultiGet;
   virtual std::vector<Status> MultiGet(
-      const ReadOptions& options,
-      const std::vector<ColumnFamilyHandle*>& column_family,
+      const ReadOptions& /*options*/,
+      const std::vector<ColumnFamilyHandle*>& /*column_family*/,
       const std::vector<Slice>& keys,
-      std::vector<std::string>* values) override {
+      std::vector<std::string>* /*values*/) override {
     std::vector<Status> s(keys.size(),
                           Status::NotSupported("Not implemented."));
     return s;
@@ -2230,30 +2458,34 @@ class ModelDB : public DB {
 #ifndef ROCKSDB_LITE
   using DB::IngestExternalFile;
   virtual Status IngestExternalFile(
-      ColumnFamilyHandle* column_family,
-      const std::vector<std::string>& external_files,
-      const IngestExternalFileOptions& options) override {
+      ColumnFamilyHandle* /*column_family*/,
+      const std::vector<std::string>& /*external_files*/,
+      const IngestExternalFileOptions& /*options*/) override {
+    return Status::NotSupported("Not implemented.");
+  }
+
+  virtual Status VerifyChecksum() override {
     return Status::NotSupported("Not implemented.");
   }
 
   using DB::GetPropertiesOfAllTables;
   virtual Status GetPropertiesOfAllTables(
-      ColumnFamilyHandle* column_family,
-      TablePropertiesCollection* props) override {
+      ColumnFamilyHandle* /*column_family*/,
+      TablePropertiesCollection* /*props*/) override {
     return Status();
   }
 
   virtual Status GetPropertiesOfTablesInRange(
-      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
-      TablePropertiesCollection* props) override {
+      ColumnFamilyHandle* /*column_family*/, const Range* /*range*/,
+      std::size_t /*n*/, TablePropertiesCollection* /*props*/) override {
     return Status();
   }
 #endif  // ROCKSDB_LITE
 
   using DB::KeyMayExist;
-  virtual bool KeyMayExist(const ReadOptions& options,
-                           ColumnFamilyHandle* column_family, const Slice& key,
-                           std::string* value,
+  virtual bool KeyMayExist(const ReadOptions& /*options*/,
+                           ColumnFamilyHandle* /*column_family*/,
+                           const Slice& /*key*/, std::string* /*value*/,
                            bool* value_found = nullptr) override {
     if (value_found != nullptr) {
       *value_found = false;
@@ -2261,8 +2493,9 @@ class ModelDB : public DB {
     return true;  // Not Supported directly
   }
   using DB::NewIterator;
-  virtual Iterator* NewIterator(const ReadOptions& options,
-                                ColumnFamilyHandle* column_family) override {
+  virtual Iterator* NewIterator(
+      const ReadOptions& options,
+      ColumnFamilyHandle* /*column_family*/) override {
     if (options.snapshot == nullptr) {
       KVMap* saved = new KVMap;
       *saved = map_;
@@ -2274,9 +2507,9 @@ class ModelDB : public DB {
     }
   }
   virtual Status NewIterators(
-      const ReadOptions& options,
-      const std::vector<ColumnFamilyHandle*>& column_family,
-      std::vector<Iterator*>* iterators) override {
+      const ReadOptions& /*options*/,
+      const std::vector<ColumnFamilyHandle*>& /*column_family*/,
+      std::vector<Iterator*>* /*iterators*/) override {
     return Status::NotSupported("Not supported yet");
   }
   virtual const Snapshot* GetSnapshot() override {
@@ -2289,7 +2522,7 @@ class ModelDB : public DB {
     delete reinterpret_cast<const ModelSnapshot*>(snapshot);
   }
 
-  virtual Status Write(const WriteOptions& options,
+  virtual Status Write(const WriteOptions& /*options*/,
                        WriteBatch* batch) override {
     class Handler : public WriteBatch::Handler {
      public:
@@ -2297,7 +2530,8 @@ class ModelDB : public DB {
       virtual void Put(const Slice& key, const Slice& value) override {
         (*map_)[key.ToString()] = value.ToString();
       }
-      virtual void Merge(const Slice& key, const Slice& value) override {
+      virtual void Merge(const Slice& /*key*/,
+                         const Slice& /*value*/) override {
         // ignore merge for now
         // (*map_)[key.ToString()] = value.ToString();
       }
@@ -2311,62 +2545,67 @@ class ModelDB : public DB {
   }
 
   using DB::GetProperty;
-  virtual bool GetProperty(ColumnFamilyHandle* column_family,
-                           const Slice& property, std::string* value) override {
+  virtual bool GetProperty(ColumnFamilyHandle* /*column_family*/,
+                           const Slice& /*property*/,
+                           std::string* /*value*/) override {
     return false;
   }
   using DB::GetIntProperty;
-  virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
-                              const Slice& property, uint64_t* value) override {
+  virtual bool GetIntProperty(ColumnFamilyHandle* /*column_family*/,
+                              const Slice& /*property*/,
+                              uint64_t* /*value*/) override {
     return false;
   }
   using DB::GetMapProperty;
-  virtual bool GetMapProperty(ColumnFamilyHandle* column_family,
-                              const Slice& property,
-                              std::map<std::string, double>* value) override {
+  virtual bool GetMapProperty(
+      ColumnFamilyHandle* /*column_family*/, const Slice& /*property*/,
+      std::map<std::string, std::string>* /*value*/) override {
     return false;
   }
   using DB::GetAggregatedIntProperty;
-  virtual bool GetAggregatedIntProperty(const Slice& property,
-                                        uint64_t* value) override {
+  virtual bool GetAggregatedIntProperty(const Slice& /*property*/,
+                                        uint64_t* /*value*/) override {
     return false;
   }
   using DB::GetApproximateSizes;
-  virtual void GetApproximateSizes(ColumnFamilyHandle* column_family,
-                                   const Range* range, int n, uint64_t* sizes,
-                                   uint8_t include_flags
+  virtual void GetApproximateSizes(ColumnFamilyHandle* /*column_family*/,
+                                   const Range* /*range*/, int n,
+                                   uint64_t* sizes,
+                                   uint8_t /*include_flags*/
                                    = INCLUDE_FILES) override {
     for (int i = 0; i < n; i++) {
       sizes[i] = 0;
     }
   }
   using DB::GetApproximateMemTableStats;
-  virtual void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
-                                           const Range& range,
-                                           uint64_t* const count,
-                                           uint64_t* const size) override {
+  virtual void GetApproximateMemTableStats(
+      ColumnFamilyHandle* /*column_family*/, const Range& /*range*/,
+      uint64_t* const count, uint64_t* const size) override {
     *count = 0;
     *size = 0;
   }
   using DB::CompactRange;
-  virtual Status CompactRange(const CompactRangeOptions& options,
-                              ColumnFamilyHandle* column_family,
-                              const Slice* start, const Slice* end) override {
+  virtual Status CompactRange(const CompactRangeOptions& /*options*/,
+                              ColumnFamilyHandle* /*column_family*/,
+                              const Slice* /*start*/,
+                              const Slice* /*end*/) override {
     return Status::NotSupported("Not supported operation.");
   }
 
   virtual Status SetDBOptions(
-      const std::unordered_map<std::string, std::string>& new_options)
+      const std::unordered_map<std::string, std::string>& /*new_options*/)
       override {
     return Status::NotSupported("Not supported operation.");
   }
 
   using DB::CompactFiles;
-  virtual Status CompactFiles(const CompactionOptions& compact_options,
-                              ColumnFamilyHandle* column_family,
-                              const std::vector<std::string>& input_file_names,
-                              const int output_level,
-                              const int output_path_id = -1) override {
+  virtual Status CompactFiles(
+      const CompactionOptions& /*compact_options*/,
+      ColumnFamilyHandle* /*column_family*/,
+      const std::vector<std::string>& /*input_file_names*/,
+      const int /*output_level*/, const int /*output_path_id*/ = -1,
+      std::vector<std::string>* const /*output_file_names*/ = nullptr
+      ) override {
     return Status::NotSupported("Not supported operation.");
   }
 
@@ -2379,24 +2618,25 @@ class ModelDB : public DB {
   }
 
   Status EnableAutoCompaction(
-      const std::vector<ColumnFamilyHandle*>& column_family_handles) override {
+      const std::vector<ColumnFamilyHandle*>& /*column_family_handles*/)
+      override {
     return Status::NotSupported("Not supported operation.");
   }
 
   using DB::NumberLevels;
-  virtual int NumberLevels(ColumnFamilyHandle* column_family) override {
+  virtual int NumberLevels(ColumnFamilyHandle* /*column_family*/) override {
     return 1;
   }
 
   using DB::MaxMemCompactionLevel;
   virtual int MaxMemCompactionLevel(
-      ColumnFamilyHandle* column_family) override {
+      ColumnFamilyHandle* /*column_family*/) override {
     return 1;
   }
 
   using DB::Level0StopWriteTrigger;
   virtual int Level0StopWriteTrigger(
-      ColumnFamilyHandle* column_family) override {
+      ColumnFamilyHandle* /*column_family*/) override {
     return -1;
   }
 
@@ -2405,7 +2645,8 @@ class ModelDB : public DB {
   virtual Env* GetEnv() const override { return nullptr; }
 
   using DB::GetOptions;
-  virtual Options GetOptions(ColumnFamilyHandle* column_family) const override {
+  virtual Options GetOptions(
+      ColumnFamilyHandle* /*column_family*/) const override {
     return options_;
   }
 
@@ -2413,8 +2654,8 @@ class ModelDB : public DB {
   virtual DBOptions GetDBOptions() const override { return options_; }
 
   using DB::Flush;
-  virtual Status Flush(const rocksdb::FlushOptions& options,
-                       ColumnFamilyHandle* column_family) override {
+  virtual Status Flush(const rocksdb::FlushOptions& /*options*/,
+                       ColumnFamilyHandle* /*column_family*/) override {
     Status ret;
     return ret;
   }
@@ -2424,38 +2665,45 @@ class ModelDB : public DB {
 #ifndef ROCKSDB_LITE
   virtual Status DisableFileDeletions() override { return Status::OK(); }
 
-  virtual Status EnableFileDeletions(bool force) override {
+  virtual Status EnableFileDeletions(bool /*force*/) override {
     return Status::OK();
   }
-  virtual Status GetLiveFiles(std::vector<std::string>&, uint64_t* size,
-                              bool flush_memtable = true) override {
+  virtual Status GetLiveFiles(std::vector<std::string>&, uint64_t* /*size*/,
+                              bool /*flush_memtable*/ = true) override {
     return Status::OK();
   }
 
-  virtual Status GetSortedWalFiles(VectorLogPtr& files) override {
+  virtual Status GetSortedWalFiles(VectorLogPtr& /*files*/) override {
     return Status::OK();
   }
 
-  virtual Status DeleteFile(std::string name) override { return Status::OK(); }
+  virtual Status DeleteFile(std::string /*name*/) override {
+    return Status::OK();
+  }
 
   virtual Status GetUpdatesSince(
       rocksdb::SequenceNumber, unique_ptr<rocksdb::TransactionLogIterator>*,
-      const TransactionLogIterator::ReadOptions& read_options =
+      const TransactionLogIterator::ReadOptions& /*read_options*/ =
           TransactionLogIterator::ReadOptions()) override {
     return Status::NotSupported("Not supported in Model DB");
   }
 
   virtual void GetColumnFamilyMetaData(
-      ColumnFamilyHandle* column_family,
-      ColumnFamilyMetaData* metadata) override {}
+      ColumnFamilyHandle* /*column_family*/,
+      ColumnFamilyMetaData* /*metadata*/) override {}
 #endif  // ROCKSDB_LITE
 
-  virtual Status GetDbIdentity(std::string& identity) const override {
+  virtual Status GetDbIdentity(std::string& /*identity*/) const override {
     return Status::OK();
   }
 
   virtual SequenceNumber GetLatestSequenceNumber() const override { return 0; }
 
+  virtual bool SetPreserveDeletesSequenceNumber(
+      SequenceNumber /*seqnum*/) override {
+    return true;
+  }
+
   virtual ColumnFamilyHandle* DefaultColumnFamily() const override {
     return nullptr;
   }
@@ -2507,6 +2755,7 @@ class ModelDB : public DB {
   std::string name_ = "";
 };
 
+#ifndef ROCKSDB_VALGRIND_RUN
 static std::string RandomKey(Random* rnd, int minimum = 0) {
   int len;
   do {
@@ -2663,6 +2912,7 @@ TEST_P(DBTestRandomized, Randomized) {
   if (model_snap != nullptr) model.ReleaseSnapshot(model_snap);
   if (db_snap != nullptr) db_->ReleaseSnapshot(db_snap);
 }
+#endif  // ROCKSDB_VALGRIND_RUN
 
 TEST_F(DBTest, BlockBasedTablePrefixIndexTest) {
   // create a DB with block prefix index
@@ -2765,6 +3015,337 @@ TEST_P(DBTestWithParam, FIFOCompactionTest) {
     }
   }
 }
+
+TEST_F(DBTest, FIFOCompactionTestWithCompaction) {
+  Options options;
+  options.compaction_style = kCompactionStyleFIFO;
+  options.write_buffer_size = 20 << 10;  // 20K
+  options.arena_block_size = 4096;
+  options.compaction_options_fifo.max_table_files_size = 1500 << 10;  // 1MB
+  options.compaction_options_fifo.allow_compaction = true;
+  options.level0_file_num_compaction_trigger = 6;
+  options.compression = kNoCompression;
+  options.create_if_missing = true;
+  options = CurrentOptions(options);
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  for (int i = 0; i < 60; i++) {
+    // Generate and flush a file about 20KB.
+    for (int j = 0; j < 20; j++) {
+      ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+    }
+    Flush();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+  // It should be compacted to 10 files.
+  ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+  for (int i = 0; i < 60; i++) {
+    // Generate and flush a file about 20KB.
+    for (int j = 0; j < 20; j++) {
+      ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980)));
+    }
+    Flush();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+
+  // It should be compacted to no more than 20 files.
+  ASSERT_GT(NumTableFilesAtLevel(0), 10);
+  ASSERT_LT(NumTableFilesAtLevel(0), 18);
+  // Size limit is still guaranteed.
+  ASSERT_LE(SizeAtLevel(0),
+            options.compaction_options_fifo.max_table_files_size);
+}
+
+TEST_F(DBTest, FIFOCompactionStyleWithCompactionAndDelete) {
+  Options options;
+  options.compaction_style = kCompactionStyleFIFO;
+  options.write_buffer_size = 20 << 10;  // 20K
+  options.arena_block_size = 4096;
+  options.compaction_options_fifo.max_table_files_size = 1500 << 10;  // 1MB
+  options.compaction_options_fifo.allow_compaction = true;
+  options.level0_file_num_compaction_trigger = 3;
+  options.compression = kNoCompression;
+  options.create_if_missing = true;
+  options = CurrentOptions(options);
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+  for (int i = 0; i < 3; i++) {
+    // Each file contains a different key which will be dropped later.
+    ASSERT_OK(Put("a" + ToString(i), RandomString(&rnd, 500)));
+    ASSERT_OK(Put("key" + ToString(i), ""));
+    ASSERT_OK(Put("z" + ToString(i), RandomString(&rnd, 500)));
+    Flush();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
+  for (int i = 0; i < 3; i++) {
+    ASSERT_EQ("", Get("key" + ToString(i)));
+  }
+  for (int i = 0; i < 3; i++) {
+    // Each file contains a different key which will be dropped later.
+    ASSERT_OK(Put("a" + ToString(i), RandomString(&rnd, 500)));
+    ASSERT_OK(Delete("key" + ToString(i)));
+    ASSERT_OK(Put("z" + ToString(i), RandomString(&rnd, 500)));
+    Flush();
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  }
+  ASSERT_EQ(NumTableFilesAtLevel(0), 2);
+  for (int i = 0; i < 3; i++) {
+    ASSERT_EQ("NOT_FOUND", Get("key" + ToString(i)));
+  }
+}
+
+// Check that FIFO-with-TTL is not supported with max_open_files != -1.
+TEST_F(DBTest, FIFOCompactionWithTTLAndMaxOpenFilesTest) {
+  Options options;
+  options.compaction_style = kCompactionStyleFIFO;
+  options.create_if_missing = true;
+  options.compaction_options_fifo.ttl = 600;  // seconds
+
+  // Check that it is not supported with max_open_files != -1.
+  options.max_open_files = 100;
+  options = CurrentOptions(options);
+  ASSERT_TRUE(TryReopen(options).IsNotSupported());
+
+  options.max_open_files = -1;
+  ASSERT_OK(TryReopen(options));
+}
+
+// Check that FIFO-with-TTL is supported only with BlockBasedTableFactory.
+TEST_F(DBTest, FIFOCompactionWithTTLAndVariousTableFormatsTest) {
+  Options options;
+  options.compaction_style = kCompactionStyleFIFO;
+  options.create_if_missing = true;
+  options.compaction_options_fifo.ttl = 600;  // seconds
+
+  options = CurrentOptions(options);
+  options.table_factory.reset(NewBlockBasedTableFactory());
+  ASSERT_OK(TryReopen(options));
+
+  Destroy(options);
+  options.table_factory.reset(NewPlainTableFactory());
+  ASSERT_TRUE(TryReopen(options).IsNotSupported());
+
+  Destroy(options);
+  options.table_factory.reset(NewCuckooTableFactory());
+  ASSERT_TRUE(TryReopen(options).IsNotSupported());
+
+  Destroy(options);
+  options.table_factory.reset(NewAdaptiveTableFactory());
+  ASSERT_TRUE(TryReopen(options).IsNotSupported());
+}
+
+TEST_F(DBTest, FIFOCompactionWithTTLTest) {
+  Options options;
+  options.compaction_style = kCompactionStyleFIFO;
+  options.write_buffer_size = 10 << 10;  // 10KB
+  options.arena_block_size = 4096;
+  options.compression = kNoCompression;
+  options.create_if_missing = true;
+  env_->time_elapse_only_sleep_ = false;
+  options.env = env_;
+
+  // Test to make sure that all files with expired ttl are deleted on next
+  // manual compaction.
+  {
+    env_->addon_time_.store(0);
+    options.compaction_options_fifo.max_table_files_size = 150 << 10;  // 150KB
+    options.compaction_options_fifo.allow_compaction = false;
+    options.compaction_options_fifo.ttl = 1 * 60 * 60 ;  // 1 hour
+    options = CurrentOptions(options);
+    DestroyAndReopen(options);
+
+    Random rnd(301);
+    for (int i = 0; i < 10; i++) {
+      // Generate and flush a file about 10KB.
+      for (int j = 0; j < 10; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+      }
+      Flush();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+    ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+    // Sleep for 2 hours -- which is much greater than TTL.
+    // Note: Couldn't use SleepForMicroseconds because it takes an int instead
+    // of uint64_t. Hence used addon_time_ directly.
+    // env_->SleepForMicroseconds(2 * 60 * 60 * 1000 * 1000);
+    env_->addon_time_.fetch_add(2 * 60 * 60);
+
+    // Since no flushes and compactions have run, the db should still be in
+    // the same state even after considerable time has passed.
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+    dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+    ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  }
+
+  // Test to make sure that all files with expired ttl are deleted on next
+  // automatic compaction.
+  {
+    options.compaction_options_fifo.max_table_files_size = 150 << 10;  // 150KB
+    options.compaction_options_fifo.allow_compaction = false;
+    options.compaction_options_fifo.ttl = 1 * 60 * 60;  // 1 hour
+    options = CurrentOptions(options);
+    DestroyAndReopen(options);
+
+    Random rnd(301);
+    for (int i = 0; i < 10; i++) {
+      // Generate and flush a file about 10KB.
+      for (int j = 0; j < 10; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+      }
+      Flush();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+    ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+    // Sleep for 2 hours -- which is much greater than TTL.
+    env_->addon_time_.fetch_add(2 * 60 * 60);
+    // Just to make sure that we are in the same state even after sleeping.
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+    // Create 1 more file to trigger TTL compaction. The old files are dropped.
+    for (int i = 0; i < 1; i++) {
+      for (int j = 0; j < 10; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+      }
+      Flush();
+    }
+
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    // Only the new 10 files remain.
+    ASSERT_EQ(NumTableFilesAtLevel(0), 1);
+    ASSERT_LE(SizeAtLevel(0),
+              options.compaction_options_fifo.max_table_files_size);
+  }
+
+  // Test that shows the fall back to size-based FIFO compaction if TTL-based
+  // deletion doesn't move the total size to be less than max_table_files_size.
+  {
+    options.write_buffer_size = 10 << 10;                              // 10KB
+    options.compaction_options_fifo.max_table_files_size = 150 << 10;  // 150KB
+    options.compaction_options_fifo.allow_compaction = false;
+    options.compaction_options_fifo.ttl =  1 * 60 * 60;  // 1 hour
+    options = CurrentOptions(options);
+    DestroyAndReopen(options);
+
+    Random rnd(301);
+    for (int i = 0; i < 3; i++) {
+      // Generate and flush a file about 10KB.
+      for (int j = 0; j < 10; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+      }
+      Flush();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+    ASSERT_EQ(NumTableFilesAtLevel(0), 3);
+
+    // Sleep for 2 hours -- which is much greater than TTL.
+    env_->addon_time_.fetch_add(2 * 60 * 60);
+    // Just to make sure that we are in the same state even after sleeping.
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_EQ(NumTableFilesAtLevel(0), 3);
+
+    for (int i = 0; i < 5; i++) {
+      for (int j = 0; j < 140; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+      }
+      Flush();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+    // Size limit is still guaranteed.
+    ASSERT_LE(SizeAtLevel(0),
+              options.compaction_options_fifo.max_table_files_size);
+  }
+
+  // Test with TTL + Intra-L0 compactions.
+  {
+    options.compaction_options_fifo.max_table_files_size = 150 << 10;  // 150KB
+    options.compaction_options_fifo.allow_compaction = true;
+    options.compaction_options_fifo.ttl = 1 * 60 * 60;  // 1 hour
+    options.level0_file_num_compaction_trigger = 6;
+    options = CurrentOptions(options);
+    DestroyAndReopen(options);
+
+    Random rnd(301);
+    for (int i = 0; i < 10; i++) {
+      // Generate and flush a file about 10KB.
+      for (int j = 0; j < 10; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+      }
+      Flush();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+    // With Intra-L0 compaction, out of 10 files, 6 files will be compacted to 1
+    // (due to level0_file_num_compaction_trigger = 6).
+    // So total files = 1 + remaining 4 = 5.
+    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
+
+    // Sleep for 2 hours -- which is much greater than TTL.
+    env_->addon_time_.fetch_add(2 * 60 * 60);
+    // Just to make sure that we are in the same state even after sleeping.
+    ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
+
+    // Create 10 more files. The old 5 files are dropped as their ttl expired.
+    for (int i = 0; i < 10; i++) {
+      for (int j = 0; j < 10; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+      }
+      Flush();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+    ASSERT_EQ(NumTableFilesAtLevel(0), 5);
+    ASSERT_LE(SizeAtLevel(0),
+              options.compaction_options_fifo.max_table_files_size);
+  }
+
+  // Test with large TTL + Intra-L0 compactions.
+  // Files dropped based on size, as ttl doesn't kick in.
+  {
+    options.write_buffer_size = 20 << 10;                               // 20K
+    options.compaction_options_fifo.max_table_files_size = 1500 << 10;  // 1.5MB
+    options.compaction_options_fifo.allow_compaction = true;
+    options.compaction_options_fifo.ttl = 1 * 60 * 60;  // 1 hour
+    options.level0_file_num_compaction_trigger = 6;
+    options = CurrentOptions(options);
+    DestroyAndReopen(options);
+
+    Random rnd(301);
+    for (int i = 0; i < 60; i++) {
+      // Generate and flush a file about 20KB.
+      for (int j = 0; j < 20; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j), RandomString(&rnd, 980)));
+      }
+      Flush();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+    // It should be compacted to 10 files.
+    ASSERT_EQ(NumTableFilesAtLevel(0), 10);
+
+    for (int i = 0; i < 60; i++) {
+      // Generate and flush a file about 20KB.
+      for (int j = 0; j < 20; j++) {
+        ASSERT_OK(Put(ToString(i * 20 + j + 2000), RandomString(&rnd, 980)));
+      }
+      Flush();
+      ASSERT_OK(dbfull()->TEST_WaitForCompact());
+    }
+
+    // It should be compacted to no more than 20 files.
+    ASSERT_GT(NumTableFilesAtLevel(0), 10);
+    ASSERT_LT(NumTableFilesAtLevel(0), 18);
+    // Size limit is still guaranteed.
+    ASSERT_LE(SizeAtLevel(0),
+              options.compaction_options_fifo.max_table_files_size);
+  }
+}
 #endif  // ROCKSDB_LITE
 
 #ifndef ROCKSDB_LITE
@@ -2961,6 +3542,56 @@ TEST_F(DBTest, WriteSingleThreadEntry) {
   }
 }
 
+TEST_F(DBTest, ConcurrentFlushWAL) {
+  const size_t cnt = 100;
+  Options options;
+  WriteOptions wopt;
+  ReadOptions ropt;
+  for (bool two_write_queues : {false, true}) {
+    for (bool manual_wal_flush : {false, true}) {
+      options.two_write_queues = two_write_queues;
+      options.manual_wal_flush = manual_wal_flush;
+      options.create_if_missing = true;
+      DestroyAndReopen(options);
+      std::vector<port::Thread> threads;
+      threads.emplace_back([&] {
+        for (size_t i = 0; i < cnt; i++) {
+          auto istr = ToString(i);
+          db_->Put(wopt, db_->DefaultColumnFamily(), "a" + istr, "b" + istr);
+        }
+      });
+      if (two_write_queues) {
+        threads.emplace_back([&] {
+          for (size_t i = cnt; i < 2 * cnt; i++) {
+            auto istr = ToString(i);
+            WriteBatch batch;
+            batch.Put("a" + istr, "b" + istr);
+            dbfull()->WriteImpl(wopt, &batch, nullptr, nullptr, 0, true);
+          }
+        });
+      }
+      threads.emplace_back([&] {
+        for (size_t i = 0; i < cnt * 100; i++) {  // FlushWAL is faster than Put
+          db_->FlushWAL(false);
+        }
+      });
+      for (auto& t : threads) {
+        t.join();
+      }
+      options.create_if_missing = false;
+      // Recover from the wal and make sure that it is not corrupted
+      Reopen(options);
+      for (size_t i = 0; i < cnt; i++) {
+        PinnableSlice pval;
+        auto istr = ToString(i);
+        ASSERT_OK(
+            db_->Get(ropt, db_->DefaultColumnFamily(), "a" + istr, &pval));
+        ASSERT_TRUE(pval == ("b" + istr));
+      }
+    }
+  }
+}
+
 #ifndef ROCKSDB_LITE
 TEST_F(DBTest, DynamicMemtableOptions) {
   const uint64_t k64KB = 1 << 16;
@@ -3013,11 +3644,23 @@ TEST_F(DBTest, DynamicMemtableOptions) {
       {"write_buffer_size", "131072"},
   }));
 
-  // The existing memtable is still 64KB in size, after it becomes immutable,
-  // the next memtable will be 128KB in size. Write 256KB total, we should
-  // have a 64KB L0 file, a 128KB L0 file, and a memtable with 64KB data
-  gen_l0_kb(256);
-  ASSERT_EQ(NumTableFilesAtLevel(0), 2);  // (A)
+  // The existing memtable inflated 64KB->128KB when we invoked SetOptions().
+  // Write 192KB, we should have a 128KB L0 file and a memtable with 64KB data.
+  gen_l0_kb(192);
+  ASSERT_EQ(NumTableFilesAtLevel(0), 1);  // (A)
+  ASSERT_LT(SizeAtLevel(0), k128KB + 2 * k5KB);
+  ASSERT_GT(SizeAtLevel(0), k128KB - 4 * k5KB);
+
+  // Decrease buffer size below current usage
+  ASSERT_OK(dbfull()->SetOptions({
+      {"write_buffer_size", "65536"},
+  }));
+  // The existing memtable became eligible for flush when we reduced its
+  // capacity to 64KB. Two keys need to be added to trigger flush: first causes
+  // memtable to be marked full, second schedules the flush. Then we should have
+  // a 128KB L0 file, a 64KB L0 file, and a memtable with just one key.
+  gen_l0_kb(2);
+  ASSERT_EQ(NumTableFilesAtLevel(0), 2);
   ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB);
   ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 4 * k5KB);
 
@@ -3031,9 +3674,9 @@ TEST_F(DBTest, DynamicMemtableOptions) {
                  Env::Priority::LOW);
   // Start from scratch and disable compaction/flush. Flush can only happen
   // during compaction but trigger is pretty high
-  options.max_background_flushes = 0;
   options.disable_auto_compactions = true;
   DestroyAndReopen(options);
+  env_->SetBackgroundThreads(0, Env::HIGH);
 
   // Put until writes are stopped, bounded by 256 puts. We should see stop at
   // ~128KB
@@ -3042,7 +3685,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
 
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::DelayWrite:Wait",
-      [&](void* arg) { sleeping_task_low.WakeUp(); });
+      [&](void* /*arg*/) { sleeping_task_low.WakeUp(); });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   while (!sleeping_task_low.WokenUp() && count < 256) {
@@ -3135,12 +3778,16 @@ TEST_F(DBTest, GetThreadStatus) {
     const int kTestCount = 3;
     const unsigned int kHighPriCounts[kTestCount] = {3, 2, 5};
     const unsigned int kLowPriCounts[kTestCount] = {10, 15, 3};
+    const unsigned int kBottomPriCounts[kTestCount] = {2, 1, 4};
     for (int test = 0; test < kTestCount; ++test) {
       // Change the number of threads in high / low priority pool.
       env_->SetBackgroundThreads(kHighPriCounts[test], Env::HIGH);
       env_->SetBackgroundThreads(kLowPriCounts[test], Env::LOW);
+      env_->SetBackgroundThreads(kBottomPriCounts[test], Env::BOTTOM);
       // Wait to ensure the all threads has been registered
       unsigned int thread_type_counts[ThreadStatus::NUM_THREAD_TYPES];
+      // TODO(ajkr): it'd be better if SetBackgroundThreads returned only after
+      // all threads have been registered.
       // Try up to 60 seconds.
       for (int num_try = 0; num_try < 60000; num_try++) {
         env_->SleepForMicroseconds(1000);
@@ -3155,20 +3802,21 @@ TEST_F(DBTest, GetThreadStatus) {
         if (thread_type_counts[ThreadStatus::HIGH_PRIORITY] ==
                 kHighPriCounts[test] &&
             thread_type_counts[ThreadStatus::LOW_PRIORITY] ==
-                kLowPriCounts[test]) {
+                kLowPriCounts[test] &&
+            thread_type_counts[ThreadStatus::BOTTOM_PRIORITY] ==
+                kBottomPriCounts[test]) {
           break;
         }
       }
-      // Verify the total number of threades
-      ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY] +
-                    thread_type_counts[ThreadStatus::LOW_PRIORITY],
-                kHighPriCounts[test] + kLowPriCounts[test]);
       // Verify the number of high-priority threads
       ASSERT_EQ(thread_type_counts[ThreadStatus::HIGH_PRIORITY],
                 kHighPriCounts[test]);
       // Verify the number of low-priority threads
       ASSERT_EQ(thread_type_counts[ThreadStatus::LOW_PRIORITY],
                 kLowPriCounts[test]);
+      // Verify the number of bottom-priority threads
+      ASSERT_EQ(thread_type_counts[ThreadStatus::BOTTOM_PRIORITY],
+                kBottomPriCounts[test]);
     }
     if (i == 0) {
       // repeat the test with multiple column families
@@ -3310,7 +3958,6 @@ TEST_P(DBTestWithParam, ThreadStatusSingleCompaction) {
 
 TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
   Options options = CurrentOptions();
-  options.max_background_flushes = 0;
   options.max_subcompactions = max_subcompactions_;
   CreateAndReopenWithCF({"pikachu"}, options);
 
@@ -3349,7 +3996,6 @@ TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
 
     if (iter == 0) {
       options = CurrentOptions();
-      options.max_background_flushes = 0;
       options.num_levels = 3;
       options.create_if_missing = true;
       DestroyAndReopen(options);
@@ -3360,7 +4006,6 @@ TEST_P(DBTestWithParam, PreShutdownManualCompaction) {
 
 TEST_F(DBTest, PreShutdownFlush) {
   Options options = CurrentOptions();
-  options.max_background_flushes = 0;
   CreateAndReopenWithCF({"pikachu"}, options);
   ASSERT_OK(Put(1, "key", "value"));
   CancelAllBackgroundWork(db_);
@@ -3652,19 +4297,14 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   Random rnd(301);
   Options options;
   options.create_if_missing = true;
-  options.db_write_buffer_size = 6000;
-  options.write_buffer_size = 6000;
+  options.db_write_buffer_size = 6000000;
+  options.write_buffer_size = 600000;
   options.max_write_buffer_number = 2;
   options.level0_file_num_compaction_trigger = 2;
   options.level0_slowdown_writes_trigger = 2;
   options.level0_stop_writes_trigger = 2;
   options.soft_pending_compaction_bytes_limit = 1024 * 1024;
-
-  // Use file size to distinguish levels
-  // L1: 10, L2: 20, L3 40, L4 80
-  // L0 is less than 30
-  options.target_file_size_base = 10;
-  options.target_file_size_multiplier = 2;
+  options.target_file_size_base = 20;
 
   options.level_compaction_dynamic_level_bytes = true;
   options.max_bytes_for_level_base = 200;
@@ -3701,10 +4341,11 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   for (int i = 0; i < 100; i++) {
-    ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
-
-    if (i % 25 == 0) {
-      dbfull()->TEST_WaitForFlushMemTable();
+    std::string value = RandomString(&rnd, 200);
+    ASSERT_OK(Put(Key(keys[i]), value));
+    if (i % 25 == 24) {
+      Flush();
+      dbfull()->TEST_WaitForCompact();
     }
   }
 
@@ -3745,7 +4386,8 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   for (int i = 101; i < 500; i++) {
-    ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
+    std::string value = RandomString(&rnd, 200);
+    ASSERT_OK(Put(Key(keys[i]), value));
     if (i % 100 == 99) {
       Flush();
       dbfull()->TEST_WaitForCompact();
@@ -3879,7 +4521,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   // Clean up memtable and L0. Block compaction threads. If continue to write
   // and flush memtables. We should see put stop after 8 memtable flushes
   // since level0_stop_writes_trigger = 8
-  dbfull()->TEST_FlushMemTable(true);
+  dbfull()->TEST_FlushMemTable(true, true);
   dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
   // Block compaction
   test::SleepingBackgroundTask sleeping_task_low;
@@ -3892,7 +4534,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   WriteOptions wo;
   while (count < 64) {
     ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
-    dbfull()->TEST_FlushMemTable(true);
+    dbfull()->TEST_FlushMemTable(true, true);
     count++;
     if (dbfull()->TEST_write_controler().IsStopped()) {
       sleeping_task_low.WakeUp();
@@ -3920,7 +4562,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   count = 0;
   while (count < 64) {
     ASSERT_OK(Put(Key(count), RandomString(&rnd, 1024), wo));
-    dbfull()->TEST_FlushMemTable(true);
+    dbfull()->TEST_FlushMemTable(true, true);
     count++;
     if (dbfull()->TEST_write_controler().IsStopped()) {
       sleeping_task_low.WakeUp();
@@ -3962,6 +4604,140 @@ TEST_F(DBTest, DynamicCompactionOptions) {
   dbfull()->TEST_WaitForCompact();
   ASSERT_LT(NumTableFilesAtLevel(0), 4);
 }
+
+// Test dynamic FIFO copmaction options.
+// This test covers just option parsing and makes sure that the options are
+// correctly assigned. Also look at DBOptionsTest.SetFIFOCompactionOptions
+// test which makes sure that the FIFO compaction funcionality is working
+// as expected on dynamically changing the options.
+// Even more FIFOCompactionTests are at DBTest.FIFOCompaction* .
+TEST_F(DBTest, DynamicFIFOCompactionOptions) {
+  Options options;
+  options.create_if_missing = true;
+  DestroyAndReopen(options);
+
+  // Initial defaults
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+            1024 * 1024 * 1024);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 0);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+            false);
+
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"compaction_options_fifo", "{max_table_files_size=23;}"}}));
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+            23);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 0);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+            false);
+
+  ASSERT_OK(dbfull()->SetOptions({{"compaction_options_fifo", "{ttl=97}"}}));
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+            23);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 97);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+            false);
+
+  ASSERT_OK(dbfull()->SetOptions({{"compaction_options_fifo", "{ttl=203;}"}}));
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+            23);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 203);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+            false);
+
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"compaction_options_fifo", "{allow_compaction=true;}"}}));
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+            23);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 203);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+            true);
+
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"compaction_options_fifo", "{max_table_files_size=31;ttl=19;}"}}));
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+            31);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 19);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+            true);
+
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"compaction_options_fifo",
+        "{max_table_files_size=51;ttl=49;allow_compaction=true;}"}}));
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.max_table_files_size,
+            51);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.ttl, 49);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_fifo.allow_compaction,
+            true);
+}
+
+TEST_F(DBTest, DynamicUniversalCompactionOptions) {
+  Options options;
+  options.create_if_missing = true;
+  DestroyAndReopen(options);
+
+  // Initial defaults
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 1);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
+            2);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
+            UINT_MAX);
+  ASSERT_EQ(dbfull()
+                ->GetOptions()
+                .compaction_options_universal.max_size_amplification_percent,
+            200);
+  ASSERT_EQ(dbfull()
+                ->GetOptions()
+                .compaction_options_universal.compression_size_percent,
+            -1);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
+            kCompactionStopStyleTotalSize);
+  ASSERT_EQ(
+      dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
+      false);
+
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"compaction_options_universal", "{size_ratio=7;}"}}));
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 7);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
+            2);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
+            UINT_MAX);
+  ASSERT_EQ(dbfull()
+                ->GetOptions()
+                .compaction_options_universal.max_size_amplification_percent,
+            200);
+  ASSERT_EQ(dbfull()
+                ->GetOptions()
+                .compaction_options_universal.compression_size_percent,
+            -1);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
+            kCompactionStopStyleTotalSize);
+  ASSERT_EQ(
+      dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
+      false);
+
+  ASSERT_OK(dbfull()->SetOptions(
+      {{"compaction_options_universal", "{min_merge_width=11;}"}}));
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.size_ratio, 7);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.min_merge_width,
+            11);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.max_merge_width,
+            UINT_MAX);
+  ASSERT_EQ(dbfull()
+                ->GetOptions()
+                .compaction_options_universal.max_size_amplification_percent,
+            200);
+  ASSERT_EQ(dbfull()
+                ->GetOptions()
+                .compaction_options_universal.compression_size_percent,
+            -1);
+  ASSERT_EQ(dbfull()->GetOptions().compaction_options_universal.stop_style,
+            kCompactionStopStyleTotalSize);
+  ASSERT_EQ(
+      dbfull()->GetOptions().compaction_options_universal.allow_trivial_move,
+      false);
+}
 #endif  // ROCKSDB_LITE
 
 TEST_F(DBTest, FileCreationRandomFailure) {
@@ -4025,6 +4801,7 @@ TEST_F(DBTest, FileCreationRandomFailure) {
 }
 
 #ifndef ROCKSDB_LITE
+
 TEST_F(DBTest, DynamicMiscOptions) {
   // Test max_sequential_skip_in_iterations
   Options options;
@@ -4175,7 +4952,7 @@ TEST_F(DBTest, EncodeDecompressedBlockSizeTest) {
       options.compression = comp;
       DestroyAndReopen(options);
 
-      int kNumKeysWritten = 100000;
+      int kNumKeysWritten = 1000;
 
       Random rnd(301);
       for (int i = 0; i < kNumKeysWritten; ++i) {
@@ -4256,7 +5033,7 @@ class DelayedMergeOperator : public MergeOperator {
  public:
   explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {}
 
-  virtual bool FullMergeV2(const MergeOperationInput& merge_in,
+  virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
                            MergeOperationOutput* merge_out) const override {
     db_test_->env_->addon_time_.fetch_add(1000);
     merge_out->new_value = "";
@@ -4342,6 +5119,7 @@ TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
   options.disable_auto_compactions = true;
   options.create_if_missing = true;
   options.statistics = rocksdb::CreateDBStatistics();
+  options.statistics->stats_level_ = kExceptTimeForMutex;
   options.max_subcompactions = max_subcompactions_;
   DestroyAndReopen(options);
 
@@ -4568,55 +5346,84 @@ TEST_F(DBTest, PromoteL0Failure) {
   status = experimental::PromoteL0(db_, db_->DefaultColumnFamily());
   ASSERT_TRUE(status.IsInvalidArgument());
 }
-#endif  // ROCKSDB_LITE
 
 // Github issue #596
-TEST_F(DBTest, HugeNumberOfLevels) {
+TEST_F(DBTest, CompactRangeWithEmptyBottomLevel) {
+  const int kNumLevels = 2;
+  const int kNumL0Files = 2;
   Options options = CurrentOptions();
-  options.write_buffer_size = 2 * 1024 * 1024;         // 2MB
-  options.max_bytes_for_level_base = 2 * 1024 * 1024;  // 2MB
-  options.num_levels = 12;
-  options.max_background_compactions = 10;
-  options.max_bytes_for_level_multiplier = 2;
-  options.level_compaction_dynamic_level_bytes = true;
+  options.disable_auto_compactions = true;
+  options.num_levels = kNumLevels;
   DestroyAndReopen(options);
 
   Random rnd(301);
-  for (int i = 0; i < 300000; ++i) {
-    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
+  for (int i = 0; i < kNumL0Files; ++i) {
+    ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
+    Flush();
   }
+  ASSERT_EQ(NumTableFilesAtLevel(0), kNumL0Files);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
 
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), kNumL0Files);
 }
+#endif  // ROCKSDB_LITE
 
 TEST_F(DBTest, AutomaticConflictsWithManualCompaction) {
+  const int kNumL0Files = 50;
   Options options = CurrentOptions();
-  options.write_buffer_size = 2 * 1024 * 1024;         // 2MB
-  options.max_bytes_for_level_base = 2 * 1024 * 1024;  // 2MB
-  options.num_levels = 12;
+  options.level0_file_num_compaction_trigger = 4;
+  // never slowdown / stop
+  options.level0_slowdown_writes_trigger = 999999;
+  options.level0_stop_writes_trigger = 999999;
   options.max_background_compactions = 10;
-  options.max_bytes_for_level_multiplier = 2;
-  options.level_compaction_dynamic_level_bytes = true;
   DestroyAndReopen(options);
 
-  Random rnd(301);
-  for (int i = 0; i < 300000; ++i) {
-    ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
-  }
-
+  // schedule automatic compactions after the manual one starts, but before it
+  // finishes to ensure conflict.
+  rocksdb::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BackgroundCompaction:Start",
+        "DBTest::AutomaticConflictsWithManualCompaction:PrePuts"},
+       {"DBTest::AutomaticConflictsWithManualCompaction:PostPuts",
+        "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
   std::atomic<int> callback_count(0);
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::BackgroundCompaction()::Conflict",
-      [&](void* arg) { callback_count.fetch_add(1); });
+      "DBImpl::MaybeScheduleFlushOrCompaction:Conflict",
+      [&](void* /*arg*/) { callback_count.fetch_add(1); });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
-  CompactRangeOptions croptions;
-  croptions.exclusive_manual_compaction = false;
-  ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
+
+  Random rnd(301);
+  for (int i = 0; i < 2; ++i) {
+    // put two keys to ensure no trivial move
+    for (int j = 0; j < 2; ++j) {
+      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+  port::Thread manual_compaction_thread([this]() {
+    CompactRangeOptions croptions;
+    croptions.exclusive_manual_compaction = true;
+    ASSERT_OK(db_->CompactRange(croptions, nullptr, nullptr));
+  });
+
+  TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PrePuts");
+  for (int i = 0; i < kNumL0Files; ++i) {
+    // put two keys to ensure no trivial move
+    for (int j = 0; j < 2; ++j) {
+      ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
+    }
+    ASSERT_OK(Flush());
+  }
+  TEST_SYNC_POINT("DBTest::AutomaticConflictsWithManualCompaction:PostPuts");
+
   ASSERT_GE(callback_count.load(), 1);
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
-  for (int i = 0; i < 300000; ++i) {
+  for (int i = 0; i < 2; ++i) {
     ASSERT_NE("NOT_FOUND", Get(Key(i)));
   }
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  manual_compaction_thread.join();
+  dbfull()->TEST_WaitForCompact();
 }
 
 // Github issue #595
@@ -4806,7 +5613,7 @@ TEST_F(DBTest, HardLimit) {
 
   std::atomic<int> callback_count(0);
   rocksdb::SyncPoint::GetInstance()->SetCallBack("DBImpl::DelayWrite:Wait",
-                                                 [&](void* arg) {
+                                                 [&](void* /*arg*/) {
                                                    callback_count.fetch_add(1);
                                                    sleeping_task_low.WakeUp();
                                                  });
@@ -4831,7 +5638,46 @@ TEST_F(DBTest, HardLimit) {
   sleeping_task_low.WaitUntilDone();
 }
 
-#ifndef ROCKSDB_LITE
+#if !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
+class WriteStallListener : public EventListener {
+ public:
+  WriteStallListener()
+      : cond_(&mutex_),
+        condition_(WriteStallCondition::kNormal),
+        expected_(WriteStallCondition::kNormal),
+        expected_set_(false) {}
+  void OnStallConditionsChanged(const WriteStallInfo& info) override {
+    MutexLock l(&mutex_);
+    condition_ = info.condition.cur;
+    if (expected_set_ && condition_ == expected_) {
+      cond_.Signal();
+      expected_set_ = false;
+    }
+  }
+  bool CheckCondition(WriteStallCondition expected) {
+    MutexLock l(&mutex_);
+    if (expected != condition_) {
+      expected_ = expected;
+      expected_set_ = true;
+      while (expected != condition_) {
+        // We bail out on timeout 500 milliseconds
+        const uint64_t timeout_us = 500000;
+        if (cond_.TimedWait(timeout_us)) {
+          expected_set_ = false;
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+ private:
+  port::Mutex   mutex_;
+  port::CondVar cond_;
+  WriteStallCondition condition_;
+  WriteStallCondition expected_;
+  bool                expected_set_;
+};
+
 TEST_F(DBTest, SoftLimit) {
   Options options = CurrentOptions();
   options.env = env_;
@@ -4847,6 +5693,8 @@ TEST_F(DBTest, SoftLimit) {
   options.max_bytes_for_level_multiplier = 10;
   options.max_background_compactions = 1;
   options.compression = kNoCompression;
+  WriteStallListener* listener = new WriteStallListener();
+  options.listeners.emplace_back(listener);
 
   Reopen(options);
 
@@ -4854,7 +5702,7 @@ TEST_F(DBTest, SoftLimit) {
   for (int i = 0; i < 72; i++) {
     Put(Key(i), std::string(5000, 'x'));
     if (i % 10 == 0) {
-      Flush();
+      dbfull()->TEST_FlushMemTable(true, true);
     }
   }
   dbfull()->TEST_WaitForCompact();
@@ -4864,7 +5712,7 @@ TEST_F(DBTest, SoftLimit) {
   for (int i = 0; i < 72; i++) {
     Put(Key(i), std::string(5000, 'x'));
     if (i % 10 == 0) {
-      Flush();
+      dbfull()->TEST_FlushMemTable(true, true);
     }
   }
   dbfull()->TEST_WaitForCompact();
@@ -4883,9 +5731,10 @@ TEST_F(DBTest, SoftLimit) {
     Put(Key(i), std::string(5000, 'x'));
     Put(Key(100 - i), std::string(5000, 'x'));
     // Flush the file. File size is around 30KB.
-    Flush();
+    dbfull()->TEST_FlushMemTable(true, true);
   }
   ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
+  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
 
   sleeping_task_low.WakeUp();
   sleeping_task_low.WaitUntilDone();
@@ -4896,10 +5745,11 @@ TEST_F(DBTest, SoftLimit) {
   // The L1 file size is around 30KB.
   ASSERT_EQ(NumTableFilesAtLevel(1), 1);
   ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
+  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
 
   // Only allow one compactin going through.
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "BackgroundCallCompaction:0", [&](void* arg) {
+      "BackgroundCallCompaction:0", [&](void* /*arg*/) {
         // Schedule a sleeping task.
         sleeping_task_low.Reset();
         env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
@@ -4916,7 +5766,7 @@ TEST_F(DBTest, SoftLimit) {
     Put(Key(10 + i), std::string(5000, 'x'));
     Put(Key(90 - i), std::string(5000, 'x'));
     // Flush the file. File size is around 30KB.
-    Flush();
+    dbfull()->TEST_FlushMemTable(true, true);
   }
 
   // Wake up sleep task to enable compaction to run and waits
@@ -4930,13 +5780,14 @@ TEST_F(DBTest, SoftLimit) {
   // doesn't trigger soft_pending_compaction_bytes_limit
   ASSERT_EQ(NumTableFilesAtLevel(1), 1);
   ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
+  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
 
   // Create 3 L0 files, making score of L0 to be 3, higher than L0.
   for (int i = 0; i < 3; i++) {
     Put(Key(20 + i), std::string(5000, 'x'));
     Put(Key(80 - i), std::string(5000, 'x'));
     // Flush the file. File size is around 30KB.
-    Flush();
+    dbfull()->TEST_FlushMemTable(true, true);
   }
   // Wake up sleep task to enable compaction to run and waits
   // for it to go to sleep state again to make sure one compaction
@@ -4950,11 +5801,13 @@ TEST_F(DBTest, SoftLimit) {
   // triggerring soft_pending_compaction_bytes_limit
   ASSERT_EQ(NumTableFilesAtLevel(1), 1);
   ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
+  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
 
   sleeping_task_low.WakeUp();
   sleeping_task_low.WaitUntilSleeping();
 
   ASSERT_TRUE(!dbfull()->TEST_write_controler().NeedsDelay());
+  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kNormal));
 
   // shrink level base so L2 will hit soft limit easier.
   ASSERT_OK(dbfull()->SetOptions({
@@ -4964,6 +5817,7 @@ TEST_F(DBTest, SoftLimit) {
   Put("", "");
   Flush();
   ASSERT_TRUE(dbfull()->TEST_write_controler().NeedsDelay());
+  ASSERT_TRUE(listener->CheckCondition(WriteStallCondition::kDelayed));
 
   sleeping_task_low.WaitUntilSleeping();
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
@@ -5005,7 +5859,7 @@ TEST_F(DBTest, LastWriteBufferDelay) {
   sleeping_task.WakeUp();
   sleeping_task.WaitUntilDone();
 }
-#endif  // ROCKSDB_LITE
+#endif  // !defined(ROCKSDB_LITE) && !defined(ROCKSDB_DISABLE_STALL_NOTIFICATION)
 
 TEST_F(DBTest, FailWhenCompressionNotSupportedTest) {
   CompressionType compressions[] = {kZlibCompression, kBZip2Compression,
@@ -5047,6 +5901,36 @@ TEST_F(DBTest, RowCache) {
   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_HIT), 1);
   ASSERT_EQ(TestGetTickerCount(options, ROW_CACHE_MISS), 1);
 }
+
+TEST_F(DBTest, PinnableSliceAndRowCache) {
+  Options options = CurrentOptions();
+  options.statistics = rocksdb::CreateDBStatistics();
+  options.row_cache = NewLRUCache(8192);
+  DestroyAndReopen(options);
+
+  ASSERT_OK(Put("foo", "bar"));
+  ASSERT_OK(Flush());
+
+  ASSERT_EQ(Get("foo"), "bar");
+  ASSERT_EQ(
+      reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
+      1);
+
+  {
+    PinnableSlice pin_slice;
+    ASSERT_EQ(Get("foo", &pin_slice), Status::OK());
+    ASSERT_EQ(pin_slice.ToString(), "bar");
+    // Entry is already in cache, lookup will remove the element from lru
+    ASSERT_EQ(
+        reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
+        0);
+  }
+  // After PinnableSlice destruction element is added back in LRU
+  ASSERT_EQ(
+      reinterpret_cast<LRUCache*>(options.row_cache.get())->TEST_GetLRUSize(),
+      1);
+}
+
 #endif  // ROCKSDB_LITE
 
 TEST_F(DBTest, DeletingOldWalAfterDrop) {
@@ -5120,6 +6004,49 @@ TEST_F(DBTest, PauseBackgroundWorkTest) {
   ASSERT_TRUE(done.load());
 }
 
+// Keep spawning short-living threads that create an iterator and quit.
+// Meanwhile in another thread keep flushing memtables.
+// This used to cause a deadlock.
+TEST_F(DBTest, ThreadLocalPtrDeadlock) {
+  std::atomic<int> flushes_done{0};
+  std::atomic<int> threads_destroyed{0};
+  auto done = [&] {
+    return flushes_done.load() > 10;
+  };
+
+  port::Thread flushing_thread([&] {
+    for (int i = 0; !done(); ++i) {
+      ASSERT_OK(db_->Put(WriteOptions(), Slice("hi"),
+                         Slice(std::to_string(i).c_str())));
+      ASSERT_OK(db_->Flush(FlushOptions()));
+      int cnt = ++flushes_done;
+      fprintf(stderr, "Flushed %d times\n", cnt);
+    }
+  });
+
+  std::vector<port::Thread> thread_spawning_threads(10);
+  for (auto& t: thread_spawning_threads) {
+    t = port::Thread([&] {
+      while (!done()) {
+        {
+          port::Thread tmp_thread([&] {
+            auto it = db_->NewIterator(ReadOptions());
+            delete it;
+          });
+          tmp_thread.join();
+        }
+        ++threads_destroyed;
+      }
+    });
+  }
+
+  for (auto& t: thread_spawning_threads) {
+    t.join();
+  }
+  flushing_thread.join();
+  fprintf(stderr, "Done. Flushed %d times, destroyed %d threads\n",
+          flushes_done.load(), threads_destroyed.load());
+}
 }  // namespace rocksdb
 
 int main(int argc, char** argv) {