// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_test_util.h"
-#include "port/stack_trace.h"
#include "port/port.h"
+#include "port/stack_trace.h"
+#include "rocksdb/concurrent_task_limiter.h"
#include "rocksdb/experimental.h"
#include "rocksdb/utilities/convenience.h"
+#include "util/concurrent_task_limiter_impl.h"
+#include "util/fault_injection_test_env.h"
#include "util/sync_point.h"
+
namespace rocksdb {
// SYNC_POINT is not supported in released Windows mode.
class FlushedFileCollector : public EventListener {
public:
FlushedFileCollector() {}
- ~FlushedFileCollector() {}
+ ~FlushedFileCollector() override {}
- virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
+ void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
std::lock_guard<std::mutex> lock(mutex_);
flushed_files_.push_back(info.file_path);
}
}
}
- ~CompactionStatsCollector() {}
+ ~CompactionStatsCollector() override {}
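+ // Counts events in a histogram indexed by CompactionReason; flushes and
+ // file ingestions are tallied under their dedicated reasons below.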
- virtual void OnCompactionCompleted(DB* /* db */,
- const CompactionJobInfo& info) override {
+ void OnCompactionCompleted(DB* /* db */,
+ const CompactionJobInfo& info) override {
int k = static_cast<int>(info.compaction_reason);
int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
assert(k >= 0 && k < num_of_reasons);
compaction_completed_[k]++;
}
- virtual void OnExternalFileIngested(DB* /* db */,
- const ExternalFileIngestionInfo& /* info */) override {
+ void OnExternalFileIngested(
+ DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
compaction_completed_[k]++;
}
- virtual void OnFlushCompleted(DB* /* db */,
- const FlushJobInfo& /* info */) override {
+ void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
int k = static_cast<int>(CompactionReason::kFlush);
compaction_completed_[k]++;
}
// Reopen the DB with stats-update disabled
options.skip_stats_update_on_db_open = true;
+ options.max_open_files = 20;
env_->random_file_open_counter_.store(0);
Reopen(options);
Options options = CurrentOptions();
options.env = env_;
options.new_table_reader_for_compaction_inputs = true;
- options.max_open_files = 100;
+ options.max_open_files = 20;
options.level0_file_num_compaction_trigger = 3;
DestroyAndReopen(options);
Random rnd(301);
Flush();
dbfull()->TEST_WaitForCompact();
// preloading iterator issues one table cache lookup and creates
- // a new table reader.
- ASSERT_EQ(num_table_cache_lookup, 1);
+ // a new table reader, if not preloaded.
+ int old_num_table_cache_lookup = num_table_cache_lookup;
+ ASSERT_GE(num_table_cache_lookup, 1);
ASSERT_EQ(num_new_table_reader, 1);
num_table_cache_lookup = 0;
num_new_table_reader = 0;
ASSERT_EQ(Key(k), Get(Key(k)));
// lookup iterator from table cache and no need to create a new one.
- ASSERT_EQ(num_table_cache_lookup, 1);
+ ASSERT_EQ(old_num_table_cache_lookup + num_table_cache_lookup, 2);
ASSERT_EQ(num_new_table_reader, 0);
}
}
// a new table reader. One file is created for flush and one for compaction.
// Compaction inputs make no table cache look-up for data/range deletion
// iterators
- ASSERT_EQ(num_table_cache_lookup, 2);
+ // May preload table cache too.
+ ASSERT_GE(num_table_cache_lookup, 2);
+ int old_num_table_cache_lookup2 = num_table_cache_lookup;
+
// Create new iterator for:
// (1) 1 for verifying flush results
// (2) 3 for compaction input files
num_table_cache_lookup = 0;
num_new_table_reader = 0;
ASSERT_EQ(Key(1), Get(Key(1)));
- ASSERT_EQ(num_table_cache_lookup, 1);
+ ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 3);
ASSERT_EQ(num_new_table_reader, 0);
num_table_cache_lookup = 0;
db_->CompactRange(cro, nullptr, nullptr);
// Only verifying compaction outputs issues one table cache lookup
// (for both data block and range deletion block).
- ASSERT_EQ(num_table_cache_lookup, 1);
+ // May preload table cache too.
+ ASSERT_GE(num_table_cache_lookup, 1);
+ old_num_table_cache_lookup2 = num_table_cache_lookup;
// One for compaction input, one for verifying compaction results.
ASSERT_EQ(num_new_table_reader, 2);
num_table_cache_lookup = 0;
num_new_table_reader = 0;
ASSERT_EQ(Key(1), Get(Key(1)));
- ASSERT_EQ(num_table_cache_lookup, 1);
+ ASSERT_EQ(num_table_cache_lookup + old_num_table_cache_lookup2, 2);
ASSERT_EQ(num_new_table_reader, 0);
rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
// Note that we don't delete level 0 files
compact_options.change_level = true;
compact_options.target_level = 1;
- ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
+ ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
+ dbfull()->TEST_WaitForCompact();
ASSERT_OK(
DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));
dbfull()->TEST_WaitForCompact();
}
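+// Builds 7-byte keys ("key" plus four digits), comfortably within the 8-byte
+// bound asserted by the ShortKeyComparator defined in the test below.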
+static std::string ShortKey(int i) {
+ assert(i < 10000);
+ char buf[100];
+ snprintf(buf, sizeof(buf), "key%04d", i);
+ return std::string(buf);
+}
TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
int32_t trivial_move = 0;
[&](void* /*arg*/) { non_trivial_move++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ // The key size is guaranteed to be <= 8
+ class ShortKeyComparator : public Comparator {
+ int Compare(const rocksdb::Slice& a,
+ const rocksdb::Slice& b) const override {
+ assert(a.size() <= 8);
+ assert(b.size() <= 8);
+ return BytewiseComparator()->Compare(a, b);
+ }
+ const char* Name() const override { return "ShortKeyComparator"; }
+ void FindShortestSeparator(std::string* start,
+ const rocksdb::Slice& limit) const override {
+ return BytewiseComparator()->FindShortestSeparator(start, limit);
+ }
+ void FindShortSuccessor(std::string* key) const override {
+ return BytewiseComparator()->FindShortSuccessor(key);
+ }
+ } short_key_cmp;
Options options = CurrentOptions();
options.target_file_size_base = 100000000;
options.write_buffer_size = 100000000;
options.max_subcompactions = max_subcompactions_;
+ options.comparator = &short_key_cmp;
DestroyAndReopen(options);
int32_t value_size = 10 * 1024; // 10 KB
// File with keys [ 0 => 99 ]
for (int i = 0; i < 100; i++) {
values.push_back(RandomString(&rnd, value_size));
- ASSERT_OK(Put(Key(i), values[i]));
+ ASSERT_OK(Put(ShortKey(i), values[i]));
}
ASSERT_OK(Flush());
// File with keys [ 100 => 199 ]
for (int i = 100; i < 200; i++) {
values.push_back(RandomString(&rnd, value_size));
- ASSERT_OK(Put(Key(i), values[i]));
+ ASSERT_OK(Put(ShortKey(i), values[i]));
}
ASSERT_OK(Flush());
// File with keys [ 200 => 299 ]
for (int i = 200; i < 300; i++) {
values.push_back(RandomString(&rnd, value_size));
- ASSERT_OK(Put(Key(i), values[i]));
+ ASSERT_OK(Put(ShortKey(i), values[i]));
}
ASSERT_OK(Flush());
ASSERT_EQ(non_trivial_move, 0);
for (int i = 0; i < 300; i++) {
- ASSERT_EQ(Get(Key(i)), values[i]);
+ ASSERT_EQ(Get(ShortKey(i)), values[i]);
}
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
options.level0_file_num_compaction_trigger = kNumLevelFiles;
// inflate it a bit to account for key/metadata overhead
options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
- Reopen(options);
+ CreateAndReopenWithCF({"one"}, options);
Random rnd(301);
const Snapshot* snapshot = nullptr;
// just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
// files does not need to be preserved in case of a future snapshot.
ASSERT_OK(Put(Key(0), "val"));
+ ASSERT_NE(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
// release snapshot and wait for compactions to finish. Single-file
// compactions should be triggered, which reduce the size of each bottom-level
// file without changing file count.
db_->ReleaseSnapshot(snapshot);
+ ASSERT_EQ(kMaxSequenceNumber, dbfull()->bottommost_files_mark_threshold_);
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
ASSERT_EQ(2, collector->num_ssts_creation_started());
}
+TEST_F(DBCompactionTest, CompactionLimiter) {
+ const int kNumKeysPerFile = 10;
+ const int kMaxBackgroundThreads = 64;
+
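+  // Bookkeeping for one shared limiter: limit_tasks is the configured cap on
+  // concurrent compactions, tasks counts compactions currently running under
+  // the limiter, and max_tasks records the peak concurrency observed via the
+  // sync-point callbacks below.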
+ struct CompactionLimiter {
+ std::string name;
+ int limit_tasks;
+ int max_tasks;
+ int tasks;
+ std::shared_ptr<ConcurrentTaskLimiter> limiter;
+ };
+
+ std::vector<CompactionLimiter> limiter_settings;
+ limiter_settings.push_back({"limiter_1", 1, 0, 0, nullptr});
+ limiter_settings.push_back({"limiter_2", 2, 0, 0, nullptr});
+ limiter_settings.push_back({"limiter_3", 3, 0, 0, nullptr});
+
+ for (auto& ls : limiter_settings) {
+ ls.limiter.reset(NewConcurrentTaskLimiter(ls.name, ls.limit_tasks));
+ }
+
+ std::shared_ptr<ConcurrentTaskLimiter> unique_limiter(
+ NewConcurrentTaskLimiter("unique_limiter", -1));
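+  // A negative limit means no cap on outstanding tasks (bypass mode); CF "1"
+  // below relies on this via SetMaxOutstandingTask(-1).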
+
+  const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5", "6",
+                            "7", "8", "9", "a", "b", "c", "d", "e", "f"};
+ const int cf_count = sizeof cf_names / sizeof cf_names[0];
+
+ std::unordered_map<std::string, CompactionLimiter*> cf_to_limiter;
+
+ Options options = CurrentOptions();
+ options.write_buffer_size = 110 * 1024; // 110KB
+ options.arena_block_size = 4096;
+ options.num_levels = 3;
+ options.level0_file_num_compaction_trigger = 4;
+ options.level0_slowdown_writes_trigger = 64;
+ options.level0_stop_writes_trigger = 64;
+ options.max_background_jobs = kMaxBackgroundThreads; // Enough threads
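+  // SpecialSkipListFactory makes each memtable report itself full after
+  // kNumKeysPerFile entries, so a single extra Put() is enough to trigger a
+  // flush (see the "put extra key to trigger flush" writes below).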
+ options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+ options.max_write_buffer_number = 10; // Enough memtables
+ DestroyAndReopen(options);
+
+ std::vector<Options> option_vector;
+ option_vector.reserve(cf_count);
+
+ for (int cf = 0; cf < cf_count; cf++) {
+ ColumnFamilyOptions cf_opt(options);
+ if (cf == 0) {
+ // "Default" CF does't use compaction limiter
+ cf_opt.compaction_thread_limiter = nullptr;
+ } else if (cf == 1) {
+ // "1" CF uses bypass compaction limiter
+ unique_limiter->SetMaxOutstandingTask(-1);
+ cf_opt.compaction_thread_limiter = unique_limiter;
+ } else {
+ // Assign limiter by mod
+ auto& ls = limiter_settings[cf % 3];
+ cf_opt.compaction_thread_limiter = ls.limiter;
+ cf_to_limiter[cf_names[cf]] = &ls;
+ }
+ option_vector.emplace_back(DBOptions(options), cf_opt);
+ }
+
+ for (int cf = 1; cf < cf_count; cf++) {
+ CreateColumnFamilies({cf_names[cf]}, option_vector[cf]);
+ }
+
+  ReopenWithColumnFamilies(
+      std::vector<std::string>(cf_names, cf_names + cf_count), option_vector);
+
+ port::Mutex mutex;
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BackgroundCompaction:BeforeCompaction", [&](void* arg) {
+ const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
+ auto iter = cf_to_limiter.find(cf_name);
+ if (iter != cf_to_limiter.end()) {
+ MutexLock l(&mutex);
+ ASSERT_GE(iter->second->limit_tasks, ++iter->second->tasks);
+          // Record the peak number of concurrent tasks actually observed.
+          iter->second->max_tasks =
+              std::max(iter->second->max_tasks, iter->second->tasks);
+ }
+ });
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BackgroundCompaction:AfterCompaction", [&](void* arg) {
+ const auto& cf_name = static_cast<ColumnFamilyData*>(arg)->GetName();
+ auto iter = cf_to_limiter.find(cf_name);
+ if (iter != cf_to_limiter.end()) {
+ MutexLock l(&mutex);
+ ASSERT_GE(--iter->second->tasks, 0);
+ }
+ });
+
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
+  // Reserve a quarter of the background threads for flush (HIGH priority)
+  // and the rest for compaction (LOW priority).
+ const size_t kTotalFlushTasks = kMaxBackgroundThreads / 4;
+ const size_t kTotalCompactTasks = kMaxBackgroundThreads - kTotalFlushTasks;
+  env_->SetBackgroundThreads(static_cast<int>(kTotalFlushTasks), Env::HIGH);
+  env_->SetBackgroundThreads(static_cast<int>(kTotalCompactTasks), Env::LOW);
+
+ test::SleepingBackgroundTask sleeping_compact_tasks[kTotalCompactTasks];
+
+ // Block all compaction threads in thread pool.
+ for (size_t i = 0; i < kTotalCompactTasks; i++) {
+ env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
+ &sleeping_compact_tasks[i], Env::LOW);
+ sleeping_compact_tasks[i].WaitUntilSleeping();
+ }
+
+ int keyIndex = 0;
+
+ for (int n = 0; n < options.level0_file_num_compaction_trigger; n++) {
+ for (int cf = 0; cf < cf_count; cf++) {
+ for (int i = 0; i < kNumKeysPerFile; i++) {
+ ASSERT_OK(Put(cf, Key(keyIndex++), ""));
+ }
+ // put extra key to trigger flush
+ ASSERT_OK(Put(cf, "", ""));
+ }
+
+ for (int cf = 0; cf < cf_count; cf++) {
+ dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+ }
+ }
+
+ // Enough L0 files to trigger compaction
+ for (int cf = 0; cf < cf_count; cf++) {
+ ASSERT_EQ(NumTableFilesAtLevel(0, cf),
+ options.level0_file_num_compaction_trigger);
+ }
+
+  // Create more files for one column family, which triggers the speed-up
+  // condition; all pending compactions will then be scheduled.
+ for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
+ for (int i = 0; i < kNumKeysPerFile; i++) {
+ ASSERT_OK(Put(0, Key(i), ""));
+ }
+ // put extra key to trigger flush
+ ASSERT_OK(Put(0, "", ""));
+ dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
+ ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
+ NumTableFilesAtLevel(0, 0));
+ }
+
+ // All CFs are pending compaction
+ ASSERT_EQ(cf_count, env_->GetThreadPoolQueueLen(Env::LOW));
+
+ // Unblock all compaction threads
+ for (size_t i = 0; i < kTotalCompactTasks; i++) {
+ sleeping_compact_tasks[i].WakeUp();
+ sleeping_compact_tasks[i].WaitUntilDone();
+ }
+
+ for (int cf = 0; cf < cf_count; cf++) {
+ dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+ }
+
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+  // Peak outstanding compaction tasks should have reached each limiter's cap
+ for (auto& ls : limiter_settings) {
+ ASSERT_EQ(ls.limit_tasks, ls.max_tasks);
+ ASSERT_EQ(0, ls.limiter->GetOutstandingTask());
+ }
+
+ // test manual compaction under a fully throttled limiter
+ int cf_test = 1;
+ unique_limiter->SetMaxOutstandingTask(0);
+
+ // flush one more file to cf 1
+ for (int i = 0; i < kNumKeysPerFile; i++) {
+ ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
+ }
+ // put extra key to trigger flush
+ ASSERT_OK(Put(cf_test, "", ""));
+
+ dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]);
+ ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test));
+
+ Compact(cf_test, Key(0), Key(keyIndex));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+}
+
INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
::testing::Values(std::make_tuple(1, true),
std::make_tuple(1, false),
public:
NoopMergeOperator() {}
- virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
- MergeOperationOutput* merge_out) const override {
+ bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
+ MergeOperationOutput* merge_out) const override {
std::string val("bar");
merge_out->new_value = val;
return true;
}
- virtual const char* Name() const override { return "Noop"; }
+ const char* Name() const override { return "Noop"; }
};
TEST_F(DBCompactionTest, PartialManualCompaction) {
dbfull()->CompactRange(cro, nullptr, nullptr);
}
+TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
+ // Regression test for bug where manual compaction hangs forever when the DB
+ // is in read-only mode. Verify it now at least returns, despite failing.
+ const int kNumL0Files = 4;
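+  // FaultInjectionTestEnv lets the test deactivate the filesystem on demand,
+  // forcing a write failure that pushes the DB into read-only mode.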
+ std::unique_ptr<FaultInjectionTestEnv> mock_env(
+ new FaultInjectionTestEnv(Env::Default()));
+ Options opts = CurrentOptions();
+ opts.disable_auto_compactions = true;
+ opts.env = mock_env.get();
+ DestroyAndReopen(opts);
+
+ Random rnd(301);
+ for (int i = 0; i < kNumL0Files; ++i) {
+ // Make sure files are overlapping in key-range to prevent trivial move.
+ Put("key1", RandomString(&rnd, 1024));
+ Put("key2", RandomString(&rnd, 1024));
+ Flush();
+ }
+ ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));
+
+ // Enter read-only mode by failing a write.
+ mock_env->SetFilesystemActive(false);
+  // Make sure this key is outside `CompactRange`'s range so that compaction
+  // doesn't fail early while trying to flush the memtable.
+ ASSERT_NOK(Put("key3", RandomString(&rnd, 1024)));
+
+ // In the bug scenario, the first manual compaction would fail and forget to
+ // unregister itself, causing the second one to hang forever due to conflict
+ // with a non-running compaction.
+ CompactRangeOptions cro;
+ cro.exclusive_manual_compaction = false;
+ Slice begin_key("key1");
+ Slice end_key("key2");
+ ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+ ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+
+ // Close before mock_env destruct.
+ Close();
+}
+
+// FixFileIngestionCompactionDeadlock tests and verifies that compaction and
+// file ingestion do not cause a deadlock in the event of a write stall
+// triggered by the number of L0 files reaching level0_stop_writes_trigger.
+TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
+ const int kNumKeysPerFile = 100;
+  // Set up options, used both by the external SST writer and the DB.
+ Options options = CurrentOptions();
+
+  // Generate an external SST file containing a single key, Key(99).
+ std::string sst_files_dir = dbname_ + "/sst_files/";
+ test::DestroyDir(env_, sst_files_dir);
+ ASSERT_OK(env_->CreateDir(sst_files_dir));
+ SstFileWriter sst_writer(EnvOptions(), options);
+ const std::string sst_file_path = sst_files_dir + "test.sst";
+ ASSERT_OK(sst_writer.Open(sst_file_path));
+ ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
+ ASSERT_OK(sst_writer.Finish());
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ SyncPoint::GetInstance()->LoadDependency({
+ {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
+ "BackgroundCallCompaction:0"},
+ });
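+  // This dependency delays the first background compaction until ingestion
+  // has registered itself, recreating the ordering that led to the deadlock.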
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ options.write_buffer_size = 110 << 10; // 110KB
+ options.level0_file_num_compaction_trigger =
+ options.level0_stop_writes_trigger;
+ options.max_subcompactions = max_subcompactions_;
+ options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+ DestroyAndReopen(options);
+ Random rnd(301);
+
+ // Generate level0_stop_writes_trigger L0 files to trigger write stop
+ for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
+ for (int j = 0; j != kNumKeysPerFile; ++j) {
+ ASSERT_OK(Put(Key(j), RandomString(&rnd, 990)));
+ }
+ if (0 == i) {
+      // When we reach here, the memtable has kNumKeysPerFile keys. Note that
+      // flush is not yet triggered. We need to write an extra key so that the
+      // write path will call PreprocessWrite and cause the previous key-value
+      // pairs to be flushed. After that, the newest key is in the memtable
+      // and a bunch of L0 files exist. Since there is already one key in the
+      // memtable, for i = 1, 2, ..., we do not have to write this extra key
+      // to trigger flush.
+ ASSERT_OK(Put("", ""));
+ }
+ dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
+ }
+ // When we reach this point, there will be level0_stop_writes_trigger L0
+ // files and one extra key (99) in memory, which overlaps with the external
+ // SST file. Write stall triggers, and can be cleared only after compaction
+ // reduces the number of L0 files.
+
+  // Compaction will also be triggered since we have reached the threshold for
+  // auto compaction. Due to the sync-point dependency above, compaction can
+  // begin only after the file ingestion thread below has started, and it then
+  // waits for ingestion to finish.
+
+ // Thread to ingest file with overlapping key range with the current
+ // memtable. Consequently ingestion will trigger a flush. The flush MUST
+ // proceed without waiting for the write stall condition to clear, otherwise
+ // deadlock can happen.
+ port::Thread ingestion_thr([&]() {
+ IngestExternalFileOptions ifo;
+ Status s = db_->IngestExternalFile({sst_file_path}, ifo);
+ ASSERT_OK(s);
+ });
+
+  // Wait for ingestion, and for the flush it triggers, to finish.
+ ingestion_thr.join();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ Close();
+}
+
#endif // !defined(ROCKSDB_LITE)
} // namespace rocksdb