// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include <tuple>
+
+#include "db/blob/blob_index.h"
#include "db/db_test_util.h"
+#include "env/mock_env.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "rocksdb/concurrent_task_limiter.h"
#include "rocksdb/sst_file_writer.h"
#include "rocksdb/utilities/convenience.h"
#include "test_util/sync_point.h"
+#include "test_util/testutil.h"
#include "util/concurrent_task_limiter_impl.h"
#include "util/random.h"
#include "utilities/fault_injection_env.h"
+#include "utilities/fault_injection_fs.h"
namespace ROCKSDB_NAMESPACE {
// SYNC_POINT is not supported in released Windows mode.
#if !defined(ROCKSDB_LITE)
+// Test-only EventListener that counts completed compactions bucketed by
+// CompactionReason.  Flush and external-file-ingestion events are folded
+// into the kFlush / kExternalSstIngestion buckets so that the totals can
+// be compared against InternalStats' per-reason bookkeeping.  Counters
+// are std::atomic because listener callbacks fire on background threads.
+class CompactionStatsCollector : public EventListener {
+ public:
+  // Allocates one zero-initialized counter per CompactionReason.
+  CompactionStatsCollector()
+      : compaction_completed_(
+            static_cast<int>(CompactionReason::kNumOfReasons)) {
+    for (auto& v : compaction_completed_) {
+      v.store(0);
+    }
+  }
+
+  ~CompactionStatsCollector() override {}
+
+  // Bumps the counter for the reason reported by the completed job.
+  void OnCompactionCompleted(DB* /* db */,
+                             const CompactionJobInfo& info) override {
+    int k = static_cast<int>(info.compaction_reason);
+    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
+    assert(k >= 0 && k < num_of_reasons);
+    compaction_completed_[k]++;
+  }
+
+  // External file ingestions are counted under kExternalSstIngestion.
+  void OnExternalFileIngested(
+      DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
+    int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
+    compaction_completed_[k]++;
+  }
+
+  // Flushes are counted under kFlush.
+  void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
+    int k = static_cast<int>(CompactionReason::kFlush);
+    compaction_completed_[k]++;
+  }
+
+  // Returns how many completed events have been attributed to `reason`.
+  int NumberOfCompactions(CompactionReason reason) const {
+    int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
+    int k = static_cast<int>(reason);
+    assert(k >= 0 && k < num_of_reasons);
+    return compaction_completed_.at(k).load();
+  }
+
+ private:
+  // Index = static_cast<int>(CompactionReason); value = completion count.
+  std::vector<std::atomic<int>> compaction_completed_;
+};
+
+// Base fixture for the compaction tests in this file; passes
+// env_do_fsync=true through to DBTestBase.
class DBCompactionTest : public DBTestBase {
 public:
  DBCompactionTest()
-      : DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {}
+      : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {}
+
+ protected:
+  /*
+   * Verifies compaction stats of cfd are valid.
+   *
+   * For each level of cfd, its compaction stats are valid if
+   * 1) sum(stat.counts) == stat.count, and
+   * 2) stat.counts[i] == collector.NumberOfCompactions(i)
+   */
+  void VerifyCompactionStats(ColumnFamilyData& cfd,
+                             const CompactionStatsCollector& collector) {
+    // Entire body is guarded by NDEBUG: TEST_GetCompactionStats() is used
+    // below, so in release builds this verification is a no-op.
+#ifndef NDEBUG
+    InternalStats* internal_stats_ptr = cfd.internal_stats();
+    ASSERT_NE(internal_stats_ptr, nullptr);
+    const std::vector<InternalStats::CompactionStats>& comp_stats =
+        internal_stats_ptr->TEST_GetCompactionStats();
+    const int num_of_reasons =
+        static_cast<int>(CompactionReason::kNumOfReasons);
+    std::vector<int> counts(num_of_reasons, 0);
+    // Count the number of compactions caused by each CompactionReason across
+    // all levels.
+    for (const auto& stat : comp_stats) {
+      int sum = 0;
+      for (int i = 0; i < num_of_reasons; i++) {
+        counts[i] += stat.counts[i];
+        sum += stat.counts[i];
+      }
+      ASSERT_EQ(sum, stat.count);
+    }
+    // Verify InternalStats bookkeeping matches that of
+    // CompactionStatsCollector, assuming that all compactions complete.
+    for (int i = 0; i < num_of_reasons; i++) {
+      ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)),
+                counts[i]);
+    }
+#endif /* NDEBUG */
+  }
};
class DBCompactionTestWithParam
public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
public:
DBCompactionTestWithParam()
- : DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {
+ : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {
max_subcompactions_ = std::get<0>(GetParam());
exclusive_manual_compaction_ = std::get<1>(GetParam());
}
public testing::WithParamInterface<BottommostLevelCompaction> {
public:
DBCompactionTestWithBottommostParam()
- : DBTestBase("/db_compaction_test", /*env_do_fsync=*/true) {
+ : DBTestBase("db_compaction_test", /*env_do_fsync=*/true) {
bottommost_level_compaction_ = GetParam();
}
ChangeLevelConflictsWithAuto() : DBCompactionTest() {}
};
-namespace {
+// Param = true: grab the compaction pressure token (enables
+// parallel compactions)
+// Param = false: do not grab the token (no parallel compactions)
+class RoundRobinSubcompactionsAgainstPressureToken
+    : public DBCompactionTest,
+      public ::testing::WithParamInterface<bool> {
+ public:
+  RoundRobinSubcompactionsAgainstPressureToken() {
+    grab_pressure_token_ = GetParam();
+  }
+  // Cached GetParam(): whether the test body should grab the compaction
+  // pressure token.
+  bool grab_pressure_token_;
+};
+
+// Fixture parameterized on <total_low_pri_threads, max_compaction_limits>
+// to exercise round-robin subcompactions under varying resource limits.
+class RoundRobinSubcompactionsAgainstResources
+    : public DBCompactionTest,
+      public ::testing::WithParamInterface<std::tuple<int, int>> {
+ public:
+  RoundRobinSubcompactionsAgainstResources() {
+    total_low_pri_threads_ = std::get<0>(GetParam());
+    max_compaction_limits_ = std::get<1>(GetParam());
+  }
+  // First tuple element: number of low-priority background threads.
+  int total_low_pri_threads_;
+  // Second tuple element: the max-compactions limit.
+  int max_compaction_limits_;
+};
+namespace {
class FlushedFileCollector : public EventListener {
public:
FlushedFileCollector() {}
std::mutex mutex_;
};
-class CompactionStatsCollector : public EventListener {
-public:
- CompactionStatsCollector()
- : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
- for (auto& v : compaction_completed_) {
- v.store(0);
- }
- }
-
- ~CompactionStatsCollector() override {}
-
- void OnCompactionCompleted(DB* /* db */,
- const CompactionJobInfo& info) override {
- int k = static_cast<int>(info.compaction_reason);
- int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
- assert(k >= 0 && k < num_of_reasons);
- compaction_completed_[k]++;
- }
-
- void OnExternalFileIngested(
- DB* /* db */, const ExternalFileIngestionInfo& /* info */) override {
- int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
- compaction_completed_[k]++;
- }
-
- void OnFlushCompleted(DB* /* db */, const FlushJobInfo& /* info */) override {
- int k = static_cast<int>(CompactionReason::kFlush);
- compaction_completed_[k]++;
- }
-
- int NumberOfCompactions(CompactionReason reason) const {
- int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
- int k = static_cast<int>(reason);
- assert(k >= 0 && k < num_of_reasons);
- return compaction_completed_.at(k).load();
- }
-
-private:
- std::vector<std::atomic<int>> compaction_completed_;
-};
-
class SstStatsCollector : public EventListener {
public:
SstStatsCollector() : num_ssts_creation_started_(0) {}
options.target_file_size_base * options.target_file_size_multiplier;
options.max_bytes_for_level_multiplier = 2;
options.disable_auto_compactions = false;
+ options.compaction_options_universal.max_size_amplification_percent = 100;
return options;
}
-bool HaveOverlappingKeyRanges(
- const Comparator* c,
- const SstFileMetaData& a, const SstFileMetaData& b) {
+bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
+ const SstFileMetaData& b) {
if (c->CompareWithoutTimestamp(a.smallestkey, b.smallestkey) >= 0) {
if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) {
// b.smallestkey <= a.smallestkey <= b.largestkey
// Identifies all files between level "min_level" and "max_level"
// which has overlapping key range with "input_file_meta".
void GetOverlappingFileNumbersForLevelCompaction(
- const ColumnFamilyMetaData& cf_meta,
- const Comparator* comparator,
- int min_level, int max_level,
- const SstFileMetaData* input_file_meta,
+ const ColumnFamilyMetaData& cf_meta, const Comparator* comparator,
+ int min_level, int max_level, const SstFileMetaData* input_file_meta,
std::set<std::string>* overlapping_file_names) {
std::set<const SstFileMetaData*> overlapping_files;
overlapping_files.insert(input_file_meta);
for (int m = min_level; m <= max_level; ++m) {
for (auto& file : cf_meta.levels[m].files) {
for (auto* included_file : overlapping_files) {
- if (HaveOverlappingKeyRanges(
- comparator, *included_file, file)) {
+ if (HaveOverlappingKeyRanges(comparator, *included_file, file)) {
overlapping_files.insert(&file);
overlapping_file_names->insert(file.name);
break;
#endif
}
-/*
- * Verifies compaction stats of cfd are valid.
- *
- * For each level of cfd, its compaction stats are valid if
- * 1) sum(stat.counts) == stat.count, and
- * 2) stat.counts[i] == collector.NumberOfCompactions(i)
- */
-void VerifyCompactionStats(ColumnFamilyData& cfd,
- const CompactionStatsCollector& collector) {
-#ifndef NDEBUG
- InternalStats* internal_stats_ptr = cfd.internal_stats();
- ASSERT_TRUE(internal_stats_ptr != nullptr);
- const std::vector<InternalStats::CompactionStats>& comp_stats =
- internal_stats_ptr->TEST_GetCompactionStats();
- const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
- std::vector<int> counts(num_of_reasons, 0);
- // Count the number of compactions caused by each CompactionReason across
- // all levels.
- for (const auto& stat : comp_stats) {
- int sum = 0;
- for (int i = 0; i < num_of_reasons; i++) {
- counts[i] += stat.counts[i];
- sum += stat.counts[i];
- }
- ASSERT_EQ(sum, stat.count);
- }
- // Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
- // assuming that all compactions complete.
- for (int i = 0; i < num_of_reasons; i++) {
- ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
- }
-#endif /* NDEBUG */
-}
-
-const SstFileMetaData* PickFileRandomly(
- const ColumnFamilyMetaData& cf_meta,
- Random* rand,
- int* level = nullptr) {
- auto file_id = rand->Uniform(static_cast<int>(
- cf_meta.file_count)) + 1;
+const SstFileMetaData* PickFileRandomly(const ColumnFamilyMetaData& cf_meta,
+ Random* rand, int* level = nullptr) {
+ auto file_id = rand->Uniform(static_cast<int>(cf_meta.file_count)) + 1;
for (auto& level_meta : cf_meta.levels) {
if (file_id <= level_meta.files.size()) {
if (level != nullptr) {
}
} // anonymous namespace
-#ifndef ROCKSDB_VALGRIND_RUN
+#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
// All the TEST_P tests run once with sub_compactions disabled (i.e.
// options.max_subcompactions = 1) and once with it enabled
TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
values.push_back(rnd.RandomString(kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
- db_size[0] = Size(Key(0), Key(kTestSize - 1));
-
- for (int k = 0; k < kTestSize; ++k) {
- ASSERT_OK(Delete(Key(k)));
- }
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
- db_size[1] = Size(Key(0), Key(kTestSize - 1));
-
- // must have much smaller db size.
- ASSERT_GT(db_size[0] / 3, db_size[1]);
- }
-}
-#endif // ROCKSDB_VALGRIND_RUN
-
-TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
- // For each options type we test following
- // - Enable preserve_deletes
- // - write bunch of keys and deletes
- // - Set start_seqnum to the beginning; compact; check that keys are present
- // - rewind start_seqnum way forward; compact; check that keys are gone
-
- for (int tid = 0; tid < 3; ++tid) {
- Options options = DeletionTriggerOptions(CurrentOptions());
- options.max_subcompactions = max_subcompactions_;
- options.preserve_deletes=true;
- options.num_levels = 2;
-
- if (tid == 1) {
- options.skip_stats_update_on_db_open = true;
- } else if (tid == 2) {
- // third pass with universal compaction
- options.compaction_style = kCompactionStyleUniversal;
- }
-
- DestroyAndReopen(options);
- Random rnd(301);
- // highlight the default; all deletes should be preserved
- SetPreserveDeletesSequenceNumber(0);
-
- const int kTestSize = kCDTKeysPerBuffer;
- std::vector<std::string> values;
- for (int k = 0; k < kTestSize; ++k) {
- values.push_back(rnd.RandomString(kCDTValueSize));
- ASSERT_OK(Put(Key(k), values[k]));
- }
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[0]));
for (int k = 0; k < kTestSize; ++k) {
ASSERT_OK(Delete(Key(k)));
}
- // to ensure we tackle all tombstones
- CompactRangeOptions cro;
- cro.change_level = true;
- cro.target_level = 2;
- cro.bottommost_level_compaction =
- BottommostLevelCompaction::kForceOptimized;
-
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->CompactRange(cro, nullptr, nullptr);
-
- // check that normal user iterator doesn't see anything
- Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
- int i = 0;
- for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
- i++;
- }
- ASSERT_EQ(i, 0);
- delete db_iter;
-
- // check that iterator that sees internal keys sees tombstones
- ReadOptions ro;
- ro.iter_start_seqnum=1;
- db_iter = dbfull()->NewIterator(ro);
- i = 0;
- for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
- i++;
- }
- ASSERT_EQ(i, 4);
- delete db_iter;
-
- // now all deletes should be gone
- SetPreserveDeletesSequenceNumber(100000000);
- dbfull()->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[1]));
- db_iter = dbfull()->NewIterator(ro);
- i = 0;
- for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
- i++;
+ if (options.compaction_style == kCompactionStyleUniversal) {
+ // Claim: in universal compaction none of the original data will remain
+ // once compactions settle.
+ //
+ // Proof: The compensated size of the file containing the most tombstones
+ // is enough on its own to trigger size amp compaction. Size amp
+ // compaction is a full compaction, so all tombstones meet the obsolete
+ // keys they cover.
+ ASSERT_EQ(0, db_size[1]);
+ } else {
+ // Claim: in level compaction at most `db_size[0] / 2` of the original
+ // data will remain once compactions settle.
+ //
+ // Proof: Assume the original data is all in the bottom level. If it were
+ // not, it would meet its tombstone sooner. The original data size is
+ // large enough to require fanout to bottom level to be greater than
+ // `max_bytes_for_level_multiplier == 2`. In the level just above,
+ // tombstones must cover less than `db_size[0] / 4` bytes since fanout >=
+ // 2 and file size is compensated by doubling the size of values we expect
+ // are covered (`kDeletionWeightOnCompaction == 2`). The tombstones in
+ // levels above must cover less than `db_size[0] / 8` bytes of original
+ // data, `db_size[0] / 16`, and so on.
+ ASSERT_GT(db_size[0] / 2, db_size[1]);
}
- ASSERT_EQ(i, 0);
- delete db_iter;
}
}
+#endif // !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
// This test verify UpdateAccumulatedStats is not on
TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
Options options = CurrentOptions();
options.env = env_;
- options.new_table_reader_for_compaction_inputs = true;
options.max_open_files = 20;
options.level0_file_num_compaction_trigger = 3;
+ // Avoid many shards with small max_open_files, where as little as
+ // two table insertions could lead to an LRU eviction, depending on
+ // hash values.
+ options.table_cache_numshardbits = 2;
DestroyAndReopen(options);
Random rnd(301);
ASSERT_OK(Put(Key(10 - k), "bar"));
if (k < options.level0_file_num_compaction_trigger - 1) {
num_table_cache_lookup = 0;
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// preloading iterator issues one table cache lookup and create
// a new table reader, if not preloaded.
int old_num_table_cache_lookup = num_table_cache_lookup;
num_table_cache_lookup = 0;
num_new_table_reader = 0;
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Preloading iterator issues one table cache lookup and creates
// a new table reader. One file is created for flush and one for compaction.
// Compaction inputs make no table cache look-up for data/range deletion
cro.change_level = true;
cro.target_level = 2;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
- db_->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// Only verifying compaction outputs issues one table cache lookup
// for both data block and range deletion block).
// May preload table cache too.
values.push_back(rnd.RandomString(kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
- db_size[0] = Size(Key(0), Key(kTestSize - 1));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[0]));
Close();
// round 2 --- disable auto-compactions and issue deletions.
for (int k = 0; k < kTestSize; ++k) {
ASSERT_OK(Delete(Key(k)));
}
- db_size[1] = Size(Key(0), Key(kTestSize - 1));
+ ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[1]));
Close();
- // as auto_compaction is off, we shouldn't see too much reduce
- // in db size.
- ASSERT_LT(db_size[0] / 3, db_size[1]);
+ // as auto_compaction is off, we shouldn't see any reduction in db size.
+ ASSERT_LE(db_size[0], db_size[1]);
// round 3 --- reopen db with auto_compaction on and see if
// deletion compensation still work.
for (int k = 0; k < kTestSize / 10; ++k) {
ASSERT_OK(Put(Key(k), values[k]));
}
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
- db_size[2] = Size(Key(0), Key(kTestSize - 1));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[2]));
// this time we're expecting significant drop in size.
- ASSERT_GT(db_size[0] / 3, db_size[2]);
+ //
+ // See "CompactionDeletionTrigger" test for proof that at most
+ // `db_size[0] / 2` of the original data remains. In addition to that, this
+ // test inserts `db_size[0] / 10` to push the tombstones into SST files and
+ // then through automatic compactions. So in total `3 * db_size[0] / 5` of
+ // the original data may remain.
+ ASSERT_GT(3 * db_size[0] / 5, db_size[2]);
}
}
CompactRangeOptions cro;
cro.change_level = true;
cro.target_level = 2;
- dbfull()->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
}
ASSERT_EQ("0,0,3", FilesPerLevel(0));
});
SyncPoint::GetInstance()->EnableProcessing();
env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(1, low_pri_count);
ASSERT_EQ(1, bottom_pri_count);
ASSERT_EQ("0,0,2", FilesPerLevel(0));
// Recompact bottom most level uses bottom pool
CompactRangeOptions cro;
cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
- dbfull()->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ(1, low_pri_count);
ASSERT_EQ(2, bottom_pri_count);
env_->SetBackgroundThreads(0, Env::Priority::BOTTOM);
- dbfull()->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
// Low pri pool is used if bottom pool has size 0.
ASSERT_EQ(2, low_pri_count);
ASSERT_EQ(2, bottom_pri_count);
values.push_back(rnd.RandomString(kCDTValueSize));
ASSERT_OK(Put(Key(k), values[k]));
}
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
- db_size[0] = Size(Key(0), Key(kTestSize - 1));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ // L1 and L2 can fit deletions iff size compensation does not take effect,
+ // i.e., when `skip_stats_update_on_db_open == true`. Move any remaining
+ // files at or above L2 down to L3 to ensure obsolete data does not
+ // accidentally meet its tombstone above L3. This makes the final size more
+ // deterministic and easy to see whether size compensation for deletions
+ // took effect.
+ MoveFilesToLevel(3 /* level */);
+ ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[0]));
Close();
// round 2 --- disable auto-compactions and issue deletions.
for (int k = 0; k < kTestSize; ++k) {
ASSERT_OK(Delete(Key(k)));
}
- db_size[1] = Size(Key(0), Key(kTestSize - 1));
+ ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[1]));
Close();
- // as auto_compaction is off, we shouldn't see too much reduce
- // in db size.
- ASSERT_LT(db_size[0] / 3, db_size[1]);
+ // as auto_compaction is off, we shouldn't see any reduction in db size.
+ ASSERT_LE(db_size[0], db_size[1]);
// round 3 --- reopen db with auto_compaction on and see if
// deletion compensation still work.
options.disable_auto_compactions = false;
Reopen(options);
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
- db_size[2] = Size(Key(0), Key(kTestSize - 1));
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_OK(Size(Key(0), Key(kTestSize - 1), &db_size[2]));
if (options.skip_stats_update_on_db_open) {
// If update stats on DB::Open is disable, we don't expect
// deletion entries taking effect.
- ASSERT_LT(db_size[0] / 3, db_size[2]);
+ //
+ // The deletions are small enough to fit in L1 and L2, and obsolete keys
+ // were moved to L3+, so none of the original data should have been
+ // dropped.
+ ASSERT_LE(db_size[0], db_size[2]);
} else {
// Otherwise, we should see a significant drop in db size.
- ASSERT_GT(db_size[0] / 3, db_size[2]);
+ //
+ // See "CompactionDeletionTrigger" test for proof that at most
+ // `db_size[0] / 2` of the original data remains.
+ ASSERT_GT(db_size[0] / 2, db_size[2]);
}
}
}
-
TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
const int kNumKeysPerFile = 100;
options.num_levels = 3;
options.level0_file_num_compaction_trigger = 3;
options.max_subcompactions = max_subcompactions_;
- options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+ options.memtable_factory.reset(
+ test::NewSpecialSkipListFactory(kNumKeysPerFile));
CreateAndReopenWithCF({"pikachu"}, options);
Random rnd(301);
}
// put extra key to trigger flush
ASSERT_OK(Put(1, "", ""));
- dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
}
}
// put extra key to trigger flush
ASSERT_OK(Put(1, "", ""));
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
options.level0_slowdown_writes_trigger = 20;
options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large
options.max_background_compactions = 3;
- options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
+ options.memtable_factory.reset(
+ test::NewSpecialSkipListFactory(kNumKeysPerFile));
// Block all threads in thread pool.
const size_t kTotalTasks = 4;
}
// put extra key to trigger flush
ASSERT_OK(Put(cf, "", ""));
- dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
}
}
}
// put extra key to trigger flush
ASSERT_OK(Put(2, "", ""));
- dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
NumTableFilesAtLevel(0, 2));
}
sleeping_tasks[i].WakeUp();
sleeping_tasks[i].WaitUntilDone();
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Verify number of compactions allowed will come back to 1.
}
// put extra key to trigger flush
ASSERT_OK(Put(cf, "", ""));
- dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
}
}
TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
Options options = CurrentOptions();
- options.write_buffer_size = 100000000; // Large write buffer
+ options.write_buffer_size = 100000000; // Large write buffer
options.max_subcompactions = max_subcompactions_;
CreateAndReopenWithCF({"pikachu"}, options);
// Reopening moves updates to level-0
ReopenWithColumnFamilies({"default", "pikachu"}, options);
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
- true /* disallow trivial move */);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
+ true /* disallow trivial move */));
ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
DestroyAndReopen(options);
// create first file and flush to l0
- Put("4", "A");
- Put("3", "A");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
-
- Put("2", "A");
- Delete("3");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Put("4", "A"));
+ ASSERT_OK(Put("3", "A"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+ ASSERT_OK(Put("2", "A"));
+ ASSERT_OK(Delete("3"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ("NOT_FOUND", Get("3"));
// move both files down to l1
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("NOT_FOUND", Get("3"));
for (int i = 0; i < 3; i++) {
- Put("2", "B");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Put("2", "B"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("NOT_FOUND", Get("3"));
}
DestroyAndReopen(options);
// create first file and flush to l0
- Put("4", "A");
- Put("3", "A");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
-
- Put("2", "A");
- SingleDelete("3");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Put("4", "A"));
+ ASSERT_OK(Put("3", "A"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+ ASSERT_OK(Put("2", "A"));
+ ASSERT_OK(SingleDelete("3"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ("NOT_FOUND", Get("3"));
// move both files down to l1
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("NOT_FOUND", Get("3"));
for (int i = 0; i < 3; i++) {
- Put("2", "B");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Put("2", "B"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("NOT_FOUND", Get("3"));
}
DestroyAndReopen(options);
// create first file and flush to l0
- Put("aaaa1", "A");
- Put("bbbb1", "B");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Put("aaaa1", "A"));
+ ASSERT_OK(Put("bbbb1", "B"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
- Put("aaaa1", "A2");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Put("aaaa1", "A2"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// move both files down to l1
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
std::vector<LiveFileMetaData> files;
dbfull()->GetLiveFilesMetaData(&files);
DestroyAndReopen(options);
// create first file and flush to l0
- Put("aaaa1", "A");
- Put("bbbb1", "B");
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact(true);
+ ASSERT_OK(Put("aaaa1", "A"));
+ ASSERT_OK(Put("bbbb1", "B"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
std::vector<LiveFileMetaData> files;
dbfull()->GetLiveFilesMetaData(&files);
compact_opt.compression = kNoCompression;
compact_opt.output_file_size_limit = 4096;
const size_t key_len =
- static_cast<size_t>(compact_opt.output_file_size_limit) / 5;
+ static_cast<size_t>(compact_opt.output_file_size_limit) / 5;
DestroyAndReopen(options);
// create first file and flush to l0
for (auto& key : {"1", "2", "3", "3", "3", "3"}) {
- Put(key, std::string(key_len, 'A'));
+ ASSERT_OK(Put(key, std::string(key_len, 'A')));
snaps.push_back(dbfull()->GetSnapshot());
}
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// create second file and flush to l0
for (auto& key : {"3", "4", "5", "6", "7", "8"}) {
- Put(key, std::string(key_len, 'A'));
+ ASSERT_OK(Put(key, std::string(key_len, 'A')));
snaps.push_back(dbfull()->GetSnapshot());
}
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
// move both files down to l1
- dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1);
+ ASSERT_OK(
+ dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1));
// release snap so that first instance of key(3) can have seqId=0
for (auto snap : snaps) {
// create 3 files in l0 so to trigger compaction
for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
- Put("2", std::string(1, 'A'));
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Put("2", std::string(1, 'A')));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(Put("", ""));
}
// create two files in l1 that we can compact
for (int i = 0; i < 2; ++i) {
for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) {
- // make l0 files' ranges overlap to avoid trivial move
- Put(std::to_string(2 * i), std::string(1, 'A'));
- Put(std::to_string(2 * i + 1), std::string(1, 'A'));
- Flush();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(Put(std::to_string(2 * i), std::string(1, 'A')));
+ ASSERT_OK(Put(std::to_string(2 * i + 1), std::string(1, 'A')));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
- dbfull()->TEST_WaitForCompact();
- ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
- ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
+ ASSERT_OK(
+ dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"}}));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
+ ASSERT_EQ(NumTableFilesAtLevel(1, 0), 2);
+ ASSERT_OK(
+ dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "3"}}));
ColumnFamilyMetaData cf_meta;
dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
// note CompactionOptions::output_file_size_limit is unset.
CompactionOptions compact_opt;
compact_opt.compression = kNoCompression;
- dbfull()->CompactFiles(compact_opt, input_filenames, 1);
+ ASSERT_OK(dbfull()->CompactFiles(compact_opt, input_filenames, 1));
}
// Check that writes done during a memtable compaction are recovered
cro.exclusive_manual_compaction = exclusive_manual_compaction_;
// Compaction will initiate a trivial move from L0 to L1
- dbfull()->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
// File moved From L0 to L1
ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0
DestroyAndReopen(options);
// non overlapping ranges
std::vector<std::pair<int32_t, int32_t>> ranges = {
- {100, 199},
- {300, 399},
- {0, 99},
- {200, 299},
- {600, 699},
- {400, 499},
- {500, 550},
- {551, 599},
+ {100, 199}, {300, 399}, {0, 99}, {200, 299},
+ {600, 699}, {400, 499}, {500, 550}, {551, 599},
};
int32_t value_size = 10 * 1024; // 10 KB
// Since data is non-overlapping we expect compaction to initiate
// a trivial move
- db_->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
// We expect that all the files were trivially moved from L0 to L1
ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
DestroyAndReopen(options);
// Same ranges as above but overlapping
ranges = {
- {100, 199},
- {300, 399},
- {0, 99},
- {200, 299},
- {600, 699},
- {400, 499},
- {500, 560}, // this range overlap with the next one
- {551, 599},
+ {100, 199},
+ {300, 399},
+ {0, 99},
+ {200, 299},
+ {600, 699},
+ {400, 499},
+ {500, 560}, // this range overlap with the next
+ // one
+ {551, 599},
};
for (size_t i = 0; i < ranges.size(); i++) {
for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
ASSERT_OK(Flush());
}
- db_->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
for (size_t i = 0; i < ranges.size(); i++) {
for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
}
}
TEST_P(DBCompactionTestWithParam, PartialOverlappingL0) {
  // Counts finished subcompactions so the test can verify that the automatic
  // compaction below was actually split into multiple subcompactions.
  class SubCompactionEventListener : public EventListener {
   public:
    void OnSubcompactionCompleted(const SubcompactionJobInfo&) override {
      sub_compaction_finished_++;
    }
    std::atomic<int> sub_compaction_finished_{0};
  };

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_subcompactions = max_subcompactions_;
  // Ownership of the listener is shared with the DB via options.listeners.
  SubCompactionEventListener* listener = new SubCompactionEventListener();
  options.listeners.emplace_back(listener);

  DestroyAndReopen(options);

  // For subcompaction to trigger, output level needs to be non-empty.
  ASSERT_OK(Put("key", ""));
  ASSERT_OK(Put("kez", ""));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("key", ""));
  ASSERT_OK(Put("kez", ""));
  ASSERT_OK(Flush());
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));

  // Ranges that are only briefly overlapping so that they won't be trivially
  // moved but subcompaction ranges would only contain a subset of files.
  std::vector<std::pair<int32_t, int32_t>> ranges = {
      {100, 199}, {198, 399}, {397, 600}, {598, 800}, {799, 900}, {895, 999},
  };
  int32_t value_size = 10 * 1024;  // 10 KB

  Random rnd(301);
  std::map<int32_t, std::string> values;
  // One L0 file per range; remember every value for verification below.
  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      values[j] = rnd.RandomString(value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
    ASSERT_OK(Flush());
  }

  int32_t level0_files = NumTableFilesAtLevel(0, 0);
  ASSERT_EQ(level0_files, ranges.size());    // Multiple files in L0
  ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1);  // One file in L1

  listener->sub_compaction_finished_ = 0;
  ASSERT_OK(db_->EnableAutoCompaction({db_->DefaultColumnFamily()}));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  if (max_subcompactions_ > 3) {
    // RocksDB might not generate the exact number of sub compactions.
    // Here we validate that at least one subcompaction happened.
    ASSERT_GT(listener->sub_compaction_finished_.load(), 2);
  }

  // We expect that all the files were compacted to L1
  ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
  ASSERT_GT(NumTableFilesAtLevel(1, 0), 1);

  // Every written key must survive the compaction unchanged.
  for (size_t i = 0; i < ranges.size(); i++) {
    for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
      ASSERT_EQ(Get(Key(j)), values[j]);
    }
  }
}
+
TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
int32_t trivial_move = 0;
int32_t non_trivial_move = 0;
ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
TEST_SYNC_POINT("DBCompaction::ManualPartial:5");
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
- // After two non-trivial compactions are installed, there is 1 file in L6, and
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ // After two non-trivial compactions are installed, there is 1 file in L6, and
// 1 file in L1
ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
threads.join();
for (int32_t j = 300; j < 4300; j++) {
if (j == 2300) {
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
values[j] = rnd.RandomString(value_size);
ASSERT_OK(Put(Key(j), values[j]));
}
TEST_SYNC_POINT("DBCompaction::PartialFill:2");
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
threads.join();
for (int32_t i = 0; i < 4300; i++) {
Options options = CurrentOptions();
options.unordered_write = true;
DestroyAndReopen(options);
- Put("foo", "v1");
+ ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Flush());
- Put("bar", "v1");
+ ASSERT_OK(Put("bar", "v1"));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- port::Thread writer([&]() { Put("foo", "v2"); });
+ port::Thread writer([&]() { ASSERT_OK(Put("foo", "v2")); });
TEST_SYNC_POINT(
"DBCompactionTest::ManualCompactionWithUnorderedWrite:WaitWriteWAL");
for (int32_t j = 300; j < 4300; j++) {
if (j == 2300) {
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
values[j] = rnd.RandomString(value_size);
ASSERT_OK(Put(Key(j), values[j]));
}
}
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForFlushMemTable();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Verify level sizes
uint64_t target_size = 4 * options.max_bytes_for_level_base;
options.max_bytes_for_level_multiplier);
}
- size_t old_num_files = CountFiles();
+ const size_t old_num_files = CountFiles();
std::string begin_string = Key(1000);
std::string end_string = Key(2000);
Slice begin(begin_string);
compact_options.change_level = true;
compact_options.target_level = 1;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_OK(
DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));
for (int32_t i = 0; i < 4300; i++) {
ReadOptions roptions;
std::string result;
- Status s = db_->Get(roptions, Key(i), &result);
- ASSERT_TRUE(s.IsNotFound());
+ ASSERT_TRUE(db_->Get(roptions, Key(i), &result).IsNotFound());
deleted_count2++;
}
ASSERT_GT(deleted_count2, deleted_count);
- size_t new_num_files = CountFiles();
+ const size_t new_num_files = CountFiles();
ASSERT_GT(old_num_files, new_num_files);
}
ASSERT_EQ("0,0,10", FilesPerLevel(0));
// file [0 => 100), [200 => 300), ... [800, 900)
- for (auto i = 0; i < 10; i+=2) {
+ for (auto i = 0; i < 10; i += 2) {
for (auto j = 0; j < 100; j++) {
auto k = i * 100 + j;
ASSERT_OK(Put(Key(k), values[k]));
std::string vals[kNumL0Files];
for (int i = 0; i < kNumL0Files; ++i) {
vals[i] = rnd.RandomString(kValSize);
- Put(Key(i), vals[i]);
- Put(Key(i + 1), vals[i]);
- Flush();
+ ASSERT_OK(Put(Key(i), vals[i]));
+ ASSERT_OK(Put(Key(i + 1), vals[i]));
+ ASSERT_OK(Flush());
if (i == 0) {
snapshot = db_->GetSnapshot();
}
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Verify `DeleteFilesInRange` can't drop only file 0 which would cause
// "1 -> vals[0]" to reappear.
options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
options.memtable_factory.reset(
- new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+ test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 110 << 10; // 110KB
options.arena_block_size = 4 << 10;
options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024;
options.max_subcompactions = max_subcompactions_;
- // options = CurrentOptions(options);
- std::vector<std::string> filenames;
- env_->GetChildren(options.db_paths[1].path, &filenames);
- // Delete archival files.
- for (size_t i = 0; i < filenames.size(); ++i) {
- env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
- }
- env_->DeleteDir(options.db_paths[1].path);
- Reopen(options);
+ DestroyAndReopen(options);
Random rnd(301);
int key_idx = 0;
options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
options.memtable_factory.reset(
- new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+ test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 110 << 10; // 110KB
options.arena_block_size = 4 << 10;
options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024;
options.max_subcompactions = max_subcompactions_;
- // options = CurrentOptions(options);
- std::vector<std::string> filenames;
- env_->GetChildren(options.db_paths[1].path, &filenames);
- // Delete archival files.
- for (size_t i = 0; i < filenames.size(); ++i) {
- env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
- }
- env_->DeleteDir(options.db_paths[1].path);
- Reopen(options);
+ DestroyAndReopen(options);
Random rnd(301);
int key_idx = 0;
options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
options.memtable_factory.reset(
- new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+ test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 110 << 10; // 110KB
options.arena_block_size = 4 << 10;
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
option_vector.emplace_back(DBOptions(options), cf_opt1);
- CreateColumnFamilies({"one"},option_vector[1]);
+ CreateColumnFamilies({"one"}, option_vector[1]);
- // Configura CF2 specific paths.
+ // Configure CF2 specific paths.
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
option_vector.emplace_back(DBOptions(options), cf_opt2);
- CreateColumnFamilies({"two"},option_vector[2]);
+ CreateColumnFamilies({"two"}, option_vector[2]);
ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
// Check that default column family uses db_paths.
// And Column family "one" uses cf_paths.
- // First three 110KB files are not going to second path.
- // After that, (100K, 200K)
+ // The compaction in level0 outputs the sst files in level1.
+ // The first path cannot hold level1's data(400KB+400KB > 500KB),
+ // so every compaction move a sst file to second path. Please
+ // refer to LevelCompactionBuilder::GetPathId.
for (int num = 0; num < 3; num++) {
generate_file();
}
+ check_sstfilecount(0, 1);
+ check_sstfilecount(1, 2);
- // Another 110KB triggers a compaction to 400K file to fill up first path
generate_file();
check_sstfilecount(1, 3);
check_getvalues();
+ { // Also verify GetLiveFilesStorageInfo with db_paths / cf_paths
+ std::vector<LiveFileStorageInfo> new_infos;
+ LiveFilesStorageInfoOptions lfsio;
+ lfsio.wal_size_for_flush = UINT64_MAX; // no flush
+ ASSERT_OK(db_->GetLiveFilesStorageInfo(lfsio, &new_infos));
+ std::unordered_map<std::string, int> live_sst_by_dir;
+ for (auto& info : new_infos) {
+ if (info.file_type == kTableFile) {
+ live_sst_by_dir[info.directory]++;
+ // Verify file on disk (no directory confusion)
+ uint64_t size;
+ ASSERT_OK(env_->GetFileSize(
+ info.directory + "/" + info.relative_filename, &size));
+ ASSERT_EQ(info.size, size);
+ }
+ }
+ ASSERT_EQ(3U * 3U, live_sst_by_dir.size());
+ for (auto& paths : {options.db_paths, cf_opt1.cf_paths, cf_opt2.cf_paths}) {
+ ASSERT_EQ(1, live_sst_by_dir[paths[0].path]);
+ ASSERT_EQ(4, live_sst_by_dir[paths[1].path]);
+ ASSERT_EQ(2, live_sst_by_dir[paths[2].path]);
+ }
+ }
+
ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
check_getvalues();
ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
}
ASSERT_OK(Flush(1));
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_GT(TotalTableFiles(1, 4), 1);
int non_level0_num_files = 0;
compact_options.bottommost_level_compaction =
BottommostLevelCompaction::kForce;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
- dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
+ ASSERT_OK(
+ dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr));
// Only 1 file in L0
ASSERT_EQ("1", FilesPerLevel(1));
for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
ASSERT_OK(Put(1, Key(i), rnd.RandomString(10000)));
}
- dbfull()->Flush(FlushOptions());
+ ASSERT_OK(dbfull()->Flush(FlushOptions()));
ASSERT_OK(Flush(1));
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
for (int i = 1; i < options.num_levels; i++) {
ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
// compaction style
std::string keys_in_db;
Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
+ ASSERT_OK(iter->status());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
keys_in_db.append(iter->key().ToString());
keys_in_db.push_back(',');
TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
- Put(1, "", "");
+ ASSERT_OK(Put(1, "", ""));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- Delete(1, "e");
- Put(1, "", "");
+ ASSERT_OK(Delete(1, "e"));
+ ASSERT_OK(Put(1, "", ""));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- Put(1, "c", "cv");
+ ASSERT_OK(Put(1, "c", "cv"));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- Put(1, "", "");
+ ASSERT_OK(Put(1, "", ""));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- Put(1, "", "");
+ ASSERT_OK(Put(1, "", ""));
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- Put(1, "d", "dv");
+ ASSERT_OK(Put(1, "d", "dv"));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- Put(1, "", "");
+ ASSERT_OK(Put(1, "", ""));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
- Delete(1, "d");
- Delete(1, "b");
+ ASSERT_OK(Delete(1, "d"));
+ ASSERT_OK(Delete(1, "b"));
ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
ASSERT_EQ("(->)(c->cv)", Contents(1));
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- Put(1, "foo", "");
- Put(1, "bar", "");
- Flush(1);
- Put(1, "foo", "");
- Put(1, "bar", "");
+ ASSERT_OK(Put(1, "foo", ""));
+ ASSERT_OK(Put(1, "bar", ""));
+ ASSERT_OK(Flush(1));
+ ASSERT_OK(Put(1, "foo", ""));
+ ASSERT_OK(Put(1, "bar", ""));
// Generate four files in CF 0, which should trigger an auto compaction
- Put("foo", "");
- Put("bar", "");
- Flush();
- Put("foo", "");
- Put("bar", "");
- Flush();
- Put("foo", "");
- Put("bar", "");
- Flush();
- Put("foo", "");
- Put("bar", "");
- Flush();
+ ASSERT_OK(Put("foo", ""));
+ ASSERT_OK(Put("bar", ""));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Put("foo", ""));
+ ASSERT_OK(Put("bar", ""));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Put("foo", ""));
+ ASSERT_OK(Put("bar", ""));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Put("foo", ""));
+ ASSERT_OK(Put("bar", ""));
+ ASSERT_OK(Flush());
// The auto compaction is scheduled but waited until here
TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1");
// The auto compaction will wait until the manual compaction is registerd
// before processing so that it will be cancelled.
- dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
+ CompactRangeOptions cro;
+ cro.exclusive_manual_compaction = true;
+ ASSERT_OK(dbfull()->CompactRange(cro, handles_[1], nullptr, nullptr));
ASSERT_EQ("0,1", FilesPerLevel(1));
// Eventually the cancelled compaction will be rescheduled and executed.
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,1", FilesPerLevel(0));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
options.statistics->getTickerCount(BLOCK_CACHE_ADD);
CompactRangeOptions cro;
cro.exclusive_manual_compaction = exclusive_manual_compaction_;
- db_->CompactRange(cro, handles_[1], nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(cro, handles_[1], nullptr, nullptr));
// Verify manual compaction doesn't fill block cache
ASSERT_EQ(prev_block_cache_add,
options.statistics->getTickerCount(BLOCK_CACHE_ADD));
}
}
-
TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
CompactRangeOptions compact_options;
compact_options.target_path_id = 1;
compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
- db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
+ ASSERT_OK(
+ db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,1", FilesPerLevel(1));
Random rnd(301);
for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
- ASSERT_OK(Put(1, ToString(key), rnd.RandomString(kTestValueSize)));
+ ASSERT_OK(Put(1, std::to_string(key), rnd.RandomString(kTestValueSize)));
}
- dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[1]));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ColumnFamilyMetaData cf_meta;
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
compaction_input_file_names.push_back(file_meta->name);
GetOverlappingFileNumbersForLevelCompaction(
- cf_meta, options.comparator, level, output_level,
- file_meta, &overlapping_file_names);
+ cf_meta, options.comparator, level, output_level, file_meta,
+ &overlapping_file_names);
}
- ASSERT_OK(dbfull()->CompactFiles(
- CompactionOptions(), handles_[1],
- compaction_input_file_names,
- output_level));
+ ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), handles_[1],
+ compaction_input_file_names,
+ output_level));
// Make sure all overlapping files do not exist after compaction
dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
// make sure all key-values are still there.
for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
- ASSERT_NE(Get(1, ToString(key)), "NOT_FOUND");
+ ASSERT_NE(Get(1, std::to_string(key)), "NOT_FOUND");
}
}
options.write_buffer_size = kKeysPerBuffer * kKvSize;
options.max_write_buffer_number = 2;
options.target_file_size_base =
- options.write_buffer_size *
- (options.max_write_buffer_number - 1);
+ options.write_buffer_size * (options.max_write_buffer_number - 1);
options.level0_file_num_compaction_trigger = kNumL1Files;
options.max_bytes_for_level_base =
options.level0_file_num_compaction_trigger *
DestroyAndReopen(options);
- const int kNumInsertedKeys =
- options.level0_file_num_compaction_trigger *
- (options.max_write_buffer_number - 1) *
- kKeysPerBuffer;
+ const int kNumInsertedKeys = options.level0_file_num_compaction_trigger *
+ (options.max_write_buffer_number - 1) *
+ kKeysPerBuffer;
Random rnd(301);
std::vector<std::string> keys;
keys.emplace_back(rnd.RandomString(kKeySize));
values.emplace_back(rnd.RandomString(kKvSize - kKeySize));
ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
- dbfull()->TEST_FlushMemTable(true);
+ ASSERT_OK(dbfull()->TEST_FlushMemTable(true));
// Make sure the number of L0 files can trigger compaction.
ASSERT_GE(NumTableFilesAtLevel(0),
options.level0_file_num_compaction_trigger);
ASSERT_OK(Flush());
}
// this should execute L0->L1
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,1", FilesPerLevel(0));
// block compactions
sleeping_task.WaitUntilDone();
// this should execute L1->L2 (move)
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,1", FilesPerLevel(0));
ASSERT_OK(Flush());
}
// this should execute both L0->L1 and L1->L2 (merge with previous file)
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("0,0,2", FilesPerLevel(0));
ASSERT_OK(env_->FileExists(dbname_ + moved_file_name));
listener->SetExpectedFileName(dbname_ + moved_file_name);
+ ASSERT_OK(iterator->status());
iterator.reset();
// this file should have been compacted away
}
Options options = CurrentOptions();
options.memtable_factory.reset(
- new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+ test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 110 << 10; // 110KB
options.arena_block_size = 4 << 10;
for (int num = 0; num < 10; num++) {
GenerateNewRandomFile(&rnd);
}
- db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"CompactionJob::Run():Start",
"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1");
GenerateNewRandomFile(&rnd, /* nowait */ true);
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
num++) {
TEST_SYNC_POINT(
"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
static std::string ShortKey(int i) {
}
ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
std::vector<std::vector<FileMetaData>> level_to_files;
ASSERT_OK(Put(Key(i + 1), value));
ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
std::vector<std::vector<FileMetaData>> level_to_files;
int key_idx = 0;
GenerateNewFile(&rnd, &key_idx);
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, num_bottom_pri_compactions);
// So key 0, 2, and 4+ fall outside these levels' key-ranges.
for (int level = 2; level >= 1; --level) {
for (int i = 0; i < 2; ++i) {
- Put(Key(2 * i + 1), "val");
- Flush();
+ ASSERT_OK(Put(Key(2 * i + 1), "val"));
+ ASSERT_OK(Flush());
}
MoveFilesToLevel(level);
ASSERT_EQ(2, NumTableFilesAtLevel(level));
// - Tombstones for keys 2 and 4 can be dropped early.
// - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges.
for (int i = 0; i < kNumL0Files; ++i) {
- Put(Key(0), "val"); // sentinel to prevent trivial move
- Delete(Key(i + 1));
- Flush();
+ ASSERT_OK(Put(Key(0), "val")); // sentinel to prevent trivial move
+ ASSERT_OK(Delete(Key(i + 1)));
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
for (int i = 0; i < kNumL0Files; ++i) {
std::string value;
ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size());
std::vector<std::string> input_filenames;
input_filenames.push_back(cf_meta.levels[0].files.front().name);
- ASSERT_OK(dbfull()
- ->CompactFiles(CompactionOptions(), input_filenames,
- 0 /* output_level */));
+ ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
+ 0 /* output_level */));
TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
// Regression test for bug of not pulling in L0 files that overlap the user-
// specified input files in time- and key-ranges.
- Put(Key(0), "old_val");
- Flush();
- Put(Key(0), "new_val");
- Flush();
+ ASSERT_OK(Put(Key(0), "old_val"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(Put(Key(0), "new_val"));
+ ASSERT_OK(Flush());
ColumnFamilyMetaData cf_meta;
dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
ASSERT_EQ("new_val", Get(Key(0)));
}
TEST_F(DBCompactionTest, DeleteFilesInRangeConflictWithCompaction) {
  Options options = CurrentOptions();
  DestroyAndReopen(options);
  const Snapshot* snapshot = nullptr;
  const int kMaxKey = 10;

  // Write each key then delete it; take a snapshot after the first pair so
  // the deletion markers cannot be dropped while the snapshot is live.
  for (int i = 0; i < kMaxKey; i++) {
    ASSERT_OK(Put(Key(i), Key(i)));
    ASSERT_OK(Delete(Key(i)));
    if (!snapshot) {
      snapshot = db_->GetSnapshot();
    }
  }
  ASSERT_OK(Flush());
  MoveFilesToLevel(1);
  ASSERT_OK(Put(Key(kMaxKey), Key(kMaxKey)));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  // test DeleteFilesInRange() deletes the files already picked for compaction
  // by forcing the compaction's manifest write to overlap with the deletion.
  SyncPoint::GetInstance()->LoadDependency(
      {{"VersionSet::LogAndApply:WriteManifestStart",
        "BackgroundCallCompaction:0"},
       {"DBImpl::BackgroundCompaction:Finish",
        "VersionSet::LogAndApply:WriteManifestDone"}});
  SyncPoint::GetInstance()->EnableProcessing();

  // Releasing the snapshot marks the bottommost file for compaction.
  db_->ReleaseSnapshot(snapshot);
  std::string begin_string = Key(0);
  std::string end_string = Key(kMaxKey + 1);
  Slice begin(begin_string);
  Slice end(end_string);
  ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
  SyncPoint::GetInstance()->DisableProcessing();
}
+
TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
// bottom-level files may contain deletions due to snapshots protecting the
// deleted keys. Once the snapshot is released, we should see files with many
ASSERT_OK(Delete(Key(j)));
}
}
- Flush();
+ ASSERT_OK(Flush());
if (i < kNumLevelFiles - 1) {
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
CompactionReason::kBottommostFiles);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
db_->GetLiveFilesMetaData(&post_release_metadata);
ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());
ASSERT_OK(Delete(Key(j)));
}
}
- Flush();
+ ASSERT_OK(Flush());
if (i < kNumLevelFiles - 1) {
ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
}
}
- dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr);
+ ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr));
ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
[&](void* /*arg*/) { num_compactions.fetch_add(1); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
db_->ReleaseSnapshot(snapshot);
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, num_compactions);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
}
TEST_F(DBCompactionTest, RoundRobinTtlCompactionNormal) {
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 20;
  options.ttl = 24 * 60 * 60;  // 24 hours
  options.compaction_pri = kRoundRobin;
  env_->now_cpu_count_.store(0);
  env_->SetMockSleep();
  options.env = env_;

  // add a small second for each wait time, to make sure the file is expired
  int small_seconds = 1;

  // Tally compactions by reason so each phase below can assert which kind of
  // TTL compaction was picked.
  std::atomic_int ttl_compactions{0};
  std::atomic_int round_robin_ttl_compactions{0};
  std::atomic_int other_compactions{0};

  SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        auto compaction_reason = compaction->compaction_reason();
        if (compaction_reason == CompactionReason::kTtl) {
          ttl_compactions++;
        } else if (compaction_reason == CompactionReason::kRoundRobinTtl) {
          round_robin_ttl_compactions++;
        } else {
          other_compactions++;
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  DestroyAndReopen(options);

  // Setup the files from lower level to up level, each file is 1 hour's older
  // than the next one.
  // create 10 files on the last level (L6)
  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 100 + j), "value" + std::to_string(i * 100 + j)));
    }
    ASSERT_OK(Flush());
    env_->MockSleepForSeconds(60 * 60);  // generate 1 file per hour
  }
  MoveFilesToLevel(6);

  // create 5 files on L5
  for (int i = 0; i < 5; i++) {
    for (int j = 0; j < 200; j++) {
      ASSERT_OK(Put(Key(i * 200 + j), "value" + std::to_string(i * 200 + j)));
    }
    ASSERT_OK(Flush());
    env_->MockSleepForSeconds(60 * 60);
  }
  MoveFilesToLevel(5);

  // create 3 files on L4
  for (int i = 0; i < 3; i++) {
    for (int j = 0; j < 300; j++) {
      ASSERT_OK(Put(Key(i * 300 + j), "value" + std::to_string(i * 300 + j)));
    }
    ASSERT_OK(Flush());
    env_->MockSleepForSeconds(60 * 60);
  }
  MoveFilesToLevel(4);

  // The LSM tree should be like:
  // L4: [0, 299], [300, 599], [600, 899]
  // L5: [0, 199] [200, 399]...............[800, 999]
  // L6: [0,99][100,199][200,299][300,399]...............[800,899][900,999]
  ASSERT_EQ("0,0,0,0,3,5,10", FilesPerLevel());

  // make sure the first L5 file is expired
  env_->MockSleepForSeconds(16 * 60 * 60 + small_seconds++);

  // trigger TTL compaction
  ASSERT_OK(Put(Key(4), "value" + std::to_string(1)));
  ASSERT_OK(Put(Key(5), "value" + std::to_string(1)));
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // verify there's a RoundRobin TTL compaction
  ASSERT_EQ(1, round_robin_ttl_compactions);
  round_robin_ttl_compactions = 0;

  // expire 2 more files
  env_->MockSleepForSeconds(2 * 60 * 60 + small_seconds++);
  // trigger TTL compaction
  ASSERT_OK(Put(Key(4), "value" + std::to_string(2)));
  ASSERT_OK(Put(Key(5), "value" + std::to_string(2)));
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ(2, round_robin_ttl_compactions);
  round_robin_ttl_compactions = 0;

  // expire 4 more files, 2 out of 3 files on L4 are expired
  env_->MockSleepForSeconds(4 * 60 * 60 + small_seconds++);
  // trigger TTL compaction
  ASSERT_OK(Put(Key(6), "value" + std::to_string(3)));
  ASSERT_OK(Put(Key(7), "value" + std::to_string(3)));
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  ASSERT_EQ(1, NumTableFilesAtLevel(4));
  ASSERT_EQ(0, NumTableFilesAtLevel(5));

  ASSERT_GT(round_robin_ttl_compactions, 0);
  round_robin_ttl_compactions = 0;

  // make the first L0 file expired, which triggers a normal TTL compaction
  // instead of roundrobin TTL compaction, it will also include an extra file
  // from L0 because of overlap
  ASSERT_EQ(0, ttl_compactions);
  env_->MockSleepForSeconds(19 * 60 * 60 + small_seconds++);

  // trigger TTL compaction
  ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
  ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());

  // L0 -> L1 compaction is normal TTL compaction, L1 -> next levels compactions
  // are RoundRobin TTL compaction.
  ASSERT_GT(ttl_compactions, 0);
  ttl_compactions = 0;
  ASSERT_GT(round_robin_ttl_compactions, 0);
  round_robin_ttl_compactions = 0;

  // All files are expired, so only the last level has data
  env_->MockSleepForSeconds(24 * 60 * 60);
  // trigger TTL compaction
  ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
  ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());

  ASSERT_GT(ttl_compactions, 0);
  ttl_compactions = 0;
  ASSERT_GT(round_robin_ttl_compactions, 0);
  round_robin_ttl_compactions = 0;

  // No compaction should have been picked for any other reason.
  ASSERT_EQ(0, other_compactions);
}
+
TEST_F(DBCompactionTest, RoundRobinTtlCompactionUnsortedTime) {
  // This is to test the case that the RoundRobin compaction cursor not pointing
  // to the oldest file, RoundRobin compaction should still compact the file
  // after cursor until all expired files are compacted.
  Options options = CurrentOptions();
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 20;
  options.ttl = 24 * 60 * 60;  // 24 hours
  options.compaction_pri = kRoundRobin;
  env_->now_cpu_count_.store(0);
  env_->SetMockSleep();
  options.env = env_;

  // Tally compactions by reason so the assertions below can verify which kind
  // of TTL compaction was picked.
  std::atomic_int ttl_compactions{0};
  std::atomic_int round_robin_ttl_compactions{0};
  std::atomic_int other_compactions{0};

  SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        auto compaction_reason = compaction->compaction_reason();
        if (compaction_reason == CompactionReason::kTtl) {
          ttl_compactions++;
        } else if (compaction_reason == CompactionReason::kRoundRobinTtl) {
          round_robin_ttl_compactions++;
        } else {
          other_compactions++;
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  DestroyAndReopen(options);

  // create 10 files on the last level (L6)
  for (int i = 0; i < 10; i++) {
    for (int j = 0; j < 100; j++) {
      ASSERT_OK(Put(Key(i * 100 + j), "value" + std::to_string(i * 100 + j)));
    }
    ASSERT_OK(Flush());
    env_->MockSleepForSeconds(60 * 60);  // generate 1 file per hour
  }
  MoveFilesToLevel(6);

  // create 5 files on L5
  for (int i = 0; i < 5; i++) {
    for (int j = 0; j < 200; j++) {
      ASSERT_OK(Put(Key(i * 200 + j), "value" + std::to_string(i * 200 + j)));
    }
    ASSERT_OK(Flush());
    env_->MockSleepForSeconds(60 * 60);  // 1 hour
  }
  MoveFilesToLevel(5);

  // The LSM tree should be like:
  // L5: [0, 199] [200, 399] [400,599] [600,799] [800, 999]
  // L6: [0,99][100,199][200,299][300,399]....................[800,899][900,999]
  ASSERT_EQ("0,0,0,0,0,5,10", FilesPerLevel());

  // point the compaction cursor to the 4th file on L5
  VersionSet* const versions = dbfull()->GetVersionSet();
  assert(versions);
  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  ASSERT_NE(cfd, nullptr);
  Version* const current = cfd->current();
  ASSERT_NE(current, nullptr);
  VersionStorageInfo* storage_info = current->storage_info();
  ASSERT_NE(storage_info, nullptr);
  const InternalKey split_cursor = InternalKey(Key(600), 100000, kTypeValue);
  storage_info->AddCursorForOneLevel(5, split_cursor);

  // make the first file on L5 expired, there should be 3 TTL compactions:
  // 4th one, 5th one, then 1st one.
  env_->MockSleepForSeconds(19 * 60 * 60 + 1);
  // trigger TTL compaction
  ASSERT_OK(Put(Key(6), "value" + std::to_string(4)));
  ASSERT_OK(Put(Key(7), "value" + std::to_string(4)));
  ASSERT_OK(Flush());

  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_EQ(2, NumTableFilesAtLevel(5));

  ASSERT_EQ(3, round_robin_ttl_compactions);
  ASSERT_EQ(0, ttl_compactions);
  ASSERT_EQ(0, other_compactions);
}
+
TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
const int kNumKeysPerFile = 32;
const int kNumLevelFiles = 2;
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
MoveFilesToLevel(3);
ASSERT_EQ("0,0,0,2", FilesPerLevel());
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
}
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("2,0,0,2", FilesPerLevel());
MoveFilesToLevel(1);
ASSERT_EQ("0,2,0,2", FilesPerLevel());
// Just do a simple write + flush so that the Ttl expired files get
// compacted.
ASSERT_OK(Put("a", "1"));
- Flush();
+ ASSERT_OK(Flush());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
Compaction* compaction = reinterpret_cast<Compaction*>(arg);
ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// All non-L0 files are deleted, as they contained only deleted data.
ASSERT_EQ("1", FilesPerLevel());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
MoveFilesToLevel(3);
ASSERT_EQ("0,0,0,2", FilesPerLevel());
for (int j = 0; j < kNumKeysPerFile; ++j) {
ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
}
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("2,0,0,2", FilesPerLevel());
MoveFilesToLevel(1);
ASSERT_EQ("0,2,0,2", FilesPerLevel());
// trigger as ttl is set to 24 hours.
env_->MockSleepForSeconds(12 * 60 * 60);
ASSERT_OK(Put("a", "1"));
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("1,2,0,2", FilesPerLevel());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
// Dynamically change ttl to 10 hours.
// This should trigger a ttl compaction, as 12 hours have already passed.
ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// All non-L0 files are deleted, as they contained only deleted data.
ASSERT_EQ("1", FilesPerLevel());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
for (int i = 1; i <= 100; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
// Get the first file's creation time. This will be the oldest file in the
// DB. Compactions inolving this file's descendents should keep getting
// this time.
for (int i = 101; i <= 200; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
MoveFilesToLevel(6);
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
for (int i = 1; i <= 50; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
env_->MockSleepForSeconds(1 * 60 * 60);
for (int i = 51; i <= 150; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
MoveFilesToLevel(4);
ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
for (int i = 26; i <= 75; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
}
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
MoveFilesToLevel(1);
ASSERT_EQ("0,1,0,0,2,0,2", FilesPerLevel());
if (if_restart) {
Reopen(options);
} else {
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
ASSERT_EQ(5, ttl_compactions);
if (if_restart) {
Reopen(options);
} else {
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("1,0,0,0,0,0,1", FilesPerLevel());
ASSERT_GE(ttl_compactions, 6);
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("2", FilesPerLevel());
ASSERT_EQ(0, periodic_compactions);
// Add 50 hours and do a write
env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("a", "1"));
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Assert that the files stay in the same level
ASSERT_EQ("3", FilesPerLevel());
// The two old files go through the periodic compaction process
if (if_restart) {
Reopen(options);
} else {
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("1,3", FilesPerLevel());
// The three old files now go through the periodic compaction process. 2
// + 3.
// Add another 50 hours and do another write
env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("c", "3"));
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("2,3", FilesPerLevel());
// The four old files now go through the periodic compaction process. 5
// + 4.
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
// Move the first two files to L2.
if (i == 1) {
MoveFilesToLevel(2);
const int kValueSize = 100;
Options options = CurrentOptions();
- options.ttl = 10 * 60 * 60; // 10 hours
+ options.ttl = 10 * 60 * 60; // 10 hours
options.periodic_compaction_seconds = 48 * 60 * 60; // 2 days
- options.max_open_files = -1; // needed for both periodic and ttl compactions
+ options.max_open_files = -1; // needed for both periodic and ttl compactions
env_->SetMockSleep();
options.env = env_;
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
MoveFilesToLevel(3);
// Add some time greater than periodic_compaction_time.
env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("a", "1"));
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Files in the bottom level go through periodic compactions.
ASSERT_EQ("1,0,0,2", FilesPerLevel());
ASSERT_EQ(2, periodic_compactions);
// Add a little more time than ttl
env_->MockSleepForSeconds(11 * 60 * 60);
ASSERT_OK(Put("b", "1"));
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Notice that the previous file in level 1 falls down to the bottom level
// due to ttl compactions, one level at a time.
// And bottom level files don't get picked up for ttl compactions.
// Add some time greater than periodic_compaction_time.
env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("c", "1"));
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Previous L0 file falls one level at a time to bottom level due to ttl.
// And all 4 bottom files go through periodic compactions.
ASSERT_EQ("1,0,0,4", FilesPerLevel());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
-TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
- class TestCompactionFilter : public CompactionFilter {
- const char* Name() const override { return "TestCompactionFilter"; }
- };
- class TestCompactionFilterFactory : public CompactionFilterFactory {
- const char* Name() const override { return "TestCompactionFilterFactory"; }
- std::unique_ptr<CompactionFilter> CreateCompactionFilter(
- const CompactionFilter::Context& /*context*/) override {
- return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
- }
- };
-
+TEST_F(DBCompactionTest, LevelTtlBooster) {
+ // Verifies TTL-based priority boosting under kMinOverlappingRatio: once L1
+ // files age past a fraction of the configured ttl, their compaction priority
+ // is boosted so they are picked for L1 -> L2 compaction even though newer
+ // files would otherwise win on overlap ratio (see final SizeAtLevel check).
 const int kNumKeysPerFile = 32;
- const int kNumLevelFiles = 2;
- const int kValueSize = 100;
-
- Random rnd(301);
+ const int kNumLevelFiles = 3;
+ const int kValueSize = 1000;
 Options options = CurrentOptions();
- TestCompactionFilter test_compaction_filter;
+ options.ttl = 10 * 60 * 60; // 10 hours
+ options.periodic_compaction_seconds = 480 * 60 * 60; // very long
+ options.level0_file_num_compaction_trigger = 2;
+ options.max_bytes_for_level_base = 5 * uint64_t{kNumKeysPerFile * kValueSize};
+ options.max_open_files = -1; // needed for both periodic and ttl compactions
+ options.compaction_pri = CompactionPri::kMinOverlappingRatio;
 env_->SetMockSleep();
 options.env = env_;
 // NOTE: Presumed unnecessary and removed: resetting mock time in env
- enum CompactionFilterType {
- kUseCompactionFilter,
- kUseCompactionFilterFactory
- };
-
- for (CompactionFilterType comp_filter_type :
- {kUseCompactionFilter, kUseCompactionFilterFactory}) {
- // Assert that periodic compactions are not enabled.
- ASSERT_EQ(port::kMaxUint64 - 1, options.periodic_compaction_seconds);
+ DestroyAndReopen(options);
- if (comp_filter_type == kUseCompactionFilter) {
- options.compaction_filter = &test_compaction_filter;
- options.compaction_filter_factory.reset();
- } else if (comp_filter_type == kUseCompactionFilterFactory) {
- options.compaction_filter = nullptr;
- options.compaction_filter_factory.reset(
- new TestCompactionFilterFactory());
+ Random rnd(301);
+ for (int i = 0; i < kNumLevelFiles; ++i) {
+ for (int j = 0; j < kNumKeysPerFile; ++j) {
+ ASSERT_OK(
+ Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ MoveFilesToLevel(2);
+
+ ASSERT_EQ("0,0,3", FilesPerLevel());
+
+ // Create some files for L1
+ for (int i = 0; i < 2; i++) {
+ for (int j = 0; j < kNumKeysPerFile; ++j) {
+ ASSERT_OK(Put(Key(2 * j + i), rnd.RandomString(kValueSize)));
+ }
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+
+ ASSERT_EQ("0,1,3", FilesPerLevel());
+
+ // Make the new L0 files qualify for TTL boosting and generate one more to
+ // trigger L1 -> L2 compaction. Old files will be picked even if their
+ // priority is lower without boosting.
+ env_->MockSleepForSeconds(8 * 60 * 60);
+ for (int i = 0; i < 2; i++) {
+ for (int j = 0; j < kNumKeysPerFile; ++j) {
+ ASSERT_OK(Put(Key(kNumKeysPerFile * 2 + 2 * j + i),
+ rnd.RandomString(kValueSize * 2)));
+ }
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ }
+ // Force files to be compacted to L1
+ ASSERT_OK(
+ dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "1"}}));
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ("0,1,2", FilesPerLevel());
+ ASSERT_OK(
+ dbfull()->SetOptions({{"level0_file_num_compaction_trigger", "2"}}));
+
+ ASSERT_GT(SizeAtLevel(1), kNumKeysPerFile * 4 * kValueSize);
+}
+
+TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
+ class TestCompactionFilter : public CompactionFilter {
+ const char* Name() const override { return "TestCompactionFilter"; }
+ };
+ class TestCompactionFilterFactory : public CompactionFilterFactory {
+ const char* Name() const override { return "TestCompactionFilterFactory"; }
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
+ }
+ };
+
+ const int kNumKeysPerFile = 32;
+ const int kNumLevelFiles = 2;
+ const int kValueSize = 100;
+
+ Random rnd(301);
+
+ Options options = CurrentOptions();
+ TestCompactionFilter test_compaction_filter;
+ env_->SetMockSleep();
+ options.env = env_;
+
+ // NOTE: Presumed unnecessary and removed: resetting mock time in env
+
+ enum CompactionFilterType {
+ kUseCompactionFilter,
+ kUseCompactionFilterFactory
+ };
+
+ for (CompactionFilterType comp_filter_type :
+ {kUseCompactionFilter, kUseCompactionFilterFactory}) {
+ // Assert that periodic compactions are not enabled.
+ ASSERT_EQ(std::numeric_limits<uint64_t>::max() - 1,
+ options.periodic_compaction_seconds);
+
+ if (comp_filter_type == kUseCompactionFilter) {
+ options.compaction_filter = &test_compaction_filter;
+ options.compaction_filter_factory.reset();
+ } else if (comp_filter_type == kUseCompactionFilterFactory) {
+ options.compaction_filter = nullptr;
+ options.compaction_filter_factory.reset(
+ new TestCompactionFilterFactory());
}
DestroyAndReopen(options);
ASSERT_OK(
Put(Key(i * kNumKeysPerFile + j), rnd.RandomString(kValueSize)));
}
- Flush();
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ("2", FilesPerLevel());
ASSERT_EQ(0, periodic_compactions);
// Add 31 days and do a write
env_->MockSleepForSeconds(31 * 24 * 60 * 60);
ASSERT_OK(Put("a", "1"));
- Flush();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
// Assert that the files stay in the same level
ASSERT_EQ("3", FilesPerLevel());
// The two old files go through the periodic compaction process
for (int k = 0; k < 2; ++k) {
ASSERT_OK(Put(Key(k), rnd.RandomString(1024)));
}
- Flush();
+ ASSERT_OK(Flush());
}
auto manual_compaction_thread = port::Thread([this]() {
CompactRangeOptions cro;
cro.allow_write_stall = false;
- db_->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
});
manual_compaction_thread.join();
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(1), 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
FlushOptions flush_opts;
flush_opts.wait = false;
flush_opts.allow_write_stall = true;
- dbfull()->Flush(flush_opts);
+ ASSERT_OK(dbfull()->Flush(flush_opts));
}
auto manual_compaction_thread = port::Thread([this]() {
CompactRangeOptions cro;
cro.allow_write_stall = false;
- db_->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
});
manual_compaction_thread.join();
- dbfull()->TEST_WaitForFlushMemTable();
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_GT(NumTableFilesAtLevel(1), 0);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
for (int k = 0; k < 2; ++k) {
ASSERT_OK(Put(1, Key(k), rnd.RandomString(1024)));
}
- Flush(1);
+ ASSERT_OK(Flush(1));
}
auto manual_compaction_thread = port::Thread([this, i]() {
CompactRangeOptions cro;
cro.allow_write_stall = false;
- Status s = db_->CompactRange(cro, handles_[1], nullptr, nullptr);
if (i == 0) {
ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
.IsColumnFamilyDropped());
manual_compaction_thread.join();
TEST_SYNC_POINT(
"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
}
{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- //used for the delayable flushes
+ // used for the delayable flushes
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
for (int j = 0; j < 2; ++j) {
ASSERT_OK(Put(Key(j), rnd.RandomString(1024)));
}
- dbfull()->Flush(flush_opts);
+ ASSERT_OK(dbfull()->Flush(flush_opts));
}
auto manual_compaction_thread = port::Thread([this]() {
CompactRangeOptions cro;
cro.allow_write_stall = false;
- db_->CompactRange(cro, nullptr, nullptr);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
});
TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
- Put(ToString(0), rnd.RandomString(1024));
- dbfull()->Flush(flush_opts);
- Put(ToString(0), rnd.RandomString(1024));
- TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
+ ASSERT_OK(Put(std::to_string(0), rnd.RandomString(1024)));
+ ASSERT_OK(dbfull()->Flush(flush_opts));
+ ASSERT_OK(Put(std::to_string(0), rnd.RandomString(1024)));
+ TEST_SYNC_POINT(
+ "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
manual_compaction_thread.join();
// If CompactRange's flush was skipped, the final Put above will still be
// in the active memtable.
std::string num_keys_in_memtable;
- db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable);
- ASSERT_EQ(ToString(1), num_keys_in_memtable);
+ ASSERT_TRUE(db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable,
+ &num_keys_in_memtable));
+ ASSERT_EQ(std::to_string(1), num_keys_in_memtable);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
} else {
ASSERT_EQ(2, num_memtable_entries);
// flush anyways to prepare for next iteration
- db_->Flush(FlushOptions());
+ ASSERT_OK(db_->Flush(FlushOptions()));
}
}
}
for (int i = 0; i < 32; i++) {
for (int j = 0; j < 5000; j++) {
- Put(std::to_string(j), std::string(1, 'A'));
+ ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
}
ASSERT_OK(Flush());
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ColumnFamilyHandleImpl* cfh =
static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
ColumnFamilyData* cfd = cfh->cfd();
VerifyCompactionStats(*cfd, *collector);
}
+TEST_F(DBCompactionTest, SubcompactionEvent) {
+ // Checks that subcompaction listener callbacks pair up correctly: every
+ // OnSubcompactionBegin gets a matching OnSubcompactionCompleted within its
+ // parent compaction job, subcompaction ids within one job are unique, and a
+ // manual CompactRange with max_subcompactions=4 actually triggers at least
+ // one subcompaction.
+ class SubCompactionEventListener : public EventListener {
+ public:
+ void OnCompactionBegin(DB* /*db*/, const CompactionJobInfo& ci) override {
+ InstrumentedMutexLock l(&mutex_);
+ ASSERT_EQ(running_compactions_.find(ci.job_id),
+ running_compactions_.end());
+ running_compactions_.emplace(ci.job_id, std::unordered_set<int>());
+ }
+
+ void OnCompactionCompleted(DB* /*db*/,
+ const CompactionJobInfo& ci) override {
+ InstrumentedMutexLock l(&mutex_);
+ auto it = running_compactions_.find(ci.job_id);
+ ASSERT_NE(it, running_compactions_.end());
+ ASSERT_EQ(it->second.size(), 0);
+ running_compactions_.erase(it);
+ }
+
+ void OnSubcompactionBegin(const SubcompactionJobInfo& si) override {
+ InstrumentedMutexLock l(&mutex_);
+ auto it = running_compactions_.find(si.job_id);
+ ASSERT_NE(it, running_compactions_.end());
+ auto r = it->second.insert(si.subcompaction_job_id);
+ ASSERT_TRUE(r.second); // each subcompaction_job_id should be different
+ total_subcompaction_cnt_++;
+ }
+
+ void OnSubcompactionCompleted(const SubcompactionJobInfo& si) override {
+ InstrumentedMutexLock l(&mutex_);
+ auto it = running_compactions_.find(si.job_id);
+ ASSERT_NE(it, running_compactions_.end());
+ auto r = it->second.erase(si.subcompaction_job_id);
+ ASSERT_EQ(r, 1);
+ }
+
+ size_t GetRunningCompactionCount() {
+ InstrumentedMutexLock l(&mutex_);
+ return running_compactions_.size();
+ }
+
+ size_t GetTotalSubcompactionCount() {
+ InstrumentedMutexLock l(&mutex_);
+ return total_subcompaction_cnt_;
+ }
+
+ private:
+ InstrumentedMutex mutex_;
+ // job_id -> ids of that job's subcompactions that have begun but not yet
+ // completed; an entry exists only while the compaction job is running.
+ std::unordered_map<int, std::unordered_set<int>> running_compactions_;
+ size_t total_subcompaction_cnt_ = 0;
+ };
+
+ Options options = CurrentOptions();
+ options.target_file_size_base = 1024;
+ options.level0_file_num_compaction_trigger = 10;
+ auto* listener = new SubCompactionEventListener();
+ options.listeners.emplace_back(listener);
+
+ DestroyAndReopen(options);
+
+ // generate 4 files @ L2
+ for (int i = 0; i < 4; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 10 + j;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ MoveFilesToLevel(2);
+
+ // generate 2 files @ L1 which overlaps with L2 files
+ for (int i = 0; i < 2; i++) {
+ for (int j = 0; j < 10; j++) {
+ int key_id = i * 20 + j * 2;
+ ASSERT_OK(Put(Key(key_id), "value" + std::to_string(key_id)));
+ }
+ ASSERT_OK(Flush());
+ }
+ MoveFilesToLevel(1);
+ ASSERT_EQ(FilesPerLevel(), "0,2,4");
+
+ CompactRangeOptions comp_opts;
+ comp_opts.max_subcompactions = 4;
+ Status s = dbfull()->CompactRange(comp_opts, nullptr, nullptr);
+ ASSERT_OK(s);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ // make sure there's no running compaction
+ ASSERT_EQ(listener->GetRunningCompactionCount(), 0);
+ // and sub compaction is triggered
+ ASSERT_GT(listener->GetTotalSubcompactionCount(), 0);
+}
+
TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
// LSM setup:
// L1: [ba bz]
ASSERT_OK(Delete("b"));
ASSERT_OK(Flush());
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 0);
}
std::shared_ptr<ConcurrentTaskLimiter> unique_limiter(
- NewConcurrentTaskLimiter("unique_limiter", -1));
+ NewConcurrentTaskLimiter("unique_limiter", -1));
- const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5",
- "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
+ const char* cf_names[] = {"default", "0", "1", "2", "3", "4", "5", "6", "7",
+ "8", "9", "a", "b", "c", "d", "e", "f"};
const unsigned int cf_count = sizeof cf_names / sizeof cf_names[0];
std::unordered_map<std::string, CompactionLimiter*> cf_to_limiter;
options.level0_file_num_compaction_trigger = 4;
options.level0_slowdown_writes_trigger = 64;
options.level0_stop_writes_trigger = 64;
- options.max_background_jobs = kMaxBackgroundThreads; // Enough threads
- options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
- options.max_write_buffer_number = 10; // Enough memtables
+ options.max_background_jobs = kMaxBackgroundThreads; // Enough threads
+ options.memtable_factory.reset(
+ test::NewSpecialSkipListFactory(kNumKeysPerFile));
+ options.max_write_buffer_number = 10; // Enough memtables
DestroyAndReopen(options);
std::vector<Options> option_vector;
CreateColumnFamilies({cf_names[cf]}, option_vector[cf]);
}
- ReopenWithColumnFamilies(std::vector<std::string>(cf_names,
- cf_names + cf_count),
- option_vector);
+ ReopenWithColumnFamilies(
+ std::vector<std::string>(cf_names, cf_names + cf_count), option_vector);
port::Mutex mutex;
}
for (unsigned int cf = 0; cf < cf_count; cf++) {
- dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
}
}
// Enough L0 files to trigger compaction
for (unsigned int cf = 0; cf < cf_count; cf++) {
ASSERT_EQ(NumTableFilesAtLevel(0, cf),
- options.level0_file_num_compaction_trigger);
+ options.level0_file_num_compaction_trigger);
}
// Create more files for one column family, which triggers speed up
}
// put extra key to trigger flush
ASSERT_OK(Put(0, "", ""));
- dbfull()->TEST_WaitForFlushMemTable(handles_[0]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[0]));
ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
NumTableFilesAtLevel(0, 0));
}
}
for (unsigned int cf = 0; cf < cf_count; cf++) {
- dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf]));
}
ASSERT_OK(dbfull()->TEST_WaitForCompact());
// flush one more file to cf 1
for (int i = 0; i < kNumKeysPerFile; i++) {
- ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
+ ASSERT_OK(Put(cf_test, Key(keyIndex++), ""));
}
// put extra key to trigger flush
ASSERT_OK(Put(cf_test, "", ""));
- dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]);
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[cf_test]));
ASSERT_EQ(1, NumTableFilesAtLevel(0, cf_test));
Compact(cf_test, Key(0), Key(keyIndex));
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.use_direct_io_for_flush_and_compaction = GetParam();
- options.env = new MockEnv(Env::Default());
+ options.env = MockEnv::Create(Env::Default());
Reopen(options);
bool readahead = false;
SyncPoint::GetInstance()->SetCallBack(
});
if (options.use_direct_io_for_flush_and_compaction) {
SyncPoint::GetInstance()->SetCallBack(
- "SanitizeOptions:direct_io", [&](void* /*arg*/) {
- readahead = true;
- });
+ "SanitizeOptions:direct_io", [&](void* /*arg*/) { readahead = true; });
}
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
public testing::WithParamInterface<uint32_t> {
public:
CompactionPriTest()
- : DBTestBase("/compaction_pri_test", /*env_do_fsync=*/true) {
+ : DBTestBase("compaction_pri_test", /*env_do_fsync=*/true) {
compaction_pri_ = GetParam();
}
ASSERT_OK(Put(Key(keys[i]), rnd.RandomString(102)));
}
- dbfull()->TEST_WaitForCompact();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
for (int i = 0; i < kNKeys; i++) {
ASSERT_NE("NOT_FOUND", Get(Key(i)));
}
::testing::Values(CompactionPri::kByCompensatedSize,
CompactionPri::kOldestLargestSeqFirst,
CompactionPri::kOldestSmallestSeqFirst,
- CompactionPri::kMinOverlappingRatio));
-
-class NoopMergeOperator : public MergeOperator {
- public:
- NoopMergeOperator() {}
-
- bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
- MergeOperationOutput* merge_out) const override {
- std::string val("bar");
- merge_out->new_value = val;
- return true;
- }
+ CompactionPri::kMinOverlappingRatio,
+ CompactionPri::kRoundRobin));
- const char* Name() const override { return "Noop"; }
-};
+TEST_F(DBCompactionTest, PersistRoundRobinCompactCursor) {
+ // Round-robin compaction cursors must survive a DB reopen: run compactions
+ // under kRoundRobin to populate the per-level cursors, record them, reopen
+ // the DB, and verify the reloaded cursors compare equal (via the internal
+ // key comparator) to the ones recorded before the reopen.
+ Options options = CurrentOptions();
+ options.write_buffer_size = 16 * 1024;
+ options.max_bytes_for_level_base = 128 * 1024;
+ options.target_file_size_base = 64 * 1024;
+ options.level0_file_num_compaction_trigger = 4;
+ options.compaction_pri = CompactionPri::kRoundRobin;
+ options.max_bytes_for_level_multiplier = 4;
+ options.num_levels = 3;
+ options.compression = kNoCompression;
-TEST_F(DBCompactionTest, PartialManualCompaction) {
- Options opts = CurrentOptions();
- opts.num_levels = 3;
- opts.level0_file_num_compaction_trigger = 10;
- opts.compression = kNoCompression;
- opts.merge_operator.reset(new NoopMergeOperator());
- opts.target_file_size_base = 10240;
- DestroyAndReopen(opts);
+ DestroyAndReopen(options);
 Random rnd(301);
- for (auto i = 0; i < 8; ++i) {
- for (auto j = 0; j < 10; ++j) {
- Merge("foo", rnd.RandomString(1024));
+
+ // 30 Files in L0 to trigger compactions between L1 and L2
+ for (int i = 0; i < 30; i++) {
+ for (int j = 0; j < 16; j++) {
+ ASSERT_OK(Put(rnd.RandomString(24), rnd.RandomString(1000)));
 }
- Flush();
+ ASSERT_OK(Flush());
 }
- MoveFilesToLevel(2);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
- std::string prop;
- EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
- uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
- ASSERT_OK(dbfull()->SetOptions(
- {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
+ VersionSet* const versions = dbfull()->GetVersionSet();
+ assert(versions);
- CompactRangeOptions cro;
- cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
- dbfull()->CompactRange(cro, nullptr, nullptr);
-}
+ ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+ ASSERT_NE(cfd, nullptr);
-TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
- // Regression test for bug where manual compaction hangs forever when the DB
- // is in read-only mode. Verify it now at least returns, despite failing.
- const int kNumL0Files = 4;
- std::unique_ptr<FaultInjectionTestEnv> mock_env(
- new FaultInjectionTestEnv(env_));
- Options opts = CurrentOptions();
- opts.disable_auto_compactions = true;
- opts.env = mock_env.get();
- DestroyAndReopen(opts);
+ Version* const current = cfd->current();
+ ASSERT_NE(current, nullptr);
- Random rnd(301);
- for (int i = 0; i < kNumL0Files; ++i) {
- // Make sure files are overlapping in key-range to prevent trivial move.
- Put("key1", rnd.RandomString(1024));
- Put("key2", rnd.RandomString(1024));
- Flush();
- }
- ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));
+ const VersionStorageInfo* const storage_info = current->storage_info();
+ ASSERT_NE(storage_info, nullptr);
- // Enter read-only mode by failing a write.
- mock_env->SetFilesystemActive(false);
- // Make sure this is outside `CompactRange`'s range so that it doesn't fail
- // early trying to flush memtable.
- ASSERT_NOK(Put("key3", rnd.RandomString(1024)));
+ const std::vector<InternalKey> compact_cursors =
+ storage_info->GetCompactCursors();
- // In the bug scenario, the first manual compaction would fail and forget to
- // unregister itself, causing the second one to hang forever due to conflict
- // with a non-running compaction.
- CompactRangeOptions cro;
- cro.exclusive_manual_compaction = false;
- Slice begin_key("key1");
- Slice end_key("key2");
- ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
- ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+ Reopen(options);
- // Close before mock_env destruct.
- Close();
-}
+ VersionSet* const reopened_versions = dbfull()->GetVersionSet();
+ assert(reopened_versions);
-// ManualCompactionBottomLevelOptimization tests the bottom level manual
-// compaction optimization to skip recompacting files created by Ln-1 to Ln
-// compaction
-TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
- Options opts = CurrentOptions();
- opts.num_levels = 3;
- opts.level0_file_num_compaction_trigger = 5;
- opts.compression = kNoCompression;
- opts.merge_operator.reset(new NoopMergeOperator());
- opts.target_file_size_base = 1024;
- opts.max_bytes_for_level_multiplier = 2;
- opts.disable_auto_compactions = true;
- DestroyAndReopen(opts);
- ColumnFamilyHandleImpl* cfh =
- static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
- ColumnFamilyData* cfd = cfh->cfd();
- InternalStats* internal_stats_ptr = cfd->internal_stats();
- ASSERT_NE(internal_stats_ptr, nullptr);
+ ColumnFamilyData* const reopened_cfd =
+ reopened_versions->GetColumnFamilySet()->GetDefault();
+ ASSERT_NE(reopened_cfd, nullptr);
- Random rnd(301);
- for (auto i = 0; i < 8; ++i) {
- for (auto j = 0; j < 10; ++j) {
- ASSERT_OK(
- Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+ Version* const reopened_current = reopened_cfd->current();
+ ASSERT_NE(reopened_current, nullptr);
+
+ const VersionStorageInfo* const reopened_storage_info =
+ reopened_current->storage_info();
+ ASSERT_NE(reopened_storage_info, nullptr);
+
+ const std::vector<InternalKey> reopened_compact_cursors =
+ reopened_storage_info->GetCompactCursors();
+ const auto icmp = reopened_storage_info->InternalComparator();
+ ASSERT_EQ(compact_cursors.size(), reopened_compact_cursors.size());
+ for (size_t i = 0; i < compact_cursors.size(); i++) {
+ if (compact_cursors[i].Valid()) {
+ ASSERT_EQ(0,
+ icmp->Compare(compact_cursors[i], reopened_compact_cursors[i]));
+ } else {
+ // An unset (invalid) cursor must stay unset across the reopen.
+ ASSERT_TRUE(!reopened_compact_cursors[i].Valid());
 }
- Flush();
 }
+}
- MoveFilesToLevel(2);
+TEST_P(RoundRobinSubcompactionsAgainstPressureToken, PressureTokenTest) {
+ const int kKeysPerBuffer = 100;
+ Options options = CurrentOptions();
+ options.num_levels = 4;
+ options.max_bytes_for_level_multiplier = 2;
+ options.level0_file_num_compaction_trigger = 4;
+ options.target_file_size_base = kKeysPerBuffer * 1024;
+ options.compaction_pri = CompactionPri::kRoundRobin;
+ options.max_bytes_for_level_base = 8 * kKeysPerBuffer * 1024;
+ options.disable_auto_compactions = true;
+ // Setup 7 threads but limited subcompactions so that
+ // RoundRobin requires extra compactions from reserved threads
+ options.max_subcompactions = 1;
+ options.max_background_compactions = 7;
+ options.max_compaction_bytes = 100000000;
+ DestroyAndReopen(options);
+ env_->SetBackgroundThreads(7, Env::LOW);
- for (auto i = 0; i < 8; ++i) {
- for (auto j = 0; j < 10; ++j) {
- ASSERT_OK(
- Put("bar" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+ Random rnd(301);
+ const std::vector<int> files_per_level = {0, 15, 25};
+ for (int lvl = 2; lvl > 0; lvl--) {
+ for (int i = 0; i < files_per_level[lvl]; i++) {
+ for (int j = 0; j < kKeysPerBuffer; j++) {
+ // Add (lvl-1) to ensure nearly equivallent number of files
+ // in L2 are overlapped with fils selected to compact from
+ // L1
+ ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)),
+ rnd.RandomString(1010)));
+ }
+ ASSERT_OK(Flush());
}
- Flush();
+ MoveFilesToLevel(lvl);
+ ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0));
}
- const std::vector<InternalStats::CompactionStats>& comp_stats =
- internal_stats_ptr->TEST_GetCompactionStats();
- int num = comp_stats[2].num_input_files_in_output_level;
- ASSERT_EQ(num, 0);
+ // 15 files in L1; 25 files in L2
- CompactRangeOptions cro;
- cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
- dbfull()->CompactRange(cro, nullptr, nullptr);
+ // This is a variable for making sure the following callback is called
+ // and the assertions in it are indeed excuted.
+ bool num_planned_subcompactions_verified = false;
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) {
+ uint64_t num_planned_subcompactions = *(static_cast<uint64_t*>(arg));
+ if (grab_pressure_token_) {
+ // 7 files are selected for round-robin under auto
+ // compaction. The number of planned subcompaction is restricted by
+ // the limited number of max_background_compactions
+ ASSERT_EQ(num_planned_subcompactions, 7);
+ } else {
+ ASSERT_EQ(num_planned_subcompactions, 1);
+ }
+ num_planned_subcompactions_verified = true;
+ });
- const std::vector<InternalStats::CompactionStats>& comp_stats2 =
- internal_stats_ptr->TEST_GetCompactionStats();
- num = comp_stats2[2].num_input_files_in_output_level;
- ASSERT_EQ(num, 0);
-}
+ // The following 3 dependencies have to be added to ensure the auto
+ // compaction and the pressure token is correctly enabled. Same for
+ // RoundRobinSubcompactionsUsingResources and
+ // DBCompactionTest.RoundRobinSubcompactionsShrinkResources
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"RoundRobinSubcompactionsAgainstPressureToken:0",
+ "BackgroundCallCompaction:0"},
+ {"CompactionJob::AcquireSubcompactionResources:0",
+ "RoundRobinSubcompactionsAgainstPressureToken:1"},
+ {"RoundRobinSubcompactionsAgainstPressureToken:2",
+ "CompactionJob::AcquireSubcompactionResources:1"}});
+ SyncPoint::GetInstance()->EnableProcessing();
-TEST_F(DBCompactionTest, CompactionDuringShutdown) {
- Options opts = CurrentOptions();
- opts.level0_file_num_compaction_trigger = 2;
- opts.disable_auto_compactions = true;
- DestroyAndReopen(opts);
- ColumnFamilyHandleImpl* cfh =
- static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
- ColumnFamilyData* cfd = cfh->cfd();
- InternalStats* internal_stats_ptr = cfd->internal_stats();
- ASSERT_NE(internal_stats_ptr, nullptr);
-
- Random rnd(301);
- for (auto i = 0; i < 2; ++i) {
- for (auto j = 0; j < 10; ++j) {
- ASSERT_OK(
- Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
- }
- Flush();
+ ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
+ TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:0");
+ TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:1");
+ std::unique_ptr<WriteControllerToken> pressure_token;
+ if (grab_pressure_token_) {
+ pressure_token =
+ dbfull()->TEST_write_controler().GetCompactionPressureToken();
}
+ TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstPressureToken:2");
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
- [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
- ASSERT_OK(dbfull()->error_handler_.GetBGError());
+ ASSERT_OK(dbfull()->WaitForCompact());
+ ASSERT_TRUE(num_planned_subcompactions_verified);
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
}
-// FixFileIngestionCompactionDeadlock tests and verifies that compaction and
-// file ingestion do not cause deadlock in the event of write stall triggered
-// by number of L0 files reaching level0_stop_writes_trigger.
-TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
- const int kNumKeysPerFile = 100;
- // Generate SST files.
- Options options = CurrentOptions();
-
- // Generate an external SST file containing a single key, i.e. 99
- std::string sst_files_dir = dbname_ + "/sst_files/";
- DestroyDir(env_, sst_files_dir);
- ASSERT_OK(env_->CreateDir(sst_files_dir));
- SstFileWriter sst_writer(EnvOptions(), options);
- const std::string sst_file_path = sst_files_dir + "test.sst";
- ASSERT_OK(sst_writer.Open(sst_file_path));
- ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
- ASSERT_OK(sst_writer.Finish());
+INSTANTIATE_TEST_CASE_P(RoundRobinSubcompactionsAgainstPressureToken,
+ RoundRobinSubcompactionsAgainstPressureToken,
+ testing::Bool());
- SyncPoint::GetInstance()->DisableProcessing();
- SyncPoint::GetInstance()->ClearAllCallBacks();
- SyncPoint::GetInstance()->LoadDependency({
- {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
- "BackgroundCallCompaction:0"},
- });
- SyncPoint::GetInstance()->EnableProcessing();
+TEST_P(RoundRobinSubcompactionsAgainstResources, SubcompactionsUsingResources) {
+ const int kKeysPerBuffer = 200;
+ Options options = CurrentOptions();
+ options.num_levels = 4;
+ options.level0_file_num_compaction_trigger = 3;
+ options.target_file_size_base = kKeysPerBuffer * 1024;
+ options.compaction_pri = CompactionPri::kRoundRobin;
+ options.max_bytes_for_level_base = 30 * kKeysPerBuffer * 1024;
+ options.disable_auto_compactions = true;
+ options.max_subcompactions = 1;
+ options.max_background_compactions = max_compaction_limits_;
+ // Set a large number for max_compaction_bytes so that one round-robin
+ // compaction is enough to make post-compaction L1 size less than
+ // the maximum size (this test assumes only one round-robin compaction
+ // is triggered by kLevelMaxLevelSize)
+ options.max_compaction_bytes = 100000000;
- options.write_buffer_size = 110 << 10; // 110KB
- options.level0_file_num_compaction_trigger =
- options.level0_stop_writes_trigger;
- options.max_subcompactions = max_subcompactions_;
- options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
DestroyAndReopen(options);
- Random rnd(301);
+ env_->SetBackgroundThreads(total_low_pri_threads_, Env::LOW);
- // Generate level0_stop_writes_trigger L0 files to trigger write stop
- for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
- for (int j = 0; j != kNumKeysPerFile; ++j) {
- ASSERT_OK(Put(Key(j), rnd.RandomString(990)));
- }
- if (0 == i) {
- // When we reach here, the memtables have kNumKeysPerFile keys. Note that
- // flush is not yet triggered. We need to write an extra key so that the
- // write path will call PreprocessWrite and flush the previous key-value
- // pairs to e flushed. After that, there will be the newest key in the
- // memtable, and a bunch of L0 files. Since there is already one key in
- // the memtable, then for i = 1, 2, ..., we do not have to write this
- // extra key to trigger flush.
- ASSERT_OK(Put("", ""));
+ Random rnd(301);
+ const std::vector<int> files_per_level = {0, 40, 100};
+ for (int lvl = 2; lvl > 0; lvl--) {
+ for (int i = 0; i < files_per_level[lvl]; i++) {
+ for (int j = 0; j < kKeysPerBuffer; j++) {
+ // Add (lvl-1) to ensure nearly equivallent number of files
+ // in L2 are overlapped with fils selected to compact from
+ // L1
+ ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)),
+ rnd.RandomString(1010)));
+ }
+ ASSERT_OK(Flush());
}
- dbfull()->TEST_WaitForFlushMemTable();
- ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i + 1);
+ MoveFilesToLevel(lvl);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0));
}
- // When we reach this point, there will be level0_stop_writes_trigger L0
- // files and one extra key (99) in memory, which overlaps with the external
- // SST file. Write stall triggers, and can be cleared only after compaction
- // reduces the number of L0 files.
-
- // Compaction will also be triggered since we have reached the threshold for
- // auto compaction. Note that compaction may begin after the following file
- // ingestion thread and waits for ingestion to finish.
-
- // Thread to ingest file with overlapping key range with the current
- // memtable. Consequently ingestion will trigger a flush. The flush MUST
- // proceed without waiting for the write stall condition to clear, otherwise
- // deadlock can happen.
- port::Thread ingestion_thr([&]() {
- IngestExternalFileOptions ifo;
- Status s = db_->IngestExternalFile({sst_file_path}, ifo);
- ASSERT_OK(s);
- });
-
- // More write to trigger write stop
- ingestion_thr.join();
- ASSERT_OK(dbfull()->TEST_WaitForCompact());
- Close();
-}
-
-TEST_F(DBCompactionTest, ConsistencyFailTest) {
- Options options = CurrentOptions();
- options.force_consistency_checks = true;
- DestroyAndReopen(options);
+ // 40 files in L1; 100 files in L2
+ // This is a variable for making sure the following callback is called
+ // and the assertions in it are indeed excuted.
+ bool num_planned_subcompactions_verified = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "VersionBuilder::CheckConsistency0", [&](void* arg) {
- auto p =
- reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
- // just swap the two FileMetaData so that we hit error
- // in CheckConsistency funcion
- FileMetaData* temp = *(p->first);
- *(p->first) = *(p->second);
- *(p->second) = temp;
+ "CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) {
+ uint64_t num_planned_subcompactions = *(static_cast<uint64_t*>(arg));
+ // More than 10 files are selected for round-robin under auto
+ // compaction. The number of planned subcompaction is restricted by
+ // the minimum number between available threads and compaction limits
+ ASSERT_EQ(num_planned_subcompactions - options.max_subcompactions,
+ std::min(total_low_pri_threads_, max_compaction_limits_) - 1);
+ num_planned_subcompactions_verified = true;
});
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"RoundRobinSubcompactionsAgainstResources:0",
+ "BackgroundCallCompaction:0"},
+ {"CompactionJob::AcquireSubcompactionResources:0",
+ "RoundRobinSubcompactionsAgainstResources:1"},
+ {"RoundRobinSubcompactionsAgainstResources:2",
+ "CompactionJob::AcquireSubcompactionResources:1"},
+ {"CompactionJob::ReleaseSubcompactionResources:0",
+ "RoundRobinSubcompactionsAgainstResources:3"},
+ {"RoundRobinSubcompactionsAgainstResources:4",
+ "CompactionJob::ReleaseSubcompactionResources:1"}});
+ SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-
- for (int k = 0; k < 2; ++k) {
- ASSERT_OK(Put("foo", "bar"));
- Flush();
- }
+ ASSERT_OK(dbfull()->WaitForCompact());
+ ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
+ TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:0");
+ TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:1");
+ auto pressure_token =
+ dbfull()->TEST_write_controler().GetCompactionPressureToken();
- ASSERT_NOK(Put("foo", "bar"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:2");
+ TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:3");
+ // We can reserve more threads now except one is being used
+ ASSERT_EQ(total_low_pri_threads_ - 1,
+ env_->ReserveThreads(total_low_pri_threads_, Env::Priority::LOW));
+ ASSERT_EQ(
+ total_low_pri_threads_ - 1,
+ env_->ReleaseThreads(total_low_pri_threads_ - 1, Env::Priority::LOW));
+ TEST_SYNC_POINT("RoundRobinSubcompactionsAgainstResources:4");
+ ASSERT_OK(dbfull()->WaitForCompact());
+ ASSERT_TRUE(num_planned_subcompactions_verified);
+ SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
-TEST_F(DBCompactionTest, ConsistencyFailTest2) {
+INSTANTIATE_TEST_CASE_P(RoundRobinSubcompactionsAgainstResources,
+ RoundRobinSubcompactionsAgainstResources,
+ ::testing::Values(std::make_tuple(1, 5),
+ std::make_tuple(5, 1),
+ std::make_tuple(10, 5),
+ std::make_tuple(5, 10),
+ std::make_tuple(10, 10)));
+
+TEST_P(DBCompactionTestWithParam, RoundRobinWithoutAdditionalResources) {
+ const int kKeysPerBuffer = 200;
Options options = CurrentOptions();
- options.force_consistency_checks = true;
- options.target_file_size_base = 1000;
- options.level0_file_num_compaction_trigger = 2;
- BlockBasedTableOptions bbto;
- bbto.block_size = 400; // small block size
- options.table_factory.reset(NewBlockBasedTableFactory(bbto));
+ options.num_levels = 4;
+ options.level0_file_num_compaction_trigger = 3;
+ options.target_file_size_base = kKeysPerBuffer * 1024;
+ options.compaction_pri = CompactionPri::kRoundRobin;
+ options.max_bytes_for_level_base = 30 * kKeysPerBuffer * 1024;
+ options.disable_auto_compactions = true;
+ options.max_subcompactions = max_subcompactions_;
+ options.max_background_compactions = 1;
+ options.max_compaction_bytes = 100000000;
+ // Similar experiment setting as above except the max_subcompactions
+ // is given by max_subcompactions_ (1 or 4), and we fix the
+ // additional resources as (1, 1) and thus no more extra resources
+ // can be used
DestroyAndReopen(options);
+ env_->SetBackgroundThreads(1, Env::LOW);
+
+ Random rnd(301);
+ const std::vector<int> files_per_level = {0, 33, 100};
+ for (int lvl = 2; lvl > 0; lvl--) {
+ for (int i = 0; i < files_per_level[lvl]; i++) {
+ for (int j = 0; j < kKeysPerBuffer; j++) {
+ // Add (lvl-1) to ensure nearly equivallent number of files
+ // in L2 are overlapped with fils selected to compact from
+ // L1
+ ASSERT_OK(Put(Key(2 * i * kKeysPerBuffer + 2 * j + (lvl - 1)),
+ rnd.RandomString(1010)));
+ }
+ ASSERT_OK(Flush());
+ }
+ MoveFilesToLevel(lvl);
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ASSERT_EQ(files_per_level[lvl], NumTableFilesAtLevel(lvl, 0));
+ }
+ // 33 files in L1; 100 files in L2
+ // This is a variable for making sure the following callback is called
+ // and the assertions in it are indeed excuted.
+ bool num_planned_subcompactions_verified = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "VersionBuilder::CheckConsistency1", [&](void* arg) {
- auto p =
- reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
- // just swap the two FileMetaData so that we hit error
- // in CheckConsistency funcion
- FileMetaData* temp = *(p->first);
- *(p->first) = *(p->second);
- *(p->second) = temp;
+ "CompactionJob::GenSubcompactionBoundaries:0", [&](void* arg) {
+ uint64_t num_planned_subcompactions = *(static_cast<uint64_t*>(arg));
+ // At most 4 files are selected for round-robin under auto
+ // compaction. The number of planned subcompaction is restricted by
+ // the max_subcompactions since no extra resources can be used
+ ASSERT_EQ(num_planned_subcompactions, options.max_subcompactions);
+ num_planned_subcompactions_verified = true;
});
+ // No need to setup dependency for pressure token since
+ // AcquireSubcompactionResources may not be called and it anyway cannot
+ // reserve any additional resources
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"DBCompactionTest::RoundRobinWithoutAdditionalResources:0",
+ "BackgroundCallCompaction:0"}});
+ SyncPoint::GetInstance()->EnableProcessing();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-
- Random rnd(301);
- std::string value = rnd.RandomString(1000);
-
- ASSERT_OK(Put("foo1", value));
- ASSERT_OK(Put("z", ""));
- Flush();
- ASSERT_OK(Put("foo2", value));
- ASSERT_OK(Put("z", ""));
- Flush();
+ ASSERT_OK(dbfull()->WaitForCompact());
+ ASSERT_OK(dbfull()->EnableAutoCompaction({dbfull()->DefaultColumnFamily()}));
+ TEST_SYNC_POINT("DBCompactionTest::RoundRobinWithoutAdditionalResources:0");
- // This probably returns non-OK, but we rely on the next Put()
- // to determine the DB is frozen.
- dbfull()->TEST_WaitForCompact();
- ASSERT_NOK(Put("foo", "bar"));
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ASSERT_OK(dbfull()->WaitForCompact());
+ ASSERT_TRUE(num_planned_subcompactions_verified);
+ SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
-void IngestOneKeyValue(DBImpl* db, const std::string& key,
- const std::string& value, const Options& options) {
- ExternalSstFileInfo info;
- std::string f = test::PerThreadDBPath("sst_file" + key);
- EnvOptions env;
- ROCKSDB_NAMESPACE::SstFileWriter writer(env, options);
- auto s = writer.Open(f);
- ASSERT_OK(s);
- // ASSERT_OK(writer.Put(Key(), ""));
- ASSERT_OK(writer.Put(key, value));
-
- ASSERT_OK(writer.Finish(&info));
- IngestExternalFileOptions ingest_opt;
-
- ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt));
-}
-
-TEST_P(DBCompactionTestWithParam,
- FlushAfterIntraL0CompactionCheckConsistencyFail) {
+TEST_F(DBCompactionTest, RoundRobinCutOutputAtCompactCursor) {
Options options = CurrentOptions();
- options.force_consistency_checks = true;
+ options.num_levels = 3;
options.compression = kNoCompression;
- options.level0_file_num_compaction_trigger = 5;
- options.max_background_compactions = 2;
- options.max_subcompactions = max_subcompactions_;
- DestroyAndReopen(options);
+ options.write_buffer_size = 4 * 1024;
+ options.max_bytes_for_level_base = 64 * 1024;
+ options.max_bytes_for_level_multiplier = 4;
+ options.level0_file_num_compaction_trigger = 4;
+ options.compaction_pri = CompactionPri::kRoundRobin;
- const size_t kValueSize = 1 << 20;
- Random rnd(301);
- std::atomic<int> pick_intra_l0_count(0);
- std::string value(rnd.RandomString(kValueSize));
+ DestroyAndReopen(options);
- // The L0->L1 must be picked before we begin ingesting files to trigger
- // intra-L0 compaction, and must not finish until after an intra-L0
- // compaction has been picked.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"LevelCompactionPicker::PickCompaction:Return",
- "DBCompactionTestWithParam::"
- "FlushAfterIntraL0CompactionCheckConsistencyFail:L0ToL1Ready"},
- {"LevelCompactionPicker::PickCompactionBySize:0",
- "CompactionJob::Run():Start"}});
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "FindIntraL0Compaction",
- [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
+ VersionSet* const versions = dbfull()->GetVersionSet();
+ assert(versions);
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+ ASSERT_NE(cfd, nullptr);
- // prevents trivial move
- for (int i = 0; i < 10; ++i) {
- ASSERT_OK(Put(Key(i), "")); // prevents trivial move
- }
- ASSERT_OK(Flush());
- Compact("", Key(99));
- ASSERT_EQ(0, NumTableFilesAtLevel(0));
+ Version* const current = cfd->current();
+ ASSERT_NE(current, nullptr);
- // Flush 5 L0 sst.
- for (int i = 0; i < 5; ++i) {
- ASSERT_OK(Put(Key(i + 1), value));
- ASSERT_OK(Flush());
- }
- ASSERT_EQ(5, NumTableFilesAtLevel(0));
+ VersionStorageInfo* storage_info = current->storage_info();
+ ASSERT_NE(storage_info, nullptr);
- // Put one key, to make smallest log sequence number in this memtable is less
- // than sst which would be ingested in next step.
- ASSERT_OK(Put(Key(0), "a"));
+ const InternalKey split_cursor = InternalKey(Key(600), 100, kTypeValue);
+ storage_info->AddCursorForOneLevel(2, split_cursor);
- ASSERT_EQ(5, NumTableFilesAtLevel(0));
- TEST_SYNC_POINT(
- "DBCompactionTestWithParam::"
- "FlushAfterIntraL0CompactionCheckConsistencyFail:L0ToL1Ready");
+ Random rnd(301);
- // Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction.
- for (int i = 5; i < 10; i++) {
- ASSERT_EQ(i, NumTableFilesAtLevel(0));
- IngestOneKeyValue(dbfull(), Key(i), value, options);
+ for (int i = 0; i < 50; i++) {
+ for (int j = 0; j < 50; j++) {
+ ASSERT_OK(Put(Key(j * 2 + i * 100), rnd.RandomString(102)));
+ }
+ }
+ // Add more overlapping files (avoid trivial move) to trigger compaction that
+ // output files in L2. Note that trivial move does not trigger compaction and
+ // in that case the cursor is not necessarily the boundary of file.
+ for (int i = 0; i < 50; i++) {
+ for (int j = 0; j < 50; j++) {
+ ASSERT_OK(Put(Key(j * 2 + 1 + i * 100), rnd.RandomString(1014)));
+ }
}
- // Put one key, to make biggest log sequence number in this memtable is bigger
- // than sst which would be ingested in next step.
- ASSERT_OK(Put(Key(2), "b"));
- dbfull()->TEST_WaitForCompact();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
std::vector<std::vector<FileMetaData>> level_to_files;
dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
&level_to_files);
- ASSERT_GT(level_to_files[0].size(), 0);
- ASSERT_GT(pick_intra_l0_count.load(), 0);
-
- ASSERT_OK(Flush());
+ const auto icmp = cfd->current()->storage_info()->InternalComparator();
+ // Files in level 2 should be split by the cursor
+ for (const auto& file : level_to_files[2]) {
+ ASSERT_TRUE(
+ icmp->Compare(file.smallest.Encode(), split_cursor.Encode()) >= 0 ||
+ icmp->Compare(file.largest.Encode(), split_cursor.Encode()) < 0);
+ }
}
-TEST_P(DBCompactionTestWithParam,
- IntraL0CompactionAfterFlushCheckConsistencyFail) {
- Options options = CurrentOptions();
- options.force_consistency_checks = true;
- options.compression = kNoCompression;
- options.level0_file_num_compaction_trigger = 5;
- options.max_background_compactions = 2;
- options.max_subcompactions = max_subcompactions_;
- options.write_buffer_size = 2 << 20;
- options.max_write_buffer_number = 6;
- DestroyAndReopen(options);
+class NoopMergeOperator : public MergeOperator {
+ public:
+ NoopMergeOperator() {}
+
+ bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
+ MergeOperationOutput* merge_out) const override {
+ std::string val("bar");
+ merge_out->new_value = val;
+ return true;
+ }
+
+ const char* Name() const override { return "Noop"; }
+};
+
+TEST_F(DBCompactionTest, PartialManualCompaction) {
+ Options opts = CurrentOptions();
+ opts.num_levels = 3;
+ opts.level0_file_num_compaction_trigger = 10;
+ opts.compression = kNoCompression;
+ opts.merge_operator.reset(new NoopMergeOperator());
+ opts.target_file_size_base = 10240;
+ DestroyAndReopen(opts);
+
+ Random rnd(301);
+ for (auto i = 0; i < 8; ++i) {
+ for (auto j = 0; j < 10; ++j) {
+ ASSERT_OK(Merge("foo", rnd.RandomString(1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ MoveFilesToLevel(2);
+
+ std::string prop;
+ EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
+ uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
+ ASSERT_OK(dbfull()->SetOptions(
+ {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
+
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
+ ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+}
+
+TEST_F(DBCompactionTest, ManualCompactionFailsInReadOnlyMode) {
+ // Regression test for bug where manual compaction hangs forever when the DB
+ // is in read-only mode. Verify it now at least returns, despite failing.
+ const int kNumL0Files = 4;
+ std::unique_ptr<FaultInjectionTestEnv> mock_env(
+ new FaultInjectionTestEnv(env_));
+ Options opts = CurrentOptions();
+ opts.disable_auto_compactions = true;
+ opts.env = mock_env.get();
+ DestroyAndReopen(opts);
+
+ Random rnd(301);
+ for (int i = 0; i < kNumL0Files; ++i) {
+ // Make sure files are overlapping in key-range to prevent trivial move.
+ ASSERT_OK(Put("key1", rnd.RandomString(1024)));
+ ASSERT_OK(Put("key2", rnd.RandomString(1024)));
+ ASSERT_OK(Flush());
+ }
+ ASSERT_EQ(kNumL0Files, NumTableFilesAtLevel(0));
+
+ // Enter read-only mode by failing a write.
+ mock_env->SetFilesystemActive(false);
+ // Make sure this is outside `CompactRange`'s range so that it doesn't fail
+ // early trying to flush memtable.
+ ASSERT_NOK(Put("key3", rnd.RandomString(1024)));
+
+ // In the bug scenario, the first manual compaction would fail and forget to
+ // unregister itself, causing the second one to hang forever due to conflict
+ // with a non-running compaction.
+ CompactRangeOptions cro;
+ cro.exclusive_manual_compaction = false;
+ Slice begin_key("key1");
+ Slice end_key("key2");
+ ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+ ASSERT_NOK(dbfull()->CompactRange(cro, &begin_key, &end_key));
+
+ // Close before mock_env destruct.
+ Close();
+}
+
+// ManualCompactionBottomLevelOptimization tests the bottom level manual
+// compaction optimization to skip recompacting files created by Ln-1 to Ln
+// compaction
+TEST_F(DBCompactionTest, ManualCompactionBottomLevelOptimized) {
+ Options opts = CurrentOptions();
+ opts.num_levels = 3;
+ opts.level0_file_num_compaction_trigger = 5;
+ opts.compression = kNoCompression;
+ opts.merge_operator.reset(new NoopMergeOperator());
+ opts.target_file_size_base = 1024;
+ opts.max_bytes_for_level_multiplier = 2;
+ opts.disable_auto_compactions = true;
+ DestroyAndReopen(opts);
+ ColumnFamilyHandleImpl* cfh =
+ static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
+ ColumnFamilyData* cfd = cfh->cfd();
+ InternalStats* internal_stats_ptr = cfd->internal_stats();
+ ASSERT_NE(internal_stats_ptr, nullptr);
+
+ Random rnd(301);
+ for (auto i = 0; i < 8; ++i) {
+ for (auto j = 0; j < 10; ++j) {
+ ASSERT_OK(
+ Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ MoveFilesToLevel(2);
+
+ for (auto i = 0; i < 8; ++i) {
+ for (auto j = 0; j < 10; ++j) {
+ ASSERT_OK(
+ Put("bar" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+ const std::vector<InternalStats::CompactionStats>& comp_stats =
+ internal_stats_ptr->TEST_GetCompactionStats();
+ int num = comp_stats[2].num_input_files_in_output_level;
+ ASSERT_EQ(num, 0);
+
+ CompactRangeOptions cro;
+ cro.bottommost_level_compaction = BottommostLevelCompaction::kForceOptimized;
+ ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+
+ const std::vector<InternalStats::CompactionStats>& comp_stats2 =
+ internal_stats_ptr->TEST_GetCompactionStats();
+ num = comp_stats2[2].num_input_files_in_output_level;
+ ASSERT_EQ(num, 0);
+}
+
+TEST_F(DBCompactionTest, ManualCompactionMax) {
+ uint64_t l1_avg_size = 0, l2_avg_size = 0;
+ auto generate_sst_func = [&]() {
+ Random rnd(301);
+ for (auto i = 0; i < 100; i++) {
+ for (auto j = 0; j < 10; j++) {
+ ASSERT_OK(Put(Key(i * 10 + j), rnd.RandomString(1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+ MoveFilesToLevel(2);
+
+ for (auto i = 0; i < 10; i++) {
+ for (auto j = 0; j < 10; j++) {
+ ASSERT_OK(Put(Key(i * 100 + j * 10), rnd.RandomString(1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+ MoveFilesToLevel(1);
+
+ std::vector<std::vector<FileMetaData>> level_to_files;
+ dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
+ &level_to_files);
+
+ uint64_t total = 0;
+ for (const auto& file : level_to_files[1]) {
+ total += file.compensated_file_size;
+ }
+ l1_avg_size = total / level_to_files[1].size();
+
+ total = 0;
+ for (const auto& file : level_to_files[2]) {
+ total += file.compensated_file_size;
+ }
+ l2_avg_size = total / level_to_files[2].size();
+ };
+
+ std::atomic_int num_compactions(0);
+ SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BGWorkCompaction", [&](void* /*arg*/) { ++num_compactions; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Options opts = CurrentOptions();
+ opts.disable_auto_compactions = true;
+
+ // with default setting (1.6G by default), it should cover all files in 1
+ // compaction
+ DestroyAndReopen(opts);
+ generate_sst_func();
+ num_compactions.store(0);
+ CompactRangeOptions cro;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ ASSERT_TRUE(num_compactions.load() == 1);
+
+ // split the compaction to 5
+ int num_split = 5;
+ DestroyAndReopen(opts);
+ generate_sst_func();
+ uint64_t total_size = (l1_avg_size * 10) + (l2_avg_size * 100);
+ opts.max_compaction_bytes = total_size / num_split;
+ opts.target_file_size_base = total_size / num_split;
+ Reopen(opts);
+ num_compactions.store(0);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ ASSERT_TRUE(num_compactions.load() == num_split);
+
+ // very small max_compaction_bytes, it should still move forward
+ opts.max_compaction_bytes = l1_avg_size / 2;
+ opts.target_file_size_base = l1_avg_size / 2;
+ DestroyAndReopen(opts);
+ generate_sst_func();
+ num_compactions.store(0);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ ASSERT_TRUE(num_compactions.load() > 10);
+
+ // dynamically set the option
+ num_split = 2;
+ opts.max_compaction_bytes = 0;
+ DestroyAndReopen(opts);
+ generate_sst_func();
+ total_size = (l1_avg_size * 10) + (l2_avg_size * 100);
+ Status s = db_->SetOptions(
+ {{"max_compaction_bytes", std::to_string(total_size / num_split)},
+ {"target_file_size_base", std::to_string(total_size / num_split)}});
+ ASSERT_OK(s);
+
+ num_compactions.store(0);
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ ASSERT_TRUE(num_compactions.load() == num_split);
+}
+
+TEST_F(DBCompactionTest, CompactionDuringShutdown) {
+ Options opts = CurrentOptions();
+ opts.level0_file_num_compaction_trigger = 2;
+ opts.disable_auto_compactions = true;
+ DestroyAndReopen(opts);
+ ColumnFamilyHandleImpl* cfh =
+ static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
+ ColumnFamilyData* cfd = cfh->cfd();
+ InternalStats* internal_stats_ptr = cfd->internal_stats();
+ ASSERT_NE(internal_stats_ptr, nullptr);
+
+ Random rnd(301);
+ for (auto i = 0; i < 2; ++i) {
+ for (auto j = 0; j < 10; ++j) {
+ ASSERT_OK(
+ Put("foo" + std::to_string(i * 10 + j), rnd.RandomString(1024)));
+ }
+ ASSERT_OK(Flush());
+ }
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "DBImpl::BackgroundCompaction:NonTrivial:BeforeRun",
+ [&](void* /*arg*/) { dbfull()->shutting_down_.store(true); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ Status s = dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ ASSERT_TRUE(s.ok() || s.IsShutdownInProgress());
+ ASSERT_OK(dbfull()->error_handler_.GetBGError());
+}
+
+// FixFileIngestionCompactionDeadlock tests and verifies that compaction and
+// file ingestion do not cause deadlock in the event of write stall triggered
+// by number of L0 files reaching level0_stop_writes_trigger.
TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) {
  const int kNumKeysPerFile = 100;
  // Generate SST files.
  Options options = CurrentOptions();

  // Generate an external SST file containing a single key, i.e. 99
  std::string sst_files_dir = dbname_ + "/sst_files/";
  ASSERT_OK(DestroyDir(env_, sst_files_dir));
  ASSERT_OK(env_->CreateDir(sst_files_dir));
  SstFileWriter sst_writer(EnvOptions(), options);
  const std::string sst_file_path = sst_files_dir + "test.sst";
  ASSERT_OK(sst_writer.Open(sst_file_path));
  ASSERT_OK(sst_writer.Put(Key(kNumKeysPerFile - 1), "value"));
  ASSERT_OK(sst_writer.Finish());

  // Hold background compaction until the ingestion below has incremented the
  // ingest-file counter, so the compaction observes an in-flight ingestion.
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  SyncPoint::GetInstance()->LoadDependency({
      {"DBImpl::IngestExternalFile:AfterIncIngestFileCounter",
       "BackgroundCallCompaction:0"},
  });
  SyncPoint::GetInstance()->EnableProcessing();

  options.write_buffer_size = 110 << 10;  // 110KB
  // Auto compaction triggers at exactly the same L0 count that stops writes.
  options.level0_file_num_compaction_trigger =
      options.level0_stop_writes_trigger;
  options.max_subcompactions = max_subcompactions_;
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);
  Random rnd(301);

  // Generate level0_stop_writes_trigger L0 files to trigger write stop
  for (int i = 0; i != options.level0_file_num_compaction_trigger; ++i) {
    for (int j = 0; j != kNumKeysPerFile; ++j) {
      ASSERT_OK(Put(Key(j), rnd.RandomString(990)));
    }
    if (i > 0) {
      // Memtable i is still active at this point; files 0..i-1 should have
      // been flushed to L0 (the special memtable factory seals a memtable
      // after kNumKeysPerFile inserts).
      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
      ASSERT_EQ(NumTableFilesAtLevel(0 /*level*/, 0 /*cf*/), i);
    }
  }
  // When we reach this point, there will be level0_stop_writes_trigger L0
  // files and one extra key (99) in memory, which overlaps with the external
  // SST file. Write stall triggers, and can be cleared only after compaction
  // reduces the number of L0 files.

  // Compaction will also be triggered since we have reached the threshold for
  // auto compaction. Note that compaction may begin after the following file
  // ingestion thread and waits for ingestion to finish.

  // Thread to ingest file with overlapping key range with the current
  // memtable. Consequently ingestion will trigger a flush. The flush MUST
  // proceed without waiting for the write stall condition to clear, otherwise
  // deadlock can happen.
  port::Thread ingestion_thr([&]() {
    IngestExternalFileOptions ifo;
    Status s = db_->IngestExternalFile({sst_file_path}, ifo);
    ASSERT_OK(s);
  });

  // If the deadlock existed, the ingestion would wait forever on the write
  // stall and this join() would hang.
  ingestion_thr.join();
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  Close();
}
+
// Verifies that a force_consistency_checks failure during flush surfaces as
// Corruption and freezes the DB: subsequent writes must be rejected.
TEST_F(DBCompactionTest, ConsistencyFailTest) {
  Options options = CurrentOptions();
  options.force_consistency_checks = true;
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistency0", [&](void* arg) {
        auto p =
            reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
        // just swap the two FileMetaData so that we hit error
        // in CheckConsistency function
        FileMetaData* temp = *(p->first);
        *(p->first) = *(p->second);
        *(p->second) = temp;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  for (int k = 0; k < 2; ++k) {
    ASSERT_OK(Put("foo", "bar"));
    Status s = Flush();
    if (k < 1) {
      // First flush succeeds; the sabotaged check needs a pair of files.
      ASSERT_OK(s);
    } else {
      // Second flush hits the injected inconsistency.
      ASSERT_TRUE(s.IsCorruption());
    }
  }

  // The DB is frozen after the corruption; further writes must fail.
  ASSERT_NOK(Put("foo", "bar"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
+
// Like ConsistencyFailTest, but sabotages the second consistency check
// (CheckConsistency1) with small files/blocks so multiple files land in a
// level. The corruption may surface at flush or at the triggered compaction;
// either way the DB must end up frozen.
TEST_F(DBCompactionTest, ConsistencyFailTest2) {
  Options options = CurrentOptions();
  options.force_consistency_checks = true;
  options.target_file_size_base = 1000;
  options.level0_file_num_compaction_trigger = 2;
  BlockBasedTableOptions bbto;
  bbto.block_size = 400;  // small block size
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "VersionBuilder::CheckConsistency1", [&](void* arg) {
        auto p =
            reinterpret_cast<std::pair<FileMetaData**, FileMetaData**>*>(arg);
        // just swap the two FileMetaData so that we hit error
        // in CheckConsistency function
        FileMetaData* temp = *(p->first);
        *(p->first) = *(p->second);
        *(p->second) = temp;
      });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  Random rnd(301);
  std::string value = rnd.RandomString(1000);

  ASSERT_OK(Put("foo1", value));
  ASSERT_OK(Put("z", ""));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("foo2", value));
  ASSERT_OK(Put("z", ""));
  Status s = Flush();
  // The flush itself may or may not observe the injected inconsistency.
  ASSERT_TRUE(s.ok() || s.IsCorruption());

  // This probably returns non-OK, but we rely on the next Put()
  // to determine the DB is frozen.
  ASSERT_NOK(dbfull()->TEST_WaitForCompact());
  ASSERT_NOK(Put("foo", "bar"));
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}
+
+void IngestOneKeyValue(DBImpl* db, const std::string& key,
+ const std::string& value, const Options& options) {
+ ExternalSstFileInfo info;
+ std::string f = test::PerThreadDBPath("sst_file" + key);
+ EnvOptions env;
+ ROCKSDB_NAMESPACE::SstFileWriter writer(env, options);
+ auto s = writer.Open(f);
+ ASSERT_OK(s);
+ // ASSERT_OK(writer.Put(Key(), ""));
+ ASSERT_OK(writer.Put(key, value));
+
+ ASSERT_OK(writer.Finish(&info));
+ IngestExternalFileOptions ingest_opt;
+
+ ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt));
+}
+
// Verifies force_consistency_checks does not misfire when a memtable whose
// sequence numbers straddle those of ingested files is flushed after an
// intra-L0 compaction has run.
TEST_P(DBCompactionTestWithParam,
       FlushAfterIntraL0CompactionCheckConsistencyFail) {
  Options options = CurrentOptions();
  options.force_consistency_checks = true;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 5;
  options.max_background_compactions = 2;
  options.max_subcompactions = max_subcompactions_;
  DestroyAndReopen(options);

  const size_t kValueSize = 1 << 20;
  Random rnd(301);
  std::atomic<int> pick_intra_l0_count(0);
  std::string value(rnd.RandomString(kValueSize));

  // The L0->L1 must be picked before we begin ingesting files to trigger
  // intra-L0 compaction, and must not finish until after an intra-L0
  // compaction has been picked.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
      {{"LevelCompactionPicker::PickCompaction:Return",
        "DBCompactionTestWithParam::"
        "FlushAfterIntraL0CompactionCheckConsistencyFail:L0ToL1Ready"},
       {"LevelCompactionPicker::PickCompactionBySize:0",
        "CompactionJob::Run():Start"}});
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "FindIntraL0Compaction",
      [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });

  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Seed non-overlapping keys so later compactions cannot be satisfied by a
  // trivial move.
  for (int i = 0; i < 10; ++i) {
    ASSERT_OK(Put(Key(i), ""));  // prevents trivial move
  }
  ASSERT_OK(Flush());
  Compact("", Key(99));
  ASSERT_EQ(0, NumTableFilesAtLevel(0));

  // Flush 5 L0 sst.
  for (int i = 0; i < 5; ++i) {
    ASSERT_OK(Put(Key(i + 1), value));
    ASSERT_OK(Flush());
  }
  ASSERT_EQ(5, NumTableFilesAtLevel(0));

  // Put one key so that the smallest log sequence number in this memtable is
  // less than that of the SSTs ingested in the next step.
  ASSERT_OK(Put(Key(0), "a"));

  ASSERT_EQ(5, NumTableFilesAtLevel(0));
  TEST_SYNC_POINT(
      "DBCompactionTestWithParam::"
      "FlushAfterIntraL0CompactionCheckConsistencyFail:L0ToL1Ready");

  // Ingest 5 L0 sst; these files trigger PickIntraL0Compaction.
  for (int i = 5; i < 10; i++) {
    ASSERT_EQ(i, NumTableFilesAtLevel(0));
    IngestOneKeyValue(dbfull(), Key(i), value, options);
  }

  // Put one key so that the biggest log sequence number in this memtable is
  // bigger than that of the SSTs ingested above.
  ASSERT_OK(Put(Key(2), "b"));
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  std::vector<std::vector<FileMetaData>> level_to_files;
  dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
                                  &level_to_files);
  // The intra-L0 compaction must have run and left output on L0.
  ASSERT_GT(level_to_files[0].size(), 0);
  ASSERT_GT(pick_intra_l0_count.load(), 0);

  // Flushing the straddling memtable must not trip the consistency check.
  ASSERT_OK(Flush());
}
+
+TEST_P(DBCompactionTestWithParam,
+ IntraL0CompactionAfterFlushCheckConsistencyFail) {
+ Options options = CurrentOptions();
+ options.force_consistency_checks = true;
+ options.compression = kNoCompression;
+ options.level0_file_num_compaction_trigger = 5;
+ options.max_background_compactions = 2;
+ options.max_subcompactions = max_subcompactions_;
+ options.write_buffer_size = 2 << 20;
+ options.max_write_buffer_number = 6;
+ DestroyAndReopen(options);
+
+ const size_t kValueSize = 1 << 20;
+ Random rnd(301);
+ std::string value(rnd.RandomString(kValueSize));
+ std::string value2(rnd.RandomString(kValueSize));
+ std::string bigvalue = value + value;
+
+ // prevents trivial move
+ for (int i = 0; i < 10; ++i) {
+ ASSERT_OK(Put(Key(i), "")); // prevents trivial move
+ }
+ ASSERT_OK(Flush());
+ Compact("", Key(99));
+ ASSERT_EQ(0, NumTableFilesAtLevel(0));
+
+ std::atomic<int> pick_intra_l0_count(0);
+ // The L0->L1 must be picked before we begin ingesting files to trigger
+ // intra-L0 compaction, and must not finish until after an intra-L0
+ // compaction has been picked.
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"LevelCompactionPicker::PickCompaction:Return",
+ "DBCompactionTestWithParam::"
+ "IntraL0CompactionAfterFlushCheckConsistencyFail:L0ToL1Ready"},
+ {"LevelCompactionPicker::PickCompactionBySize:0",
+ "CompactionJob::Run():Start"}});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "FindIntraL0Compaction",
+ [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ // Make 6 L0 sst.
+ for (int i = 0; i < 6; ++i) {
+ if (i % 2 == 0) {
+ IngestOneKeyValue(dbfull(), Key(i), value, options);
+ } else {
+ ASSERT_OK(Put(Key(i), value));
+ ASSERT_OK(Flush());
+ }
+ }
+
+ ASSERT_EQ(6, NumTableFilesAtLevel(0));
+
+ // Stop run flush job
+ env_->SetBackgroundThreads(1, Env::HIGH);
+ test::SleepingBackgroundTask sleeping_tasks;
+ env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
+ Env::Priority::HIGH);
+ sleeping_tasks.WaitUntilSleeping();
+
+ // Put many keys to make memtable request to flush
+ for (int i = 0; i < 6; ++i) {
+ ASSERT_OK(Put(Key(i), bigvalue));
+ }
+
+ ASSERT_EQ(6, NumTableFilesAtLevel(0));
+ TEST_SYNC_POINT(
+ "DBCompactionTestWithParam::"
+ "IntraL0CompactionAfterFlushCheckConsistencyFail:L0ToL1Ready");
+ // ingest file to trigger IntraL0Compaction
+ for (int i = 6; i < 10; ++i) {
+ ASSERT_EQ(i, NumTableFilesAtLevel(0));
+ IngestOneKeyValue(dbfull(), Key(i), value2, options);
+ }
+
+ // Wake up flush job
+ sleeping_tasks.WakeUp();
+ sleeping_tasks.WaitUntilDone();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+ uint64_t error_count = 0;
+ db_->GetIntProperty("rocksdb.background-errors", &error_count);
+ ASSERT_EQ(error_count, 0);
+ ASSERT_GT(pick_intra_l0_count.load(), 0);
+ for (int i = 0; i < 6; ++i) {
+ ASSERT_EQ(bigvalue, Get(Key(i)));
+ }
+ for (int i = 6; i < 10; ++i) {
+ ASSERT_EQ(value2, Get(Key(i)));
+ }
+}
+
+TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) {
+ constexpr int kSstNum = 10;
+ Options options = CurrentOptions();
+ options.disable_auto_compactions = true;
+ DestroyAndReopen(options);
+
+ // Generate some sst files on level 0 with sequence keys (no overlap)
+ for (int i = 0; i < kSstNum; i++) {
+ for (int j = 1; j < UCHAR_MAX; j++) {
+ auto key = std::string(kSstNum, '\0');
+ key[kSstNum - i] += static_cast<char>(j);
+ ASSERT_OK(Put(key, std::string(i % 1000, 'A')));
+ }
+ ASSERT_OK(Flush());
+ }
+ ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+
+ ASSERT_EQ(std::to_string(kSstNum), FilesPerLevel(0));
+
+ auto cro = CompactRangeOptions();
+ cro.bottommost_level_compaction = bottommost_level_compaction_;
+ ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
+ if (bottommost_level_compaction_ == BottommostLevelCompaction::kForce ||
+ bottommost_level_compaction_ ==
+ BottommostLevelCompaction::kForceOptimized) {
+ // Real compaction to compact all sst files from level 0 to 1 file on level
+ // 1
+ ASSERT_EQ("0,1", FilesPerLevel(0));
+ } else {
+ // Just trivial move from level 0 -> 1
+ ASSERT_EQ("0," + std::to_string(kSstNum), FilesPerLevel(0));
+ }
+}
+
// Run the bottommost-param tests under every BottommostLevelCompaction
// policy.
INSTANTIATE_TEST_CASE_P(
    DBCompactionTestWithBottommostParam, DBCompactionTestWithBottommostParam,
    ::testing::Values(BottommostLevelCompaction::kSkip,
                      BottommostLevelCompaction::kIfHaveCompactionFilter,
                      BottommostLevelCompaction::kForce,
                      BottommostLevelCompaction::kForceOptimized));
+
// Verifies max_subcompactions can be changed at runtime via SetDBOptions()
// and that subsequently picked level compactions observe the new value.
TEST_F(DBCompactionTest, UpdateLevelSubCompactionTest) {
  Options options = CurrentOptions();
  options.max_subcompactions = 10;
  options.target_file_size_base = 1 << 10;  // 1KB
  DestroyAndReopen(options);

  bool has_compaction = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->max_subcompactions() == 10);
        has_compaction = true;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 10);
  // Trigger compaction
  for (int i = 0; i < 32; i++) {
    for (int j = 0; j < 5000; j++) {
      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_TRUE(has_compaction);

  has_compaction = false;
  // Lower the limit dynamically; the new value must be visible via
  // GetDBOptions() and in the next picked compaction.
  ASSERT_OK(dbfull()->SetDBOptions({{"max_subcompactions", "2"}}));
  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 2);

  // Registering under the same sync point name replaces the callback above.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->max_subcompactions() == 2);
        has_compaction = true;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Trigger compaction
  for (int i = 0; i < 32; i++) {
    for (int j = 0; j < 5000; j++) {
      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_TRUE(has_compaction);
}
+
// Same as UpdateLevelSubCompactionTest, but for universal compaction:
// max_subcompactions updated via SetDBOptions() must be observed by the
// universal compaction picker.
TEST_F(DBCompactionTest, UpdateUniversalSubCompactionTest) {
  Options options = CurrentOptions();
  options.max_subcompactions = 10;
  options.compaction_style = kCompactionStyleUniversal;
  options.target_file_size_base = 1 << 10;  // 1KB
  DestroyAndReopen(options);

  bool has_compaction = false;
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->max_subcompactions() == 10);
        has_compaction = true;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Trigger compaction
  for (int i = 0; i < 32; i++) {
    for (int j = 0; j < 5000; j++) {
      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_TRUE(has_compaction);
  has_compaction = false;

  // Lower the limit dynamically and verify the picker sees the new value.
  ASSERT_OK(dbfull()->SetDBOptions({{"max_subcompactions", "2"}}));
  ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 2);

  // Registering under the same sync point name replaces the callback above.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
        ASSERT_TRUE(compaction->max_subcompactions() == 2);
        has_compaction = true;
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Trigger compaction
  for (int i = 0; i < 32; i++) {
    for (int j = 0; j < 5000; j++) {
      ASSERT_OK(Put(std::to_string(j), std::string(1, 'A')));
    }
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
  }
  ASSERT_OK(dbfull()->TEST_WaitForCompact());
  ASSERT_TRUE(has_compaction);
}
+
TEST_P(ChangeLevelConflictsWithAuto, TestConflict) {
  // A `CompactRange()` may race with an automatic compaction; we need to
  // make sure it doesn't corrupt the data.
  Options options = CurrentOptions();
  options.level0_file_num_compaction_trigger = 2;
  Reopen(options);

  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v1"));
  ASSERT_OK(Flush());
  ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());

  // Move everything to L2 so the refit below has files to move back up.
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  }
  ASSERT_EQ("0,0,1", FilesPerLevel(0));

  // Run a CompactRange() that refits to another level while a second thread
  // writes to, and auto-compacts into, that same level.
  SyncPoint::GetInstance()->LoadDependency({
      // Hold the refit at BeforeRefit:1 until the background thread starts
      // writing, and do not let it proceed past BeforeRefit:2 until the
      // automatic compaction has finished.
      {
          "DBImpl::CompactRange:BeforeRefit:1",
          "AutoCompactionFinished1",
      },
      {
          "AutoCompactionFinished2",
          "DBImpl::CompactRange:BeforeRefit:2",
      },
  });
  SyncPoint::GetInstance()->EnableProcessing();

  std::thread auto_comp([&] {
    TEST_SYNC_POINT("AutoCompactionFinished1");
    ASSERT_OK(Put("bar", "v2"));
    ASSERT_OK(Put("foo", "v2"));
    ASSERT_OK(Flush());
    ASSERT_OK(Put("bar", "v3"));
    ASSERT_OK(Put("foo", "v3"));
    ASSERT_OK(Flush());
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
    TEST_SYNC_POINT("AutoCompactionFinished2");
  });

  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = GetParam() ? 1 : 0;
    // This should return non-OK, but it's more important for the test to
    // make sure that the DB is not corrupted.
    ASSERT_NOK(dbfull()->CompactRange(cro, nullptr, nullptr));
  }
  auto_comp.join();
  // Refitting didn't happen.
  SyncPoint::GetInstance()->DisableProcessing();

  // NOTE(review): an earlier comment here promised a follow-up write to
  // prove the DB stayed usable, but none is performed; the NOK assertion
  // above plus a clean teardown is what this test actually verifies.
}
+
// Run TestConflict with refit target level 1 (true) and level 0 (false).
INSTANTIATE_TEST_CASE_P(ChangeLevelConflictsWithAuto,
                        ChangeLevelConflictsWithAuto, testing::Bool());
+
TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) {
  // A `CompactRange()` with `change_level == true` needs to execute its final
  // step, `ReFitLevel()`, in isolation. Previously there was a bug where
  // refitting could target the same level as an ongoing manual compaction,
  // leading to overlapping files in that level.
  //
  // This test ensures that case is not possible by verifying any manual
  // compaction issued during the `ReFitLevel()` phase fails with
  // `Status::Incomplete`.
  Options options = CurrentOptions();
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 3;
  Reopen(options);

  // Setup an LSM with three levels populated.
  Random rnd(301);
  int key_idx = 0;
  GenerateNewFile(&rnd, &key_idx);
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  }
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  GenerateNewFile(&rnd, &key_idx);
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1,1,2", FilesPerLevel(0));

  // The background thread will refit L2->L1 while the
  // foreground thread will try to simultaneously compact L0->L1.
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
      // The first two dependencies ensure the foreground creates an L0 file
      // between the background compaction's L0->L1 and its L1->L2.
      {
          "DBImpl::RunManualCompaction()::1",
          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
          "PutFG",
      },
      {
          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
          "FlushedFG",
          "DBImpl::RunManualCompaction()::2",
      },
      // The next two dependencies ensure the foreground invokes
      // `CompactRange()` while the background is refitting. The
      // foreground's `CompactRange()` is guaranteed to attempt an L0->L1
      // as we set it up with an empty memtable and a new L0 file.
      {
          "DBImpl::CompactRange:PreRefitLevel",
          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
          "CompactFG",
      },
      {
          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
          "CompactedFG",
          "DBImpl::CompactRange:PostRefitLevel",
      },
  });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  // Background: full CompactRange() whose final step refits down to L1.
  ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 1;
    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  });

  TEST_SYNC_POINT(
      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:PutFG");
  // Make sure we have something new to compact in the foreground.
  // Note key 1 is carefully chosen as it ensures the file we create here
  // overlaps with one of the files being refitted L2->L1 in the background.
  // If we chose key 0, the file created here would not overlap.
  ASSERT_OK(Put(Key(1), "val"));
  ASSERT_OK(Flush());
  TEST_SYNC_POINT(
      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:FlushedFG");

  TEST_SYNC_POINT(
      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:CompactFG");
  // While the refit is in progress, a conflicting manual compaction must
  // bail out with Incomplete rather than write into the refit's target
  // level.
  ASSERT_TRUE(dbfull()
                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
                  .IsIncomplete());
  TEST_SYNC_POINT(
      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
      "CompactedFG");
  refit_level_thread.join();
}
+
TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) {
  // This test is added to ensure that RefitLevel() error paths are clearing
  // internal flags and to test that subsequent valid RefitLevel() calls
  // succeed
  Options options = CurrentOptions();
  options.memtable_factory.reset(
      test::NewSpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
  options.level0_file_num_compaction_trigger = 2;
  options.num_levels = 3;
  Reopen(options);

  ASSERT_EQ("", FilesPerLevel(0));

  // Setup an LSM with three levels populated.
  Random rnd(301);
  int key_idx = 0;
  GenerateNewFile(&rnd, &key_idx);
  ASSERT_EQ("1", FilesPerLevel(0));
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 2;
    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  }
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  // Remember the key range of the two files generated next so the refits
  // below can target them (or, for the failure case, avoid them).
  auto start_idx = key_idx;
  GenerateNewFile(&rnd, &key_idx);
  GenerateNewFile(&rnd, &key_idx);
  auto end_idx = key_idx - 1;
  ASSERT_EQ("1,1,2", FilesPerLevel(0));

  // Next two CompactRange() calls are used to test exercise error paths within
  // RefitLevel() before triggering a valid RefitLevel() call

  // Trigger a refit to L1 first
  {
    std::string begin_string = Key(start_idx);
    std::string end_string = Key(end_idx);
    Slice begin(begin_string);
    Slice end(end_string);

    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 1;
    ASSERT_OK(dbfull()->CompactRange(cro, &begin, &end));
  }
  ASSERT_EQ("0,3,2", FilesPerLevel(0));

  // Try a refit from L2->L1 - this should fail and exercise error paths in
  // RefitLevel(). Presumably it fails because L1 already holds files from
  // the refit above -- TODO(review): confirm against RefitLevel().
  {
    // Select key range that matches the bottom most level (L2)
    std::string begin_string = Key(0);
    std::string end_string = Key(start_idx - 1);
    Slice begin(begin_string);
    Slice end(end_string);

    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 1;
    ASSERT_NOK(dbfull()->CompactRange(cro, &begin, &end));
  }
  // The failed refit must not have changed the LSM shape.
  ASSERT_EQ("0,3,2", FilesPerLevel(0));

  // Try a valid Refit request to ensure the path is still working
  {
    CompactRangeOptions cro;
    cro.change_level = true;
    cro.target_level = 1;
    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
  }
  ASSERT_EQ("0,5", FilesPerLevel(0));
}
+
// Writes three versions of two keys with blob files disabled, reopens the DB
// with enable_blob_files set, and runs a full manual compaction. Verifies
// the live values end up in one blob file referenced by one L1 table file,
// and that the compaction stats account for the SST and blob output.
TEST_F(DBCompactionTest, CompactionWithBlob) {
  Options options;
  options.env = env_;
  options.disable_auto_compactions = true;

  Reopen(options);

  constexpr char first_key[] = "first_key";
  constexpr char second_key[] = "second_key";
  constexpr char first_value[] = "first_value";
  constexpr char second_value[] = "second_value";
  constexpr char third_value[] = "third_value";

  ASSERT_OK(Put(first_key, first_value));
  ASSERT_OK(Put(second_key, first_value));
  ASSERT_OK(Flush());

  ASSERT_OK(Put(first_key, second_value));
  ASSERT_OK(Put(second_key, second_value));
  ASSERT_OK(Flush());

  ASSERT_OK(Put(first_key, third_value));
  ASSERT_OK(Put(second_key, third_value));
  ASSERT_OK(Flush());

  // Enable blob files only for the compaction that follows.
  options.enable_blob_files = true;

  Reopen(options);

  constexpr Slice* begin = nullptr;
  constexpr Slice* end = nullptr;

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));

  ASSERT_EQ(Get(first_key), third_value);
  ASSERT_EQ(Get(second_key), third_value);

  VersionSet* const versions = dbfull()->GetVersionSet();
  assert(versions);

  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  ASSERT_NE(cfd, nullptr);

  Version* const current = cfd->current();
  ASSERT_NE(current, nullptr);

  const VersionStorageInfo* const storage_info = current->storage_info();
  ASSERT_NE(storage_info, nullptr);

  // Expect exactly one table file on the output level (L1)...
  const auto& l1_files = storage_info->LevelFiles(1);
  ASSERT_EQ(l1_files.size(), 1);

  const FileMetaData* const table_file = l1_files[0];
  ASSERT_NE(table_file, nullptr);

  // ...and exactly one blob file holding the two surviving values.
  const auto& blob_files = storage_info->GetBlobFiles();
  ASSERT_EQ(blob_files.size(), 1);

  const auto& blob_file = blob_files.front();
  ASSERT_NE(blob_file, nullptr);

  ASSERT_EQ(table_file->smallest.user_key(), first_key);
  ASSERT_EQ(table_file->largest.user_key(), second_key);
  // The table file must reference the blob file it points into.
  ASSERT_EQ(table_file->oldest_blob_file_number,
            blob_file->GetBlobFileNumber());

  ASSERT_EQ(blob_file->GetTotalBlobCount(), 2);

  const InternalStats* const internal_stats = cfd->internal_stats();
  ASSERT_NE(internal_stats, nullptr);

  // compaction_stats[1] holds the stats checked below (num_output_files
  // matches the single L1 file) -- presumably indexed by output level.
  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
  ASSERT_GE(compaction_stats.size(), 2);
  ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
  ASSERT_EQ(compaction_stats[1].bytes_written, table_file->fd.GetFileSize());
  ASSERT_EQ(compaction_stats[1].bytes_written_blob,
            blob_file->GetTotalBlobBytes());
  ASSERT_EQ(compaction_stats[1].num_output_files, 1);
  ASSERT_EQ(compaction_stats[1].num_output_files_blob, 1);
}
+
// Parameterized over the sync point at which blob file writing is forced to
// fail: while adding a blob record, or while appending the file footer.
class DBCompactionTestBlobError
    : public DBCompactionTest,
      public testing::WithParamInterface<std::string> {
 public:
  DBCompactionTestBlobError() : sync_point_(GetParam()) {}

  // Name of the sync point where the test injects an IOError.
  std::string sync_point_;
};

INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobError, DBCompactionTestBlobError,
                        ::testing::ValuesIn(std::vector<std::string>{
                            "BlobFileBuilder::WriteBlobToFile:AddRecord",
                            "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
+
// Same setup as CompactionWithBlob, but injects an IOError at the
// parameterized blob-writing sync point. Verifies CompactRange() surfaces
// the IOError, no L1 table files or blob files are installed, and the
// compaction stats reflect where the failure occurred.
TEST_P(DBCompactionTestBlobError, CompactionError) {
  Options options;
  options.disable_auto_compactions = true;
  options.env = env_;

  Reopen(options);

  constexpr char first_key[] = "first_key";
  constexpr char second_key[] = "second_key";
  constexpr char first_value[] = "first_value";
  constexpr char second_value[] = "second_value";
  constexpr char third_value[] = "third_value";

  ASSERT_OK(Put(first_key, first_value));
  ASSERT_OK(Put(second_key, first_value));
  ASSERT_OK(Flush());

  ASSERT_OK(Put(first_key, second_value));
  ASSERT_OK(Put(second_key, second_value));
  ASSERT_OK(Flush());

  ASSERT_OK(Put(first_key, third_value));
  ASSERT_OK(Put(second_key, third_value));
  ASSERT_OK(Flush());

  // Enable blob files only for the (failing) compaction that follows.
  options.enable_blob_files = true;

  Reopen(options);

  // Replace the status produced at the parameterized sync point with an
  // IOError carrying the sync point name.
  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
    Status* const s = static_cast<Status*>(arg);
    assert(s);

    (*s) = Status::IOError(sync_point_);
  });
  SyncPoint::GetInstance()->EnableProcessing();

  constexpr Slice* begin = nullptr;
  constexpr Slice* end = nullptr;

  ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), begin, end).IsIOError());

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  VersionSet* const versions = dbfull()->GetVersionSet();
  assert(versions);

  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
  ASSERT_NE(cfd, nullptr);

  Version* const current = cfd->current();
  ASSERT_NE(current, nullptr);

  const VersionStorageInfo* const storage_info = current->storage_info();
  ASSERT_NE(storage_info, nullptr);

  // The failed compaction must not have installed any output files.
  const auto& l1_files = storage_info->LevelFiles(1);
  ASSERT_TRUE(l1_files.empty());

  const auto& blob_files = storage_info->GetBlobFiles();
  ASSERT_TRUE(blob_files.empty());

  const InternalStats* const internal_stats = cfd->internal_stats();
  ASSERT_NE(internal_stats, nullptr);

  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
  ASSERT_GE(compaction_stats.size(), 2);

  if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
    // Failure while writing the first blob record: nothing was written.
    ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
    ASSERT_EQ(compaction_stats[1].bytes_written, 0);
    ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
    ASSERT_EQ(compaction_stats[1].num_output_files, 0);
    ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0);
  } else {
    // SST file writing succeeded; blob file writing failed (during Finish)
    ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
    ASSERT_GT(compaction_stats[1].bytes_written, 0);
    ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
    ASSERT_EQ(compaction_stats[1].num_output_files, 1);
    ASSERT_EQ(compaction_stats[1].num_output_files_blob, 0);
  }
}
+
// Parameterized over (blob GC age cutoff, whether enable_blob_files remains
// on during the compaction).
class DBCompactionTestBlobGC
    : public DBCompactionTest,
      public testing::WithParamInterface<std::tuple<double, bool>> {
 public:
  DBCompactionTestBlobGC()
      : blob_gc_age_cutoff_(std::get<0>(GetParam())),
        updated_enable_blob_files_(std::get<1>(GetParam())) {}

  // Fraction of the oldest blob files subject to garbage collection.
  double blob_gc_age_cutoff_;
  // When false, enable_blob_files is switched off before compacting, so
  // garbage-collected values get inlined instead of rewritten to new blobs.
  bool updated_enable_blob_files_;
};

INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobGC, DBCompactionTestBlobGC,
                        ::testing::Combine(::testing::Values(0.0, 0.5, 1.0),
                                           ::testing::Bool()));
+
// Exercises CompactRange()-level blob GC overrides: the DB-level cutoff is
// zero, and the GC behavior is driven entirely by
// CompactRangeOptions::blob_garbage_collection_{policy,age_cutoff}.
TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGCOverrides) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.enable_blob_files = true;
  options.blob_file_size = 32;  // one blob per file
  options.enable_blob_garbage_collection = true;
  options.blob_garbage_collection_age_cutoff = 0;

  DestroyAndReopen(options);

  // Two keys per flush => two blob files per flush, 128 blob files total.
  for (int i = 0; i < 128; i += 2) {
    ASSERT_OK(Put("key" + std::to_string(i), "value" + std::to_string(i)));
    ASSERT_OK(
        Put("key" + std::to_string(i + 1), "value" + std::to_string(i + 1)));
    ASSERT_OK(Flush());
  }

  std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
  ASSERT_EQ(original_blob_files.size(), 128);

  // Note: turning off enable_blob_files before the compaction results in
  // garbage collected values getting inlined.
  ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));

  CompactRangeOptions cro;
  cro.blob_garbage_collection_policy = BlobGarbageCollectionPolicy::kForce;
  cro.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_;

  ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));

  // Check that the GC stats are correct
  {
    VersionSet* const versions = dbfull()->GetVersionSet();
    assert(versions);
    assert(versions->GetColumnFamilySet());

    ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
    assert(cfd);

    const InternalStats* const internal_stats = cfd->internal_stats();
    assert(internal_stats);

    const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
    ASSERT_GE(compaction_stats.size(), 2);

    // NOTE(review): bytes_read_blob is presumably unsigned, making this GE
    // check trivially true; it documents intent rather than constraining.
    ASSERT_GE(compaction_stats[1].bytes_read_blob, 0);
    // Values were inlined (enable_blob_files is off), so no blob output.
    ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
  }

  const size_t cutoff_index = static_cast<size_t>(
      cro.blob_garbage_collection_age_cutoff * original_blob_files.size());
  const size_t expected_num_files = original_blob_files.size() - cutoff_index;

  const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();

  ASSERT_EQ(new_blob_files.size(), expected_num_files);

  // Original blob files below the cutoff should be gone, original blob files
  // at or above the cutoff should be still there
  for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
    ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
  }

  // All values must remain readable regardless of where they live now.
  for (size_t i = 0; i < 128; ++i) {
    ASSERT_EQ(Get("key" + std::to_string(i)), "value" + std::to_string(i));
  }
}
+
+// Verifies that a manual full compaction garbage-collects blob files below
+// blob_garbage_collection_age_cutoff, keeps the files at/above the cutoff,
+// and reports blob read/write byte counters in the compaction stats.
+// Parameterized on the age cutoff and on whether enable_blob_files stays on
+// during the compaction (off => GC'ed values get inlined back into the LSM).
+TEST_P(DBCompactionTestBlobGC, CompactionWithBlobGC) {
+ Options options;
+ options.env = env_;
+ options.disable_auto_compactions = true;
+ options.enable_blob_files = true;
+ options.blob_file_size = 32; // one blob per file
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = blob_gc_age_cutoff_;
+
+ Reopen(options);
+
+ constexpr char first_key[] = "first_key";
+ constexpr char first_value[] = "first_value";
+ constexpr char second_key[] = "second_key";
+ constexpr char second_value[] = "second_value";
+
+ ASSERT_OK(Put(first_key, first_value));
+ ASSERT_OK(Put(second_key, second_value));
+ ASSERT_OK(Flush());
+
+ constexpr char third_key[] = "third_key";
+ constexpr char third_value[] = "third_value";
+ constexpr char fourth_key[] = "fourth_key";
+ constexpr char fourth_value[] = "fourth_value";
+
+ ASSERT_OK(Put(third_key, third_value));
+ ASSERT_OK(Put(fourth_key, fourth_value));
+ ASSERT_OK(Flush());
+
+ const std::vector<uint64_t> original_blob_files = GetBlobFileNumbers();
+
+ ASSERT_EQ(original_blob_files.size(), 4);
+
+ const size_t cutoff_index = static_cast<size_t>(
+ options.blob_garbage_collection_age_cutoff * original_blob_files.size());
+
+ // Note: turning off enable_blob_files before the compaction results in
+ // garbage collected values getting inlined.
+ size_t expected_number_of_files = original_blob_files.size();
+
+ if (!updated_enable_blob_files_) {
+ ASSERT_OK(db_->SetOptions({{"enable_blob_files", "false"}}));
+
+ expected_number_of_files -= cutoff_index;
+ }
+
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+ ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+
+ ASSERT_EQ(Get(first_key), first_value);
+ ASSERT_EQ(Get(second_key), second_value);
+ ASSERT_EQ(Get(third_key), third_value);
+ ASSERT_EQ(Get(fourth_key), fourth_value);
+
+ const std::vector<uint64_t> new_blob_files = GetBlobFileNumbers();
+
+ ASSERT_EQ(new_blob_files.size(), expected_number_of_files);
+
+ // Original blob files below the cutoff should be gone, original blob files at
+ // or above the cutoff should be still there
+ for (size_t i = cutoff_index; i < original_blob_files.size(); ++i) {
+ ASSERT_EQ(new_blob_files[i - cutoff_index], original_blob_files[i]);
+ }
+
+ VersionSet* const versions = dbfull()->GetVersionSet();
+ assert(versions);
+ assert(versions->GetColumnFamilySet());
+
+ ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
+ assert(cfd);
+
+ const InternalStats* const internal_stats = cfd->internal_stats();
+ assert(internal_stats);
+
+ // Stats index 1 is the compaction output level for this two-file LSM.
+ const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
+ ASSERT_GE(compaction_stats.size(), 2);
+
+ if (blob_gc_age_cutoff_ > 0.0) {
+ ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
+
+ if (updated_enable_blob_files_) {
+ // GC relocated some blobs to new blob files
+ ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
+ ASSERT_EQ(compaction_stats[1].bytes_read_blob,
+ compaction_stats[1].bytes_written_blob);
+ } else {
+ // GC moved some blobs back to the LSM, no new blob files
+ ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
+ }
+ } else {
+ ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
+ ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
+ }
+}
+
+// Verifies that blob garbage collection surfaces a Corruption status when a
+// blob index fails to decode. The sync point callback tampers with each blob
+// index (drops its first byte) right before GC reads it.
+TEST_F(DBCompactionTest, CompactionWithBlobGCError_CorruptIndex) {
+ Options options;
+ options.env = env_;
+ options.disable_auto_compactions = true;
+ options.enable_blob_files = true;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 1.0;
+
+ Reopen(options);
+
+ constexpr char first_key[] = "first_key";
+ constexpr char first_value[] = "first_value";
+ ASSERT_OK(Put(first_key, first_value));
+
+ constexpr char second_key[] = "second_key";
+ constexpr char second_value[] = "second_value";
+ ASSERT_OK(Put(second_key, second_value));
+
+ ASSERT_OK(Flush());
+
+ constexpr char third_key[] = "third_key";
+ constexpr char third_value[] = "third_value";
+ ASSERT_OK(Put(third_key, third_value));
+
+ constexpr char fourth_key[] = "fourth_key";
+ constexpr char fourth_value[] = "fourth_value";
+ ASSERT_OK(Put(fourth_key, fourth_value));
+
+ ASSERT_OK(Flush());
+
+ // Corrupt every blob index just before GC attempts to decode it.
+ SyncPoint::GetInstance()->SetCallBack(
+ "CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex",
+ [](void* arg) {
+ Slice* const blob_index = static_cast<Slice*>(arg);
+ assert(blob_index);
+ assert(!blob_index->empty());
+ blob_index->remove_prefix(1);
+ });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+ ASSERT_TRUE(
+ db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
+
+ SyncPoint::GetInstance()->DisableProcessing();
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+}
+
+// Verifies that blob garbage collection reports Corruption when it encounters
+// an inlined TTL blob index, which is not supported by the integrated BlobDB.
+TEST_F(DBCompactionTest, CompactionWithBlobGCError_InlinedTTLIndex) {
+ constexpr uint64_t min_blob_size = 10;
+
+ Options options;
+ options.env = env_;
+ options.disable_auto_compactions = true;
+ options.enable_blob_files = true;
+ options.min_blob_size = min_blob_size;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 1.0;
+
+ Reopen(options);
- const size_t kValueSize = 1 << 20;
- Random rnd(301);
- std::string value(rnd.RandomString(kValueSize));
- std::string value2(rnd.RandomString(kValueSize));
- std::string bigvalue = value + value;
+ constexpr char first_key[] = "first_key";
+ constexpr char first_value[] = "first_value";
+ ASSERT_OK(Put(first_key, first_value));
+
+ constexpr char second_key[] = "second_key";
+ constexpr char second_value[] = "second_value";
+ ASSERT_OK(Put(second_key, second_value));
- // prevents trivial move
- for (int i = 0; i < 10; ++i) {
- ASSERT_OK(Put(Key(i), "")); // prevents trivial move
- }
 ASSERT_OK(Flush());
- Compact("", Key(99));
- ASSERT_EQ(0, NumTableFilesAtLevel(0));
- std::atomic<int> pick_intra_l0_count(0);
- // The L0->L1 must be picked before we begin ingesting files to trigger
- // intra-L0 compaction, and must not finish until after an intra-L0
- // compaction has been picked.
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
- {{"LevelCompactionPicker::PickCompaction:Return",
- "DBCompactionTestWithParam::"
- "IntraL0CompactionAfterFlushCheckConsistencyFail:L0ToL1Ready"},
- {"LevelCompactionPicker::PickCompactionBySize:0",
- "CompactionJob::Run():Start"}});
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "FindIntraL0Compaction",
- [&](void* /*arg*/) { pick_intra_l0_count.fetch_add(1); });
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- // Make 6 L0 sst.
- for (int i = 0; i < 6; ++i) {
- if (i % 2 == 0) {
- IngestOneKeyValue(dbfull(), Key(i), value, options);
- } else {
- ASSERT_OK(Put(Key(i), value));
- ASSERT_OK(Flush());
- }
- }
+ constexpr char third_key[] = "third_key";
+ constexpr char third_value[] = "third_value";
+ ASSERT_OK(Put(third_key, third_value));
- ASSERT_EQ(6, NumTableFilesAtLevel(0));
+ constexpr char fourth_key[] = "fourth_key";
+ constexpr char blob[] = "short";
+ // sizeof(blob) - 1 is the string length of the fake blob value; it must be
+ // below min_blob_size so the value qualifies for inlining. (Was
+ // sizeof(short), which measured the C++ type 'short', not the array.)
+ static_assert(sizeof(blob) - 1 < min_blob_size,
+ "Blob too long to be inlined");
- // Stop run flush job
- env_->SetBackgroundThreads(1, Env::HIGH);
- test::SleepingBackgroundTask sleeping_tasks;
- env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks,
- Env::Priority::HIGH);
- sleeping_tasks.WaitUntilSleeping();
+ // Fake an inlined TTL blob index.
+ std::string blob_index;
- // Put many keys to make memtable request to flush
- for (int i = 0; i < 6; ++i) {
- ASSERT_OK(Put(Key(i), bigvalue));
- }
+ constexpr uint64_t expiration = 1234567890;
- ASSERT_EQ(6, NumTableFilesAtLevel(0));
- TEST_SYNC_POINT(
- "DBCompactionTestWithParam::"
- "IntraL0CompactionAfterFlushCheckConsistencyFail:L0ToL1Ready");
- // ingest file to trigger IntraL0Compaction
- for (int i = 6; i < 10; ++i) {
- ASSERT_EQ(i, NumTableFilesAtLevel(0));
- IngestOneKeyValue(dbfull(), Key(i), value2, options);
- }
+ BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
- // Wake up flush job
- sleeping_tasks.WakeUp();
- sleeping_tasks.WaitUntilDone();
- dbfull()->TEST_WaitForCompact();
- ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+ // Write the hand-crafted blob index directly via WriteBatchInternal.
+ WriteBatch batch;
+ ASSERT_OK(
+ WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, blob_index));
+ ASSERT_OK(db_->Write(WriteOptions(), &batch));
- uint64_t error_count = 0;
- db_->GetIntProperty("rocksdb.background-errors", &error_count);
- ASSERT_EQ(error_count, 0);
- ASSERT_GT(pick_intra_l0_count.load(), 0);
- for (int i = 0; i < 6; ++i) {
- ASSERT_EQ(bigvalue, Get(Key(i)));
- }
- for (int i = 6; i < 10; ++i) {
- ASSERT_EQ(value2, Get(Key(i)));
- }
+ ASSERT_OK(Flush());
+
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+ ASSERT_TRUE(
+ db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
}
-TEST_P(DBCompactionTestWithBottommostParam, SequenceKeysManualCompaction) {
- constexpr int kSstNum = 10;
- Options options = CurrentOptions();
+// Verifies that blob garbage collection reports Corruption when a blob index
+// references a blob file number that does not exist in the current version.
+TEST_F(DBCompactionTest, CompactionWithBlobGCError_IndexWithInvalidFileNumber) {
+ Options options;
+ options.env = env_;
 options.disable_auto_compactions = true;
- DestroyAndReopen(options);
+ options.enable_blob_files = true;
+ options.enable_blob_garbage_collection = true;
+ options.blob_garbage_collection_age_cutoff = 1.0;
- // Generate some sst files on level 0 with sequence keys (no overlap)
- for (int i = 0; i < kSstNum; i++) {
- for (int j = 1; j < UCHAR_MAX; j++) {
- auto key = std::string(kSstNum, '\0');
- key[kSstNum - i] += static_cast<char>(j);
- Put(key, std::string(i % 1000, 'A'));
- }
- ASSERT_OK(Flush());
- }
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ Reopen(options);
- ASSERT_EQ(ToString(kSstNum), FilesPerLevel(0));
+ constexpr char first_key[] = "first_key";
+ constexpr char first_value[] = "first_value";
+ ASSERT_OK(Put(first_key, first_value));
- auto cro = CompactRangeOptions();
- cro.bottommost_level_compaction = bottommost_level_compaction_;
- db_->CompactRange(cro, nullptr, nullptr);
- if (bottommost_level_compaction_ == BottommostLevelCompaction::kForce ||
- bottommost_level_compaction_ ==
- BottommostLevelCompaction::kForceOptimized) {
- // Real compaction to compact all sst files from level 0 to 1 file on level
- // 1
- ASSERT_EQ("0,1", FilesPerLevel(0));
- } else {
- // Just trivial move from level 0 -> 1
- ASSERT_EQ("0," + ToString(kSstNum), FilesPerLevel(0));
- }
-}
+ constexpr char second_key[] = "second_key";
+ constexpr char second_value[] = "second_value";
+ ASSERT_OK(Put(second_key, second_value));
-INSTANTIATE_TEST_CASE_P(
- DBCompactionTestWithBottommostParam, DBCompactionTestWithBottommostParam,
- ::testing::Values(BottommostLevelCompaction::kSkip,
- BottommostLevelCompaction::kIfHaveCompactionFilter,
- BottommostLevelCompaction::kForce,
- BottommostLevelCompaction::kForceOptimized));
+ ASSERT_OK(Flush());
-TEST_F(DBCompactionTest, UpdateLevelSubCompactionTest) {
+ constexpr char third_key[] = "third_key";
+ constexpr char third_value[] = "third_value";
+ ASSERT_OK(Put(third_key, third_value));
+
+ constexpr char fourth_key[] = "fourth_key";
+
+ // Fake a blob index referencing a non-existent blob file.
+ std::string blob_index;
+
+ constexpr uint64_t blob_file_number = 1000;
+ constexpr uint64_t offset = 1234;
+ constexpr uint64_t size = 5678;
+
+ BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
+ kNoCompression);
+
+ // Write the hand-crafted blob index directly via WriteBatchInternal.
+ WriteBatch batch;
+ ASSERT_OK(
+ WriteBatchInternal::PutBlobIndex(&batch, 0, fourth_key, blob_index));
+ ASSERT_OK(db_->Write(WriteOptions(), &batch));
+
+ ASSERT_OK(Flush());
+
+ constexpr Slice* begin = nullptr;
+ constexpr Slice* end = nullptr;
+
+ ASSERT_TRUE(
+ db_->CompactRange(CompactRangeOptions(), begin, end).IsCorruption());
+}
+
+// Exercises checksum handoff for table files during flush/compaction using a
+// fault-injecting file system: matching checksum type succeeds, a mismatching
+// type or injected data corruption yields an unrecoverable background error,
+// and a file system without checksum support skips the verification.
+TEST_F(DBCompactionTest, CompactionWithChecksumHandoff1) {
+ if (mem_env_ || encrypted_env_) {
+ ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+ return;
+ }
+ std::shared_ptr<FaultInjectionTestFS> fault_fs(
+ new FaultInjectionTestFS(FileSystem::Default()));
+ std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
 Options options = CurrentOptions();
- options.max_subcompactions = 10;
- options.target_file_size_base = 1 << 10; // 1KB
- DestroyAndReopen(options);
+ options.level0_file_num_compaction_trigger = 2;
+ options.num_levels = 3;
+ options.env = fault_fs_env.get();
+ options.create_if_missing = true;
+ options.checksum_handoff_file_types.Add(FileType::kTableFile);
+ Status s;
+ Reopen(options);
- bool has_compaction = false;
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s, Status::OK());
+ Destroy(options);
+ Reopen(options);
+
+ // The hash does not match, compaction write fails
+ // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+ // Since the file system returns IOStatus::Corruption, it is an
+ // unrecoverable error.
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+ "BackgroundCallCompaction:0"}});
 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
- Compaction* compaction = reinterpret_cast<Compaction*>(arg);
- ASSERT_TRUE(compaction->max_subcompactions() == 10);
- has_compaction = true;
+ "BackgroundCallCompaction:0", [&](void*) {
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
 });
 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s.severity(),
+ ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
+ SyncPoint::GetInstance()->DisableProcessing();
+ Destroy(options);
+ Reopen(options);
+
+ // The file system does not support checksum handoff. The check
+ // will be ignored.
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s, Status::OK());
+
+ // Each write will be simulated as corrupted.
+ // Since the file system returns IOStatus::Corruption, it is an
+ // unrecoverable error.
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+ "BackgroundCallCompaction:0"}});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "BackgroundCallCompaction:0",
+ [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s.severity(),
+ ROCKSDB_NAMESPACE::Status::Severity::kUnrecoverableError);
+ SyncPoint::GetInstance()->DisableProcessing();
- ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 10);
- // Trigger compaction
- for (int i = 0; i < 32; i++) {
- for (int j = 0; j < 5000; j++) {
- Put(std::to_string(j), std::string(1, 'A'));
- }
- ASSERT_OK(Flush());
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ Destroy(options);
+}
+
+// Counterpart of CompactionWithChecksumHandoff1: checksum_handoff_file_types
+// is NOT configured here, so neither checksum mismatches nor injected data
+// corruption are detected — all flushes and compactions report OK.
+TEST_F(DBCompactionTest, CompactionWithChecksumHandoff2) {
+ if (mem_env_ || encrypted_env_) {
+ ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+ return;
 }
- dbfull()->TEST_WaitForCompact();
- ASSERT_TRUE(has_compaction);
+ std::shared_ptr<FaultInjectionTestFS> fault_fs(
+ new FaultInjectionTestFS(FileSystem::Default()));
+ std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = 2;
+ options.num_levels = 3;
+ options.env = fault_fs_env.get();
+ options.create_if_missing = true;
+ Status s;
+ Reopen(options);
- has_compaction = false;
- ASSERT_OK(dbfull()->SetDBOptions({{"max_subcompactions", "2"}}));
- ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 2);
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s, Status::OK());
+ Destroy(options);
+ Reopen(options);
+ // options is not set, the checksum handoff will not be triggered
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+ "BackgroundCallCompaction:0"}});
 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
- Compaction* compaction = reinterpret_cast<Compaction*>(arg);
- ASSERT_TRUE(compaction->max_subcompactions() == 2);
- has_compaction = true;
+ "BackgroundCallCompaction:0", [&](void*) {
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
 });
 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s, Status::OK());
+ SyncPoint::GetInstance()->DisableProcessing();
+ Destroy(options);
+ Reopen(options);
- // Trigger compaction
- for (int i = 0; i < 32; i++) {
- for (int j = 0; j < 5000; j++) {
- Put(std::to_string(j), std::string(1, 'A'));
- }
- ASSERT_OK(Flush());
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
- }
- dbfull()->TEST_WaitForCompact();
- ASSERT_TRUE(has_compaction);
+ // The file system does not support checksum handoff. The check
+ // will be ignored.
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s, Status::OK());
+
+ // options is not set, the checksum handoff will not be triggered
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+ "BackgroundCallCompaction:0"}});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "BackgroundCallCompaction:0",
+ [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s, Status::OK());
+
+ Destroy(options);
 }
-TEST_F(DBCompactionTest, UpdateUniversalSubCompactionTest) {
+// Exercises checksum handoff for the MANIFEST (descriptor file): a checksum
+// type mismatch during a background compaction's version edit write is
+// reported as a kFatalError (not unrecoverable, unlike table files).
+TEST_F(DBCompactionTest, CompactionWithChecksumHandoffManifest1) {
+ if (mem_env_ || encrypted_env_) {
+ ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+ return;
+ }
+ std::shared_ptr<FaultInjectionTestFS> fault_fs(
+ new FaultInjectionTestFS(FileSystem::Default()));
+ std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
 Options options = CurrentOptions();
- options.max_subcompactions = 10;
- options.compaction_style = kCompactionStyleUniversal;
- options.target_file_size_base = 1 << 10; // 1KB
- DestroyAndReopen(options);
+ options.level0_file_num_compaction_trigger = 2;
+ options.num_levels = 3;
+ options.env = fault_fs_env.get();
+ options.create_if_missing = true;
+ options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
+ Status s;
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+ Reopen(options);
- bool has_compaction = false;
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s, Status::OK());
+ Destroy(options);
+ Reopen(options);
+
+ // The hash does not match, compaction write fails
+ // fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
+ // Since the file system returns IOStatus::Corruption, it is mapped to
+ // kFatalError error.
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+ "BackgroundCallCompaction:0"}});
 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
- Compaction* compaction = reinterpret_cast<Compaction*>(arg);
- ASSERT_TRUE(compaction->max_subcompactions() == 10);
- has_compaction = true;
+ "BackgroundCallCompaction:0", [&](void*) {
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kxxHash);
 });
 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+ SyncPoint::GetInstance()->DisableProcessing();
+ Destroy(options);
+}
- // Trigger compaction
- for (int i = 0; i < 32; i++) {
- for (int j = 0; j < 5000; j++) {
- Put(std::to_string(j), std::string(1, 'A'));
- }
- ASSERT_OK(Flush());
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+// Counterpart of CompactionWithChecksumHandoffManifest1: a file system that
+// does not support checksum handoff is ignored, while injected MANIFEST write
+// corruption still surfaces as a kFatalError.
+TEST_F(DBCompactionTest, CompactionWithChecksumHandoffManifest2) {
+ if (mem_env_ || encrypted_env_) {
+ ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
+ return;
 }
- dbfull()->TEST_WaitForCompact();
- ASSERT_TRUE(has_compaction);
- has_compaction = false;
+ std::shared_ptr<FaultInjectionTestFS> fault_fs(
+ new FaultInjectionTestFS(FileSystem::Default()));
+ std::unique_ptr<Env> fault_fs_env(NewCompositeEnv(fault_fs));
+ Options options = CurrentOptions();
+ options.level0_file_num_compaction_trigger = 2;
+ options.num_levels = 3;
+ options.env = fault_fs_env.get();
+ options.create_if_missing = true;
+ options.checksum_handoff_file_types.Add(FileType::kDescriptorFile);
+ Status s;
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kNoChecksum);
+ Reopen(options);
+
+ // The file system does not support checksum handoff. The check
+ // will be ignored.
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s, Status::OK());
+
+ // Each write will be simulated as corrupted.
+ // Since the file system returns IOStatus::Corruption, it is mapped to
+ // kFatalError error.
+ fault_fs->SetChecksumHandoffFuncType(ChecksumType::kCRC32c);
+ ASSERT_OK(Put(Key(0), "value1"));
+ ASSERT_OK(Put(Key(2), "value2"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+ {{"DBImpl::FlushMemTable:FlushMemTableFinished",
+ "BackgroundCallCompaction:0"}});
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+ "BackgroundCallCompaction:0",
+ [&](void*) { fault_fs->IngestDataCorruptionBeforeWrite(); });
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+ ASSERT_OK(Put(Key(1), "value3"));
+ s = Flush();
+ ASSERT_EQ(s, Status::OK());
+ s = dbfull()->TEST_WaitForCompact();
+ ASSERT_EQ(s.severity(), ROCKSDB_NAMESPACE::Status::Severity::kFatalError);
+ SyncPoint::GetInstance()->DisableProcessing();
- ASSERT_OK(dbfull()->SetDBOptions({{"max_subcompactions", "2"}}));
- ASSERT_TRUE(dbfull()->GetDBOptions().max_subcompactions == 2);
+ Destroy(options);
+}
+
+// Verifies that FIFO compaction with age_for_warm set rewrites files older
+// than the threshold (per the mock clock) with Temperature::kWarm, counted
+// via the NewWritableFile temperature sync point.
+TEST_F(DBCompactionTest, FIFOWarm) {
+ Options options = CurrentOptions();
+ options.compaction_style = kCompactionStyleFIFO;
+ options.num_levels = 1;
+ options.max_open_files = -1;
+ options.level0_file_num_compaction_trigger = 2;
+ options.create_if_missing = true;
+ CompactionOptionsFIFO fifo_options;
+ fifo_options.age_for_warm = 1000;
+ fifo_options.max_table_files_size = 100000000;
+ options.compaction_options_fifo = fifo_options;
+ env_->SetMockSleep();
+ Reopen(options);
+ int total_warm = 0;
 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
- "UniversalCompactionBuilder::PickCompaction:Return", [&](void* arg) {
- Compaction* compaction = reinterpret_cast<Compaction*>(arg);
- ASSERT_TRUE(compaction->max_subcompactions() == 2);
- has_compaction = true;
+ "NewWritableFile::FileOptions.temperature", [&](void* arg) {
+ Temperature temperature = *(static_cast<Temperature*>(arg));
+ if (temperature == Temperature::kWarm) {
+ total_warm++;
+ }
 });
 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
- // Trigger compaction
- for (int i = 0; i < 32; i++) {
- for (int j = 0; j < 5000; j++) {
- Put(std::to_string(j), std::string(1, 'A'));
- }
- ASSERT_OK(Flush());
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
- }
- dbfull()->TEST_WaitForCompact();
- ASSERT_TRUE(has_compaction);
+ // Create four files with mock-clock gaps between writes; by the end, the
+ // two oldest files exceed age_for_warm (1000s) and should be rewritten
+ // with Temperature::kWarm by FIFO compaction.
+ ASSERT_OK(Put(Key(0), "value1"));
+ env_->MockSleepForSeconds(800);
+ ASSERT_OK(Put(Key(2), "value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(0), "value1"));
+ env_->MockSleepForSeconds(800);
+ ASSERT_OK(Put(Key(2), "value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(Put(Key(0), "value1"));
+ env_->MockSleepForSeconds(800);
+ ASSERT_OK(Put(Key(2), "value2"));
+ ASSERT_OK(Flush());
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ASSERT_OK(Put(Key(0), "value1"));
+ env_->MockSleepForSeconds(800);
+ ASSERT_OK(Put(Key(2), "value2"));
+ ASSERT_OK(Flush());
+
+ ASSERT_OK(dbfull()->TEST_WaitForCompact());
+
+ ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
+
+ ColumnFamilyMetaData metadata;
+ db_->GetColumnFamilyMetaData(&metadata);
+ ASSERT_EQ(4, metadata.file_count);
+ ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[0].temperature);
+ ASSERT_EQ(Temperature::kUnknown, metadata.levels[0].files[1].temperature);
+ ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[2].temperature);
+ ASSERT_EQ(Temperature::kWarm, metadata.levels[0].files[3].temperature);
+ ASSERT_EQ(2, total_warm);
+
+ Destroy(options);
+}
-TEST_P(ChangeLevelConflictsWithAuto, TestConflict) {
- // A `CompactRange()` may race with an automatic compaction, we'll need
- // to make sure it doesn't corrupte the data.
+// Verifies that DisableManualCompaction() cancels multiple concurrently
+// queued non-exclusive manual compactions; both should return Incomplete
+// while the compaction queue is blocked by a sleeping background task.
+TEST_F(DBCompactionTest, DisableMultiManualCompaction) {
+ const int kNumL0Files = 10;
+
 Options options = CurrentOptions();
- options.level0_file_num_compaction_trigger = 2;
+ options.level0_file_num_compaction_trigger = kNumL0Files;
 Reopen(options);
- ASSERT_OK(Put("foo", "v1"));
- ASSERT_OK(Put("bar", "v1"));
- ASSERT_OK(Flush());
- ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+ // Generate 2 levels of file to make sure the manual compaction is not skipped
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(Put(Key(i), "value"));
+ if (i % 2) {
+ ASSERT_OK(Flush());
+ }
+ }
+ MoveFilesToLevel(2);
- {
- CompactRangeOptions cro;
- cro.change_level = true;
- cro.target_level = 2;
- ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(Put(Key(i), "value"));
+ if (i % 2) {
+ ASSERT_OK(Flush());
+ }
 }
- ASSERT_EQ("0,0,1", FilesPerLevel(0));
+ MoveFilesToLevel(1);
- // Run a qury to refitting to level 1 while another thread writing to
- // the same level.
- SyncPoint::GetInstance()->LoadDependency({
- // The first two dependencies ensure the foreground creates an L0 file
- // between the background compaction's L0->L1 and its L1->L2.
- {
- "DBImpl::CompactRange:BeforeRefit:1",
- "AutoCompactionFinished1",
- },
- {
- "AutoCompactionFinished2",
- "DBImpl::CompactRange:BeforeRefit:2",
- },
- });
- SyncPoint::GetInstance()->EnableProcessing();
+ // Block compaction queue
+ test::SleepingBackgroundTask sleeping_task_low;
+ env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+ Env::Priority::LOW);
- std::thread auto_comp([&] {
- TEST_SYNC_POINT("AutoCompactionFinished1");
- ASSERT_OK(Put("bar", "v2"));
- ASSERT_OK(Put("foo", "v2"));
- ASSERT_OK(Flush());
- ASSERT_OK(Put("bar", "v3"));
- ASSERT_OK(Put("foo", "v3"));
- ASSERT_OK(Flush());
- dbfull()->TEST_WaitForCompact();
- TEST_SYNC_POINT("AutoCompactionFinished2");
+ port::Thread compact_thread1([&]() {
+ CompactRangeOptions cro;
+ cro.exclusive_manual_compaction = false;
+ std::string begin_str = Key(0);
+ std::string end_str = Key(3);
+ Slice b = begin_str;
+ Slice e = end_str;
+ auto s = db_->CompactRange(cro, &b, &e);
+ ASSERT_TRUE(s.IsIncomplete());
 });
- {
+ port::Thread compact_thread2([&]() {
 CompactRangeOptions cro;
- cro.change_level = true;
- cro.target_level = GetParam() ? 1 : 0;
- // This should return non-OK, but it's more important for the test to
- // make sure that the DB is not corrupted.
- dbfull()->CompactRange(cro, nullptr, nullptr);
- }
- auto_comp.join();
- // Refitting didn't happen.
- SyncPoint::GetInstance()->DisableProcessing();
+ cro.exclusive_manual_compaction = false;
+ std::string begin_str = Key(4);
+ std::string end_str = Key(7);
+ Slice b = begin_str;
+ Slice e = end_str;
+ auto s = db_->CompactRange(cro, &b, &e);
+ ASSERT_TRUE(s.IsIncomplete());
+ });
- // Write something to DB just make sure that consistency check didn't
- // fail and make the DB readable.
+ // Disable manual compaction should cancel both manual compactions and both
+ // compaction should return incomplete.
+ db_->DisableManualCompaction();
+
+ compact_thread1.join();
+ compact_thread2.join();
+
+ sleeping_task_low.WakeUp();
+ sleeping_task_low.WaitUntilDone();
+ ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
 }
-INSTANTIATE_TEST_CASE_P(ChangeLevelConflictsWithAuto,
- ChangeLevelConflictsWithAuto, testing::Bool());
+// Cancelling a manual compaction right after its background work is
+// scheduled, but before it is marked as in-progress, must not segfault;
+// the cancelled CompactRange() surfaces as Status::Incomplete.
+TEST_F(DBCompactionTest, DisableJustStartedManualCompaction) {
+  const int kNumL0Files = 4;
-TEST_F(DBCompactionTest, ChangeLevelCompactRangeConflictsWithManual) {
-  // A `CompactRange()` with `change_level == true` needs to execute its final
-  // step, `ReFitLevel()`, in isolation. Previously there was a bug where
-  // refitting could target the same level as an ongoing manual compaction,
-  // leading to overlapping files in that level.
-  //
-  // This test ensures that case is not possible by verifying any manual
-  // compaction issued during the `ReFitLevel()` phase fails with
-  // `Status::Incomplete`.
Options options = CurrentOptions();
-  options.memtable_factory.reset(
-      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
-  options.level0_file_num_compaction_trigger = 2;
-  options.num_levels = 3;
+  options.level0_file_num_compaction_trigger = kNumL0Files;
Reopen(options);
-  // Setup an LSM with three levels populated.
-  Random rnd(301);
-  int key_idx = 0;
-  GenerateNewFile(&rnd, &key_idx);
-  {
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 2;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
}
-  ASSERT_EQ("0,0,2", FilesPerLevel(0));
-
-  GenerateNewFile(&rnd, &key_idx);
-  GenerateNewFile(&rnd, &key_idx);
-  ASSERT_EQ("1,1,2", FilesPerLevel(0));
-  // The background thread will refit L2->L1 while the
-  // foreground thread will try to simultaneously compact L0->L1.
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
-      // The first two dependencies ensure the foreground creates an L0 file
-      // between the background compaction's L0->L1 and its L1->L2.
-      {
-          "DBImpl::RunManualCompaction()::1",
-          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-          "PutFG",
-      },
-      {
-          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-          "FlushedFG",
-          "DBImpl::RunManualCompaction()::2",
-      },
-      // The next two dependencies ensure the foreground invokes
-      // `CompactRange()` while the background is refitting. The
-      // foreground's `CompactRange()` is guaranteed to attempt an L0->L1
-      // as we set it up with an empty memtable and a new L0 file.
-      {
-          "DBImpl::CompactRange:PreRefitLevel",
-          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-          "CompactFG",
-      },
-      {
-          "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-          "CompactedFG",
-          "DBImpl::CompactRange:PostRefitLevel",
-      },
-  });
-  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  // make sure the manual compaction background is started but not yet set the
+  // status to in_progress, then cancel the manual compaction, which should not
+  // result in segfault
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkCompaction",
+        "DBCompactionTest::DisableJustStartedManualCompaction:"
+        "PreDisableManualCompaction"},
+       {"DBImpl::RunManualCompaction:Unscheduled",
+        "BackgroundCallCompaction:0"}});
+  SyncPoint::GetInstance()->EnableProcessing();
-  ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
+  port::Thread compact_thread([&]() {
CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 1;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    // Cancellation is reported to the caller as Incomplete.
+    ASSERT_TRUE(s.IsIncomplete());
});
-
-  TEST_SYNC_POINT(
-      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:PutFG");
-  // Make sure we have something new to compact in the foreground.
-  // Note key 1 is carefully chosen as it ensures the file we create here
-  // overlaps with one of the files being refitted L2->L1 in the background.
-  // If we chose key 0, the file created here would not overlap.
-  ASSERT_OK(Put(Key(1), "val"));
-  ASSERT_OK(Flush());
TEST_SYNC_POINT(
-      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:FlushedFG");
+      "DBCompactionTest::DisableJustStartedManualCompaction:"
+      "PreDisableManualCompaction");
+  db_->DisableManualCompaction();
-  TEST_SYNC_POINT(
-      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:CompactFG");
-  ASSERT_TRUE(dbfull()
-                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
-                  .IsIncomplete());
-  TEST_SYNC_POINT(
-      "DBCompactionTest::ChangeLevelCompactRangeConflictsWithManual:"
-      "CompactedFG");
-  refit_level_thread.join();
+  compact_thread.join();
}
+// DisableManualCompaction() must be able to cancel a manual compaction whose
+// CompactionJob is already marked in-progress; the cancelled CompactRange()
+// returns Status::Incomplete.
-TEST_F(DBCompactionTest, ChangeLevelErrorPathTest) {
-  // This test is added to ensure that RefitLevel() error paths are clearing
-  // internal flags and to test that subsequent valid RefitLevel() calls
-  // succeeds
+TEST_F(DBCompactionTest, DisableInProgressManualCompaction) {
+  const int kNumL0Files = 4;
+
Options options = CurrentOptions();
-  options.memtable_factory.reset(
-      new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
-  options.level0_file_num_compaction_trigger = 2;
-  options.num_levels = 3;
+  options.level0_file_num_compaction_trigger = kNumL0Files;
Reopen(options);
-  ASSERT_EQ("", FilesPerLevel(0));
+  // Hold the cancellation until the compaction is in-progress, and hold the
+  // compaction job itself until the manual compaction is unscheduled.
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BackgroundCompaction:InProgress",
+        "DBCompactionTest::DisableInProgressManualCompaction:"
+        "PreDisableManualCompaction"},
+       {"DBImpl::RunManualCompaction:Unscheduled",
+        "CompactionJob::Run():Start"}});
+  SyncPoint::GetInstance()->EnableProcessing();
-  // Setup an LSM with three levels populated.
-  Random rnd(301);
-  int key_idx = 0;
-  GenerateNewFile(&rnd, &key_idx);
-  ASSERT_EQ("1", FilesPerLevel(0));
-  {
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 2;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
}
-  ASSERT_EQ("0,0,2", FilesPerLevel(0));
-  auto start_idx = key_idx;
-  GenerateNewFile(&rnd, &key_idx);
-  GenerateNewFile(&rnd, &key_idx);
-  auto end_idx = key_idx - 1;
-  ASSERT_EQ("1,1,2", FilesPerLevel(0));
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    // Cancellation is reported to the caller as Incomplete.
+    ASSERT_TRUE(s.IsIncomplete());
+  });
-  // Next two CompactRange() calls are used to test exercise error paths within
-  // RefitLevel() before triggering a valid RefitLevel() call
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableInProgressManualCompaction:"
+      "PreDisableManualCompaction");
+  db_->DisableManualCompaction();
-  // Trigger a refit to L1 first
-  {
-    std::string begin_string = Key(start_idx);
-    std::string end_string = Key(end_idx);
-    Slice begin(begin_string);
-    Slice end(end_string);
+  compact_thread.join();
+}
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 1;
-    ASSERT_OK(dbfull()->CompactRange(cro, &begin, &end));
-  }
-  ASSERT_EQ("0,3,2", FilesPerLevel(0));
+// DisableManualCompaction() must cancel a manual compaction that is still
+// sitting in the (deliberately blocked) LOW thread-pool queue, without
+// waiting for the queue to drain.
+TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFull) {
+  const int kNumL0Files = 4;
-  // Try a refit from L2->L1 - this should fail and exercise error paths in
-  // RefitLevel()
-  {
-    // Select key range that matches the bottom most level (L2)
-    std::string begin_string = Key(0);
-    std::string end_string = Key(start_idx - 1);
-    Slice begin(begin_string);
-    Slice end(end_string);
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::RunManualCompaction:Scheduled",
+        "DBCompactionTest::DisableManualCompactionThreadQueueFull:"
+        "PreDisableManualCompaction"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  Reopen(options);
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 1;
-    ASSERT_NOK(dbfull()->CompactRange(cro, &begin, &end));
-  }
-  ASSERT_EQ("0,3,2", FilesPerLevel(0));
+  // Block compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
-  // Try a valid Refit request to ensure, the path is still working
-  {
-    CompactRangeOptions cro;
-    cro.change_level = true;
-    cro.target_level = 1;
-    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
}
-  ASSERT_EQ("0,5", FilesPerLevel(0));
-}
-TEST_F(DBCompactionTest, CompactionWithBlob) {
-  Options options;
-  options.env = env_;
-  options.disable_auto_compactions = true;
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    // Cancellation is reported to the caller as Incomplete.
+    ASSERT_TRUE(s.IsIncomplete());
+  });
-  Reopen(options);
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableManualCompactionThreadQueueFull:"
+      "PreDisableManualCompaction");
+
+  // Generate more files to trigger auto compaction which is scheduled after
+  // manual compaction. Has to generate 4 more files because existing files are
+  // pending compaction
+  for (int i = 0; i < kNumL0Files; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(std::to_string(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
-  constexpr char first_key[] = "first_key";
-  constexpr char second_key[] = "second_key";
-  constexpr char first_value[] = "first_value";
-  constexpr char second_value[] = "second_value";
-  constexpr char third_value[] = "third_value";
+  db_->DisableManualCompaction();
-  ASSERT_OK(Put(first_key, first_value));
-  ASSERT_OK(Put(second_key, first_value));
-  ASSERT_OK(Flush());
+  // CompactRange should return before the compaction has the chance to run
+  compact_thread.join();
-  ASSERT_OK(Put(first_key, second_value));
-  ASSERT_OK(Put(second_key, second_value));
-  ASSERT_OK(Flush());
+  // Unblock the queue and let the pending auto compaction finish.
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+  ASSERT_OK(dbfull()->TEST_WaitForCompact(true));
+  ASSERT_EQ("0,1", FilesPerLevel(0));
+}
-  ASSERT_OK(Put(first_key, third_value));
-  ASSERT_OK(Put(second_key, third_value));
-  ASSERT_OK(Flush());
+// Same setup as DisableManualCompactionThreadQueueFull, but additionally
+// closes the DB while the cancelled manual compaction and a queued
+// auto-compaction are still in the thread-pool queue; Close() must succeed.
+TEST_F(DBCompactionTest, DisableManualCompactionThreadQueueFullDBClose) {
+  const int kNumL0Files = 4;
-  options.enable_blob_files = true;
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::RunManualCompaction:Scheduled",
+        "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
+        "PreDisableManualCompaction"}});
+  SyncPoint::GetInstance()->EnableProcessing();
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
Reopen(options);
-  constexpr Slice* begin = nullptr;
-  constexpr Slice* end = nullptr;
+  // Block compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
-  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
-  ASSERT_EQ(Get(first_key), third_value);
-  ASSERT_EQ(Get(second_key), third_value);
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    // Cancellation is reported to the caller as Incomplete.
+    ASSERT_TRUE(s.IsIncomplete());
+  });
-  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
-  assert(versions);
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DisableManualCompactionThreadQueueFullDBClose:"
+      "PreDisableManualCompaction");
+
+  // Generate more files to trigger auto compaction which is scheduled after
+  // manual compaction. Has to generate 4 more files because existing files are
+  // pending compaction
+  for (int i = 0; i < kNumL0Files; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(std::to_string(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
-  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
-  assert(cfd);
+  db_->DisableManualCompaction();
-  Version* const current = cfd->current();
-  assert(current);
+  // CompactRange should return before the compaction has the chance to run
+  compact_thread.join();
-  const VersionStorageInfo* const storage_info = current->storage_info();
-  assert(storage_info);
+  // Try close DB while manual compaction is canceled but still in the queue.
+  // And an auto-triggered compaction is also in the queue.
+  auto s = db_->Close();
+  ASSERT_OK(s);
-  const auto& l1_files = storage_info->LevelFiles(1);
-  ASSERT_EQ(l1_files.size(), 1);
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+}
-  const FileMetaData* const table_file = l1_files[0];
-  assert(table_file);
+// Closing the DB while a manual compaction and an auto-triggered compaction
+// are still queued (queue blocked by a sleeping task) must succeed, and the
+// manual compaction must return Status::Incomplete.
+TEST_F(DBCompactionTest, DBCloseWithManualCompaction) {
+  const int kNumL0Files = 4;
-  const auto& blob_files = storage_info->GetBlobFiles();
-  ASSERT_EQ(blob_files.size(), 1);
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::RunManualCompaction:Scheduled",
+        "DBCompactionTest::DBCloseWithManualCompaction:"
+        "PreDisableManualCompaction"}});
+  SyncPoint::GetInstance()->EnableProcessing();
-  const auto& blob_file = blob_files.begin()->second;
-  assert(blob_file);
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  Reopen(options);
-  ASSERT_EQ(table_file->smallest.user_key(), first_key);
-  ASSERT_EQ(table_file->largest.user_key(), second_key);
-  ASSERT_EQ(table_file->oldest_blob_file_number,
-            blob_file->GetBlobFileNumber());
+  // Block compaction queue
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
-  ASSERT_EQ(blob_file->GetTotalBlobCount(), 2);
+  // generate files, but avoid trigger auto compaction
+  for (int i = 0; i < kNumL0Files / 2; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
-  const InternalStats* const internal_stats = cfd->internal_stats();
-  assert(internal_stats);
+  port::Thread compact_thread([&]() {
+    CompactRangeOptions cro;
+    cro.exclusive_manual_compaction = true;
+    auto s = db_->CompactRange(cro, nullptr, nullptr);
+    // Cancellation (via Close() below) is reported as Incomplete.
+    ASSERT_TRUE(s.IsIncomplete());
+  });
-  const uint64_t expected_bytes =
-      table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
+  TEST_SYNC_POINT(
+      "DBCompactionTest::DBCloseWithManualCompaction:"
+      "PreDisableManualCompaction");
+
+  // Generate more files to trigger auto compaction which is scheduled after
+  // manual compaction. Has to generate 4 more files because existing files are
+  // pending compaction
+  for (int i = 0; i < kNumL0Files; i++) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
+  ASSERT_EQ(std::to_string(kNumL0Files + (kNumL0Files / 2)), FilesPerLevel(0));
-  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
-  ASSERT_GE(compaction_stats.size(), 2);
-  ASSERT_EQ(compaction_stats[1].bytes_written, expected_bytes);
-  ASSERT_EQ(compaction_stats[1].num_output_files, 2);
-}
+  // Close DB with manual compaction and auto triggered compaction in the queue.
+  auto s = db_->Close();
+  ASSERT_OK(s);
-class DBCompactionTestBlobError
-    : public DBCompactionTest,
-      public testing::WithParamInterface<std::string> {
- public:
-  DBCompactionTestBlobError()
-      : fault_injection_env_(env_), sync_point_(GetParam()) {}
-  ~DBCompactionTestBlobError() { Close(); }
+  // manual compaction thread should return with Incomplete().
+  compact_thread.join();
-  FaultInjectionTestEnv fault_injection_env_;
-  std::string sync_point_;
-};
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+}
-INSTANTIATE_TEST_CASE_P(DBCompactionTestBlobError, DBCompactionTestBlobError,
-                        ::testing::ValuesIn(std::vector<std::string>{
-                            "BlobFileBuilder::WriteBlobToFile:AddRecord",
-                            "BlobFileBuilder::WriteBlobToFile:AppendFooter"}));
+TEST_F(DBCompactionTest,
+       DisableManualCompactionDoesNotWaitForDrainingAutomaticCompaction) {
+  // When `CompactRangeOptions::exclusive_manual_compaction == true`, we wait
+  // for automatic compactions to drain before starting the manual compaction.
+  // This test verifies `DisableManualCompaction()` can cancel such a compaction
+  // without waiting for the drain to complete.
+  const int kNumL0Files = 4;
-TEST_P(DBCompactionTestBlobError, CompactionError) {
-  Options options;
-  options.disable_auto_compactions = true;
-  options.env = env_;
+  // Enforces manual compaction enters wait loop due to pending automatic
+  // compaction.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkCompaction", "DBImpl::RunManualCompaction:NotScheduled"},
+       {"DBImpl::RunManualCompaction:WaitScheduled",
+        "BackgroundCallCompaction:0"}});
+  // The automatic compaction will cancel the waiting manual compaction.
+  // Completing this implies the cancellation did not wait on automatic
+  // compactions to finish.
+  bool callback_completed = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "BackgroundCallCompaction:0", [&](void* /*arg*/) {
+        db_->DisableManualCompaction();
+        callback_completed = true;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
Reopen(options);
-  constexpr char first_key[] = "first_key";
-  constexpr char second_key[] = "second_key";
-  constexpr char first_value[] = "first_value";
-  constexpr char second_value[] = "second_value";
-  constexpr char third_value[] = "third_value";
+  // Enough L0 files to trigger an automatic compaction.
+  for (int i = 0; i < kNumL0Files; ++i) {
+    ASSERT_OK(Put(Key(1), "value1"));
+    ASSERT_OK(Put(Key(2), "value2"));
+    ASSERT_OK(Flush());
+  }
-  ASSERT_OK(Put(first_key, first_value));
-  ASSERT_OK(Put(second_key, first_value));
-  ASSERT_OK(Flush());
+  CompactRangeOptions cro;
+  cro.exclusive_manual_compaction = true;
+  ASSERT_TRUE(db_->CompactRange(cro, nullptr, nullptr).IsIncomplete());
-  ASSERT_OK(Put(first_key, second_value));
-  ASSERT_OK(Put(second_key, second_value));
-  ASSERT_OK(Flush());
+  // Let the automatic compaction (and its callback) finish before checking.
+  ASSERT_OK(dbfull()->TEST_WaitForCompact());
+  ASSERT_TRUE(callback_completed);
+}
-  ASSERT_OK(Put(first_key, third_value));
-  ASSERT_OK(Put(second_key, third_value));
-  ASSERT_OK(Flush());
+// A concurrent manual compaction issued while CompactRange(change_level=true)
+// is in its refitting phase must fail with Status::Incomplete.
+TEST_F(DBCompactionTest, ChangeLevelConflictsWithManual) {
+  Options options = CurrentOptions();
+  options.num_levels = 3;
+  Reopen(options);
-  options.enable_blob_files = true;
-  options.env = &fault_injection_env_;
+  // Setup an LSM with L2 populated.
+  Random rnd(301);
+  ASSERT_OK(Put(Key(0), rnd.RandomString(990)));
+  ASSERT_OK(Put(Key(1), rnd.RandomString(990)));
+  {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 2;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  }
+  ASSERT_EQ("0,0,1", FilesPerLevel(0));
-  Reopen(options);
+  // The background thread will refit L2->L1 while the foreground thread will
+  // attempt to run a compaction on new data. The following dependencies
+  // ensure the background manual compaction's refitting phase disables manual
+  // compaction immediately before the foreground manual compaction can register
+  // itself. Manual compaction is kept disabled until the foreground manual
+  // checks for the failure once.
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({
+      // Only do Put()s for foreground CompactRange() once the background
+      // CompactRange() has reached the refitting phase.
+      {
+          "DBImpl::CompactRange:BeforeRefit:1",
+          "DBCompactionTest::ChangeLevelConflictsWithManual:"
+          "PreForegroundCompactRange",
+      },
+      // Right before we register the manual compaction, proceed with
+      // the refitting phase so manual compactions are disabled. Stay in
+      // the refitting phase with manual compactions disabled until it is
+      // noticed.
+      {
+          "DBImpl::RunManualCompaction:0",
+          "DBImpl::CompactRange:BeforeRefit:2",
+      },
+      {
+          "DBImpl::CompactRange:PreRefitLevel",
+          "DBImpl::RunManualCompaction:1",
+      },
+      {
+          "DBImpl::RunManualCompaction:PausedAtStart",
+          "DBImpl::CompactRange:PostRefitLevel",
+      },
+      // If compaction somehow were scheduled, let's let it run after reenabling
+      // manual compactions. This dependency is not expected to be hit but is
+      // here for speculatively coercing future bugs.
+      {
+          "DBImpl::CompactRange:PostRefitLevel:ManualCompactionEnabled",
+          "BackgroundCallCompaction:0",
+      },
+  });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
-  SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
-    fault_injection_env_.SetFilesystemActive(false,
-                                             Status::IOError(sync_point_));
+  ROCKSDB_NAMESPACE::port::Thread refit_level_thread([&] {
+    CompactRangeOptions cro;
+    cro.change_level = true;
+    cro.target_level = 1;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
});
-  SyncPoint::GetInstance()->EnableProcessing();
-  constexpr Slice* begin = nullptr;
-  constexpr Slice* end = nullptr;
+  TEST_SYNC_POINT(
+      "DBCompactionTest::ChangeLevelConflictsWithManual:"
+      "PreForegroundCompactRange");
+  // New data for the foreground CompactRange() to compact.
+  ASSERT_OK(Put(Key(0), rnd.RandomString(990)));
+  ASSERT_OK(Put(Key(1), rnd.RandomString(990)));
+  ASSERT_TRUE(dbfull()
+                  ->CompactRange(CompactRangeOptions(), nullptr, nullptr)
+                  .IsIncomplete());
-  ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), begin, end).IsIOError());
+  refit_level_thread.join();
+}
-  SyncPoint::GetInstance()->DisableProcessing();
-  SyncPoint::GetInstance()->ClearAllCallBacks();
+TEST_F(DBCompactionTest, BottomPriCompactionCountsTowardConcurrencyLimit) {
+  // Flushes several files to trigger compaction while lock is released during
+  // a bottom-pri compaction. Verifies it does not get scheduled to thread pool
+  // because per-DB limit for compaction parallelism is one (default).
+  const int kNumL0Files = 4;
+  const int kNumLevels = 3;
-  VersionSet* const versions = dbfull()->TEST_GetVersionSet();
-  assert(versions);
+  env_->SetBackgroundThreads(1, Env::Priority::BOTTOM);
-  ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
-  assert(cfd);
+  Options options = CurrentOptions();
+  options.level0_file_num_compaction_trigger = kNumL0Files;
+  options.num_levels = kNumLevels;
+  DestroyAndReopen(options);
-  Version* const current = cfd->current();
-  assert(current);
+  // Setup last level to be non-empty since it's a bit unclear whether
+  // compaction to an empty level would be considered "bottommost".
+  ASSERT_OK(Put(Key(0), "val"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(kNumLevels - 1);
+
+  SyncPoint::GetInstance()->LoadDependency(
+      {{"DBImpl::BGWorkBottomCompaction",
+        "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+        "PreTriggerCompaction"},
+       {"DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+        "PostTriggerCompaction",
+        "BackgroundCallCompaction:0"}});
+  SyncPoint::GetInstance()->EnableProcessing();
-  const VersionStorageInfo* const storage_info = current->storage_info();
-  assert(storage_info);
+  port::Thread compact_range_thread([&] {
+    CompactRangeOptions cro;
+    cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
+    cro.exclusive_manual_compaction = false;
+    ASSERT_OK(dbfull()->CompactRange(cro, nullptr, nullptr));
+  });
-  const auto& l1_files = storage_info->LevelFiles(1);
-  ASSERT_TRUE(l1_files.empty());
+  // Sleep in the low-pri thread so any newly scheduled compaction will be
+  // queued. Otherwise it might finish before we check its existence.
+  test::SleepingBackgroundTask sleeping_task_low;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
+                 Env::Priority::LOW);
-  const auto& blob_files = storage_info->GetBlobFiles();
-  ASSERT_TRUE(blob_files.empty());
+  TEST_SYNC_POINT(
+      "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+      "PreTriggerCompaction");
+  for (int i = 0; i < kNumL0Files; ++i) {
+    ASSERT_OK(Put(Key(0), "val"));
+    ASSERT_OK(Flush());
+  }
+  // The bottom-pri manual compaction occupies the single per-DB compaction
+  // slot, so nothing should have been queued to the LOW pool.
+  ASSERT_EQ(0u, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
+  TEST_SYNC_POINT(
+      "DBCompactionTest::BottomPriCompactionCountsTowardConcurrencyLimit:"
+      "PostTriggerCompaction");
-  const InternalStats* const internal_stats = cfd->internal_stats();
-  assert(internal_stats);
+  sleeping_task_low.WakeUp();
+  sleeping_task_low.WaitUntilDone();
+  compact_range_thread.join();
+}
-  const auto& compaction_stats = internal_stats->TEST_GetCompactionStats();
-  ASSERT_GE(compaction_stats.size(), 2);
+TEST_F(DBCompactionTest, BottommostFileCompactionAllowIngestBehind) {
+  // allow_ingest_behind prevents seqnum zeroing, and could cause
+  // compaction loop with reason kBottommostFiles.
+  Options options = CurrentOptions();
+  options.env = env_;
+  options.compaction_style = kCompactionStyleLevel;
+  options.allow_ingest_behind = true;
+  options.comparator = BytewiseComparator();
+  DestroyAndReopen(options);
-  if (sync_point_ == "BlobFileBuilder::WriteBlobToFile:AddRecord") {
-    ASSERT_EQ(compaction_stats[1].bytes_written, 0);
-    ASSERT_EQ(compaction_stats[1].num_output_files, 0);
-  } else {
-    // SST file writing succeeded; blob file writing failed (during Finish)
-    ASSERT_GT(compaction_stats[1].bytes_written, 0);
-    ASSERT_EQ(compaction_stats[1].num_output_files, 1);
-  }
+  WriteOptions write_opts;
+  ASSERT_OK(db_->Put(write_opts, "infinite", "compaction loop"));
+  ASSERT_OK(db_->Put(write_opts, "infinite", "loop"));
+
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_OK(db_->Put(write_opts, "bumpseqnum", ""));
+  ASSERT_OK(Flush());
+  auto snapshot = db_->GetSnapshot();
+  // Bump up oldest_snapshot_seqnum_ in VersionStorageInfo.
+  db_->ReleaseSnapshot(snapshot);
+  // With allow_ingest_behind, seqnums are not zeroed at the bottommost level;
+  // picking a kBottommostFiles compaction here would re-pick the same file
+  // forever.
+  bool compacted = false;
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* /* arg */) {
+        // There should not be a compaction.
+        compacted = true;
+      });
+  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
+  // Wait for compaction to be scheduled.
+  env_->SleepForMicroseconds(2000000);
+  ASSERT_FALSE(compacted);
+  // The following assert can be used to check for compaction loop:
+  // it used to wait forever before the fix.
+  // ASSERT_OK(dbfull()->TEST_WaitForCompact(true /* wait_unscheduled */));
}
#endif // !defined(ROCKSDB_LITE)
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
#else
- (void) argc;
- (void) argv;
+ (void)argc;
+ (void)argv;
return 0;
#endif
}