1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #ifndef __STDC_FORMAT_MACROS
11 #define __STDC_FORMAT_MACROS
21 #include <unordered_set>
24 #include "db/db_impl.h"
25 #include "db/dbformat.h"
26 #include "db/job_context.h"
27 #include "db/version_set.h"
28 #include "db/write_batch_internal.h"
29 #include "env/mock_env.h"
30 #include "memtable/hash_linklist_rep.h"
31 #include "monitoring/statistics.h"
32 #include "monitoring/thread_status_util.h"
33 #include "port/stack_trace.h"
34 #include "rocksdb/cache.h"
35 #include "rocksdb/compaction_filter.h"
36 #include "rocksdb/convenience.h"
37 #include "rocksdb/db.h"
38 #include "rocksdb/env.h"
39 #include "rocksdb/experimental.h"
40 #include "rocksdb/filter_policy.h"
41 #include "rocksdb/options.h"
42 #include "rocksdb/perf_context.h"
43 #include "rocksdb/slice.h"
44 #include "rocksdb/slice_transform.h"
45 #include "rocksdb/table.h"
46 #include "rocksdb/table_properties.h"
47 #include "rocksdb/thread_status.h"
48 #include "rocksdb/utilities/checkpoint.h"
49 #include "rocksdb/utilities/write_batch_with_index.h"
50 #include "table/block_based_table_factory.h"
51 #include "table/mock_table.h"
52 #include "table/plain_table_factory.h"
53 #include "table/scoped_arena_iterator.h"
54 #include "util/compression.h"
55 #include "util/filename.h"
56 #include "util/hash.h"
57 #include "util/logging.h"
58 #include "util/mutexlock.h"
59 #include "util/rate_limiter.h"
60 #include "util/string_util.h"
61 #include "util/sync_point.h"
62 #include "util/testharness.h"
63 #include "util/testutil.h"
64 #include "utilities/merge_operators.h"
66 #if !defined(IOS_CROSS_COMPILE)
70 static std::string
RandomString(Random
* rnd
, int len
, double ratio
) {
72 test::CompressibleString(rnd
, ratio
, len
, &r
);
// Formats `key` as a zero-padded decimal string of width `length`.
// `length` is clamped to the internal buffer size so snprintf cannot
// be asked for a wider field than the buffer holds.
std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  if (length > kBufSize) {
    // NOTE: snprintf still NUL-terminates, so the widest possible
    // result is kBufSize - 1 characters.
    length = kBufSize;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}
86 class CompactionJobStatsTest
: public testing::Test
,
87 public testing::WithParamInterface
<bool> {
90 std::string alternative_wal_dir_
;
93 std::vector
<ColumnFamilyHandle
*> handles_
;
94 uint32_t max_subcompactions_
;
96 Options last_options_
;
98 CompactionJobStatsTest() : env_(Env::Default()) {
99 env_
->SetBackgroundThreads(1, Env::LOW
);
100 env_
->SetBackgroundThreads(1, Env::HIGH
);
101 dbname_
= test::PerThreadDBPath("compaction_job_stats_test");
102 alternative_wal_dir_
= dbname_
+ "/wal";
104 options
.create_if_missing
= true;
105 max_subcompactions_
= GetParam();
106 options
.max_subcompactions
= max_subcompactions_
;
107 auto delete_options
= options
;
108 delete_options
.wal_dir
= alternative_wal_dir_
;
109 EXPECT_OK(DestroyDB(dbname_
, delete_options
));
110 // Destroy it for not alternative WAL dir is used.
111 EXPECT_OK(DestroyDB(dbname_
, options
));
116 ~CompactionJobStatsTest() override
{
117 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
118 rocksdb::SyncPoint::GetInstance()->LoadDependency({});
119 rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
122 options
.db_paths
.emplace_back(dbname_
, 0);
123 options
.db_paths
.emplace_back(dbname_
+ "_2", 0);
124 options
.db_paths
.emplace_back(dbname_
+ "_3", 0);
125 options
.db_paths
.emplace_back(dbname_
+ "_4", 0);
126 EXPECT_OK(DestroyDB(dbname_
, options
));
129 // Required if inheriting from testing::WithParamInterface<>
130 static void SetUpTestCase() {}
131 static void TearDownTestCase() {}
134 return reinterpret_cast<DBImpl
*>(db_
);
137 void CreateColumnFamilies(const std::vector
<std::string
>& cfs
,
138 const Options
& options
) {
139 ColumnFamilyOptions
cf_opts(options
);
140 size_t cfi
= handles_
.size();
141 handles_
.resize(cfi
+ cfs
.size());
142 for (auto cf
: cfs
) {
143 ASSERT_OK(db_
->CreateColumnFamily(cf_opts
, cf
, &handles_
[cfi
++]));
147 void CreateAndReopenWithCF(const std::vector
<std::string
>& cfs
,
148 const Options
& options
) {
149 CreateColumnFamilies(cfs
, options
);
150 std::vector
<std::string
> cfs_plus_default
= cfs
;
151 cfs_plus_default
.insert(cfs_plus_default
.begin(), kDefaultColumnFamilyName
);
152 ReopenWithColumnFamilies(cfs_plus_default
, options
);
155 void ReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
156 const std::vector
<Options
>& options
) {
157 ASSERT_OK(TryReopenWithColumnFamilies(cfs
, options
));
160 void ReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
161 const Options
& options
) {
162 ASSERT_OK(TryReopenWithColumnFamilies(cfs
, options
));
165 Status
TryReopenWithColumnFamilies(
166 const std::vector
<std::string
>& cfs
,
167 const std::vector
<Options
>& options
) {
169 EXPECT_EQ(cfs
.size(), options
.size());
170 std::vector
<ColumnFamilyDescriptor
> column_families
;
171 for (size_t i
= 0; i
< cfs
.size(); ++i
) {
172 column_families
.push_back(ColumnFamilyDescriptor(cfs
[i
], options
[i
]));
174 DBOptions db_opts
= DBOptions(options
[0]);
175 return DB::Open(db_opts
, dbname_
, column_families
, &handles_
, &db_
);
178 Status
TryReopenWithColumnFamilies(const std::vector
<std::string
>& cfs
,
179 const Options
& options
) {
181 std::vector
<Options
> v_opts(cfs
.size(), options
);
182 return TryReopenWithColumnFamilies(cfs
, v_opts
);
185 void Reopen(const Options
& options
) {
186 ASSERT_OK(TryReopen(options
));
190 for (auto h
: handles_
) {
198 void DestroyAndReopen(const Options
& options
) {
199 // Destroy using last options
200 Destroy(last_options_
);
201 ASSERT_OK(TryReopen(options
));
204 void Destroy(const Options
& options
) {
206 ASSERT_OK(DestroyDB(dbname_
, options
));
209 Status
ReadOnlyReopen(const Options
& options
) {
210 return DB::OpenForReadOnly(options
, dbname_
, &db_
);
213 Status
TryReopen(const Options
& options
) {
215 last_options_
= options
;
216 return DB::Open(options
, dbname_
, &db_
);
219 Status
Flush(int cf
= 0) {
221 return db_
->Flush(FlushOptions());
223 return db_
->Flush(FlushOptions(), handles_
[cf
]);
227 Status
Put(const Slice
& k
, const Slice
& v
, WriteOptions wo
= WriteOptions()) {
228 return db_
->Put(wo
, k
, v
);
231 Status
Put(int cf
, const Slice
& k
, const Slice
& v
,
232 WriteOptions wo
= WriteOptions()) {
233 return db_
->Put(wo
, handles_
[cf
], k
, v
);
236 Status
Delete(const std::string
& k
) {
237 return db_
->Delete(WriteOptions(), k
);
240 Status
Delete(int cf
, const std::string
& k
) {
241 return db_
->Delete(WriteOptions(), handles_
[cf
], k
);
244 std::string
Get(const std::string
& k
, const Snapshot
* snapshot
= nullptr) {
246 options
.verify_checksums
= true;
247 options
.snapshot
= snapshot
;
249 Status s
= db_
->Get(options
, k
, &result
);
250 if (s
.IsNotFound()) {
251 result
= "NOT_FOUND";
252 } else if (!s
.ok()) {
253 result
= s
.ToString();
258 std::string
Get(int cf
, const std::string
& k
,
259 const Snapshot
* snapshot
= nullptr) {
261 options
.verify_checksums
= true;
262 options
.snapshot
= snapshot
;
264 Status s
= db_
->Get(options
, handles_
[cf
], k
, &result
);
265 if (s
.IsNotFound()) {
266 result
= "NOT_FOUND";
267 } else if (!s
.ok()) {
268 result
= s
.ToString();
273 int NumTableFilesAtLevel(int level
, int cf
= 0) {
274 std::string property
;
277 EXPECT_TRUE(db_
->GetProperty(
278 "rocksdb.num-files-at-level" + NumberToString(level
), &property
));
280 EXPECT_TRUE(db_
->GetProperty(
281 handles_
[cf
], "rocksdb.num-files-at-level" + NumberToString(level
),
284 return atoi(property
.c_str());
287 // Return spread of files per level
288 std::string
FilesPerLevel(int cf
= 0) {
290 (cf
== 0) ? db_
->NumberLevels() : db_
->NumberLevels(handles_
[1]);
292 size_t last_non_zero_offset
= 0;
293 for (int level
= 0; level
< num_levels
; level
++) {
294 int f
= NumTableFilesAtLevel(level
, cf
);
296 snprintf(buf
, sizeof(buf
), "%s%d", (level
? "," : ""), f
);
299 last_non_zero_offset
= result
.size();
302 result
.resize(last_non_zero_offset
);
306 uint64_t Size(const Slice
& start
, const Slice
& limit
, int cf
= 0) {
307 Range
r(start
, limit
);
310 db_
->GetApproximateSizes(&r
, 1, &size
);
312 db_
->GetApproximateSizes(handles_
[1], &r
, 1, &size
);
317 void Compact(int cf
, const Slice
& start
, const Slice
& limit
,
318 uint32_t target_path_id
) {
319 CompactRangeOptions compact_options
;
320 compact_options
.target_path_id
= target_path_id
;
321 ASSERT_OK(db_
->CompactRange(compact_options
, handles_
[cf
], &start
, &limit
));
324 void Compact(int cf
, const Slice
& start
, const Slice
& limit
) {
326 db_
->CompactRange(CompactRangeOptions(), handles_
[cf
], &start
, &limit
));
329 void Compact(const Slice
& start
, const Slice
& limit
) {
330 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), &start
, &limit
));
333 void TEST_Compact(int level
, int cf
, const Slice
& start
, const Slice
& limit
) {
334 ASSERT_OK(dbfull()->TEST_CompactRange(level
, &start
, &limit
, handles_
[cf
],
335 true /* disallow trivial move */));
338 // Do n memtable compactions, each of which produces an sstable
339 // covering the range [small,large].
340 void MakeTables(int n
, const std::string
& small
, const std::string
& large
,
342 for (int i
= 0; i
< n
; i
++) {
343 ASSERT_OK(Put(cf
, small
, "begin"));
344 ASSERT_OK(Put(cf
, large
, "end"));
345 ASSERT_OK(Flush(cf
));
349 static void SetDeletionCompactionStats(
350 CompactionJobStats
*stats
, uint64_t input_deletions
,
351 uint64_t expired_deletions
, uint64_t records_replaced
) {
352 stats
->num_input_deletion_records
= input_deletions
;
353 stats
->num_expired_deletion_records
= expired_deletions
;
354 stats
->num_records_replaced
= records_replaced
;
357 void MakeTableWithKeyValues(
358 Random
* rnd
, uint64_t smallest
, uint64_t largest
,
359 int key_size
, int value_size
, uint64_t interval
,
360 double ratio
, int cf
= 0) {
361 for (auto key
= smallest
; key
< largest
; key
+= interval
) {
362 ASSERT_OK(Put(cf
, Slice(Key(key
, key_size
)),
363 Slice(RandomString(rnd
, value_size
, ratio
))));
365 ASSERT_OK(Flush(cf
));
368 // This function behaves with the implicit understanding that two
369 // rounds of keys are inserted into the database, as per the behavior
370 // of the DeletionStatsTest.
371 void SelectivelyDeleteKeys(uint64_t smallest
, uint64_t largest
,
372 uint64_t interval
, int deletion_interval
, int key_size
,
373 uint64_t cutoff_key_num
, CompactionJobStats
* stats
, int cf
= 0) {
375 // interval needs to be >= 2 so that deletion entries can be inserted
376 // that are intended to not result in an actual key deletion by using
377 // an offset of 1 from another existing key
378 ASSERT_GE(interval
, 2);
381 uint32_t deletions_made
= 0;
382 uint32_t num_deleted
= 0;
383 uint32_t num_expired
= 0;
384 for (auto key
= smallest
; key
<= largest
; key
+= interval
, ctr
++) {
385 if (ctr
% deletion_interval
== 0) {
386 ASSERT_OK(Delete(cf
, Key(key
, key_size
)));
390 if (key
> cutoff_key_num
) {
396 // Insert some deletions for keys that don't exist that
397 // are both in and out of the key range
398 ASSERT_OK(Delete(cf
, Key(smallest
+1, key_size
)));
401 ASSERT_OK(Delete(cf
, Key(smallest
-1, key_size
)));
405 ASSERT_OK(Delete(cf
, Key(smallest
-9, key_size
)));
409 ASSERT_OK(Flush(cf
));
410 SetDeletionCompactionStats(stats
, deletions_made
, num_expired
,
415 // An EventListener which helps verify the compaction results in
416 // test CompactionJobStatsTest.
417 class CompactionJobStatsChecker
: public EventListener
{
419 CompactionJobStatsChecker()
420 : compression_enabled_(false), verify_next_comp_io_stats_(false) {}
422 size_t NumberOfUnverifiedStats() { return expected_stats_
.size(); }
424 void set_verify_next_comp_io_stats(bool v
) { verify_next_comp_io_stats_
= v
; }
426 // Once a compaction completed, this function will verify the returned
427 // CompactionJobInfo with the oldest CompactionJobInfo added earlier
428 // in "expected_stats_" which has not yet being used for verification.
429 void OnCompactionCompleted(DB
* /*db*/, const CompactionJobInfo
& ci
) override
{
430 if (verify_next_comp_io_stats_
) {
431 ASSERT_GT(ci
.stats
.file_write_nanos
, 0);
432 ASSERT_GT(ci
.stats
.file_range_sync_nanos
, 0);
433 ASSERT_GT(ci
.stats
.file_fsync_nanos
, 0);
434 ASSERT_GT(ci
.stats
.file_prepare_write_nanos
, 0);
435 verify_next_comp_io_stats_
= false;
438 std::lock_guard
<std::mutex
> lock(mutex_
);
439 if (expected_stats_
.size()) {
440 Verify(ci
.stats
, expected_stats_
.front());
441 expected_stats_
.pop();
445 // A helper function which verifies whether two CompactionJobStats
446 // match. The verification of all compaction stats are done by
447 // ASSERT_EQ except for the total input / output bytes, which we
448 // use ASSERT_GE and ASSERT_LE with a reasonable bias ---
449 // 10% in uncompressed case and 20% when compression is used.
450 virtual void Verify(const CompactionJobStats
& current_stats
,
451 const CompactionJobStats
& stats
) {
453 ASSERT_GT(current_stats
.elapsed_micros
, 0U);
455 ASSERT_EQ(current_stats
.num_input_records
,
456 stats
.num_input_records
);
457 ASSERT_EQ(current_stats
.num_input_files
,
458 stats
.num_input_files
);
459 ASSERT_EQ(current_stats
.num_input_files_at_output_level
,
460 stats
.num_input_files_at_output_level
);
462 ASSERT_EQ(current_stats
.num_output_records
,
463 stats
.num_output_records
);
464 ASSERT_EQ(current_stats
.num_output_files
,
465 stats
.num_output_files
);
467 ASSERT_EQ(current_stats
.is_manual_compaction
,
468 stats
.is_manual_compaction
);
471 double kFileSizeBias
= compression_enabled_
? 0.20 : 0.10;
472 ASSERT_GE(current_stats
.total_input_bytes
* (1.00 + kFileSizeBias
),
473 stats
.total_input_bytes
);
474 ASSERT_LE(current_stats
.total_input_bytes
,
475 stats
.total_input_bytes
* (1.00 + kFileSizeBias
));
476 ASSERT_GE(current_stats
.total_output_bytes
* (1.00 + kFileSizeBias
),
477 stats
.total_output_bytes
);
478 ASSERT_LE(current_stats
.total_output_bytes
,
479 stats
.total_output_bytes
* (1.00 + kFileSizeBias
));
480 ASSERT_EQ(current_stats
.total_input_raw_key_bytes
,
481 stats
.total_input_raw_key_bytes
);
482 ASSERT_EQ(current_stats
.total_input_raw_value_bytes
,
483 stats
.total_input_raw_value_bytes
);
485 ASSERT_EQ(current_stats
.num_records_replaced
,
486 stats
.num_records_replaced
);
488 ASSERT_EQ(current_stats
.num_corrupt_keys
,
489 stats
.num_corrupt_keys
);
492 std::string(current_stats
.smallest_output_key_prefix
),
493 std::string(stats
.smallest_output_key_prefix
));
495 std::string(current_stats
.largest_output_key_prefix
),
496 std::string(stats
.largest_output_key_prefix
));
499 // Add an expected compaction stats, which will be used to
500 // verify the CompactionJobStats returned by the OnCompactionCompleted()
502 void AddExpectedStats(const CompactionJobStats
& stats
) {
503 std::lock_guard
<std::mutex
> lock(mutex_
);
504 expected_stats_
.push(stats
);
507 void EnableCompression(bool flag
) {
508 compression_enabled_
= flag
;
511 bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_
; }
515 std::queue
<CompactionJobStats
> expected_stats_
;
516 bool compression_enabled_
;
517 bool verify_next_comp_io_stats_
;
520 // An EventListener which helps verify the compaction statistics in
521 // the test DeletionStatsTest.
522 class CompactionJobDeletionStatsChecker
: public CompactionJobStatsChecker
{
524 // Verifies whether two CompactionJobStats match.
525 void Verify(const CompactionJobStats
& current_stats
,
526 const CompactionJobStats
& stats
) override
{
528 current_stats
.num_input_deletion_records
,
529 stats
.num_input_deletion_records
);
531 current_stats
.num_expired_deletion_records
,
532 stats
.num_expired_deletion_records
);
534 current_stats
.num_records_replaced
,
535 stats
.num_records_replaced
);
537 ASSERT_EQ(current_stats
.num_corrupt_keys
,
538 stats
.num_corrupt_keys
);
// Estimates the on-disk size of an SST file holding `num_records`
// entries: data block (keys + compressed values + per-key overhead)
// plus a fixed footer, a bloom-filter block, and an index block
// proportional to the data size. Restores the dropped `data_size`
// declaration and the kPerKeyOverhead term of the product.
uint64_t EstimatedFileSize(
    uint64_t num_records, size_t key_size, size_t value_size,
    double compression_ratio = 1.0,
    size_t block_size = 4096,
    int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;
  uint64_t data_size =
      static_cast<uint64_t>(
          num_records * (key_size + value_size * compression_ratio +
                         kPerKeyOverhead));
  return data_size + kFooterSize
         + num_records * bloom_bits_per_key / 8       // filter block
         + data_size * (key_size + 8) / block_size;   // index block
}
565 const Slice
& src
, size_t prefix_length
, std::string
* dst
) {
566 assert(prefix_length
> 0);
567 size_t length
= src
.size() > prefix_length
? prefix_length
: src
.size();
568 dst
->assign(src
.data(), length
);
573 CompactionJobStats
NewManualCompactionJobStats(
574 const std::string
& smallest_key
, const std::string
& largest_key
,
575 size_t num_input_files
, size_t num_input_files_at_output_level
,
576 uint64_t num_input_records
, size_t key_size
, size_t value_size
,
577 size_t num_output_files
, uint64_t num_output_records
,
578 double compression_ratio
, uint64_t num_records_replaced
,
579 bool is_manual
= true) {
580 CompactionJobStats stats
;
583 stats
.num_input_records
= num_input_records
;
584 stats
.num_input_files
= num_input_files
;
585 stats
.num_input_files_at_output_level
= num_input_files_at_output_level
;
587 stats
.num_output_records
= num_output_records
;
588 stats
.num_output_files
= num_output_files
;
590 stats
.total_input_bytes
=
592 num_input_records
/ num_input_files
,
593 key_size
, value_size
, compression_ratio
) * num_input_files
;
594 stats
.total_output_bytes
=
596 num_output_records
/ num_output_files
,
597 key_size
, value_size
, compression_ratio
) * num_output_files
;
598 stats
.total_input_raw_key_bytes
=
599 num_input_records
* (key_size
+ 8);
600 stats
.total_input_raw_value_bytes
=
601 num_input_records
* value_size
;
603 stats
.is_manual_compaction
= is_manual
;
605 stats
.num_records_replaced
= num_records_replaced
;
607 CopyPrefix(smallest_key
,
608 CompactionJobStats::kMaxPrefixLength
,
609 &stats
.smallest_output_key_prefix
);
610 CopyPrefix(largest_key
,
611 CompactionJobStats::kMaxPrefixLength
,
612 &stats
.largest_output_key_prefix
);
617 CompressionType
GetAnyCompression() {
618 if (Snappy_Supported()) {
619 return kSnappyCompression
;
620 } else if (Zlib_Supported()) {
621 return kZlibCompression
;
622 } else if (BZip2_Supported()) {
623 return kBZip2Compression
;
624 } else if (LZ4_Supported()) {
625 return kLZ4Compression
;
626 } else if (XPRESS_Supported()) {
627 return kXpressCompression
;
630 return kNoCompression
;
635 TEST_P(CompactionJobStatsTest
, CompactionJobStatsTest
) {
637 const int kBufSize
= 100;
639 uint64_t key_base
= 100000000l;
640 // Note: key_base must be multiple of num_keys_per_L0_file
641 int num_keys_per_L0_file
= 100;
642 const int kTestScale
= 8;
643 const int kKeySize
= 10;
644 const int kValueSize
= 1000;
645 const double kCompressionRatio
= 0.5;
646 double compression_ratio
= 1.0;
647 uint64_t key_interval
= key_base
/ num_keys_per_L0_file
;
649 // Whenever a compaction completes, this listener will try to
650 // verify whether the returned CompactionJobStats matches
651 // what we expect. The expected CompactionJobStats is added
652 // via AddExpectedStats().
653 auto* stats_checker
= new CompactionJobStatsChecker();
655 options
.listeners
.emplace_back(stats_checker
);
656 options
.create_if_missing
= true;
657 // just enough setting to hold off auto-compaction.
658 options
.level0_file_num_compaction_trigger
= kTestScale
+ 1;
659 options
.num_levels
= 3;
660 options
.compression
= kNoCompression
;
661 options
.max_subcompactions
= max_subcompactions_
;
662 options
.bytes_per_sync
= 512 * 1024;
664 options
.report_bg_io_stats
= true;
665 for (int test
= 0; test
< 2; ++test
) {
666 DestroyAndReopen(options
);
667 CreateAndReopenWithCF({"pikachu"}, options
);
669 // 1st Phase: generate "num_L0_files" L0 files.
670 int num_L0_files
= 0;
671 for (uint64_t start_key
= key_base
;
672 start_key
<= key_base
* kTestScale
;
673 start_key
+= key_base
) {
674 MakeTableWithKeyValues(
675 &rnd
, start_key
, start_key
+ key_base
- 1,
676 kKeySize
, kValueSize
, key_interval
,
677 compression_ratio
, 1);
678 snprintf(buf
, kBufSize
, "%d", ++num_L0_files
);
679 ASSERT_EQ(std::string(buf
), FilesPerLevel(1));
681 ASSERT_EQ(ToString(num_L0_files
), FilesPerLevel(1));
683 // 2nd Phase: perform L0 -> L1 compaction.
684 int L0_compaction_count
= 6;
686 std::string smallest_key
;
687 std::string largest_key
;
688 for (uint64_t start_key
= key_base
;
689 start_key
<= key_base
* L0_compaction_count
;
690 start_key
+= key_base
, count
++) {
691 smallest_key
= Key(start_key
, 10);
692 largest_key
= Key(start_key
+ key_base
- key_interval
, 10);
693 stats_checker
->AddExpectedStats(
694 NewManualCompactionJobStats(
695 smallest_key
, largest_key
,
696 1, 0, num_keys_per_L0_file
,
697 kKeySize
, kValueSize
,
698 1, num_keys_per_L0_file
,
699 compression_ratio
, 0));
700 ASSERT_EQ(stats_checker
->NumberOfUnverifiedStats(), 1U);
701 TEST_Compact(0, 1, smallest_key
, largest_key
);
702 snprintf(buf
, kBufSize
, "%d,%d", num_L0_files
- count
, count
);
703 ASSERT_EQ(std::string(buf
), FilesPerLevel(1));
706 // compact two files into one in the last L0 -> L1 compaction
707 int num_remaining_L0
= num_L0_files
- L0_compaction_count
;
708 smallest_key
= Key(key_base
* (L0_compaction_count
+ 1), 10);
709 largest_key
= Key(key_base
* (kTestScale
+ 1) - key_interval
, 10);
710 stats_checker
->AddExpectedStats(
711 NewManualCompactionJobStats(
712 smallest_key
, largest_key
,
714 0, num_keys_per_L0_file
* num_remaining_L0
,
715 kKeySize
, kValueSize
,
716 1, num_keys_per_L0_file
* num_remaining_L0
,
717 compression_ratio
, 0));
718 ASSERT_EQ(stats_checker
->NumberOfUnverifiedStats(), 1U);
719 TEST_Compact(0, 1, smallest_key
, largest_key
);
721 int num_L1_files
= num_L0_files
- num_remaining_L0
+ 1;
723 snprintf(buf
, kBufSize
, "%d,%d", num_L0_files
, num_L1_files
);
724 ASSERT_EQ(std::string(buf
), FilesPerLevel(1));
726 // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
728 for (uint64_t start_key
= key_base
;
729 start_key
<= key_base
* kTestScale
;
730 start_key
+= key_base
* sparseness
) {
731 MakeTableWithKeyValues(
732 &rnd
, start_key
, start_key
+ key_base
* sparseness
- 1,
733 kKeySize
, kValueSize
,
734 key_base
* sparseness
/ num_keys_per_L0_file
,
735 compression_ratio
, 1);
736 snprintf(buf
, kBufSize
, "%d,%d", ++num_L0_files
, num_L1_files
);
737 ASSERT_EQ(std::string(buf
), FilesPerLevel(1));
740 // 4th Phase: perform L0 -> L1 compaction again, expect higher write amp
741 // When subcompactions are enabled, the number of output files increases
742 // by 1 because multiple threads are consuming the input and generating
743 // output files without coordinating to see if the output could fit into
744 // a smaller number of files like it does when it runs sequentially
745 int num_output_files
= options
.max_subcompactions
> 1 ? 2 : 1;
746 for (uint64_t start_key
= key_base
;
748 start_key
+= key_base
* sparseness
) {
749 smallest_key
= Key(start_key
, 10);
751 Key(start_key
+ key_base
* sparseness
- key_interval
, 10);
752 stats_checker
->AddExpectedStats(
753 NewManualCompactionJobStats(
754 smallest_key
, largest_key
,
755 3, 2, num_keys_per_L0_file
* 3,
756 kKeySize
, kValueSize
,
758 num_keys_per_L0_file
* 2, // 1/3 of the data will be updated.
760 num_keys_per_L0_file
));
761 ASSERT_EQ(stats_checker
->NumberOfUnverifiedStats(), 1U);
762 Compact(1, smallest_key
, largest_key
);
763 if (options
.max_subcompactions
== 1) {
766 snprintf(buf
, kBufSize
, "%d,%d", --num_L0_files
, num_L1_files
);
767 ASSERT_EQ(std::string(buf
), FilesPerLevel(1));
770 // 5th Phase: Do a full compaction, which involves in two sub-compactions.
771 // Here we expect to have 1 L0 files and 4 L1 files
772 // In the first sub-compaction, we expect L0 compaction.
773 smallest_key
= Key(key_base
, 10);
774 largest_key
= Key(key_base
* (kTestScale
+ 1) - key_interval
, 10);
775 stats_checker
->AddExpectedStats(
776 NewManualCompactionJobStats(
777 Key(key_base
* (kTestScale
+ 1 - sparseness
), 10), largest_key
,
778 2, 1, num_keys_per_L0_file
* 3,
779 kKeySize
, kValueSize
,
780 1, num_keys_per_L0_file
* 2,
782 num_keys_per_L0_file
));
783 ASSERT_EQ(stats_checker
->NumberOfUnverifiedStats(), 1U);
784 Compact(1, smallest_key
, largest_key
);
786 num_L1_files
= options
.max_subcompactions
> 1 ? 7 : 4;
788 snprintf(L1_buf
, sizeof(L1_buf
), "0,%d", num_L1_files
);
789 std::string
L1_files(L1_buf
);
790 ASSERT_EQ(L1_files
, FilesPerLevel(1));
791 options
.compression
= GetAnyCompression();
792 if (options
.compression
== kNoCompression
) {
795 stats_checker
->EnableCompression(true);
796 compression_ratio
= kCompressionRatio
;
798 for (int i
= 0; i
< 5; i
++) {
799 ASSERT_OK(Put(1, Slice(Key(key_base
+ i
, 10)),
800 Slice(RandomString(&rnd
, 512 * 1024, 1))));
804 reinterpret_cast<DBImpl
*>(db_
)->TEST_WaitForCompact();
806 stats_checker
->set_verify_next_comp_io_stats(true);
807 std::atomic
<bool> first_prepare_write(true);
808 rocksdb::SyncPoint::GetInstance()->SetCallBack(
809 "WritableFileWriter::Append:BeforePrepareWrite", [&](void* /*arg*/) {
810 if (first_prepare_write
.load()) {
811 options
.env
->SleepForMicroseconds(3);
812 first_prepare_write
.store(false);
816 std::atomic
<bool> first_flush(true);
817 rocksdb::SyncPoint::GetInstance()->SetCallBack(
818 "WritableFileWriter::Flush:BeforeAppend", [&](void* /*arg*/) {
819 if (first_flush
.load()) {
820 options
.env
->SleepForMicroseconds(3);
821 first_flush
.store(false);
825 std::atomic
<bool> first_sync(true);
826 rocksdb::SyncPoint::GetInstance()->SetCallBack(
827 "WritableFileWriter::SyncInternal:0", [&](void* /*arg*/) {
828 if (first_sync
.load()) {
829 options
.env
->SleepForMicroseconds(3);
830 first_sync
.store(false);
834 std::atomic
<bool> first_range_sync(true);
835 rocksdb::SyncPoint::GetInstance()->SetCallBack(
836 "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) {
837 if (first_range_sync
.load()) {
838 options
.env
->SleepForMicroseconds(3);
839 first_range_sync
.store(false);
842 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
844 Compact(1, smallest_key
, largest_key
);
846 ASSERT_TRUE(!stats_checker
->verify_next_comp_io_stats());
847 ASSERT_TRUE(!first_prepare_write
.load());
848 ASSERT_TRUE(!first_flush
.load());
849 ASSERT_TRUE(!first_sync
.load());
850 ASSERT_TRUE(!first_range_sync
.load());
851 rocksdb::SyncPoint::GetInstance()->DisableProcessing();
853 ASSERT_EQ(stats_checker
->NumberOfUnverifiedStats(), 0U);
856 TEST_P(CompactionJobStatsTest
, DeletionStatsTest
) {
858 uint64_t key_base
= 100000l;
859 // Note: key_base must be multiple of num_keys_per_L0_file
860 int num_keys_per_L0_file
= 20;
861 const int kTestScale
= 8; // make sure this is even
862 const int kKeySize
= 10;
863 const int kValueSize
= 100;
864 double compression_ratio
= 1.0;
865 uint64_t key_interval
= key_base
/ num_keys_per_L0_file
;
866 uint64_t largest_key_num
= key_base
* (kTestScale
+ 1) - key_interval
;
867 uint64_t cutoff_key_num
= key_base
* (kTestScale
/ 2 + 1) - key_interval
;
868 const std::string smallest_key
= Key(key_base
- 10, kKeySize
);
869 const std::string largest_key
= Key(largest_key_num
+ 10, kKeySize
);
871 // Whenever a compaction completes, this listener will try to
872 // verify whether the returned CompactionJobStats matches
874 auto* stats_checker
= new CompactionJobDeletionStatsChecker();
876 options
.listeners
.emplace_back(stats_checker
);
877 options
.create_if_missing
= true;
878 options
.level0_file_num_compaction_trigger
= kTestScale
+1;
879 options
.num_levels
= 3;
880 options
.compression
= kNoCompression
;
881 options
.max_bytes_for_level_multiplier
= 2;
882 options
.max_subcompactions
= max_subcompactions_
;
884 DestroyAndReopen(options
);
885 CreateAndReopenWithCF({"pikachu"}, options
);
887 // Stage 1: Generate several L0 files and then send them to L2 by
888 // using CompactRangeOptions and CompactRange(). These files will
889 // have a strict subset of the keys from the full key-range
890 for (uint64_t start_key
= key_base
;
891 start_key
<= key_base
* kTestScale
/ 2;
892 start_key
+= key_base
) {
893 MakeTableWithKeyValues(
894 &rnd
, start_key
, start_key
+ key_base
- 1,
895 kKeySize
, kValueSize
, key_interval
,
896 compression_ratio
, 1);
899 CompactRangeOptions cr_options
;
900 cr_options
.change_level
= true;
901 cr_options
.target_level
= 2;
902 db_
->CompactRange(cr_options
, handles_
[1], nullptr, nullptr);
903 ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);
905 // Stage 2: Generate files including keys from the entire key range
906 for (uint64_t start_key
= key_base
;
907 start_key
<= key_base
* kTestScale
;
908 start_key
+= key_base
) {
909 MakeTableWithKeyValues(
910 &rnd
, start_key
, start_key
+ key_base
- 1,
911 kKeySize
, kValueSize
, key_interval
,
912 compression_ratio
, 1);
915 // Send these L0 files to L1
916 TEST_Compact(0, 1, smallest_key
, largest_key
);
917 ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);
919 // Add a new record and flush so now there is a L0 file
920 // with a value too (not just deletions from the next step)
921 ASSERT_OK(Put(1, Key(key_base
-6, kKeySize
), "test"));
924 // Stage 3: Generate L0 files with some deletions so now
925 // there are files with the same key range in L0, L1, and L2
926 int deletion_interval
= 3;
927 CompactionJobStats first_compaction_stats
;
928 SelectivelyDeleteKeys(key_base
, largest_key_num
,
929 key_interval
, deletion_interval
, kKeySize
, cutoff_key_num
,
930 &first_compaction_stats
, 1);
932 stats_checker
->AddExpectedStats(first_compaction_stats
);
934 // Stage 4: Trigger compaction and verify the stats
935 TEST_Compact(0, 1, smallest_key
, largest_key
);
// Returns how many "units" (newly flushed files) the universal
// compaction triggered by flush number `num_flushes` consumes: the
// lowest set bit of `num_flushes`, or 0 when that bit is 1 (a single
// file triggers no merge). Restores the dropped fallback `return 0;`
// and closing braces.
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  uint32_t compaction_input_units;
  for (compaction_input_units = 1;
       num_flushes >= compaction_input_units;
       compaction_input_units *= 2) {
    if ((num_flushes & compaction_input_units) != 0) {
      return compaction_input_units > 1 ?
          compaction_input_units : 0;
    }
  }
  return 0;
}
952 TEST_P(CompactionJobStatsTest
, UniversalCompactionTest
) {
954 uint64_t key_base
= 100000000l;
955 // Note: key_base must be multiple of num_keys_per_L0_file
956 int num_keys_per_table
= 100;
957 const uint32_t kTestScale
= 6;
958 const int kKeySize
= 10;
959 const int kValueSize
= 900;
960 double compression_ratio
= 1.0;
961 uint64_t key_interval
= key_base
/ num_keys_per_table
;
963 auto* stats_checker
= new CompactionJobStatsChecker();
965 options
.listeners
.emplace_back(stats_checker
);
966 options
.create_if_missing
= true;
967 options
.num_levels
= 3;
968 options
.compression
= kNoCompression
;
969 options
.level0_file_num_compaction_trigger
= 2;
970 options
.target_file_size_base
= num_keys_per_table
* 1000;
971 options
.compaction_style
= kCompactionStyleUniversal
;
972 options
.compaction_options_universal
.size_ratio
= 1;
973 options
.compaction_options_universal
.max_size_amplification_percent
= 1000;
974 options
.max_subcompactions
= max_subcompactions_
;
976 DestroyAndReopen(options
);
977 CreateAndReopenWithCF({"pikachu"}, options
);
979 // Generates the expected CompactionJobStats for each compaction
980 for (uint32_t num_flushes
= 2; num_flushes
<= kTestScale
; num_flushes
++) {
981 // Here we treat one newly flushed file as an unit.
983 // For example, if a newly flushed file is 100k, and a compaction has
984 // 4 input units, then this compaction inputs 400k.
985 uint32_t num_input_units
= GetUniversalCompactionInputUnits(num_flushes
);
986 if (num_input_units
== 0) {
989 // The following statement determines the expected smallest key
990 // based on whether it is a full compaction. A full compaction only
991 // happens when the number of flushes equals to the number of compaction
993 uint64_t smallest_key
=
994 (num_flushes
== num_input_units
) ?
995 key_base
: key_base
* (num_flushes
- 1);
997 stats_checker
->AddExpectedStats(
998 NewManualCompactionJobStats(
999 Key(smallest_key
, 10),
1000 Key(smallest_key
+ key_base
* num_input_units
- key_interval
, 10),
1002 num_input_units
> 2 ? num_input_units
/ 2 : 0,
1003 num_keys_per_table
* num_input_units
,
1004 kKeySize
, kValueSize
,
1006 num_keys_per_table
* num_input_units
,
1008 dbfull()->TEST_WaitForCompact();
1010 ASSERT_EQ(stats_checker
->NumberOfUnverifiedStats(), 3U);
1012 for (uint64_t start_key
= key_base
;
1013 start_key
<= key_base
* kTestScale
;
1014 start_key
+= key_base
) {
1015 MakeTableWithKeyValues(
1016 &rnd
, start_key
, start_key
+ key_base
- 1,
1017 kKeySize
, kValueSize
, key_interval
,
1018 compression_ratio
, 1);
1019 reinterpret_cast<DBImpl
*>(db_
)->TEST_WaitForCompact();
1021 ASSERT_EQ(stats_checker
->NumberOfUnverifiedStats(), 0U);
1024 INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest
, CompactionJobStatsTest
,
1025 ::testing::Values(1, 4));
1026 } // namespace rocksdb
1028 int main(int argc
, char** argv
) {
1029 rocksdb::port::InstallStackTraceHandler();
1030 ::testing::InitGoogleTest(&argc
, argv
);
1031 return RUN_ALL_TESTS();
// ROCKSDB_LITE build: the test is not supported, so just report that
// and exit cleanly. Restores the dropped `return 0;` and closing brace.
int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr, "SKIPPED, not supported in ROCKSDB_LITE\n");
  return 0;
}
1042 #endif // !ROCKSDB_LITE
1046 int main(int /*argc*/, char** /*argv*/) { return 0; }
1047 #endif // !defined(IOS_CROSS_COMPILE)