git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_compaction_test.cc
import 14.2.4 nautilus point release
index 5136b03921f90e14953890edbda192f6969646b6..df51ef2ca2a410c5b43733376fe8aa9719922342 100644
@@ -8,11 +8,15 @@
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
 
 #include "db/db_test_util.h"
-#include "port/stack_trace.h"
 #include "port/port.h"
+#include "port/stack_trace.h"
+#include "rocksdb/concurrent_task_limiter.h"
 #include "rocksdb/experimental.h"
 #include "rocksdb/utilities/convenience.h"
+#include "util/concurrent_task_limiter_impl.h"
+#include "util/fault_injection_test_env.h"
 #include "util/sync_point.h"
+
 namespace rocksdb {
 
 // SYNC_POINT is not supported in released Windows mode.
@@ -51,9 +55,9 @@ namespace {
 class FlushedFileCollector : public EventListener {
  public:
   FlushedFileCollector() {}
-  ~FlushedFileCollector() {}
+  ~FlushedFileCollector() override {}
 
-  virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+  void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
     std::lock_guard<std::mutex> lock(mutex_);
     flushed_files_.push_back(info.file_path);
   }
@@ -83,24 +87,23 @@ public:
     }
   }
 
-  ~CompactionStatsCollector() {}
+  ~CompactionStatsCollector() override {}
 
-  virtual void OnCompactionCompleted(DB* /* db */,
-      const CompactionJobInfo& info) override {
+  void OnCompactionCompleted(DB* /* db */,
+                             const CompactionJobInfo& info) override {
     int k = static_cast<int>(info.compaction_reason);
     int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
     assert(k >= 0 && k < num_of_reasons);
     compaction_completed_[k]++;
   }
 
-  virtual void OnExternalFileIngested(DB* /* db */,
-      const ExternalFileIngestionInfo& /* info */) override {
+  void OnExternalFileIngested(
+      DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
     int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
     compaction_completed_[k]++;
   }
 
-  virtual void OnFlushCompleted(DB* /* db */,
-      const FlushJobInfo& /* info */) override {
+  void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
     int k = static_cast<int>(CompactionReason::kFlush);
     compaction_completed_[k]++;
   }
@@ -410,6 +413,7 @@ TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
 
   // Reopen the DB with stats-update disabled
   options.skip_stats_update_on_db_open = true;
+  options.max_open_files = 20;
   env_->random_file_open_counter_.store(0);
   Reopen(options);
 
@@ -435,7 +439,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   Options options = CurrentOptions();
   options.env = env_;
   options.new_table_reader_for_compaction_inputs = true;
-  options.max_open_files = 100;
+  options.max_open_files = 20;
   options.level0_file_num_compaction_trigger = 3;
   DestroyAndReopen(options);
   Random rnd(301);
@@ -464,15 +468,16 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
       Flush();
       dbfull()->TEST_WaitForCompact();
       // preloading iterator issues one table cache lookup and creates
-      // a new table reader.
-      ASSERT_EQ(num_table_cache_lookup, 1);
+      // a new table reader, if not preloaded.
+      int old_num_table_cache_lookup = num_table_cache_lookup;
+      ASSERT_GE(num_table_cache_lookup, 1);
       ASSERT_EQ(num_new_table_reader, 1);
 
       num_table_cache_lookup = 0;
       num_new_table_reader = 0;
       ASSERT_EQ(Key(k), Get(Key(k)));
       // lookup iterator from table cache and no need to create a new one.
-      ASSERT_EQ(num_table_cache_lookup, 1);
+      ASSERT_EQ(old_num_table_cache_lookup + num_table_cache_lookup, 2);
       ASSERT_EQ(num_new_table_reader, 0);
     }
   }
@@ -485,7 +490,10 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   // a new table reader. One file is created for flush and one for compaction.
   // Compaction inputs make no table cache look-up for data/range deletion
   // iterators
-  ASSERT_EQ(num_table_cache_lookup, 2);
+  // May preload table cache too.
+  ASSERT_GE(num_table_cache_lookup, 2);
+  int old_num_table_cache_lookup2 = num_table_cache_lookup;
+
   // Create new iterator for:
   // (1) 1 for verifying flush results
   // (2) 3 for compaction input files
@@ -495,7 +503,7 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   num_table_cache_lookup = 0;
   num_new_table_reader = 0;
   ASSERT_EQ(Key(1), Get(Key(1)));
-  ASSERT_EQ(num_table_cache_lookup, 1);
+  ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
   ASSERT_EQ(num_new_table_reader, 0);
 
   num_table_cache_lookup = 0;
@@ -507,14 +515,16 @@ TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
   db_->CompactRange(cro, nullptr, nullptr);
   // Only verifying compaction outputs issues one table cache lookup
   // (for both data block and range deletion block).
-  ASSERT_EQ(num_table_cache_lookup, 1);
+  // May preload table cache too.
+  ASSERT_GE(num_table_cache_lookup, 1);
+  old_num_table_cache_lookup2 = num_table_cache_lookup;
   // One for compaction input, one for verifying compaction results.
   ASSERT_EQ(num_new_table_reader, 2);
 
   num_table_cache_lookup = 0;
   num_new_table_reader = 0;
   ASSERT_EQ(Key(1), Get(Key(1)));
-  ASSERT_EQ(num_table_cache_lookup, 1);
+  ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 2);
   ASSERT_EQ(num_new_table_reader, 0);
 
   rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
@@ -1592,7 +1602,8 @@ TEST_F(DBCompactionTest, DeleteFileRange) {
   // Note that we don't delete level 0 files
   compact_options.change_level = true;
   compact_options.target_level = 1;
-  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
+  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
+  dbfull()->TEST_WaitForCompact();
 
   ASSERT_OK(
       DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));
@@ -2953,6 +2964,12 @@ TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
   dbfull()->TEST_WaitForCompact();
 }
 
+static std::string ShortKey(int i) {
+  assert(i < 10000);
+  char buf[100];
+  snprintf(buf, sizeof(buf), "key%04d", i);
+  return std::string(buf);
+}
 
 TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
   int32_t trivial_move = 0;
@@ -2965,10 +2982,28 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
       [&](void* /*arg*/) { non_trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
+  // The key size is guaranteed to be <= 8
+  class ShortKeyComparator : public Comparator {
+    int Compare(const rocksdb::Slice& a,
+                const rocksdb::Slice& b) const override {
+      assert(a.size() <= 8);
+      assert(b.size() <= 8);
+      return BytewiseComparator()->Compare(a, b);
+    }
+    const char* Name() const override { return "ShortKeyComparator"; }
+    void FindShortestSeparator(std::string* start,
+                               const rocksdb::Slice& limit) const override {
+      return BytewiseComparator()->FindShortestSeparator(start, limit);
+    }
+    void FindShortSuccessor(std::string* key) const override {
+      return BytewiseComparator()->FindShortSuccessor(key);
+    }
+  } short_key_cmp;
   Options options = CurrentOptions();
   options.target_file_size_base = 100000000;
   options.write_buffer_size = 100000000;
   options.max_subcompactions = max_subcompactions_;
+  options.comparator = &short_key_cmp;
   DestroyAndReopen(options);
 
   int32_t value_size = 10 * 1024;  // 10 KB
@@ -2978,7 +3013,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
   // File with keys [ 0 => 99 ]
   for (int i = 0; i < 100; i++) {
     values.push_back(RandomString(&rnd, value_size));
-    ASSERT_OK(Put(Key(i), values[i]));
+    ASSERT_OK(Put(ShortKey(i), values[i]));
   }
   ASSERT_OK(Flush());
 
@@ -2995,7 +3030,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
   // File with keys [ 100 => 199 ]
   for (int i = 100; i < 200; i++) {
     values.push_back(RandomString(&rnd, value_size));
-    ASSERT_OK(Put(Key(i), values[i]));
+    ASSERT_OK(Put(ShortKey(i), values[i]));
   }
   ASSERT_OK(Flush());
 
@@ -3013,7 +3048,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
   // File with keys [ 200 => 299 ]
   for (int i = 200; i < 300; i++) {
     values.push_back(RandomString(&rnd, value_size));
-    ASSERT_OK(Put(Key(i), values[i]));
+    ASSERT_OK(Put(ShortKey(i), values[i]));
   }
   ASSERT_OK(Flush());
 
@@ -3031,7 +3066,7 @@ TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
   ASSERT_EQ(non_trivial_move, 0);
 
   for (int i = 0; i < 300; i++) {
-    ASSERT_EQ(Get(Key(i)), values[i]);
+    ASSERT_EQ(Get(ShortKey(i)), values[i]);
   }
 
   rocksdb::SyncPoint::GetInstance()->DisableProcessing();
@@ -3313,7 +3348,7 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
   options.level0_file_num_compaction_trigger = kNumLevelFiles;
   // inflate it a bit to account for key/metadata overhead
   options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
-  Reopen(options);
+  CreateAndReopenWithCF({"one"}, options);
 
   Random rnd(301);
   const Snapshot* snapshot = nullptr;
@@ -3344,10 +3379,12 @@ TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
   // just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
   // files does not need to be preserved in case of a future snapshot.
   ASSERT_OK(Put(Key(0), "val"));
+  ASSERT_NE(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
   // release snapshot and wait for compactions to finish. Single-file
   // compactions should be triggered, which reduce the size of each bottom-level
   // file without changing file count.
   db_->ReleaseSnapshot(snapshot);
+  ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
         Compaction* compaction = reinterpret_cast<Compaction*>(arg);
@@ -3864,6 +3901,192 @@ TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
   ASSERT_EQ(2, collector->num_ssts_creation_started());
 }
 
+TEST_F(DBCompactionTest, CompactionLimiter) {
+  const int kNumKeysPerFile = 10;
+  const int kMaxBackgroundThreads = 64;
+
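+  // Per-limiter test bookkeeping: limit_tasks is the cap passed to
+  // NewConcurrentTaskLimiter, tasks counts compactions currently running
+  // under the limiter, and max_tasks is updated by the sync-point callbacks
+  // installed further down.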
+  struct CompactionLimiter {
+    std::string name;
+    int limit_tasks;
+    int max_tasks;
+    int tasks;
+    std::shared_ptr<ConcurrentTaskLimiter> limiter;
+  };
+
+  std::vector<CompactionLimiter> limiter_settings;
+  limiter_settings.push_back({"limiter_1", 1, 0, 0, nullptr});
+  limiter_settings.push_back({"limiter_2", 2, 0, 0, nullptr});
+  limiter_settings.push_back({"limiter_3", 3, 0, 0, nullptr});
+
+  for (auto& ls : limiter_settings) {
+    ls.limiter.reset(NewConcurrentTaskLimiter(ls.name, ls.limit_tasks));
+  }
+
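+  // Created with a limit of -1, which acts as a bypass (no cap); the test
+  // later sets the limit to 0 to fully throttle the column family that uses
+  // this limiter.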
+  std::shared_ptr<ConcurrentTaskLimiter> unique_limiter(
+    NewConcurrentTaskLimiter("unique_limiter", -1));
+
+  const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5",
+    "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
+  const int cf_count = sizeof cf_names / sizeof cf_names[0];
+
+  std::unordered_map<std::string, CompactionLimiter*> cf_to_limiter;
+
+  Options options = CurrentOptions();
+  options.write_buffer_size = 110 * 1024;  // 110KB
+  options.arena_block_size = 4096;
+  options.num_levels = 3;
+  options.level0_file_num_compaction_trigger = 4;
+  options.level0_slowdown_writes_trigger = 64;
+  options.level0_stop_writes_trigger = 64;
+  options.max_background_jobs = kMaxBackgroundThreads; // Enough threads
+  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  options.max_write_buffer_number = 10; // Enough memtables
+  DestroyAndReopen(options);
+
+  std::vector<Options> option_vector;
+  option_vector.reserve(cf_count);
+
+  for (int cf = 0; cf < cf_count; cf++) {
+    ColumnFamilyOptions cf_opt(options);
+    if (cf == 0) {
+      // "Default" CF does't use compaction limiter
+      cf_opt.compaction_thread_limiter = nullptr;
+    } else if (cf == 1) {
+      // "1" CF uses bypass compaction limiter
+      unique_limiter->SetMaxOutstandingTask(-1);
+      cf_opt.compaction_thread_limiter = unique_limiter;
+    } else {
+      // Assign limiter by mod
+      auto& ls = limiter_settings[cf % 3];
+      cf_opt.compaction_thread_limiter = ls.limiter;
+      cf_to_limiter[cf_names[cf]] = &ls;
+    }
+    option_vector.emplace_back(DBOptions(options), cf_opt);
+  }
+
+  for (int cf = 1; cf < cf_count; cf++) {
+    CreateColumnFamilies({cf_names[cf]}, option_vector[cf]);
+  }
+
+  ReopenWithColumnFamilies(std::vector<std::string>(cf_names,
+                                                    cf_names + cf_count),
+                           option_vector);
+
+  port::Mutex mutex;
+
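+  // The two sync-point callbacks below bracket each background compaction:
+  // the first increments the per-limiter running-task count and asserts it
+  // never exceeds the configured limit; the second decrements it again.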
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+    "DBImpl::BackgroundCompaction:BeforeCompaction", [&](void* arg) {
+      const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
+      auto iter = cf_to_limiter.find(cf_name);
+      if (iter != cf_to_limiter.end()) {
+        MutexLock l(&mutex);
+        ASSERT_GE(iter->second->limit_tasks, ++iter->second->tasks);
+        iter->second->max_tasks = std::max(iter->second->max_tasks,
+              iter->second->limit_tasks);
+      }
+    });
+
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+    "DBImpl::BackgroundCompaction:AfterCompaction", [&](void* arg) {
+      const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
+      auto iter = cf_to_limiter.find(cf_name);
+      if (iter != cf_to_limiter.end()) {
+        MutexLock l(&mutex);
+        ASSERT_GE(--iter->second->tasks, 0);
+      }
+    });
+
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Split background threads between the flush (HIGH) and compaction (LOW)
+  // thread pools.
+  const size_t kTotalFlushTasks = kMaxBackgroundThreads / 4;
+  const size_t kTotalCompactTasks = kMaxBackgroundThreads - kTotalFlushTasks;
+  env_->SetBackgroundThreads((int)kTotalFlushTasks, Env::HIGH);
+  env_->SetBackgroundThreads((int)kTotalCompactTasks, Env::LOW);
+
+  test::SleepingBackgroundTask sleeping_compact_tasks[kTotalCompactTasks];
+
+  // Block all compaction threads in thread pool.
+  for (size_t i = 0; i < kTotalCompactTasks; i++) {
+    env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+                   &sleeping_compact_tasks[i], Env::LOW);
+    sleeping_compact_tasks[i].WaitUntilSleeping();
+  }
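+  // With every LOW-priority thread parked in a SleepingBackgroundTask, newly
+  // scheduled compactions queue up instead of running until WakeUp() below.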
+
+  int keyIndex = 0;
+
+  for (int n = 0; n < options.level0_file_num_compaction_trigger; n++) {
+    for (int cf = 0; cf < cf_count; cf++) {
+      for (int i = 0; i < kNumKeysPerFile; i++) {
+        ASSERT_OK(Put(cf, Key(keyIndex++), ""));
+      }
+      // put extra key to trigger flush
+      ASSERT_OK(Put(cf, "", ""));
+    }
+
+    for (int cf = 0; cf < cf_count; cf++) {
+      dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+    }
+  }
+
+  // Enough L0 files to trigger compaction
+  for (int cf = 0; cf < cf_count; cf++) {
+    ASSERT_EQ(NumTableFilesAtLevel(0, cf),
+      options.level0_file_num_compaction_trigger);
+  }
+
+  // Create more files for one column family, which triggers the speed-up
+  // condition; all pending compactions will then be scheduled.
+  for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
+    for (int i = 0; i < kNumKeysPerFile; i++) {
+      ASSERT_OK(Put(0, Key(i), ""));
+    }
+    // put extra key to trigger flush
+    ASSERT_OK(Put(0, "", ""));
+    dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
+    ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
+              NumTableFilesAtLevel(0, 0));
+  }
+
+  // All CFs are pending compaction
+  ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW));
+
+  // Unblock all compaction threads
+  for (size_t i = 0; i < kTotalCompactTasks; i++) {
+    sleeping_compact_tasks[i].WakeUp();
+    sleeping_compact_tasks[i].WaitUntilDone();
+  }
+
+  for (int cf = 0; cf < cf_count; cf++) {
+    dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+  }
+
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // Max outstanding compaction tasks reached each limiter's limit
+  for (auto& ls : limiter_settings) {
+    ASSERT_EQ(ls.limit_tasks, ls.max_tasks);
+    ASSERT_EQ(0, ls.limiter->GetOutstandingTask());
+  }
+
+  // test manual compaction under a fully throttled limiter
+  int cf_test = 1;
+  unique_limiter->SetMaxOutstandingTask(0);
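+  // The manual compaction below is still expected to run to completion even
+  // though the limiter now allows zero outstanding tasks.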
+
+  // flush one more file to cf 1
+  for (int i = 0; i < kNumKeysPerFile; i++) {
+      ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
+  }
+  // put extra key to trigger flush
+  ASSERT_OK(Put(cf_test, "", ""));
+
+  dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]);
+  ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test));
+
+  Compact(cf_test, Key(0), Key(keyIndex));
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+}
+
 INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
                         ::testing::Values(std::make_tuple(1, true),
                                           std::make_tuple(1, false),
@@ -3965,14 +4188,14 @@ class NoopMergeOperator : public MergeOperator {
  public:
   NoopMergeOperator() {}
 
-  virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
-                           MergeOperationOutput* merge_out) const override {
+  bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
+                   MergeOperationOutput* merge_out) const override {
     std::string val("bar");
     merge_out->new_value = val;
     return true;
   }
 
-  virtual const char* Name() const override { return "Noop"; }
+  const char* Name() const override { return "Noop"; }
 };
 
 TEST_F(DBCompactionTest, PartialManualCompaction) {
@@ -4005,6 +4228,123 @@ TEST_F(DBCompactionTest, PartialManualCompaction) {
   dbfull()->CompactRange(cro, nullptr, nullptr);
 }
 
+TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
+  // Regression test for bug where manual compaction hangs forever when the DB
+  // is in read-only mode. Verify it now at least returns, despite failing.
+  const int kNumL0Files = 4;
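+  // FaultInjectionTestEnv lets the test "turn off" the filesystem below,
+  // which drives the DB into read-only mode via a failed write.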
+  std::unique_ptr<FaultInjectionTestEnv> mock_env(
+      new FaultInjectionTestEnv(Env::Default()));
+  Options opts = CurrentOptions();
+  opts.disable_auto_compactions = true;
+  opts.env = mock_env.get();
+  DestroyAndReopen(opts);
+
+  Random rnd(301);
+  for (int i = 0; i < kNumL0Files; ++i) {
+    // Make sure files are overlapping in key-range to prevent trivial move.
+    Put("key1", RandomString(&rnd, 1024));
+    Put("key2", RandomString(&rnd, 1024));
+    Flush();
+  }
+  ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));
+
+  // Enter read-only mode by failing a write.
+  mock_env->SetFilesystemActive(false);
+  // Make sure this is outside `CompactRange`'s range so that it doesn't fail
+  // early trying to flush memtable.
+  ASSERT_NOK(Put("key3", RandomString(&rnd, 1024)));
+
+  // In the bug scenario, the first manual compaction would fail and forget to
+  // unregister itself, causing the second one to hang forever due to conflict
+  // with a non-running compaction.
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = false;
+  Slice begin_key("key1");
+  Slice end_key("key2");
+  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+  ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+
+  // Close before mock_env destruct.
+  Close();
+}
+
+// FixFileIngestionCompactionDeadlock tests and verifies that compaction and
+// file ingestion do not cause deadlock in the event of write stall triggered
+// by number of L0 files reaching level0_stop_writes_trigger.
+TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
+  const int kNumKeysPerFile = 100;
+  // Generate SST files.
+  Options options = CurrentOptions();
+
+  // Generate an external SST file containing a single key, Key(99).
+  std::string sst_files_dir = dbname_ + "/sst_files/";
+  test::DestroyDir(env_, sst_files_dir);
+  ASSERT_OK(env_->CreateDir(sst_files_dir));
+  SstFileWriter sst_writer(EnvOptions(), options);
+  const std::string sst_file_path = sst_files_dir + "test.sst";
+  ASSERT_OK(sst_writer.Open(sst_file_path));
+  ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
+  ASSERT_OK(sst_writer.Finish());
+
+  SyncPoint::GetInstance()->DisableProcessing();
+  SyncPoint::GetInstance()->ClearAllCallBacks();
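+  // Make "BackgroundCallCompaction:0" wait until the ingestion path has
+  // passed "AfterIncIngestFileCounter", so compaction only starts once the
+  // ingestion below is already under way.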
+  SyncPoint::GetInstance()->LoadDependency({
+      {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
+       "BackgroundCallCompaction:0"},
+  });
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  options.write_buffer_size = 110 << 10;  // 110KB
+  options.level0_file_num_compaction_trigger =
+      options.level0_stop_writes_trigger;
+  options.max_subcompactions = max_subcompactions_;
+  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+  DestroyAndReopen(options);
+  Random rnd(301);
+
+  // Generate level0_stop_writes_trigger L0 files to trigger write stop
+  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
+    for (int j = 0; j != kNumKeysPerFile; ++j) {
+      ASSERT_OK(Put(Key(j), RandomString(&rnd, 990)));
+    }
+    if (0 == i) {
+      // When we reach here, the memtables have kNumKeysPerFile keys. Note
+      // that flush is not yet triggered. We need to write an extra key so
+      // that the write path calls PreprocessWrite and causes the previous
+      // key-value pairs to be flushed. After that, the newest key is in the
+      // memtable and the rest are in L0 files. Since there is already one key
+      // in the memtable, for i = 1, 2, ... we do not have to write this extra
+      // key to trigger the flush.
+      ASSERT_OK(Put("", ""));
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
+  }
+  // When we reach this point, there will be level0_stop_writes_trigger L0
+  // files and one extra key (99) in memory, which overlaps with the external
+  // SST file. Write stall triggers, and can be cleared only after compaction
+  // reduces the number of L0 files.
+
+  // Compaction will also be triggered since we have reached the threshold for
+  // auto compaction. Note that compaction can only begin after the following
+  // file-ingestion thread has started, and it waits for ingestion to finish.
+
+  // Thread to ingest file with overlapping key range with the current
+  // memtable. Consequently ingestion will trigger a flush. The flush MUST
+  // proceed without waiting for the write stall condition to clear, otherwise
+  // deadlock can happen.
+  port::Thread ingestion_thr([&]() {
+    IngestExternalFileOptions ifo;
+    Status s = db_->IngestExternalFile({sst_file_path}, ifo);
+    ASSERT_OK(s);
+  });
+
+  // Wait for ingestion to finish and for the triggered compaction to complete.
+  ingestion_thr.join();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  Close();
+}
+
 #endif // !defined(ROCKSDB_LITE)
 }  // namespace rocksdb