// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <algorithm>
#include <cinttypes>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <unordered_set>
#include <utility>

#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/job_context.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "env/mock_env.h"
#include "file/filename.h"
#include "memtable/hash_linklist_rep.h"
#include "monitoring/statistics.h"
#include "monitoring/thread_status_util.h"
#include "port/stack_trace.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/convenience.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/experimental.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/slice.h"
#include "rocksdb/slice_transform.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "rocksdb/thread_status.h"
#include "rocksdb/utilities/checkpoint.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/mock_table.h"
#include "table/plain/plain_table_factory.h"
#include "table/scoped_arena_iterator.h"
#include "test_util/sync_point.h"
#include "test_util/testharness.h"
#include "test_util/testutil.h"
#include "util/cast_util.h"
#include "util/compression.h"
#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/rate_limiter.h"
#include "util/string_util.h"
#include "utilities/merge_operators.h"

#if !defined(IOS_CROSS_COMPILE)
#ifndef ROCKSDB_LITE
namespace ROCKSDB_NAMESPACE {

static std::string RandomString(Random* rnd, int len, double ratio) {
  std::string r;
  test::CompressibleString(rnd, ratio, len, &r);
  return r;
}

std::string Key(uint64_t key, int length) {
  const int kBufSize = 1000;
  char buf[kBufSize];
  if (length > kBufSize) {
    length = kBufSize;
  }
  snprintf(buf, kBufSize, "%0*" PRIu64, length, key);
  return std::string(buf);
}
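
// Illustrative note (not in the original test): Key() zero-pads the numeric
// key to the requested width, e.g. Key(123, 10) yields "0000000123". The
// fixed width makes lexicographic key order match numeric order, which the
// range-based compactions and assertions below rely on.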

class CompactionJobStatsTest : public testing::Test,
                               public testing::WithParamInterface<bool> {
 public:
  std::string dbname_;
  std::string alternative_wal_dir_;
  Env* env_;
  DB* db_;
  std::vector<ColumnFamilyHandle*> handles_;
  uint32_t max_subcompactions_;

  Options last_options_;

  CompactionJobStatsTest() : env_(Env::Default()) {
    env_->SetBackgroundThreads(1, Env::LOW);
    env_->SetBackgroundThreads(1, Env::HIGH);
    dbname_ = test::PerThreadDBPath("compaction_job_stats_test");
    alternative_wal_dir_ = dbname_ + "/wal";
    Options options;
    options.create_if_missing = true;
    max_subcompactions_ = GetParam();
    options.max_subcompactions = max_subcompactions_;
    auto delete_options = options;
    delete_options.wal_dir = alternative_wal_dir_;
    EXPECT_OK(DestroyDB(dbname_, delete_options));
    // Destroy again for the case where no alternative WAL dir was used.
    EXPECT_OK(DestroyDB(dbname_, options));
    db_ = nullptr;
    Reopen(options);
  }

  ~CompactionJobStatsTest() override {
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({});
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
    Close();
    Options options;
    options.db_paths.emplace_back(dbname_, 0);
    options.db_paths.emplace_back(dbname_ + "_2", 0);
    options.db_paths.emplace_back(dbname_ + "_3", 0);
    options.db_paths.emplace_back(dbname_ + "_4", 0);
    EXPECT_OK(DestroyDB(dbname_, options));
  }

  // Required if inheriting from testing::WithParamInterface<>
  static void SetUpTestCase() {}
  static void TearDownTestCase() {}

  DBImpl* dbfull() { return static_cast_with_check<DBImpl>(db_); }

  void CreateColumnFamilies(const std::vector<std::string>& cfs,
                            const Options& options) {
    ColumnFamilyOptions cf_opts(options);
    size_t cfi = handles_.size();
    handles_.resize(cfi + cfs.size());
    for (auto cf : cfs) {
      ASSERT_OK(db_->CreateColumnFamily(cf_opts, cf, &handles_[cfi++]));
    }
  }

  void CreateAndReopenWithCF(const std::vector<std::string>& cfs,
                             const Options& options) {
    CreateColumnFamilies(cfs, options);
    std::vector<std::string> cfs_plus_default = cfs;
    cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
    ReopenWithColumnFamilies(cfs_plus_default, options);
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const std::vector<Options>& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  void ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                const Options& options) {
    ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
  }

  Status TryReopenWithColumnFamilies(
      const std::vector<std::string>& cfs,
      const std::vector<Options>& options) {
    Close();
    EXPECT_EQ(cfs.size(), options.size());
    std::vector<ColumnFamilyDescriptor> column_families;
    for (size_t i = 0; i < cfs.size(); ++i) {
      column_families.push_back(ColumnFamilyDescriptor(cfs[i], options[i]));
    }
    DBOptions db_opts = DBOptions(options[0]);
    return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
  }

  Status TryReopenWithColumnFamilies(const std::vector<std::string>& cfs,
                                     const Options& options) {
    Close();
    std::vector<Options> v_opts(cfs.size(), options);
    return TryReopenWithColumnFamilies(cfs, v_opts);
  }

  void Reopen(const Options& options) {
    ASSERT_OK(TryReopen(options));
  }

  void Close() {
    for (auto h : handles_) {
      delete h;
    }
    handles_.clear();
    delete db_;
    db_ = nullptr;
  }

  void DestroyAndReopen(const Options& options) {
    // Destroy using last options
    Destroy(last_options_);
    ASSERT_OK(TryReopen(options));
  }

  void Destroy(const Options& options) {
    Close();
    ASSERT_OK(DestroyDB(dbname_, options));
  }

  Status ReadOnlyReopen(const Options& options) {
    return DB::OpenForReadOnly(options, dbname_, &db_);
  }

  Status TryReopen(const Options& options) {
    Close();
    last_options_ = options;
    return DB::Open(options, dbname_, &db_);
  }

  Status Flush(int cf = 0) {
    if (cf == 0) {
      return db_->Flush(FlushOptions());
    } else {
      return db_->Flush(FlushOptions(), handles_[cf]);
    }
  }

  Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, k, v);
  }

  Status Put(int cf, const Slice& k, const Slice& v,
             WriteOptions wo = WriteOptions()) {
    return db_->Put(wo, handles_[cf], k, v);
  }

  Status Delete(const std::string& k) {
    return db_->Delete(WriteOptions(), k);
  }

  Status Delete(int cf, const std::string& k) {
    return db_->Delete(WriteOptions(), handles_[cf], k);
  }

  std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  std::string Get(int cf, const std::string& k,
                  const Snapshot* snapshot = nullptr) {
    ReadOptions options;
    options.verify_checksums = true;
    options.snapshot = snapshot;
    std::string result;
    Status s = db_->Get(options, handles_[cf], k, &result);
    if (s.IsNotFound()) {
      result = "NOT_FOUND";
    } else if (!s.ok()) {
      result = s.ToString();
    }
    return result;
  }

  int NumTableFilesAtLevel(int level, int cf = 0) {
    std::string property;
    if (cf == 0) {
      // default cfd
      EXPECT_TRUE(db_->GetProperty(
          "rocksdb.num-files-at-level" + NumberToString(level), &property));
    } else {
      EXPECT_TRUE(db_->GetProperty(
          handles_[cf], "rocksdb.num-files-at-level" + NumberToString(level),
          &property));
    }
    return atoi(property.c_str());
  }

  // Return spread of files per level
  std::string FilesPerLevel(int cf = 0) {
    int num_levels =
        (cf == 0) ? db_->NumberLevels() : db_->NumberLevels(handles_[1]);
    std::string result;
    size_t last_non_zero_offset = 0;
    for (int level = 0; level < num_levels; level++) {
      int f = NumTableFilesAtLevel(level, cf);
      char buf[100];
      snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
      result += buf;
      if (f > 0) {
        last_non_zero_offset = result.size();
      }
    }
    result.resize(last_non_zero_offset);
    return result;
  }
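
  // Illustrative note (not in the original test): FilesPerLevel() renders
  // the LSM shape as one file count per level, comma-separated, with
  // trailing all-zero levels trimmed. For example, "1,4" means one L0 file
  // and four L1 files, while "0,4" means every L0 file has been compacted
  // down into four L1 files.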

  uint64_t Size(const Slice& start, const Slice& limit, int cf = 0) {
    Range r(start, limit);
    uint64_t size;
    if (cf == 0) {
      db_->GetApproximateSizes(&r, 1, &size);
    } else {
      db_->GetApproximateSizes(handles_[1], &r, 1, &size);
    }
    return size;
  }

  void Compact(int cf, const Slice& start, const Slice& limit,
               uint32_t target_path_id) {
    CompactRangeOptions compact_options;
    compact_options.target_path_id = target_path_id;
    ASSERT_OK(db_->CompactRange(compact_options, handles_[cf], &start, &limit));
  }

  void Compact(int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(
        db_->CompactRange(CompactRangeOptions(), handles_[cf], &start, &limit));
  }

  void Compact(const Slice& start, const Slice& limit) {
    ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &start, &limit));
  }

  void TEST_Compact(int level, int cf, const Slice& start, const Slice& limit) {
    ASSERT_OK(dbfull()->TEST_CompactRange(level, &start, &limit, handles_[cf],
                                          true /* disallow trivial move */));
  }

  // Do n memtable compactions, each of which produces an sstable
  // covering the range [small,large].
  void MakeTables(int n, const std::string& small, const std::string& large,
                  int cf = 0) {
    for (int i = 0; i < n; i++) {
      ASSERT_OK(Put(cf, small, "begin"));
      ASSERT_OK(Put(cf, large, "end"));
      ASSERT_OK(Flush(cf));
    }
  }

  static void SetDeletionCompactionStats(
      CompactionJobStats* stats, uint64_t input_deletions,
      uint64_t expired_deletions, uint64_t records_replaced) {
    stats->num_input_deletion_records = input_deletions;
    stats->num_expired_deletion_records = expired_deletions;
    stats->num_records_replaced = records_replaced;
  }

  void MakeTableWithKeyValues(
      Random* rnd, uint64_t smallest, uint64_t largest,
      int key_size, int value_size, uint64_t interval,
      double ratio, int cf = 0) {
    for (auto key = smallest; key < largest; key += interval) {
      ASSERT_OK(Put(cf, Slice(Key(key, key_size)),
                    Slice(RandomString(rnd, value_size, ratio))));
    }
    ASSERT_OK(Flush(cf));
  }

  // This function assumes that two rounds of keys have already been
  // inserted into the database, as done by DeletionStatsTest. Deletions of
  // keys beyond cutoff_key_num are counted as expired, since those keys
  // exist only in the second round and have no older versions below them.
  void SelectivelyDeleteKeys(uint64_t smallest, uint64_t largest,
      uint64_t interval, int deletion_interval, int key_size,
      uint64_t cutoff_key_num, CompactionJobStats* stats, int cf = 0) {
    // interval needs to be >= 2 so that deletion entries can be inserted
    // at an offset of 1 from an existing key without actually deleting it.
    ASSERT_GE(interval, 2);

    uint64_t ctr = 1;
    uint32_t deletions_made = 0;
    uint32_t num_deleted = 0;
    uint32_t num_expired = 0;
    for (auto key = smallest; key <= largest; key += interval, ctr++) {
      if (ctr % deletion_interval == 0) {
        ASSERT_OK(Delete(cf, Key(key, key_size)));
        deletions_made++;
        num_deleted++;

        if (key > cutoff_key_num) {
          num_expired++;
        }
      }
    }

    // Insert some deletions for keys that don't exist, both inside and
    // outside the key range.
    ASSERT_OK(Delete(cf, Key(smallest + 1, key_size)));
    deletions_made++;

    ASSERT_OK(Delete(cf, Key(smallest - 1, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Delete(cf, Key(smallest - 9, key_size)));
    deletions_made++;
    num_expired++;

    ASSERT_OK(Flush(cf));
    SetDeletionCompactionStats(stats, deletions_made, num_expired,
        num_deleted);
  }
};

// An EventListener which helps verify the compaction results in
// test CompactionJobStatsTest.
class CompactionJobStatsChecker : public EventListener {
 public:
  CompactionJobStatsChecker()
      : compression_enabled_(false), verify_next_comp_io_stats_(false) {}

  size_t NumberOfUnverifiedStats() { return expected_stats_.size(); }

  void set_verify_next_comp_io_stats(bool v) { verify_next_comp_io_stats_ = v; }

  // Once a compaction completes, this function verifies the returned
  // CompactionJobInfo against the oldest expected stats added earlier
  // to "expected_stats_" that has not yet been used for verification.
  void OnCompactionCompleted(DB* /*db*/, const CompactionJobInfo& ci) override {
    if (verify_next_comp_io_stats_) {
      ASSERT_GT(ci.stats.file_write_nanos, 0);
      ASSERT_GT(ci.stats.file_range_sync_nanos, 0);
      ASSERT_GT(ci.stats.file_fsync_nanos, 0);
      ASSERT_GT(ci.stats.file_prepare_write_nanos, 0);
      verify_next_comp_io_stats_ = false;
    }

    std::lock_guard<std::mutex> lock(mutex_);
    if (expected_stats_.size()) {
      Verify(ci.stats, expected_stats_.front());
      expected_stats_.pop();
    }
  }

  // A helper function which verifies whether two CompactionJobStats
  // match. The verification of all compaction stats is done with
  // ASSERT_EQ, except for the total input / output bytes, which we
  // check with ASSERT_GE and ASSERT_LE under a reasonable bias ---
  // 10% in the uncompressed case and 20% when compression is used.
  virtual void Verify(const CompactionJobStats& current_stats,
                      const CompactionJobStats& stats) {
    // time
    ASSERT_GT(current_stats.elapsed_micros, 0U);

    ASSERT_EQ(current_stats.num_input_records,
              stats.num_input_records);
    ASSERT_EQ(current_stats.num_input_files,
              stats.num_input_files);
    ASSERT_EQ(current_stats.num_input_files_at_output_level,
              stats.num_input_files_at_output_level);

    ASSERT_EQ(current_stats.num_output_records,
              stats.num_output_records);
    ASSERT_EQ(current_stats.num_output_files,
              stats.num_output_files);

    ASSERT_EQ(current_stats.is_full_compaction, stats.is_full_compaction);
    ASSERT_EQ(current_stats.is_manual_compaction,
              stats.is_manual_compaction);

    // file size
    double kFileSizeBias = compression_enabled_ ? 0.20 : 0.10;
    ASSERT_GE(current_stats.total_input_bytes * (1.00 + kFileSizeBias),
              stats.total_input_bytes);
    ASSERT_LE(current_stats.total_input_bytes,
              stats.total_input_bytes * (1.00 + kFileSizeBias));
    ASSERT_GE(current_stats.total_output_bytes * (1.00 + kFileSizeBias),
              stats.total_output_bytes);
    ASSERT_LE(current_stats.total_output_bytes,
              stats.total_output_bytes * (1.00 + kFileSizeBias));
    ASSERT_EQ(current_stats.total_input_raw_key_bytes,
              stats.total_input_raw_key_bytes);
    ASSERT_EQ(current_stats.total_input_raw_value_bytes,
              stats.total_input_raw_value_bytes);

    ASSERT_EQ(current_stats.num_records_replaced,
              stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
              stats.num_corrupt_keys);

    ASSERT_EQ(
        std::string(current_stats.smallest_output_key_prefix),
        std::string(stats.smallest_output_key_prefix));
    ASSERT_EQ(
        std::string(current_stats.largest_output_key_prefix),
        std::string(stats.largest_output_key_prefix));
  }

  // Add expected compaction stats, which will be used to
  // verify the CompactionJobStats returned by the OnCompactionCompleted()
  // callback.
  void AddExpectedStats(const CompactionJobStats& stats) {
    std::lock_guard<std::mutex> lock(mutex_);
    expected_stats_.push(stats);
  }

  void EnableCompression(bool flag) {
    compression_enabled_ = flag;
  }

  bool verify_next_comp_io_stats() const { return verify_next_comp_io_stats_; }

 private:
  std::mutex mutex_;
  std::queue<CompactionJobStats> expected_stats_;
  bool compression_enabled_;
  bool verify_next_comp_io_stats_;
};

// An EventListener which helps verify the compaction statistics in
// the test DeletionStatsTest.
class CompactionJobDeletionStatsChecker : public CompactionJobStatsChecker {
 public:
  // Verifies whether two CompactionJobStats match.
  void Verify(const CompactionJobStats& current_stats,
              const CompactionJobStats& stats) override {
    ASSERT_EQ(
        current_stats.num_input_deletion_records,
        stats.num_input_deletion_records);
    ASSERT_EQ(
        current_stats.num_expired_deletion_records,
        stats.num_expired_deletion_records);
    ASSERT_EQ(
        current_stats.num_records_replaced,
        stats.num_records_replaced);

    ASSERT_EQ(current_stats.num_corrupt_keys,
              stats.num_corrupt_keys);
  }
};

namespace {

uint64_t EstimatedFileSize(
    uint64_t num_records, size_t key_size, size_t value_size,
    double compression_ratio = 1.0,
    size_t block_size = 4096,
    int bloom_bits_per_key = 10) {
  const size_t kPerKeyOverhead = 8;
  const size_t kFooterSize = 512;

  uint64_t data_size =
      static_cast<uint64_t>(
          num_records * (key_size + value_size * compression_ratio +
                         kPerKeyOverhead));

  return data_size + kFooterSize
         + num_records * bloom_bits_per_key / 8   // filter block
         + data_size * (key_size + 8) / block_size;  // index block
}
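
// Illustrative arithmetic (not in the original test): with the parameters
// used by CompactionJobStatsTest -- 100 records per file, 10-byte keys,
// 1000-byte values, compression_ratio 1.0 -- the estimate works out to
//   data   = 100 * (10 + 1000 + 8)    = 101800 bytes
//   filter = 100 * 10 / 8             =    125 bytes
//   index  = 101800 * (10 + 8) / 4096 =    447 bytes (integer division)
// so, adding the 512-byte footer, each flushed L0 file is estimated at
// 101800 + 512 + 125 + 447 = 102884 bytes, roughly 100 KB. The checker only
// requires actual sizes to fall within 10% (20% with compression) of this.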

namespace {

void CopyPrefix(
    const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}

}  // namespace

CompactionJobStats NewManualCompactionJobStats(
    const std::string& smallest_key, const std::string& largest_key,
    size_t num_input_files, size_t num_input_files_at_output_level,
    uint64_t num_input_records, size_t key_size, size_t value_size,
    size_t num_output_files, uint64_t num_output_records,
    double compression_ratio, uint64_t num_records_replaced,
    bool is_full = false, bool is_manual = true) {
  CompactionJobStats stats;
  stats.Reset();

  stats.num_input_records = num_input_records;
  stats.num_input_files = num_input_files;
  stats.num_input_files_at_output_level = num_input_files_at_output_level;

  stats.num_output_records = num_output_records;
  stats.num_output_files = num_output_files;

  stats.total_input_bytes =
      EstimatedFileSize(
          num_input_records / num_input_files,
          key_size, value_size, compression_ratio) * num_input_files;
  stats.total_output_bytes =
      EstimatedFileSize(
          num_output_records / num_output_files,
          key_size, value_size, compression_ratio) * num_output_files;
  stats.total_input_raw_key_bytes =
      num_input_records * (key_size + 8);
  stats.total_input_raw_value_bytes =
      num_input_records * value_size;

  stats.is_full_compaction = is_full;
  stats.is_manual_compaction = is_manual;

  stats.num_records_replaced = num_records_replaced;

  CopyPrefix(smallest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.smallest_output_key_prefix);
  CopyPrefix(largest_key,
             CompactionJobStats::kMaxPrefixLength,
             &stats.largest_output_key_prefix);

  return stats;
}
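
// Illustrative note (not in the original test): the "+ 8" in
// total_input_raw_key_bytes above accounts for RocksDB's 8-byte internal-key
// trailer (a packed sequence number and value type appended to each user
// key), so every input record contributes key_size + 8 raw key bytes. The
// byte totals are only estimates from EstimatedFileSize(); Verify() accepts
// them within a 10% bias (20% when compression is enabled).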

CompressionType GetAnyCompression() {
  if (Snappy_Supported()) {
    return kSnappyCompression;
  } else if (Zlib_Supported()) {
    return kZlibCompression;
  } else if (BZip2_Supported()) {
    return kBZip2Compression;
  } else if (LZ4_Supported()) {
    return kLZ4Compression;
  } else if (XPRESS_Supported()) {
    return kXpressCompression;
  }

  return kNoCompression;
}

}  // namespace

TEST_P(CompactionJobStatsTest, CompactionJobStatsTest) {
  Random rnd(301);
  const int kBufSize = 100;
  char buf[kBufSize];
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 100;
  const int kTestScale = 8;
  const int kKeySize = 10;
  const int kValueSize = 1000;
  const double kCompressionRatio = 0.5;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect. The expected CompactionJobStats is added
  // via AddExpectedStats().
  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  // Just enough settings to hold off auto-compaction.
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_subcompactions = max_subcompactions_;
  options.bytes_per_sync = 512 * 1024;

  options.report_bg_io_stats = true;
  for (int test = 0; test < 2; ++test) {
    DestroyAndReopen(options);
    CreateAndReopenWithCF({"pikachu"}, options);

    // 1st Phase: generate "num_L0_files" L0 files.
    int num_L0_files = 0;
    for (uint64_t start_key = key_base;
         start_key <= key_base * kTestScale;
         start_key += key_base) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base - 1,
          kKeySize, kValueSize, key_interval,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d", ++num_L0_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }
    ASSERT_EQ(ToString(num_L0_files), FilesPerLevel(1));

    // 2nd Phase: perform L0 -> L1 compaction.
    int L0_compaction_count = 6;
    int count = 1;
    std::string smallest_key;
    std::string largest_key;
    for (uint64_t start_key = key_base;
         start_key <= key_base * L0_compaction_count;
         start_key += key_base, count++) {
      smallest_key = Key(start_key, 10);
      largest_key = Key(start_key + key_base - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              1, 0, num_keys_per_L0_file,
              kKeySize, kValueSize,
              1, num_keys_per_L0_file,
              compression_ratio, 0));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      TEST_Compact(0, 1, smallest_key, largest_key);
      snprintf(buf, kBufSize, "%d,%d", num_L0_files - count, count);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // Compact two files into one in the last L0 -> L1 compaction.
    int num_remaining_L0 = num_L0_files - L0_compaction_count;
    smallest_key = Key(key_base * (L0_compaction_count + 1), 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            smallest_key, largest_key,
            num_remaining_L0,
            0, num_keys_per_L0_file * num_remaining_L0,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * num_remaining_L0,
            compression_ratio, 0));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    TEST_Compact(0, 1, smallest_key, largest_key);

    int num_L1_files = num_L0_files - num_remaining_L0 + 1;
    num_L0_files = 0;
    snprintf(buf, kBufSize, "%d,%d", num_L0_files, num_L1_files);
    ASSERT_EQ(std::string(buf), FilesPerLevel(1));

    // 3rd Phase: generate sparse L0 files (wider key-range, same num of keys)
    int sparseness = 2;
    for (uint64_t start_key = key_base;
         start_key <= key_base * kTestScale;
         start_key += key_base * sparseness) {
      MakeTableWithKeyValues(
          &rnd, start_key, start_key + key_base * sparseness - 1,
          kKeySize, kValueSize,
          key_base * sparseness / num_keys_per_L0_file,
          compression_ratio, 1);
      snprintf(buf, kBufSize, "%d,%d", ++num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 4th Phase: perform L0 -> L1 compaction again, expecting higher write
    // amplification. When subcompactions are enabled, the number of output
    // files increases by 1 because multiple threads consume the input and
    // generate output files without coordinating to see whether the output
    // could fit into a smaller number of files, as it does when the
    // compaction runs sequentially.
    int num_output_files = options.max_subcompactions > 1 ? 2 : 1;
    for (uint64_t start_key = key_base;
         num_L0_files > 1;
         start_key += key_base * sparseness) {
      smallest_key = Key(start_key, 10);
      largest_key =
          Key(start_key + key_base * sparseness - key_interval, 10);
      stats_checker->AddExpectedStats(
          NewManualCompactionJobStats(
              smallest_key, largest_key,
              3, 2, num_keys_per_L0_file * 3,
              kKeySize, kValueSize,
              num_output_files,
              num_keys_per_L0_file * 2,  // 1/3 of the data will be updated.
              compression_ratio,
              num_keys_per_L0_file));
      ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
      Compact(1, smallest_key, largest_key);
      if (options.max_subcompactions == 1) {
        --num_L1_files;
      }
      snprintf(buf, kBufSize, "%d,%d", --num_L0_files, num_L1_files);
      ASSERT_EQ(std::string(buf), FilesPerLevel(1));
    }

    // 5th Phase: Do a full compaction, which involves two sub-compactions.
    // Here we expect to have 1 L0 file and 4 L1 files.
    // In the first sub-compaction, we expect an L0 compaction.
    smallest_key = Key(key_base, 10);
    largest_key = Key(key_base * (kTestScale + 1) - key_interval, 10);
    stats_checker->AddExpectedStats(
        NewManualCompactionJobStats(
            Key(key_base * (kTestScale + 1 - sparseness), 10), largest_key,
            2, 1, num_keys_per_L0_file * 3,
            kKeySize, kValueSize,
            1, num_keys_per_L0_file * 2,
            compression_ratio,
            num_keys_per_L0_file));
    ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 1U);
    Compact(1, smallest_key, largest_key);

    num_L1_files = options.max_subcompactions > 1 ? 7 : 4;
    char L1_buf[4];
    snprintf(L1_buf, sizeof(L1_buf), "0,%d", num_L1_files);
    std::string L1_files(L1_buf);
    ASSERT_EQ(L1_files, FilesPerLevel(1));
    options.compression = GetAnyCompression();
    if (options.compression == kNoCompression) {
      break;
    }
    stats_checker->EnableCompression(true);
    compression_ratio = kCompressionRatio;

    for (int i = 0; i < 5; i++) {
      ASSERT_OK(Put(1, Slice(Key(key_base + i, 10)),
                    Slice(RandomString(&rnd, 512 * 1024, 1))));
    }

    ASSERT_OK(Flush(1));
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());

    stats_checker->set_verify_next_comp_io_stats(true);
    std::atomic<bool> first_prepare_write(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Append:BeforePrepareWrite", [&](void* /*arg*/) {
          if (first_prepare_write.load()) {
            options.env->SleepForMicroseconds(3);
            first_prepare_write.store(false);
          }
        });

    std::atomic<bool> first_flush(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::Flush:BeforeAppend", [&](void* /*arg*/) {
          if (first_flush.load()) {
            options.env->SleepForMicroseconds(3);
            first_flush.store(false);
          }
        });

    std::atomic<bool> first_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::SyncInternal:0", [&](void* /*arg*/) {
          if (first_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_sync.store(false);
          }
        });

    std::atomic<bool> first_range_sync(true);
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
        "WritableFileWriter::RangeSync:0", [&](void* /*arg*/) {
          if (first_range_sync.load()) {
            options.env->SleepForMicroseconds(3);
            first_range_sync.store(false);
          }
        });
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

    Compact(1, smallest_key, largest_key);

    ASSERT_TRUE(!stats_checker->verify_next_comp_io_stats());
    ASSERT_TRUE(!first_prepare_write.load());
    ASSERT_TRUE(!first_flush.load());
    ASSERT_TRUE(!first_sync.load());
    ASSERT_TRUE(!first_range_sync.load());
    ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

TEST_P(CompactionJobStatsTest, DeletionStatsTest) {
  Random rnd(301);
  uint64_t key_base = 100000l;
  // Note: key_base must be a multiple of num_keys_per_L0_file
  int num_keys_per_L0_file = 20;
  const int kTestScale = 8;  // make sure this is even
  const int kKeySize = 10;
  const int kValueSize = 100;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_L0_file;
  uint64_t largest_key_num = key_base * (kTestScale + 1) - key_interval;
  uint64_t cutoff_key_num = key_base * (kTestScale / 2 + 1) - key_interval;
  const std::string smallest_key = Key(key_base - 10, kKeySize);
  const std::string largest_key = Key(largest_key_num + 10, kKeySize);

  // Whenever a compaction completes, this listener will try to
  // verify whether the returned CompactionJobStats matches
  // what we expect.
  auto* stats_checker = new CompactionJobDeletionStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.level0_file_num_compaction_trigger = kTestScale + 1;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.max_bytes_for_level_multiplier = 2;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Stage 1: Generate several L0 files and then send them to L2 by
  // using CompactRangeOptions and CompactRange(). These files will
  // have a strict subset of the keys from the full key-range.
  for (uint64_t start_key = key_base;
       start_key <= key_base * kTestScale / 2;
       start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  CompactRangeOptions cr_options;
  cr_options.change_level = true;
  cr_options.target_level = 2;
  ASSERT_OK(db_->CompactRange(cr_options, handles_[1], nullptr, nullptr));
  ASSERT_GT(NumTableFilesAtLevel(2, 1), 0);

  // Stage 2: Generate files including keys from the entire key range
  for (uint64_t start_key = key_base;
       start_key <= key_base * kTestScale;
       start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
  }

  // Send these L0 files to L1
  TEST_Compact(0, 1, smallest_key, largest_key);
  ASSERT_GT(NumTableFilesAtLevel(1, 1), 0);

  // Add a new record and flush, so that there is now an L0 file
  // with a value too (not just the deletions from the next step).
  ASSERT_OK(Put(1, Key(key_base - 6, kKeySize), "test"));
  ASSERT_OK(Flush(1));

  // Stage 3: Generate L0 files with some deletions, so that there are now
  // files with the same key range in L0, L1, and L2.
  int deletion_interval = 3;
  CompactionJobStats first_compaction_stats;
  SelectivelyDeleteKeys(key_base, largest_key_num,
      key_interval, deletion_interval, kKeySize, cutoff_key_num,
      &first_compaction_stats, 1);

  stats_checker->AddExpectedStats(first_compaction_stats);

  // Stage 4: Trigger compaction and verify the stats
  TEST_Compact(0, 1, smallest_key, largest_key);
}

namespace {
int GetUniversalCompactionInputUnits(uint32_t num_flushes) {
  uint32_t compaction_input_units;
  for (compaction_input_units = 1;
       num_flushes >= compaction_input_units;
       compaction_input_units *= 2) {
    if ((num_flushes & compaction_input_units) != 0) {
      return compaction_input_units > 1 ? compaction_input_units : 0;
    }
  }
  return 0;
}
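
// Illustrative note (not in the original test): the loop above returns the
// lowest set bit of num_flushes, or 0 when that bit is bit 0 (i.e. when
// num_flushes is odd). With this test's universal-compaction settings
// (size_ratio = 1), sorted runs accumulate and merge much like the bits of
// a binary counter, so the lowest set bit predicts how many equal-sized
// runs the next flush triggers a compaction over. For example:
//   num_flushes = 2 (binary  10) -> 2 input units
//   num_flushes = 3 (binary  11) -> 0 (no compaction expected)
//   num_flushes = 4 (binary 100) -> 4 input units
//   num_flushes = 6 (binary 110) -> 2 input units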
}  // namespace

TEST_P(CompactionJobStatsTest, UniversalCompactionTest) {
  Random rnd(301);
  uint64_t key_base = 100000000l;
  // Note: key_base must be a multiple of num_keys_per_table
  int num_keys_per_table = 100;
  const uint32_t kTestScale = 6;
  const int kKeySize = 10;
  const int kValueSize = 900;
  double compression_ratio = 1.0;
  uint64_t key_interval = key_base / num_keys_per_table;

  auto* stats_checker = new CompactionJobStatsChecker();
  Options options;
  options.listeners.emplace_back(stats_checker);
  options.create_if_missing = true;
  options.num_levels = 3;
  options.compression = kNoCompression;
  options.level0_file_num_compaction_trigger = 2;
  options.target_file_size_base = num_keys_per_table * 1000;
  options.compaction_style = kCompactionStyleUniversal;
  options.compaction_options_universal.size_ratio = 1;
  options.compaction_options_universal.max_size_amplification_percent = 1000;
  options.max_subcompactions = max_subcompactions_;

  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  // Generates the expected CompactionJobStats for each compaction.
  for (uint32_t num_flushes = 2; num_flushes <= kTestScale; num_flushes++) {
    // Here we treat one newly flushed file as a unit.
    //
    // For example, if a newly flushed file is 100k, and a compaction has
    // 4 input units, then this compaction inputs 400k.
    uint32_t num_input_units = GetUniversalCompactionInputUnits(num_flushes);
    if (num_input_units == 0) {
      continue;
    }
    // A full compaction only happens when the number of flushes equals
    // the number of compaction input runs.
    bool is_full = num_flushes == num_input_units;
    // The following statement determines the expected smallest key
    // based on whether it is a full compaction.
    uint64_t smallest_key = is_full ? key_base : key_base * (num_flushes - 1);

    stats_checker->AddExpectedStats(NewManualCompactionJobStats(
        Key(smallest_key, 10),
        Key(smallest_key + key_base * num_input_units - key_interval, 10),
        num_input_units, num_input_units > 2 ? num_input_units / 2 : 0,
        num_keys_per_table * num_input_units, kKeySize, kValueSize,
        num_input_units, num_keys_per_table * num_input_units, 1.0, 0, is_full,
        false));
    ASSERT_OK(dbfull()->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 3U);

  for (uint64_t start_key = key_base;
       start_key <= key_base * kTestScale;
       start_key += key_base) {
    MakeTableWithKeyValues(
        &rnd, start_key, start_key + key_base - 1,
        kKeySize, kValueSize, key_interval,
        compression_ratio, 1);
    ASSERT_OK(static_cast_with_check<DBImpl>(db_)->TEST_WaitForCompact());
  }
  ASSERT_EQ(stats_checker->NumberOfUnverifiedStats(), 0U);
}

INSTANTIATE_TEST_CASE_P(CompactionJobStatsTest, CompactionJobStatsTest,
                        ::testing::Values(1, 4));
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}

#else
#include <stdio.h>

int main(int /*argc*/, char** /*argv*/) {
  fprintf(stderr, "SKIPPED, not supported in ROCKSDB_LITE\n");
  return 0;
}

#endif  // !ROCKSDB_LITE

#else

int main(int /*argc*/, char** /*argv*/) { return 0; }
#endif  // !defined(IOS_CROSS_COMPILE)