]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/db_compaction_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / db_compaction_test.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#include <memory>

#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "port/port.h"
#include "rocksdb/experimental.h"
#include "rocksdb/utilities/convenience.h"
#include "util/sync_point.h"
16namespace rocksdb {
17
18// SYNC_POINT is not supported in released Windows mode.
19#if !defined(ROCKSDB_LITE)
20
21class DBCompactionTest : public DBTestBase {
22 public:
23 DBCompactionTest() : DBTestBase("/db_compaction_test") {}
24};
25
26class DBCompactionTestWithParam
27 : public DBTestBase,
28 public testing::WithParamInterface<std::tuple<uint32_t, bool>> {
29 public:
30 DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") {
31 max_subcompactions_ = std::get<0>(GetParam());
32 exclusive_manual_compaction_ = std::get<1>(GetParam());
33 }
34
35 // Required if inheriting from testing::WithParamInterface<>
36 static void SetUpTestCase() {}
37 static void TearDownTestCase() {}
38
39 uint32_t max_subcompactions_;
40 bool exclusive_manual_compaction_;
41};
42
43class DBCompactionDirectIOTest : public DBCompactionTest,
44 public ::testing::WithParamInterface<bool> {
45 public:
46 DBCompactionDirectIOTest() : DBCompactionTest() {}
47};
48
49namespace {
50
51class FlushedFileCollector : public EventListener {
52 public:
53 FlushedFileCollector() {}
54 ~FlushedFileCollector() {}
55
11fdf7f2 56 virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
7c673cae
FG
57 std::lock_guard<std::mutex> lock(mutex_);
58 flushed_files_.push_back(info.file_path);
59 }
60
61 std::vector<std::string> GetFlushedFiles() {
62 std::lock_guard<std::mutex> lock(mutex_);
63 std::vector<std::string> result;
64 for (auto fname : flushed_files_) {
65 result.push_back(fname);
66 }
67 return result;
68 }
69
70 void ClearFlushedFiles() { flushed_files_.clear(); }
71
72 private:
73 std::vector<std::string> flushed_files_;
74 std::mutex mutex_;
75};
76
11fdf7f2
TL
77class CompactionStatsCollector : public EventListener {
78public:
79 CompactionStatsCollector()
80 : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) {
81 for (auto& v : compaction_completed_) {
82 v.store(0);
83 }
84 }
85
86 ~CompactionStatsCollector() {}
87
88 virtual void OnCompactionCompleted(DB* /* db */,
89 const CompactionJobInfo& info) override {
90 int k = static_cast<int>(info.compaction_reason);
91 int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
92 assert(k >= 0 && k < num_of_reasons);
93 compaction_completed_[k]++;
94 }
95
96 virtual void OnExternalFileIngested(DB* /* db */,
97 const ExternalFileIngestionInfo& /* info */) override {
98 int k = static_cast<int>(CompactionReason::kExternalSstIngestion);
99 compaction_completed_[k]++;
100 }
101
102 virtual void OnFlushCompleted(DB* /* db */,
103 const FlushJobInfo& /* info */) override {
104 int k = static_cast<int>(CompactionReason::kFlush);
105 compaction_completed_[k]++;
106 }
107
108 int NumberOfCompactions(CompactionReason reason) const {
109 int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
110 int k = static_cast<int>(reason);
111 assert(k >= 0 && k < num_of_reasons);
112 return compaction_completed_.at(k).load();
113 }
114
115private:
116 std::vector<std::atomic<int>> compaction_completed_;
117};
118
119class SstStatsCollector : public EventListener {
120 public:
121 SstStatsCollector() : num_ssts_creation_started_(0) {}
122
123 void OnTableFileCreationStarted(
124 const TableFileCreationBriefInfo& /* info */) override {
125 ++num_ssts_creation_started_;
126 }
127
128 int num_ssts_creation_started() { return num_ssts_creation_started_; }
129
130 private:
131 std::atomic<int> num_ssts_creation_started_;
132};
133
7c673cae
FG
134static const int kCDTValueSize = 1000;
135static const int kCDTKeysPerBuffer = 4;
136static const int kCDTNumLevels = 8;
137Options DeletionTriggerOptions(Options options) {
138 options.compression = kNoCompression;
139 options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24);
140 options.min_write_buffer_number_to_merge = 1;
141 options.max_write_buffer_number_to_maintain = 0;
142 options.num_levels = kCDTNumLevels;
143 options.level0_file_num_compaction_trigger = 1;
144 options.target_file_size_base = options.write_buffer_size * 2;
145 options.target_file_size_multiplier = 2;
146 options.max_bytes_for_level_base =
147 options.target_file_size_base * options.target_file_size_multiplier;
148 options.max_bytes_for_level_multiplier = 2;
149 options.disable_auto_compactions = false;
150 return options;
151}
152
153bool HaveOverlappingKeyRanges(
154 const Comparator* c,
155 const SstFileMetaData& a, const SstFileMetaData& b) {
156 if (c->Compare(a.smallestkey, b.smallestkey) >= 0) {
157 if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
158 // b.smallestkey <= a.smallestkey <= b.largestkey
159 return true;
160 }
161 } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
162 // a.smallestkey < b.smallestkey <= a.largestkey
163 return true;
164 }
165 if (c->Compare(a.largestkey, b.largestkey) <= 0) {
166 if (c->Compare(a.largestkey, b.smallestkey) >= 0) {
167 // b.smallestkey <= a.largestkey <= b.largestkey
168 return true;
169 }
170 } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) {
171 // a.smallestkey <= b.largestkey < a.largestkey
172 return true;
173 }
174 return false;
175}
176
177// Identifies all files between level "min_level" and "max_level"
178// which has overlapping key range with "input_file_meta".
179void GetOverlappingFileNumbersForLevelCompaction(
180 const ColumnFamilyMetaData& cf_meta,
181 const Comparator* comparator,
182 int min_level, int max_level,
183 const SstFileMetaData* input_file_meta,
184 std::set<std::string>* overlapping_file_names) {
185 std::set<const SstFileMetaData*> overlapping_files;
186 overlapping_files.insert(input_file_meta);
187 for (int m = min_level; m <= max_level; ++m) {
188 for (auto& file : cf_meta.levels[m].files) {
189 for (auto* included_file : overlapping_files) {
190 if (HaveOverlappingKeyRanges(
191 comparator, *included_file, file)) {
192 overlapping_files.insert(&file);
193 overlapping_file_names->insert(file.name);
194 break;
195 }
196 }
197 }
198 }
199}
200
201void VerifyCompactionResult(
202 const ColumnFamilyMetaData& cf_meta,
203 const std::set<std::string>& overlapping_file_numbers) {
204#ifndef NDEBUG
205 for (auto& level : cf_meta.levels) {
206 for (auto& file : level.files) {
207 assert(overlapping_file_numbers.find(file.name) ==
208 overlapping_file_numbers.end());
209 }
210 }
211#endif
212}
213
11fdf7f2
TL
214/*
215 * Verifies compaction stats of cfd are valid.
216 *
217 * For each level of cfd, its compaction stats are valid if
218 * 1) sum(stat.counts) == stat.count, and
219 * 2) stat.counts[i] == collector.NumberOfCompactions(i)
220 */
221void VerifyCompactionStats(ColumnFamilyData& cfd,
222 const CompactionStatsCollector& collector) {
223#ifndef NDEBUG
224 InternalStats* internal_stats_ptr = cfd.internal_stats();
225 ASSERT_TRUE(internal_stats_ptr != nullptr);
226 const std::vector<InternalStats::CompactionStats>& comp_stats =
227 internal_stats_ptr->TEST_GetCompactionStats();
228 const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons);
229 std::vector<int> counts(num_of_reasons, 0);
230 // Count the number of compactions caused by each CompactionReason across
231 // all levels.
232 for (const auto& stat : comp_stats) {
233 int sum = 0;
234 for (int i = 0; i < num_of_reasons; i++) {
235 counts[i] += stat.counts[i];
236 sum += stat.counts[i];
237 }
238 ASSERT_EQ(sum, stat.count);
239 }
240 // Verify InternalStats bookkeeping matches that of CompactionStatsCollector,
241 // assuming that all compactions complete.
242 for (int i = 0; i < num_of_reasons; i++) {
243 ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]);
244 }
245#endif /* NDEBUG */
246}
247
7c673cae
FG
248const SstFileMetaData* PickFileRandomly(
249 const ColumnFamilyMetaData& cf_meta,
250 Random* rand,
251 int* level = nullptr) {
252 auto file_id = rand->Uniform(static_cast<int>(
253 cf_meta.file_count)) + 1;
254 for (auto& level_meta : cf_meta.levels) {
255 if (file_id <= level_meta.files.size()) {
256 if (level != nullptr) {
257 *level = level_meta.level;
258 }
259 auto result = rand->Uniform(file_id);
260 return &(level_meta.files[result]);
261 }
262 file_id -= static_cast<uint32_t>(level_meta.files.size());
263 }
264 assert(false);
265 return nullptr;
266}
267} // anonymous namespace
268
11fdf7f2 269#ifndef ROCKSDB_VALGRIND_RUN
7c673cae
FG
270// All the TEST_P tests run once with sub_compactions disabled (i.e.
271// options.max_subcompactions = 1) and once with it enabled
272TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) {
273 for (int tid = 0; tid < 3; ++tid) {
274 uint64_t db_size[2];
275 Options options = DeletionTriggerOptions(CurrentOptions());
276 options.max_subcompactions = max_subcompactions_;
277
278 if (tid == 1) {
279 // the following only disable stats update in DB::Open()
280 // and should not affect the result of this test.
281 options.skip_stats_update_on_db_open = true;
282 } else if (tid == 2) {
283 // third pass with universal compaction
284 options.compaction_style = kCompactionStyleUniversal;
285 options.num_levels = 1;
286 }
287
288 DestroyAndReopen(options);
289 Random rnd(301);
290
291 const int kTestSize = kCDTKeysPerBuffer * 1024;
292 std::vector<std::string> values;
293 for (int k = 0; k < kTestSize; ++k) {
294 values.push_back(RandomString(&rnd, kCDTValueSize));
295 ASSERT_OK(Put(Key(k), values[k]));
296 }
297 dbfull()->TEST_WaitForFlushMemTable();
298 dbfull()->TEST_WaitForCompact();
299 db_size[0] = Size(Key(0), Key(kTestSize - 1));
300
301 for (int k = 0; k < kTestSize; ++k) {
302 ASSERT_OK(Delete(Key(k)));
303 }
304 dbfull()->TEST_WaitForFlushMemTable();
305 dbfull()->TEST_WaitForCompact();
306 db_size[1] = Size(Key(0), Key(kTestSize - 1));
307
308 // must have much smaller db size.
309 ASSERT_GT(db_size[0] / 3, db_size[1]);
310 }
311}
11fdf7f2
TL
312#endif // ROCKSDB_VALGRIND_RUN
313
314TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) {
315 // For each options type we test following
316 // - Enable preserve_deletes
317 // - write bunch of keys and deletes
318 // - Set start_seqnum to the beginning; compact; check that keys are present
319 // - rewind start_seqnum way forward; compact; check that keys are gone
320
321 for (int tid = 0; tid < 3; ++tid) {
322 Options options = DeletionTriggerOptions(CurrentOptions());
323 options.max_subcompactions = max_subcompactions_;
324 options.preserve_deletes=true;
325 options.num_levels = 2;
326
327 if (tid == 1) {
328 options.skip_stats_update_on_db_open = true;
329 } else if (tid == 2) {
330 // third pass with universal compaction
331 options.compaction_style = kCompactionStyleUniversal;
332 }
333
334 DestroyAndReopen(options);
335 Random rnd(301);
336 // highlight the default; all deletes should be preserved
337 SetPreserveDeletesSequenceNumber(0);
338
339 const int kTestSize = kCDTKeysPerBuffer;
340 std::vector<std::string> values;
341 for (int k = 0; k < kTestSize; ++k) {
342 values.push_back(RandomString(&rnd, kCDTValueSize));
343 ASSERT_OK(Put(Key(k), values[k]));
344 }
345
346 for (int k = 0; k < kTestSize; ++k) {
347 ASSERT_OK(Delete(Key(k)));
348 }
349 // to ensure we tackle all tombstones
350 CompactRangeOptions cro;
351 cro.change_level = true;
352 cro.target_level = 2;
353 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
354
355 dbfull()->TEST_WaitForFlushMemTable();
356 dbfull()->CompactRange(cro, nullptr, nullptr);
357
358 // check that normal user iterator doesn't see anything
359 Iterator* db_iter = dbfull()->NewIterator(ReadOptions());
360 int i = 0;
361 for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
362 i++;
363 }
364 ASSERT_EQ(i, 0);
365 delete db_iter;
366
367 // check that iterator that sees internal keys sees tombstones
368 ReadOptions ro;
369 ro.iter_start_seqnum=1;
370 db_iter = dbfull()->NewIterator(ro);
371 i = 0;
372 for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
373 i++;
374 }
375 ASSERT_EQ(i, 4);
376 delete db_iter;
377
378 // now all deletes should be gone
379 SetPreserveDeletesSequenceNumber(100000000);
380 dbfull()->CompactRange(cro, nullptr, nullptr);
381
382 db_iter = dbfull()->NewIterator(ro);
383 i = 0;
384 for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) {
385 i++;
386 }
387 ASSERT_EQ(i, 0);
388 delete db_iter;
389 }
390}
7c673cae
FG
391
392TEST_F(DBCompactionTest, SkipStatsUpdateTest) {
393 // This test verify UpdateAccumulatedStats is not on
394 // if options.skip_stats_update_on_db_open = true
395 // The test will need to be updated if the internal behavior changes.
396
397 Options options = DeletionTriggerOptions(CurrentOptions());
398 options.env = env_;
399 DestroyAndReopen(options);
400 Random rnd(301);
401
402 const int kTestSize = kCDTKeysPerBuffer * 512;
403 std::vector<std::string> values;
404 for (int k = 0; k < kTestSize; ++k) {
405 values.push_back(RandomString(&rnd, kCDTValueSize));
406 ASSERT_OK(Put(Key(k), values[k]));
407 }
408 dbfull()->TEST_WaitForFlushMemTable();
409 dbfull()->TEST_WaitForCompact();
410
411 // Reopen the DB with stats-update disabled
412 options.skip_stats_update_on_db_open = true;
413 env_->random_file_open_counter_.store(0);
414 Reopen(options);
415
416 // As stats-update is disabled, we expect a very low number of
417 // random file open.
418 // Note that this number must be changed accordingly if we change
419 // the number of files needed to be opened in the DB::Open process.
420 const int kMaxFileOpenCount = 10;
421 ASSERT_LT(env_->random_file_open_counter_.load(), kMaxFileOpenCount);
422
423 // Repeat the reopen process, but this time we enable
424 // stats-update.
425 options.skip_stats_update_on_db_open = false;
426 env_->random_file_open_counter_.store(0);
427 Reopen(options);
428
429 // Since we do a normal stats update on db-open, there
430 // will be more random open files.
431 ASSERT_GT(env_->random_file_open_counter_.load(), kMaxFileOpenCount);
432}
433
434TEST_F(DBCompactionTest, TestTableReaderForCompaction) {
435 Options options = CurrentOptions();
436 options.env = env_;
437 options.new_table_reader_for_compaction_inputs = true;
438 options.max_open_files = 100;
439 options.level0_file_num_compaction_trigger = 3;
440 DestroyAndReopen(options);
441 Random rnd(301);
442
443 int num_table_cache_lookup = 0;
444 int num_new_table_reader = 0;
445 rocksdb::SyncPoint::GetInstance()->SetCallBack(
446 "TableCache::FindTable:0", [&](void* arg) {
447 assert(arg != nullptr);
448 bool no_io = *(reinterpret_cast<bool*>(arg));
449 if (!no_io) {
450 // filter out cases for table properties queries.
451 num_table_cache_lookup++;
452 }
453 });
454 rocksdb::SyncPoint::GetInstance()->SetCallBack(
455 "TableCache::GetTableReader:0",
11fdf7f2 456 [&](void* /*arg*/) { num_new_table_reader++; });
7c673cae
FG
457 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
458
459 for (int k = 0; k < options.level0_file_num_compaction_trigger; ++k) {
460 ASSERT_OK(Put(Key(k), Key(k)));
461 ASSERT_OK(Put(Key(10 - k), "bar"));
462 if (k < options.level0_file_num_compaction_trigger - 1) {
463 num_table_cache_lookup = 0;
464 Flush();
465 dbfull()->TEST_WaitForCompact();
466 // preloading iterator issues one table cache lookup and create
467 // a new table reader.
468 ASSERT_EQ(num_table_cache_lookup, 1);
469 ASSERT_EQ(num_new_table_reader, 1);
470
471 num_table_cache_lookup = 0;
472 num_new_table_reader = 0;
473 ASSERT_EQ(Key(k), Get(Key(k)));
474 // lookup iterator from table cache and no need to create a new one.
475 ASSERT_EQ(num_table_cache_lookup, 1);
476 ASSERT_EQ(num_new_table_reader, 0);
477 }
478 }
479
480 num_table_cache_lookup = 0;
481 num_new_table_reader = 0;
482 Flush();
483 dbfull()->TEST_WaitForCompact();
484 // Preloading iterator issues one table cache lookup and creates
485 // a new table reader. One file is created for flush and one for compaction.
486 // Compaction inputs make no table cache look-up for data/range deletion
487 // iterators
488 ASSERT_EQ(num_table_cache_lookup, 2);
489 // Create new iterator for:
490 // (1) 1 for verifying flush results
491 // (2) 3 for compaction input files
492 // (3) 1 for verifying compaction results.
493 ASSERT_EQ(num_new_table_reader, 5);
494
495 num_table_cache_lookup = 0;
496 num_new_table_reader = 0;
497 ASSERT_EQ(Key(1), Get(Key(1)));
498 ASSERT_EQ(num_table_cache_lookup, 1);
499 ASSERT_EQ(num_new_table_reader, 0);
500
501 num_table_cache_lookup = 0;
502 num_new_table_reader = 0;
503 CompactRangeOptions cro;
504 cro.change_level = true;
505 cro.target_level = 2;
506 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
507 db_->CompactRange(cro, nullptr, nullptr);
508 // Only verifying compaction outputs issues one table cache lookup
509 // for both data block and range deletion block).
510 ASSERT_EQ(num_table_cache_lookup, 1);
511 // One for compaction input, one for verifying compaction results.
512 ASSERT_EQ(num_new_table_reader, 2);
513
514 num_table_cache_lookup = 0;
515 num_new_table_reader = 0;
516 ASSERT_EQ(Key(1), Get(Key(1)));
517 ASSERT_EQ(num_table_cache_lookup, 1);
518 ASSERT_EQ(num_new_table_reader, 0);
519
520 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
521}
522
523TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) {
524 for (int tid = 0; tid < 2; ++tid) {
525 uint64_t db_size[3];
526 Options options = DeletionTriggerOptions(CurrentOptions());
527 options.max_subcompactions = max_subcompactions_;
528
529 if (tid == 1) {
530 // second pass with universal compaction
531 options.compaction_style = kCompactionStyleUniversal;
532 options.num_levels = 1;
533 }
534
535 DestroyAndReopen(options);
536 Random rnd(301);
537
538 // round 1 --- insert key/value pairs.
539 const int kTestSize = kCDTKeysPerBuffer * 512;
540 std::vector<std::string> values;
541 for (int k = 0; k < kTestSize; ++k) {
542 values.push_back(RandomString(&rnd, kCDTValueSize));
543 ASSERT_OK(Put(Key(k), values[k]));
544 }
545 dbfull()->TEST_WaitForFlushMemTable();
546 dbfull()->TEST_WaitForCompact();
547 db_size[0] = Size(Key(0), Key(kTestSize - 1));
548 Close();
549
550 // round 2 --- disable auto-compactions and issue deletions.
551 options.create_if_missing = false;
552 options.disable_auto_compactions = true;
553 Reopen(options);
554
555 for (int k = 0; k < kTestSize; ++k) {
556 ASSERT_OK(Delete(Key(k)));
557 }
558 db_size[1] = Size(Key(0), Key(kTestSize - 1));
559 Close();
560 // as auto_compaction is off, we shouldn't see too much reduce
561 // in db size.
562 ASSERT_LT(db_size[0] / 3, db_size[1]);
563
564 // round 3 --- reopen db with auto_compaction on and see if
565 // deletion compensation still work.
566 options.disable_auto_compactions = false;
567 Reopen(options);
568 // insert relatively small amount of data to trigger auto compaction.
569 for (int k = 0; k < kTestSize / 10; ++k) {
570 ASSERT_OK(Put(Key(k), values[k]));
571 }
572 dbfull()->TEST_WaitForFlushMemTable();
573 dbfull()->TEST_WaitForCompact();
574 db_size[2] = Size(Key(0), Key(kTestSize - 1));
575 // this time we're expecting significant drop in size.
576 ASSERT_GT(db_size[0] / 3, db_size[2]);
577 }
578}
579
580TEST_F(DBCompactionTest, DisableStatsUpdateReopen) {
581 uint64_t db_size[3];
582 for (int test = 0; test < 2; ++test) {
583 Options options = DeletionTriggerOptions(CurrentOptions());
584 options.skip_stats_update_on_db_open = (test == 0);
585
586 env_->random_read_counter_.Reset();
587 DestroyAndReopen(options);
588 Random rnd(301);
589
590 // round 1 --- insert key/value pairs.
591 const int kTestSize = kCDTKeysPerBuffer * 512;
592 std::vector<std::string> values;
593 for (int k = 0; k < kTestSize; ++k) {
594 values.push_back(RandomString(&rnd, kCDTValueSize));
595 ASSERT_OK(Put(Key(k), values[k]));
596 }
597 dbfull()->TEST_WaitForFlushMemTable();
598 dbfull()->TEST_WaitForCompact();
599 db_size[0] = Size(Key(0), Key(kTestSize - 1));
600 Close();
601
602 // round 2 --- disable auto-compactions and issue deletions.
603 options.create_if_missing = false;
604 options.disable_auto_compactions = true;
605
606 env_->random_read_counter_.Reset();
607 Reopen(options);
608
609 for (int k = 0; k < kTestSize; ++k) {
610 ASSERT_OK(Delete(Key(k)));
611 }
612 db_size[1] = Size(Key(0), Key(kTestSize - 1));
613 Close();
614 // as auto_compaction is off, we shouldn't see too much reduce
615 // in db size.
616 ASSERT_LT(db_size[0] / 3, db_size[1]);
617
618 // round 3 --- reopen db with auto_compaction on and see if
619 // deletion compensation still work.
620 options.disable_auto_compactions = false;
621 Reopen(options);
622 dbfull()->TEST_WaitForFlushMemTable();
623 dbfull()->TEST_WaitForCompact();
624 db_size[2] = Size(Key(0), Key(kTestSize - 1));
625
626 if (options.skip_stats_update_on_db_open) {
627 // If update stats on DB::Open is disable, we don't expect
628 // deletion entries taking effect.
629 ASSERT_LT(db_size[0] / 3, db_size[2]);
630 } else {
631 // Otherwise, we should see a significant drop in db size.
632 ASSERT_GT(db_size[0] / 3, db_size[2]);
633 }
634 }
635}
636
637
638TEST_P(DBCompactionTestWithParam, CompactionTrigger) {
639 const int kNumKeysPerFile = 100;
640
641 Options options = CurrentOptions();
642 options.write_buffer_size = 110 << 10; // 110KB
643 options.arena_block_size = 4 << 10;
644 options.num_levels = 3;
645 options.level0_file_num_compaction_trigger = 3;
646 options.max_subcompactions = max_subcompactions_;
647 options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
648 CreateAndReopenWithCF({"pikachu"}, options);
649
650 Random rnd(301);
651
652 for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
653 num++) {
654 std::vector<std::string> values;
655 // Write 100KB (100 values, each 1K)
656 for (int i = 0; i < kNumKeysPerFile; i++) {
657 values.push_back(RandomString(&rnd, 990));
658 ASSERT_OK(Put(1, Key(i), values[i]));
659 }
660 // put extra key to trigger flush
661 ASSERT_OK(Put(1, "", ""));
662 dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
663 ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1);
664 }
665
666 // generate one more file in level-0, and should trigger level-0 compaction
667 std::vector<std::string> values;
668 for (int i = 0; i < kNumKeysPerFile; i++) {
669 values.push_back(RandomString(&rnd, 990));
670 ASSERT_OK(Put(1, Key(i), values[i]));
671 }
672 // put extra key to trigger flush
673 ASSERT_OK(Put(1, "", ""));
674 dbfull()->TEST_WaitForCompact();
675
676 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
677 ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1);
678}
679
680TEST_F(DBCompactionTest, BGCompactionsAllowed) {
681 // Create several column families. Make compaction triggers in all of them
682 // and see number of compactions scheduled to be less than allowed.
683 const int kNumKeysPerFile = 100;
684
685 Options options = CurrentOptions();
686 options.write_buffer_size = 110 << 10; // 110KB
687 options.arena_block_size = 4 << 10;
688 options.num_levels = 3;
689 // Should speed up compaction when there are 4 files.
690 options.level0_file_num_compaction_trigger = 2;
691 options.level0_slowdown_writes_trigger = 20;
692 options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large
7c673cae
FG
693 options.max_background_compactions = 3;
694 options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
695
696 // Block all threads in thread pool.
697 const size_t kTotalTasks = 4;
698 env_->SetBackgroundThreads(4, Env::LOW);
699 test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
700 for (size_t i = 0; i < kTotalTasks; i++) {
701 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
702 &sleeping_tasks[i], Env::Priority::LOW);
703 sleeping_tasks[i].WaitUntilSleeping();
704 }
705
706 CreateAndReopenWithCF({"one", "two", "three"}, options);
707
708 Random rnd(301);
709 for (int cf = 0; cf < 4; cf++) {
710 for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
711 for (int i = 0; i < kNumKeysPerFile; i++) {
712 ASSERT_OK(Put(cf, Key(i), ""));
713 }
714 // put extra key to trigger flush
715 ASSERT_OK(Put(cf, "", ""));
716 dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
717 ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
718 }
719 }
720
721 // Now all column families qualify compaction but only one should be
722 // scheduled, because no column family hits speed up condition.
723 ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
724
725 // Create two more files for one column family, which triggers speed up
726 // condition, three compactions will be scheduled.
727 for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
728 for (int i = 0; i < kNumKeysPerFile; i++) {
729 ASSERT_OK(Put(2, Key(i), ""));
730 }
731 // put extra key to trigger flush
732 ASSERT_OK(Put(2, "", ""));
733 dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
734 ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1,
735 NumTableFilesAtLevel(0, 2));
736 }
737 ASSERT_EQ(3, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
738
739 // Unblock all threads to unblock all compactions.
740 for (size_t i = 0; i < kTotalTasks; i++) {
741 sleeping_tasks[i].WakeUp();
742 sleeping_tasks[i].WaitUntilDone();
743 }
744 dbfull()->TEST_WaitForCompact();
745
746 // Verify number of compactions allowed will come back to 1.
747
748 for (size_t i = 0; i < kTotalTasks; i++) {
749 sleeping_tasks[i].Reset();
750 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
751 &sleeping_tasks[i], Env::Priority::LOW);
752 sleeping_tasks[i].WaitUntilSleeping();
753 }
754 for (int cf = 0; cf < 4; cf++) {
755 for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) {
756 for (int i = 0; i < kNumKeysPerFile; i++) {
757 ASSERT_OK(Put(cf, Key(i), ""));
758 }
759 // put extra key to trigger flush
760 ASSERT_OK(Put(cf, "", ""));
761 dbfull()->TEST_WaitForFlushMemTable(handles_[cf]);
762 ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1);
763 }
764 }
765
766 // Now all column families qualify compaction but only one should be
767 // scheduled, because no column family hits speed up condition.
768 ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
769
770 for (size_t i = 0; i < kTotalTasks; i++) {
771 sleeping_tasks[i].WakeUp();
772 sleeping_tasks[i].WaitUntilDone();
773 }
774}
775
776TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) {
777 Options options = CurrentOptions();
778 options.write_buffer_size = 100000000; // Large write buffer
779 options.max_subcompactions = max_subcompactions_;
780 CreateAndReopenWithCF({"pikachu"}, options);
781
782 Random rnd(301);
783
784 // Write 8MB (80 values, each 100K)
785 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
786 std::vector<std::string> values;
787 for (int i = 0; i < 80; i++) {
788 values.push_back(RandomString(&rnd, 100000));
789 ASSERT_OK(Put(1, Key(i), values[i]));
790 }
791
792 // Reopening moves updates to level-0
793 ReopenWithColumnFamilies({"default", "pikachu"}, options);
794 dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1],
795 true /* disallow trivial move */);
796
797 ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0);
798 ASSERT_GT(NumTableFilesAtLevel(1, 1), 1);
799 for (int i = 0; i < 80; i++) {
800 ASSERT_EQ(Get(1, Key(i)), values[i]);
801 }
802}
803
804TEST_F(DBCompactionTest, MinorCompactionsHappen) {
805 do {
806 Options options = CurrentOptions();
807 options.write_buffer_size = 10000;
808 CreateAndReopenWithCF({"pikachu"}, options);
809
810 const int N = 500;
811
812 int starting_num_tables = TotalTableFiles(1);
813 for (int i = 0; i < N; i++) {
814 ASSERT_OK(Put(1, Key(i), Key(i) + std::string(1000, 'v')));
815 }
816 int ending_num_tables = TotalTableFiles(1);
817 ASSERT_GT(ending_num_tables, starting_num_tables);
818
819 for (int i = 0; i < N; i++) {
820 ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(1, Key(i)));
821 }
822
823 ReopenWithColumnFamilies({"default", "pikachu"}, options);
824
825 for (int i = 0; i < N; i++) {
826 ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(1, Key(i)));
827 }
828 } while (ChangeCompactOptions());
829}
830
831TEST_F(DBCompactionTest, UserKeyCrossFile1) {
832 Options options = CurrentOptions();
833 options.compaction_style = kCompactionStyleLevel;
834 options.level0_file_num_compaction_trigger = 3;
835
836 DestroyAndReopen(options);
837
838 // create first file and flush to l0
839 Put("4", "A");
840 Put("3", "A");
841 Flush();
842 dbfull()->TEST_WaitForFlushMemTable();
843
844 Put("2", "A");
845 Delete("3");
846 Flush();
847 dbfull()->TEST_WaitForFlushMemTable();
848 ASSERT_EQ("NOT_FOUND", Get("3"));
849
850 // move both files down to l1
851 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
852 ASSERT_EQ("NOT_FOUND", Get("3"));
853
854 for (int i = 0; i < 3; i++) {
855 Put("2", "B");
856 Flush();
857 dbfull()->TEST_WaitForFlushMemTable();
858 }
859 dbfull()->TEST_WaitForCompact();
860
861 ASSERT_EQ("NOT_FOUND", Get("3"));
862}
863
864TEST_F(DBCompactionTest, UserKeyCrossFile2) {
865 Options options = CurrentOptions();
866 options.compaction_style = kCompactionStyleLevel;
867 options.level0_file_num_compaction_trigger = 3;
868
869 DestroyAndReopen(options);
870
871 // create first file and flush to l0
872 Put("4", "A");
873 Put("3", "A");
874 Flush();
875 dbfull()->TEST_WaitForFlushMemTable();
876
877 Put("2", "A");
878 SingleDelete("3");
879 Flush();
880 dbfull()->TEST_WaitForFlushMemTable();
881 ASSERT_EQ("NOT_FOUND", Get("3"));
882
883 // move both files down to l1
884 dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
885 ASSERT_EQ("NOT_FOUND", Get("3"));
886
887 for (int i = 0; i < 3; i++) {
888 Put("2", "B");
889 Flush();
890 dbfull()->TEST_WaitForFlushMemTable();
891 }
892 dbfull()->TEST_WaitForCompact();
893
894 ASSERT_EQ("NOT_FOUND", Get("3"));
895}
896
897TEST_F(DBCompactionTest, ZeroSeqIdCompaction) {
898 Options options = CurrentOptions();
899 options.compaction_style = kCompactionStyleLevel;
900 options.level0_file_num_compaction_trigger = 3;
901
902 FlushedFileCollector* collector = new FlushedFileCollector();
903 options.listeners.emplace_back(collector);
904
905 // compaction options
906 CompactionOptions compact_opt;
907 compact_opt.compression = kNoCompression;
908 compact_opt.output_file_size_limit = 4096;
909 const size_t key_len =
910 static_cast<size_t>(compact_opt.output_file_size_limit) / 5;
911
912 DestroyAndReopen(options);
913
914 std::vector<const Snapshot*> snaps;
915
916 // create first file and flush to l0
917 for (auto& key : {"1", "2", "3", "3", "3", "3"}) {
918 Put(key, std::string(key_len, 'A'));
919 snaps.push_back(dbfull()->GetSnapshot());
920 }
921 Flush();
922 dbfull()->TEST_WaitForFlushMemTable();
923
924 // create second file and flush to l0
925 for (auto& key : {"3", "4", "5", "6", "7", "8"}) {
926 Put(key, std::string(key_len, 'A'));
927 snaps.push_back(dbfull()->GetSnapshot());
928 }
929 Flush();
930 dbfull()->TEST_WaitForFlushMemTable();
931
932 // move both files down to l1
933 dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1);
934
935 // release snap so that first instance of key(3) can have seqId=0
936 for (auto snap : snaps) {
937 dbfull()->ReleaseSnapshot(snap);
938 }
939
940 // create 3 files in l0 so to trigger compaction
941 for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) {
942 Put("2", std::string(1, 'A'));
943 Flush();
944 dbfull()->TEST_WaitForFlushMemTable();
945 }
946
947 dbfull()->TEST_WaitForCompact();
948 ASSERT_OK(Put("", ""));
949}
950
11fdf7f2
TL
951TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) {
952 // github issue #2249
953 Options options = CurrentOptions();
954 options.compaction_style = kCompactionStyleLevel;
955 options.level0_file_num_compaction_trigger = 3;
956 DestroyAndReopen(options);
957
958 // create two files in l1 that we can compact
959 for (int i = 0; i < 2; ++i) {
960 for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) {
961 // make l0 files' ranges overlap to avoid trivial move
962 Put(std::to_string(2 * i), std::string(1, 'A'));
963 Put(std::to_string(2 * i + 1), std::string(1, 'A'));
964 Flush();
965 dbfull()->TEST_WaitForFlushMemTable();
966 }
967 dbfull()->TEST_WaitForCompact();
968 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
969 ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1);
970 }
971
972 ColumnFamilyMetaData cf_meta;
973 dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
974 ASSERT_EQ(2, cf_meta.levels[1].files.size());
975 std::vector<std::string> input_filenames;
976 for (const auto& sst_file : cf_meta.levels[1].files) {
977 input_filenames.push_back(sst_file.name);
978 }
979
980 // note CompactionOptions::output_file_size_limit is unset.
981 CompactionOptions compact_opt;
982 compact_opt.compression = kNoCompression;
983 dbfull()->CompactFiles(compact_opt, input_filenames, 1);
984}
985
7c673cae
FG
986// Check that writes done during a memtable compaction are recovered
987// if the database is shutdown during the memtable compaction.
988TEST_F(DBCompactionTest, RecoverDuringMemtableCompaction) {
989 do {
990 Options options = CurrentOptions();
991 options.env = env_;
992 CreateAndReopenWithCF({"pikachu"}, options);
993
994 // Trigger a long memtable compaction and reopen the database during it
995 ASSERT_OK(Put(1, "foo", "v1")); // Goes to 1st log file
996 ASSERT_OK(Put(1, "big1", std::string(10000000, 'x'))); // Fills memtable
997 ASSERT_OK(Put(1, "big2", std::string(1000, 'y'))); // Triggers compaction
998 ASSERT_OK(Put(1, "bar", "v2")); // Goes to new log file
999
1000 ReopenWithColumnFamilies({"default", "pikachu"}, options);
1001 ASSERT_EQ("v1", Get(1, "foo"));
1002 ASSERT_EQ("v2", Get(1, "bar"));
1003 ASSERT_EQ(std::string(10000000, 'x'), Get(1, "big1"));
1004 ASSERT_EQ(std::string(1000, 'y'), Get(1, "big2"));
1005 } while (ChangeOptions());
1006}
1007
1008TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) {
1009 int32_t trivial_move = 0;
1010 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1011 "DBImpl::BackgroundCompaction:TrivialMove",
11fdf7f2 1012 [&](void* /*arg*/) { trivial_move++; });
7c673cae
FG
1013 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1014
1015 Options options = CurrentOptions();
1016 options.write_buffer_size = 100000000;
1017 options.max_subcompactions = max_subcompactions_;
1018 DestroyAndReopen(options);
1019
1020 int32_t num_keys = 80;
1021 int32_t value_size = 100 * 1024; // 100 KB
1022
1023 Random rnd(301);
1024 std::vector<std::string> values;
1025 for (int i = 0; i < num_keys; i++) {
1026 values.push_back(RandomString(&rnd, value_size));
1027 ASSERT_OK(Put(Key(i), values[i]));
1028 }
1029
1030 // Reopening moves updates to L0
1031 Reopen(options);
1032 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 1); // 1 file in L0
1033 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // 0 files in L1
1034
1035 std::vector<LiveFileMetaData> metadata;
1036 db_->GetLiveFilesMetaData(&metadata);
1037 ASSERT_EQ(metadata.size(), 1U);
1038 LiveFileMetaData level0_file = metadata[0]; // L0 file meta
1039
1040 CompactRangeOptions cro;
1041 cro.exclusive_manual_compaction = exclusive_manual_compaction_;
1042
1043 // Compaction will initiate a trivial move from L0 to L1
1044 dbfull()->CompactRange(cro, nullptr, nullptr);
1045
1046 // File moved From L0 to L1
1047 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0
1048 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1); // 1 file in L1
1049
1050 metadata.clear();
1051 db_->GetLiveFilesMetaData(&metadata);
1052 ASSERT_EQ(metadata.size(), 1U);
1053 ASSERT_EQ(metadata[0].name /* level1_file.name */, level0_file.name);
1054 ASSERT_EQ(metadata[0].size /* level1_file.size */, level0_file.size);
1055
1056 for (int i = 0; i < num_keys; i++) {
1057 ASSERT_EQ(Get(Key(i)), values[i]);
1058 }
1059
1060 ASSERT_EQ(trivial_move, 1);
1061 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1062}
1063
1064TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) {
1065 int32_t trivial_move = 0;
1066 int32_t non_trivial_move = 0;
1067 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1068 "DBImpl::BackgroundCompaction:TrivialMove",
11fdf7f2 1069 [&](void* /*arg*/) { trivial_move++; });
7c673cae
FG
1070 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1071 "DBImpl::BackgroundCompaction:NonTrivial",
11fdf7f2 1072 [&](void* /*arg*/) { non_trivial_move++; });
7c673cae
FG
1073 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1074
1075 Options options = CurrentOptions();
1076 options.disable_auto_compactions = true;
1077 options.write_buffer_size = 10 * 1024 * 1024;
1078 options.max_subcompactions = max_subcompactions_;
1079
1080 DestroyAndReopen(options);
1081 // non overlapping ranges
1082 std::vector<std::pair<int32_t, int32_t>> ranges = {
1083 {100, 199},
1084 {300, 399},
1085 {0, 99},
1086 {200, 299},
1087 {600, 699},
1088 {400, 499},
1089 {500, 550},
1090 {551, 599},
1091 };
1092 int32_t value_size = 10 * 1024; // 10 KB
1093
1094 Random rnd(301);
1095 std::map<int32_t, std::string> values;
1096 for (size_t i = 0; i < ranges.size(); i++) {
1097 for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1098 values[j] = RandomString(&rnd, value_size);
1099 ASSERT_OK(Put(Key(j), values[j]));
1100 }
1101 ASSERT_OK(Flush());
1102 }
1103
1104 int32_t level0_files = NumTableFilesAtLevel(0, 0);
1105 ASSERT_EQ(level0_files, ranges.size()); // Multiple files in L0
1106 ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1
1107
1108 CompactRangeOptions cro;
1109 cro.exclusive_manual_compaction = exclusive_manual_compaction_;
1110
1111 // Since data is non-overlapping we expect compaction to initiate
1112 // a trivial move
1113 db_->CompactRange(cro, nullptr, nullptr);
1114 // We expect that all the files were trivially moved from L0 to L1
1115 ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0);
1116 ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files);
1117
1118 for (size_t i = 0; i < ranges.size(); i++) {
1119 for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1120 ASSERT_EQ(Get(Key(j)), values[j]);
1121 }
1122 }
1123
1124 ASSERT_EQ(trivial_move, 1);
1125 ASSERT_EQ(non_trivial_move, 0);
1126
1127 trivial_move = 0;
1128 non_trivial_move = 0;
1129 values.clear();
1130 DestroyAndReopen(options);
1131 // Same ranges as above but overlapping
1132 ranges = {
1133 {100, 199},
1134 {300, 399},
1135 {0, 99},
1136 {200, 299},
1137 {600, 699},
1138 {400, 499},
1139 {500, 560}, // this range overlap with the next one
1140 {551, 599},
1141 };
1142 for (size_t i = 0; i < ranges.size(); i++) {
1143 for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1144 values[j] = RandomString(&rnd, value_size);
1145 ASSERT_OK(Put(Key(j), values[j]));
1146 }
1147 ASSERT_OK(Flush());
1148 }
1149
1150 db_->CompactRange(cro, nullptr, nullptr);
1151
1152 for (size_t i = 0; i < ranges.size(); i++) {
1153 for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) {
1154 ASSERT_EQ(Get(Key(j)), values[j]);
1155 }
1156 }
1157 ASSERT_EQ(trivial_move, 0);
1158 ASSERT_EQ(non_trivial_move, 1);
1159
1160 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1161}
1162
1163TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) {
1164 int32_t trivial_move = 0;
1165 int32_t non_trivial_move = 0;
1166 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1167 "DBImpl::BackgroundCompaction:TrivialMove",
11fdf7f2 1168 [&](void* /*arg*/) { trivial_move++; });
7c673cae
FG
1169 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1170 "DBImpl::BackgroundCompaction:NonTrivial",
11fdf7f2 1171 [&](void* /*arg*/) { non_trivial_move++; });
7c673cae
FG
1172 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1173
1174 Options options = CurrentOptions();
1175 options.disable_auto_compactions = true;
1176 options.write_buffer_size = 10 * 1024 * 1024;
1177 options.num_levels = 7;
1178 options.max_subcompactions = max_subcompactions_;
1179
1180 DestroyAndReopen(options);
1181 int32_t value_size = 10 * 1024; // 10 KB
1182
1183 // Add 2 non-overlapping files
1184 Random rnd(301);
1185 std::map<int32_t, std::string> values;
1186
1187 // file 1 [0 => 300]
1188 for (int32_t i = 0; i <= 300; i++) {
1189 values[i] = RandomString(&rnd, value_size);
1190 ASSERT_OK(Put(Key(i), values[i]));
1191 }
1192 ASSERT_OK(Flush());
1193
1194 // file 2 [600 => 700]
1195 for (int32_t i = 600; i <= 700; i++) {
1196 values[i] = RandomString(&rnd, value_size);
1197 ASSERT_OK(Put(Key(i), values[i]));
1198 }
1199 ASSERT_OK(Flush());
1200
1201 // 2 files in L0
1202 ASSERT_EQ("2", FilesPerLevel(0));
1203 CompactRangeOptions compact_options;
1204 compact_options.change_level = true;
1205 compact_options.target_level = 6;
1206 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1207 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1208 // 2 files in L6
1209 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));
1210
1211 ASSERT_EQ(trivial_move, 1);
1212 ASSERT_EQ(non_trivial_move, 0);
1213
1214 for (int32_t i = 0; i <= 300; i++) {
1215 ASSERT_EQ(Get(Key(i)), values[i]);
1216 }
1217 for (int32_t i = 600; i <= 700; i++) {
1218 ASSERT_EQ(Get(Key(i)), values[i]);
1219 }
1220}
1221
1222TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) {
1223 int32_t trivial_move = 0;
1224 int32_t non_trivial_move = 0;
1225 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1226 "DBImpl::BackgroundCompaction:TrivialMove",
11fdf7f2 1227 [&](void* /*arg*/) { trivial_move++; });
7c673cae
FG
1228 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1229 "DBImpl::BackgroundCompaction:NonTrivial",
11fdf7f2 1230 [&](void* /*arg*/) { non_trivial_move++; });
7c673cae
FG
1231 bool first = true;
1232 // Purpose of dependencies:
1233 // 4 -> 1: ensure the order of two non-trivial compactions
1234 // 5 -> 2 and 5 -> 3: ensure we do a check before two non-trivial compactions
1235 // are installed
1236 rocksdb::SyncPoint::GetInstance()->LoadDependency(
1237 {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"},
1238 {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"},
1239 {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}});
1240 rocksdb::SyncPoint::GetInstance()->SetCallBack(
11fdf7f2 1241 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
7c673cae
FG
1242 if (first) {
1243 first = false;
1244 TEST_SYNC_POINT("DBCompaction::ManualPartial:4");
1245 TEST_SYNC_POINT("DBCompaction::ManualPartial:3");
1246 } else { // second non-trivial compaction
1247 TEST_SYNC_POINT("DBCompaction::ManualPartial:2");
1248 }
1249 });
1250
1251 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1252
1253 Options options = CurrentOptions();
1254 options.write_buffer_size = 10 * 1024 * 1024;
1255 options.num_levels = 7;
1256 options.max_subcompactions = max_subcompactions_;
1257 options.level0_file_num_compaction_trigger = 3;
1258 options.max_background_compactions = 3;
1259 options.target_file_size_base = 1 << 23; // 8 MB
1260
1261 DestroyAndReopen(options);
1262 int32_t value_size = 10 * 1024; // 10 KB
1263
1264 // Add 2 non-overlapping files
1265 Random rnd(301);
1266 std::map<int32_t, std::string> values;
1267
1268 // file 1 [0 => 100]
1269 for (int32_t i = 0; i < 100; i++) {
1270 values[i] = RandomString(&rnd, value_size);
1271 ASSERT_OK(Put(Key(i), values[i]));
1272 }
1273 ASSERT_OK(Flush());
1274
1275 // file 2 [100 => 300]
1276 for (int32_t i = 100; i < 300; i++) {
1277 values[i] = RandomString(&rnd, value_size);
1278 ASSERT_OK(Put(Key(i), values[i]));
1279 }
1280 ASSERT_OK(Flush());
1281
1282 // 2 files in L0
1283 ASSERT_EQ("2", FilesPerLevel(0));
1284 CompactRangeOptions compact_options;
1285 compact_options.change_level = true;
1286 compact_options.target_level = 6;
1287 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1288 // Trivial move the two non-overlapping files to level 6
1289 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1290 // 2 files in L6
1291 ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0));
1292
1293 ASSERT_EQ(trivial_move, 1);
1294 ASSERT_EQ(non_trivial_move, 0);
1295
1296 // file 3 [ 0 => 200]
1297 for (int32_t i = 0; i < 200; i++) {
1298 values[i] = RandomString(&rnd, value_size);
1299 ASSERT_OK(Put(Key(i), values[i]));
1300 }
1301 ASSERT_OK(Flush());
1302
1303 // 1 files in L0
1304 ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0));
1305 ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
1306 ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false));
1307 ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false));
1308 ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false));
1309 ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false));
1310 // 2 files in L6, 1 file in L5
1311 ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0));
1312
1313 ASSERT_EQ(trivial_move, 6);
1314 ASSERT_EQ(non_trivial_move, 0);
1315
1316 rocksdb::port::Thread threads([&] {
1317 compact_options.change_level = false;
1318 compact_options.exclusive_manual_compaction = false;
1319 std::string begin_string = Key(0);
1320 std::string end_string = Key(199);
1321 Slice begin(begin_string);
1322 Slice end(end_string);
1323 // First non-trivial compaction is triggered
1324 ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
1325 });
1326
1327 TEST_SYNC_POINT("DBCompaction::ManualPartial:1");
1328 // file 4 [300 => 400)
1329 for (int32_t i = 300; i <= 400; i++) {
1330 values[i] = RandomString(&rnd, value_size);
1331 ASSERT_OK(Put(Key(i), values[i]));
1332 }
1333 ASSERT_OK(Flush());
1334
1335 // file 5 [400 => 500)
1336 for (int32_t i = 400; i <= 500; i++) {
1337 values[i] = RandomString(&rnd, value_size);
1338 ASSERT_OK(Put(Key(i), values[i]));
1339 }
1340 ASSERT_OK(Flush());
1341
1342 // file 6 [500 => 600)
1343 for (int32_t i = 500; i <= 600; i++) {
1344 values[i] = RandomString(&rnd, value_size);
1345 ASSERT_OK(Put(Key(i), values[i]));
1346 }
1347 // Second non-trivial compaction is triggered
1348 ASSERT_OK(Flush());
1349
1350 // Before two non-trivial compactions are installed, there are 3 files in L0
1351 ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0));
1352 TEST_SYNC_POINT("DBCompaction::ManualPartial:5");
1353
1354 dbfull()->TEST_WaitForFlushMemTable();
1355 dbfull()->TEST_WaitForCompact();
1356 // After two non-trivial compactions are installed, there is 1 file in L6, and
1357 // 1 file in L1
1358 ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0));
1359 threads.join();
1360
1361 for (int32_t i = 0; i < 600; i++) {
1362 ASSERT_EQ(Get(Key(i)), values[i]);
1363 }
1364}
1365
// Disable as the test is flaky.
//
// Runs a manual CompactRange over [Key(0), Key(199)] on a background thread
// while the foreground thread writes many more files. It then checks that
// every level from L1 up stays under a size bound that starts at
// 4 * max_bytes_for_level_base and grows by max_bytes_for_level_multiplier
// per level, and that every written key reads back its latest value.
TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) {
  int32_t trivial_move = 0;
  int32_t non_trivial_move = 0;
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:TrivialMove",
      [&](void* /*arg*/) { trivial_move++; });
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial",
      [&](void* /*arg*/) { non_trivial_move++; });
  bool first = true;
  // NOTE(review): `second` is only read by the empty `else if` branch below,
  // so it has no effect on the test.
  bool second = true;
  // Pairs are read as "first -> second": the second sync point is expected to
  // wait for the first. This makes the main thread wait at PartialFill:1 until
  // the first non-trivial compaction has run, and holds that compaction's
  // AfterRun callback at PartialFill:3 until the main thread reaches
  // PartialFill:2.
  rocksdb::SyncPoint::GetInstance()->LoadDependency(
      {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"},
       {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}});
  rocksdb::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) {
        if (first) {
          TEST_SYNC_POINT("DBCompaction::PartialFill:4");
          first = false;
          TEST_SYNC_POINT("DBCompaction::PartialFill:3");
        } else if (second) {
          // Intentionally empty: later non-trivial compactions are not held.
        }
      });

  rocksdb::SyncPoint::GetInstance()->EnableProcessing();

  Options options = CurrentOptions();
  options.write_buffer_size = 10 * 1024 * 1024;
  options.max_bytes_for_level_multiplier = 2;
  options.num_levels = 4;
  options.level0_file_num_compaction_trigger = 3;
  options.max_background_compactions = 3;

  DestroyAndReopen(options);
  // make sure all background compaction jobs can be scheduled; the token is
  // held in a local for the rest of the test
  auto stop_token =
      dbfull()->TEST_write_controler().GetCompactionPressureToken();
  int32_t value_size = 10 * 1024;  // 10 KB

  // Add 2 non-overlapping files
  Random rnd(301);
  std::map<int32_t, std::string> values;

  // file 1 [0 => 100)
  for (int32_t i = 0; i < 100; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // file 2 [100 => 300)
  for (int32_t i = 100; i < 300; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L0
  ASSERT_EQ("2", FilesPerLevel(0));
  CompactRangeOptions compact_options;
  compact_options.change_level = true;
  compact_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
  // 2 files in L2
  ASSERT_EQ("0,0,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 1);
  ASSERT_EQ(non_trivial_move, 0);

  // file 3 [0 => 200)
  for (int32_t i = 0; i < 200; i++) {
    values[i] = RandomString(&rnd, value_size);
    ASSERT_OK(Put(Key(i), values[i]));
  }
  ASSERT_OK(Flush());

  // 2 files in L2, 1 in L0
  ASSERT_EQ("1,0,2", FilesPerLevel(0));
  ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false));
  // 2 files in L2, 1 in L1
  ASSERT_EQ("0,1,2", FilesPerLevel(0));

  ASSERT_EQ(trivial_move, 2);
  ASSERT_EQ(non_trivial_move, 0);

  // Background manual compaction of [Key(0), Key(199)]; joined further down.
  rocksdb::port::Thread threads([&] {
    compact_options.change_level = false;
    compact_options.exclusive_manual_compaction = false;
    std::string begin_string = Key(0);
    std::string end_string = Key(199);
    Slice begin(begin_string);
    Slice end(end_string);
    ASSERT_OK(db_->CompactRange(compact_options, &begin, &end));
  });

  TEST_SYNC_POINT("DBCompaction::PartialFill:1");
  // Many files 4 [300 => 4300): six passes over the key range, each with an
  // extra explicit flush at key 2300
  for (int32_t i = 0; i <= 5; i++) {
    for (int32_t j = 300; j < 4300; j++) {
      if (j == 2300) {
        ASSERT_OK(Flush());
        dbfull()->TEST_WaitForFlushMemTable();
      }
      values[j] = RandomString(&rnd, value_size);
      ASSERT_OK(Put(Key(j), values[j]));
    }
  }

  // Verify level sizes
  uint64_t target_size = 4 * options.max_bytes_for_level_base;
  for (int32_t i = 1; i < options.num_levels; i++) {
    ASSERT_LE(SizeAtLevel(i), target_size);
    target_size = static_cast<uint64_t>(target_size *
                                        options.max_bytes_for_level_multiplier);
  }

  // Releases the first non-trivial compaction held at PartialFill:3.
  TEST_SYNC_POINT("DBCompaction::PartialFill:2");
  dbfull()->TEST_WaitForFlushMemTable();
  dbfull()->TEST_WaitForCompact();
  threads.join();

  for (int32_t i = 0; i < 4300; i++) {
    ASSERT_EQ(Get(Key(i)), values[i]);
  }
}
1492
1493TEST_F(DBCompactionTest, DeleteFileRange) {
1494 Options options = CurrentOptions();
1495 options.write_buffer_size = 10 * 1024 * 1024;
1496 options.max_bytes_for_level_multiplier = 2;
1497 options.num_levels = 4;
1498 options.level0_file_num_compaction_trigger = 3;
1499 options.max_background_compactions = 3;
1500
1501 DestroyAndReopen(options);
1502 int32_t value_size = 10 * 1024; // 10 KB
1503
1504 // Add 2 non-overlapping files
1505 Random rnd(301);
1506 std::map<int32_t, std::string> values;
1507
1508 // file 1 [0 => 100]
1509 for (int32_t i = 0; i < 100; i++) {
1510 values[i] = RandomString(&rnd, value_size);
1511 ASSERT_OK(Put(Key(i), values[i]));
1512 }
1513 ASSERT_OK(Flush());
1514
1515 // file 2 [100 => 300]
1516 for (int32_t i = 100; i < 300; i++) {
1517 values[i] = RandomString(&rnd, value_size);
1518 ASSERT_OK(Put(Key(i), values[i]));
1519 }
1520 ASSERT_OK(Flush());
1521
1522 // 2 files in L0
1523 ASSERT_EQ("2", FilesPerLevel(0));
1524 CompactRangeOptions compact_options;
1525 compact_options.change_level = true;
1526 compact_options.target_level = 2;
1527 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1528 // 2 files in L2
1529 ASSERT_EQ("0,0,2", FilesPerLevel(0));
1530
1531 // file 3 [ 0 => 200]
1532 for (int32_t i = 0; i < 200; i++) {
1533 values[i] = RandomString(&rnd, value_size);
1534 ASSERT_OK(Put(Key(i), values[i]));
1535 }
1536 ASSERT_OK(Flush());
1537
1538 // Many files 4 [300 => 4300)
1539 for (int32_t i = 0; i <= 5; i++) {
1540 for (int32_t j = 300; j < 4300; j++) {
1541 if (j == 2300) {
1542 ASSERT_OK(Flush());
1543 dbfull()->TEST_WaitForFlushMemTable();
1544 }
1545 values[j] = RandomString(&rnd, value_size);
1546 ASSERT_OK(Put(Key(j), values[j]));
1547 }
1548 }
1549 ASSERT_OK(Flush());
1550 dbfull()->TEST_WaitForFlushMemTable();
1551 dbfull()->TEST_WaitForCompact();
1552
1553 // Verify level sizes
1554 uint64_t target_size = 4 * options.max_bytes_for_level_base;
1555 for (int32_t i = 1; i < options.num_levels; i++) {
1556 ASSERT_LE(SizeAtLevel(i), target_size);
1557 target_size = static_cast<uint64_t>(target_size *
1558 options.max_bytes_for_level_multiplier);
1559 }
1560
1561 size_t old_num_files = CountFiles();
1562 std::string begin_string = Key(1000);
1563 std::string end_string = Key(2000);
1564 Slice begin(begin_string);
1565 Slice end(end_string);
1566 ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
1567
1568 int32_t deleted_count = 0;
1569 for (int32_t i = 0; i < 4300; i++) {
1570 if (i < 1000 || i > 2000) {
1571 ASSERT_EQ(Get(Key(i)), values[i]);
1572 } else {
1573 ReadOptions roptions;
1574 std::string result;
1575 Status s = db_->Get(roptions, Key(i), &result);
1576 ASSERT_TRUE(s.IsNotFound() || s.ok());
1577 if (s.IsNotFound()) {
1578 deleted_count++;
1579 }
1580 }
1581 }
1582 ASSERT_GT(deleted_count, 0);
1583 begin_string = Key(5000);
1584 end_string = Key(6000);
1585 Slice begin1(begin_string);
1586 Slice end1(end_string);
1587 // Try deleting files in range which contain no keys
1588 ASSERT_OK(
1589 DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin1, &end1));
1590
1591 // Push data from level 0 to level 1 to force all data to be deleted
1592 // Note that we don't delete level 0 files
1593 compact_options.change_level = true;
1594 compact_options.target_level = 1;
1595 ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
1596
1597 ASSERT_OK(
1598 DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));
1599
1600 int32_t deleted_count2 = 0;
1601 for (int32_t i = 0; i < 4300; i++) {
1602 ReadOptions roptions;
1603 std::string result;
1604 Status s = db_->Get(roptions, Key(i), &result);
1605 ASSERT_TRUE(s.IsNotFound());
1606 deleted_count2++;
1607 }
1608 ASSERT_GT(deleted_count2, deleted_count);
1609 size_t new_num_files = CountFiles();
1610 ASSERT_GT(old_num_files, new_num_files);
1611}
1612
11fdf7f2
TL
1613TEST_F(DBCompactionTest, DeleteFilesInRanges) {
1614 Options options = CurrentOptions();
1615 options.write_buffer_size = 10 * 1024 * 1024;
1616 options.max_bytes_for_level_multiplier = 2;
1617 options.num_levels = 4;
1618 options.max_background_compactions = 3;
1619 options.disable_auto_compactions = true;
1620
1621 DestroyAndReopen(options);
1622 int32_t value_size = 10 * 1024; // 10 KB
1623
1624 Random rnd(301);
1625 std::map<int32_t, std::string> values;
1626
1627 // file [0 => 100), [100 => 200), ... [900, 1000)
1628 for (auto i = 0; i < 10; i++) {
1629 for (auto j = 0; j < 100; j++) {
1630 auto k = i * 100 + j;
1631 values[k] = RandomString(&rnd, value_size);
1632 ASSERT_OK(Put(Key(k), values[k]));
1633 }
1634 ASSERT_OK(Flush());
1635 }
1636 ASSERT_EQ("10", FilesPerLevel(0));
1637 CompactRangeOptions compact_options;
1638 compact_options.change_level = true;
1639 compact_options.target_level = 2;
1640 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1641 ASSERT_EQ("0,0,10", FilesPerLevel(0));
1642
1643 // file [0 => 100), [200 => 300), ... [800, 900)
1644 for (auto i = 0; i < 10; i+=2) {
1645 for (auto j = 0; j < 100; j++) {
1646 auto k = i * 100 + j;
1647 ASSERT_OK(Put(Key(k), values[k]));
1648 }
1649 ASSERT_OK(Flush());
1650 }
1651 ASSERT_EQ("5,0,10", FilesPerLevel(0));
1652 ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr));
1653 ASSERT_EQ("0,5,10", FilesPerLevel(0));
1654
1655 // Delete files in range [0, 299] (inclusive)
1656 {
1657 auto begin_str1 = Key(0), end_str1 = Key(100);
1658 auto begin_str2 = Key(100), end_str2 = Key(200);
1659 auto begin_str3 = Key(200), end_str3 = Key(299);
1660 Slice begin1(begin_str1), end1(end_str1);
1661 Slice begin2(begin_str2), end2(end_str2);
1662 Slice begin3(begin_str3), end3(end_str3);
1663 std::vector<RangePtr> ranges;
1664 ranges.push_back(RangePtr(&begin1, &end1));
1665 ranges.push_back(RangePtr(&begin2, &end2));
1666 ranges.push_back(RangePtr(&begin3, &end3));
1667 ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
1668 ranges.data(), ranges.size()));
1669 ASSERT_EQ("0,3,7", FilesPerLevel(0));
1670
1671 // Keys [0, 300) should not exist.
1672 for (auto i = 0; i < 300; i++) {
1673 ReadOptions ropts;
1674 std::string result;
1675 auto s = db_->Get(ropts, Key(i), &result);
1676 ASSERT_TRUE(s.IsNotFound());
1677 }
1678 for (auto i = 300; i < 1000; i++) {
1679 ASSERT_EQ(Get(Key(i)), values[i]);
1680 }
1681 }
1682
1683 // Delete files in range [600, 999) (exclusive)
1684 {
1685 auto begin_str1 = Key(600), end_str1 = Key(800);
1686 auto begin_str2 = Key(700), end_str2 = Key(900);
1687 auto begin_str3 = Key(800), end_str3 = Key(999);
1688 Slice begin1(begin_str1), end1(end_str1);
1689 Slice begin2(begin_str2), end2(end_str2);
1690 Slice begin3(begin_str3), end3(end_str3);
1691 std::vector<RangePtr> ranges;
1692 ranges.push_back(RangePtr(&begin1, &end1));
1693 ranges.push_back(RangePtr(&begin2, &end2));
1694 ranges.push_back(RangePtr(&begin3, &end3));
1695 ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(),
1696 ranges.data(), ranges.size(), false));
1697 ASSERT_EQ("0,1,4", FilesPerLevel(0));
1698
1699 // Keys [600, 900) should not exist.
1700 for (auto i = 600; i < 900; i++) {
1701 ReadOptions ropts;
1702 std::string result;
1703 auto s = db_->Get(ropts, Key(i), &result);
1704 ASSERT_TRUE(s.IsNotFound());
1705 }
1706 for (auto i = 300; i < 600; i++) {
1707 ASSERT_EQ(Get(Key(i)), values[i]);
1708 }
1709 for (auto i = 900; i < 1000; i++) {
1710 ASSERT_EQ(Get(Key(i)), values[i]);
1711 }
1712 }
1713
1714 // Delete all files.
1715 {
1716 RangePtr range;
1717 ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), &range, 1));
1718 ASSERT_EQ("", FilesPerLevel(0));
1719
1720 for (auto i = 0; i < 1000; i++) {
1721 ReadOptions ropts;
1722 std::string result;
1723 auto s = db_->Get(ropts, Key(i), &result);
1724 ASSERT_TRUE(s.IsNotFound());
1725 }
1726 }
1727}
1728
1729TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) {
1730 // regression test for #2833: groups of files whose user-keys overlap at the
1731 // endpoints could be split by `DeleteFilesInRange`. This caused old data to
1732 // reappear, either because a new version of the key was removed, or a range
1733 // deletion was partially dropped. It could also cause non-overlapping
1734 // invariant to be violated if the files dropped by DeleteFilesInRange were
1735 // a subset of files that a range deletion spans.
1736 const int kNumL0Files = 2;
1737 const int kValSize = 8 << 10; // 8KB
1738 Options options = CurrentOptions();
1739 options.level0_file_num_compaction_trigger = kNumL0Files;
1740 options.target_file_size_base = 1 << 10; // 1KB
1741 DestroyAndReopen(options);
1742
1743 // The snapshot prevents key 1 from having its old version dropped. The low
1744 // `target_file_size_base` ensures two keys will be in each output file.
1745 const Snapshot* snapshot = nullptr;
1746 Random rnd(301);
1747 // The value indicates which flush the key belonged to, which is enough
1748 // for us to determine the keys' relative ages. After L0 flushes finish,
1749 // files look like:
1750 //
1751 // File 0: 0 -> vals[0], 1 -> vals[0]
1752 // File 1: 1 -> vals[1], 2 -> vals[1]
1753 //
1754 // Then L0->L1 compaction happens, which outputs keys as follows:
1755 //
1756 // File 0: 0 -> vals[0], 1 -> vals[1]
1757 // File 1: 1 -> vals[0], 2 -> vals[1]
1758 //
1759 // DeleteFilesInRange shouldn't be allowed to drop just file 0, as that
1760 // would cause `1 -> vals[0]` (an older key) to reappear.
1761 std::string vals[kNumL0Files];
1762 for (int i = 0; i < kNumL0Files; ++i) {
1763 vals[i] = RandomString(&rnd, kValSize);
1764 Put(Key(i), vals[i]);
1765 Put(Key(i + 1), vals[i]);
1766 Flush();
1767 if (i == 0) {
1768 snapshot = db_->GetSnapshot();
1769 }
1770 }
1771 dbfull()->TEST_WaitForCompact();
1772
1773 // Verify `DeleteFilesInRange` can't drop only file 0 which would cause
1774 // "1 -> vals[0]" to reappear.
1775 std::string begin_str = Key(0), end_str = Key(1);
1776 Slice begin = begin_str, end = end_str;
1777 ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
1778 ASSERT_EQ(vals[1], Get(Key(1)));
1779
1780 db_->ReleaseSnapshot(snapshot);
1781}
1782
7c673cae
FG
1783TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
1784 int32_t trivial_move = 0;
1785 int32_t non_trivial_move = 0;
1786 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1787 "DBImpl::BackgroundCompaction:TrivialMove",
11fdf7f2 1788 [&](void* /*arg*/) { trivial_move++; });
7c673cae
FG
1789 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1790 "DBImpl::BackgroundCompaction:NonTrivial",
11fdf7f2 1791 [&](void* /*arg*/) { non_trivial_move++; });
7c673cae
FG
1792 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1793
1794 Options options = CurrentOptions();
1795 options.write_buffer_size = 100000000;
1796 options.max_subcompactions = max_subcompactions_;
1797 DestroyAndReopen(options);
1798
1799 int32_t value_size = 10 * 1024; // 10 KB
1800
1801 Random rnd(301);
1802 std::vector<std::string> values;
1803 // File with keys [ 0 => 99 ]
1804 for (int i = 0; i < 100; i++) {
1805 values.push_back(RandomString(&rnd, value_size));
1806 ASSERT_OK(Put(Key(i), values[i]));
1807 }
1808 ASSERT_OK(Flush());
1809
1810 ASSERT_EQ("1", FilesPerLevel(0));
1811 // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
1812 CompactRangeOptions compact_options;
1813 compact_options.change_level = true;
1814 compact_options.target_level = 3;
1815 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
1816 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
1817 ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
1818 ASSERT_EQ(trivial_move, 1);
1819 ASSERT_EQ(non_trivial_move, 0);
1820
1821 // File with keys [ 100 => 199 ]
1822 for (int i = 100; i < 200; i++) {
1823 values.push_back(RandomString(&rnd, value_size));
1824 ASSERT_OK(Put(Key(i), values[i]));
1825 }
1826 ASSERT_OK(Flush());
1827
1828 ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
1829 CompactRangeOptions cro;
1830 cro.exclusive_manual_compaction = exclusive_manual_compaction_;
1831 // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
1832 ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
1833 ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
1834 ASSERT_EQ(trivial_move, 4);
1835 ASSERT_EQ(non_trivial_move, 0);
1836
1837 for (int i = 0; i < 200; i++) {
1838 ASSERT_EQ(Get(Key(i)), values[i]);
1839 }
1840
1841 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
1842}
1843
1844TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) {
1845 Options options = CurrentOptions();
1846 options.db_paths.emplace_back(dbname_, 500 * 1024);
1847 options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
1848 options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
1849 options.memtable_factory.reset(
1850 new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
1851 options.compaction_style = kCompactionStyleLevel;
1852 options.write_buffer_size = 110 << 10; // 110KB
1853 options.arena_block_size = 4 << 10;
1854 options.level0_file_num_compaction_trigger = 2;
1855 options.num_levels = 4;
1856 options.max_bytes_for_level_base = 400 * 1024;
1857 options.max_subcompactions = max_subcompactions_;
1858 // options = CurrentOptions(options);
1859
1860 std::vector<std::string> filenames;
1861 env_->GetChildren(options.db_paths[1].path, &filenames);
1862 // Delete archival files.
1863 for (size_t i = 0; i < filenames.size(); ++i) {
1864 env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
1865 }
1866 env_->DeleteDir(options.db_paths[1].path);
1867 Reopen(options);
1868
1869 Random rnd(301);
1870 int key_idx = 0;
1871
1872 // First three 110KB files are not going to second path.
1873 // After that, (100K, 200K)
1874 for (int num = 0; num < 3; num++) {
1875 GenerateNewFile(&rnd, &key_idx);
1876 }
1877
1878 // Another 110KB triggers a compaction to 400K file to fill up first path
1879 GenerateNewFile(&rnd, &key_idx);
1880 ASSERT_EQ(3, GetSstFileCount(options.db_paths[1].path));
1881
1882 // (1, 4)
1883 GenerateNewFile(&rnd, &key_idx);
1884 ASSERT_EQ("1,4", FilesPerLevel(0));
1885 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1886 ASSERT_EQ(1, GetSstFileCount(dbname_));
1887
1888 // (1, 4, 1)
1889 GenerateNewFile(&rnd, &key_idx);
1890 ASSERT_EQ("1,4,1", FilesPerLevel(0));
1891 ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path));
1892 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1893 ASSERT_EQ(1, GetSstFileCount(dbname_));
1894
1895 // (1, 4, 2)
1896 GenerateNewFile(&rnd, &key_idx);
1897 ASSERT_EQ("1,4,2", FilesPerLevel(0));
1898 ASSERT_EQ(2, GetSstFileCount(options.db_paths[2].path));
1899 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1900 ASSERT_EQ(1, GetSstFileCount(dbname_));
1901
1902 // (1, 4, 3)
1903 GenerateNewFile(&rnd, &key_idx);
1904 ASSERT_EQ("1,4,3", FilesPerLevel(0));
1905 ASSERT_EQ(3, GetSstFileCount(options.db_paths[2].path));
1906 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1907 ASSERT_EQ(1, GetSstFileCount(dbname_));
1908
1909 // (1, 4, 4)
1910 GenerateNewFile(&rnd, &key_idx);
1911 ASSERT_EQ("1,4,4", FilesPerLevel(0));
1912 ASSERT_EQ(4, GetSstFileCount(options.db_paths[2].path));
1913 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1914 ASSERT_EQ(1, GetSstFileCount(dbname_));
1915
1916 // (1, 4, 5)
1917 GenerateNewFile(&rnd, &key_idx);
1918 ASSERT_EQ("1,4,5", FilesPerLevel(0));
1919 ASSERT_EQ(5, GetSstFileCount(options.db_paths[2].path));
1920 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1921 ASSERT_EQ(1, GetSstFileCount(dbname_));
1922
1923 // (1, 4, 6)
1924 GenerateNewFile(&rnd, &key_idx);
1925 ASSERT_EQ("1,4,6", FilesPerLevel(0));
1926 ASSERT_EQ(6, GetSstFileCount(options.db_paths[2].path));
1927 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1928 ASSERT_EQ(1, GetSstFileCount(dbname_));
1929
1930 // (1, 4, 7)
1931 GenerateNewFile(&rnd, &key_idx);
1932 ASSERT_EQ("1,4,7", FilesPerLevel(0));
1933 ASSERT_EQ(7, GetSstFileCount(options.db_paths[2].path));
1934 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1935 ASSERT_EQ(1, GetSstFileCount(dbname_));
1936
1937 // (1, 4, 8)
1938 GenerateNewFile(&rnd, &key_idx);
1939 ASSERT_EQ("1,4,8", FilesPerLevel(0));
1940 ASSERT_EQ(8, GetSstFileCount(options.db_paths[2].path));
1941 ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path));
1942 ASSERT_EQ(1, GetSstFileCount(dbname_));
1943
1944 for (int i = 0; i < key_idx; i++) {
1945 auto v = Get(Key(i));
1946 ASSERT_NE(v, "NOT_FOUND");
1947 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
1948 }
1949
1950 Reopen(options);
1951
1952 for (int i = 0; i < key_idx; i++) {
1953 auto v = Get(Key(i));
1954 ASSERT_NE(v, "NOT_FOUND");
1955 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
1956 }
1957
1958 Destroy(options);
1959}
1960
1961TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
1962 Options options = CurrentOptions();
1963 options.db_paths.emplace_back(dbname_, 500 * 1024);
1964 options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
1965 options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
1966 options.memtable_factory.reset(
1967 new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
1968 options.compaction_style = kCompactionStyleLevel;
1969 options.write_buffer_size = 110 << 10; // 110KB
1970 options.arena_block_size = 4 << 10;
1971 options.level0_file_num_compaction_trigger = 2;
1972 options.num_levels = 4;
1973 options.max_bytes_for_level_base = 400 * 1024;
1974 options.max_subcompactions = max_subcompactions_;
1975 // options = CurrentOptions(options);
1976
1977 std::vector<std::string> filenames;
1978 env_->GetChildren(options.db_paths[1].path, &filenames);
1979 // Delete archival files.
1980 for (size_t i = 0; i < filenames.size(); ++i) {
1981 env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]);
1982 }
1983 env_->DeleteDir(options.db_paths[1].path);
1984 Reopen(options);
1985
1986 Random rnd(301);
1987 int key_idx = 0;
1988
1989 // Always gets compacted into 1 Level1 file,
1990 // 0/1 Level 0 file
1991 for (int num = 0; num < 3; num++) {
1992 key_idx = 0;
1993 GenerateNewFile(&rnd, &key_idx);
1994 }
1995
1996 key_idx = 0;
1997 GenerateNewFile(&rnd, &key_idx);
1998 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
1999
2000 key_idx = 0;
2001 GenerateNewFile(&rnd, &key_idx);
2002 ASSERT_EQ("1,1", FilesPerLevel(0));
2003 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2004 ASSERT_EQ(1, GetSstFileCount(dbname_));
2005
2006 key_idx = 0;
2007 GenerateNewFile(&rnd, &key_idx);
2008 ASSERT_EQ("0,1", FilesPerLevel(0));
2009 ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2010 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2011 ASSERT_EQ(0, GetSstFileCount(dbname_));
2012
2013 key_idx = 0;
2014 GenerateNewFile(&rnd, &key_idx);
2015 ASSERT_EQ("1,1", FilesPerLevel(0));
2016 ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2017 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2018 ASSERT_EQ(1, GetSstFileCount(dbname_));
2019
2020 key_idx = 0;
2021 GenerateNewFile(&rnd, &key_idx);
2022 ASSERT_EQ("0,1", FilesPerLevel(0));
2023 ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2024 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2025 ASSERT_EQ(0, GetSstFileCount(dbname_));
2026
2027 key_idx = 0;
2028 GenerateNewFile(&rnd, &key_idx);
2029 ASSERT_EQ("1,1", FilesPerLevel(0));
2030 ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2031 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2032 ASSERT_EQ(1, GetSstFileCount(dbname_));
2033
2034 key_idx = 0;
2035 GenerateNewFile(&rnd, &key_idx);
2036 ASSERT_EQ("0,1", FilesPerLevel(0));
2037 ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2038 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2039 ASSERT_EQ(0, GetSstFileCount(dbname_));
2040
2041 key_idx = 0;
2042 GenerateNewFile(&rnd, &key_idx);
2043 ASSERT_EQ("1,1", FilesPerLevel(0));
2044 ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2045 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2046 ASSERT_EQ(1, GetSstFileCount(dbname_));
2047
2048 key_idx = 0;
2049 GenerateNewFile(&rnd, &key_idx);
2050 ASSERT_EQ("0,1", FilesPerLevel(0));
2051 ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2052 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2053 ASSERT_EQ(0, GetSstFileCount(dbname_));
2054
2055 key_idx = 0;
2056 GenerateNewFile(&rnd, &key_idx);
2057 ASSERT_EQ("1,1", FilesPerLevel(0));
2058 ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path));
2059 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2060 ASSERT_EQ(1, GetSstFileCount(dbname_));
2061
2062 for (int i = 0; i < key_idx; i++) {
2063 auto v = Get(Key(i));
2064 ASSERT_NE(v, "NOT_FOUND");
2065 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2066 }
2067
2068 Reopen(options);
2069
2070 for (int i = 0; i < key_idx; i++) {
2071 auto v = Get(Key(i));
2072 ASSERT_NE(v, "NOT_FOUND");
2073 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2074 }
2075
2076 Destroy(options);
2077}
2078
11fdf7f2
TL
2079TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
2080 Options options = CurrentOptions();
2081 options.db_paths.emplace_back(dbname_, 500 * 1024);
2082 options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
2083 options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
2084 options.memtable_factory.reset(
2085 new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2086 options.compaction_style = kCompactionStyleLevel;
2087 options.write_buffer_size = 110 << 10; // 110KB
2088 options.arena_block_size = 4 << 10;
2089 options.level0_file_num_compaction_trigger = 2;
2090 options.num_levels = 4;
2091 options.max_bytes_for_level_base = 400 * 1024;
2092 options.max_subcompactions = max_subcompactions_;
2093
2094 std::vector<Options> option_vector;
2095 option_vector.emplace_back(options);
2096 ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
2097 // Configure CF1 specific paths.
2098 cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024);
2099 cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
2100 cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
2101 option_vector.emplace_back(DBOptions(options), cf_opt1);
2102 CreateColumnFamilies({"one"},option_vector[1]);
2103
2104 // Configura CF2 specific paths.
2105 cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
2106 cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
2107 cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
2108 option_vector.emplace_back(DBOptions(options), cf_opt2);
2109 CreateColumnFamilies({"two"},option_vector[2]);
2110
2111 ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
2112
2113 Random rnd(301);
2114 int key_idx = 0;
2115 int key_idx1 = 0;
2116 int key_idx2 = 0;
2117
2118 auto generate_file = [&]() {
2119 GenerateNewFile(0, &rnd, &key_idx);
2120 GenerateNewFile(1, &rnd, &key_idx1);
2121 GenerateNewFile(2, &rnd, &key_idx2);
2122 };
2123
2124 auto check_sstfilecount = [&](int path_id, int expected) {
2125 ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
2126 ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
2127 ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
2128 };
2129
2130 auto check_filesperlevel = [&](const std::string& expected) {
2131 ASSERT_EQ(expected, FilesPerLevel(0));
2132 ASSERT_EQ(expected, FilesPerLevel(1));
2133 ASSERT_EQ(expected, FilesPerLevel(2));
2134 };
2135
2136 auto check_getvalues = [&]() {
2137 for (int i = 0; i < key_idx; i++) {
2138 auto v = Get(0, Key(i));
2139 ASSERT_NE(v, "NOT_FOUND");
2140 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2141 }
2142
2143 for (int i = 0; i < key_idx1; i++) {
2144 auto v = Get(1, Key(i));
2145 ASSERT_NE(v, "NOT_FOUND");
2146 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2147 }
2148
2149 for (int i = 0; i < key_idx2; i++) {
2150 auto v = Get(2, Key(i));
2151 ASSERT_NE(v, "NOT_FOUND");
2152 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2153 }
2154 };
2155
2156 // Check that default column family uses db_paths.
2157 // And Column family "one" uses cf_paths.
2158
2159 // First three 110KB files are not going to second path.
2160 // After that, (100K, 200K)
2161 for (int num = 0; num < 3; num++) {
2162 generate_file();
2163 }
2164
2165 // Another 110KB triggers a compaction to 400K file to fill up first path
2166 generate_file();
2167 check_sstfilecount(1, 3);
2168
2169 // (1, 4)
2170 generate_file();
2171 check_filesperlevel("1,4");
2172 check_sstfilecount(1, 4);
2173 check_sstfilecount(0, 1);
2174
2175 // (1, 4, 1)
2176 generate_file();
2177 check_filesperlevel("1,4,1");
2178 check_sstfilecount(2, 1);
2179 check_sstfilecount(1, 4);
2180 check_sstfilecount(0, 1);
2181
2182 // (1, 4, 2)
2183 generate_file();
2184 check_filesperlevel("1,4,2");
2185 check_sstfilecount(2, 2);
2186 check_sstfilecount(1, 4);
2187 check_sstfilecount(0, 1);
2188
2189 check_getvalues();
2190
2191 ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
2192
2193 check_getvalues();
2194
2195 Destroy(options, true);
2196}
2197
7c673cae
FG
2198TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
2199 Random rnd(301);
2200 int max_key_level_insert = 200;
2201 int max_key_universal_insert = 600;
2202
2203 // Stage 1: generate a db with level compaction
2204 Options options = CurrentOptions();
2205 options.write_buffer_size = 110 << 10; // 110KB
2206 options.arena_block_size = 4 << 10;
2207 options.num_levels = 4;
2208 options.level0_file_num_compaction_trigger = 3;
2209 options.max_bytes_for_level_base = 500 << 10; // 500KB
2210 options.max_bytes_for_level_multiplier = 1;
2211 options.target_file_size_base = 200 << 10; // 200KB
2212 options.target_file_size_multiplier = 1;
2213 options.max_subcompactions = max_subcompactions_;
2214 CreateAndReopenWithCF({"pikachu"}, options);
2215
2216 for (int i = 0; i <= max_key_level_insert; i++) {
2217 // each value is 10K
2218 ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
2219 }
2220 ASSERT_OK(Flush(1));
2221 dbfull()->TEST_WaitForCompact();
2222
2223 ASSERT_GT(TotalTableFiles(1, 4), 1);
2224 int non_level0_num_files = 0;
2225 for (int i = 1; i < options.num_levels; i++) {
2226 non_level0_num_files += NumTableFilesAtLevel(i, 1);
2227 }
2228 ASSERT_GT(non_level0_num_files, 0);
2229
2230 // Stage 2: reopen with universal compaction - should fail
2231 options = CurrentOptions();
2232 options.compaction_style = kCompactionStyleUniversal;
2233 options.num_levels = 1;
2234 options = CurrentOptions(options);
2235 Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options);
2236 ASSERT_TRUE(s.IsInvalidArgument());
2237
2238 // Stage 3: compact into a single file and move the file to level 0
2239 options = CurrentOptions();
2240 options.disable_auto_compactions = true;
2241 options.target_file_size_base = INT_MAX;
2242 options.target_file_size_multiplier = 1;
2243 options.max_bytes_for_level_base = INT_MAX;
2244 options.max_bytes_for_level_multiplier = 1;
2245 options.num_levels = 4;
2246 options = CurrentOptions(options);
2247 ReopenWithColumnFamilies({"default", "pikachu"}, options);
2248
2249 CompactRangeOptions compact_options;
2250 compact_options.change_level = true;
2251 compact_options.target_level = 0;
2252 compact_options.bottommost_level_compaction =
2253 BottommostLevelCompaction::kForce;
2254 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
2255 dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr);
2256
2257 // Only 1 file in L0
2258 ASSERT_EQ("1", FilesPerLevel(1));
2259
2260 // Stage 4: re-open in universal compaction style and do some db operations
2261 options = CurrentOptions();
2262 options.compaction_style = kCompactionStyleUniversal;
2263 options.num_levels = 4;
2264 options.write_buffer_size = 110 << 10; // 110KB
2265 options.arena_block_size = 4 << 10;
2266 options.level0_file_num_compaction_trigger = 3;
2267 options = CurrentOptions(options);
2268 ReopenWithColumnFamilies({"default", "pikachu"}, options);
2269
2270 options.num_levels = 1;
2271 ReopenWithColumnFamilies({"default", "pikachu"}, options);
2272
2273 for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) {
2274 ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000)));
2275 }
2276 dbfull()->Flush(FlushOptions());
2277 ASSERT_OK(Flush(1));
2278 dbfull()->TEST_WaitForCompact();
2279
2280 for (int i = 1; i < options.num_levels; i++) {
2281 ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0);
2282 }
2283
2284 // verify keys inserted in both level compaction style and universal
2285 // compaction style
2286 std::string keys_in_db;
2287 Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]);
2288 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
2289 keys_in_db.append(iter->key().ToString());
2290 keys_in_db.push_back(',');
2291 }
2292 delete iter;
2293
2294 std::string expected_keys;
2295 for (int i = 0; i <= max_key_universal_insert; i++) {
2296 expected_keys.append(Key(i));
2297 expected_keys.push_back(',');
2298 }
2299
2300 ASSERT_EQ(keys_in_db, expected_keys);
2301}
2302
2303TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_a) {
2304 do {
2305 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
2306 ASSERT_OK(Put(1, "b", "v"));
2307 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2308 ASSERT_OK(Delete(1, "b"));
2309 ASSERT_OK(Delete(1, "a"));
2310 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2311 ASSERT_OK(Delete(1, "a"));
2312 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2313 ASSERT_OK(Put(1, "a", "v"));
2314 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2315 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2316 ASSERT_EQ("(a->v)", Contents(1));
2317 env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
2318 ASSERT_EQ("(a->v)", Contents(1));
2319 } while (ChangeCompactOptions());
2320}
2321
2322TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) {
2323 do {
2324 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
2325 Put(1, "", "");
2326 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2327 Delete(1, "e");
2328 Put(1, "", "");
2329 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2330 Put(1, "c", "cv");
2331 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2332 Put(1, "", "");
2333 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2334 Put(1, "", "");
2335 env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
2336 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2337 Put(1, "d", "dv");
2338 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2339 Put(1, "", "");
2340 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2341 Delete(1, "d");
2342 Delete(1, "b");
2343 ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions());
2344 ASSERT_EQ("(->)(c->cv)", Contents(1));
2345 env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
2346 ASSERT_EQ("(->)(c->cv)", Contents(1));
2347 } while (ChangeCompactOptions());
2348}
2349
11fdf7f2
TL
2350TEST_F(DBCompactionTest, ManualAutoRace) {
2351 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
2352 rocksdb::SyncPoint::GetInstance()->LoadDependency(
2353 {{"DBImpl::BGWorkCompaction", "DBCompactionTest::ManualAutoRace:1"},
2354 {"DBImpl::RunManualCompaction:WaitScheduled",
2355 "BackgroundCallCompaction:0"}});
2356
2357 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2358
2359 Put(1, "foo", "");
2360 Put(1, "bar", "");
2361 Flush(1);
2362 Put(1, "foo", "");
2363 Put(1, "bar", "");
2364 // Generate four files in CF 0, which should trigger an auto compaction
2365 Put("foo", "");
2366 Put("bar", "");
2367 Flush();
2368 Put("foo", "");
2369 Put("bar", "");
2370 Flush();
2371 Put("foo", "");
2372 Put("bar", "");
2373 Flush();
2374 Put("foo", "");
2375 Put("bar", "");
2376 Flush();
2377
2378 // The auto compaction is scheduled but waited until here
2379 TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1");
2380 // The auto compaction will wait until the manual compaction is registerd
2381 // before processing so that it will be cancelled.
2382 dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
2383 ASSERT_EQ("0,1", FilesPerLevel(1));
2384
2385 // Eventually the cancelled compaction will be rescheduled and executed.
2386 dbfull()->TEST_WaitForCompact();
2387 ASSERT_EQ("0,1", FilesPerLevel(0));
2388 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2389}
2390
7c673cae
FG
2391TEST_P(DBCompactionTestWithParam, ManualCompaction) {
2392 Options options = CurrentOptions();
2393 options.max_subcompactions = max_subcompactions_;
2394 options.statistics = rocksdb::CreateDBStatistics();
2395 CreateAndReopenWithCF({"pikachu"}, options);
2396
2397 // iter - 0 with 7 levels
2398 // iter - 1 with 3 levels
2399 for (int iter = 0; iter < 2; ++iter) {
2400 MakeTables(3, "p", "q", 1);
2401 ASSERT_EQ("1,1,1", FilesPerLevel(1));
2402
2403 // Compaction range falls before files
2404 Compact(1, "", "c");
2405 ASSERT_EQ("1,1,1", FilesPerLevel(1));
2406
2407 // Compaction range falls after files
2408 Compact(1, "r", "z");
2409 ASSERT_EQ("1,1,1", FilesPerLevel(1));
2410
2411 // Compaction range overlaps files
2412 Compact(1, "p1", "p9");
2413 ASSERT_EQ("0,0,1", FilesPerLevel(1));
2414
2415 // Populate a different range
2416 MakeTables(3, "c", "e", 1);
2417 ASSERT_EQ("1,1,2", FilesPerLevel(1));
2418
2419 // Compact just the new range
2420 Compact(1, "b", "f");
2421 ASSERT_EQ("0,0,2", FilesPerLevel(1));
2422
2423 // Compact all
2424 MakeTables(1, "a", "z", 1);
2425 ASSERT_EQ("1,0,2", FilesPerLevel(1));
2426
2427 uint64_t prev_block_cache_add =
2428 options.statistics->getTickerCount(BLOCK_CACHE_ADD);
2429 CompactRangeOptions cro;
2430 cro.exclusive_manual_compaction = exclusive_manual_compaction_;
2431 db_->CompactRange(cro, handles_[1], nullptr, nullptr);
2432 // Verify manual compaction doesn't fill block cache
2433 ASSERT_EQ(prev_block_cache_add,
2434 options.statistics->getTickerCount(BLOCK_CACHE_ADD));
2435
2436 ASSERT_EQ("0,0,1", FilesPerLevel(1));
2437
2438 if (iter == 0) {
2439 options = CurrentOptions();
7c673cae
FG
2440 options.num_levels = 3;
2441 options.create_if_missing = true;
2442 options.statistics = rocksdb::CreateDBStatistics();
2443 DestroyAndReopen(options);
2444 CreateAndReopenWithCF({"pikachu"}, options);
2445 }
2446 }
2447}
2448
2449
2450TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) {
2451 Options options = CurrentOptions();
2452 options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
2453 options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
2454 options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
2455 options.max_subcompactions = max_subcompactions_;
2456 CreateAndReopenWithCF({"pikachu"}, options);
2457
2458 // iter - 0 with 7 levels
2459 // iter - 1 with 3 levels
2460 for (int iter = 0; iter < 2; ++iter) {
2461 for (int i = 0; i < 3; ++i) {
2462 ASSERT_OK(Put(1, "p", "begin"));
2463 ASSERT_OK(Put(1, "q", "end"));
2464 ASSERT_OK(Flush(1));
2465 }
2466 ASSERT_EQ("3", FilesPerLevel(1));
2467 ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path));
2468 ASSERT_EQ(0, GetSstFileCount(dbname_));
2469
2470 // Compaction range falls before files
2471 Compact(1, "", "c");
2472 ASSERT_EQ("3", FilesPerLevel(1));
2473
2474 // Compaction range falls after files
2475 Compact(1, "r", "z");
2476 ASSERT_EQ("3", FilesPerLevel(1));
2477
2478 // Compaction range overlaps files
2479 Compact(1, "p1", "p9", 1);
11fdf7f2 2480 ASSERT_OK(dbfull()->TEST_WaitForCompact());
7c673cae
FG
2481 ASSERT_EQ("0,1", FilesPerLevel(1));
2482 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2483 ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2484 ASSERT_EQ(0, GetSstFileCount(dbname_));
2485
2486 // Populate a different range
2487 for (int i = 0; i < 3; ++i) {
2488 ASSERT_OK(Put(1, "c", "begin"));
2489 ASSERT_OK(Put(1, "e", "end"));
2490 ASSERT_OK(Flush(1));
2491 }
2492 ASSERT_EQ("3,1", FilesPerLevel(1));
2493
2494 // Compact just the new range
2495 Compact(1, "b", "f", 1);
11fdf7f2 2496 ASSERT_OK(dbfull()->TEST_WaitForCompact());
7c673cae
FG
2497 ASSERT_EQ("0,2", FilesPerLevel(1));
2498 ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
2499 ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2500 ASSERT_EQ(0, GetSstFileCount(dbname_));
2501
2502 // Compact all
2503 ASSERT_OK(Put(1, "a", "begin"));
2504 ASSERT_OK(Put(1, "z", "end"));
2505 ASSERT_OK(Flush(1));
2506 ASSERT_EQ("1,2", FilesPerLevel(1));
2507 ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path));
2508 ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path));
2509 CompactRangeOptions compact_options;
2510 compact_options.target_path_id = 1;
2511 compact_options.exclusive_manual_compaction = exclusive_manual_compaction_;
2512 db_->CompactRange(compact_options, handles_[1], nullptr, nullptr);
11fdf7f2 2513 ASSERT_OK(dbfull()->TEST_WaitForCompact());
7c673cae
FG
2514
2515 ASSERT_EQ("0,1", FilesPerLevel(1));
2516 ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path));
2517 ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path));
2518 ASSERT_EQ(0, GetSstFileCount(dbname_));
2519
2520 if (iter == 0) {
2521 DestroyAndReopen(options);
2522 options = CurrentOptions();
2523 options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760);
2524 options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760);
2525 options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760);
2526 options.max_background_flushes = 1;
2527 options.num_levels = 3;
2528 options.create_if_missing = true;
2529 CreateAndReopenWithCF({"pikachu"}, options);
2530 }
2531 }
2532}
2533
2534TEST_F(DBCompactionTest, FilesDeletedAfterCompaction) {
2535 do {
2536 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
2537 ASSERT_OK(Put(1, "foo", "v2"));
2538 Compact(1, "a", "z");
2539 const size_t num_files = CountLiveFiles();
2540 for (int i = 0; i < 10; i++) {
2541 ASSERT_OK(Put(1, "foo", "v2"));
2542 Compact(1, "a", "z");
2543 }
2544 ASSERT_EQ(CountLiveFiles(), num_files);
2545 } while (ChangeCompactOptions());
2546}
2547
2548// Check level comapction with compact files
2549TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) {
2550 const int kTestKeySize = 16;
2551 const int kTestValueSize = 984;
2552 const int kEntrySize = kTestKeySize + kTestValueSize;
2553 const int kEntriesPerBuffer = 100;
2554 Options options;
2555 options.create_if_missing = true;
2556 options.write_buffer_size = kEntrySize * kEntriesPerBuffer;
2557 options.compaction_style = kCompactionStyleLevel;
2558 options.target_file_size_base = options.write_buffer_size;
2559 options.max_bytes_for_level_base = options.target_file_size_base * 2;
2560 options.level0_stop_writes_trigger = 2;
2561 options.max_bytes_for_level_multiplier = 2;
2562 options.compression = kNoCompression;
2563 options.max_subcompactions = max_subcompactions_;
2564 options = CurrentOptions(options);
2565 CreateAndReopenWithCF({"pikachu"}, options);
2566
2567 Random rnd(301);
2568 for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
2569 ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize)));
2570 }
2571 dbfull()->TEST_WaitForFlushMemTable(handles_[1]);
2572 dbfull()->TEST_WaitForCompact();
2573
2574 ColumnFamilyMetaData cf_meta;
2575 dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
2576 int output_level = static_cast<int>(cf_meta.levels.size()) - 1;
2577 for (int file_picked = 5; file_picked > 0; --file_picked) {
2578 std::set<std::string> overlapping_file_names;
2579 std::vector<std::string> compaction_input_file_names;
2580 for (int f = 0; f < file_picked; ++f) {
2581 int level = 0;
2582 auto file_meta = PickFileRandomly(cf_meta, &rnd, &level);
2583 compaction_input_file_names.push_back(file_meta->name);
2584 GetOverlappingFileNumbersForLevelCompaction(
2585 cf_meta, options.comparator, level, output_level,
2586 file_meta, &overlapping_file_names);
2587 }
2588
2589 ASSERT_OK(dbfull()->CompactFiles(
2590 CompactionOptions(), handles_[1],
2591 compaction_input_file_names,
2592 output_level));
2593
2594 // Make sure all overlapping files do not exist after compaction
2595 dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta);
2596 VerifyCompactionResult(cf_meta, overlapping_file_names);
2597 }
2598
2599 // make sure all key-values are still there.
2600 for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) {
2601 ASSERT_NE(Get(1, ToString(key)), "NOT_FOUND");
2602 }
2603}
2604
2605TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) {
2606 Options options;
2607 const int kKeySize = 16;
2608 const int kKvSize = 1000;
2609 const int kKeysPerBuffer = 100;
2610 const int kNumL1Files = 5;
2611 options.create_if_missing = true;
2612 options.write_buffer_size = kKeysPerBuffer * kKvSize;
2613 options.max_write_buffer_number = 2;
2614 options.target_file_size_base =
2615 options.write_buffer_size *
2616 (options.max_write_buffer_number - 1);
2617 options.level0_file_num_compaction_trigger = kNumL1Files;
2618 options.max_bytes_for_level_base =
2619 options.level0_file_num_compaction_trigger *
2620 options.target_file_size_base;
2621 options.max_bytes_for_level_multiplier = 2;
2622 options.compression = kNoCompression;
2623 options.max_subcompactions = max_subcompactions_;
2624
2625 env_->SetBackgroundThreads(1, Env::HIGH);
2626 env_->SetBackgroundThreads(1, Env::LOW);
2627 // stop the compaction thread until we simulate the file creation failure.
2628 test::SleepingBackgroundTask sleeping_task_low;
2629 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
2630 Env::Priority::LOW);
2631
2632 options.env = env_;
2633
2634 DestroyAndReopen(options);
2635
2636 const int kNumInsertedKeys =
2637 options.level0_file_num_compaction_trigger *
2638 (options.max_write_buffer_number - 1) *
2639 kKeysPerBuffer;
2640
2641 Random rnd(301);
2642 std::vector<std::string> keys;
2643 std::vector<std::string> values;
2644 for (int k = 0; k < kNumInsertedKeys; ++k) {
2645 keys.emplace_back(RandomString(&rnd, kKeySize));
2646 values.emplace_back(RandomString(&rnd, kKvSize - kKeySize));
2647 ASSERT_OK(Put(Slice(keys[k]), Slice(values[k])));
2648 dbfull()->TEST_WaitForFlushMemTable();
2649 }
2650
2651 dbfull()->TEST_FlushMemTable(true);
2652 // Make sure the number of L0 files can trigger compaction.
2653 ASSERT_GE(NumTableFilesAtLevel(0),
2654 options.level0_file_num_compaction_trigger);
2655
2656 auto previous_num_level0_files = NumTableFilesAtLevel(0);
2657
2658 // Fail the first file creation.
2659 env_->non_writable_count_ = 1;
2660 sleeping_task_low.WakeUp();
2661 sleeping_task_low.WaitUntilDone();
2662
2663 // Expect compaction to fail here as one file will fail its
2664 // creation.
2665 ASSERT_TRUE(!dbfull()->TEST_WaitForCompact().ok());
2666
2667 // Verify L0 -> L1 compaction does fail.
2668 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
2669
2670 // Verify all L0 files are still there.
2671 ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files);
2672
2673 // All key-values must exist after compaction fails.
2674 for (int k = 0; k < kNumInsertedKeys; ++k) {
2675 ASSERT_EQ(values[k], Get(keys[k]));
2676 }
2677
2678 env_->non_writable_count_ = 0;
2679
2680 // Make sure RocksDB will not get into corrupted state.
2681 Reopen(options);
2682
2683 // Verify again after reopen.
2684 for (int k = 0; k < kNumInsertedKeys; ++k) {
2685 ASSERT_EQ(values[k], Get(keys[k]));
2686 }
2687}
2688
2689TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) {
2690 // iter 1 -- delete_obsolete_files_period_micros == 0
2691 for (int iter = 0; iter < 2; ++iter) {
2692 // This test triggers move compaction and verifies that the file is not
2693 // deleted when it's part of move compaction
2694 Options options = CurrentOptions();
2695 options.env = env_;
2696 if (iter == 1) {
2697 options.delete_obsolete_files_period_micros = 0;
2698 }
2699 options.create_if_missing = true;
2700 options.level0_file_num_compaction_trigger =
2701 2; // trigger compaction when we have 2 files
2702 OnFileDeletionListener* listener = new OnFileDeletionListener();
2703 options.listeners.emplace_back(listener);
2704 options.max_subcompactions = max_subcompactions_;
2705 DestroyAndReopen(options);
2706
2707 Random rnd(301);
2708 // Create two 1MB sst files
2709 for (int i = 0; i < 2; ++i) {
2710 // Create 1MB sst file
2711 for (int j = 0; j < 100; ++j) {
2712 ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024)));
2713 }
2714 ASSERT_OK(Flush());
2715 }
2716 // this should execute L0->L1
2717 dbfull()->TEST_WaitForCompact();
2718 ASSERT_EQ("0,1", FilesPerLevel(0));
2719
2720 // block compactions
2721 test::SleepingBackgroundTask sleeping_task;
2722 env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
2723 Env::Priority::LOW);
2724
2725 options.max_bytes_for_level_base = 1024 * 1024; // 1 MB
2726 Reopen(options);
2727 std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions()));
2728 ASSERT_EQ("0,1", FilesPerLevel(0));
2729 // let compactions go
2730 sleeping_task.WakeUp();
2731 sleeping_task.WaitUntilDone();
2732
2733 // this should execute L1->L2 (move)
2734 dbfull()->TEST_WaitForCompact();
2735
2736 ASSERT_EQ("0,0,1", FilesPerLevel(0));
2737
2738 std::vector<LiveFileMetaData> metadata;
2739 db_->GetLiveFilesMetaData(&metadata);
2740 ASSERT_EQ(metadata.size(), 1U);
2741 auto moved_file_name = metadata[0].name;
2742
2743 // Create two more 1MB sst files
2744 for (int i = 0; i < 2; ++i) {
2745 // Create 1MB sst file
2746 for (int j = 0; j < 100; ++j) {
2747 ASSERT_OK(Put(Key(i * 50 + j + 100), RandomString(&rnd, 10 * 1024)));
2748 }
2749 ASSERT_OK(Flush());
2750 }
2751 // this should execute both L0->L1 and L1->L2 (merge with previous file)
2752 dbfull()->TEST_WaitForCompact();
2753
2754 ASSERT_EQ("0,0,2", FilesPerLevel(0));
2755
2756 // iterator is holding the file
2757 ASSERT_OK(env_->FileExists(dbname_ + moved_file_name));
2758
2759 listener->SetExpectedFileName(dbname_ + moved_file_name);
2760 iterator.reset();
2761
2762 // this file should have been compacted away
2763 ASSERT_NOK(env_->FileExists(dbname_ + moved_file_name));
2764 listener->VerifyMatchedCount(1);
2765 }
2766}
2767
2768TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) {
2769 if (!Zlib_Supported()) {
2770 return;
2771 }
2772 Options options = CurrentOptions();
2773 options.memtable_factory.reset(
2774 new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
2775 options.compaction_style = kCompactionStyleLevel;
2776 options.write_buffer_size = 110 << 10; // 110KB
2777 options.arena_block_size = 4 << 10;
2778 options.level0_file_num_compaction_trigger = 2;
2779 options.num_levels = 4;
2780 options.max_bytes_for_level_base = 400 * 1024;
2781 options.max_subcompactions = max_subcompactions_;
2782 // First two levels have no compression, so that a trivial move between
2783 // them will be allowed. Level 2 has Zlib compression so that a trivial
2784 // move to level 3 will not be allowed
2785 options.compression_per_level = {kNoCompression, kNoCompression,
2786 kZlibCompression};
2787 int matches = 0, didnt_match = 0, trivial_move = 0, non_trivial = 0;
2788
2789 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2790 "Compaction::InputCompressionMatchesOutput:Matches",
11fdf7f2 2791 [&](void* /*arg*/) { matches++; });
7c673cae
FG
2792 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2793 "Compaction::InputCompressionMatchesOutput:DidntMatch",
11fdf7f2 2794 [&](void* /*arg*/) { didnt_match++; });
7c673cae
FG
2795 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2796 "DBImpl::BackgroundCompaction:NonTrivial",
11fdf7f2 2797 [&](void* /*arg*/) { non_trivial++; });
7c673cae
FG
2798 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2799 "DBImpl::BackgroundCompaction:TrivialMove",
11fdf7f2 2800 [&](void* /*arg*/) { trivial_move++; });
7c673cae
FG
2801 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2802
2803 Reopen(options);
2804
2805 Random rnd(301);
2806 int key_idx = 0;
2807
2808 // First three 110KB files are going to level 0
2809 // After that, (100K, 200K)
2810 for (int num = 0; num < 3; num++) {
2811 GenerateNewFile(&rnd, &key_idx);
2812 }
2813
2814 // Another 110KB triggers a compaction to 400K file to fill up level 0
2815 GenerateNewFile(&rnd, &key_idx);
2816 ASSERT_EQ(4, GetSstFileCount(dbname_));
2817
2818 // (1, 4)
2819 GenerateNewFile(&rnd, &key_idx);
2820 ASSERT_EQ("1,4", FilesPerLevel(0));
2821
2822 // (1, 4, 1)
2823 GenerateNewFile(&rnd, &key_idx);
2824 ASSERT_EQ("1,4,1", FilesPerLevel(0));
2825
2826 // (1, 4, 2)
2827 GenerateNewFile(&rnd, &key_idx);
2828 ASSERT_EQ("1,4,2", FilesPerLevel(0));
2829
2830 // (1, 4, 3)
2831 GenerateNewFile(&rnd, &key_idx);
2832 ASSERT_EQ("1,4,3", FilesPerLevel(0));
2833
2834 // (1, 4, 4)
2835 GenerateNewFile(&rnd, &key_idx);
2836 ASSERT_EQ("1,4,4", FilesPerLevel(0));
2837
2838 // (1, 4, 5)
2839 GenerateNewFile(&rnd, &key_idx);
2840 ASSERT_EQ("1,4,5", FilesPerLevel(0));
2841
2842 // (1, 4, 6)
2843 GenerateNewFile(&rnd, &key_idx);
2844 ASSERT_EQ("1,4,6", FilesPerLevel(0));
2845
2846 // (1, 4, 7)
2847 GenerateNewFile(&rnd, &key_idx);
2848 ASSERT_EQ("1,4,7", FilesPerLevel(0));
2849
2850 // (1, 4, 8)
2851 GenerateNewFile(&rnd, &key_idx);
2852 ASSERT_EQ("1,4,8", FilesPerLevel(0));
2853
2854 ASSERT_EQ(matches, 12);
2855 // Currently, the test relies on the number of calls to
2856 // InputCompressionMatchesOutput() per compaction.
2857 const int kCallsToInputCompressionMatch = 2;
2858 ASSERT_EQ(didnt_match, 8 * kCallsToInputCompressionMatch);
2859 ASSERT_EQ(trivial_move, 12);
2860 ASSERT_EQ(non_trivial, 8);
2861
2862 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
2863
2864 for (int i = 0; i < key_idx; i++) {
2865 auto v = Get(Key(i));
2866 ASSERT_NE(v, "NOT_FOUND");
2867 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2868 }
2869
2870 Reopen(options);
2871
2872 for (int i = 0; i < key_idx; i++) {
2873 auto v = Get(Key(i));
2874 ASSERT_NE(v, "NOT_FOUND");
2875 ASSERT_TRUE(v.size() == 1 || v.size() == 990);
2876 }
2877
2878 Destroy(options);
2879}
2880
2881TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) {
2882 Options options = CurrentOptions();
2883 options.max_background_compactions = 5;
2884 options.soft_pending_compaction_bytes_limit = 0;
2885 options.hard_pending_compaction_bytes_limit = 100;
2886 options.create_if_missing = true;
2887 DestroyAndReopen(options);
7c673cae
FG
2888 ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit);
2889
7c673cae
FG
2890 options.max_background_compactions = 3;
2891 options.soft_pending_compaction_bytes_limit = 200;
2892 options.hard_pending_compaction_bytes_limit = 150;
2893 DestroyAndReopen(options);
7c673cae
FG
2894 ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit);
2895}
2896
2897// This tests for a bug that could cause two level0 compactions running
2898// concurrently
2899// TODO(aekmekji): Make sure that the reason this fails when run with
2900// max_subcompactions > 1 is not a correctness issue but just inherent to
2901// running parallel L0-L1 compactions
2902TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) {
2903 Options options = CurrentOptions();
2904 options.compaction_style = kCompactionStyleLevel;
2905 options.write_buffer_size = 110 << 10;
2906 options.arena_block_size = 4 << 10;
2907 options.level0_file_num_compaction_trigger = 4;
2908 options.num_levels = 4;
2909 options.compression = kNoCompression;
2910 options.max_bytes_for_level_base = 450 << 10;
2911 options.target_file_size_base = 98 << 10;
2912 options.max_write_buffer_number = 2;
2913 options.max_background_compactions = 2;
2914
2915 DestroyAndReopen(options);
2916
2917 // fill up the DB
2918 Random rnd(301);
2919 for (int num = 0; num < 10; num++) {
2920 GenerateNewRandomFile(&rnd);
2921 }
2922 db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
2923
2924 rocksdb::SyncPoint::GetInstance()->LoadDependency(
2925 {{"CompactionJob::Run():Start",
2926 "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1"},
2927 {"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2",
2928 "CompactionJob::Run():End"}});
2929
2930 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2931
2932 // trigger L0 compaction
2933 for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
2934 num++) {
2935 GenerateNewRandomFile(&rnd, /* nowait */ true);
2936 ASSERT_OK(Flush());
2937 }
2938
2939 TEST_SYNC_POINT(
2940 "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1");
2941
2942 GenerateNewRandomFile(&rnd, /* nowait */ true);
2943 dbfull()->TEST_WaitForFlushMemTable();
2944 ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr));
2945 for (int num = 0; num < options.level0_file_num_compaction_trigger + 1;
2946 num++) {
2947 GenerateNewRandomFile(&rnd, /* nowait */ true);
2948 ASSERT_OK(Flush());
2949 }
2950
2951 TEST_SYNC_POINT(
2952 "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2");
2953 dbfull()->TEST_WaitForCompact();
2954}
2955
2956
2957TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) {
2958 int32_t trivial_move = 0;
2959 int32_t non_trivial_move = 0;
2960 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2961 "DBImpl::BackgroundCompaction:TrivialMove",
11fdf7f2 2962 [&](void* /*arg*/) { trivial_move++; });
7c673cae
FG
2963 rocksdb::SyncPoint::GetInstance()->SetCallBack(
2964 "DBImpl::BackgroundCompaction:NonTrivial",
11fdf7f2 2965 [&](void* /*arg*/) { non_trivial_move++; });
7c673cae
FG
2966 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
2967
2968 Options options = CurrentOptions();
11fdf7f2 2969 options.target_file_size_base = 100000000;
7c673cae
FG
2970 options.write_buffer_size = 100000000;
2971 options.max_subcompactions = max_subcompactions_;
2972 DestroyAndReopen(options);
2973
2974 int32_t value_size = 10 * 1024; // 10 KB
2975
2976 Random rnd(301);
2977 std::vector<std::string> values;
2978 // File with keys [ 0 => 99 ]
2979 for (int i = 0; i < 100; i++) {
2980 values.push_back(RandomString(&rnd, value_size));
2981 ASSERT_OK(Put(Key(i), values[i]));
2982 }
2983 ASSERT_OK(Flush());
2984
2985 ASSERT_EQ("1", FilesPerLevel(0));
2986 // Compaction will do L0=>L1 (trivial move) then move L1 files to L3
2987 CompactRangeOptions compact_options;
2988 compact_options.change_level = true;
2989 compact_options.target_level = 3;
2990 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
2991 ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
2992 ASSERT_EQ(trivial_move, 1);
2993 ASSERT_EQ(non_trivial_move, 0);
2994
2995 // File with keys [ 100 => 199 ]
2996 for (int i = 100; i < 200; i++) {
2997 values.push_back(RandomString(&rnd, value_size));
2998 ASSERT_OK(Put(Key(i), values[i]));
2999 }
3000 ASSERT_OK(Flush());
3001
3002 ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
3003 // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
3004 // then compacte the bottommost level L3=>L3 (non trivial move)
3005 compact_options = CompactRangeOptions();
3006 compact_options.bottommost_level_compaction =
3007 BottommostLevelCompaction::kForce;
3008 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
3009 ASSERT_EQ("0,0,0,1", FilesPerLevel(0));
3010 ASSERT_EQ(trivial_move, 4);
3011 ASSERT_EQ(non_trivial_move, 1);
3012
3013 // File with keys [ 200 => 299 ]
3014 for (int i = 200; i < 300; i++) {
3015 values.push_back(RandomString(&rnd, value_size));
3016 ASSERT_OK(Put(Key(i), values[i]));
3017 }
3018 ASSERT_OK(Flush());
3019
3020 ASSERT_EQ("1,0,0,1", FilesPerLevel(0));
3021 trivial_move = 0;
3022 non_trivial_move = 0;
3023 compact_options = CompactRangeOptions();
3024 compact_options.bottommost_level_compaction =
3025 BottommostLevelCompaction::kSkip;
3026 // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves)
3027 // and will skip bottommost level compaction
3028 ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
3029 ASSERT_EQ("0,0,0,2", FilesPerLevel(0));
3030 ASSERT_EQ(trivial_move, 3);
3031 ASSERT_EQ(non_trivial_move, 0);
3032
3033 for (int i = 0; i < 300; i++) {
3034 ASSERT_EQ(Get(Key(i)), values[i]);
3035 }
3036
3037 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3038}
3039
3040TEST_P(DBCompactionTestWithParam, IntraL0Compaction) {
3041 Options options = CurrentOptions();
3042 options.compression = kNoCompression;
3043 options.level0_file_num_compaction_trigger = 5;
3044 options.max_background_compactions = 2;
3045 options.max_subcompactions = max_subcompactions_;
3046 DestroyAndReopen(options);
3047
3048 const size_t kValueSize = 1 << 20;
3049 Random rnd(301);
3050 std::string value(RandomString(&rnd, kValueSize));
3051
3052 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3053 {{"LevelCompactionPicker::PickCompactionBySize:0",
3054 "CompactionJob::Run():Start"}});
3055 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3056
3057 // index: 0 1 2 3 4 5 6 7 8 9
3058 // size: 1MB 1MB 1MB 1MB 1MB 2MB 1MB 1MB 1MB 1MB
3059 // score: 1.5 1.3 1.5 2.0 inf
3060 //
3061 // Files 0-4 will be included in an L0->L1 compaction.
3062 //
3063 // L0->L0 will be triggered since the sync points guarantee compaction to base
3064 // level is still blocked when files 5-9 trigger another compaction.
3065 //
3066 // Files 6-9 are the longest span of available files for which
3067 // work-per-deleted-file decreases (see "score" row above).
3068 for (int i = 0; i < 10; ++i) {
11fdf7f2
TL
3069 ASSERT_OK(Put(Key(0), "")); // prevents trivial move
3070 if (i == 5) {
3071 ASSERT_OK(Put(Key(i + 1), value + value));
3072 } else {
3073 ASSERT_OK(Put(Key(i + 1), value));
7c673cae
FG
3074 }
3075 ASSERT_OK(Flush());
3076 }
3077 dbfull()->TEST_WaitForCompact();
3078 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3079
3080 std::vector<std::vector<FileMetaData>> level_to_files;
3081 dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
3082 &level_to_files);
3083 ASSERT_GE(level_to_files.size(), 2); // at least L0 and L1
3084 // L0 has the 2MB file (not compacted) and 4MB file (output of L0->L0)
3085 ASSERT_EQ(2, level_to_files[0].size());
3086 ASSERT_GT(level_to_files[1].size(), 0);
3087 for (int i = 0; i < 2; ++i) {
11fdf7f2 3088 ASSERT_GE(level_to_files[0][i].fd.file_size, 1 << 21);
7c673cae
FG
3089 }
3090}
3091
11fdf7f2
TL
3092TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) {
3093 // regression test for issue #2722: L0->L0 compaction can resurrect deleted
3094 // keys from older L0 files if L1+ files' key-ranges do not include the key.
7c673cae 3095 Options options = CurrentOptions();
11fdf7f2
TL
3096 options.compression = kNoCompression;
3097 options.level0_file_num_compaction_trigger = 5;
3098 options.max_background_compactions = 2;
3099 options.max_subcompactions = max_subcompactions_;
3100 DestroyAndReopen(options);
3101
3102 const size_t kValueSize = 1 << 20;
3103 Random rnd(301);
3104 std::string value(RandomString(&rnd, kValueSize));
3105
3106 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3107 {{"LevelCompactionPicker::PickCompactionBySize:0",
3108 "CompactionJob::Run():Start"}});
3109 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3110
3111 // index: 0 1 2 3 4 5 6 7 8 9
3112 // size: 1MB 1MB 1MB 1MB 1MB 1MB 1MB 1MB 1MB 1MB
3113 // score: 1.25 1.33 1.5 2.0 inf
3114 //
3115 // Files 0-4 will be included in an L0->L1 compaction.
3116 //
3117 // L0->L0 will be triggered since the sync points guarantee compaction to base
3118 // level is still blocked when files 5-9 trigger another compaction. All files
3119 // 5-9 are included in the L0->L0 due to work-per-deleted file decreasing.
3120 //
3121 // Put a key-value in files 0-4. Delete that key in files 5-9. Verify the
3122 // L0->L0 preserves the deletion such that the key remains deleted.
3123 for (int i = 0; i < 10; ++i) {
3124 // key 0 serves both to prevent trivial move and as the key we want to
3125 // verify is not resurrected by L0->L0 compaction.
3126 if (i < 5) {
3127 ASSERT_OK(Put(Key(0), ""));
3128 } else {
3129 ASSERT_OK(Delete(Key(0)));
3130 }
3131 ASSERT_OK(Put(Key(i + 1), value));
3132 ASSERT_OK(Flush());
3133 }
3134 dbfull()->TEST_WaitForCompact();
3135 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3136
3137 std::vector<std::vector<FileMetaData>> level_to_files;
3138 dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(),
3139 &level_to_files);
3140 ASSERT_GE(level_to_files.size(), 2); // at least L0 and L1
3141 // L0 has a single output file from L0->L0
3142 ASSERT_EQ(1, level_to_files[0].size());
3143 ASSERT_GT(level_to_files[1].size(), 0);
3144 ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 22);
3145
3146 ReadOptions roptions;
3147 std::string result;
3148 ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound());
3149}
3150
3151TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) {
3152 const int kNumFilesTrigger = 3;
3153 Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM);
3154 for (bool use_universal_compaction : {false, true}) {
3155 Options options = CurrentOptions();
3156 if (use_universal_compaction) {
3157 options.compaction_style = kCompactionStyleUniversal;
3158 } else {
3159 options.compaction_style = kCompactionStyleLevel;
3160 options.level_compaction_dynamic_level_bytes = true;
3161 }
3162 options.num_levels = 4;
3163 options.write_buffer_size = 100 << 10; // 100KB
3164 options.target_file_size_base = 32 << 10; // 32KB
3165 options.level0_file_num_compaction_trigger = kNumFilesTrigger;
3166 // Trigger compaction if size amplification exceeds 110%
3167 options.compaction_options_universal.max_size_amplification_percent = 110;
3168 DestroyAndReopen(options);
3169
3170 int num_bottom_pri_compactions = 0;
3171 SyncPoint::GetInstance()->SetCallBack(
3172 "DBImpl::BGWorkBottomCompaction",
3173 [&](void* /*arg*/) { ++num_bottom_pri_compactions; });
3174 SyncPoint::GetInstance()->EnableProcessing();
3175
3176 Random rnd(301);
3177 for (int num = 0; num < kNumFilesTrigger; num++) {
3178 ASSERT_EQ(NumSortedRuns(), num);
3179 int key_idx = 0;
3180 GenerateNewFile(&rnd, &key_idx);
3181 }
3182 dbfull()->TEST_WaitForCompact();
3183
3184 ASSERT_EQ(1, num_bottom_pri_compactions);
3185
3186 // Verify that size amplification did occur
3187 ASSERT_EQ(NumSortedRuns(), 1);
3188 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3189 }
3190 Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM);
3191}
3192
3193TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) {
3194 // Deletions can be dropped when compacted to non-last level if they fall
3195 // outside the lower-level files' key-ranges.
3196 const int kNumL0Files = 4;
3197 Options options = CurrentOptions();
3198 options.level0_file_num_compaction_trigger = kNumL0Files;
3199 options.statistics = rocksdb::CreateDBStatistics();
3200 DestroyAndReopen(options);
3201
3202 // put key 1 and 3 in separate L1, L2 files.
3203 // So key 0, 2, and 4+ fall outside these levels' key-ranges.
3204 for (int level = 2; level >= 1; --level) {
3205 for (int i = 0; i < 2; ++i) {
3206 Put(Key(2 * i + 1), "val");
3207 Flush();
3208 }
3209 MoveFilesToLevel(level);
3210 ASSERT_EQ(2, NumTableFilesAtLevel(level));
3211 }
3212
3213 // Delete keys in range [1, 4]. These L0 files will be compacted with L1:
3214 // - Tombstones for keys 2 and 4 can be dropped early.
3215 // - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges.
3216 for (int i = 0; i < kNumL0Files; ++i) {
3217 Put(Key(0), "val"); // sentinel to prevent trivial move
3218 Delete(Key(i + 1));
3219 Flush();
3220 }
3221 dbfull()->TEST_WaitForCompact();
3222
3223 for (int i = 0; i < kNumL0Files; ++i) {
3224 std::string value;
3225 ASSERT_TRUE(db_->Get(ReadOptions(), Key(i + 1), &value).IsNotFound());
3226 }
3227 ASSERT_EQ(2, options.statistics->getTickerCount(
3228 COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE));
3229 ASSERT_EQ(2,
3230 options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE));
3231}
3232
3233TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) {
3234 // https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/
3235 // CompactFiles() had a bug where it failed to pick a compaction when an L0
3236 // compaction existed, but marked it as scheduled anyways. It'd never be
3237 // unmarked as scheduled, so future compactions or DB close could hang.
3238 const int kNumL0Files = 5;
3239 Options options = CurrentOptions();
3240 options.level0_file_num_compaction_trigger = kNumL0Files - 1;
3241 options.max_background_compactions = 2;
3242 DestroyAndReopen(options);
3243
3244 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3245 {{"LevelCompactionPicker::PickCompaction:Return",
3246 "DBCompactionTest::CompactFilesPendingL0Bug:Picked"},
3247 {"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted",
3248 "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}});
3249 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3250
3251 auto schedule_multi_compaction_token =
3252 dbfull()->TEST_write_controler().GetCompactionPressureToken();
3253
3254 // Files 0-3 will be included in an L0->L1 compaction.
3255 //
3256 // File 4 will be included in a call to CompactFiles() while the first
3257 // compaction is running.
3258 for (int i = 0; i < kNumL0Files - 1; ++i) {
3259 ASSERT_OK(Put(Key(0), "val")); // sentinel to prevent trivial move
3260 ASSERT_OK(Put(Key(i + 1), "val"));
3261 ASSERT_OK(Flush());
3262 }
3263 TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked");
3264 // file 4 flushed after 0-3 picked
3265 ASSERT_OK(Put(Key(kNumL0Files), "val"));
3266 ASSERT_OK(Flush());
3267
3268 // previously DB close would hang forever as this situation caused scheduled
3269 // compactions count to never decrement to zero.
3270 ColumnFamilyMetaData cf_meta;
3271 dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
3272 ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size());
3273 std::vector<std::string> input_filenames;
3274 input_filenames.push_back(cf_meta.levels[0].files.front().name);
3275 ASSERT_OK(dbfull()
3276 ->CompactFiles(CompactionOptions(), input_filenames,
3277 0 /* output_level */));
3278 TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted");
3279 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3280}
3281
3282TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) {
3283 // Regression test for bug of not pulling in L0 files that overlap the user-
3284 // specified input files in time- and key-ranges.
3285 Put(Key(0), "old_val");
3286 Flush();
3287 Put(Key(0), "new_val");
3288 Flush();
3289
3290 ColumnFamilyMetaData cf_meta;
3291 dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta);
3292 ASSERT_GE(cf_meta.levels.size(), 2);
3293 ASSERT_EQ(2, cf_meta.levels[0].files.size());
3294
3295 // Compacting {new L0 file, L1 file} should pull in the old L0 file since it
3296 // overlaps in key-range and time-range.
3297 std::vector<std::string> input_filenames;
3298 input_filenames.push_back(cf_meta.levels[0].files.front().name);
3299 ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames,
3300 1 /* output_level */));
3301 ASSERT_EQ("new_val", Get(Key(0)));
3302}
3303
3304TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) {
3305 // bottom-level files may contain deletions due to snapshots protecting the
3306 // deleted keys. Once the snapshot is released, we should see files with many
3307 // such deletions undergo single-file compactions.
3308 const int kNumKeysPerFile = 1024;
3309 const int kNumLevelFiles = 4;
3310 const int kValueSize = 128;
3311 Options options = CurrentOptions();
3312 options.compression = kNoCompression;
3313 options.level0_file_num_compaction_trigger = kNumLevelFiles;
3314 // inflate it a bit to account for key/metadata overhead
3315 options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100;
3316 Reopen(options);
3317
3318 Random rnd(301);
3319 const Snapshot* snapshot = nullptr;
3320 for (int i = 0; i < kNumLevelFiles; ++i) {
3321 for (int j = 0; j < kNumKeysPerFile; ++j) {
3322 ASSERT_OK(
3323 Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3324 }
3325 if (i == kNumLevelFiles - 1) {
3326 snapshot = db_->GetSnapshot();
3327 // delete every other key after grabbing a snapshot, so these deletions
3328 // and the keys they cover can't be dropped until after the snapshot is
3329 // released.
3330 for (int j = 0; j < kNumLevelFiles * kNumKeysPerFile; j += 2) {
3331 ASSERT_OK(Delete(Key(j)));
3332 }
3333 }
3334 Flush();
3335 if (i < kNumLevelFiles - 1) {
3336 ASSERT_EQ(i + 1, NumTableFilesAtLevel(0));
3337 }
3338 }
3339 dbfull()->TEST_WaitForCompact();
3340 ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1));
3341
3342 std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata;
3343 db_->GetLiveFilesMetaData(&pre_release_metadata);
3344 // just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST
3345 // files does not need to be preserved in case of a future snapshot.
3346 ASSERT_OK(Put(Key(0), "val"));
3347 // release snapshot and wait for compactions to finish. Single-file
3348 // compactions should be triggered, which reduce the size of each bottom-level
3349 // file without changing file count.
3350 db_->ReleaseSnapshot(snapshot);
3351 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3352 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3353 Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3354 ASSERT_TRUE(compaction->compaction_reason() ==
3355 CompactionReason::kBottommostFiles);
3356 });
3357 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3358 dbfull()->TEST_WaitForCompact();
3359 db_->GetLiveFilesMetaData(&post_release_metadata);
3360 ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size());
3361
3362 for (size_t i = 0; i < pre_release_metadata.size(); ++i) {
3363 const auto& pre_file = pre_release_metadata[i];
3364 const auto& post_file = post_release_metadata[i];
3365 ASSERT_EQ(1, pre_file.level);
3366 ASSERT_EQ(1, post_file.level);
3367 // each file is smaller than it was before as it was rewritten without
3368 // deletion markers/deleted keys.
3369 ASSERT_LT(post_file.size, pre_file.size);
3370 }
3371 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3372}
3373
3374TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
3375 const int kNumKeysPerFile = 32;
3376 const int kNumLevelFiles = 2;
3377 const int kValueSize = 1024;
3378
3379 Options options = CurrentOptions();
3380 options.compression = kNoCompression;
3381 options.ttl = 24 * 60 * 60; // 24 hours
3382 options.max_open_files = -1;
3383 env_->time_elapse_only_sleep_ = false;
3384 options.env = env_;
3385
3386 env_->addon_time_.store(0);
3387 DestroyAndReopen(options);
3388
3389 Random rnd(301);
3390 for (int i = 0; i < kNumLevelFiles; ++i) {
3391 for (int j = 0; j < kNumKeysPerFile; ++j) {
3392 ASSERT_OK(
3393 Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3394 }
3395 Flush();
3396 }
3397 dbfull()->TEST_WaitForCompact();
3398 MoveFilesToLevel(3);
3399 ASSERT_EQ("0,0,0,2", FilesPerLevel());
3400
3401 // Delete previously written keys.
3402 for (int i = 0; i < kNumLevelFiles; ++i) {
3403 for (int j = 0; j < kNumKeysPerFile; ++j) {
3404 ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
3405 }
3406 Flush();
3407 }
3408 dbfull()->TEST_WaitForCompact();
3409 ASSERT_EQ("2,0,0,2", FilesPerLevel());
3410 MoveFilesToLevel(1);
3411 ASSERT_EQ("0,2,0,2", FilesPerLevel());
3412
3413 env_->addon_time_.fetch_add(36 * 60 * 60); // 36 hours
3414 ASSERT_EQ("0,2,0,2", FilesPerLevel());
3415
3416 // Just do a simple write + flush so that the Ttl expired files get
3417 // compacted.
3418 ASSERT_OK(Put("a", "1"));
3419 Flush();
3420 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3421 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3422 Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3423 ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
3424 });
3425 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3426 dbfull()->TEST_WaitForCompact();
3427 // All non-L0 files are deleted, as they contained only deleted data.
3428 ASSERT_EQ("1", FilesPerLevel());
3429 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3430
3431 // Test dynamically changing ttl.
3432
3433 env_->addon_time_.store(0);
3434 DestroyAndReopen(options);
3435
3436 for (int i = 0; i < kNumLevelFiles; ++i) {
3437 for (int j = 0; j < kNumKeysPerFile; ++j) {
3438 ASSERT_OK(
3439 Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize)));
3440 }
3441 Flush();
3442 }
3443 dbfull()->TEST_WaitForCompact();
3444 MoveFilesToLevel(3);
3445 ASSERT_EQ("0,0,0,2", FilesPerLevel());
3446
3447 // Delete previously written keys.
3448 for (int i = 0; i < kNumLevelFiles; ++i) {
3449 for (int j = 0; j < kNumKeysPerFile; ++j) {
3450 ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j)));
3451 }
3452 Flush();
3453 }
3454 dbfull()->TEST_WaitForCompact();
3455 ASSERT_EQ("2,0,0,2", FilesPerLevel());
3456 MoveFilesToLevel(1);
3457 ASSERT_EQ("0,2,0,2", FilesPerLevel());
3458
3459 // Move time forward by 12 hours, and make sure that compaction still doesn't
3460 // trigger as ttl is set to 24 hours.
3461 env_->addon_time_.fetch_add(12 * 60 * 60);
3462 ASSERT_OK(Put("a", "1"));
3463 Flush();
3464 dbfull()->TEST_WaitForCompact();
3465 ASSERT_EQ("1,2,0,2", FilesPerLevel());
3466
3467 rocksdb::SyncPoint::GetInstance()->SetCallBack(
3468 "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
3469 Compaction* compaction = reinterpret_cast<Compaction*>(arg);
3470 ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl);
3471 });
3472 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3473
3474 // Dynamically change ttl to 10 hours.
3475 // This should trigger a ttl compaction, as 12 hours have already passed.
3476 ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}}));
3477 dbfull()->TEST_WaitForCompact();
3478 // All non-L0 files are deleted, as they contained only deleted data.
3479 ASSERT_EQ("1", FilesPerLevel());
3480 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3481}
3482
3483TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) {
3484 // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
3485 // compaction only triggers flush after it's sure stall won't be triggered for
3486 // L0 file count going too high.
3487 const int kNumL0FilesTrigger = 4;
3488 const int kNumL0FilesLimit = 8;
3489 // i == 0: verifies normal case where stall is avoided by delay
3490 // i == 1: verifies no delay in edge case where stall trigger is same as
3491 // compaction trigger, so stall can't be avoided
3492 for (int i = 0; i < 2; ++i) {
3493 Options options = CurrentOptions();
3494 options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
3495 if (i == 0) {
3496 options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
3497 } else {
3498 options.level0_file_num_compaction_trigger = kNumL0FilesLimit;
3499 }
3500 Reopen(options);
3501
3502 if (i == 0) {
3503 // ensure the auto compaction doesn't finish until manual compaction has
3504 // had a chance to be delayed.
3505 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3506 {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
3507 "CompactionJob::Run():End"}});
3508 } else {
3509 // ensure the auto-compaction doesn't finish until manual compaction has
3510 // continued without delay.
3511 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3512 {{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
3513 }
3514 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3515
3516 Random rnd(301);
3517 for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
3518 for (int k = 0; k < 2; ++k) {
3519 ASSERT_OK(Put(Key(k), RandomString(&rnd, 1024)));
3520 }
3521 Flush();
3522 }
3523 auto manual_compaction_thread = port::Thread([this]() {
3524 CompactRangeOptions cro;
3525 cro.allow_write_stall = false;
3526 db_->CompactRange(cro, nullptr, nullptr);
3527 });
3528
3529 manual_compaction_thread.join();
3530 dbfull()->TEST_WaitForCompact();
3531 ASSERT_EQ(0, NumTableFilesAtLevel(0));
3532 ASSERT_GT(NumTableFilesAtLevel(1), 0);
3533 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3534 }
3535}
3536
3537TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) {
3538 // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual
3539 // compaction only triggers flush after it's sure stall won't be triggered for
3540 // immutable memtable count going too high.
3541 const int kNumImmMemTableLimit = 8;
3542 // i == 0: verifies normal case where stall is avoided by delay
3543 // i == 1: verifies no delay in edge case where stall trigger is same as flush
3544 // trigger, so stall can't be avoided
3545 for (int i = 0; i < 2; ++i) {
3546 Options options = CurrentOptions();
3547 options.disable_auto_compactions = true;
3548 // the delay limit is one less than the stop limit. This test focuses on
3549 // avoiding delay limit, but this option sets stop limit, so add one.
3550 options.max_write_buffer_number = kNumImmMemTableLimit + 1;
3551 if (i == 1) {
3552 options.min_write_buffer_number_to_merge = kNumImmMemTableLimit;
3553 }
3554 Reopen(options);
3555
3556 if (i == 0) {
3557 // ensure the flush doesn't finish until manual compaction has had a
3558 // chance to be delayed.
3559 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3560 {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
3561 "FlushJob::WriteLevel0Table"}});
3562 } else {
3563 // ensure the flush doesn't finish until manual compaction has continued
3564 // without delay.
3565 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3566 {{"DBImpl::FlushMemTable:StallWaitDone",
3567 "FlushJob::WriteLevel0Table"}});
3568 }
3569 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3570
3571 Random rnd(301);
3572 for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) {
3573 ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024)));
3574 FlushOptions flush_opts;
3575 flush_opts.wait = false;
3576 flush_opts.allow_write_stall = true;
3577 dbfull()->Flush(flush_opts);
3578 }
3579
3580 auto manual_compaction_thread = port::Thread([this]() {
3581 CompactRangeOptions cro;
3582 cro.allow_write_stall = false;
3583 db_->CompactRange(cro, nullptr, nullptr);
3584 });
3585
3586 manual_compaction_thread.join();
3587 dbfull()->TEST_WaitForFlushMemTable();
3588 ASSERT_EQ(0, NumTableFilesAtLevel(0));
3589 ASSERT_GT(NumTableFilesAtLevel(1), 0);
3590 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3591 }
3592}
3593
3594TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) {
3595 // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay
3596 // does not hang if CF is dropped or DB is closed
3597 const int kNumL0FilesTrigger = 4;
3598 const int kNumL0FilesLimit = 8;
3599 Options options = CurrentOptions();
3600 options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
3601 options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
3602 // i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it
3603 // i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to
3604 // simulate what happens during Close as we can't call Close (it
3605 // blocks on the auto-compaction, making a cycle).
3606 for (int i = 0; i < 2; ++i) {
3607 CreateAndReopenWithCF({"one"}, options);
3608 // The calls to close CF/DB wait until the manual compaction stalls.
3609 // The auto-compaction waits until the manual compaction finishes to ensure
3610 // the signal comes from closing CF/DB, not from compaction making progress.
3611 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3612 {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
3613 "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"},
3614 {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual",
3615 "CompactionJob::Run():End"}});
3616 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3617
3618 Random rnd(301);
3619 for (int j = 0; j < kNumL0FilesLimit - 1; ++j) {
3620 for (int k = 0; k < 2; ++k) {
3621 ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024)));
3622 }
3623 Flush(1);
3624 }
3625 auto manual_compaction_thread = port::Thread([this]() {
3626 CompactRangeOptions cro;
3627 cro.allow_write_stall = false;
3628 ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr)
3629 .IsShutdownInProgress());
3630 });
3631
3632 TEST_SYNC_POINT(
3633 "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown");
3634 if (i == 0) {
3635 ASSERT_OK(db_->DropColumnFamily(handles_[1]));
3636 } else {
3637 dbfull()->CancelAllBackgroundWork(false /* wait */);
3638 }
3639 manual_compaction_thread.join();
3640 TEST_SYNC_POINT(
3641 "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual");
3642 dbfull()->TEST_WaitForCompact();
3643 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3644 }
3645}
3646
3647TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) {
3648 // Verify that, when `CompactRangeOptions::allow_write_stall == false`,
3649 // CompactRange skips its flush if the delay is long enough that the memtables
3650 // existing at the beginning of the call have already been flushed.
3651 const int kNumL0FilesTrigger = 4;
3652 const int kNumL0FilesLimit = 8;
3653 Options options = CurrentOptions();
3654 options.level0_slowdown_writes_trigger = kNumL0FilesLimit;
3655 options.level0_file_num_compaction_trigger = kNumL0FilesTrigger;
3656 Reopen(options);
3657
3658 Random rnd(301);
3659 // The manual flush includes the memtable that was active when CompactRange
3660 // began. So it unblocks CompactRange and precludes its flush. Throughout the
3661 // test, stall conditions are upheld via high L0 file count.
3662 rocksdb::SyncPoint::GetInstance()->LoadDependency(
3663 {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
3664 "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"},
3665 {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush",
3666 "DBImpl::FlushMemTable:StallWaitDone"},
3667 {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}});
3668 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
3669
3670 //used for the delayable flushes
3671 FlushOptions flush_opts;
3672 flush_opts.allow_write_stall = true;
3673 for (int i = 0; i < kNumL0FilesLimit - 1; ++i) {
3674 for (int j = 0; j < 2; ++j) {
3675 ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024)));
3676 }
3677 dbfull()->Flush(flush_opts);
3678 }
3679 auto manual_compaction_thread = port::Thread([this]() {
3680 CompactRangeOptions cro;
3681 cro.allow_write_stall = false;
3682 db_->CompactRange(cro, nullptr, nullptr);
3683 });
3684
3685 TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush");
3686 Put(ToString(0), RandomString(&rnd, 1024));
3687 dbfull()->Flush(flush_opts);
3688 Put(ToString(0), RandomString(&rnd, 1024));
3689 TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush");
3690 manual_compaction_thread.join();
3691
3692 // If CompactRange's flush was skipped, the final Put above will still be
3693 // in the active memtable.
3694 std::string num_keys_in_memtable;
3695 db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable);
3696 ASSERT_EQ(ToString(1), num_keys_in_memtable);
3697
3698 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
3699}
3700
3701TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) {
3702 // Verify memtable only gets flushed if it contains data overlapping the range
3703 // provided to `CompactRange`. Tests all kinds of overlap/non-overlap.
3704 const int kNumEndpointKeys = 5;
3705 std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"};
3706 Options options = CurrentOptions();
3707 options.disable_auto_compactions = true;
3708 Reopen(options);
3709
3710 // One extra iteration for nullptr, which means left side of interval is
3711 // unbounded.
3712 for (int i = 0; i <= kNumEndpointKeys; ++i) {
3713 Slice begin;
3714 Slice* begin_ptr;
3715 if (i == 0) {
3716 begin_ptr = nullptr;
3717 } else {
3718 begin = keys[i - 1];
3719 begin_ptr = &begin;
3720 }
3721 // Start at `i` so right endpoint comes after left endpoint. One extra
3722 // iteration for nullptr, which means right side of interval is unbounded.
3723 for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) {
3724 Slice end;
3725 Slice* end_ptr;
3726 if (j == kNumEndpointKeys) {
3727 end_ptr = nullptr;
3728 } else {
3729 end = keys[j];
3730 end_ptr = &end;
3731 }
3732 ASSERT_OK(Put("b", "val"));
3733 ASSERT_OK(Put("d", "val"));
3734 CompactRangeOptions compact_range_opts;
3735 ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr));
3736
3737 uint64_t get_prop_tmp, num_memtable_entries = 0;
3738 ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables,
3739 &get_prop_tmp));
3740 num_memtable_entries += get_prop_tmp;
3741 ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable,
3742 &get_prop_tmp));
3743 num_memtable_entries += get_prop_tmp;
3744 if (begin_ptr == nullptr || end_ptr == nullptr ||
3745 (i <= 4 && j >= 1 && (begin != "c" || end != "c"))) {
3746 // In this case `CompactRange`'s range overlapped in some way with the
3747 // memtable's range, so flush should've happened. Then "b" and "d" won't
3748 // be in the memtable.
3749 ASSERT_EQ(0, num_memtable_entries);
3750 } else {
3751 ASSERT_EQ(2, num_memtable_entries);
3752 // flush anyways to prepare for next iteration
3753 db_->Flush(FlushOptions());
3754 }
3755 }
3756 }
3757}
3758
3759TEST_F(DBCompactionTest, CompactionStatsTest) {
3760 Options options = CurrentOptions();
3761 options.level0_file_num_compaction_trigger = 2;
3762 CompactionStatsCollector* collector = new CompactionStatsCollector();
3763 options.listeners.emplace_back(collector);
3764 DestroyAndReopen(options);
3765
3766 for (int i = 0; i < 32; i++) {
3767 for (int j = 0; j < 5000; j++) {
3768 Put(std::to_string(j), std::string(1, 'A'));
3769 }
3770 ASSERT_OK(Flush());
3771 ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
3772 }
3773 dbfull()->TEST_WaitForCompact();
3774 ColumnFamilyHandleImpl* cfh =
3775 static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily());
3776 ColumnFamilyData* cfd = cfh->cfd();
3777
3778 VerifyCompactionStats(*cfd, *collector);
3779}
3780
3781TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) {
3782 // LSM setup:
3783 // L1: [ba bz]
3784 // L2: [a b] [c d]
3785 // L3: [a b] [c d]
3786 //
3787 // Thread 1: Thread 2:
3788 // Begin compacting all L2->L3
3789 // Compact [ba bz] L1->L3
3790 // End compacting all L2->L3
3791 //
3792 // The compaction operation in thread 2 should be disallowed because the range
3793 // overlaps with the compaction in thread 1, which also covers that range in
3794 // L3.
3795 Options options = CurrentOptions();
3796 FlushedFileCollector* collector = new FlushedFileCollector();
3797 options.listeners.emplace_back(collector);
3798 Reopen(options);
3799
3800 for (int level = 3; level >= 2; --level) {
3801 ASSERT_OK(Put("a", "val"));
3802 ASSERT_OK(Put("b", "val"));
3803 ASSERT_OK(Flush());
3804 ASSERT_OK(Put("c", "val"));
3805 ASSERT_OK(Put("d", "val"));
3806 ASSERT_OK(Flush());
3807 MoveFilesToLevel(level);
3808 }
3809 ASSERT_OK(Put("ba", "val"));
3810 ASSERT_OK(Put("bz", "val"));
3811 ASSERT_OK(Flush());
3812 MoveFilesToLevel(1);
3813
3814 SyncPoint::GetInstance()->LoadDependency({
3815 {"CompactFilesImpl:0",
3816 "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"},
3817 {"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End",
3818 "CompactFilesImpl:1"},
3819 });
3820 SyncPoint::GetInstance()->EnableProcessing();
3821
3822 auto bg_thread = port::Thread([&]() {
3823 // Thread 1
3824 std::vector<std::string> filenames = collector->GetFlushedFiles();
3825 filenames.pop_back();
3826 ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames,
3827 3 /* output_level */));
3828 });
3829
3830 // Thread 2
3831 TEST_SYNC_POINT(
3832 "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin");
3833 std::string filename = collector->GetFlushedFiles().back();
3834 ASSERT_FALSE(
3835 db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */)
3836 .ok());
3837 TEST_SYNC_POINT(
3838 "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End");
3839
3840 bg_thread.join();
3841}
3842
3843TEST_F(DBCompactionTest, CompactionHasEmptyOutput) {
3844 Options options = CurrentOptions();
3845 SstStatsCollector* collector = new SstStatsCollector();
3846 options.level0_file_num_compaction_trigger = 2;
3847 options.listeners.emplace_back(collector);
3848 Reopen(options);
3849
3850 // Make sure the L0 files overlap to prevent trivial move.
3851 ASSERT_OK(Put("a", "val"));
3852 ASSERT_OK(Put("b", "val"));
3853 ASSERT_OK(Flush());
3854 ASSERT_OK(Delete("a"));
3855 ASSERT_OK(Delete("b"));
3856 ASSERT_OK(Flush());
3857
3858 dbfull()->TEST_WaitForCompact();
3859 ASSERT_EQ(NumTableFilesAtLevel(0), 0);
3860 ASSERT_EQ(NumTableFilesAtLevel(1), 0);
3861
3862 // Expect one file creation to start for each flush, and zero for compaction
3863 // since no keys are written.
3864 ASSERT_EQ(2, collector->num_ssts_creation_started());
3865}
3866
3867INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
3868 ::testing::Values(std::make_tuple(1, true),
3869 std::make_tuple(1, false),
3870 std::make_tuple(4, true),
3871 std::make_tuple(4, false)));
3872
3873TEST_P(DBCompactionDirectIOTest, DirectIO) {
3874 Options options = CurrentOptions();
3875 Destroy(options);
3876 options.create_if_missing = true;
3877 options.disable_auto_compactions = true;
3878 options.use_direct_io_for_flush_and_compaction = GetParam();
3879 options.env = new MockEnv(Env::Default());
3880 Reopen(options);
3881 bool readahead = false;
3882 SyncPoint::GetInstance()->SetCallBack(
3883 "TableCache::NewIterator:for_compaction", [&](void* arg) {
3884 bool* use_direct_reads = static_cast<bool*>(arg);
3885 ASSERT_EQ(*use_direct_reads,
3886 options.use_direct_reads);
3887 });
3888 SyncPoint::GetInstance()->SetCallBack(
3889 "CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
3890 bool* use_direct_writes = static_cast<bool*>(arg);
3891 ASSERT_EQ(*use_direct_writes,
3892 options.use_direct_io_for_flush_and_compaction);
3893 });
3894 if (options.use_direct_io_for_flush_and_compaction) {
3895 SyncPoint::GetInstance()->SetCallBack(
3896 "SanitizeOptions:direct_io", [&](void* /*arg*/) {
3897 readahead = true;
3898 });
3899 }
3900 SyncPoint::GetInstance()->EnableProcessing();
3901 CreateAndReopenWithCF({"pikachu"}, options);
3902 MakeTables(3, "p", "q", 1);
3903 ASSERT_EQ("1,1,1", FilesPerLevel(1));
3904 Compact(1, "p1", "p9");
3905 ASSERT_EQ(readahead, options.use_direct_reads);
3906 ASSERT_EQ("0,0,1", FilesPerLevel(1));
3907 Destroy(options);
3908 delete options.env;
3909}
3910
3911INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
3912 testing::Bool());
7c673cae
FG
3913
3914class CompactionPriTest : public DBTestBase,
3915 public testing::WithParamInterface<uint32_t> {
3916 public:
3917 CompactionPriTest() : DBTestBase("/compaction_pri_test") {
3918 compaction_pri_ = GetParam();
3919 }
3920
3921 // Required if inheriting from testing::WithParamInterface<>
3922 static void SetUpTestCase() {}
3923 static void TearDownTestCase() {}
3924
3925 uint32_t compaction_pri_;
3926};
3927
3928TEST_P(CompactionPriTest, Test) {
3929 Options options = CurrentOptions();
3930 options.write_buffer_size = 16 * 1024;
3931 options.compaction_pri = static_cast<CompactionPri>(compaction_pri_);
3932 options.hard_pending_compaction_bytes_limit = 256 * 1024;
3933 options.max_bytes_for_level_base = 64 * 1024;
3934 options.max_bytes_for_level_multiplier = 4;
3935 options.compression = kNoCompression;
3936
3937 DestroyAndReopen(options);
3938
3939 Random rnd(301);
3940 const int kNKeys = 5000;
3941 int keys[kNKeys];
3942 for (int i = 0; i < kNKeys; i++) {
3943 keys[i] = i;
3944 }
3945 std::random_shuffle(std::begin(keys), std::end(keys));
3946
3947 for (int i = 0; i < kNKeys; i++) {
3948 ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 102)));
3949 }
3950
3951 dbfull()->TEST_WaitForCompact();
3952 for (int i = 0; i < kNKeys; i++) {
3953 ASSERT_NE("NOT_FOUND", Get(Key(i)));
3954 }
3955}
3956
3957INSTANTIATE_TEST_CASE_P(
3958 CompactionPriTest, CompactionPriTest,
3959 ::testing::Values(CompactionPri::kByCompensatedSize,
3960 CompactionPri::kOldestLargestSeqFirst,
3961 CompactionPri::kOldestSmallestSeqFirst,
3962 CompactionPri::kMinOverlappingRatio));
3963
11fdf7f2
TL
3964class NoopMergeOperator : public MergeOperator {
3965 public:
3966 NoopMergeOperator() {}
3967
3968 virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/,
3969 MergeOperationOutput* merge_out) const override {
3970 std::string val("bar");
3971 merge_out->new_value = val;
3972 return true;
3973 }
3974
3975 virtual const char* Name() const override { return "Noop"; }
3976};
3977
3978TEST_F(DBCompactionTest, PartialManualCompaction) {
3979 Options opts = CurrentOptions();
3980 opts.num_levels = 3;
3981 opts.level0_file_num_compaction_trigger = 10;
3982 opts.compression = kNoCompression;
3983 opts.merge_operator.reset(new NoopMergeOperator());
3984 opts.target_file_size_base = 10240;
3985 DestroyAndReopen(opts);
3986
3987 Random rnd(301);
3988 for (auto i = 0; i < 8; ++i) {
3989 for (auto j = 0; j < 10; ++j) {
3990 Merge("foo", RandomString(&rnd, 1024));
3991 }
3992 Flush();
3993 }
3994
3995 MoveFilesToLevel(2);
3996
3997 std::string prop;
3998 EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop));
3999 uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2;
4000 ASSERT_OK(dbfull()->SetOptions(
4001 {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}}));
4002
4003 CompactRangeOptions cro;
4004 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
4005 dbfull()->CompactRange(cro, nullptr, nullptr);
4006}
4007
7c673cae
FG
4008#endif // !defined(ROCKSDB_LITE)
4009} // namespace rocksdb
4010
4011int main(int argc, char** argv) {
4012#if !defined(ROCKSDB_LITE)
4013 rocksdb::port::InstallStackTraceHandler();
4014 ::testing::InitGoogleTest(&argc, argv);
4015 return RUN_ALL_TESTS();
4016#else
11fdf7f2
TL
4017 (void) argc;
4018 (void) argv;
7c673cae
FG
4019 return 0;
4020#endif
4021}