]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/db_test_util.h" | |
11 | #include "port/stack_trace.h" | |
12 | #include "port/port.h" | |
13 | #include "rocksdb/experimental.h" | |
14 | #include "rocksdb/utilities/convenience.h" | |
15 | #include "util/sync_point.h" | |
16 | namespace rocksdb { | |
17 | ||
18 | // SYNC_POINT is not supported in released Windows mode. | |
19 | #if !defined(ROCKSDB_LITE) | |
20 | ||
21 | class DBCompactionTest : public DBTestBase { | |
22 | public: | |
23 | DBCompactionTest() : DBTestBase("/db_compaction_test") {} | |
24 | }; | |
25 | ||
26 | class DBCompactionTestWithParam | |
27 | : public DBTestBase, | |
28 | public testing::WithParamInterface<std::tuple<uint32_t, bool>> { | |
29 | public: | |
30 | DBCompactionTestWithParam() : DBTestBase("/db_compaction_test") { | |
31 | max_subcompactions_ = std::get<0>(GetParam()); | |
32 | exclusive_manual_compaction_ = std::get<1>(GetParam()); | |
33 | } | |
34 | ||
35 | // Required if inheriting from testing::WithParamInterface<> | |
36 | static void SetUpTestCase() {} | |
37 | static void TearDownTestCase() {} | |
38 | ||
39 | uint32_t max_subcompactions_; | |
40 | bool exclusive_manual_compaction_; | |
41 | }; | |
42 | ||
43 | class DBCompactionDirectIOTest : public DBCompactionTest, | |
44 | public ::testing::WithParamInterface<bool> { | |
45 | public: | |
46 | DBCompactionDirectIOTest() : DBCompactionTest() {} | |
47 | }; | |
48 | ||
49 | namespace { | |
50 | ||
51 | class FlushedFileCollector : public EventListener { | |
52 | public: | |
53 | FlushedFileCollector() {} | |
54 | ~FlushedFileCollector() {} | |
55 | ||
11fdf7f2 | 56 | virtual void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override { |
7c673cae FG |
57 | std::lock_guard<std::mutex> lock(mutex_); |
58 | flushed_files_.push_back(info.file_path); | |
59 | } | |
60 | ||
61 | std::vector<std::string> GetFlushedFiles() { | |
62 | std::lock_guard<std::mutex> lock(mutex_); | |
63 | std::vector<std::string> result; | |
64 | for (auto fname : flushed_files_) { | |
65 | result.push_back(fname); | |
66 | } | |
67 | return result; | |
68 | } | |
69 | ||
70 | void ClearFlushedFiles() { flushed_files_.clear(); } | |
71 | ||
72 | private: | |
73 | std::vector<std::string> flushed_files_; | |
74 | std::mutex mutex_; | |
75 | }; | |
76 | ||
11fdf7f2 TL |
77 | class CompactionStatsCollector : public EventListener { |
78 | public: | |
79 | CompactionStatsCollector() | |
80 | : compaction_completed_(static_cast<int>(CompactionReason::kNumOfReasons)) { | |
81 | for (auto& v : compaction_completed_) { | |
82 | v.store(0); | |
83 | } | |
84 | } | |
85 | ||
86 | ~CompactionStatsCollector() {} | |
87 | ||
88 | virtual void OnCompactionCompleted(DB* /* db */, | |
89 | const CompactionJobInfo& info) override { | |
90 | int k = static_cast<int>(info.compaction_reason); | |
91 | int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons); | |
92 | assert(k >= 0 && k < num_of_reasons); | |
93 | compaction_completed_[k]++; | |
94 | } | |
95 | ||
96 | virtual void OnExternalFileIngested(DB* /* db */, | |
97 | const ExternalFileIngestionInfo& /* info */) override { | |
98 | int k = static_cast<int>(CompactionReason::kExternalSstIngestion); | |
99 | compaction_completed_[k]++; | |
100 | } | |
101 | ||
102 | virtual void OnFlushCompleted(DB* /* db */, | |
103 | const FlushJobInfo& /* info */) override { | |
104 | int k = static_cast<int>(CompactionReason::kFlush); | |
105 | compaction_completed_[k]++; | |
106 | } | |
107 | ||
108 | int NumberOfCompactions(CompactionReason reason) const { | |
109 | int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons); | |
110 | int k = static_cast<int>(reason); | |
111 | assert(k >= 0 && k < num_of_reasons); | |
112 | return compaction_completed_.at(k).load(); | |
113 | } | |
114 | ||
115 | private: | |
116 | std::vector<std::atomic<int>> compaction_completed_; | |
117 | }; | |
118 | ||
119 | class SstStatsCollector : public EventListener { | |
120 | public: | |
121 | SstStatsCollector() : num_ssts_creation_started_(0) {} | |
122 | ||
123 | void OnTableFileCreationStarted( | |
124 | const TableFileCreationBriefInfo& /* info */) override { | |
125 | ++num_ssts_creation_started_; | |
126 | } | |
127 | ||
128 | int num_ssts_creation_started() { return num_ssts_creation_started_; } | |
129 | ||
130 | private: | |
131 | std::atomic<int> num_ssts_creation_started_; | |
132 | }; | |
133 | ||
7c673cae FG |
134 | static const int kCDTValueSize = 1000; |
135 | static const int kCDTKeysPerBuffer = 4; | |
136 | static const int kCDTNumLevels = 8; | |
137 | Options DeletionTriggerOptions(Options options) { | |
138 | options.compression = kNoCompression; | |
139 | options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24); | |
140 | options.min_write_buffer_number_to_merge = 1; | |
141 | options.max_write_buffer_number_to_maintain = 0; | |
142 | options.num_levels = kCDTNumLevels; | |
143 | options.level0_file_num_compaction_trigger = 1; | |
144 | options.target_file_size_base = options.write_buffer_size * 2; | |
145 | options.target_file_size_multiplier = 2; | |
146 | options.max_bytes_for_level_base = | |
147 | options.target_file_size_base * options.target_file_size_multiplier; | |
148 | options.max_bytes_for_level_multiplier = 2; | |
149 | options.disable_auto_compactions = false; | |
150 | return options; | |
151 | } | |
152 | ||
153 | bool HaveOverlappingKeyRanges( | |
154 | const Comparator* c, | |
155 | const SstFileMetaData& a, const SstFileMetaData& b) { | |
156 | if (c->Compare(a.smallestkey, b.smallestkey) >= 0) { | |
157 | if (c->Compare(a.smallestkey, b.largestkey) <= 0) { | |
158 | // b.smallestkey <= a.smallestkey <= b.largestkey | |
159 | return true; | |
160 | } | |
161 | } else if (c->Compare(a.largestkey, b.smallestkey) >= 0) { | |
162 | // a.smallestkey < b.smallestkey <= a.largestkey | |
163 | return true; | |
164 | } | |
165 | if (c->Compare(a.largestkey, b.largestkey) <= 0) { | |
166 | if (c->Compare(a.largestkey, b.smallestkey) >= 0) { | |
167 | // b.smallestkey <= a.largestkey <= b.largestkey | |
168 | return true; | |
169 | } | |
170 | } else if (c->Compare(a.smallestkey, b.largestkey) <= 0) { | |
171 | // a.smallestkey <= b.largestkey < a.largestkey | |
172 | return true; | |
173 | } | |
174 | return false; | |
175 | } | |
176 | ||
177 | // Identifies all files between level "min_level" and "max_level" | |
178 | // which has overlapping key range with "input_file_meta". | |
179 | void GetOverlappingFileNumbersForLevelCompaction( | |
180 | const ColumnFamilyMetaData& cf_meta, | |
181 | const Comparator* comparator, | |
182 | int min_level, int max_level, | |
183 | const SstFileMetaData* input_file_meta, | |
184 | std::set<std::string>* overlapping_file_names) { | |
185 | std::set<const SstFileMetaData*> overlapping_files; | |
186 | overlapping_files.insert(input_file_meta); | |
187 | for (int m = min_level; m <= max_level; ++m) { | |
188 | for (auto& file : cf_meta.levels[m].files) { | |
189 | for (auto* included_file : overlapping_files) { | |
190 | if (HaveOverlappingKeyRanges( | |
191 | comparator, *included_file, file)) { | |
192 | overlapping_files.insert(&file); | |
193 | overlapping_file_names->insert(file.name); | |
194 | break; | |
195 | } | |
196 | } | |
197 | } | |
198 | } | |
199 | } | |
200 | ||
201 | void VerifyCompactionResult( | |
202 | const ColumnFamilyMetaData& cf_meta, | |
203 | const std::set<std::string>& overlapping_file_numbers) { | |
204 | #ifndef NDEBUG | |
205 | for (auto& level : cf_meta.levels) { | |
206 | for (auto& file : level.files) { | |
207 | assert(overlapping_file_numbers.find(file.name) == | |
208 | overlapping_file_numbers.end()); | |
209 | } | |
210 | } | |
211 | #endif | |
212 | } | |
213 | ||
11fdf7f2 TL |
214 | /* |
215 | * Verifies compaction stats of cfd are valid. | |
216 | * | |
217 | * For each level of cfd, its compaction stats are valid if | |
218 | * 1) sum(stat.counts) == stat.count, and | |
219 | * 2) stat.counts[i] == collector.NumberOfCompactions(i) | |
220 | */ | |
221 | void VerifyCompactionStats(ColumnFamilyData& cfd, | |
222 | const CompactionStatsCollector& collector) { | |
223 | #ifndef NDEBUG | |
224 | InternalStats* internal_stats_ptr = cfd.internal_stats(); | |
225 | ASSERT_TRUE(internal_stats_ptr != nullptr); | |
226 | const std::vector<InternalStats::CompactionStats>& comp_stats = | |
227 | internal_stats_ptr->TEST_GetCompactionStats(); | |
228 | const int num_of_reasons = static_cast<int>(CompactionReason::kNumOfReasons); | |
229 | std::vector<int> counts(num_of_reasons, 0); | |
230 | // Count the number of compactions caused by each CompactionReason across | |
231 | // all levels. | |
232 | for (const auto& stat : comp_stats) { | |
233 | int sum = 0; | |
234 | for (int i = 0; i < num_of_reasons; i++) { | |
235 | counts[i] += stat.counts[i]; | |
236 | sum += stat.counts[i]; | |
237 | } | |
238 | ASSERT_EQ(sum, stat.count); | |
239 | } | |
240 | // Verify InternalStats bookkeeping matches that of CompactionStatsCollector, | |
241 | // assuming that all compactions complete. | |
242 | for (int i = 0; i < num_of_reasons; i++) { | |
243 | ASSERT_EQ(collector.NumberOfCompactions(static_cast<CompactionReason>(i)), counts[i]); | |
244 | } | |
245 | #endif /* NDEBUG */ | |
246 | } | |
247 | ||
7c673cae FG |
248 | const SstFileMetaData* PickFileRandomly( |
249 | const ColumnFamilyMetaData& cf_meta, | |
250 | Random* rand, | |
251 | int* level = nullptr) { | |
252 | auto file_id = rand->Uniform(static_cast<int>( | |
253 | cf_meta.file_count)) + 1; | |
254 | for (auto& level_meta : cf_meta.levels) { | |
255 | if (file_id <= level_meta.files.size()) { | |
256 | if (level != nullptr) { | |
257 | *level = level_meta.level; | |
258 | } | |
259 | auto result = rand->Uniform(file_id); | |
260 | return &(level_meta.files[result]); | |
261 | } | |
262 | file_id -= static_cast<uint32_t>(level_meta.files.size()); | |
263 | } | |
264 | assert(false); | |
265 | return nullptr; | |
266 | } | |
267 | } // anonymous namespace | |
268 | ||
11fdf7f2 | 269 | #ifndef ROCKSDB_VALGRIND_RUN |
7c673cae FG |
270 | // All the TEST_P tests run once with sub_compactions disabled (i.e. |
271 | // options.max_subcompactions = 1) and once with it enabled | |
272 | TEST_P(DBCompactionTestWithParam, CompactionDeletionTrigger) { | |
273 | for (int tid = 0; tid < 3; ++tid) { | |
274 | uint64_t db_size[2]; | |
275 | Options options = DeletionTriggerOptions(CurrentOptions()); | |
276 | options.max_subcompactions = max_subcompactions_; | |
277 | ||
278 | if (tid == 1) { | |
279 | // the following only disable stats update in DB::Open() | |
280 | // and should not affect the result of this test. | |
281 | options.skip_stats_update_on_db_open = true; | |
282 | } else if (tid == 2) { | |
283 | // third pass with universal compaction | |
284 | options.compaction_style = kCompactionStyleUniversal; | |
285 | options.num_levels = 1; | |
286 | } | |
287 | ||
288 | DestroyAndReopen(options); | |
289 | Random rnd(301); | |
290 | ||
291 | const int kTestSize = kCDTKeysPerBuffer * 1024; | |
292 | std::vector<std::string> values; | |
293 | for (int k = 0; k < kTestSize; ++k) { | |
294 | values.push_back(RandomString(&rnd, kCDTValueSize)); | |
295 | ASSERT_OK(Put(Key(k), values[k])); | |
296 | } | |
297 | dbfull()->TEST_WaitForFlushMemTable(); | |
298 | dbfull()->TEST_WaitForCompact(); | |
299 | db_size[0] = Size(Key(0), Key(kTestSize - 1)); | |
300 | ||
301 | for (int k = 0; k < kTestSize; ++k) { | |
302 | ASSERT_OK(Delete(Key(k))); | |
303 | } | |
304 | dbfull()->TEST_WaitForFlushMemTable(); | |
305 | dbfull()->TEST_WaitForCompact(); | |
306 | db_size[1] = Size(Key(0), Key(kTestSize - 1)); | |
307 | ||
308 | // must have much smaller db size. | |
309 | ASSERT_GT(db_size[0] / 3, db_size[1]); | |
310 | } | |
311 | } | |
11fdf7f2 TL |
312 | #endif // ROCKSDB_VALGRIND_RUN |
313 | ||
314 | TEST_P(DBCompactionTestWithParam, CompactionsPreserveDeletes) { | |
315 | // For each options type we test following | |
316 | // - Enable preserve_deletes | |
317 | // - write bunch of keys and deletes | |
318 | // - Set start_seqnum to the beginning; compact; check that keys are present | |
319 | // - rewind start_seqnum way forward; compact; check that keys are gone | |
320 | ||
321 | for (int tid = 0; tid < 3; ++tid) { | |
322 | Options options = DeletionTriggerOptions(CurrentOptions()); | |
323 | options.max_subcompactions = max_subcompactions_; | |
324 | options.preserve_deletes=true; | |
325 | options.num_levels = 2; | |
326 | ||
327 | if (tid == 1) { | |
328 | options.skip_stats_update_on_db_open = true; | |
329 | } else if (tid == 2) { | |
330 | // third pass with universal compaction | |
331 | options.compaction_style = kCompactionStyleUniversal; | |
332 | } | |
333 | ||
334 | DestroyAndReopen(options); | |
335 | Random rnd(301); | |
336 | // highlight the default; all deletes should be preserved | |
337 | SetPreserveDeletesSequenceNumber(0); | |
338 | ||
339 | const int kTestSize = kCDTKeysPerBuffer; | |
340 | std::vector<std::string> values; | |
341 | for (int k = 0; k < kTestSize; ++k) { | |
342 | values.push_back(RandomString(&rnd, kCDTValueSize)); | |
343 | ASSERT_OK(Put(Key(k), values[k])); | |
344 | } | |
345 | ||
346 | for (int k = 0; k < kTestSize; ++k) { | |
347 | ASSERT_OK(Delete(Key(k))); | |
348 | } | |
349 | // to ensure we tackle all tombstones | |
350 | CompactRangeOptions cro; | |
351 | cro.change_level = true; | |
352 | cro.target_level = 2; | |
353 | cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; | |
354 | ||
355 | dbfull()->TEST_WaitForFlushMemTable(); | |
356 | dbfull()->CompactRange(cro, nullptr, nullptr); | |
357 | ||
358 | // check that normal user iterator doesn't see anything | |
359 | Iterator* db_iter = dbfull()->NewIterator(ReadOptions()); | |
360 | int i = 0; | |
361 | for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { | |
362 | i++; | |
363 | } | |
364 | ASSERT_EQ(i, 0); | |
365 | delete db_iter; | |
366 | ||
367 | // check that iterator that sees internal keys sees tombstones | |
368 | ReadOptions ro; | |
369 | ro.iter_start_seqnum=1; | |
370 | db_iter = dbfull()->NewIterator(ro); | |
371 | i = 0; | |
372 | for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { | |
373 | i++; | |
374 | } | |
375 | ASSERT_EQ(i, 4); | |
376 | delete db_iter; | |
377 | ||
378 | // now all deletes should be gone | |
379 | SetPreserveDeletesSequenceNumber(100000000); | |
380 | dbfull()->CompactRange(cro, nullptr, nullptr); | |
381 | ||
382 | db_iter = dbfull()->NewIterator(ro); | |
383 | i = 0; | |
384 | for (db_iter->SeekToFirst(); db_iter->Valid(); db_iter->Next()) { | |
385 | i++; | |
386 | } | |
387 | ASSERT_EQ(i, 0); | |
388 | delete db_iter; | |
389 | } | |
390 | } | |
7c673cae FG |
391 | |
392 | TEST_F(DBCompactionTest, SkipStatsUpdateTest) { | |
393 | // This test verify UpdateAccumulatedStats is not on | |
394 | // if options.skip_stats_update_on_db_open = true | |
395 | // The test will need to be updated if the internal behavior changes. | |
396 | ||
397 | Options options = DeletionTriggerOptions(CurrentOptions()); | |
398 | options.env = env_; | |
399 | DestroyAndReopen(options); | |
400 | Random rnd(301); | |
401 | ||
402 | const int kTestSize = kCDTKeysPerBuffer * 512; | |
403 | std::vector<std::string> values; | |
404 | for (int k = 0; k < kTestSize; ++k) { | |
405 | values.push_back(RandomString(&rnd, kCDTValueSize)); | |
406 | ASSERT_OK(Put(Key(k), values[k])); | |
407 | } | |
408 | dbfull()->TEST_WaitForFlushMemTable(); | |
409 | dbfull()->TEST_WaitForCompact(); | |
410 | ||
411 | // Reopen the DB with stats-update disabled | |
412 | options.skip_stats_update_on_db_open = true; | |
413 | env_->random_file_open_counter_.store(0); | |
414 | Reopen(options); | |
415 | ||
416 | // As stats-update is disabled, we expect a very low number of | |
417 | // random file open. | |
418 | // Note that this number must be changed accordingly if we change | |
419 | // the number of files needed to be opened in the DB::Open process. | |
420 | const int kMaxFileOpenCount = 10; | |
421 | ASSERT_LT(env_->random_file_open_counter_.load(), kMaxFileOpenCount); | |
422 | ||
423 | // Repeat the reopen process, but this time we enable | |
424 | // stats-update. | |
425 | options.skip_stats_update_on_db_open = false; | |
426 | env_->random_file_open_counter_.store(0); | |
427 | Reopen(options); | |
428 | ||
429 | // Since we do a normal stats update on db-open, there | |
430 | // will be more random open files. | |
431 | ASSERT_GT(env_->random_file_open_counter_.load(), kMaxFileOpenCount); | |
432 | } | |
433 | ||
434 | TEST_F(DBCompactionTest, TestTableReaderForCompaction) { | |
435 | Options options = CurrentOptions(); | |
436 | options.env = env_; | |
437 | options.new_table_reader_for_compaction_inputs = true; | |
438 | options.max_open_files = 100; | |
439 | options.level0_file_num_compaction_trigger = 3; | |
440 | DestroyAndReopen(options); | |
441 | Random rnd(301); | |
442 | ||
443 | int num_table_cache_lookup = 0; | |
444 | int num_new_table_reader = 0; | |
445 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
446 | "TableCache::FindTable:0", [&](void* arg) { | |
447 | assert(arg != nullptr); | |
448 | bool no_io = *(reinterpret_cast<bool*>(arg)); | |
449 | if (!no_io) { | |
450 | // filter out cases for table properties queries. | |
451 | num_table_cache_lookup++; | |
452 | } | |
453 | }); | |
454 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
455 | "TableCache::GetTableReader:0", | |
11fdf7f2 | 456 | [&](void* /*arg*/) { num_new_table_reader++; }); |
7c673cae FG |
457 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
458 | ||
459 | for (int k = 0; k < options.level0_file_num_compaction_trigger; ++k) { | |
460 | ASSERT_OK(Put(Key(k), Key(k))); | |
461 | ASSERT_OK(Put(Key(10 - k), "bar")); | |
462 | if (k < options.level0_file_num_compaction_trigger - 1) { | |
463 | num_table_cache_lookup = 0; | |
464 | Flush(); | |
465 | dbfull()->TEST_WaitForCompact(); | |
466 | // preloading iterator issues one table cache lookup and create | |
467 | // a new table reader. | |
468 | ASSERT_EQ(num_table_cache_lookup, 1); | |
469 | ASSERT_EQ(num_new_table_reader, 1); | |
470 | ||
471 | num_table_cache_lookup = 0; | |
472 | num_new_table_reader = 0; | |
473 | ASSERT_EQ(Key(k), Get(Key(k))); | |
474 | // lookup iterator from table cache and no need to create a new one. | |
475 | ASSERT_EQ(num_table_cache_lookup, 1); | |
476 | ASSERT_EQ(num_new_table_reader, 0); | |
477 | } | |
478 | } | |
479 | ||
480 | num_table_cache_lookup = 0; | |
481 | num_new_table_reader = 0; | |
482 | Flush(); | |
483 | dbfull()->TEST_WaitForCompact(); | |
484 | // Preloading iterator issues one table cache lookup and creates | |
485 | // a new table reader. One file is created for flush and one for compaction. | |
486 | // Compaction inputs make no table cache look-up for data/range deletion | |
487 | // iterators | |
488 | ASSERT_EQ(num_table_cache_lookup, 2); | |
489 | // Create new iterator for: | |
490 | // (1) 1 for verifying flush results | |
491 | // (2) 3 for compaction input files | |
492 | // (3) 1 for verifying compaction results. | |
493 | ASSERT_EQ(num_new_table_reader, 5); | |
494 | ||
495 | num_table_cache_lookup = 0; | |
496 | num_new_table_reader = 0; | |
497 | ASSERT_EQ(Key(1), Get(Key(1))); | |
498 | ASSERT_EQ(num_table_cache_lookup, 1); | |
499 | ASSERT_EQ(num_new_table_reader, 0); | |
500 | ||
501 | num_table_cache_lookup = 0; | |
502 | num_new_table_reader = 0; | |
503 | CompactRangeOptions cro; | |
504 | cro.change_level = true; | |
505 | cro.target_level = 2; | |
506 | cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; | |
507 | db_->CompactRange(cro, nullptr, nullptr); | |
508 | // Only verifying compaction outputs issues one table cache lookup | |
509 | // for both data block and range deletion block). | |
510 | ASSERT_EQ(num_table_cache_lookup, 1); | |
511 | // One for compaction input, one for verifying compaction results. | |
512 | ASSERT_EQ(num_new_table_reader, 2); | |
513 | ||
514 | num_table_cache_lookup = 0; | |
515 | num_new_table_reader = 0; | |
516 | ASSERT_EQ(Key(1), Get(Key(1))); | |
517 | ASSERT_EQ(num_table_cache_lookup, 1); | |
518 | ASSERT_EQ(num_new_table_reader, 0); | |
519 | ||
520 | rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); | |
521 | } | |
522 | ||
523 | TEST_P(DBCompactionTestWithParam, CompactionDeletionTriggerReopen) { | |
524 | for (int tid = 0; tid < 2; ++tid) { | |
525 | uint64_t db_size[3]; | |
526 | Options options = DeletionTriggerOptions(CurrentOptions()); | |
527 | options.max_subcompactions = max_subcompactions_; | |
528 | ||
529 | if (tid == 1) { | |
530 | // second pass with universal compaction | |
531 | options.compaction_style = kCompactionStyleUniversal; | |
532 | options.num_levels = 1; | |
533 | } | |
534 | ||
535 | DestroyAndReopen(options); | |
536 | Random rnd(301); | |
537 | ||
538 | // round 1 --- insert key/value pairs. | |
539 | const int kTestSize = kCDTKeysPerBuffer * 512; | |
540 | std::vector<std::string> values; | |
541 | for (int k = 0; k < kTestSize; ++k) { | |
542 | values.push_back(RandomString(&rnd, kCDTValueSize)); | |
543 | ASSERT_OK(Put(Key(k), values[k])); | |
544 | } | |
545 | dbfull()->TEST_WaitForFlushMemTable(); | |
546 | dbfull()->TEST_WaitForCompact(); | |
547 | db_size[0] = Size(Key(0), Key(kTestSize - 1)); | |
548 | Close(); | |
549 | ||
550 | // round 2 --- disable auto-compactions and issue deletions. | |
551 | options.create_if_missing = false; | |
552 | options.disable_auto_compactions = true; | |
553 | Reopen(options); | |
554 | ||
555 | for (int k = 0; k < kTestSize; ++k) { | |
556 | ASSERT_OK(Delete(Key(k))); | |
557 | } | |
558 | db_size[1] = Size(Key(0), Key(kTestSize - 1)); | |
559 | Close(); | |
560 | // as auto_compaction is off, we shouldn't see too much reduce | |
561 | // in db size. | |
562 | ASSERT_LT(db_size[0] / 3, db_size[1]); | |
563 | ||
564 | // round 3 --- reopen db with auto_compaction on and see if | |
565 | // deletion compensation still work. | |
566 | options.disable_auto_compactions = false; | |
567 | Reopen(options); | |
568 | // insert relatively small amount of data to trigger auto compaction. | |
569 | for (int k = 0; k < kTestSize / 10; ++k) { | |
570 | ASSERT_OK(Put(Key(k), values[k])); | |
571 | } | |
572 | dbfull()->TEST_WaitForFlushMemTable(); | |
573 | dbfull()->TEST_WaitForCompact(); | |
574 | db_size[2] = Size(Key(0), Key(kTestSize - 1)); | |
575 | // this time we're expecting significant drop in size. | |
576 | ASSERT_GT(db_size[0] / 3, db_size[2]); | |
577 | } | |
578 | } | |
579 | ||
580 | TEST_F(DBCompactionTest, DisableStatsUpdateReopen) { | |
581 | uint64_t db_size[3]; | |
582 | for (int test = 0; test < 2; ++test) { | |
583 | Options options = DeletionTriggerOptions(CurrentOptions()); | |
584 | options.skip_stats_update_on_db_open = (test == 0); | |
585 | ||
586 | env_->random_read_counter_.Reset(); | |
587 | DestroyAndReopen(options); | |
588 | Random rnd(301); | |
589 | ||
590 | // round 1 --- insert key/value pairs. | |
591 | const int kTestSize = kCDTKeysPerBuffer * 512; | |
592 | std::vector<std::string> values; | |
593 | for (int k = 0; k < kTestSize; ++k) { | |
594 | values.push_back(RandomString(&rnd, kCDTValueSize)); | |
595 | ASSERT_OK(Put(Key(k), values[k])); | |
596 | } | |
597 | dbfull()->TEST_WaitForFlushMemTable(); | |
598 | dbfull()->TEST_WaitForCompact(); | |
599 | db_size[0] = Size(Key(0), Key(kTestSize - 1)); | |
600 | Close(); | |
601 | ||
602 | // round 2 --- disable auto-compactions and issue deletions. | |
603 | options.create_if_missing = false; | |
604 | options.disable_auto_compactions = true; | |
605 | ||
606 | env_->random_read_counter_.Reset(); | |
607 | Reopen(options); | |
608 | ||
609 | for (int k = 0; k < kTestSize; ++k) { | |
610 | ASSERT_OK(Delete(Key(k))); | |
611 | } | |
612 | db_size[1] = Size(Key(0), Key(kTestSize - 1)); | |
613 | Close(); | |
614 | // as auto_compaction is off, we shouldn't see too much reduce | |
615 | // in db size. | |
616 | ASSERT_LT(db_size[0] / 3, db_size[1]); | |
617 | ||
618 | // round 3 --- reopen db with auto_compaction on and see if | |
619 | // deletion compensation still work. | |
620 | options.disable_auto_compactions = false; | |
621 | Reopen(options); | |
622 | dbfull()->TEST_WaitForFlushMemTable(); | |
623 | dbfull()->TEST_WaitForCompact(); | |
624 | db_size[2] = Size(Key(0), Key(kTestSize - 1)); | |
625 | ||
626 | if (options.skip_stats_update_on_db_open) { | |
627 | // If update stats on DB::Open is disable, we don't expect | |
628 | // deletion entries taking effect. | |
629 | ASSERT_LT(db_size[0] / 3, db_size[2]); | |
630 | } else { | |
631 | // Otherwise, we should see a significant drop in db size. | |
632 | ASSERT_GT(db_size[0] / 3, db_size[2]); | |
633 | } | |
634 | } | |
635 | } | |
636 | ||
637 | ||
638 | TEST_P(DBCompactionTestWithParam, CompactionTrigger) { | |
639 | const int kNumKeysPerFile = 100; | |
640 | ||
641 | Options options = CurrentOptions(); | |
642 | options.write_buffer_size = 110 << 10; // 110KB | |
643 | options.arena_block_size = 4 << 10; | |
644 | options.num_levels = 3; | |
645 | options.level0_file_num_compaction_trigger = 3; | |
646 | options.max_subcompactions = max_subcompactions_; | |
647 | options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); | |
648 | CreateAndReopenWithCF({"pikachu"}, options); | |
649 | ||
650 | Random rnd(301); | |
651 | ||
652 | for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; | |
653 | num++) { | |
654 | std::vector<std::string> values; | |
655 | // Write 100KB (100 values, each 1K) | |
656 | for (int i = 0; i < kNumKeysPerFile; i++) { | |
657 | values.push_back(RandomString(&rnd, 990)); | |
658 | ASSERT_OK(Put(1, Key(i), values[i])); | |
659 | } | |
660 | // put extra key to trigger flush | |
661 | ASSERT_OK(Put(1, "", "")); | |
662 | dbfull()->TEST_WaitForFlushMemTable(handles_[1]); | |
663 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), num + 1); | |
664 | } | |
665 | ||
666 | // generate one more file in level-0, and should trigger level-0 compaction | |
667 | std::vector<std::string> values; | |
668 | for (int i = 0; i < kNumKeysPerFile; i++) { | |
669 | values.push_back(RandomString(&rnd, 990)); | |
670 | ASSERT_OK(Put(1, Key(i), values[i])); | |
671 | } | |
672 | // put extra key to trigger flush | |
673 | ASSERT_OK(Put(1, "", "")); | |
674 | dbfull()->TEST_WaitForCompact(); | |
675 | ||
676 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); | |
677 | ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1); | |
678 | } | |
679 | ||
680 | TEST_F(DBCompactionTest, BGCompactionsAllowed) { | |
681 | // Create several column families. Make compaction triggers in all of them | |
682 | // and see number of compactions scheduled to be less than allowed. | |
683 | const int kNumKeysPerFile = 100; | |
684 | ||
685 | Options options = CurrentOptions(); | |
686 | options.write_buffer_size = 110 << 10; // 110KB | |
687 | options.arena_block_size = 4 << 10; | |
688 | options.num_levels = 3; | |
689 | // Should speed up compaction when there are 4 files. | |
690 | options.level0_file_num_compaction_trigger = 2; | |
691 | options.level0_slowdown_writes_trigger = 20; | |
692 | options.soft_pending_compaction_bytes_limit = 1 << 30; // Infinitely large | |
7c673cae FG |
693 | options.max_background_compactions = 3; |
694 | options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile)); | |
695 | ||
696 | // Block all threads in thread pool. | |
697 | const size_t kTotalTasks = 4; | |
698 | env_->SetBackgroundThreads(4, Env::LOW); | |
699 | test::SleepingBackgroundTask sleeping_tasks[kTotalTasks]; | |
700 | for (size_t i = 0; i < kTotalTasks; i++) { | |
701 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, | |
702 | &sleeping_tasks[i], Env::Priority::LOW); | |
703 | sleeping_tasks[i].WaitUntilSleeping(); | |
704 | } | |
705 | ||
706 | CreateAndReopenWithCF({"one", "two", "three"}, options); | |
707 | ||
708 | Random rnd(301); | |
709 | for (int cf = 0; cf < 4; cf++) { | |
710 | for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { | |
711 | for (int i = 0; i < kNumKeysPerFile; i++) { | |
712 | ASSERT_OK(Put(cf, Key(i), "")); | |
713 | } | |
714 | // put extra key to trigger flush | |
715 | ASSERT_OK(Put(cf, "", "")); | |
716 | dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); | |
717 | ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1); | |
718 | } | |
719 | } | |
720 | ||
721 | // Now all column families qualify compaction but only one should be | |
722 | // scheduled, because no column family hits speed up condition. | |
723 | ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); | |
724 | ||
725 | // Create two more files for one column family, which triggers speed up | |
726 | // condition, three compactions will be scheduled. | |
727 | for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { | |
728 | for (int i = 0; i < kNumKeysPerFile; i++) { | |
729 | ASSERT_OK(Put(2, Key(i), "")); | |
730 | } | |
731 | // put extra key to trigger flush | |
732 | ASSERT_OK(Put(2, "", "")); | |
733 | dbfull()->TEST_WaitForFlushMemTable(handles_[2]); | |
734 | ASSERT_EQ(options.level0_file_num_compaction_trigger + num + 1, | |
735 | NumTableFilesAtLevel(0, 2)); | |
736 | } | |
737 | ASSERT_EQ(3, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); | |
738 | ||
739 | // Unblock all threads to unblock all compactions. | |
740 | for (size_t i = 0; i < kTotalTasks; i++) { | |
741 | sleeping_tasks[i].WakeUp(); | |
742 | sleeping_tasks[i].WaitUntilDone(); | |
743 | } | |
744 | dbfull()->TEST_WaitForCompact(); | |
745 | ||
746 | // Verify number of compactions allowed will come back to 1. | |
747 | ||
748 | for (size_t i = 0; i < kTotalTasks; i++) { | |
749 | sleeping_tasks[i].Reset(); | |
750 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, | |
751 | &sleeping_tasks[i], Env::Priority::LOW); | |
752 | sleeping_tasks[i].WaitUntilSleeping(); | |
753 | } | |
754 | for (int cf = 0; cf < 4; cf++) { | |
755 | for (int num = 0; num < options.level0_file_num_compaction_trigger; num++) { | |
756 | for (int i = 0; i < kNumKeysPerFile; i++) { | |
757 | ASSERT_OK(Put(cf, Key(i), "")); | |
758 | } | |
759 | // put extra key to trigger flush | |
760 | ASSERT_OK(Put(cf, "", "")); | |
761 | dbfull()->TEST_WaitForFlushMemTable(handles_[cf]); | |
762 | ASSERT_EQ(NumTableFilesAtLevel(0, cf), num + 1); | |
763 | } | |
764 | } | |
765 | ||
766 | // Now all column families qualify compaction but only one should be | |
767 | // scheduled, because no column family hits speed up condition. | |
768 | ASSERT_EQ(1, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); | |
769 | ||
770 | for (size_t i = 0; i < kTotalTasks; i++) { | |
771 | sleeping_tasks[i].WakeUp(); | |
772 | sleeping_tasks[i].WaitUntilDone(); | |
773 | } | |
774 | } | |
775 | ||
776 | TEST_P(DBCompactionTestWithParam, CompactionsGenerateMultipleFiles) { | |
777 | Options options = CurrentOptions(); | |
778 | options.write_buffer_size = 100000000; // Large write buffer | |
779 | options.max_subcompactions = max_subcompactions_; | |
780 | CreateAndReopenWithCF({"pikachu"}, options); | |
781 | ||
782 | Random rnd(301); | |
783 | ||
784 | // Write 8MB (80 values, each 100K) | |
785 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); | |
786 | std::vector<std::string> values; | |
787 | for (int i = 0; i < 80; i++) { | |
788 | values.push_back(RandomString(&rnd, 100000)); | |
789 | ASSERT_OK(Put(1, Key(i), values[i])); | |
790 | } | |
791 | ||
792 | // Reopening moves updates to level-0 | |
793 | ReopenWithColumnFamilies({"default", "pikachu"}, options); | |
794 | dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1], | |
795 | true /* disallow trivial move */); | |
796 | ||
797 | ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); | |
798 | ASSERT_GT(NumTableFilesAtLevel(1, 1), 1); | |
799 | for (int i = 0; i < 80; i++) { | |
800 | ASSERT_EQ(Get(1, Key(i)), values[i]); | |
801 | } | |
802 | } | |
803 | ||
804 | TEST_F(DBCompactionTest, MinorCompactionsHappen) { | |
805 | do { | |
806 | Options options = CurrentOptions(); | |
807 | options.write_buffer_size = 10000; | |
808 | CreateAndReopenWithCF({"pikachu"}, options); | |
809 | ||
810 | const int N = 500; | |
811 | ||
812 | int starting_num_tables = TotalTableFiles(1); | |
813 | for (int i = 0; i < N; i++) { | |
814 | ASSERT_OK(Put(1, Key(i), Key(i) + std::string(1000, 'v'))); | |
815 | } | |
816 | int ending_num_tables = TotalTableFiles(1); | |
817 | ASSERT_GT(ending_num_tables, starting_num_tables); | |
818 | ||
819 | for (int i = 0; i < N; i++) { | |
820 | ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(1, Key(i))); | |
821 | } | |
822 | ||
823 | ReopenWithColumnFamilies({"default", "pikachu"}, options); | |
824 | ||
825 | for (int i = 0; i < N; i++) { | |
826 | ASSERT_EQ(Key(i) + std::string(1000, 'v'), Get(1, Key(i))); | |
827 | } | |
828 | } while (ChangeCompactOptions()); | |
829 | } | |
830 | ||
831 | TEST_F(DBCompactionTest, UserKeyCrossFile1) { | |
832 | Options options = CurrentOptions(); | |
833 | options.compaction_style = kCompactionStyleLevel; | |
834 | options.level0_file_num_compaction_trigger = 3; | |
835 | ||
836 | DestroyAndReopen(options); | |
837 | ||
838 | // create first file and flush to l0 | |
839 | Put("4", "A"); | |
840 | Put("3", "A"); | |
841 | Flush(); | |
842 | dbfull()->TEST_WaitForFlushMemTable(); | |
843 | ||
844 | Put("2", "A"); | |
845 | Delete("3"); | |
846 | Flush(); | |
847 | dbfull()->TEST_WaitForFlushMemTable(); | |
848 | ASSERT_EQ("NOT_FOUND", Get("3")); | |
849 | ||
850 | // move both files down to l1 | |
851 | dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
852 | ASSERT_EQ("NOT_FOUND", Get("3")); | |
853 | ||
854 | for (int i = 0; i < 3; i++) { | |
855 | Put("2", "B"); | |
856 | Flush(); | |
857 | dbfull()->TEST_WaitForFlushMemTable(); | |
858 | } | |
859 | dbfull()->TEST_WaitForCompact(); | |
860 | ||
861 | ASSERT_EQ("NOT_FOUND", Get("3")); | |
862 | } | |
863 | ||
864 | TEST_F(DBCompactionTest, UserKeyCrossFile2) { | |
865 | Options options = CurrentOptions(); | |
866 | options.compaction_style = kCompactionStyleLevel; | |
867 | options.level0_file_num_compaction_trigger = 3; | |
868 | ||
869 | DestroyAndReopen(options); | |
870 | ||
871 | // create first file and flush to l0 | |
872 | Put("4", "A"); | |
873 | Put("3", "A"); | |
874 | Flush(); | |
875 | dbfull()->TEST_WaitForFlushMemTable(); | |
876 | ||
877 | Put("2", "A"); | |
878 | SingleDelete("3"); | |
879 | Flush(); | |
880 | dbfull()->TEST_WaitForFlushMemTable(); | |
881 | ASSERT_EQ("NOT_FOUND", Get("3")); | |
882 | ||
883 | // move both files down to l1 | |
884 | dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
885 | ASSERT_EQ("NOT_FOUND", Get("3")); | |
886 | ||
887 | for (int i = 0; i < 3; i++) { | |
888 | Put("2", "B"); | |
889 | Flush(); | |
890 | dbfull()->TEST_WaitForFlushMemTable(); | |
891 | } | |
892 | dbfull()->TEST_WaitForCompact(); | |
893 | ||
894 | ASSERT_EQ("NOT_FOUND", Get("3")); | |
895 | } | |
896 | ||
897 | TEST_F(DBCompactionTest, ZeroSeqIdCompaction) { | |
898 | Options options = CurrentOptions(); | |
899 | options.compaction_style = kCompactionStyleLevel; | |
900 | options.level0_file_num_compaction_trigger = 3; | |
901 | ||
902 | FlushedFileCollector* collector = new FlushedFileCollector(); | |
903 | options.listeners.emplace_back(collector); | |
904 | ||
905 | // compaction options | |
906 | CompactionOptions compact_opt; | |
907 | compact_opt.compression = kNoCompression; | |
908 | compact_opt.output_file_size_limit = 4096; | |
909 | const size_t key_len = | |
910 | static_cast<size_t>(compact_opt.output_file_size_limit) / 5; | |
911 | ||
912 | DestroyAndReopen(options); | |
913 | ||
914 | std::vector<const Snapshot*> snaps; | |
915 | ||
916 | // create first file and flush to l0 | |
917 | for (auto& key : {"1", "2", "3", "3", "3", "3"}) { | |
918 | Put(key, std::string(key_len, 'A')); | |
919 | snaps.push_back(dbfull()->GetSnapshot()); | |
920 | } | |
921 | Flush(); | |
922 | dbfull()->TEST_WaitForFlushMemTable(); | |
923 | ||
924 | // create second file and flush to l0 | |
925 | for (auto& key : {"3", "4", "5", "6", "7", "8"}) { | |
926 | Put(key, std::string(key_len, 'A')); | |
927 | snaps.push_back(dbfull()->GetSnapshot()); | |
928 | } | |
929 | Flush(); | |
930 | dbfull()->TEST_WaitForFlushMemTable(); | |
931 | ||
932 | // move both files down to l1 | |
933 | dbfull()->CompactFiles(compact_opt, collector->GetFlushedFiles(), 1); | |
934 | ||
935 | // release snap so that first instance of key(3) can have seqId=0 | |
936 | for (auto snap : snaps) { | |
937 | dbfull()->ReleaseSnapshot(snap); | |
938 | } | |
939 | ||
940 | // create 3 files in l0 so to trigger compaction | |
941 | for (int i = 0; i < options.level0_file_num_compaction_trigger; i++) { | |
942 | Put("2", std::string(1, 'A')); | |
943 | Flush(); | |
944 | dbfull()->TEST_WaitForFlushMemTable(); | |
945 | } | |
946 | ||
947 | dbfull()->TEST_WaitForCompact(); | |
948 | ASSERT_OK(Put("", "")); | |
949 | } | |
950 | ||
11fdf7f2 TL |
951 | TEST_F(DBCompactionTest, ManualCompactionUnknownOutputSize) { |
952 | // github issue #2249 | |
953 | Options options = CurrentOptions(); | |
954 | options.compaction_style = kCompactionStyleLevel; | |
955 | options.level0_file_num_compaction_trigger = 3; | |
956 | DestroyAndReopen(options); | |
957 | ||
958 | // create two files in l1 that we can compact | |
959 | for (int i = 0; i < 2; ++i) { | |
960 | for (int j = 0; j < options.level0_file_num_compaction_trigger; j++) { | |
961 | // make l0 files' ranges overlap to avoid trivial move | |
962 | Put(std::to_string(2 * i), std::string(1, 'A')); | |
963 | Put(std::to_string(2 * i + 1), std::string(1, 'A')); | |
964 | Flush(); | |
965 | dbfull()->TEST_WaitForFlushMemTable(); | |
966 | } | |
967 | dbfull()->TEST_WaitForCompact(); | |
968 | ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); | |
969 | ASSERT_EQ(NumTableFilesAtLevel(1, 0), i + 1); | |
970 | } | |
971 | ||
972 | ColumnFamilyMetaData cf_meta; | |
973 | dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta); | |
974 | ASSERT_EQ(2, cf_meta.levels[1].files.size()); | |
975 | std::vector<std::string> input_filenames; | |
976 | for (const auto& sst_file : cf_meta.levels[1].files) { | |
977 | input_filenames.push_back(sst_file.name); | |
978 | } | |
979 | ||
980 | // note CompactionOptions::output_file_size_limit is unset. | |
981 | CompactionOptions compact_opt; | |
982 | compact_opt.compression = kNoCompression; | |
983 | dbfull()->CompactFiles(compact_opt, input_filenames, 1); | |
984 | } | |
985 | ||
7c673cae FG |
986 | // Check that writes done during a memtable compaction are recovered |
987 | // if the database is shutdown during the memtable compaction. | |
988 | TEST_F(DBCompactionTest, RecoverDuringMemtableCompaction) { | |
989 | do { | |
990 | Options options = CurrentOptions(); | |
991 | options.env = env_; | |
992 | CreateAndReopenWithCF({"pikachu"}, options); | |
993 | ||
994 | // Trigger a long memtable compaction and reopen the database during it | |
995 | ASSERT_OK(Put(1, "foo", "v1")); // Goes to 1st log file | |
996 | ASSERT_OK(Put(1, "big1", std::string(10000000, 'x'))); // Fills memtable | |
997 | ASSERT_OK(Put(1, "big2", std::string(1000, 'y'))); // Triggers compaction | |
998 | ASSERT_OK(Put(1, "bar", "v2")); // Goes to new log file | |
999 | ||
1000 | ReopenWithColumnFamilies({"default", "pikachu"}, options); | |
1001 | ASSERT_EQ("v1", Get(1, "foo")); | |
1002 | ASSERT_EQ("v2", Get(1, "bar")); | |
1003 | ASSERT_EQ(std::string(10000000, 'x'), Get(1, "big1")); | |
1004 | ASSERT_EQ(std::string(1000, 'y'), Get(1, "big2")); | |
1005 | } while (ChangeOptions()); | |
1006 | } | |
1007 | ||
1008 | TEST_P(DBCompactionTestWithParam, TrivialMoveOneFile) { | |
1009 | int32_t trivial_move = 0; | |
1010 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
1011 | "DBImpl::BackgroundCompaction:TrivialMove", | |
11fdf7f2 | 1012 | [&](void* /*arg*/) { trivial_move++; }); |
7c673cae FG |
1013 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
1014 | ||
1015 | Options options = CurrentOptions(); | |
1016 | options.write_buffer_size = 100000000; | |
1017 | options.max_subcompactions = max_subcompactions_; | |
1018 | DestroyAndReopen(options); | |
1019 | ||
1020 | int32_t num_keys = 80; | |
1021 | int32_t value_size = 100 * 1024; // 100 KB | |
1022 | ||
1023 | Random rnd(301); | |
1024 | std::vector<std::string> values; | |
1025 | for (int i = 0; i < num_keys; i++) { | |
1026 | values.push_back(RandomString(&rnd, value_size)); | |
1027 | ASSERT_OK(Put(Key(i), values[i])); | |
1028 | } | |
1029 | ||
1030 | // Reopening moves updates to L0 | |
1031 | Reopen(options); | |
1032 | ASSERT_EQ(NumTableFilesAtLevel(0, 0), 1); // 1 file in L0 | |
1033 | ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // 0 files in L1 | |
1034 | ||
1035 | std::vector<LiveFileMetaData> metadata; | |
1036 | db_->GetLiveFilesMetaData(&metadata); | |
1037 | ASSERT_EQ(metadata.size(), 1U); | |
1038 | LiveFileMetaData level0_file = metadata[0]; // L0 file meta | |
1039 | ||
1040 | CompactRangeOptions cro; | |
1041 | cro.exclusive_manual_compaction = exclusive_manual_compaction_; | |
1042 | ||
1043 | // Compaction will initiate a trivial move from L0 to L1 | |
1044 | dbfull()->CompactRange(cro, nullptr, nullptr); | |
1045 | ||
1046 | // File moved From L0 to L1 | |
1047 | ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); // 0 files in L0 | |
1048 | ASSERT_EQ(NumTableFilesAtLevel(1, 0), 1); // 1 file in L1 | |
1049 | ||
1050 | metadata.clear(); | |
1051 | db_->GetLiveFilesMetaData(&metadata); | |
1052 | ASSERT_EQ(metadata.size(), 1U); | |
1053 | ASSERT_EQ(metadata[0].name /* level1_file.name */, level0_file.name); | |
1054 | ASSERT_EQ(metadata[0].size /* level1_file.size */, level0_file.size); | |
1055 | ||
1056 | for (int i = 0; i < num_keys; i++) { | |
1057 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1058 | } | |
1059 | ||
1060 | ASSERT_EQ(trivial_move, 1); | |
1061 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
1062 | } | |
1063 | ||
1064 | TEST_P(DBCompactionTestWithParam, TrivialMoveNonOverlappingFiles) { | |
1065 | int32_t trivial_move = 0; | |
1066 | int32_t non_trivial_move = 0; | |
1067 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
1068 | "DBImpl::BackgroundCompaction:TrivialMove", | |
11fdf7f2 | 1069 | [&](void* /*arg*/) { trivial_move++; }); |
7c673cae FG |
1070 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
1071 | "DBImpl::BackgroundCompaction:NonTrivial", | |
11fdf7f2 | 1072 | [&](void* /*arg*/) { non_trivial_move++; }); |
7c673cae FG |
1073 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
1074 | ||
1075 | Options options = CurrentOptions(); | |
1076 | options.disable_auto_compactions = true; | |
1077 | options.write_buffer_size = 10 * 1024 * 1024; | |
1078 | options.max_subcompactions = max_subcompactions_; | |
1079 | ||
1080 | DestroyAndReopen(options); | |
1081 | // non overlapping ranges | |
1082 | std::vector<std::pair<int32_t, int32_t>> ranges = { | |
1083 | {100, 199}, | |
1084 | {300, 399}, | |
1085 | {0, 99}, | |
1086 | {200, 299}, | |
1087 | {600, 699}, | |
1088 | {400, 499}, | |
1089 | {500, 550}, | |
1090 | {551, 599}, | |
1091 | }; | |
1092 | int32_t value_size = 10 * 1024; // 10 KB | |
1093 | ||
1094 | Random rnd(301); | |
1095 | std::map<int32_t, std::string> values; | |
1096 | for (size_t i = 0; i < ranges.size(); i++) { | |
1097 | for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { | |
1098 | values[j] = RandomString(&rnd, value_size); | |
1099 | ASSERT_OK(Put(Key(j), values[j])); | |
1100 | } | |
1101 | ASSERT_OK(Flush()); | |
1102 | } | |
1103 | ||
1104 | int32_t level0_files = NumTableFilesAtLevel(0, 0); | |
1105 | ASSERT_EQ(level0_files, ranges.size()); // Multiple files in L0 | |
1106 | ASSERT_EQ(NumTableFilesAtLevel(1, 0), 0); // No files in L1 | |
1107 | ||
1108 | CompactRangeOptions cro; | |
1109 | cro.exclusive_manual_compaction = exclusive_manual_compaction_; | |
1110 | ||
1111 | // Since data is non-overlapping we expect compaction to initiate | |
1112 | // a trivial move | |
1113 | db_->CompactRange(cro, nullptr, nullptr); | |
1114 | // We expect that all the files were trivially moved from L0 to L1 | |
1115 | ASSERT_EQ(NumTableFilesAtLevel(0, 0), 0); | |
1116 | ASSERT_EQ(NumTableFilesAtLevel(1, 0) /* level1_files */, level0_files); | |
1117 | ||
1118 | for (size_t i = 0; i < ranges.size(); i++) { | |
1119 | for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { | |
1120 | ASSERT_EQ(Get(Key(j)), values[j]); | |
1121 | } | |
1122 | } | |
1123 | ||
1124 | ASSERT_EQ(trivial_move, 1); | |
1125 | ASSERT_EQ(non_trivial_move, 0); | |
1126 | ||
1127 | trivial_move = 0; | |
1128 | non_trivial_move = 0; | |
1129 | values.clear(); | |
1130 | DestroyAndReopen(options); | |
1131 | // Same ranges as above but overlapping | |
1132 | ranges = { | |
1133 | {100, 199}, | |
1134 | {300, 399}, | |
1135 | {0, 99}, | |
1136 | {200, 299}, | |
1137 | {600, 699}, | |
1138 | {400, 499}, | |
1139 | {500, 560}, // this range overlap with the next one | |
1140 | {551, 599}, | |
1141 | }; | |
1142 | for (size_t i = 0; i < ranges.size(); i++) { | |
1143 | for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { | |
1144 | values[j] = RandomString(&rnd, value_size); | |
1145 | ASSERT_OK(Put(Key(j), values[j])); | |
1146 | } | |
1147 | ASSERT_OK(Flush()); | |
1148 | } | |
1149 | ||
1150 | db_->CompactRange(cro, nullptr, nullptr); | |
1151 | ||
1152 | for (size_t i = 0; i < ranges.size(); i++) { | |
1153 | for (int32_t j = ranges[i].first; j <= ranges[i].second; j++) { | |
1154 | ASSERT_EQ(Get(Key(j)), values[j]); | |
1155 | } | |
1156 | } | |
1157 | ASSERT_EQ(trivial_move, 0); | |
1158 | ASSERT_EQ(non_trivial_move, 1); | |
1159 | ||
1160 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
1161 | } | |
1162 | ||
1163 | TEST_P(DBCompactionTestWithParam, TrivialMoveTargetLevel) { | |
1164 | int32_t trivial_move = 0; | |
1165 | int32_t non_trivial_move = 0; | |
1166 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
1167 | "DBImpl::BackgroundCompaction:TrivialMove", | |
11fdf7f2 | 1168 | [&](void* /*arg*/) { trivial_move++; }); |
7c673cae FG |
1169 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
1170 | "DBImpl::BackgroundCompaction:NonTrivial", | |
11fdf7f2 | 1171 | [&](void* /*arg*/) { non_trivial_move++; }); |
7c673cae FG |
1172 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
1173 | ||
1174 | Options options = CurrentOptions(); | |
1175 | options.disable_auto_compactions = true; | |
1176 | options.write_buffer_size = 10 * 1024 * 1024; | |
1177 | options.num_levels = 7; | |
1178 | options.max_subcompactions = max_subcompactions_; | |
1179 | ||
1180 | DestroyAndReopen(options); | |
1181 | int32_t value_size = 10 * 1024; // 10 KB | |
1182 | ||
1183 | // Add 2 non-overlapping files | |
1184 | Random rnd(301); | |
1185 | std::map<int32_t, std::string> values; | |
1186 | ||
1187 | // file 1 [0 => 300] | |
1188 | for (int32_t i = 0; i <= 300; i++) { | |
1189 | values[i] = RandomString(&rnd, value_size); | |
1190 | ASSERT_OK(Put(Key(i), values[i])); | |
1191 | } | |
1192 | ASSERT_OK(Flush()); | |
1193 | ||
1194 | // file 2 [600 => 700] | |
1195 | for (int32_t i = 600; i <= 700; i++) { | |
1196 | values[i] = RandomString(&rnd, value_size); | |
1197 | ASSERT_OK(Put(Key(i), values[i])); | |
1198 | } | |
1199 | ASSERT_OK(Flush()); | |
1200 | ||
1201 | // 2 files in L0 | |
1202 | ASSERT_EQ("2", FilesPerLevel(0)); | |
1203 | CompactRangeOptions compact_options; | |
1204 | compact_options.change_level = true; | |
1205 | compact_options.target_level = 6; | |
1206 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; | |
1207 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
1208 | // 2 files in L6 | |
1209 | ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); | |
1210 | ||
1211 | ASSERT_EQ(trivial_move, 1); | |
1212 | ASSERT_EQ(non_trivial_move, 0); | |
1213 | ||
1214 | for (int32_t i = 0; i <= 300; i++) { | |
1215 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1216 | } | |
1217 | for (int32_t i = 600; i <= 700; i++) { | |
1218 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1219 | } | |
1220 | } | |
1221 | ||
1222 | TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { | |
1223 | int32_t trivial_move = 0; | |
1224 | int32_t non_trivial_move = 0; | |
1225 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
1226 | "DBImpl::BackgroundCompaction:TrivialMove", | |
11fdf7f2 | 1227 | [&](void* /*arg*/) { trivial_move++; }); |
7c673cae FG |
1228 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
1229 | "DBImpl::BackgroundCompaction:NonTrivial", | |
11fdf7f2 | 1230 | [&](void* /*arg*/) { non_trivial_move++; }); |
7c673cae FG |
1231 | bool first = true; |
1232 | // Purpose of dependencies: | |
1233 | // 4 -> 1: ensure the order of two non-trivial compactions | |
1234 | // 5 -> 2 and 5 -> 3: ensure we do a check before two non-trivial compactions | |
1235 | // are installed | |
1236 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
1237 | {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"}, | |
1238 | {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"}, | |
1239 | {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}}); | |
1240 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
11fdf7f2 | 1241 | "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) { |
7c673cae FG |
1242 | if (first) { |
1243 | first = false; | |
1244 | TEST_SYNC_POINT("DBCompaction::ManualPartial:4"); | |
1245 | TEST_SYNC_POINT("DBCompaction::ManualPartial:3"); | |
1246 | } else { // second non-trivial compaction | |
1247 | TEST_SYNC_POINT("DBCompaction::ManualPartial:2"); | |
1248 | } | |
1249 | }); | |
1250 | ||
1251 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
1252 | ||
1253 | Options options = CurrentOptions(); | |
1254 | options.write_buffer_size = 10 * 1024 * 1024; | |
1255 | options.num_levels = 7; | |
1256 | options.max_subcompactions = max_subcompactions_; | |
1257 | options.level0_file_num_compaction_trigger = 3; | |
1258 | options.max_background_compactions = 3; | |
1259 | options.target_file_size_base = 1 << 23; // 8 MB | |
1260 | ||
1261 | DestroyAndReopen(options); | |
1262 | int32_t value_size = 10 * 1024; // 10 KB | |
1263 | ||
1264 | // Add 2 non-overlapping files | |
1265 | Random rnd(301); | |
1266 | std::map<int32_t, std::string> values; | |
1267 | ||
1268 | // file 1 [0 => 100] | |
1269 | for (int32_t i = 0; i < 100; i++) { | |
1270 | values[i] = RandomString(&rnd, value_size); | |
1271 | ASSERT_OK(Put(Key(i), values[i])); | |
1272 | } | |
1273 | ASSERT_OK(Flush()); | |
1274 | ||
1275 | // file 2 [100 => 300] | |
1276 | for (int32_t i = 100; i < 300; i++) { | |
1277 | values[i] = RandomString(&rnd, value_size); | |
1278 | ASSERT_OK(Put(Key(i), values[i])); | |
1279 | } | |
1280 | ASSERT_OK(Flush()); | |
1281 | ||
1282 | // 2 files in L0 | |
1283 | ASSERT_EQ("2", FilesPerLevel(0)); | |
1284 | CompactRangeOptions compact_options; | |
1285 | compact_options.change_level = true; | |
1286 | compact_options.target_level = 6; | |
1287 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; | |
1288 | // Trivial move the two non-overlapping files to level 6 | |
1289 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
1290 | // 2 files in L6 | |
1291 | ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel(0)); | |
1292 | ||
1293 | ASSERT_EQ(trivial_move, 1); | |
1294 | ASSERT_EQ(non_trivial_move, 0); | |
1295 | ||
1296 | // file 3 [ 0 => 200] | |
1297 | for (int32_t i = 0; i < 200; i++) { | |
1298 | values[i] = RandomString(&rnd, value_size); | |
1299 | ASSERT_OK(Put(Key(i), values[i])); | |
1300 | } | |
1301 | ASSERT_OK(Flush()); | |
1302 | ||
1303 | // 1 files in L0 | |
1304 | ASSERT_EQ("1,0,0,0,0,0,2", FilesPerLevel(0)); | |
1305 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false)); | |
1306 | ASSERT_OK(dbfull()->TEST_CompactRange(1, nullptr, nullptr, nullptr, false)); | |
1307 | ASSERT_OK(dbfull()->TEST_CompactRange(2, nullptr, nullptr, nullptr, false)); | |
1308 | ASSERT_OK(dbfull()->TEST_CompactRange(3, nullptr, nullptr, nullptr, false)); | |
1309 | ASSERT_OK(dbfull()->TEST_CompactRange(4, nullptr, nullptr, nullptr, false)); | |
1310 | // 2 files in L6, 1 file in L5 | |
1311 | ASSERT_EQ("0,0,0,0,0,1,2", FilesPerLevel(0)); | |
1312 | ||
1313 | ASSERT_EQ(trivial_move, 6); | |
1314 | ASSERT_EQ(non_trivial_move, 0); | |
1315 | ||
1316 | rocksdb::port::Thread threads([&] { | |
1317 | compact_options.change_level = false; | |
1318 | compact_options.exclusive_manual_compaction = false; | |
1319 | std::string begin_string = Key(0); | |
1320 | std::string end_string = Key(199); | |
1321 | Slice begin(begin_string); | |
1322 | Slice end(end_string); | |
1323 | // First non-trivial compaction is triggered | |
1324 | ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); | |
1325 | }); | |
1326 | ||
1327 | TEST_SYNC_POINT("DBCompaction::ManualPartial:1"); | |
1328 | // file 4 [300 => 400) | |
1329 | for (int32_t i = 300; i <= 400; i++) { | |
1330 | values[i] = RandomString(&rnd, value_size); | |
1331 | ASSERT_OK(Put(Key(i), values[i])); | |
1332 | } | |
1333 | ASSERT_OK(Flush()); | |
1334 | ||
1335 | // file 5 [400 => 500) | |
1336 | for (int32_t i = 400; i <= 500; i++) { | |
1337 | values[i] = RandomString(&rnd, value_size); | |
1338 | ASSERT_OK(Put(Key(i), values[i])); | |
1339 | } | |
1340 | ASSERT_OK(Flush()); | |
1341 | ||
1342 | // file 6 [500 => 600) | |
1343 | for (int32_t i = 500; i <= 600; i++) { | |
1344 | values[i] = RandomString(&rnd, value_size); | |
1345 | ASSERT_OK(Put(Key(i), values[i])); | |
1346 | } | |
1347 | // Second non-trivial compaction is triggered | |
1348 | ASSERT_OK(Flush()); | |
1349 | ||
1350 | // Before two non-trivial compactions are installed, there are 3 files in L0 | |
1351 | ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0)); | |
1352 | TEST_SYNC_POINT("DBCompaction::ManualPartial:5"); | |
1353 | ||
1354 | dbfull()->TEST_WaitForFlushMemTable(); | |
1355 | dbfull()->TEST_WaitForCompact(); | |
1356 | // After two non-trivial compactions are installed, there is 1 file in L6, and | |
1357 | // 1 file in L1 | |
1358 | ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0)); | |
1359 | threads.join(); | |
1360 | ||
1361 | for (int32_t i = 0; i < 600; i++) { | |
1362 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1363 | } | |
1364 | } | |
1365 | ||
1366 | // Disable as the test is flaky. | |
1367 | TEST_F(DBCompactionTest, DISABLED_ManualPartialFill) { | |
1368 | int32_t trivial_move = 0; | |
1369 | int32_t non_trivial_move = 0; | |
1370 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
1371 | "DBImpl::BackgroundCompaction:TrivialMove", | |
11fdf7f2 | 1372 | [&](void* /*arg*/) { trivial_move++; }); |
7c673cae FG |
1373 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
1374 | "DBImpl::BackgroundCompaction:NonTrivial", | |
11fdf7f2 | 1375 | [&](void* /*arg*/) { non_trivial_move++; }); |
7c673cae FG |
1376 | bool first = true; |
1377 | bool second = true; | |
1378 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
1379 | {{"DBCompaction::PartialFill:4", "DBCompaction::PartialFill:1"}, | |
1380 | {"DBCompaction::PartialFill:2", "DBCompaction::PartialFill:3"}}); | |
1381 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
11fdf7f2 | 1382 | "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* /*arg*/) { |
7c673cae FG |
1383 | if (first) { |
1384 | TEST_SYNC_POINT("DBCompaction::PartialFill:4"); | |
1385 | first = false; | |
1386 | TEST_SYNC_POINT("DBCompaction::PartialFill:3"); | |
1387 | } else if (second) { | |
1388 | } | |
1389 | }); | |
1390 | ||
1391 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
1392 | ||
1393 | Options options = CurrentOptions(); | |
1394 | options.write_buffer_size = 10 * 1024 * 1024; | |
1395 | options.max_bytes_for_level_multiplier = 2; | |
1396 | options.num_levels = 4; | |
1397 | options.level0_file_num_compaction_trigger = 3; | |
1398 | options.max_background_compactions = 3; | |
1399 | ||
1400 | DestroyAndReopen(options); | |
11fdf7f2 TL |
1401 | // make sure all background compaction jobs can be scheduled |
1402 | auto stop_token = | |
1403 | dbfull()->TEST_write_controler().GetCompactionPressureToken(); | |
7c673cae FG |
1404 | int32_t value_size = 10 * 1024; // 10 KB |
1405 | ||
1406 | // Add 2 non-overlapping files | |
1407 | Random rnd(301); | |
1408 | std::map<int32_t, std::string> values; | |
1409 | ||
1410 | // file 1 [0 => 100] | |
1411 | for (int32_t i = 0; i < 100; i++) { | |
1412 | values[i] = RandomString(&rnd, value_size); | |
1413 | ASSERT_OK(Put(Key(i), values[i])); | |
1414 | } | |
1415 | ASSERT_OK(Flush()); | |
1416 | ||
1417 | // file 2 [100 => 300] | |
1418 | for (int32_t i = 100; i < 300; i++) { | |
1419 | values[i] = RandomString(&rnd, value_size); | |
1420 | ASSERT_OK(Put(Key(i), values[i])); | |
1421 | } | |
1422 | ASSERT_OK(Flush()); | |
1423 | ||
1424 | // 2 files in L0 | |
1425 | ASSERT_EQ("2", FilesPerLevel(0)); | |
1426 | CompactRangeOptions compact_options; | |
1427 | compact_options.change_level = true; | |
1428 | compact_options.target_level = 2; | |
1429 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
1430 | // 2 files in L2 | |
1431 | ASSERT_EQ("0,0,2", FilesPerLevel(0)); | |
1432 | ||
1433 | ASSERT_EQ(trivial_move, 1); | |
1434 | ASSERT_EQ(non_trivial_move, 0); | |
1435 | ||
1436 | // file 3 [ 0 => 200] | |
1437 | for (int32_t i = 0; i < 200; i++) { | |
1438 | values[i] = RandomString(&rnd, value_size); | |
1439 | ASSERT_OK(Put(Key(i), values[i])); | |
1440 | } | |
1441 | ASSERT_OK(Flush()); | |
1442 | ||
1443 | // 2 files in L2, 1 in L0 | |
1444 | ASSERT_EQ("1,0,2", FilesPerLevel(0)); | |
1445 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, nullptr, false)); | |
1446 | // 2 files in L2, 1 in L1 | |
1447 | ASSERT_EQ("0,1,2", FilesPerLevel(0)); | |
1448 | ||
1449 | ASSERT_EQ(trivial_move, 2); | |
1450 | ASSERT_EQ(non_trivial_move, 0); | |
1451 | ||
1452 | rocksdb::port::Thread threads([&] { | |
1453 | compact_options.change_level = false; | |
1454 | compact_options.exclusive_manual_compaction = false; | |
1455 | std::string begin_string = Key(0); | |
1456 | std::string end_string = Key(199); | |
1457 | Slice begin(begin_string); | |
1458 | Slice end(end_string); | |
1459 | ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); | |
1460 | }); | |
1461 | ||
1462 | TEST_SYNC_POINT("DBCompaction::PartialFill:1"); | |
1463 | // Many files 4 [300 => 4300) | |
1464 | for (int32_t i = 0; i <= 5; i++) { | |
1465 | for (int32_t j = 300; j < 4300; j++) { | |
1466 | if (j == 2300) { | |
1467 | ASSERT_OK(Flush()); | |
1468 | dbfull()->TEST_WaitForFlushMemTable(); | |
1469 | } | |
1470 | values[j] = RandomString(&rnd, value_size); | |
1471 | ASSERT_OK(Put(Key(j), values[j])); | |
1472 | } | |
1473 | } | |
1474 | ||
1475 | // Verify level sizes | |
1476 | uint64_t target_size = 4 * options.max_bytes_for_level_base; | |
1477 | for (int32_t i = 1; i < options.num_levels; i++) { | |
1478 | ASSERT_LE(SizeAtLevel(i), target_size); | |
1479 | target_size = static_cast<uint64_t>(target_size * | |
1480 | options.max_bytes_for_level_multiplier); | |
1481 | } | |
1482 | ||
1483 | TEST_SYNC_POINT("DBCompaction::PartialFill:2"); | |
1484 | dbfull()->TEST_WaitForFlushMemTable(); | |
1485 | dbfull()->TEST_WaitForCompact(); | |
1486 | threads.join(); | |
1487 | ||
1488 | for (int32_t i = 0; i < 4300; i++) { | |
1489 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1490 | } | |
1491 | } | |
1492 | ||
1493 | TEST_F(DBCompactionTest, DeleteFileRange) { | |
1494 | Options options = CurrentOptions(); | |
1495 | options.write_buffer_size = 10 * 1024 * 1024; | |
1496 | options.max_bytes_for_level_multiplier = 2; | |
1497 | options.num_levels = 4; | |
1498 | options.level0_file_num_compaction_trigger = 3; | |
1499 | options.max_background_compactions = 3; | |
1500 | ||
1501 | DestroyAndReopen(options); | |
1502 | int32_t value_size = 10 * 1024; // 10 KB | |
1503 | ||
1504 | // Add 2 non-overlapping files | |
1505 | Random rnd(301); | |
1506 | std::map<int32_t, std::string> values; | |
1507 | ||
1508 | // file 1 [0 => 100] | |
1509 | for (int32_t i = 0; i < 100; i++) { | |
1510 | values[i] = RandomString(&rnd, value_size); | |
1511 | ASSERT_OK(Put(Key(i), values[i])); | |
1512 | } | |
1513 | ASSERT_OK(Flush()); | |
1514 | ||
1515 | // file 2 [100 => 300] | |
1516 | for (int32_t i = 100; i < 300; i++) { | |
1517 | values[i] = RandomString(&rnd, value_size); | |
1518 | ASSERT_OK(Put(Key(i), values[i])); | |
1519 | } | |
1520 | ASSERT_OK(Flush()); | |
1521 | ||
1522 | // 2 files in L0 | |
1523 | ASSERT_EQ("2", FilesPerLevel(0)); | |
1524 | CompactRangeOptions compact_options; | |
1525 | compact_options.change_level = true; | |
1526 | compact_options.target_level = 2; | |
1527 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
1528 | // 2 files in L2 | |
1529 | ASSERT_EQ("0,0,2", FilesPerLevel(0)); | |
1530 | ||
1531 | // file 3 [ 0 => 200] | |
1532 | for (int32_t i = 0; i < 200; i++) { | |
1533 | values[i] = RandomString(&rnd, value_size); | |
1534 | ASSERT_OK(Put(Key(i), values[i])); | |
1535 | } | |
1536 | ASSERT_OK(Flush()); | |
1537 | ||
1538 | // Many files 4 [300 => 4300) | |
1539 | for (int32_t i = 0; i <= 5; i++) { | |
1540 | for (int32_t j = 300; j < 4300; j++) { | |
1541 | if (j == 2300) { | |
1542 | ASSERT_OK(Flush()); | |
1543 | dbfull()->TEST_WaitForFlushMemTable(); | |
1544 | } | |
1545 | values[j] = RandomString(&rnd, value_size); | |
1546 | ASSERT_OK(Put(Key(j), values[j])); | |
1547 | } | |
1548 | } | |
1549 | ASSERT_OK(Flush()); | |
1550 | dbfull()->TEST_WaitForFlushMemTable(); | |
1551 | dbfull()->TEST_WaitForCompact(); | |
1552 | ||
1553 | // Verify level sizes | |
1554 | uint64_t target_size = 4 * options.max_bytes_for_level_base; | |
1555 | for (int32_t i = 1; i < options.num_levels; i++) { | |
1556 | ASSERT_LE(SizeAtLevel(i), target_size); | |
1557 | target_size = static_cast<uint64_t>(target_size * | |
1558 | options.max_bytes_for_level_multiplier); | |
1559 | } | |
1560 | ||
1561 | size_t old_num_files = CountFiles(); | |
1562 | std::string begin_string = Key(1000); | |
1563 | std::string end_string = Key(2000); | |
1564 | Slice begin(begin_string); | |
1565 | Slice end(end_string); | |
1566 | ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end)); | |
1567 | ||
1568 | int32_t deleted_count = 0; | |
1569 | for (int32_t i = 0; i < 4300; i++) { | |
1570 | if (i < 1000 || i > 2000) { | |
1571 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1572 | } else { | |
1573 | ReadOptions roptions; | |
1574 | std::string result; | |
1575 | Status s = db_->Get(roptions, Key(i), &result); | |
1576 | ASSERT_TRUE(s.IsNotFound() || s.ok()); | |
1577 | if (s.IsNotFound()) { | |
1578 | deleted_count++; | |
1579 | } | |
1580 | } | |
1581 | } | |
1582 | ASSERT_GT(deleted_count, 0); | |
1583 | begin_string = Key(5000); | |
1584 | end_string = Key(6000); | |
1585 | Slice begin1(begin_string); | |
1586 | Slice end1(end_string); | |
1587 | // Try deleting files in range which contain no keys | |
1588 | ASSERT_OK( | |
1589 | DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin1, &end1)); | |
1590 | ||
1591 | // Push data from level 0 to level 1 to force all data to be deleted | |
1592 | // Note that we don't delete level 0 files | |
1593 | compact_options.change_level = true; | |
1594 | compact_options.target_level = 1; | |
1595 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr)); | |
1596 | ||
1597 | ASSERT_OK( | |
1598 | DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr)); | |
1599 | ||
1600 | int32_t deleted_count2 = 0; | |
1601 | for (int32_t i = 0; i < 4300; i++) { | |
1602 | ReadOptions roptions; | |
1603 | std::string result; | |
1604 | Status s = db_->Get(roptions, Key(i), &result); | |
1605 | ASSERT_TRUE(s.IsNotFound()); | |
1606 | deleted_count2++; | |
1607 | } | |
1608 | ASSERT_GT(deleted_count2, deleted_count); | |
1609 | size_t new_num_files = CountFiles(); | |
1610 | ASSERT_GT(old_num_files, new_num_files); | |
1611 | } | |
1612 | ||
11fdf7f2 TL |
// Exercises DeleteFilesInRanges() with several ranges per call. Auto
// compactions are disabled, ten 100-key files are pushed to L2, every other
// file is rewritten and compacted into L1, and then three batches of ranges
// are deleted while FilesPerLevel() and key visibility are asserted.
1613 | TEST_F(DBCompactionTest, DeleteFilesInRanges) { |
1614 | Options options = CurrentOptions(); | |
1615 | options.write_buffer_size = 10 * 1024 * 1024; | |
1616 | options.max_bytes_for_level_multiplier = 2; | |
1617 | options.num_levels = 4; | |
1618 | options.max_background_compactions = 3; | |
1619 | options.disable_auto_compactions = true; | |
1620 | |
1621 | DestroyAndReopen(options); | |
1622 | int32_t value_size = 10 * 1024; // 10 KB | |
1623 | |
1624 | Random rnd(301); | |
// Remembers the value written for every key so surviving keys can be
// compared against it later.
1625 | std::map<int32_t, std::string> values; | |
1626 | |
1627 | // file [0 => 100), [100 => 200), ... [900, 1000) | |
1628 | for (auto i = 0; i < 10; i++) { | |
1629 | for (auto j = 0; j < 100; j++) { | |
1630 | auto k = i * 100 + j; | |
1631 | values[k] = RandomString(&rnd, value_size); | |
1632 | ASSERT_OK(Put(Key(k), values[k])); | |
1633 | } | |
1634 | ASSERT_OK(Flush()); | |
1635 | } | |
1636 | ASSERT_EQ("10", FilesPerLevel(0)); | |
// Manual compaction with change_level moves all ten files to level 2.
1637 | CompactRangeOptions compact_options; | |
1638 | compact_options.change_level = true; | |
1639 | compact_options.target_level = 2; | |
1640 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
1641 | ASSERT_EQ("0,0,10", FilesPerLevel(0)); | |
1642 | |
1643 | // file [0 => 100), [200 => 300), ... [800, 900) | |
// The same values are written again, so Get() results are unchanged while
// five additional files end up in L1 on top of the ten in L2.
1644 | for (auto i = 0; i < 10; i+=2) { | |
1645 | for (auto j = 0; j < 100; j++) { | |
1646 | auto k = i * 100 + j; | |
1647 | ASSERT_OK(Put(Key(k), values[k])); | |
1648 | } | |
1649 | ASSERT_OK(Flush()); | |
1650 | } | |
1651 | ASSERT_EQ("5,0,10", FilesPerLevel(0)); | |
1652 | ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr)); | |
1653 | ASSERT_EQ("0,5,10", FilesPerLevel(0)); | |
1654 | |
1655 | // Delete files in range [0, 299] (inclusive) | |
1656 | { | |
// The std::string objects own the key bytes; the Slices and RangePtrs below
// only point at them, so they must stay alive for the whole call.
1657 | auto begin_str1 = Key(0), end_str1 = Key(100); | |
1658 | auto begin_str2 = Key(100), end_str2 = Key(200); | |
1659 | auto begin_str3 = Key(200), end_str3 = Key(299); | |
1660 | Slice begin1(begin_str1), end1(end_str1); | |
1661 | Slice begin2(begin_str2), end2(end_str2); | |
1662 | Slice begin3(begin_str3), end3(end_str3); | |
1663 | std::vector<RangePtr> ranges; | |
1664 | ranges.push_back(RangePtr(&begin1, &end1)); | |
1665 | ranges.push_back(RangePtr(&begin2, &end2)); | |
1666 | ranges.push_back(RangePtr(&begin3, &end3)); | |
1667 | ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), | |
1668 | ranges.data(), ranges.size())); | |
1669 | ASSERT_EQ("0,3,7", FilesPerLevel(0)); | |
1670 | |
1671 | // Keys [0, 300) should not exist. | |
1672 | for (auto i = 0; i < 300; i++) { | |
1673 | ReadOptions ropts; | |
1674 | std::string result; | |
1675 | auto s = db_->Get(ropts, Key(i), &result); | |
1676 | ASSERT_TRUE(s.IsNotFound()); | |
1677 | } | |
1678 | for (auto i = 300; i < 1000; i++) { | |
1679 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1680 | } | |
1681 | } | |
1682 | |
1683 | // Delete files in range [600, 999) (exclusive) | |
1684 | { | |
// These three ranges overlap one another on purpose ([600,800], [700,900],
// [800,999]).
1685 | auto begin_str1 = Key(600), end_str1 = Key(800); | |
1686 | auto begin_str2 = Key(700), end_str2 = Key(900); | |
1687 | auto begin_str3 = Key(800), end_str3 = Key(999); | |
1688 | Slice begin1(begin_str1), end1(end_str1); | |
1689 | Slice begin2(begin_str2), end2(end_str2); | |
1690 | Slice begin3(begin_str3), end3(end_str3); | |
1691 | std::vector<RangePtr> ranges; | |
1692 | ranges.push_back(RangePtr(&begin1, &end1)); | |
1693 | ranges.push_back(RangePtr(&begin2, &end2)); | |
1694 | ranges.push_back(RangePtr(&begin3, &end3)); | |
// NOTE(review): the trailing `false` presumably selects exclusive range ends
// (it matches the "(exclusive)" comment above) -- confirm against the
// DeleteFilesInRanges declaration.
1695 | ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), | |
1696 | ranges.data(), ranges.size(), false)); | |
1697 | ASSERT_EQ("0,1,4", FilesPerLevel(0)); | |
1698 | |
1699 | // Keys [600, 900) should not exist. | |
1700 | for (auto i = 600; i < 900; i++) { | |
1701 | ReadOptions ropts; | |
1702 | std::string result; | |
1703 | auto s = db_->Get(ropts, Key(i), &result); | |
1704 | ASSERT_TRUE(s.IsNotFound()); | |
1705 | } | |
1706 | for (auto i = 300; i < 600; i++) { | |
1707 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1708 | } | |
1709 | for (auto i = 900; i < 1000; i++) { | |
1710 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1711 | } | |
1712 | } | |
1713 | |
1714 | // Delete all files. | |
1715 | { | |
// A default-constructed RangePtr is passed as the single range; the asserts
// below expect it to cover every remaining file.
1716 | RangePtr range; | |
1717 | ASSERT_OK(DeleteFilesInRanges(db_, db_->DefaultColumnFamily(), &range, 1)); | |
1718 | ASSERT_EQ("", FilesPerLevel(0)); | |
1719 | |
1720 | for (auto i = 0; i < 1000; i++) { | |
1721 | ReadOptions ropts; | |
1722 | std::string result; | |
1723 | auto s = db_->Get(ropts, Key(i), &result); | |
1724 | ASSERT_TRUE(s.IsNotFound()); | |
1725 | } | |
1726 | } | |
1727 | } | |
1728 | ||
1729 | TEST_F(DBCompactionTest, DeleteFileRangeFileEndpointsOverlapBug) { | |
1730 | // regression test for #2833: groups of files whose user-keys overlap at the | |
1731 | // endpoints could be split by `DeleteFilesInRange`. This caused old data to | |
1732 | // reappear, either because a new version of the key was removed, or a range | |
1733 | // deletion was partially dropped. It could also cause non-overlapping | |
1734 | // invariant to be violated if the files dropped by DeleteFilesInRange were | |
1735 | // a subset of files that a range deletion spans. | |
1736 | const int kNumL0Files = 2; | |
1737 | const int kValSize = 8 << 10; // 8KB | |
1738 | Options options = CurrentOptions(); | |
1739 | options.level0_file_num_compaction_trigger = kNumL0Files; | |
1740 | options.target_file_size_base = 1 << 10; // 1KB | |
1741 | DestroyAndReopen(options); | |
1742 | |
1743 | // The snapshot prevents key 1 from having its old version dropped. The low | |
1744 | // `target_file_size_base` ensures two keys will be in each output file. | |
1745 | const Snapshot* snapshot = nullptr; | |
1746 | Random rnd(301); | |
1747 | // The value indicates which flush the key belonged to, which is enough | |
1748 | // for us to determine the keys' relative ages. After L0 flushes finish, | |
1749 | // files look like: | |
1750 | // | |
1751 | // File 0: 0 -> vals[0], 1 -> vals[0] | |
1752 | // File 1: 1 -> vals[1], 2 -> vals[1] | |
1753 | // | |
1754 | // Then L0->L1 compaction happens, which outputs keys as follows: | |
1755 | // | |
1756 | // File 0: 0 -> vals[0], 1 -> vals[1] | |
1757 | // File 1: 1 -> vals[0], 2 -> vals[1] | |
1758 | // | |
1759 | // DeleteFilesInRange shouldn't be allowed to drop just file 0, as that | |
1760 | // would cause `1 -> vals[0]` (an older key) to reappear. | |
1761 | std::string vals[kNumL0Files]; | |
// Each write/flush is asserted: a silently failed Put or Flush would leave a
// different file layout than the one described above and make the final
// check meaningless.
1762 | for (int i = 0; i < kNumL0Files; ++i) { | |
1763 | vals[i] = RandomString(&rnd, kValSize); | |
1764 | ASSERT_OK(Put(Key(i), vals[i])); | |
1765 | ASSERT_OK(Put(Key(i + 1), vals[i])); | |
1766 | ASSERT_OK(Flush()); | |
1767 | if (i == 0) { | |
// Taken after the first flush only, so it pins the vals[0] version of key 1.
1768 | snapshot = db_->GetSnapshot(); | |
1769 | } | |
1770 | } | |
1771 | dbfull()->TEST_WaitForCompact(); | |
1772 | |
1773 | // Verify `DeleteFilesInRange` can't drop only file 0 which would cause | |
1774 | // "1 -> vals[0]" to reappear. | |
1775 | std::string begin_str = Key(0), end_str = Key(1); | |
1776 | Slice begin = begin_str, end = end_str; | |
1777 | ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end)); | |
1778 | ASSERT_EQ(vals[1], Get(Key(1))); | |
1779 | |
1780 | db_->ReleaseSnapshot(snapshot); | |
1781 | } | |
1782 | ||
7c673cae FG |
// Checks that manual CompactRange calls which push freshly flushed,
// non-overlapping files down to level 3 are carried out purely as trivial
// moves. Two sync-point callbacks count trivial and non-trivial background
// compactions, and the counters are asserted after each CompactRange.
1783 | TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) { |
1784 | int32_t trivial_move = 0; | |
1785 | int32_t non_trivial_move = 0; | |
// The callbacks capture the two counters by reference.
1786 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
1787 | "DBImpl::BackgroundCompaction:TrivialMove", | |
11fdf7f2 | 1788 | [&](void* /*arg*/) { trivial_move++; }); |
7c673cae FG |
1789 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
1790 | "DBImpl::BackgroundCompaction:NonTrivial", | |
11fdf7f2 | 1791 | [&](void* /*arg*/) { non_trivial_move++; }); |
7c673cae FG |
1792 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
1793 | |
1794 | Options options = CurrentOptions(); | |
1795 | options.write_buffer_size = 100000000; | |
1796 | options.max_subcompactions = max_subcompactions_; | |
1797 | DestroyAndReopen(options); | |
1798 | |
1799 | int32_t value_size = 10 * 1024; // 10 KB | |
1800 | |
1801 | Random rnd(301); | |
// values[i] holds the value written for Key(i); it is indexed by key below.
1802 | std::vector<std::string> values; | |
1803 | // File with keys [ 0 => 99 ] | |
1804 | for (int i = 0; i < 100; i++) { | |
1805 | values.push_back(RandomString(&rnd, value_size)); | |
1806 | ASSERT_OK(Put(Key(i), values[i])); | |
1807 | } | |
1808 | ASSERT_OK(Flush()); | |
1809 | |
1810 | ASSERT_EQ("1", FilesPerLevel(0)); | |
1811 | // Compaction will do L0=>L1 (trivial move) then move L1 files to L3 | |
1812 | CompactRangeOptions compact_options; | |
1813 | compact_options.change_level = true; | |
1814 | compact_options.target_level = 3; | |
1815 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; | |
1816 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
1817 | ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); | |
1818 | ASSERT_EQ(trivial_move, 1); | |
1819 | ASSERT_EQ(non_trivial_move, 0); | |
1820 | |
1821 | // File with keys [ 100 => 199 ] | |
1822 | for (int i = 100; i < 200; i++) { | |
1823 | values.push_back(RandomString(&rnd, value_size)); | |
1824 | ASSERT_OK(Put(Key(i), values[i])); | |
1825 | } | |
1826 | ASSERT_OK(Flush()); | |
1827 | |
1828 | ASSERT_EQ("1,0,0,1", FilesPerLevel(0)); | |
1829 | CompactRangeOptions cro; | |
1830 | cro.exclusive_manual_compaction = exclusive_manual_compaction_; | |
1831 | // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves) | |
1832 | ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr)); | |
1833 | ASSERT_EQ("0,0,0,2", FilesPerLevel(0)); | |
// The counter is cumulative: 1 from the first CompactRange plus 3 from this
// one.
1834 | ASSERT_EQ(trivial_move, 4); | |
1835 | ASSERT_EQ(non_trivial_move, 0); | |
1836 | |
1837 | for (int i = 0; i < 200; i++) { | |
1838 | ASSERT_EQ(Get(Key(i)), values[i]); | |
1839 | } | |
1840 | |
1841 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
1842 | } | |
1843 | ||
// Level-style compaction with three db_paths of increasing target size.
// After each generated file the test asserts FilesPerLevel() and the number
// of SST files found in each path, then reads every key back before and
// after a Reopen.
1844 | TEST_P(DBCompactionTestWithParam, LevelCompactionThirdPath) { | |
1845 | Options options = CurrentOptions(); | |
1846 | options.db_paths.emplace_back(dbname_, 500 * 1024); | |
1847 | options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024); | |
1848 | options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024); | |
1849 | options.memtable_factory.reset( | |
1850 | new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1)); | |
1851 | options.compaction_style = kCompactionStyleLevel; | |
1852 | options.write_buffer_size = 110 << 10; // 110KB | |
1853 | options.arena_block_size = 4 << 10; | |
1854 | options.level0_file_num_compaction_trigger = 2; | |
1855 | options.num_levels = 4; | |
1856 | options.max_bytes_for_level_base = 400 * 1024; | |
1857 | options.max_subcompactions = max_subcompactions_; | |
1858 | // options = CurrentOptions(options); | |
1859 | |
// Empty and remove the second path before reopening. The statuses returned
// by GetChildren/DeleteFile/DeleteDir are not checked here.
1860 | std::vector<std::string> filenames; | |
1861 | env_->GetChildren(options.db_paths[1].path, &filenames); | |
1862 | // Delete archival files. | |
1863 | for (size_t i = 0; i < filenames.size(); ++i) { | |
1864 | env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]); | |
1865 | } | |
1866 | env_->DeleteDir(options.db_paths[1].path); | |
1867 | Reopen(options); | |
1868 | |
1869 | Random rnd(301); | |
// key_idx is advanced by GenerateNewFile and is later used as the upper
// bound of the read-back loops.
1870 | int key_idx = 0; | |
1871 | |
1872 | // First three 110KB files are not going to second path. | |
1873 | // After that, (100K, 200K) | |
1874 | for (int num = 0; num < 3; num++) { | |
1875 | GenerateNewFile(&rnd, &key_idx); | |
1876 | } | |
1877 | |
1878 | // Another 110KB triggers a compaction to 400K file to fill up first path | |
1879 | GenerateNewFile(&rnd, &key_idx); | |
1880 | ASSERT_EQ(3, GetSstFileCount(options.db_paths[1].path)); | |
1881 | |
// From here on each "(a, b, c)" comment gives the expected number of files
// per level, which the FilesPerLevel() assert that follows spells out.
1882 | // (1, 4) | |
1883 | GenerateNewFile(&rnd, &key_idx); | |
1884 | ASSERT_EQ("1,4", FilesPerLevel(0)); | |
1885 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1886 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1887 | |
1888 | // (1, 4, 1) | |
1889 | GenerateNewFile(&rnd, &key_idx); | |
1890 | ASSERT_EQ("1,4,1", FilesPerLevel(0)); | |
1891 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[2].path)); | |
1892 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1893 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1894 | |
1895 | // (1, 4, 2) | |
1896 | GenerateNewFile(&rnd, &key_idx); | |
1897 | ASSERT_EQ("1,4,2", FilesPerLevel(0)); | |
1898 | ASSERT_EQ(2, GetSstFileCount(options.db_paths[2].path)); | |
1899 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1900 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1901 | |
1902 | // (1, 4, 3) | |
1903 | GenerateNewFile(&rnd, &key_idx); | |
1904 | ASSERT_EQ("1,4,3", FilesPerLevel(0)); | |
1905 | ASSERT_EQ(3, GetSstFileCount(options.db_paths[2].path)); | |
1906 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1907 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1908 | |
1909 | // (1, 4, 4) | |
1910 | GenerateNewFile(&rnd, &key_idx); | |
1911 | ASSERT_EQ("1,4,4", FilesPerLevel(0)); | |
1912 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[2].path)); | |
1913 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1914 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1915 | |
1916 | // (1, 4, 5) | |
1917 | GenerateNewFile(&rnd, &key_idx); | |
1918 | ASSERT_EQ("1,4,5", FilesPerLevel(0)); | |
1919 | ASSERT_EQ(5, GetSstFileCount(options.db_paths[2].path)); | |
1920 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1921 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1922 | |
1923 | // (1, 4, 6) | |
1924 | GenerateNewFile(&rnd, &key_idx); | |
1925 | ASSERT_EQ("1,4,6", FilesPerLevel(0)); | |
1926 | ASSERT_EQ(6, GetSstFileCount(options.db_paths[2].path)); | |
1927 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1928 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1929 | |
1930 | // (1, 4, 7) | |
1931 | GenerateNewFile(&rnd, &key_idx); | |
1932 | ASSERT_EQ("1,4,7", FilesPerLevel(0)); | |
1933 | ASSERT_EQ(7, GetSstFileCount(options.db_paths[2].path)); | |
1934 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1935 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1936 | |
1937 | // (1, 4, 8) | |
1938 | GenerateNewFile(&rnd, &key_idx); | |
1939 | ASSERT_EQ("1,4,8", FilesPerLevel(0)); | |
1940 | ASSERT_EQ(8, GetSstFileCount(options.db_paths[2].path)); | |
1941 | ASSERT_EQ(4, GetSstFileCount(options.db_paths[1].path)); | |
1942 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
1943 | |
// Every generated key must be readable, with a value of size 1 or 990.
1944 | for (int i = 0; i < key_idx; i++) { | |
1945 | auto v = Get(Key(i)); | |
1946 | ASSERT_NE(v, "NOT_FOUND"); | |
1947 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
1948 | } | |
1949 | |
1950 | Reopen(options); | |
1951 | |
// Same read-back check after reopening the DB.
1952 | for (int i = 0; i < key_idx; i++) { | |
1953 | auto v = Get(Key(i)); | |
1954 | ASSERT_NE(v, "NOT_FOUND"); | |
1955 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
1956 | } | |
1957 | |
1958 | Destroy(options); | |
1959 | } | |
1960 | ||
// Same three-path setup as the test above, but key_idx is reset to 0 before
// every GenerateNewFile call, so each new file starts again from Key(0).
// The asserts expect the layout to alternate between "1,1" and "0,1" and the
// third path to stay empty.
1961 | TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) { | |
1962 | Options options = CurrentOptions(); | |
1963 | options.db_paths.emplace_back(dbname_, 500 * 1024); | |
1964 | options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024); | |
1965 | options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024); | |
1966 | options.memtable_factory.reset( | |
1967 | new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1)); | |
1968 | options.compaction_style = kCompactionStyleLevel; | |
1969 | options.write_buffer_size = 110 << 10; // 110KB | |
1970 | options.arena_block_size = 4 << 10; | |
1971 | options.level0_file_num_compaction_trigger = 2; | |
1972 | options.num_levels = 4; | |
1973 | options.max_bytes_for_level_base = 400 * 1024; | |
1974 | options.max_subcompactions = max_subcompactions_; | |
1975 | // options = CurrentOptions(options); | |
1976 | |
// Empty and remove the second path before reopening. The statuses returned
// by GetChildren/DeleteFile/DeleteDir are not checked here.
1977 | std::vector<std::string> filenames; | |
1978 | env_->GetChildren(options.db_paths[1].path, &filenames); | |
1979 | // Delete archival files. | |
1980 | for (size_t i = 0; i < filenames.size(); ++i) { | |
1981 | env_->DeleteFile(options.db_paths[1].path + "/" + filenames[i]); | |
1982 | } | |
1983 | env_->DeleteDir(options.db_paths[1].path); | |
1984 | Reopen(options); | |
1985 | |
1986 | Random rnd(301); | |
1987 | int key_idx = 0; | |
1988 | |
1989 | // Always gets compacted into 1 Level1 file, | |
1990 | // 0/1 Level 0 file | |
1991 | for (int num = 0; num < 3; num++) { | |
1992 | key_idx = 0; | |
1993 | GenerateNewFile(&rnd, &key_idx); | |
1994 | } | |
1995 | |
1996 | key_idx = 0; | |
1997 | GenerateNewFile(&rnd, &key_idx); | |
1998 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
1999 | |
2000 | key_idx = 0; | |
2001 | GenerateNewFile(&rnd, &key_idx); | |
2002 | ASSERT_EQ("1,1", FilesPerLevel(0)); | |
2003 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2004 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
2005 | |
2006 | key_idx = 0; | |
2007 | GenerateNewFile(&rnd, &key_idx); | |
2008 | ASSERT_EQ("0,1", FilesPerLevel(0)); | |
2009 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path)); | |
2010 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2011 | ASSERT_EQ(0, GetSstFileCount(dbname_)); | |
2012 | |
2013 | key_idx = 0; | |
2014 | GenerateNewFile(&rnd, &key_idx); | |
2015 | ASSERT_EQ("1,1", FilesPerLevel(0)); | |
2016 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path)); | |
2017 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2018 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
2019 | |
2020 | key_idx = 0; | |
2021 | GenerateNewFile(&rnd, &key_idx); | |
2022 | ASSERT_EQ("0,1", FilesPerLevel(0)); | |
2023 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path)); | |
2024 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2025 | ASSERT_EQ(0, GetSstFileCount(dbname_)); | |
2026 | |
2027 | key_idx = 0; | |
2028 | GenerateNewFile(&rnd, &key_idx); | |
2029 | ASSERT_EQ("1,1", FilesPerLevel(0)); | |
2030 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path)); | |
2031 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2032 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
2033 | |
2034 | key_idx = 0; | |
2035 | GenerateNewFile(&rnd, &key_idx); | |
2036 | ASSERT_EQ("0,1", FilesPerLevel(0)); | |
2037 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path)); | |
2038 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2039 | ASSERT_EQ(0, GetSstFileCount(dbname_)); | |
2040 | |
2041 | key_idx = 0; | |
2042 | GenerateNewFile(&rnd, &key_idx); | |
2043 | ASSERT_EQ("1,1", FilesPerLevel(0)); | |
2044 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path)); | |
2045 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2046 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
2047 | |
2048 | key_idx = 0; | |
2049 | GenerateNewFile(&rnd, &key_idx); | |
2050 | ASSERT_EQ("0,1", FilesPerLevel(0)); | |
2051 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path)); | |
2052 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2053 | ASSERT_EQ(0, GetSstFileCount(dbname_)); | |
2054 | |
2055 | key_idx = 0; | |
2056 | GenerateNewFile(&rnd, &key_idx); | |
2057 | ASSERT_EQ("1,1", FilesPerLevel(0)); | |
2058 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[2].path)); | |
2059 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2060 | ASSERT_EQ(1, GetSstFileCount(dbname_)); | |
2061 | |
// key_idx now holds whatever the last GenerateNewFile call left in it, i.e.
// the key range of a single generated file.
2062 | for (int i = 0; i < key_idx; i++) { | |
2063 | auto v = Get(Key(i)); | |
2064 | ASSERT_NE(v, "NOT_FOUND"); | |
2065 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
2066 | } | |
2067 | |
2068 | Reopen(options); | |
2069 | |
// Same read-back check after reopening the DB.
2070 | for (int i = 0; i < key_idx; i++) { | |
2071 | auto v = Get(Key(i)); | |
2072 | ASSERT_NE(v, "NOT_FOUND"); | |
2073 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
2074 | } | |
2075 | |
2076 | Destroy(options); | |
2077 | } | |
2078 | ||
11fdf7f2 TL |
// Runs the multi-path level-compaction scenario on three column families at
// once: "default" is configured with db_paths only, while "one" and "two"
// each get their own cf_paths. The helper lambdas apply every step and every
// expectation to all three column families.
2079 | TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) { |
2080 | Options options = CurrentOptions(); | |
2081 | options.db_paths.emplace_back(dbname_, 500 * 1024); | |
2082 | options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024); | |
2083 | options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024); | |
2084 | options.memtable_factory.reset( | |
2085 | new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1)); | |
2086 | options.compaction_style = kCompactionStyleLevel; | |
2087 | options.write_buffer_size = 110 << 10; // 110KB | |
2088 | options.arena_block_size = 4 << 10; | |
2089 | options.level0_file_num_compaction_trigger = 2; | |
2090 | options.num_levels = 4; | |
2091 | options.max_bytes_for_level_base = 400 * 1024; | |
2092 | options.max_subcompactions = max_subcompactions_; | |
2093 | |
// option_vector[i] holds the options for column family i in the order
// {"default", "one", "two"} used by ReopenWithColumnFamilies below.
2094 | std::vector<Options> option_vector; | |
2095 | option_vector.emplace_back(options); | |
2096 | ColumnFamilyOptions cf_opt1(options), cf_opt2(options); | |
2097 | // Configure CF1 specific paths. | |
2098 | cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024); | |
2099 | cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024); | |
2100 | cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024); | |
2101 | option_vector.emplace_back(DBOptions(options), cf_opt1); | |
2102 | CreateColumnFamilies({"one"},option_vector[1]); | |
2103 | |
2104 | // Configure CF2 specific paths. | |
2105 | cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024); | |
2106 | cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024); | |
2107 | cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024); | |
2108 | option_vector.emplace_back(DBOptions(options), cf_opt2); | |
2109 | CreateColumnFamilies({"two"},option_vector[2]); | |
2110 | |
2111 | ReopenWithColumnFamilies({"default", "one", "two"}, option_vector); | |
2112 | |
2113 | Random rnd(301); | |
// One key cursor per column family.
2114 | int key_idx = 0; | |
2115 | int key_idx1 = 0; | |
2116 | int key_idx2 = 0; | |
2117 | |
// Generates one new file in each of the three column families.
2118 | auto generate_file = [&]() { | |
2119 | GenerateNewFile(0, &rnd, &key_idx); | |
2120 | GenerateNewFile(1, &rnd, &key_idx1); | |
2121 | GenerateNewFile(2, &rnd, &key_idx2); | |
2122 | }; | |
2123 | |
// Asserts that the path with index path_id holds `expected` SST files for
// the default column family (db_paths) and for both cf_paths sets.
2124 | auto check_sstfilecount = [&](int path_id, int expected) { | |
2125 | ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path)); | |
2126 | ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path)); | |
2127 | ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path)); | |
2128 | }; | |
2129 | |
// Asserts the same files-per-level layout for column families 0, 1 and 2.
2130 | auto check_filesperlevel = [&](const std::string& expected) { | |
2131 | ASSERT_EQ(expected, FilesPerLevel(0)); | |
2132 | ASSERT_EQ(expected, FilesPerLevel(1)); | |
2133 | ASSERT_EQ(expected, FilesPerLevel(2)); | |
2134 | }; | |
2135 | |
// Reads back every generated key of each column family and checks that it
// exists with a value of size 1 or 990.
2136 | auto check_getvalues = [&]() { | |
2137 | for (int i = 0; i < key_idx; i++) { | |
2138 | auto v = Get(0, Key(i)); | |
2139 | ASSERT_NE(v, "NOT_FOUND"); | |
2140 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
2141 | } | |
2142 | |
2143 | for (int i = 0; i < key_idx1; i++) { | |
2144 | auto v = Get(1, Key(i)); | |
2145 | ASSERT_NE(v, "NOT_FOUND"); | |
2146 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
2147 | } | |
2148 | |
2149 | for (int i = 0; i < key_idx2; i++) { | |
2150 | auto v = Get(2, Key(i)); | |
2151 | ASSERT_NE(v, "NOT_FOUND"); | |
2152 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
2153 | } | |
2154 | }; | |
2155 | |
2156 | // Check that default column family uses db_paths. | |
2157 | // And Column family "one" uses cf_paths. | |
2158 | |
2159 | // First three 110KB files are not going to second path. | |
2160 | // After that, (100K, 200K) | |
2161 | for (int num = 0; num < 3; num++) { | |
2162 | generate_file(); | |
2163 | } | |
2164 | |
2165 | // Another 110KB triggers a compaction to 400K file to fill up first path | |
2166 | generate_file(); | |
2167 | check_sstfilecount(1, 3); | |
2168 | |
2169 | // (1, 4) | |
2170 | generate_file(); | |
2171 | check_filesperlevel("1,4"); | |
2172 | check_sstfilecount(1, 4); | |
2173 | check_sstfilecount(0, 1); | |
2174 | |
2175 | // (1, 4, 1) | |
2176 | generate_file(); | |
2177 | check_filesperlevel("1,4,1"); | |
2178 | check_sstfilecount(2, 1); | |
2179 | check_sstfilecount(1, 4); | |
2180 | check_sstfilecount(0, 1); | |
2181 | |
2182 | // (1, 4, 2) | |
2183 | generate_file(); | |
2184 | check_filesperlevel("1,4,2"); | |
2185 | check_sstfilecount(2, 2); | |
2186 | check_sstfilecount(1, 4); | |
2187 | check_sstfilecount(0, 1); | |
2188 | |
2189 | check_getvalues(); | |
2190 | |
2191 | ReopenWithColumnFamilies({"default", "one", "two"}, option_vector); | |
2192 | |
2193 | check_getvalues(); | |
2194 | |
2195 | Destroy(options, true); | |
2196 | } | |
2197 | ||
7c673cae FG |
// Walks column family "pikachu" through a conversion from level-style to
// universal-style compaction in four stages (see the stage comments) and
// finally checks, through an iterator, that every key written under either
// style is present.
2198 | TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) { |
2199 | Random rnd(301); | |
2200 | int max_key_level_insert = 200; | |
2201 | int max_key_universal_insert = 600; | |
2202 | |
2203 | // Stage 1: generate a db with level compaction | |
2204 | Options options = CurrentOptions(); | |
2205 | options.write_buffer_size = 110 << 10; // 110KB | |
2206 | options.arena_block_size = 4 << 10; | |
2207 | options.num_levels = 4; | |
2208 | options.level0_file_num_compaction_trigger = 3; | |
2209 | options.max_bytes_for_level_base = 500 << 10; // 500KB | |
2210 | options.max_bytes_for_level_multiplier = 1; | |
2211 | options.target_file_size_base = 200 << 10; // 200KB | |
2212 | options.target_file_size_multiplier = 1; | |
2213 | options.max_subcompactions = max_subcompactions_; | |
2214 | CreateAndReopenWithCF({"pikachu"}, options); | |
2215 | |
2216 | for (int i = 0; i <= max_key_level_insert; i++) { | |
2217 | // each value is 10K | |
2218 | ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); | |
2219 | } | |
2220 | ASSERT_OK(Flush(1)); | |
2221 | dbfull()->TEST_WaitForCompact(); | |
2222 | |
// Stage 1 must have produced files below level 0, otherwise stage 2 would
// not be testing anything.
2223 | ASSERT_GT(TotalTableFiles(1, 4), 1); | |
2224 | int non_level0_num_files = 0; | |
2225 | for (int i = 1; i < options.num_levels; i++) { | |
2226 | non_level0_num_files += NumTableFilesAtLevel(i, 1); | |
2227 | } | |
2228 | ASSERT_GT(non_level0_num_files, 0); | |
2229 | |
2230 | // Stage 2: reopen with universal compaction - should fail | |
2231 | options = CurrentOptions(); | |
2232 | options.compaction_style = kCompactionStyleUniversal; | |
2233 | options.num_levels = 1; | |
2234 | options = CurrentOptions(options); | |
2235 | Status s = TryReopenWithColumnFamilies({"default", "pikachu"}, options); | |
2236 | ASSERT_TRUE(s.IsInvalidArgument()); | |
2237 | |
2238 | // Stage 3: compact into a single file and move the file to level 0 | |
2239 | options = CurrentOptions(); | |
2240 | options.disable_auto_compactions = true; | |
2241 | options.target_file_size_base = INT_MAX; | |
2242 | options.target_file_size_multiplier = 1; | |
2243 | options.max_bytes_for_level_base = INT_MAX; | |
2244 | options.max_bytes_for_level_multiplier = 1; | |
2245 | options.num_levels = 4; | |
2246 | options = CurrentOptions(options); | |
2247 | ReopenWithColumnFamilies({"default", "pikachu"}, options); | |
2248 | |
2249 | CompactRangeOptions compact_options; | |
2250 | compact_options.change_level = true; | |
2251 | compact_options.target_level = 0; | |
2252 | compact_options.bottommost_level_compaction = | |
2253 | BottommostLevelCompaction::kForce; | |
2254 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; | |
// The status is asserted so a failed manual compaction is reported directly
// rather than only through the file-count check below.
2255 | ASSERT_OK(dbfull()->CompactRange(compact_options, handles_[1], nullptr, nullptr)); | |
2256 | |
2257 | // Only 1 file in L0 | |
2258 | ASSERT_EQ("1", FilesPerLevel(1)); | |
2259 | |
2260 | // Stage 4: re-open in universal compaction style and do some db operations | |
2261 | options = CurrentOptions(); | |
2262 | options.compaction_style = kCompactionStyleUniversal; | |
2263 | options.num_levels = 4; | |
2264 | options.write_buffer_size = 110 << 10; // 110KB | |
2265 | options.arena_block_size = 4 << 10; | |
2266 | options.level0_file_num_compaction_trigger = 3; | |
2267 | options = CurrentOptions(options); | |
2268 | ReopenWithColumnFamilies({"default", "pikachu"}, options); | |
2269 | |
// Reopen once more with a single level, the configuration that was rejected
// in stage 2.
2270 | options.num_levels = 1; | |
2271 | ReopenWithColumnFamilies({"default", "pikachu"}, options); | |
2272 | |
2273 | for (int i = max_key_level_insert / 2; i <= max_key_universal_insert; i++) { | |
2274 | ASSERT_OK(Put(1, Key(i), RandomString(&rnd, 10000))); | |
2275 | } | |
// Flush the default column family as well; its status is asserted like the
// Flush(1) that follows.
2276 | ASSERT_OK(dbfull()->Flush(FlushOptions())); | |
2277 | ASSERT_OK(Flush(1)); | |
2278 | dbfull()->TEST_WaitForCompact(); | |
2279 | |
2280 | for (int i = 1; i < options.num_levels; i++) { | |
2281 | ASSERT_EQ(NumTableFilesAtLevel(i, 1), 0); | |
2282 | } | |
2283 | |
2284 | // verify keys inserted in both level compaction style and universal | |
2285 | // compaction style | |
2286 | std::string keys_in_db; | |
2287 | Iterator* iter = dbfull()->NewIterator(ReadOptions(), handles_[1]); | |
2288 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
2289 | keys_in_db.append(iter->key().ToString()); | |
2290 | keys_in_db.push_back(','); | |
2291 | } | |
2292 | delete iter; | |
2293 | |
2294 | std::string expected_keys; | |
2295 | for (int i = 0; i <= max_key_universal_insert; i++) { | |
2296 | expected_keys.append(Key(i)); | |
2297 | expected_keys.push_back(','); | |
2298 | } | |
2299 | |
2300 | ASSERT_EQ(keys_in_db, expected_keys); | |
2301 | } | |
2302 | ||
// Regression scenario (issue 44, per the test name): interleaves puts,
// deletes and reopens on column family 1 and checks that its contents are
// "(a->v)" both before and after a one-second sleep. The sequence is
// repeated for every configuration ChangeCompactOptions() steps through.
2303 | TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_a) { | |
2304 | do { | |
2305 | CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); | |
2306 | ASSERT_OK(Put(1, "b", "v")); | |
2307 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2308 | ASSERT_OK(Delete(1, "b")); | |
2309 | ASSERT_OK(Delete(1, "a")); | |
2310 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2311 | ASSERT_OK(Delete(1, "a")); | |
2312 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2313 | ASSERT_OK(Put(1, "a", "v")); | |
2314 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2315 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
// The contents are checked twice: immediately, and again after the sleep
// that gives a background compaction time to run.
2316 | ASSERT_EQ("(a->v)", Contents(1)); | |
2317 | env_->SleepForMicroseconds(1000000); // Wait for compaction to finish | |
2318 | ASSERT_EQ("(a->v)", Contents(1)); | |
2319 | } while (ChangeCompactOptions()); | |
2320 | } | |
2321 | ||
// Second regression scenario for issue 44 (per the test name): interleaves
// puts, deletes and reopens on column family 1, including writes with an
// empty key, and checks that the contents are "(->)(c->cv)" before and after
// a one-second sleep. Every write status is asserted so a failed Put/Delete
// is reported at its source. The sequence is repeated for every
// configuration ChangeCompactOptions() steps through.
2322 | TEST_F(DBCompactionTest, L0_CompactionBug_Issue44_b) { | |
2323 | do { | |
2324 | CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); | |
2325 | ASSERT_OK(Put(1, "", "")); | |
2326 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2327 | ASSERT_OK(Delete(1, "e")); | |
2328 | ASSERT_OK(Put(1, "", "")); | |
2329 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2330 | ASSERT_OK(Put(1, "c", "cv")); | |
2331 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2332 | ASSERT_OK(Put(1, "", "")); | |
2333 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2334 | ASSERT_OK(Put(1, "", "")); | |
2335 | env_->SleepForMicroseconds(1000000); // Wait for compaction to finish | |
2336 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2337 | ASSERT_OK(Put(1, "d", "dv")); | |
2338 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2339 | ASSERT_OK(Put(1, "", "")); | |
2340 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2341 | ASSERT_OK(Delete(1, "d")); | |
2342 | ASSERT_OK(Delete(1, "b")); | |
2343 | ReopenWithColumnFamilies({"default", "pikachu"}, CurrentOptions()); | |
2344 | ASSERT_EQ("(->)(c->cv)", Contents(1)); | |
2345 | env_->SleepForMicroseconds(1000000); // Wait for compaction to finish | |
2346 | ASSERT_EQ("(->)(c->cv)", Contents(1)); | |
2347 | } while (ChangeCompactOptions()); | |
2348 | } | |
2349 | ||
11fdf7f2 TL |
2350 | TEST_F(DBCompactionTest, ManualAutoRace) { |
2351 | CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); | |
2352 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
2353 | {{"DBImpl::BGWorkCompaction", "DBCompactionTest::ManualAutoRace:1"}, | |
2354 | {"DBImpl::RunManualCompaction:WaitScheduled", | |
2355 | "BackgroundCallCompaction:0"}}); | |
2356 | ||
2357 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
2358 | ||
2359 | Put(1, "foo", ""); | |
2360 | Put(1, "bar", ""); | |
2361 | Flush(1); | |
2362 | Put(1, "foo", ""); | |
2363 | Put(1, "bar", ""); | |
2364 | // Generate four files in CF 0, which should trigger an auto compaction | |
2365 | Put("foo", ""); | |
2366 | Put("bar", ""); | |
2367 | Flush(); | |
2368 | Put("foo", ""); | |
2369 | Put("bar", ""); | |
2370 | Flush(); | |
2371 | Put("foo", ""); | |
2372 | Put("bar", ""); | |
2373 | Flush(); | |
2374 | Put("foo", ""); | |
2375 | Put("bar", ""); | |
2376 | Flush(); | |
2377 | ||
2378 | // The auto compaction is scheduled but waited until here | |
2379 | TEST_SYNC_POINT("DBCompactionTest::ManualAutoRace:1"); | |
2380 | // The auto compaction will wait until the manual compaction is registerd | |
2381 | // before processing so that it will be cancelled. | |
2382 | dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr); | |
2383 | ASSERT_EQ("0,1", FilesPerLevel(1)); | |
2384 | ||
2385 | // Eventually the cancelled compaction will be rescheduled and executed. | |
2386 | dbfull()->TEST_WaitForCompact(); | |
2387 | ASSERT_EQ("0,1", FilesPerLevel(0)); | |
2388 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
2389 | } | |
2390 | ||
7c673cae FG |
2391 | TEST_P(DBCompactionTestWithParam, ManualCompaction) { |
2392 | Options options = CurrentOptions(); | |
2393 | options.max_subcompactions = max_subcompactions_; | |
2394 | options.statistics = rocksdb::CreateDBStatistics(); | |
2395 | CreateAndReopenWithCF({"pikachu"}, options); | |
2396 | ||
2397 | // iter - 0 with 7 levels | |
2398 | // iter - 1 with 3 levels | |
2399 | for (int iter = 0; iter < 2; ++iter) { | |
2400 | MakeTables(3, "p", "q", 1); | |
2401 | ASSERT_EQ("1,1,1", FilesPerLevel(1)); | |
2402 | ||
2403 | // Compaction range falls before files | |
2404 | Compact(1, "", "c"); | |
2405 | ASSERT_EQ("1,1,1", FilesPerLevel(1)); | |
2406 | ||
2407 | // Compaction range falls after files | |
2408 | Compact(1, "r", "z"); | |
2409 | ASSERT_EQ("1,1,1", FilesPerLevel(1)); | |
2410 | ||
2411 | // Compaction range overlaps files | |
2412 | Compact(1, "p1", "p9"); | |
2413 | ASSERT_EQ("0,0,1", FilesPerLevel(1)); | |
2414 | ||
2415 | // Populate a different range | |
2416 | MakeTables(3, "c", "e", 1); | |
2417 | ASSERT_EQ("1,1,2", FilesPerLevel(1)); | |
2418 | ||
2419 | // Compact just the new range | |
2420 | Compact(1, "b", "f"); | |
2421 | ASSERT_EQ("0,0,2", FilesPerLevel(1)); | |
2422 | ||
2423 | // Compact all | |
2424 | MakeTables(1, "a", "z", 1); | |
2425 | ASSERT_EQ("1,0,2", FilesPerLevel(1)); | |
2426 | ||
2427 | uint64_t prev_block_cache_add = | |
2428 | options.statistics->getTickerCount(BLOCK_CACHE_ADD); | |
2429 | CompactRangeOptions cro; | |
2430 | cro.exclusive_manual_compaction = exclusive_manual_compaction_; | |
2431 | db_->CompactRange(cro, handles_[1], nullptr, nullptr); | |
2432 | // Verify manual compaction doesn't fill block cache | |
2433 | ASSERT_EQ(prev_block_cache_add, | |
2434 | options.statistics->getTickerCount(BLOCK_CACHE_ADD)); | |
2435 | ||
2436 | ASSERT_EQ("0,0,1", FilesPerLevel(1)); | |
2437 | ||
2438 | if (iter == 0) { | |
2439 | options = CurrentOptions(); | |
7c673cae FG |
2440 | options.num_levels = 3; |
2441 | options.create_if_missing = true; | |
2442 | options.statistics = rocksdb::CreateDBStatistics(); | |
2443 | DestroyAndReopen(options); | |
2444 | CreateAndReopenWithCF({"pikachu"}, options); | |
2445 | } | |
2446 | } | |
2447 | } | |
2448 | ||
2449 | ||
2450 | TEST_P(DBCompactionTestWithParam, ManualLevelCompactionOutputPathId) { | |
2451 | Options options = CurrentOptions(); | |
2452 | options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760); | |
2453 | options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760); | |
2454 | options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760); | |
2455 | options.max_subcompactions = max_subcompactions_; | |
2456 | CreateAndReopenWithCF({"pikachu"}, options); | |
2457 | ||
2458 | // iter - 0 with 7 levels | |
2459 | // iter - 1 with 3 levels | |
2460 | for (int iter = 0; iter < 2; ++iter) { | |
2461 | for (int i = 0; i < 3; ++i) { | |
2462 | ASSERT_OK(Put(1, "p", "begin")); | |
2463 | ASSERT_OK(Put(1, "q", "end")); | |
2464 | ASSERT_OK(Flush(1)); | |
2465 | } | |
2466 | ASSERT_EQ("3", FilesPerLevel(1)); | |
2467 | ASSERT_EQ(3, GetSstFileCount(options.db_paths[0].path)); | |
2468 | ASSERT_EQ(0, GetSstFileCount(dbname_)); | |
2469 | ||
2470 | // Compaction range falls before files | |
2471 | Compact(1, "", "c"); | |
2472 | ASSERT_EQ("3", FilesPerLevel(1)); | |
2473 | ||
2474 | // Compaction range falls after files | |
2475 | Compact(1, "r", "z"); | |
2476 | ASSERT_EQ("3", FilesPerLevel(1)); | |
2477 | ||
2478 | // Compaction range overlaps files | |
2479 | Compact(1, "p1", "p9", 1); | |
11fdf7f2 | 2480 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
7c673cae FG |
2481 | ASSERT_EQ("0,1", FilesPerLevel(1)); |
2482 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2483 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); | |
2484 | ASSERT_EQ(0, GetSstFileCount(dbname_)); | |
2485 | ||
2486 | // Populate a different range | |
2487 | for (int i = 0; i < 3; ++i) { | |
2488 | ASSERT_OK(Put(1, "c", "begin")); | |
2489 | ASSERT_OK(Put(1, "e", "end")); | |
2490 | ASSERT_OK(Flush(1)); | |
2491 | } | |
2492 | ASSERT_EQ("3,1", FilesPerLevel(1)); | |
2493 | ||
2494 | // Compact just the new range | |
2495 | Compact(1, "b", "f", 1); | |
11fdf7f2 | 2496 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
7c673cae FG |
2497 | ASSERT_EQ("0,2", FilesPerLevel(1)); |
2498 | ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path)); | |
2499 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); | |
2500 | ASSERT_EQ(0, GetSstFileCount(dbname_)); | |
2501 | ||
2502 | // Compact all | |
2503 | ASSERT_OK(Put(1, "a", "begin")); | |
2504 | ASSERT_OK(Put(1, "z", "end")); | |
2505 | ASSERT_OK(Flush(1)); | |
2506 | ASSERT_EQ("1,2", FilesPerLevel(1)); | |
2507 | ASSERT_EQ(2, GetSstFileCount(options.db_paths[1].path)); | |
2508 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[0].path)); | |
2509 | CompactRangeOptions compact_options; | |
2510 | compact_options.target_path_id = 1; | |
2511 | compact_options.exclusive_manual_compaction = exclusive_manual_compaction_; | |
2512 | db_->CompactRange(compact_options, handles_[1], nullptr, nullptr); | |
11fdf7f2 | 2513 | ASSERT_OK(dbfull()->TEST_WaitForCompact()); |
7c673cae FG |
2514 | |
2515 | ASSERT_EQ("0,1", FilesPerLevel(1)); | |
2516 | ASSERT_EQ(1, GetSstFileCount(options.db_paths[1].path)); | |
2517 | ASSERT_EQ(0, GetSstFileCount(options.db_paths[0].path)); | |
2518 | ASSERT_EQ(0, GetSstFileCount(dbname_)); | |
2519 | ||
2520 | if (iter == 0) { | |
2521 | DestroyAndReopen(options); | |
2522 | options = CurrentOptions(); | |
2523 | options.db_paths.emplace_back(dbname_ + "_2", 2 * 10485760); | |
2524 | options.db_paths.emplace_back(dbname_ + "_3", 100 * 10485760); | |
2525 | options.db_paths.emplace_back(dbname_ + "_4", 120 * 10485760); | |
2526 | options.max_background_flushes = 1; | |
2527 | options.num_levels = 3; | |
2528 | options.create_if_missing = true; | |
2529 | CreateAndReopenWithCF({"pikachu"}, options); | |
2530 | } | |
2531 | } | |
2532 | } | |
2533 | ||
2534 | TEST_F(DBCompactionTest, FilesDeletedAfterCompaction) { | |
2535 | do { | |
2536 | CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); | |
2537 | ASSERT_OK(Put(1, "foo", "v2")); | |
2538 | Compact(1, "a", "z"); | |
2539 | const size_t num_files = CountLiveFiles(); | |
2540 | for (int i = 0; i < 10; i++) { | |
2541 | ASSERT_OK(Put(1, "foo", "v2")); | |
2542 | Compact(1, "a", "z"); | |
2543 | } | |
2544 | ASSERT_EQ(CountLiveFiles(), num_files); | |
2545 | } while (ChangeCompactOptions()); | |
2546 | } | |
2547 | ||
2548 | // Check level comapction with compact files | |
2549 | TEST_P(DBCompactionTestWithParam, DISABLED_CompactFilesOnLevelCompaction) { | |
2550 | const int kTestKeySize = 16; | |
2551 | const int kTestValueSize = 984; | |
2552 | const int kEntrySize = kTestKeySize + kTestValueSize; | |
2553 | const int kEntriesPerBuffer = 100; | |
2554 | Options options; | |
2555 | options.create_if_missing = true; | |
2556 | options.write_buffer_size = kEntrySize * kEntriesPerBuffer; | |
2557 | options.compaction_style = kCompactionStyleLevel; | |
2558 | options.target_file_size_base = options.write_buffer_size; | |
2559 | options.max_bytes_for_level_base = options.target_file_size_base * 2; | |
2560 | options.level0_stop_writes_trigger = 2; | |
2561 | options.max_bytes_for_level_multiplier = 2; | |
2562 | options.compression = kNoCompression; | |
2563 | options.max_subcompactions = max_subcompactions_; | |
2564 | options = CurrentOptions(options); | |
2565 | CreateAndReopenWithCF({"pikachu"}, options); | |
2566 | ||
2567 | Random rnd(301); | |
2568 | for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) { | |
2569 | ASSERT_OK(Put(1, ToString(key), RandomString(&rnd, kTestValueSize))); | |
2570 | } | |
2571 | dbfull()->TEST_WaitForFlushMemTable(handles_[1]); | |
2572 | dbfull()->TEST_WaitForCompact(); | |
2573 | ||
2574 | ColumnFamilyMetaData cf_meta; | |
2575 | dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta); | |
2576 | int output_level = static_cast<int>(cf_meta.levels.size()) - 1; | |
2577 | for (int file_picked = 5; file_picked > 0; --file_picked) { | |
2578 | std::set<std::string> overlapping_file_names; | |
2579 | std::vector<std::string> compaction_input_file_names; | |
2580 | for (int f = 0; f < file_picked; ++f) { | |
2581 | int level = 0; | |
2582 | auto file_meta = PickFileRandomly(cf_meta, &rnd, &level); | |
2583 | compaction_input_file_names.push_back(file_meta->name); | |
2584 | GetOverlappingFileNumbersForLevelCompaction( | |
2585 | cf_meta, options.comparator, level, output_level, | |
2586 | file_meta, &overlapping_file_names); | |
2587 | } | |
2588 | ||
2589 | ASSERT_OK(dbfull()->CompactFiles( | |
2590 | CompactionOptions(), handles_[1], | |
2591 | compaction_input_file_names, | |
2592 | output_level)); | |
2593 | ||
2594 | // Make sure all overlapping files do not exist after compaction | |
2595 | dbfull()->GetColumnFamilyMetaData(handles_[1], &cf_meta); | |
2596 | VerifyCompactionResult(cf_meta, overlapping_file_names); | |
2597 | } | |
2598 | ||
2599 | // make sure all key-values are still there. | |
2600 | for (int key = 64 * kEntriesPerBuffer; key >= 0; --key) { | |
2601 | ASSERT_NE(Get(1, ToString(key)), "NOT_FOUND"); | |
2602 | } | |
2603 | } | |
2604 | ||
2605 | TEST_P(DBCompactionTestWithParam, PartialCompactionFailure) { | |
2606 | Options options; | |
2607 | const int kKeySize = 16; | |
2608 | const int kKvSize = 1000; | |
2609 | const int kKeysPerBuffer = 100; | |
2610 | const int kNumL1Files = 5; | |
2611 | options.create_if_missing = true; | |
2612 | options.write_buffer_size = kKeysPerBuffer * kKvSize; | |
2613 | options.max_write_buffer_number = 2; | |
2614 | options.target_file_size_base = | |
2615 | options.write_buffer_size * | |
2616 | (options.max_write_buffer_number - 1); | |
2617 | options.level0_file_num_compaction_trigger = kNumL1Files; | |
2618 | options.max_bytes_for_level_base = | |
2619 | options.level0_file_num_compaction_trigger * | |
2620 | options.target_file_size_base; | |
2621 | options.max_bytes_for_level_multiplier = 2; | |
2622 | options.compression = kNoCompression; | |
2623 | options.max_subcompactions = max_subcompactions_; | |
2624 | ||
2625 | env_->SetBackgroundThreads(1, Env::HIGH); | |
2626 | env_->SetBackgroundThreads(1, Env::LOW); | |
2627 | // stop the compaction thread until we simulate the file creation failure. | |
2628 | test::SleepingBackgroundTask sleeping_task_low; | |
2629 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, | |
2630 | Env::Priority::LOW); | |
2631 | ||
2632 | options.env = env_; | |
2633 | ||
2634 | DestroyAndReopen(options); | |
2635 | ||
2636 | const int kNumInsertedKeys = | |
2637 | options.level0_file_num_compaction_trigger * | |
2638 | (options.max_write_buffer_number - 1) * | |
2639 | kKeysPerBuffer; | |
2640 | ||
2641 | Random rnd(301); | |
2642 | std::vector<std::string> keys; | |
2643 | std::vector<std::string> values; | |
2644 | for (int k = 0; k < kNumInsertedKeys; ++k) { | |
2645 | keys.emplace_back(RandomString(&rnd, kKeySize)); | |
2646 | values.emplace_back(RandomString(&rnd, kKvSize - kKeySize)); | |
2647 | ASSERT_OK(Put(Slice(keys[k]), Slice(values[k]))); | |
2648 | dbfull()->TEST_WaitForFlushMemTable(); | |
2649 | } | |
2650 | ||
2651 | dbfull()->TEST_FlushMemTable(true); | |
2652 | // Make sure the number of L0 files can trigger compaction. | |
2653 | ASSERT_GE(NumTableFilesAtLevel(0), | |
2654 | options.level0_file_num_compaction_trigger); | |
2655 | ||
2656 | auto previous_num_level0_files = NumTableFilesAtLevel(0); | |
2657 | ||
2658 | // Fail the first file creation. | |
2659 | env_->non_writable_count_ = 1; | |
2660 | sleeping_task_low.WakeUp(); | |
2661 | sleeping_task_low.WaitUntilDone(); | |
2662 | ||
2663 | // Expect compaction to fail here as one file will fail its | |
2664 | // creation. | |
2665 | ASSERT_TRUE(!dbfull()->TEST_WaitForCompact().ok()); | |
2666 | ||
2667 | // Verify L0 -> L1 compaction does fail. | |
2668 | ASSERT_EQ(NumTableFilesAtLevel(1), 0); | |
2669 | ||
2670 | // Verify all L0 files are still there. | |
2671 | ASSERT_EQ(NumTableFilesAtLevel(0), previous_num_level0_files); | |
2672 | ||
2673 | // All key-values must exist after compaction fails. | |
2674 | for (int k = 0; k < kNumInsertedKeys; ++k) { | |
2675 | ASSERT_EQ(values[k], Get(keys[k])); | |
2676 | } | |
2677 | ||
2678 | env_->non_writable_count_ = 0; | |
2679 | ||
2680 | // Make sure RocksDB will not get into corrupted state. | |
2681 | Reopen(options); | |
2682 | ||
2683 | // Verify again after reopen. | |
2684 | for (int k = 0; k < kNumInsertedKeys; ++k) { | |
2685 | ASSERT_EQ(values[k], Get(keys[k])); | |
2686 | } | |
2687 | } | |
2688 | ||
2689 | TEST_P(DBCompactionTestWithParam, DeleteMovedFileAfterCompaction) { | |
2690 | // iter 1 -- delete_obsolete_files_period_micros == 0 | |
2691 | for (int iter = 0; iter < 2; ++iter) { | |
2692 | // This test triggers move compaction and verifies that the file is not | |
2693 | // deleted when it's part of move compaction | |
2694 | Options options = CurrentOptions(); | |
2695 | options.env = env_; | |
2696 | if (iter == 1) { | |
2697 | options.delete_obsolete_files_period_micros = 0; | |
2698 | } | |
2699 | options.create_if_missing = true; | |
2700 | options.level0_file_num_compaction_trigger = | |
2701 | 2; // trigger compaction when we have 2 files | |
2702 | OnFileDeletionListener* listener = new OnFileDeletionListener(); | |
2703 | options.listeners.emplace_back(listener); | |
2704 | options.max_subcompactions = max_subcompactions_; | |
2705 | DestroyAndReopen(options); | |
2706 | ||
2707 | Random rnd(301); | |
2708 | // Create two 1MB sst files | |
2709 | for (int i = 0; i < 2; ++i) { | |
2710 | // Create 1MB sst file | |
2711 | for (int j = 0; j < 100; ++j) { | |
2712 | ASSERT_OK(Put(Key(i * 50 + j), RandomString(&rnd, 10 * 1024))); | |
2713 | } | |
2714 | ASSERT_OK(Flush()); | |
2715 | } | |
2716 | // this should execute L0->L1 | |
2717 | dbfull()->TEST_WaitForCompact(); | |
2718 | ASSERT_EQ("0,1", FilesPerLevel(0)); | |
2719 | ||
2720 | // block compactions | |
2721 | test::SleepingBackgroundTask sleeping_task; | |
2722 | env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, | |
2723 | Env::Priority::LOW); | |
2724 | ||
2725 | options.max_bytes_for_level_base = 1024 * 1024; // 1 MB | |
2726 | Reopen(options); | |
2727 | std::unique_ptr<Iterator> iterator(db_->NewIterator(ReadOptions())); | |
2728 | ASSERT_EQ("0,1", FilesPerLevel(0)); | |
2729 | // let compactions go | |
2730 | sleeping_task.WakeUp(); | |
2731 | sleeping_task.WaitUntilDone(); | |
2732 | ||
2733 | // this should execute L1->L2 (move) | |
2734 | dbfull()->TEST_WaitForCompact(); | |
2735 | ||
2736 | ASSERT_EQ("0,0,1", FilesPerLevel(0)); | |
2737 | ||
2738 | std::vector<LiveFileMetaData> metadata; | |
2739 | db_->GetLiveFilesMetaData(&metadata); | |
2740 | ASSERT_EQ(metadata.size(), 1U); | |
2741 | auto moved_file_name = metadata[0].name; | |
2742 | ||
2743 | // Create two more 1MB sst files | |
2744 | for (int i = 0; i < 2; ++i) { | |
2745 | // Create 1MB sst file | |
2746 | for (int j = 0; j < 100; ++j) { | |
2747 | ASSERT_OK(Put(Key(i * 50 + j + 100), RandomString(&rnd, 10 * 1024))); | |
2748 | } | |
2749 | ASSERT_OK(Flush()); | |
2750 | } | |
2751 | // this should execute both L0->L1 and L1->L2 (merge with previous file) | |
2752 | dbfull()->TEST_WaitForCompact(); | |
2753 | ||
2754 | ASSERT_EQ("0,0,2", FilesPerLevel(0)); | |
2755 | ||
2756 | // iterator is holding the file | |
2757 | ASSERT_OK(env_->FileExists(dbname_ + moved_file_name)); | |
2758 | ||
2759 | listener->SetExpectedFileName(dbname_ + moved_file_name); | |
2760 | iterator.reset(); | |
2761 | ||
2762 | // this file should have been compacted away | |
2763 | ASSERT_NOK(env_->FileExists(dbname_ + moved_file_name)); | |
2764 | listener->VerifyMatchedCount(1); | |
2765 | } | |
2766 | } | |
2767 | ||
2768 | TEST_P(DBCompactionTestWithParam, CompressLevelCompaction) { | |
2769 | if (!Zlib_Supported()) { | |
2770 | return; | |
2771 | } | |
2772 | Options options = CurrentOptions(); | |
2773 | options.memtable_factory.reset( | |
2774 | new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1)); | |
2775 | options.compaction_style = kCompactionStyleLevel; | |
2776 | options.write_buffer_size = 110 << 10; // 110KB | |
2777 | options.arena_block_size = 4 << 10; | |
2778 | options.level0_file_num_compaction_trigger = 2; | |
2779 | options.num_levels = 4; | |
2780 | options.max_bytes_for_level_base = 400 * 1024; | |
2781 | options.max_subcompactions = max_subcompactions_; | |
2782 | // First two levels have no compression, so that a trivial move between | |
2783 | // them will be allowed. Level 2 has Zlib compression so that a trivial | |
2784 | // move to level 3 will not be allowed | |
2785 | options.compression_per_level = {kNoCompression, kNoCompression, | |
2786 | kZlibCompression}; | |
2787 | int matches = 0, didnt_match = 0, trivial_move = 0, non_trivial = 0; | |
2788 | ||
2789 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
2790 | "Compaction::InputCompressionMatchesOutput:Matches", | |
11fdf7f2 | 2791 | [&](void* /*arg*/) { matches++; }); |
7c673cae FG |
2792 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
2793 | "Compaction::InputCompressionMatchesOutput:DidntMatch", | |
11fdf7f2 | 2794 | [&](void* /*arg*/) { didnt_match++; }); |
7c673cae FG |
2795 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
2796 | "DBImpl::BackgroundCompaction:NonTrivial", | |
11fdf7f2 | 2797 | [&](void* /*arg*/) { non_trivial++; }); |
7c673cae FG |
2798 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
2799 | "DBImpl::BackgroundCompaction:TrivialMove", | |
11fdf7f2 | 2800 | [&](void* /*arg*/) { trivial_move++; }); |
7c673cae FG |
2801 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
2802 | ||
2803 | Reopen(options); | |
2804 | ||
2805 | Random rnd(301); | |
2806 | int key_idx = 0; | |
2807 | ||
2808 | // First three 110KB files are going to level 0 | |
2809 | // After that, (100K, 200K) | |
2810 | for (int num = 0; num < 3; num++) { | |
2811 | GenerateNewFile(&rnd, &key_idx); | |
2812 | } | |
2813 | ||
2814 | // Another 110KB triggers a compaction to 400K file to fill up level 0 | |
2815 | GenerateNewFile(&rnd, &key_idx); | |
2816 | ASSERT_EQ(4, GetSstFileCount(dbname_)); | |
2817 | ||
2818 | // (1, 4) | |
2819 | GenerateNewFile(&rnd, &key_idx); | |
2820 | ASSERT_EQ("1,4", FilesPerLevel(0)); | |
2821 | ||
2822 | // (1, 4, 1) | |
2823 | GenerateNewFile(&rnd, &key_idx); | |
2824 | ASSERT_EQ("1,4,1", FilesPerLevel(0)); | |
2825 | ||
2826 | // (1, 4, 2) | |
2827 | GenerateNewFile(&rnd, &key_idx); | |
2828 | ASSERT_EQ("1,4,2", FilesPerLevel(0)); | |
2829 | ||
2830 | // (1, 4, 3) | |
2831 | GenerateNewFile(&rnd, &key_idx); | |
2832 | ASSERT_EQ("1,4,3", FilesPerLevel(0)); | |
2833 | ||
2834 | // (1, 4, 4) | |
2835 | GenerateNewFile(&rnd, &key_idx); | |
2836 | ASSERT_EQ("1,4,4", FilesPerLevel(0)); | |
2837 | ||
2838 | // (1, 4, 5) | |
2839 | GenerateNewFile(&rnd, &key_idx); | |
2840 | ASSERT_EQ("1,4,5", FilesPerLevel(0)); | |
2841 | ||
2842 | // (1, 4, 6) | |
2843 | GenerateNewFile(&rnd, &key_idx); | |
2844 | ASSERT_EQ("1,4,6", FilesPerLevel(0)); | |
2845 | ||
2846 | // (1, 4, 7) | |
2847 | GenerateNewFile(&rnd, &key_idx); | |
2848 | ASSERT_EQ("1,4,7", FilesPerLevel(0)); | |
2849 | ||
2850 | // (1, 4, 8) | |
2851 | GenerateNewFile(&rnd, &key_idx); | |
2852 | ASSERT_EQ("1,4,8", FilesPerLevel(0)); | |
2853 | ||
2854 | ASSERT_EQ(matches, 12); | |
2855 | // Currently, the test relies on the number of calls to | |
2856 | // InputCompressionMatchesOutput() per compaction. | |
2857 | const int kCallsToInputCompressionMatch = 2; | |
2858 | ASSERT_EQ(didnt_match, 8 * kCallsToInputCompressionMatch); | |
2859 | ASSERT_EQ(trivial_move, 12); | |
2860 | ASSERT_EQ(non_trivial, 8); | |
2861 | ||
2862 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
2863 | ||
2864 | for (int i = 0; i < key_idx; i++) { | |
2865 | auto v = Get(Key(i)); | |
2866 | ASSERT_NE(v, "NOT_FOUND"); | |
2867 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
2868 | } | |
2869 | ||
2870 | Reopen(options); | |
2871 | ||
2872 | for (int i = 0; i < key_idx; i++) { | |
2873 | auto v = Get(Key(i)); | |
2874 | ASSERT_NE(v, "NOT_FOUND"); | |
2875 | ASSERT_TRUE(v.size() == 1 || v.size() == 990); | |
2876 | } | |
2877 | ||
2878 | Destroy(options); | |
2879 | } | |
2880 | ||
2881 | TEST_F(DBCompactionTest, SanitizeCompactionOptionsTest) { | |
2882 | Options options = CurrentOptions(); | |
2883 | options.max_background_compactions = 5; | |
2884 | options.soft_pending_compaction_bytes_limit = 0; | |
2885 | options.hard_pending_compaction_bytes_limit = 100; | |
2886 | options.create_if_missing = true; | |
2887 | DestroyAndReopen(options); | |
7c673cae FG |
2888 | ASSERT_EQ(100, db_->GetOptions().soft_pending_compaction_bytes_limit); |
2889 | ||
7c673cae FG |
2890 | options.max_background_compactions = 3; |
2891 | options.soft_pending_compaction_bytes_limit = 200; | |
2892 | options.hard_pending_compaction_bytes_limit = 150; | |
2893 | DestroyAndReopen(options); | |
7c673cae FG |
2894 | ASSERT_EQ(150, db_->GetOptions().soft_pending_compaction_bytes_limit); |
2895 | } | |
2896 | ||
2897 | // This tests for a bug that could cause two level0 compactions running | |
2898 | // concurrently | |
2899 | // TODO(aekmekji): Make sure that the reason this fails when run with | |
2900 | // max_subcompactions > 1 is not a correctness issue but just inherent to | |
2901 | // running parallel L0-L1 compactions | |
2902 | TEST_F(DBCompactionTest, SuggestCompactRangeNoTwoLevel0Compactions) { | |
2903 | Options options = CurrentOptions(); | |
2904 | options.compaction_style = kCompactionStyleLevel; | |
2905 | options.write_buffer_size = 110 << 10; | |
2906 | options.arena_block_size = 4 << 10; | |
2907 | options.level0_file_num_compaction_trigger = 4; | |
2908 | options.num_levels = 4; | |
2909 | options.compression = kNoCompression; | |
2910 | options.max_bytes_for_level_base = 450 << 10; | |
2911 | options.target_file_size_base = 98 << 10; | |
2912 | options.max_write_buffer_number = 2; | |
2913 | options.max_background_compactions = 2; | |
2914 | ||
2915 | DestroyAndReopen(options); | |
2916 | ||
2917 | // fill up the DB | |
2918 | Random rnd(301); | |
2919 | for (int num = 0; num < 10; num++) { | |
2920 | GenerateNewRandomFile(&rnd); | |
2921 | } | |
2922 | db_->CompactRange(CompactRangeOptions(), nullptr, nullptr); | |
2923 | ||
2924 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
2925 | {{"CompactionJob::Run():Start", | |
2926 | "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1"}, | |
2927 | {"DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2", | |
2928 | "CompactionJob::Run():End"}}); | |
2929 | ||
2930 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
2931 | ||
2932 | // trigger L0 compaction | |
2933 | for (int num = 0; num < options.level0_file_num_compaction_trigger + 1; | |
2934 | num++) { | |
2935 | GenerateNewRandomFile(&rnd, /* nowait */ true); | |
2936 | ASSERT_OK(Flush()); | |
2937 | } | |
2938 | ||
2939 | TEST_SYNC_POINT( | |
2940 | "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:1"); | |
2941 | ||
2942 | GenerateNewRandomFile(&rnd, /* nowait */ true); | |
2943 | dbfull()->TEST_WaitForFlushMemTable(); | |
2944 | ASSERT_OK(experimental::SuggestCompactRange(db_, nullptr, nullptr)); | |
2945 | for (int num = 0; num < options.level0_file_num_compaction_trigger + 1; | |
2946 | num++) { | |
2947 | GenerateNewRandomFile(&rnd, /* nowait */ true); | |
2948 | ASSERT_OK(Flush()); | |
2949 | } | |
2950 | ||
2951 | TEST_SYNC_POINT( | |
2952 | "DBCompactionTest::SuggestCompactRangeNoTwoLevel0Compactions:2"); | |
2953 | dbfull()->TEST_WaitForCompact(); | |
2954 | } | |
2955 | ||
2956 | ||
2957 | TEST_P(DBCompactionTestWithParam, ForceBottommostLevelCompaction) { | |
2958 | int32_t trivial_move = 0; | |
2959 | int32_t non_trivial_move = 0; | |
2960 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
2961 | "DBImpl::BackgroundCompaction:TrivialMove", | |
11fdf7f2 | 2962 | [&](void* /*arg*/) { trivial_move++; }); |
7c673cae FG |
2963 | rocksdb::SyncPoint::GetInstance()->SetCallBack( |
2964 | "DBImpl::BackgroundCompaction:NonTrivial", | |
11fdf7f2 | 2965 | [&](void* /*arg*/) { non_trivial_move++; }); |
7c673cae FG |
2966 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); |
2967 | ||
2968 | Options options = CurrentOptions(); | |
11fdf7f2 | 2969 | options.target_file_size_base = 100000000; |
7c673cae FG |
2970 | options.write_buffer_size = 100000000; |
2971 | options.max_subcompactions = max_subcompactions_; | |
2972 | DestroyAndReopen(options); | |
2973 | ||
2974 | int32_t value_size = 10 * 1024; // 10 KB | |
2975 | ||
2976 | Random rnd(301); | |
2977 | std::vector<std::string> values; | |
2978 | // File with keys [ 0 => 99 ] | |
2979 | for (int i = 0; i < 100; i++) { | |
2980 | values.push_back(RandomString(&rnd, value_size)); | |
2981 | ASSERT_OK(Put(Key(i), values[i])); | |
2982 | } | |
2983 | ASSERT_OK(Flush()); | |
2984 | ||
2985 | ASSERT_EQ("1", FilesPerLevel(0)); | |
2986 | // Compaction will do L0=>L1 (trivial move) then move L1 files to L3 | |
2987 | CompactRangeOptions compact_options; | |
2988 | compact_options.change_level = true; | |
2989 | compact_options.target_level = 3; | |
2990 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
2991 | ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); | |
2992 | ASSERT_EQ(trivial_move, 1); | |
2993 | ASSERT_EQ(non_trivial_move, 0); | |
2994 | ||
2995 | // File with keys [ 100 => 199 ] | |
2996 | for (int i = 100; i < 200; i++) { | |
2997 | values.push_back(RandomString(&rnd, value_size)); | |
2998 | ASSERT_OK(Put(Key(i), values[i])); | |
2999 | } | |
3000 | ASSERT_OK(Flush()); | |
3001 | ||
3002 | ASSERT_EQ("1,0,0,1", FilesPerLevel(0)); | |
3003 | // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves) | |
3004 | // then compacte the bottommost level L3=>L3 (non trivial move) | |
3005 | compact_options = CompactRangeOptions(); | |
3006 | compact_options.bottommost_level_compaction = | |
3007 | BottommostLevelCompaction::kForce; | |
3008 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
3009 | ASSERT_EQ("0,0,0,1", FilesPerLevel(0)); | |
3010 | ASSERT_EQ(trivial_move, 4); | |
3011 | ASSERT_EQ(non_trivial_move, 1); | |
3012 | ||
3013 | // File with keys [ 200 => 299 ] | |
3014 | for (int i = 200; i < 300; i++) { | |
3015 | values.push_back(RandomString(&rnd, value_size)); | |
3016 | ASSERT_OK(Put(Key(i), values[i])); | |
3017 | } | |
3018 | ASSERT_OK(Flush()); | |
3019 | ||
3020 | ASSERT_EQ("1,0,0,1", FilesPerLevel(0)); | |
3021 | trivial_move = 0; | |
3022 | non_trivial_move = 0; | |
3023 | compact_options = CompactRangeOptions(); | |
3024 | compact_options.bottommost_level_compaction = | |
3025 | BottommostLevelCompaction::kSkip; | |
3026 | // Compaction will do L0=>L1 L1=>L2 L2=>L3 (3 trivial moves) | |
3027 | // and will skip bottommost level compaction | |
3028 | ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr)); | |
3029 | ASSERT_EQ("0,0,0,2", FilesPerLevel(0)); | |
3030 | ASSERT_EQ(trivial_move, 3); | |
3031 | ASSERT_EQ(non_trivial_move, 0); | |
3032 | ||
3033 | for (int i = 0; i < 300; i++) { | |
3034 | ASSERT_EQ(Get(Key(i)), values[i]); | |
3035 | } | |
3036 | ||
3037 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3038 | } | |
3039 | ||
3040 | TEST_P(DBCompactionTestWithParam, IntraL0Compaction) { | |
3041 | Options options = CurrentOptions(); | |
3042 | options.compression = kNoCompression; | |
3043 | options.level0_file_num_compaction_trigger = 5; | |
3044 | options.max_background_compactions = 2; | |
3045 | options.max_subcompactions = max_subcompactions_; | |
3046 | DestroyAndReopen(options); | |
3047 | ||
3048 | const size_t kValueSize = 1 << 20; | |
3049 | Random rnd(301); | |
3050 | std::string value(RandomString(&rnd, kValueSize)); | |
3051 | ||
3052 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3053 | {{"LevelCompactionPicker::PickCompactionBySize:0", | |
3054 | "CompactionJob::Run():Start"}}); | |
3055 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3056 | ||
3057 | // index: 0 1 2 3 4 5 6 7 8 9 | |
3058 | // size: 1MB 1MB 1MB 1MB 1MB 2MB 1MB 1MB 1MB 1MB | |
3059 | // score: 1.5 1.3 1.5 2.0 inf | |
3060 | // | |
3061 | // Files 0-4 will be included in an L0->L1 compaction. | |
3062 | // | |
3063 | // L0->L0 will be triggered since the sync points guarantee compaction to base | |
3064 | // level is still blocked when files 5-9 trigger another compaction. | |
3065 | // | |
3066 | // Files 6-9 are the longest span of available files for which | |
3067 | // work-per-deleted-file decreases (see "score" row above). | |
3068 | for (int i = 0; i < 10; ++i) { | |
11fdf7f2 TL |
3069 | ASSERT_OK(Put(Key(0), "")); // prevents trivial move |
3070 | if (i == 5) { | |
3071 | ASSERT_OK(Put(Key(i + 1), value + value)); | |
3072 | } else { | |
3073 | ASSERT_OK(Put(Key(i + 1), value)); | |
7c673cae FG |
3074 | } |
3075 | ASSERT_OK(Flush()); | |
3076 | } | |
3077 | dbfull()->TEST_WaitForCompact(); | |
3078 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3079 | ||
3080 | std::vector<std::vector<FileMetaData>> level_to_files; | |
3081 | dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(), | |
3082 | &level_to_files); | |
3083 | ASSERT_GE(level_to_files.size(), 2); // at least L0 and L1 | |
3084 | // L0 has the 2MB file (not compacted) and 4MB file (output of L0->L0) | |
3085 | ASSERT_EQ(2, level_to_files[0].size()); | |
3086 | ASSERT_GT(level_to_files[1].size(), 0); | |
3087 | for (int i = 0; i < 2; ++i) { | |
11fdf7f2 | 3088 | ASSERT_GE(level_to_files[0][i].fd.file_size, 1 << 21); |
7c673cae FG |
3089 | } |
3090 | } | |
3091 | ||
11fdf7f2 TL |
3092 | TEST_P(DBCompactionTestWithParam, IntraL0CompactionDoesNotObsoleteDeletions) { |
3093 | // regression test for issue #2722: L0->L0 compaction can resurrect deleted | |
3094 | // keys from older L0 files if L1+ files' key-ranges do not include the key. | |
7c673cae | 3095 | Options options = CurrentOptions(); |
11fdf7f2 TL |
3096 | options.compression = kNoCompression; |
3097 | options.level0_file_num_compaction_trigger = 5; | |
3098 | options.max_background_compactions = 2; | |
3099 | options.max_subcompactions = max_subcompactions_; | |
3100 | DestroyAndReopen(options); | |
3101 | ||
3102 | const size_t kValueSize = 1 << 20; | |
3103 | Random rnd(301); | |
3104 | std::string value(RandomString(&rnd, kValueSize)); | |
3105 | ||
3106 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3107 | {{"LevelCompactionPicker::PickCompactionBySize:0", | |
3108 | "CompactionJob::Run():Start"}}); | |
3109 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3110 | ||
3111 | // index: 0 1 2 3 4 5 6 7 8 9 | |
3112 | // size: 1MB 1MB 1MB 1MB 1MB 1MB 1MB 1MB 1MB 1MB | |
3113 | // score: 1.25 1.33 1.5 2.0 inf | |
3114 | // | |
3115 | // Files 0-4 will be included in an L0->L1 compaction. | |
3116 | // | |
3117 | // L0->L0 will be triggered since the sync points guarantee compaction to base | |
3118 | // level is still blocked when files 5-9 trigger another compaction. All files | |
3119 | // 5-9 are included in the L0->L0 due to work-per-deleted file decreasing. | |
3120 | // | |
3121 | // Put a key-value in files 0-4. Delete that key in files 5-9. Verify the | |
3122 | // L0->L0 preserves the deletion such that the key remains deleted. | |
3123 | for (int i = 0; i < 10; ++i) { | |
3124 | // key 0 serves both to prevent trivial move and as the key we want to | |
3125 | // verify is not resurrected by L0->L0 compaction. | |
3126 | if (i < 5) { | |
3127 | ASSERT_OK(Put(Key(0), "")); | |
3128 | } else { | |
3129 | ASSERT_OK(Delete(Key(0))); | |
3130 | } | |
3131 | ASSERT_OK(Put(Key(i + 1), value)); | |
3132 | ASSERT_OK(Flush()); | |
3133 | } | |
3134 | dbfull()->TEST_WaitForCompact(); | |
3135 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3136 | ||
3137 | std::vector<std::vector<FileMetaData>> level_to_files; | |
3138 | dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(), | |
3139 | &level_to_files); | |
3140 | ASSERT_GE(level_to_files.size(), 2); // at least L0 and L1 | |
3141 | // L0 has a single output file from L0->L0 | |
3142 | ASSERT_EQ(1, level_to_files[0].size()); | |
3143 | ASSERT_GT(level_to_files[1].size(), 0); | |
3144 | ASSERT_GE(level_to_files[0][0].fd.file_size, 1 << 22); | |
3145 | ||
3146 | ReadOptions roptions; | |
3147 | std::string result; | |
3148 | ASSERT_TRUE(db_->Get(roptions, Key(0), &result).IsNotFound()); | |
3149 | } | |
3150 | ||
3151 | TEST_P(DBCompactionTestWithParam, FullCompactionInBottomPriThreadPool) { | |
3152 | const int kNumFilesTrigger = 3; | |
3153 | Env::Default()->SetBackgroundThreads(1, Env::Priority::BOTTOM); | |
3154 | for (bool use_universal_compaction : {false, true}) { | |
3155 | Options options = CurrentOptions(); | |
3156 | if (use_universal_compaction) { | |
3157 | options.compaction_style = kCompactionStyleUniversal; | |
3158 | } else { | |
3159 | options.compaction_style = kCompactionStyleLevel; | |
3160 | options.level_compaction_dynamic_level_bytes = true; | |
3161 | } | |
3162 | options.num_levels = 4; | |
3163 | options.write_buffer_size = 100 << 10; // 100KB | |
3164 | options.target_file_size_base = 32 << 10; // 32KB | |
3165 | options.level0_file_num_compaction_trigger = kNumFilesTrigger; | |
3166 | // Trigger compaction if size amplification exceeds 110% | |
3167 | options.compaction_options_universal.max_size_amplification_percent = 110; | |
3168 | DestroyAndReopen(options); | |
3169 | ||
3170 | int num_bottom_pri_compactions = 0; | |
3171 | SyncPoint::GetInstance()->SetCallBack( | |
3172 | "DBImpl::BGWorkBottomCompaction", | |
3173 | [&](void* /*arg*/) { ++num_bottom_pri_compactions; }); | |
3174 | SyncPoint::GetInstance()->EnableProcessing(); | |
3175 | ||
3176 | Random rnd(301); | |
3177 | for (int num = 0; num < kNumFilesTrigger; num++) { | |
3178 | ASSERT_EQ(NumSortedRuns(), num); | |
3179 | int key_idx = 0; | |
3180 | GenerateNewFile(&rnd, &key_idx); | |
3181 | } | |
3182 | dbfull()->TEST_WaitForCompact(); | |
3183 | ||
3184 | ASSERT_EQ(1, num_bottom_pri_compactions); | |
3185 | ||
3186 | // Verify that size amplification did occur | |
3187 | ASSERT_EQ(NumSortedRuns(), 1); | |
3188 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3189 | } | |
3190 | Env::Default()->SetBackgroundThreads(0, Env::Priority::BOTTOM); | |
3191 | } | |
3192 | ||
3193 | TEST_F(DBCompactionTest, OptimizedDeletionObsoleting) { | |
3194 | // Deletions can be dropped when compacted to non-last level if they fall | |
3195 | // outside the lower-level files' key-ranges. | |
3196 | const int kNumL0Files = 4; | |
3197 | Options options = CurrentOptions(); | |
3198 | options.level0_file_num_compaction_trigger = kNumL0Files; | |
3199 | options.statistics = rocksdb::CreateDBStatistics(); | |
3200 | DestroyAndReopen(options); | |
3201 | ||
3202 | // put key 1 and 3 in separate L1, L2 files. | |
3203 | // So key 0, 2, and 4+ fall outside these levels' key-ranges. | |
3204 | for (int level = 2; level >= 1; --level) { | |
3205 | for (int i = 0; i < 2; ++i) { | |
3206 | Put(Key(2 * i + 1), "val"); | |
3207 | Flush(); | |
3208 | } | |
3209 | MoveFilesToLevel(level); | |
3210 | ASSERT_EQ(2, NumTableFilesAtLevel(level)); | |
3211 | } | |
3212 | ||
3213 | // Delete keys in range [1, 4]. These L0 files will be compacted with L1: | |
3214 | // - Tombstones for keys 2 and 4 can be dropped early. | |
3215 | // - Tombstones for keys 1 and 3 must be kept due to L2 files' key-ranges. | |
3216 | for (int i = 0; i < kNumL0Files; ++i) { | |
3217 | Put(Key(0), "val"); // sentinel to prevent trivial move | |
3218 | Delete(Key(i + 1)); | |
3219 | Flush(); | |
3220 | } | |
3221 | dbfull()->TEST_WaitForCompact(); | |
3222 | ||
3223 | for (int i = 0; i < kNumL0Files; ++i) { | |
3224 | std::string value; | |
3225 | ASSERT_TRUE(db_->Get(ReadOptions(), Key(i + 1), &value).IsNotFound()); | |
3226 | } | |
3227 | ASSERT_EQ(2, options.statistics->getTickerCount( | |
3228 | COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE)); | |
3229 | ASSERT_EQ(2, | |
3230 | options.statistics->getTickerCount(COMPACTION_KEY_DROP_OBSOLETE)); | |
3231 | } | |
3232 | ||
3233 | TEST_F(DBCompactionTest, CompactFilesPendingL0Bug) { | |
3234 | // https://www.facebook.com/groups/rocksdb.dev/permalink/1389452781153232/ | |
3235 | // CompactFiles() had a bug where it failed to pick a compaction when an L0 | |
3236 | // compaction existed, but marked it as scheduled anyways. It'd never be | |
3237 | // unmarked as scheduled, so future compactions or DB close could hang. | |
3238 | const int kNumL0Files = 5; | |
3239 | Options options = CurrentOptions(); | |
3240 | options.level0_file_num_compaction_trigger = kNumL0Files - 1; | |
3241 | options.max_background_compactions = 2; | |
3242 | DestroyAndReopen(options); | |
3243 | ||
3244 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3245 | {{"LevelCompactionPicker::PickCompaction:Return", | |
3246 | "DBCompactionTest::CompactFilesPendingL0Bug:Picked"}, | |
3247 | {"DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted", | |
3248 | "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}}); | |
3249 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3250 | ||
3251 | auto schedule_multi_compaction_token = | |
3252 | dbfull()->TEST_write_controler().GetCompactionPressureToken(); | |
3253 | ||
3254 | // Files 0-3 will be included in an L0->L1 compaction. | |
3255 | // | |
3256 | // File 4 will be included in a call to CompactFiles() while the first | |
3257 | // compaction is running. | |
3258 | for (int i = 0; i < kNumL0Files - 1; ++i) { | |
3259 | ASSERT_OK(Put(Key(0), "val")); // sentinel to prevent trivial move | |
3260 | ASSERT_OK(Put(Key(i + 1), "val")); | |
3261 | ASSERT_OK(Flush()); | |
3262 | } | |
3263 | TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:Picked"); | |
3264 | // file 4 flushed after 0-3 picked | |
3265 | ASSERT_OK(Put(Key(kNumL0Files), "val")); | |
3266 | ASSERT_OK(Flush()); | |
3267 | ||
3268 | // previously DB close would hang forever as this situation caused scheduled | |
3269 | // compactions count to never decrement to zero. | |
3270 | ColumnFamilyMetaData cf_meta; | |
3271 | dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta); | |
3272 | ASSERT_EQ(kNumL0Files, cf_meta.levels[0].files.size()); | |
3273 | std::vector<std::string> input_filenames; | |
3274 | input_filenames.push_back(cf_meta.levels[0].files.front().name); | |
3275 | ASSERT_OK(dbfull() | |
3276 | ->CompactFiles(CompactionOptions(), input_filenames, | |
3277 | 0 /* output_level */)); | |
3278 | TEST_SYNC_POINT("DBCompactionTest::CompactFilesPendingL0Bug:ManualCompacted"); | |
3279 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3280 | } | |
3281 | ||
3282 | TEST_F(DBCompactionTest, CompactFilesOverlapInL0Bug) { | |
3283 | // Regression test for bug of not pulling in L0 files that overlap the user- | |
3284 | // specified input files in time- and key-ranges. | |
3285 | Put(Key(0), "old_val"); | |
3286 | Flush(); | |
3287 | Put(Key(0), "new_val"); | |
3288 | Flush(); | |
3289 | ||
3290 | ColumnFamilyMetaData cf_meta; | |
3291 | dbfull()->GetColumnFamilyMetaData(dbfull()->DefaultColumnFamily(), &cf_meta); | |
3292 | ASSERT_GE(cf_meta.levels.size(), 2); | |
3293 | ASSERT_EQ(2, cf_meta.levels[0].files.size()); | |
3294 | ||
3295 | // Compacting {new L0 file, L1 file} should pull in the old L0 file since it | |
3296 | // overlaps in key-range and time-range. | |
3297 | std::vector<std::string> input_filenames; | |
3298 | input_filenames.push_back(cf_meta.levels[0].files.front().name); | |
3299 | ASSERT_OK(dbfull()->CompactFiles(CompactionOptions(), input_filenames, | |
3300 | 1 /* output_level */)); | |
3301 | ASSERT_EQ("new_val", Get(Key(0))); | |
3302 | } | |
3303 | ||
3304 | TEST_F(DBCompactionTest, CompactBottomLevelFilesWithDeletions) { | |
3305 | // bottom-level files may contain deletions due to snapshots protecting the | |
3306 | // deleted keys. Once the snapshot is released, we should see files with many | |
3307 | // such deletions undergo single-file compactions. | |
3308 | const int kNumKeysPerFile = 1024; | |
3309 | const int kNumLevelFiles = 4; | |
3310 | const int kValueSize = 128; | |
3311 | Options options = CurrentOptions(); | |
3312 | options.compression = kNoCompression; | |
3313 | options.level0_file_num_compaction_trigger = kNumLevelFiles; | |
3314 | // inflate it a bit to account for key/metadata overhead | |
3315 | options.target_file_size_base = 120 * kNumKeysPerFile * kValueSize / 100; | |
3316 | Reopen(options); | |
3317 | ||
3318 | Random rnd(301); | |
3319 | const Snapshot* snapshot = nullptr; | |
3320 | for (int i = 0; i < kNumLevelFiles; ++i) { | |
3321 | for (int j = 0; j < kNumKeysPerFile; ++j) { | |
3322 | ASSERT_OK( | |
3323 | Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize))); | |
3324 | } | |
3325 | if (i == kNumLevelFiles - 1) { | |
3326 | snapshot = db_->GetSnapshot(); | |
3327 | // delete every other key after grabbing a snapshot, so these deletions | |
3328 | // and the keys they cover can't be dropped until after the snapshot is | |
3329 | // released. | |
3330 | for (int j = 0; j < kNumLevelFiles * kNumKeysPerFile; j += 2) { | |
3331 | ASSERT_OK(Delete(Key(j))); | |
3332 | } | |
3333 | } | |
3334 | Flush(); | |
3335 | if (i < kNumLevelFiles - 1) { | |
3336 | ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); | |
3337 | } | |
3338 | } | |
3339 | dbfull()->TEST_WaitForCompact(); | |
3340 | ASSERT_EQ(kNumLevelFiles, NumTableFilesAtLevel(1)); | |
3341 | ||
3342 | std::vector<LiveFileMetaData> pre_release_metadata, post_release_metadata; | |
3343 | db_->GetLiveFilesMetaData(&pre_release_metadata); | |
3344 | // just need to bump seqnum so ReleaseSnapshot knows the newest key in the SST | |
3345 | // files does not need to be preserved in case of a future snapshot. | |
3346 | ASSERT_OK(Put(Key(0), "val")); | |
3347 | // release snapshot and wait for compactions to finish. Single-file | |
3348 | // compactions should be triggered, which reduce the size of each bottom-level | |
3349 | // file without changing file count. | |
3350 | db_->ReleaseSnapshot(snapshot); | |
3351 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
3352 | "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { | |
3353 | Compaction* compaction = reinterpret_cast<Compaction*>(arg); | |
3354 | ASSERT_TRUE(compaction->compaction_reason() == | |
3355 | CompactionReason::kBottommostFiles); | |
3356 | }); | |
3357 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3358 | dbfull()->TEST_WaitForCompact(); | |
3359 | db_->GetLiveFilesMetaData(&post_release_metadata); | |
3360 | ASSERT_EQ(pre_release_metadata.size(), post_release_metadata.size()); | |
3361 | ||
3362 | for (size_t i = 0; i < pre_release_metadata.size(); ++i) { | |
3363 | const auto& pre_file = pre_release_metadata[i]; | |
3364 | const auto& post_file = post_release_metadata[i]; | |
3365 | ASSERT_EQ(1, pre_file.level); | |
3366 | ASSERT_EQ(1, post_file.level); | |
3367 | // each file is smaller than it was before as it was rewritten without | |
3368 | // deletion markers/deleted keys. | |
3369 | ASSERT_LT(post_file.size, pre_file.size); | |
3370 | } | |
3371 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3372 | } | |
3373 | ||
3374 | TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) { | |
3375 | const int kNumKeysPerFile = 32; | |
3376 | const int kNumLevelFiles = 2; | |
3377 | const int kValueSize = 1024; | |
3378 | ||
3379 | Options options = CurrentOptions(); | |
3380 | options.compression = kNoCompression; | |
3381 | options.ttl = 24 * 60 * 60; // 24 hours | |
3382 | options.max_open_files = -1; | |
3383 | env_->time_elapse_only_sleep_ = false; | |
3384 | options.env = env_; | |
3385 | ||
3386 | env_->addon_time_.store(0); | |
3387 | DestroyAndReopen(options); | |
3388 | ||
3389 | Random rnd(301); | |
3390 | for (int i = 0; i < kNumLevelFiles; ++i) { | |
3391 | for (int j = 0; j < kNumKeysPerFile; ++j) { | |
3392 | ASSERT_OK( | |
3393 | Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize))); | |
3394 | } | |
3395 | Flush(); | |
3396 | } | |
3397 | dbfull()->TEST_WaitForCompact(); | |
3398 | MoveFilesToLevel(3); | |
3399 | ASSERT_EQ("0,0,0,2", FilesPerLevel()); | |
3400 | ||
3401 | // Delete previously written keys. | |
3402 | for (int i = 0; i < kNumLevelFiles; ++i) { | |
3403 | for (int j = 0; j < kNumKeysPerFile; ++j) { | |
3404 | ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j))); | |
3405 | } | |
3406 | Flush(); | |
3407 | } | |
3408 | dbfull()->TEST_WaitForCompact(); | |
3409 | ASSERT_EQ("2,0,0,2", FilesPerLevel()); | |
3410 | MoveFilesToLevel(1); | |
3411 | ASSERT_EQ("0,2,0,2", FilesPerLevel()); | |
3412 | ||
3413 | env_->addon_time_.fetch_add(36 * 60 * 60); // 36 hours | |
3414 | ASSERT_EQ("0,2,0,2", FilesPerLevel()); | |
3415 | ||
3416 | // Just do a simple write + flush so that the Ttl expired files get | |
3417 | // compacted. | |
3418 | ASSERT_OK(Put("a", "1")); | |
3419 | Flush(); | |
3420 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
3421 | "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { | |
3422 | Compaction* compaction = reinterpret_cast<Compaction*>(arg); | |
3423 | ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl); | |
3424 | }); | |
3425 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3426 | dbfull()->TEST_WaitForCompact(); | |
3427 | // All non-L0 files are deleted, as they contained only deleted data. | |
3428 | ASSERT_EQ("1", FilesPerLevel()); | |
3429 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3430 | ||
3431 | // Test dynamically changing ttl. | |
3432 | ||
3433 | env_->addon_time_.store(0); | |
3434 | DestroyAndReopen(options); | |
3435 | ||
3436 | for (int i = 0; i < kNumLevelFiles; ++i) { | |
3437 | for (int j = 0; j < kNumKeysPerFile; ++j) { | |
3438 | ASSERT_OK( | |
3439 | Put(Key(i * kNumKeysPerFile + j), RandomString(&rnd, kValueSize))); | |
3440 | } | |
3441 | Flush(); | |
3442 | } | |
3443 | dbfull()->TEST_WaitForCompact(); | |
3444 | MoveFilesToLevel(3); | |
3445 | ASSERT_EQ("0,0,0,2", FilesPerLevel()); | |
3446 | ||
3447 | // Delete previously written keys. | |
3448 | for (int i = 0; i < kNumLevelFiles; ++i) { | |
3449 | for (int j = 0; j < kNumKeysPerFile; ++j) { | |
3450 | ASSERT_OK(Delete(Key(i * kNumKeysPerFile + j))); | |
3451 | } | |
3452 | Flush(); | |
3453 | } | |
3454 | dbfull()->TEST_WaitForCompact(); | |
3455 | ASSERT_EQ("2,0,0,2", FilesPerLevel()); | |
3456 | MoveFilesToLevel(1); | |
3457 | ASSERT_EQ("0,2,0,2", FilesPerLevel()); | |
3458 | ||
3459 | // Move time forward by 12 hours, and make sure that compaction still doesn't | |
3460 | // trigger as ttl is set to 24 hours. | |
3461 | env_->addon_time_.fetch_add(12 * 60 * 60); | |
3462 | ASSERT_OK(Put("a", "1")); | |
3463 | Flush(); | |
3464 | dbfull()->TEST_WaitForCompact(); | |
3465 | ASSERT_EQ("1,2,0,2", FilesPerLevel()); | |
3466 | ||
3467 | rocksdb::SyncPoint::GetInstance()->SetCallBack( | |
3468 | "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { | |
3469 | Compaction* compaction = reinterpret_cast<Compaction*>(arg); | |
3470 | ASSERT_TRUE(compaction->compaction_reason() == CompactionReason::kTtl); | |
3471 | }); | |
3472 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3473 | ||
3474 | // Dynamically change ttl to 10 hours. | |
3475 | // This should trigger a ttl compaction, as 12 hours have already passed. | |
3476 | ASSERT_OK(dbfull()->SetOptions({{"ttl", "36000"}})); | |
3477 | dbfull()->TEST_WaitForCompact(); | |
3478 | // All non-L0 files are deleted, as they contained only deleted data. | |
3479 | ASSERT_EQ("1", FilesPerLevel()); | |
3480 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3481 | } | |
3482 | ||
3483 | TEST_F(DBCompactionTest, CompactRangeDelayedByL0FileCount) { | |
3484 | // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual | |
3485 | // compaction only triggers flush after it's sure stall won't be triggered for | |
3486 | // L0 file count going too high. | |
3487 | const int kNumL0FilesTrigger = 4; | |
3488 | const int kNumL0FilesLimit = 8; | |
3489 | // i == 0: verifies normal case where stall is avoided by delay | |
3490 | // i == 1: verifies no delay in edge case where stall trigger is same as | |
3491 | // compaction trigger, so stall can't be avoided | |
3492 | for (int i = 0; i < 2; ++i) { | |
3493 | Options options = CurrentOptions(); | |
3494 | options.level0_slowdown_writes_trigger = kNumL0FilesLimit; | |
3495 | if (i == 0) { | |
3496 | options.level0_file_num_compaction_trigger = kNumL0FilesTrigger; | |
3497 | } else { | |
3498 | options.level0_file_num_compaction_trigger = kNumL0FilesLimit; | |
3499 | } | |
3500 | Reopen(options); | |
3501 | ||
3502 | if (i == 0) { | |
3503 | // ensure the auto compaction doesn't finish until manual compaction has | |
3504 | // had a chance to be delayed. | |
3505 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3506 | {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait", | |
3507 | "CompactionJob::Run():End"}}); | |
3508 | } else { | |
3509 | // ensure the auto-compaction doesn't finish until manual compaction has | |
3510 | // continued without delay. | |
3511 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3512 | {{"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}}); | |
3513 | } | |
3514 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3515 | ||
3516 | Random rnd(301); | |
3517 | for (int j = 0; j < kNumL0FilesLimit - 1; ++j) { | |
3518 | for (int k = 0; k < 2; ++k) { | |
3519 | ASSERT_OK(Put(Key(k), RandomString(&rnd, 1024))); | |
3520 | } | |
3521 | Flush(); | |
3522 | } | |
3523 | auto manual_compaction_thread = port::Thread([this]() { | |
3524 | CompactRangeOptions cro; | |
3525 | cro.allow_write_stall = false; | |
3526 | db_->CompactRange(cro, nullptr, nullptr); | |
3527 | }); | |
3528 | ||
3529 | manual_compaction_thread.join(); | |
3530 | dbfull()->TEST_WaitForCompact(); | |
3531 | ASSERT_EQ(0, NumTableFilesAtLevel(0)); | |
3532 | ASSERT_GT(NumTableFilesAtLevel(1), 0); | |
3533 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3534 | } | |
3535 | } | |
3536 | ||
3537 | TEST_F(DBCompactionTest, CompactRangeDelayedByImmMemTableCount) { | |
3538 | // Verify that, when `CompactRangeOptions::allow_write_stall == false`, manual | |
3539 | // compaction only triggers flush after it's sure stall won't be triggered for | |
3540 | // immutable memtable count going too high. | |
3541 | const int kNumImmMemTableLimit = 8; | |
3542 | // i == 0: verifies normal case where stall is avoided by delay | |
3543 | // i == 1: verifies no delay in edge case where stall trigger is same as flush | |
3544 | // trigger, so stall can't be avoided | |
3545 | for (int i = 0; i < 2; ++i) { | |
3546 | Options options = CurrentOptions(); | |
3547 | options.disable_auto_compactions = true; | |
3548 | // the delay limit is one less than the stop limit. This test focuses on | |
3549 | // avoiding delay limit, but this option sets stop limit, so add one. | |
3550 | options.max_write_buffer_number = kNumImmMemTableLimit + 1; | |
3551 | if (i == 1) { | |
3552 | options.min_write_buffer_number_to_merge = kNumImmMemTableLimit; | |
3553 | } | |
3554 | Reopen(options); | |
3555 | ||
3556 | if (i == 0) { | |
3557 | // ensure the flush doesn't finish until manual compaction has had a | |
3558 | // chance to be delayed. | |
3559 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3560 | {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait", | |
3561 | "FlushJob::WriteLevel0Table"}}); | |
3562 | } else { | |
3563 | // ensure the flush doesn't finish until manual compaction has continued | |
3564 | // without delay. | |
3565 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3566 | {{"DBImpl::FlushMemTable:StallWaitDone", | |
3567 | "FlushJob::WriteLevel0Table"}}); | |
3568 | } | |
3569 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3570 | ||
3571 | Random rnd(301); | |
3572 | for (int j = 0; j < kNumImmMemTableLimit - 1; ++j) { | |
3573 | ASSERT_OK(Put(Key(0), RandomString(&rnd, 1024))); | |
3574 | FlushOptions flush_opts; | |
3575 | flush_opts.wait = false; | |
3576 | flush_opts.allow_write_stall = true; | |
3577 | dbfull()->Flush(flush_opts); | |
3578 | } | |
3579 | ||
3580 | auto manual_compaction_thread = port::Thread([this]() { | |
3581 | CompactRangeOptions cro; | |
3582 | cro.allow_write_stall = false; | |
3583 | db_->CompactRange(cro, nullptr, nullptr); | |
3584 | }); | |
3585 | ||
3586 | manual_compaction_thread.join(); | |
3587 | dbfull()->TEST_WaitForFlushMemTable(); | |
3588 | ASSERT_EQ(0, NumTableFilesAtLevel(0)); | |
3589 | ASSERT_GT(NumTableFilesAtLevel(1), 0); | |
3590 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3591 | } | |
3592 | } | |
3593 | ||
3594 | TEST_F(DBCompactionTest, CompactRangeShutdownWhileDelayed) { | |
3595 | // Verify that, when `CompactRangeOptions::allow_write_stall == false`, delay | |
3596 | // does not hang if CF is dropped or DB is closed | |
3597 | const int kNumL0FilesTrigger = 4; | |
3598 | const int kNumL0FilesLimit = 8; | |
3599 | Options options = CurrentOptions(); | |
3600 | options.level0_file_num_compaction_trigger = kNumL0FilesTrigger; | |
3601 | options.level0_slowdown_writes_trigger = kNumL0FilesLimit; | |
3602 | // i == 0: DB::DropColumnFamily() on CompactRange's target CF unblocks it | |
3603 | // i == 1: DB::CancelAllBackgroundWork() unblocks CompactRange. This is to | |
3604 | // simulate what happens during Close as we can't call Close (it | |
3605 | // blocks on the auto-compaction, making a cycle). | |
3606 | for (int i = 0; i < 2; ++i) { | |
3607 | CreateAndReopenWithCF({"one"}, options); | |
3608 | // The calls to close CF/DB wait until the manual compaction stalls. | |
3609 | // The auto-compaction waits until the manual compaction finishes to ensure | |
3610 | // the signal comes from closing CF/DB, not from compaction making progress. | |
3611 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3612 | {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait", | |
3613 | "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"}, | |
3614 | {"DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual", | |
3615 | "CompactionJob::Run():End"}}); | |
3616 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3617 | ||
3618 | Random rnd(301); | |
3619 | for (int j = 0; j < kNumL0FilesLimit - 1; ++j) { | |
3620 | for (int k = 0; k < 2; ++k) { | |
3621 | ASSERT_OK(Put(1, Key(k), RandomString(&rnd, 1024))); | |
3622 | } | |
3623 | Flush(1); | |
3624 | } | |
3625 | auto manual_compaction_thread = port::Thread([this]() { | |
3626 | CompactRangeOptions cro; | |
3627 | cro.allow_write_stall = false; | |
3628 | ASSERT_TRUE(db_->CompactRange(cro, handles_[1], nullptr, nullptr) | |
3629 | .IsShutdownInProgress()); | |
3630 | }); | |
3631 | ||
3632 | TEST_SYNC_POINT( | |
3633 | "DBCompactionTest::CompactRangeShutdownWhileDelayed:PreShutdown"); | |
3634 | if (i == 0) { | |
3635 | ASSERT_OK(db_->DropColumnFamily(handles_[1])); | |
3636 | } else { | |
3637 | dbfull()->CancelAllBackgroundWork(false /* wait */); | |
3638 | } | |
3639 | manual_compaction_thread.join(); | |
3640 | TEST_SYNC_POINT( | |
3641 | "DBCompactionTest::CompactRangeShutdownWhileDelayed:PostManual"); | |
3642 | dbfull()->TEST_WaitForCompact(); | |
3643 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3644 | } | |
3645 | } | |
3646 | ||
3647 | TEST_F(DBCompactionTest, CompactRangeSkipFlushAfterDelay) { | |
3648 | // Verify that, when `CompactRangeOptions::allow_write_stall == false`, | |
3649 | // CompactRange skips its flush if the delay is long enough that the memtables | |
3650 | // existing at the beginning of the call have already been flushed. | |
3651 | const int kNumL0FilesTrigger = 4; | |
3652 | const int kNumL0FilesLimit = 8; | |
3653 | Options options = CurrentOptions(); | |
3654 | options.level0_slowdown_writes_trigger = kNumL0FilesLimit; | |
3655 | options.level0_file_num_compaction_trigger = kNumL0FilesTrigger; | |
3656 | Reopen(options); | |
3657 | ||
3658 | Random rnd(301); | |
3659 | // The manual flush includes the memtable that was active when CompactRange | |
3660 | // began. So it unblocks CompactRange and precludes its flush. Throughout the | |
3661 | // test, stall conditions are upheld via high L0 file count. | |
3662 | rocksdb::SyncPoint::GetInstance()->LoadDependency( | |
3663 | {{"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait", | |
3664 | "DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"}, | |
3665 | {"DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush", | |
3666 | "DBImpl::FlushMemTable:StallWaitDone"}, | |
3667 | {"DBImpl::FlushMemTable:StallWaitDone", "CompactionJob::Run():End"}}); | |
3668 | rocksdb::SyncPoint::GetInstance()->EnableProcessing(); | |
3669 | ||
3670 | //used for the delayable flushes | |
3671 | FlushOptions flush_opts; | |
3672 | flush_opts.allow_write_stall = true; | |
3673 | for (int i = 0; i < kNumL0FilesLimit - 1; ++i) { | |
3674 | for (int j = 0; j < 2; ++j) { | |
3675 | ASSERT_OK(Put(Key(j), RandomString(&rnd, 1024))); | |
3676 | } | |
3677 | dbfull()->Flush(flush_opts); | |
3678 | } | |
3679 | auto manual_compaction_thread = port::Thread([this]() { | |
3680 | CompactRangeOptions cro; | |
3681 | cro.allow_write_stall = false; | |
3682 | db_->CompactRange(cro, nullptr, nullptr); | |
3683 | }); | |
3684 | ||
3685 | TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PreFlush"); | |
3686 | Put(ToString(0), RandomString(&rnd, 1024)); | |
3687 | dbfull()->Flush(flush_opts); | |
3688 | Put(ToString(0), RandomString(&rnd, 1024)); | |
3689 | TEST_SYNC_POINT("DBCompactionTest::CompactRangeSkipFlushAfterDelay:PostFlush"); | |
3690 | manual_compaction_thread.join(); | |
3691 | ||
3692 | // If CompactRange's flush was skipped, the final Put above will still be | |
3693 | // in the active memtable. | |
3694 | std::string num_keys_in_memtable; | |
3695 | db_->GetProperty(DB::Properties::kNumEntriesActiveMemTable, &num_keys_in_memtable); | |
3696 | ASSERT_EQ(ToString(1), num_keys_in_memtable); | |
3697 | ||
3698 | rocksdb::SyncPoint::GetInstance()->DisableProcessing(); | |
3699 | } | |
3700 | ||
3701 | TEST_F(DBCompactionTest, CompactRangeFlushOverlappingMemtable) { | |
3702 | // Verify memtable only gets flushed if it contains data overlapping the range | |
3703 | // provided to `CompactRange`. Tests all kinds of overlap/non-overlap. | |
3704 | const int kNumEndpointKeys = 5; | |
3705 | std::string keys[kNumEndpointKeys] = {"a", "b", "c", "d", "e"}; | |
3706 | Options options = CurrentOptions(); | |
3707 | options.disable_auto_compactions = true; | |
3708 | Reopen(options); | |
3709 | ||
3710 | // One extra iteration for nullptr, which means left side of interval is | |
3711 | // unbounded. | |
3712 | for (int i = 0; i <= kNumEndpointKeys; ++i) { | |
3713 | Slice begin; | |
3714 | Slice* begin_ptr; | |
3715 | if (i == 0) { | |
3716 | begin_ptr = nullptr; | |
3717 | } else { | |
3718 | begin = keys[i - 1]; | |
3719 | begin_ptr = &begin; | |
3720 | } | |
3721 | // Start at `i` so right endpoint comes after left endpoint. One extra | |
3722 | // iteration for nullptr, which means right side of interval is unbounded. | |
3723 | for (int j = std::max(0, i - 1); j <= kNumEndpointKeys; ++j) { | |
3724 | Slice end; | |
3725 | Slice* end_ptr; | |
3726 | if (j == kNumEndpointKeys) { | |
3727 | end_ptr = nullptr; | |
3728 | } else { | |
3729 | end = keys[j]; | |
3730 | end_ptr = &end; | |
3731 | } | |
3732 | ASSERT_OK(Put("b", "val")); | |
3733 | ASSERT_OK(Put("d", "val")); | |
3734 | CompactRangeOptions compact_range_opts; | |
3735 | ASSERT_OK(db_->CompactRange(compact_range_opts, begin_ptr, end_ptr)); | |
3736 | ||
3737 | uint64_t get_prop_tmp, num_memtable_entries = 0; | |
3738 | ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesImmMemTables, | |
3739 | &get_prop_tmp)); | |
3740 | num_memtable_entries += get_prop_tmp; | |
3741 | ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kNumEntriesActiveMemTable, | |
3742 | &get_prop_tmp)); | |
3743 | num_memtable_entries += get_prop_tmp; | |
3744 | if (begin_ptr == nullptr || end_ptr == nullptr || | |
3745 | (i <= 4 && j >= 1 && (begin != "c" || end != "c"))) { | |
3746 | // In this case `CompactRange`'s range overlapped in some way with the | |
3747 | // memtable's range, so flush should've happened. Then "b" and "d" won't | |
3748 | // be in the memtable. | |
3749 | ASSERT_EQ(0, num_memtable_entries); | |
3750 | } else { | |
3751 | ASSERT_EQ(2, num_memtable_entries); | |
3752 | // flush anyways to prepare for next iteration | |
3753 | db_->Flush(FlushOptions()); | |
3754 | } | |
3755 | } | |
3756 | } | |
3757 | } | |
3758 | ||
3759 | TEST_F(DBCompactionTest, CompactionStatsTest) { | |
3760 | Options options = CurrentOptions(); | |
3761 | options.level0_file_num_compaction_trigger = 2; | |
3762 | CompactionStatsCollector* collector = new CompactionStatsCollector(); | |
3763 | options.listeners.emplace_back(collector); | |
3764 | DestroyAndReopen(options); | |
3765 | ||
3766 | for (int i = 0; i < 32; i++) { | |
3767 | for (int j = 0; j < 5000; j++) { | |
3768 | Put(std::to_string(j), std::string(1, 'A')); | |
3769 | } | |
3770 | ASSERT_OK(Flush()); | |
3771 | ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable()); | |
3772 | } | |
3773 | dbfull()->TEST_WaitForCompact(); | |
3774 | ColumnFamilyHandleImpl* cfh = | |
3775 | static_cast<ColumnFamilyHandleImpl*>(dbfull()->DefaultColumnFamily()); | |
3776 | ColumnFamilyData* cfd = cfh->cfd(); | |
3777 | ||
3778 | VerifyCompactionStats(*cfd, *collector); | |
3779 | } | |
3780 | ||
3781 | TEST_F(DBCompactionTest, CompactFilesOutputRangeConflict) { | |
3782 | // LSM setup: | |
3783 | // L1: [ba bz] | |
3784 | // L2: [a b] [c d] | |
3785 | // L3: [a b] [c d] | |
3786 | // | |
3787 | // Thread 1: Thread 2: | |
3788 | // Begin compacting all L2->L3 | |
3789 | // Compact [ba bz] L1->L3 | |
3790 | // End compacting all L2->L3 | |
3791 | // | |
3792 | // The compaction operation in thread 2 should be disallowed because the range | |
3793 | // overlaps with the compaction in thread 1, which also covers that range in | |
3794 | // L3. | |
3795 | Options options = CurrentOptions(); | |
3796 | FlushedFileCollector* collector = new FlushedFileCollector(); | |
3797 | options.listeners.emplace_back(collector); | |
3798 | Reopen(options); | |
3799 | ||
3800 | for (int level = 3; level >= 2; --level) { | |
3801 | ASSERT_OK(Put("a", "val")); | |
3802 | ASSERT_OK(Put("b", "val")); | |
3803 | ASSERT_OK(Flush()); | |
3804 | ASSERT_OK(Put("c", "val")); | |
3805 | ASSERT_OK(Put("d", "val")); | |
3806 | ASSERT_OK(Flush()); | |
3807 | MoveFilesToLevel(level); | |
3808 | } | |
3809 | ASSERT_OK(Put("ba", "val")); | |
3810 | ASSERT_OK(Put("bz", "val")); | |
3811 | ASSERT_OK(Flush()); | |
3812 | MoveFilesToLevel(1); | |
3813 | ||
3814 | SyncPoint::GetInstance()->LoadDependency({ | |
3815 | {"CompactFilesImpl:0", | |
3816 | "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"}, | |
3817 | {"DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End", | |
3818 | "CompactFilesImpl:1"}, | |
3819 | }); | |
3820 | SyncPoint::GetInstance()->EnableProcessing(); | |
3821 | ||
3822 | auto bg_thread = port::Thread([&]() { | |
3823 | // Thread 1 | |
3824 | std::vector<std::string> filenames = collector->GetFlushedFiles(); | |
3825 | filenames.pop_back(); | |
3826 | ASSERT_OK(db_->CompactFiles(CompactionOptions(), filenames, | |
3827 | 3 /* output_level */)); | |
3828 | }); | |
3829 | ||
3830 | // Thread 2 | |
3831 | TEST_SYNC_POINT( | |
3832 | "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2Begin"); | |
3833 | std::string filename = collector->GetFlushedFiles().back(); | |
3834 | ASSERT_FALSE( | |
3835 | db_->CompactFiles(CompactionOptions(), {filename}, 3 /* output_level */) | |
3836 | .ok()); | |
3837 | TEST_SYNC_POINT( | |
3838 | "DBCompactionTest::CompactFilesOutputRangeConflict:Thread2End"); | |
3839 | ||
3840 | bg_thread.join(); | |
3841 | } | |
3842 | ||
3843 | TEST_F(DBCompactionTest, CompactionHasEmptyOutput) { | |
3844 | Options options = CurrentOptions(); | |
3845 | SstStatsCollector* collector = new SstStatsCollector(); | |
3846 | options.level0_file_num_compaction_trigger = 2; | |
3847 | options.listeners.emplace_back(collector); | |
3848 | Reopen(options); | |
3849 | ||
3850 | // Make sure the L0 files overlap to prevent trivial move. | |
3851 | ASSERT_OK(Put("a", "val")); | |
3852 | ASSERT_OK(Put("b", "val")); | |
3853 | ASSERT_OK(Flush()); | |
3854 | ASSERT_OK(Delete("a")); | |
3855 | ASSERT_OK(Delete("b")); | |
3856 | ASSERT_OK(Flush()); | |
3857 | ||
3858 | dbfull()->TEST_WaitForCompact(); | |
3859 | ASSERT_EQ(NumTableFilesAtLevel(0), 0); | |
3860 | ASSERT_EQ(NumTableFilesAtLevel(1), 0); | |
3861 | ||
3862 | // Expect one file creation to start for each flush, and zero for compaction | |
3863 | // since no keys are written. | |
3864 | ASSERT_EQ(2, collector->num_ssts_creation_started()); | |
3865 | } | |
3866 | ||
3867 | INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, | |
3868 | ::testing::Values(std::make_tuple(1, true), | |
3869 | std::make_tuple(1, false), | |
3870 | std::make_tuple(4, true), | |
3871 | std::make_tuple(4, false))); | |
3872 | ||
3873 | TEST_P(DBCompactionDirectIOTest, DirectIO) { | |
3874 | Options options = CurrentOptions(); | |
3875 | Destroy(options); | |
3876 | options.create_if_missing = true; | |
3877 | options.disable_auto_compactions = true; | |
3878 | options.use_direct_io_for_flush_and_compaction = GetParam(); | |
3879 | options.env = new MockEnv(Env::Default()); | |
3880 | Reopen(options); | |
3881 | bool readahead = false; | |
3882 | SyncPoint::GetInstance()->SetCallBack( | |
3883 | "TableCache::NewIterator:for_compaction", [&](void* arg) { | |
3884 | bool* use_direct_reads = static_cast<bool*>(arg); | |
3885 | ASSERT_EQ(*use_direct_reads, | |
3886 | options.use_direct_reads); | |
3887 | }); | |
3888 | SyncPoint::GetInstance()->SetCallBack( | |
3889 | "CompactionJob::OpenCompactionOutputFile", [&](void* arg) { | |
3890 | bool* use_direct_writes = static_cast<bool*>(arg); | |
3891 | ASSERT_EQ(*use_direct_writes, | |
3892 | options.use_direct_io_for_flush_and_compaction); | |
3893 | }); | |
3894 | if (options.use_direct_io_for_flush_and_compaction) { | |
3895 | SyncPoint::GetInstance()->SetCallBack( | |
3896 | "SanitizeOptions:direct_io", [&](void* /*arg*/) { | |
3897 | readahead = true; | |
3898 | }); | |
3899 | } | |
3900 | SyncPoint::GetInstance()->EnableProcessing(); | |
3901 | CreateAndReopenWithCF({"pikachu"}, options); | |
3902 | MakeTables(3, "p", "q", 1); | |
3903 | ASSERT_EQ("1,1,1", FilesPerLevel(1)); | |
3904 | Compact(1, "p1", "p9"); | |
3905 | ASSERT_EQ(readahead, options.use_direct_reads); | |
3906 | ASSERT_EQ("0,0,1", FilesPerLevel(1)); | |
3907 | Destroy(options); | |
3908 | delete options.env; | |
3909 | } | |
3910 | ||
3911 | INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest, | |
3912 | testing::Bool()); | |
7c673cae FG |
3913 | |
3914 | class CompactionPriTest : public DBTestBase, | |
3915 | public testing::WithParamInterface<uint32_t> { | |
3916 | public: | |
3917 | CompactionPriTest() : DBTestBase("/compaction_pri_test") { | |
3918 | compaction_pri_ = GetParam(); | |
3919 | } | |
3920 | ||
3921 | // Required if inheriting from testing::WithParamInterface<> | |
3922 | static void SetUpTestCase() {} | |
3923 | static void TearDownTestCase() {} | |
3924 | ||
3925 | uint32_t compaction_pri_; | |
3926 | }; | |
3927 | ||
3928 | TEST_P(CompactionPriTest, Test) { | |
3929 | Options options = CurrentOptions(); | |
3930 | options.write_buffer_size = 16 * 1024; | |
3931 | options.compaction_pri = static_cast<CompactionPri>(compaction_pri_); | |
3932 | options.hard_pending_compaction_bytes_limit = 256 * 1024; | |
3933 | options.max_bytes_for_level_base = 64 * 1024; | |
3934 | options.max_bytes_for_level_multiplier = 4; | |
3935 | options.compression = kNoCompression; | |
3936 | ||
3937 | DestroyAndReopen(options); | |
3938 | ||
3939 | Random rnd(301); | |
3940 | const int kNKeys = 5000; | |
3941 | int keys[kNKeys]; | |
3942 | for (int i = 0; i < kNKeys; i++) { | |
3943 | keys[i] = i; | |
3944 | } | |
3945 | std::random_shuffle(std::begin(keys), std::end(keys)); | |
3946 | ||
3947 | for (int i = 0; i < kNKeys; i++) { | |
3948 | ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 102))); | |
3949 | } | |
3950 | ||
3951 | dbfull()->TEST_WaitForCompact(); | |
3952 | for (int i = 0; i < kNKeys; i++) { | |
3953 | ASSERT_NE("NOT_FOUND", Get(Key(i))); | |
3954 | } | |
3955 | } | |
3956 | ||
3957 | INSTANTIATE_TEST_CASE_P( | |
3958 | CompactionPriTest, CompactionPriTest, | |
3959 | ::testing::Values(CompactionPri::kByCompensatedSize, | |
3960 | CompactionPri::kOldestLargestSeqFirst, | |
3961 | CompactionPri::kOldestSmallestSeqFirst, | |
3962 | CompactionPri::kMinOverlappingRatio)); | |
3963 | ||
11fdf7f2 TL |
3964 | class NoopMergeOperator : public MergeOperator { |
3965 | public: | |
3966 | NoopMergeOperator() {} | |
3967 | ||
3968 | virtual bool FullMergeV2(const MergeOperationInput& /*merge_in*/, | |
3969 | MergeOperationOutput* merge_out) const override { | |
3970 | std::string val("bar"); | |
3971 | merge_out->new_value = val; | |
3972 | return true; | |
3973 | } | |
3974 | ||
3975 | virtual const char* Name() const override { return "Noop"; } | |
3976 | }; | |
3977 | ||
3978 | TEST_F(DBCompactionTest, PartialManualCompaction) { | |
3979 | Options opts = CurrentOptions(); | |
3980 | opts.num_levels = 3; | |
3981 | opts.level0_file_num_compaction_trigger = 10; | |
3982 | opts.compression = kNoCompression; | |
3983 | opts.merge_operator.reset(new NoopMergeOperator()); | |
3984 | opts.target_file_size_base = 10240; | |
3985 | DestroyAndReopen(opts); | |
3986 | ||
3987 | Random rnd(301); | |
3988 | for (auto i = 0; i < 8; ++i) { | |
3989 | for (auto j = 0; j < 10; ++j) { | |
3990 | Merge("foo", RandomString(&rnd, 1024)); | |
3991 | } | |
3992 | Flush(); | |
3993 | } | |
3994 | ||
3995 | MoveFilesToLevel(2); | |
3996 | ||
3997 | std::string prop; | |
3998 | EXPECT_TRUE(dbfull()->GetProperty(DB::Properties::kLiveSstFilesSize, &prop)); | |
3999 | uint64_t max_compaction_bytes = atoi(prop.c_str()) / 2; | |
4000 | ASSERT_OK(dbfull()->SetOptions( | |
4001 | {{"max_compaction_bytes", std::to_string(max_compaction_bytes)}})); | |
4002 | ||
4003 | CompactRangeOptions cro; | |
4004 | cro.bottommost_level_compaction = BottommostLevelCompaction::kForce; | |
4005 | dbfull()->CompactRange(cro, nullptr, nullptr); | |
4006 | } | |
4007 | ||
7c673cae FG |
4008 | #endif // !defined(ROCKSDB_LITE) |
4009 | } // namespace rocksdb | |
4010 | ||
4011 | int main(int argc, char** argv) { | |
4012 | #if !defined(ROCKSDB_LITE) | |
4013 | rocksdb::port::InstallStackTraceHandler(); | |
4014 | ::testing::InitGoogleTest(&argc, argv); | |
4015 | return RUN_ALL_TESTS(); | |
4016 | #else | |
11fdf7f2 TL |
4017 | (void) argc; |
4018 | (void) argv; | |
7c673cae FG |
4019 | return 0; |
4020 | #endif | |
4021 | } |