]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_sst_test.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / db_sst_test.cc
index 79d9f823b219763ca90aa61f95d3f8291f1750b4..3ccb57baa5ba8042979362117cc90cbf1f223673 100644 (file)
@@ -1,25 +1,57 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "db/db_test_util.h"
+#include "file/sst_file_manager_impl.h"
 #include "port/port.h"
 #include "port/stack_trace.h"
 #include "rocksdb/sst_file_manager.h"
-#include "util/sst_file_manager_impl.h"
+#include "util/random.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 class DBSSTTest : public DBTestBase {
  public:
-  DBSSTTest() : DBTestBase("/db_sst_test") {}
+  DBSSTTest() : DBTestBase("/db_sst_test", /*env_do_fsync=*/true) {}
 };
 
+#ifndef ROCKSDB_LITE
+// A class which remembers the name of each flushed file.
+class FlushedFileCollector : public EventListener {
+ public:
+  FlushedFileCollector() {}
+  ~FlushedFileCollector() override {}
+
+  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+    std::lock_guard<std::mutex> lock(mutex_);
+    flushed_files_.push_back(info.file_path);
+  }
+
+  std::vector<std::string> GetFlushedFiles() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    std::vector<std::string> result;
+    for (auto fname : flushed_files_) {
+      result.push_back(fname);
+    }
+    return result;
+  }
+  void ClearFlushedFiles() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    flushed_files_.clear();
+  }
+
+ private:
+  std::vector<std::string> flushed_files_;
+  std::mutex mutex_;
+};
+#endif  // ROCKSDB_LITE
+
 TEST_F(DBSSTTest, DontDeletePendingOutputs) {
   Options options;
   options.env = env_;
@@ -71,8 +103,16 @@ TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) {
   int const num_files = GetSstFileCount(dbname_);
   ASSERT_GT(num_files, 0);
 
+  Reopen(options);
+  std::vector<std::string> values;
+  values.reserve(key_id);
+  for (int k = 0; k < key_id; ++k) {
+    values.push_back(Get(Key(k)));
+  }
+  Close();
+
   std::vector<std::string> filenames;
-  GetSstFiles(dbname_, &filenames);
+  GetSstFiles(env_, dbname_, &filenames);
   int num_ldb_files = 0;
   for (size_t i = 0; i < filenames.size(); ++i) {
     if (i & 1) {
@@ -88,11 +128,26 @@ TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) {
 
   Reopen(options);
   for (int k = 0; k < key_id; ++k) {
-    ASSERT_NE("NOT_FOUND", Get(Key(k)));
+    ASSERT_EQ(values[k], Get(Key(k)));
   }
   Destroy(options);
 }
 
+// Check that we don't crash when opening DB with
+// DBOptions::skip_checking_sst_file_sizes_on_db_open = true.
+TEST_F(DBSSTTest, SkipCheckingSSTFileSizesOnDBOpen) {
+  ASSERT_OK(Put("pika", "choo"));
+  ASSERT_OK(Flush());
+
+  // Just open the DB with the option set to true and check that we don't crash.
+  Options options;
+  options.env = env_;
+  options.skip_checking_sst_file_sizes_on_db_open = true;
+  Reopen(options);
+
+  ASSERT_EQ("choo", Get("pika"));
+}
+
 #ifndef ROCKSDB_LITE
 TEST_F(DBSSTTest, DontDeleteMovedFile) {
   // This test triggers move compaction and verifies that the file is not
@@ -110,7 +165,7 @@ TEST_F(DBSSTTest, DontDeleteMovedFile) {
   for (int i = 0; i < 2; ++i) {
     // Create 1MB sst file
     for (int j = 0; j < 100; ++j) {
-      ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
+      ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
     }
     ASSERT_OK(Flush());
   }
@@ -158,7 +213,7 @@ TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
   for (int i = 0; i < 2; ++i) {
     // Create 1MB sst file
     for (int j = 0; j < 100; ++j) {
-      ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
+      ASSERT_OK(Put(Key(i * 50 + j), rnd.RandomString(10 * 1024)));
     }
     ASSERT_OK(Flush());
   }
@@ -189,7 +244,7 @@ TEST_F(DBSSTTest, DeleteObsoleteFilesPendingOutputs) {
   // write_buffer_size. The flush will be blocked with block_first_time
   // pending_file is protecting all the files created after
   for (int j = 0; j < 256; ++j) {
-    ASSERT_OK(Put(Key(j), RandomString(&rnd, 10 * 1024)));
+    ASSERT_OK(Put(Key(j), rnd.RandomString(10 * 1024)));
   }
   blocking_thread.WaitUntilSleeping();
 
@@ -230,13 +285,14 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
   int files_added = 0;
   int files_deleted = 0;
   int files_moved = 0;
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "SstFileManagerImpl::OnAddFile", [&](void* arg) { files_added++; });
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "SstFileManagerImpl::OnDeleteFile", [&](void* arg) { files_deleted++; });
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "SstFileManagerImpl::OnMoveFile", [&](void* arg) { files_moved++; });
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnAddFile", [&](void* /*arg*/) { files_added++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnDeleteFile",
+      [&](void* /*arg*/) { files_deleted++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "SstFileManagerImpl::OnMoveFile", [&](void* /*arg*/) { files_moved++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
   Options options = CurrentOptions();
   options.sst_file_manager = sst_file_manager;
@@ -249,11 +305,14 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
     dbfull()->TEST_WaitForFlushMemTable();
     dbfull()->TEST_WaitForCompact();
     // Verify that we are tracking all sst files in dbname_
-    ASSERT_EQ(sfm->GetTrackedFiles(), GetAllSSTFiles());
+    std::unordered_map<std::string, uint64_t> files_in_db;
+    ASSERT_OK(GetAllSSTFiles(&files_in_db));
+    ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
   }
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 
-  auto files_in_db = GetAllSSTFiles();
+  std::unordered_map<std::string, uint64_t> files_in_db;
+  ASSERT_OK(GetAllSSTFiles(&files_in_db));
   // Verify that we are tracking all sst files in dbname_
   ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
   // Verify the total files size
@@ -284,66 +343,64 @@ TEST_F(DBSSTTest, DBWithSstFileManager) {
   ASSERT_EQ(sfm->GetTrackedFiles(), files_in_db);
   ASSERT_EQ(sfm->GetTotalSize(), total_files_size);
 
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
 TEST_F(DBSSTTest, RateLimitedDelete) {
   Destroy(last_options_);
-  rocksdb::SyncPoint::GetInstance()->LoadDependency({
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
       {"DBSSTTest::RateLimitedDelete:1",
        "DeleteScheduler::BackgroundEmptyTrash"},
   });
 
   std::vector<uint64_t> penalties;
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "DeleteScheduler::BackgroundEmptyTrash:Wait",
       [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
         // Turn timed wait into a simulated sleep
         uint64_t* abs_time_us = static_cast<uint64_t*>(arg);
-        int64_t cur_time = 0;
-        env_->GetCurrentTime(&cur_time);
-        if (*abs_time_us > static_cast<uint64_t>(cur_time)) {
-          env_->addon_time_.fetch_add(*abs_time_us -
-                                      static_cast<uint64_t>(cur_time));
+        uint64_t cur_time = env_->NowMicros();
+        if (*abs_time_us > cur_time) {
+          env_->MockSleepForMicroseconds(*abs_time_us - cur_time);
         }
 
-        // Randomly sleep shortly
-        env_->addon_time_.fetch_add(
-            static_cast<uint64_t>(Random::GetTLSInstance()->Uniform(10)));
+        // Plus an additional short, random amount
+        env_->MockSleepForMicroseconds(Random::GetTLSInstance()->Uniform(10));
 
-        // Set wait until time to before current to force not to sleep.
-        int64_t real_cur_time = 0;
-        Env::Default()->GetCurrentTime(&real_cur_time);
-        *abs_time_us = static_cast<uint64_t>(real_cur_time);
+        // Set wait until time to before (actual) current time to force not
+        // to sleep
+        *abs_time_us = Env::Default()->NowMicros();
       });
 
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
-  env_->no_slowdown_ = true;
-  env_->time_elapse_only_sleep_ = true;
   Options options = CurrentOptions();
+  SetTimeElapseOnlySleepOnReopen(&options);
   options.disable_auto_compactions = true;
   options.env = env_;
+  options.statistics = CreateDBStatistics();
 
-  std::string trash_dir = test::TmpDir(env_) + "/trash";
   int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec
   Status s;
   options.sst_file_manager.reset(
-      NewSstFileManager(env_, nullptr, trash_dir, 0, false, &s));
+      NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
   ASSERT_OK(s);
   options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
   auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+  sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
 
+  WriteOptions wo;
+  wo.disableWAL = true;
   ASSERT_OK(TryReopen(options));
   // Create 4 files in L0
   for (char v = 'a'; v <= 'd'; v++) {
-    ASSERT_OK(Put("Key2", DummyString(1024, v)));
-    ASSERT_OK(Put("Key3", DummyString(1024, v)));
-    ASSERT_OK(Put("Key4", DummyString(1024, v)));
-    ASSERT_OK(Put("Key1", DummyString(1024, v)));
-    ASSERT_OK(Put("Key4", DummyString(1024, v)));
+    ASSERT_OK(Put("Key2", DummyString(1024, v), wo));
+    ASSERT_OK(Put("Key3", DummyString(1024, v), wo));
+    ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
+    ASSERT_OK(Put("Key1", DummyString(1024, v), wo));
+    ASSERT_OK(Put("Key4", DummyString(1024, v), wo));
     ASSERT_OK(Flush());
   }
   // We created 4 sst files in L0
@@ -354,6 +411,7 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
 
   // Compaction will move the 4 files in L0 to trash and create 1 L1 file
   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
   ASSERT_EQ("0,1", FilesPerLevel(0));
 
   uint64_t delete_start_time = env_->NowMicros();
@@ -372,20 +430,206 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
   }
   ASSERT_GT(time_spent_deleting, expected_penlty * 0.9);
   ASSERT_LT(time_spent_deleting, expected_penlty * 1.1);
+  ASSERT_EQ(4, options.statistics->getAndResetTickerCount(FILES_MARKED_TRASH));
+  ASSERT_EQ(
+      0, options.statistics->getAndResetTickerCount(FILES_DELETED_IMMEDIATELY));
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBSSTTest, RateLimitedWALDelete) {
+  Destroy(last_options_);
+
+  std::vector<uint64_t> penalties;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::BackgroundEmptyTrash:Wait",
+      [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
+
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.compression = kNoCompression;
+  options.env = env_;
+
+  int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec
+  Status s;
+  options.sst_file_manager.reset(
+      NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
+  ASSERT_OK(s);
+  options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
+  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+  sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
+  SetTimeElapseOnlySleepOnReopen(&options);
+
+  ASSERT_OK(TryReopen(options));
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Create 4 files in L0
+  for (char v = 'a'; v <= 'd'; v++) {
+    ASSERT_OK(Put("Key2", DummyString(1024, v)));
+    ASSERT_OK(Put("Key3", DummyString(1024, v)));
+    ASSERT_OK(Put("Key4", DummyString(1024, v)));
+    ASSERT_OK(Put("Key1", DummyString(1024, v)));
+    ASSERT_OK(Put("Key4", DummyString(1024, v)));
+    ASSERT_OK(Flush());
+  }
+  // We created 4 sst files in L0
+  ASSERT_EQ("4", FilesPerLevel(0));
 
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  // Compaction will move the 4 files in L0 to trash and create 1 L1 file
+  CompactRangeOptions cro;
+  cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+
+  sfm->WaitForEmptyTrash();
+  ASSERT_EQ(penalties.size(), 8);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
+class DBWALTestWithParam
+    : public DBSSTTest,
+      public testing::WithParamInterface<std::tuple<std::string, bool>> {
+ public:
+  DBWALTestWithParam() {
+    wal_dir_ = std::get<0>(GetParam());
+    wal_dir_same_as_dbname_ = std::get<1>(GetParam());
+  }
+
+  std::string wal_dir_;
+  bool wal_dir_same_as_dbname_;
+};
+
+TEST_P(DBWALTestWithParam, WALTrashCleanupOnOpen) {
+  class MyEnv : public EnvWrapper {
+   public:
+    MyEnv(Env* t) : EnvWrapper(t), fake_log_delete(false) {}
+
+    Status DeleteFile(const std::string& fname) {
+      if (fname.find(".log.trash") != std::string::npos && fake_log_delete) {
+        return Status::OK();
+      }
+
+      return target()->DeleteFile(fname);
+    }
+
+    void set_fake_log_delete(bool fake) { fake_log_delete = fake; }
+
+   private:
+    bool fake_log_delete;
+  };
+
+  std::unique_ptr<MyEnv> env(new MyEnv(env_));
+  Destroy(last_options_);
+
+  env->set_fake_log_delete(true);
+
+  Options options = CurrentOptions();
+  options.disable_auto_compactions = true;
+  options.compression = kNoCompression;
+  options.env = env.get();
+  options.wal_dir = dbname_ + wal_dir_;
+
+  int64_t rate_bytes_per_sec = 1024 * 10;  // 10 Kbs / Sec
+  Status s;
+  options.sst_file_manager.reset(
+      NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
+  ASSERT_OK(s);
+  options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
+  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+  sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
+
+  ASSERT_OK(TryReopen(options));
+
+  // Create 4 files in L0
+  for (char v = 'a'; v <= 'd'; v++) {
+    ASSERT_OK(Put("Key2", DummyString(1024, v)));
+    ASSERT_OK(Put("Key3", DummyString(1024, v)));
+    ASSERT_OK(Put("Key4", DummyString(1024, v)));
+    ASSERT_OK(Put("Key1", DummyString(1024, v)));
+    ASSERT_OK(Put("Key4", DummyString(1024, v)));
+    ASSERT_OK(Flush());
+  }
+  // We created 4 sst files in L0
+  ASSERT_EQ("4", FilesPerLevel(0));
+
+  Close();
+
+  options.sst_file_manager.reset();
+  std::vector<std::string> filenames;
+  int trash_log_count = 0;
+  if (!wal_dir_same_as_dbname_) {
+    // Forcibly create some trash log files
+    std::unique_ptr<WritableFile> result;
+    env->NewWritableFile(options.wal_dir + "/1000.log.trash", &result,
+                         EnvOptions());
+    result.reset();
+  }
+  env->GetChildren(options.wal_dir, &filenames);
+  for (const std::string& fname : filenames) {
+    if (fname.find(".log.trash") != std::string::npos) {
+      trash_log_count++;
+    }
+  }
+  ASSERT_GE(trash_log_count, 1);
+
+  env->set_fake_log_delete(false);
+  ASSERT_OK(TryReopen(options));
+
+  filenames.clear();
+  trash_log_count = 0;
+  env->GetChildren(options.wal_dir, &filenames);
+  for (const std::string& fname : filenames) {
+    if (fname.find(".log.trash") != std::string::npos) {
+      trash_log_count++;
+    }
+  }
+  ASSERT_EQ(trash_log_count, 0);
+  Close();
+}
+
+INSTANTIATE_TEST_CASE_P(DBWALTestWithParam, DBWALTestWithParam,
+                        ::testing::Values(std::make_tuple("", true),
+                                          std::make_tuple("_wal_dir", false)));
+
+TEST_F(DBSSTTest, OpenDBWithExistingTrash) {
+  Options options = CurrentOptions();
+
+  options.sst_file_manager.reset(
+      NewSstFileManager(env_, nullptr, "", 1024 * 1024 /* 1 MB/sec */));
+  auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+
+  Destroy(last_options_);
+
+  // Add some trash files to the db directory so the DB can clean them up
+  env_->CreateDirIfMissing(dbname_);
+  ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "001.sst.trash"));
+  ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "002.sst.trash"));
+  ASSERT_OK(WriteStringToFile(env_, "abc", dbname_ + "/" + "003.sst.trash"));
+
+  // Reopen the DB and verify that it deletes existing trash files
+  ASSERT_OK(TryReopen(options));
+  sfm->WaitForEmptyTrash();
+  ASSERT_NOK(env_->FileExists(dbname_ + "/" + "001.sst.trash"));
+  ASSERT_NOK(env_->FileExists(dbname_ + "/" + "002.sst.trash"));
+  ASSERT_NOK(env_->FileExists(dbname_ + "/" + "003.sst.trash"));
+}
+
+
 // Create a DB with 2 db_paths, and generate multiple files in the 2
 // db_paths using CompactRangeOptions, make sure that files that were
 // deleted from first db_path were deleted using DeleteScheduler and
 // files in the second path were not.
 TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
-  int bg_delete_file = 0;
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+  std::atomic<int> bg_delete_file(0);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "DeleteScheduler::DeleteTrashFile:DeleteFile",
-      [&](void* arg) { bg_delete_file++; });
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+      [&](void* /*arg*/) { bg_delete_file++; });
+  // The deletion scheduler sometimes skips marking file as trash according to
+  // a heuristic. In that case the deletion will go through the below SyncPoint.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DeleteScheduler::DeleteFile", [&](void* /*arg*/) { bg_delete_file++; });
 
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
@@ -393,19 +637,24 @@ TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
   options.db_paths.emplace_back(dbname_ + "_2", 1024 * 100);
   options.env = env_;
 
-  std::string trash_dir = test::TmpDir(env_) + "/trash";
   int64_t rate_bytes_per_sec = 1024 * 1024;  // 1 Mb / Sec
   Status s;
-  options.sst_file_manager.reset(NewSstFileManager(
-      env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
+  options.sst_file_manager.reset(
+      NewSstFileManager(env_, nullptr, "", rate_bytes_per_sec, false, &s,
+                        /* max_trash_db_ratio= */ 1.1));
+
   ASSERT_OK(s);
   auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
 
   DestroyAndReopen(options);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  WriteOptions wo;
+  wo.disableWAL = true;
 
   // Create 4 files in L0
   for (int i = 0; i < 4; i++) {
-    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A')));
+    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'A'), wo));
     ASSERT_OK(Flush());
   }
   // We created 4 sst files in L0
@@ -421,7 +670,7 @@ TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
 
   // Create 4 files in L0
   for (int i = 4; i < 8; i++) {
-    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B')));
+    ASSERT_OK(Put("Key" + ToString(i), DummyString(1024, 'B'), wo));
     ASSERT_OK(Flush());
   }
   ASSERT_EQ("4,1", FilesPerLevel(0));
@@ -436,27 +685,33 @@ TEST_F(DBSSTTest, DeleteSchedulerMultipleDBPaths) {
   sfm->WaitForEmptyTrash();
   ASSERT_EQ(bg_delete_file, 8);
 
+  // Compaction will delete both files and regenerate a file in L1 in second
+  // db path. The deleted files should still be cleaned up via delete scheduler.
   compact_options.bottommost_level_compaction =
-      BottommostLevelCompaction::kForce;
+      BottommostLevelCompaction::kForceOptimized;
   ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
   ASSERT_EQ("0,1", FilesPerLevel(0));
 
   sfm->WaitForEmptyTrash();
-  ASSERT_EQ(bg_delete_file, 8);
+  ASSERT_EQ(bg_delete_file, 10);
 
-  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
 TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
   int bg_delete_file = 0;
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "DeleteScheduler::DeleteTrashFile:DeleteFile",
-      [&](void* arg) { bg_delete_file++; });
-  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+      [&](void* /*arg*/) { bg_delete_file++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
 
+  Status s;
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
   options.env = env_;
+  options.sst_file_manager.reset(
+      NewSstFileManager(env_, nullptr, "", 0, false, &s, 0));
+  ASSERT_OK(s);
   DestroyAndReopen(options);
 
   // Create 4 files in L0
@@ -469,18 +724,28 @@ TEST_F(DBSSTTest, DestroyDBWithRateLimitedDelete) {
 
   // Close DB and destroy it using DeleteScheduler
   Close();
-  std::string trash_dir = test::TmpDir(env_) + "/trash";
-  int64_t rate_bytes_per_sec = 1024 * 1024;  // 1 Mb / Sec
-  Status s;
-  options.sst_file_manager.reset(NewSstFileManager(
-      env_, nullptr, trash_dir, rate_bytes_per_sec, false, &s));
-  ASSERT_OK(s);
-  ASSERT_OK(DestroyDB(dbname_, options));
+
+  int num_sst_files = 0;
+  int num_wal_files = 0;
+  std::vector<std::string> db_files;
+  env_->GetChildren(dbname_, &db_files);
+  for (std::string f : db_files) {
+    if (f.substr(f.find_last_of(".") + 1) == "sst") {
+      num_sst_files++;
+    } else if (f.substr(f.find_last_of(".") + 1) == "log") {
+      num_wal_files++;
+    }
+  }
+  ASSERT_GT(num_sst_files, 0);
+  ASSERT_GT(num_wal_files, 0);
 
   auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
+
+  sfm->SetDeleteRateBytesPerSecond(1024 * 1024);
+  sfm->delete_scheduler()->SetMaxTrashDBRatio(1.1);
+  ASSERT_OK(DestroyDB(dbname_, options));
   sfm->WaitForEmptyTrash();
-  // We have deleted the 4 sst files in the delete_scheduler
-  ASSERT_EQ(bg_delete_file, 4);
+  ASSERT_EQ(bg_delete_file, num_sst_files + num_wal_files);
 }
 
 TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
@@ -496,12 +761,13 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
 
   // Generate a file containing 100 keys.
   for (int i = 0; i < 100; i++) {
-    ASSERT_OK(Put(Key(i), RandomString(&rnd, 50)));
+    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
   }
   ASSERT_OK(Flush());
 
   uint64_t first_file_size = 0;
-  auto files_in_db = GetAllSSTFiles(&first_file_size);
+  std::unordered_map<std::string, uint64_t> files_in_db;
+  ASSERT_OK(GetAllSSTFiles(&files_in_db, &first_file_size));
   ASSERT_EQ(sfm->GetTotalSize(), first_file_size);
 
   // Set the maximum allowed space usage to the current total size
@@ -512,49 +778,173 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowed) {
   ASSERT_NOK(Flush());
 }
 
+TEST_F(DBSSTTest, CancellingCompactionsWorks) {
+  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+
+  Options options = CurrentOptions();
+  options.sst_file_manager = sst_file_manager;
+  options.level0_file_num_compaction_trigger = 2;
+  options.statistics = CreateDBStatistics();
+  DestroyAndReopen(options);
+
+  int completed_compactions = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* /*arg*/) {
+        sfm->SetMaxAllowedSpaceUsage(0);
+        ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun",
+      [&](void* /*arg*/) { completed_compactions++; });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
+  Random rnd(301);
+
+  // Generate a file containing 10 keys.
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
+  }
+  ASSERT_OK(Flush());
+  uint64_t total_file_size = 0;
+  std::unordered_map<std::string, uint64_t> files_in_db;
+  ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
+  // Set the maximum allowed space usage to the current total size
+  sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
+
+  // Generate another file to trigger compaction.
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
+  }
+  ASSERT_OK(Flush());
+  dbfull()->TEST_WaitForCompact(true);
+
+  // Because we set a callback in CancelledCompaction, we actually
+  // let the compaction run
+  ASSERT_GT(completed_compactions, 0);
+  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
+  // Make sure the stat is bumped
+  ASSERT_GT(dbfull()->immutable_db_options().statistics.get()->getTickerCount(COMPACTION_CANCELLED), 0);
+  ASSERT_EQ(0,
+            dbfull()->immutable_db_options().statistics.get()->getTickerCount(
+                FILES_MARKED_TRASH));
+  ASSERT_EQ(4,
+            dbfull()->immutable_db_options().statistics.get()->getTickerCount(
+                FILES_DELETED_IMMEDIATELY));
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(DBSSTTest, CancellingManualCompactionsWorks) {
+  std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
+  auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
+
+  Options options = CurrentOptions();
+  options.sst_file_manager = sst_file_manager;
+  options.statistics = CreateDBStatistics();
+
+  FlushedFileCollector* collector = new FlushedFileCollector();
+  options.listeners.emplace_back(collector);
+
+  DestroyAndReopen(options);
+
+  Random rnd(301);
+
+  // Generate a file containing 10 keys.
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
+  }
+  ASSERT_OK(Flush());
+  uint64_t total_file_size = 0;
+  std::unordered_map<std::string, uint64_t> files_in_db;
+  ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_file_size));
+  // Set the maximum allowed space usage to the current total size
+  sfm->SetMaxAllowedSpaceUsage(2 * total_file_size + 1);
+
+  // Generate another file to trigger compaction.
+  for (int i = 0; i < 10; i++) {
+    ASSERT_OK(Put(Key(i), rnd.RandomString(50)));
+  }
+  ASSERT_OK(Flush());
+
+  // OK, now trigger a manual compaction
+  dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+
+  // Wait for manual compaction to get scheduled and finish
+  dbfull()->TEST_WaitForCompact(true);
+
+  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
+  // Make sure the stat is bumped
+  ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
+                COMPACTION_CANCELLED),
+            1);
+
+  // Now make sure CompactFiles also gets cancelled
+  auto l0_files = collector->GetFlushedFiles();
+  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);
+
+  // Wait for manual compaction to get scheduled and finish
+  dbfull()->TEST_WaitForCompact(true);
+
+  ASSERT_EQ(dbfull()->immutable_db_options().statistics.get()->getTickerCount(
+                COMPACTION_CANCELLED),
+            2);
+  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
+
+  // Now let the flush through and make sure GetCompactionsReservedSize
+  // returns to normal
+  sfm->SetMaxAllowedSpaceUsage(0);
+  int completed_compactions = 0;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "CompactFilesImpl:End", [&](void* /*arg*/) { completed_compactions++; });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  dbfull()->CompactFiles(ROCKSDB_NAMESPACE::CompactionOptions(), l0_files, 0);
+  dbfull()->TEST_WaitForCompact(true);
+
+  ASSERT_EQ(sfm->GetCompactionsReservedSize(), 0);
+  ASSERT_GT(completed_compactions, 0);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+}
+
 TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
   // This test will set a maximum allowed space for the DB, then it will
   // keep filling the DB until the limit is reached and bg_error_ is set.
   // When bg_error_ is set we will verify that the DB size is greater
   // than the limit.
 
-  std::vector<int> max_space_limits_mbs = {1, 2, 4, 8, 10};
-  decltype(max_space_limits_mbs)::value_type limit_mb_cb;
-  bool bg_error_set = false;
-  uint64_t total_sst_files_size = 0;
+  std::vector<int> max_space_limits_mbs = {1, 10};
+  std::atomic<bool> bg_error_set(false);
 
-  std::atomic<int> estimate_multiplier(1);
-  int reached_max_space_on_flush = 0;
-  int reached_max_space_on_compaction = 0;
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+  std::atomic<int> reached_max_space_on_flush(0);
+  std::atomic<int> reached_max_space_on_compaction(0);
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
       [&](void* arg) {
         Status* bg_error = static_cast<Status*>(arg);
         bg_error_set = true;
-        GetAllSSTFiles(&total_sst_files_size);
         reached_max_space_on_flush++;
-        // low limit for size calculated using sst files
-        ASSERT_GE(total_sst_files_size, limit_mb_cb * 1024 * 1024);
         // clear error to ensure compaction callback is called
         *bg_error = Status::OK();
-        estimate_multiplier++;  // used in the main loop assert
       });
 
-  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "DBImpl::BackgroundCompaction():CancelledCompaction", [&](void* arg) {
+        bool* enough_room = static_cast<bool*>(arg);
+        *enough_room = true;
+      });
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached",
-      [&](void* arg) {
+      [&](void* /*arg*/) {
         bg_error_set = true;
-        GetAllSSTFiles(&total_sst_files_size);
         reached_max_space_on_compaction++;
       });
 
   for (auto limit_mb : max_space_limits_mbs) {
     bg_error_set = false;
-    total_sst_files_size = 0;
-    estimate_multiplier = 1;
-    limit_mb_cb = limit_mb;
-    rocksdb::SyncPoint::GetInstance()->ClearTrace();
-    rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearTrace();
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
     std::shared_ptr<SstFileManager> sst_file_manager(NewSstFileManager(env_));
     auto sfm = static_cast<SstFileManagerImpl*>(sst_file_manager.get());
 
@@ -566,23 +956,20 @@ TEST_F(DBSSTTest, DBWithMaxSpaceAllowedRandomized) {
 
     sfm->SetMaxAllowedSpaceUsage(limit_mb * 1024 * 1024);
 
-    int keys_written = 0;
-    uint64_t estimated_db_size = 0;
+    // It is easy to detect if the test is stuck in a loop. No need for
+    // complex termination logic.
     while (true) {
-      auto s = Put(RandomString(&rnd, 10), RandomString(&rnd, 50));
+      auto s = Put(rnd.RandomString(10), rnd.RandomString(50));
       if (!s.ok()) {
         break;
       }
-      keys_written++;
-      // Check the estimated db size vs the db limit just to make sure we
-      // dont run into an infinite loop
-      estimated_db_size = keys_written * 60;  // ~60 bytes per key
-      ASSERT_LT(estimated_db_size,
-                estimate_multiplier * limit_mb * 1024 * 1024 * 2);
     }
     ASSERT_TRUE(bg_error_set);
+    uint64_t total_sst_files_size = 0;
+    std::unordered_map<std::string, uint64_t> files_in_db;
+    ASSERT_OK(GetAllSSTFiles(&files_in_db, &total_sst_files_size));
     ASSERT_GE(total_sst_files_size, limit_mb * 1024 * 1024);
-    rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
   }
 
   ASSERT_GT(reached_max_space_on_flush, 0);
@@ -646,6 +1033,18 @@ TEST_F(DBSSTTest, OpenDBWithInfiniteMaxOpenFiles) {
 }
 
 TEST_F(DBSSTTest, GetTotalSstFilesSize) {
+  // We don't propagate oldest-key-time table property on compaction and
+  // just write 0 as default value. This affect the exact table size, since
+  // we encode table properties as varint64. Force time to be 0 to work around
+  // it. Should remove the workaround after we propagate the property on
+  // compaction.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "FlushJob::WriteLevel0Table:oldest_ancester_time", [&](void* arg) {
+        uint64_t* current_time = static_cast<uint64_t*>(arg);
+        *current_time = 0;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+
   Options options = CurrentOptions();
   options.disable_auto_compactions = true;
   options.compression = kNoCompression;
@@ -735,6 +1134,8 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
   // Live SST files = 0
   // Total SST files = 0
   ASSERT_EQ(total_sst_files_size, 0);
+
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
 }
 
 TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
@@ -824,10 +1225,10 @@ TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {
 
 #endif  // ROCKSDB_LITE
 
-}  // namespace rocksdb
+}  // namespace ROCKSDB_NAMESPACE
 
 int main(int argc, char** argv) {
-  rocksdb::port::InstallStackTraceHandler();
+  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
   ::testing::InitGoogleTest(&argc, argv);
   return RUN_ALL_TESTS();
 }