]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/compaction/compaction_job_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / compaction / compaction_job_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #ifndef ROCKSDB_LITE
7
8 #include "db/compaction/compaction_job.h"
9
10 #include <algorithm>
11 #include <array>
12 #include <cinttypes>
13 #include <map>
14 #include <string>
15 #include <tuple>
16
17 #include "db/blob/blob_index.h"
18 #include "db/column_family.h"
19 #include "db/db_impl/db_impl.h"
20 #include "db/error_handler.h"
21 #include "db/version_set.h"
22 #include "file/random_access_file_reader.h"
23 #include "file/writable_file_writer.h"
24 #include "options/options_helper.h"
25 #include "rocksdb/cache.h"
26 #include "rocksdb/convenience.h"
27 #include "rocksdb/db.h"
28 #include "rocksdb/file_system.h"
29 #include "rocksdb/options.h"
30 #include "rocksdb/write_buffer_manager.h"
31 #include "table/mock_table.h"
32 #include "table/unique_id_impl.h"
33 #include "test_util/testharness.h"
34 #include "test_util/testutil.h"
35 #include "util/string_util.h"
36 #include "utilities/merge_operators.h"
37
38 namespace ROCKSDB_NAMESPACE {
39
40 namespace {
41
42 void VerifyInitializationOfCompactionJobStats(
43 const CompactionJobStats& compaction_job_stats) {
44 #if !defined(IOS_CROSS_COMPILE)
45 ASSERT_EQ(compaction_job_stats.elapsed_micros, 0U);
46
47 ASSERT_EQ(compaction_job_stats.num_input_records, 0U);
48 ASSERT_EQ(compaction_job_stats.num_input_files, 0U);
49 ASSERT_EQ(compaction_job_stats.num_input_files_at_output_level, 0U);
50
51 ASSERT_EQ(compaction_job_stats.num_output_records, 0U);
52 ASSERT_EQ(compaction_job_stats.num_output_files, 0U);
53
54 ASSERT_EQ(compaction_job_stats.is_manual_compaction, true);
55
56 ASSERT_EQ(compaction_job_stats.total_input_bytes, 0U);
57 ASSERT_EQ(compaction_job_stats.total_output_bytes, 0U);
58
59 ASSERT_EQ(compaction_job_stats.total_input_raw_key_bytes, 0U);
60 ASSERT_EQ(compaction_job_stats.total_input_raw_value_bytes, 0U);
61
62 ASSERT_EQ(compaction_job_stats.smallest_output_key_prefix[0], 0);
63 ASSERT_EQ(compaction_job_stats.largest_output_key_prefix[0], 0);
64
65 ASSERT_EQ(compaction_job_stats.num_records_replaced, 0U);
66
67 ASSERT_EQ(compaction_job_stats.num_input_deletion_records, 0U);
68 ASSERT_EQ(compaction_job_stats.num_expired_deletion_records, 0U);
69
70 ASSERT_EQ(compaction_job_stats.num_corrupt_keys, 0U);
71 #endif // !defined(IOS_CROSS_COMPILE)
72 }
73
74 // Mock FSWritableFile for testing io priority.
75 // Only override the essential functions for testing compaction io priority.
76 class MockTestWritableFile : public FSWritableFileOwnerWrapper {
77 public:
78 MockTestWritableFile(std::unique_ptr<FSWritableFile>&& file,
79 Env::IOPriority io_priority)
80 : FSWritableFileOwnerWrapper(std::move(file)),
81 write_io_priority_(io_priority) {}
82 IOStatus Append(const Slice& data, const IOOptions& options,
83 IODebugContext* dbg) override {
84 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
85 return target()->Append(data, options, dbg);
86 }
87 IOStatus Append(const Slice& data, const IOOptions& options,
88 const DataVerificationInfo& verification_info,
89 IODebugContext* dbg) override {
90 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
91 return target()->Append(data, options, verification_info, dbg);
92 }
93 IOStatus Close(const IOOptions& options, IODebugContext* dbg) override {
94 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
95 return target()->Close(options, dbg);
96 }
97 IOStatus Flush(const IOOptions& options, IODebugContext* dbg) override {
98 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
99 return target()->Flush(options, dbg);
100 }
101 IOStatus Sync(const IOOptions& options, IODebugContext* dbg) override {
102 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
103 return target()->Sync(options, dbg);
104 }
105 IOStatus Fsync(const IOOptions& options, IODebugContext* dbg) override {
106 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
107 return target()->Fsync(options, dbg);
108 }
109 uint64_t GetFileSize(const IOOptions& options, IODebugContext* dbg) override {
110 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
111 return target()->GetFileSize(options, dbg);
112 }
113 IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& options,
114 IODebugContext* dbg) override {
115 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
116 return target()->RangeSync(offset, nbytes, options, dbg);
117 }
118
119 void PrepareWrite(size_t offset, size_t len, const IOOptions& options,
120 IODebugContext* dbg) override {
121 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
122 target()->PrepareWrite(offset, len, options, dbg);
123 }
124
125 IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& options,
126 IODebugContext* dbg) override {
127 EXPECT_EQ(options.rate_limiter_priority, write_io_priority_);
128 return target()->Allocate(offset, len, options, dbg);
129 }
130
131 private:
132 Env::IOPriority write_io_priority_;
133 };
134
135 // Mock FSRandomAccessFile for testing io priority.
136 // Only override the essential functions for testing compaction io priority.
137 class MockTestRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
138 public:
139 MockTestRandomAccessFile(std::unique_ptr<FSRandomAccessFile>&& file,
140 Env::IOPriority io_priority)
141 : FSRandomAccessFileOwnerWrapper(std::move(file)),
142 read_io_priority_(io_priority) {}
143
144 IOStatus Read(uint64_t offset, size_t n, const IOOptions& options,
145 Slice* result, char* scratch,
146 IODebugContext* dbg) const override {
147 EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
148 return target()->Read(offset, n, options, result, scratch, dbg);
149 }
150 IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
151 IODebugContext* dbg) override {
152 EXPECT_EQ(options.rate_limiter_priority, read_io_priority_);
153 return target()->Prefetch(offset, n, options, dbg);
154 }
155
156 private:
157 Env::IOPriority read_io_priority_;
158 };
159
160 // Mock FileSystem for testing io priority.
161 class MockTestFileSystem : public FileSystemWrapper {
162 public:
163 explicit MockTestFileSystem(const std::shared_ptr<FileSystem>& base,
164 Env::IOPriority read_io_priority,
165 Env::IOPriority write_io_priority)
166 : FileSystemWrapper(base),
167 read_io_priority_(read_io_priority),
168 write_io_priority_(write_io_priority) {}
169
170 static const char* kClassName() { return "MockTestFileSystem"; }
171 const char* Name() const override { return kClassName(); }
172
173 IOStatus NewRandomAccessFile(const std::string& fname,
174 const FileOptions& file_opts,
175 std::unique_ptr<FSRandomAccessFile>* result,
176 IODebugContext* dbg) override {
177 IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg);
178 EXPECT_OK(s);
179 result->reset(
180 new MockTestRandomAccessFile(std::move(*result), read_io_priority_));
181 return s;
182 }
183 IOStatus NewWritableFile(const std::string& fname,
184 const FileOptions& file_opts,
185 std::unique_ptr<FSWritableFile>* result,
186 IODebugContext* dbg) override {
187 IOStatus s = target()->NewWritableFile(fname, file_opts, result, dbg);
188 EXPECT_OK(s);
189 result->reset(
190 new MockTestWritableFile(std::move(*result), write_io_priority_));
191 return s;
192 }
193
194 private:
195 Env::IOPriority read_io_priority_;
196 Env::IOPriority write_io_priority_;
197 };
198
// Selects which table implementation a test fixture uses.
enum TableTypeForTest : uint8_t {
  kMockTable = 0,
  kBlockBasedTable = 1,
};
200
201 } // namespace
202
203 class CompactionJobTestBase : public testing::Test {
204 protected:
205 CompactionJobTestBase(std::string dbname, const Comparator* ucmp,
206 std::function<std::string(uint64_t)> encode_u64_ts,
207 bool test_io_priority, TableTypeForTest table_type)
208 : dbname_(std::move(dbname)),
209 ucmp_(ucmp),
210 db_options_(),
211 mutable_cf_options_(cf_options_),
212 mutable_db_options_(),
213 table_cache_(NewLRUCache(50000, 16)),
214 write_buffer_manager_(db_options_.db_write_buffer_size),
215 versions_(new VersionSet(
216 dbname_, &db_options_, env_options_, table_cache_.get(),
217 &write_buffer_manager_, &write_controller_,
218 /*block_cache_tracer=*/nullptr,
219 /*io_tracer=*/nullptr, /*db_id*/ "", /*db_session_id*/ "")),
220 shutting_down_(false),
221 mock_table_factory_(new mock::MockTableFactory()),
222 error_handler_(nullptr, db_options_, &mutex_),
223 encode_u64_ts_(std::move(encode_u64_ts)),
224 test_io_priority_(test_io_priority),
225 table_type_(table_type) {
226 Env* base_env = Env::Default();
227 EXPECT_OK(
228 test::CreateEnvFromSystem(ConfigOptions(), &base_env, &env_guard_));
229 env_ = base_env;
230 fs_ = env_->GetFileSystem();
231 // set default for the tests
232 mutable_cf_options_.target_file_size_base = 1024 * 1024;
233 mutable_cf_options_.max_compaction_bytes = 10 * 1024 * 1024;
234 }
235
236 void SetUp() override {
237 EXPECT_OK(env_->CreateDirIfMissing(dbname_));
238 db_options_.env = env_;
239 db_options_.fs = fs_;
240 db_options_.db_paths.emplace_back(dbname_,
241 std::numeric_limits<uint64_t>::max());
242 cf_options_.comparator = ucmp_;
243 if (table_type_ == TableTypeForTest::kBlockBasedTable) {
244 BlockBasedTableOptions table_options;
245 cf_options_.table_factory.reset(NewBlockBasedTableFactory(table_options));
246 } else if (table_type_ == TableTypeForTest::kMockTable) {
247 cf_options_.table_factory = mock_table_factory_;
248 } else {
249 assert(false);
250 }
251 }
252
253 std::string GenerateFileName(uint64_t file_number) {
254 FileMetaData meta;
255 std::vector<DbPath> db_paths;
256 db_paths.emplace_back(dbname_, std::numeric_limits<uint64_t>::max());
257 meta.fd = FileDescriptor(file_number, 0, 0);
258 return TableFileName(db_paths, meta.fd.GetNumber(), meta.fd.GetPathId());
259 }
260
261 std::string KeyStr(const std::string& user_key, const SequenceNumber seq_num,
262 const ValueType t, uint64_t ts = 0) {
263 std::string user_key_with_ts = user_key + encode_u64_ts_(ts);
264 return InternalKey(user_key_with_ts, seq_num, t).Encode().ToString();
265 }
266
267 static std::string BlobStr(uint64_t blob_file_number, uint64_t offset,
268 uint64_t size) {
269 std::string blob_index;
270 BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
271 kNoCompression);
272 return blob_index;
273 }
274
275 static std::string BlobStrTTL(uint64_t blob_file_number, uint64_t offset,
276 uint64_t size, uint64_t expiration) {
277 std::string blob_index;
278 BlobIndex::EncodeBlobTTL(&blob_index, expiration, blob_file_number, offset,
279 size, kNoCompression);
280 return blob_index;
281 }
282
283 static std::string BlobStrInlinedTTL(const Slice& value,
284 uint64_t expiration) {
285 std::string blob_index;
286 BlobIndex::EncodeInlinedTTL(&blob_index, expiration, value);
287 return blob_index;
288 }
289
290 // Creates a table with the specificied key value pairs.
291 void CreateTable(const std::string& table_name,
292 const mock::KVVector& contents, uint64_t& file_size) {
293 std::unique_ptr<WritableFileWriter> file_writer;
294 Status s = WritableFileWriter::Create(fs_, table_name, FileOptions(),
295 &file_writer, nullptr);
296 ASSERT_OK(s);
297 std::unique_ptr<TableBuilder> table_builder(
298 cf_options_.table_factory->NewTableBuilder(
299 TableBuilderOptions(*cfd_->ioptions(), mutable_cf_options_,
300 cfd_->internal_comparator(),
301 cfd_->int_tbl_prop_collector_factories(),
302 CompressionType::kNoCompression,
303 CompressionOptions(), 0 /* column_family_id */,
304 kDefaultColumnFamilyName, -1 /* level */),
305 file_writer.get()));
306 // Build table.
307 for (auto kv : contents) {
308 std::string key;
309 std::string value;
310 std::tie(key, value) = kv;
311 table_builder->Add(key, value);
312 }
313 ASSERT_OK(table_builder->Finish());
314 file_size = table_builder->FileSize();
315 }
316
317 void AddMockFile(const mock::KVVector& contents, int level = 0) {
318 assert(contents.size() > 0);
319
320 bool first_key = true;
321 std::string smallest, largest;
322 InternalKey smallest_key, largest_key;
323 SequenceNumber smallest_seqno = kMaxSequenceNumber;
324 SequenceNumber largest_seqno = 0;
325 uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
326 for (auto kv : contents) {
327 ParsedInternalKey key;
328 std::string skey;
329 std::string value;
330 std::tie(skey, value) = kv;
331 const Status pik_status =
332 ParseInternalKey(skey, &key, true /* log_err_key */);
333
334 smallest_seqno = std::min(smallest_seqno, key.sequence);
335 largest_seqno = std::max(largest_seqno, key.sequence);
336
337 if (first_key ||
338 cfd_->user_comparator()->Compare(key.user_key, smallest) < 0) {
339 smallest.assign(key.user_key.data(), key.user_key.size());
340 smallest_key.DecodeFrom(skey);
341 }
342 if (first_key ||
343 cfd_->user_comparator()->Compare(key.user_key, largest) > 0) {
344 largest.assign(key.user_key.data(), key.user_key.size());
345 largest_key.DecodeFrom(skey);
346 }
347
348 first_key = false;
349
350 if (pik_status.ok() && key.type == kTypeBlobIndex) {
351 BlobIndex blob_index;
352 const Status s = blob_index.DecodeFrom(value);
353 if (!s.ok()) {
354 continue;
355 }
356
357 if (blob_index.IsInlined() || blob_index.HasTTL() ||
358 blob_index.file_number() == kInvalidBlobFileNumber) {
359 continue;
360 }
361
362 if (oldest_blob_file_number == kInvalidBlobFileNumber ||
363 oldest_blob_file_number > blob_index.file_number()) {
364 oldest_blob_file_number = blob_index.file_number();
365 }
366 }
367 }
368
369 uint64_t file_number = versions_->NewFileNumber();
370
371 uint64_t file_size = 0;
372 if (table_type_ == TableTypeForTest::kBlockBasedTable) {
373 CreateTable(GenerateFileName(file_number), contents, file_size);
374 } else if (table_type_ == TableTypeForTest::kMockTable) {
375 file_size = 10;
376 EXPECT_OK(mock_table_factory_->CreateMockTable(
377 env_, GenerateFileName(file_number), std::move(contents)));
378 } else {
379 assert(false);
380 }
381
382 VersionEdit edit;
383 edit.AddFile(level, file_number, 0, file_size, smallest_key, largest_key,
384 smallest_seqno, largest_seqno, false, Temperature::kUnknown,
385 oldest_blob_file_number, kUnknownOldestAncesterTime,
386 kUnknownFileCreationTime, kUnknownFileChecksum,
387 kUnknownFileChecksumFuncName, kNullUniqueId64x2);
388
389 mutex_.Lock();
390 EXPECT_OK(
391 versions_->LogAndApply(versions_->GetColumnFamilySet()->GetDefault(),
392 mutable_cf_options_, &edit, &mutex_, nullptr));
393 mutex_.Unlock();
394 }
395
396 void VerifyTables(int output_level,
397 const std::vector<mock::KVVector>& expected_results,
398 std::vector<uint64_t> expected_oldest_blob_file_numbers) {
399 if (expected_results.empty()) {
400 ASSERT_EQ(compaction_job_stats_.num_output_files, 0U);
401 return;
402 }
403 int expected_output_file_num = 0;
404 for (const auto& e : expected_results) {
405 if (!e.empty()) {
406 ++expected_output_file_num;
407 }
408 }
409 ASSERT_EQ(expected_output_file_num, compaction_job_stats_.num_output_files);
410 if (expected_output_file_num == 0) {
411 return;
412 }
413
414 if (expected_oldest_blob_file_numbers.empty()) {
415 expected_oldest_blob_file_numbers.resize(expected_output_file_num,
416 kInvalidBlobFileNumber);
417 }
418
419 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
420 if (table_type_ == TableTypeForTest::kMockTable) {
421 ASSERT_EQ(compaction_job_stats_.num_output_files,
422 expected_results.size());
423 mock_table_factory_->AssertLatestFiles(expected_results);
424 } else {
425 assert(table_type_ == TableTypeForTest::kBlockBasedTable);
426 }
427
428 auto output_files =
429 cfd->current()->storage_info()->LevelFiles(output_level);
430 ASSERT_EQ(expected_output_file_num, output_files.size());
431
432 if (table_type_ == TableTypeForTest::kMockTable) {
433 assert(output_files.size() ==
434 static_cast<size_t>(expected_output_file_num));
435 const FileMetaData* const output_file = output_files[0];
436 ASSERT_EQ(output_file->oldest_blob_file_number,
437 expected_oldest_blob_file_numbers[0]);
438 return;
439 }
440
441 for (size_t i = 0; i < expected_results.size(); ++i) {
442 const FileMetaData* const output_file = output_files[i];
443 std::string file_name = GenerateFileName(output_file->fd.GetNumber());
444 const auto& fs = env_->GetFileSystem();
445 std::unique_ptr<RandomAccessFileReader> freader;
446 IOStatus ios = RandomAccessFileReader::Create(
447 fs, file_name, FileOptions(), &freader, nullptr);
448 ASSERT_OK(ios);
449 std::unique_ptr<TableReader> table_reader;
450 uint64_t file_size = output_file->fd.GetFileSize();
451 ReadOptions read_opts;
452 Status s = cf_options_.table_factory->NewTableReader(
453 read_opts,
454 TableReaderOptions(*cfd->ioptions(), nullptr, FileOptions(),
455 cfd_->internal_comparator()),
456 std::move(freader), file_size, &table_reader, false);
457 ASSERT_OK(s);
458 assert(table_reader);
459 std::unique_ptr<InternalIterator> iiter(
460 table_reader->NewIterator(read_opts, nullptr, nullptr, true,
461 TableReaderCaller::kUncategorized));
462 assert(iiter);
463
464 mock::KVVector from_db;
465 for (iiter->SeekToFirst(); iiter->Valid(); iiter->Next()) {
466 const Slice key = iiter->key();
467 const Slice value = iiter->value();
468 from_db.emplace_back(
469 make_pair(key.ToString(false), value.ToString(false)));
470 }
471 ASSERT_EQ(expected_results[i], from_db);
472 }
473 }
474
475 void SetLastSequence(const SequenceNumber sequence_number) {
476 versions_->SetLastAllocatedSequence(sequence_number + 1);
477 versions_->SetLastPublishedSequence(sequence_number + 1);
478 versions_->SetLastSequence(sequence_number + 1);
479 }
480
481 // returns expected result after compaction
482 mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) {
483 stl_wrappers::KVMap expected_results;
484 constexpr int kKeysPerFile = 10000;
485 constexpr int kCorruptKeysPerFile = 200;
486 constexpr int kMatchingKeys = kKeysPerFile / 2;
487 SequenceNumber sequence_number = 0;
488
489 auto corrupt_id = [&](int id) {
490 return gen_corrupted_keys && id > 0 && id <= kCorruptKeysPerFile;
491 };
492
493 for (int i = 0; i < 2; ++i) {
494 auto contents = mock::MakeMockFile();
495 for (int k = 0; k < kKeysPerFile; ++k) {
496 auto key = std::to_string(i * kMatchingKeys + k);
497 auto value = std::to_string(i * kKeysPerFile + k);
498 InternalKey internal_key(key, ++sequence_number, kTypeValue);
499
500 // This is how the key will look like once it's written in bottommost
501 // file
502 InternalKey bottommost_internal_key(key, 0, kTypeValue);
503
504 if (corrupt_id(k)) {
505 test::CorruptKeyType(&internal_key);
506 test::CorruptKeyType(&bottommost_internal_key);
507 }
508 contents.push_back({internal_key.Encode().ToString(), value});
509 if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) {
510 expected_results.insert(
511 {bottommost_internal_key.Encode().ToString(), value});
512 }
513 }
514 mock::SortKVVector(&contents, ucmp_);
515
516 AddMockFile(contents);
517 }
518
519 SetLastSequence(sequence_number);
520
521 mock::KVVector expected_results_kvvector;
522 for (auto& kv : expected_results) {
523 expected_results_kvvector.push_back({kv.first, kv.second});
524 }
525
526 return expected_results_kvvector;
527 }
528
529 void NewDB() {
530 EXPECT_OK(DestroyDB(dbname_, Options()));
531 EXPECT_OK(env_->CreateDirIfMissing(dbname_));
532
533 std::shared_ptr<Logger> info_log;
534 DBOptions db_opts = BuildDBOptions(db_options_, mutable_db_options_);
535 Status s = CreateLoggerFromOptions(dbname_, db_opts, &info_log);
536 ASSERT_OK(s);
537 db_options_.info_log = info_log;
538
539 versions_.reset(
540 new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
541 &write_buffer_manager_, &write_controller_,
542 /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr,
543 /*db_id*/ "", /*db_session_id*/ ""));
544 compaction_job_stats_.Reset();
545 ASSERT_OK(SetIdentityFile(env_, dbname_));
546
547 VersionEdit new_db;
548 new_db.SetLogNumber(0);
549 new_db.SetNextFile(2);
550 new_db.SetLastSequence(0);
551
552 const std::string manifest = DescriptorFileName(dbname_, 1);
553 std::unique_ptr<WritableFileWriter> file_writer;
554 const auto& fs = env_->GetFileSystem();
555 s = WritableFileWriter::Create(fs, manifest,
556 fs->OptimizeForManifestWrite(env_options_),
557 &file_writer, nullptr);
558
559 ASSERT_OK(s);
560 {
561 log::Writer log(std::move(file_writer), 0, false);
562 std::string record;
563 new_db.EncodeTo(&record);
564 s = log.AddRecord(record);
565 }
566 ASSERT_OK(s);
567 // Make "CURRENT" file that points to the new manifest file.
568 s = SetCurrentFile(fs_.get(), dbname_, 1, nullptr);
569
570 ASSERT_OK(s);
571
572 cf_options_.merge_operator = merge_op_;
573 cf_options_.compaction_filter = compaction_filter_.get();
574 std::vector<ColumnFamilyDescriptor> column_families;
575 column_families.emplace_back(kDefaultColumnFamilyName, cf_options_);
576
577 ASSERT_OK(versions_->Recover(column_families, false));
578 cfd_ = versions_->GetColumnFamilySet()->GetDefault();
579 }
580
581 // input_files[i] on input_levels[i]
582 void RunLastLevelCompaction(
583 const std::vector<std::vector<FileMetaData*>>& input_files,
584 const std::vector<int> input_levels,
585 std::function<void(Compaction& comp)>&& verify_func,
586 const std::vector<SequenceNumber>& snapshots = {}) {
587 const int kLastLevel = cf_options_.num_levels - 1;
588 verify_per_key_placement_ = std::move(verify_func);
589 mock::KVVector empty_map;
590 RunCompaction(input_files, input_levels, {empty_map}, snapshots,
591 kMaxSequenceNumber, kLastLevel, false);
592 }
593
594 // input_files[i] on input_levels[i]
595 void RunCompaction(
596 const std::vector<std::vector<FileMetaData*>>& input_files,
597 const std::vector<int>& input_levels,
598 const std::vector<mock::KVVector>& expected_results,
599 const std::vector<SequenceNumber>& snapshots = {},
600 SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
601 int output_level = 1, bool verify = true,
602 std::vector<uint64_t> expected_oldest_blob_file_numbers = {},
603 bool check_get_priority = false,
604 Env::IOPriority read_io_priority = Env::IO_TOTAL,
605 Env::IOPriority write_io_priority = Env::IO_TOTAL,
606 int max_subcompactions = 0) {
607 // For compaction, set fs as MockTestFileSystem to check the io_priority.
608 if (test_io_priority_) {
609 db_options_.fs.reset(
610 new MockTestFileSystem(fs_, read_io_priority, write_io_priority));
611 }
612
613 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
614
615 size_t num_input_files = 0;
616 std::vector<CompactionInputFiles> compaction_input_files;
617 for (size_t i = 0; i < input_files.size(); ++i) {
618 auto level_files = input_files[i];
619 CompactionInputFiles compaction_level;
620 compaction_level.level = input_levels[i];
621 compaction_level.files.insert(compaction_level.files.end(),
622 level_files.begin(), level_files.end());
623 compaction_input_files.push_back(compaction_level);
624 num_input_files += level_files.size();
625 }
626
627 std::vector<FileMetaData*> grandparents;
628 // it should actually be the next non-empty level
629 const int kGrandparentsLevel = output_level + 1;
630 if (kGrandparentsLevel < cf_options_.num_levels) {
631 grandparents =
632 cfd_->current()->storage_info()->LevelFiles(kGrandparentsLevel);
633 }
634
635 Compaction compaction(
636 cfd->current()->storage_info(), *cfd->ioptions(),
637 *cfd->GetLatestMutableCFOptions(), mutable_db_options_,
638 compaction_input_files, output_level,
639 mutable_cf_options_.target_file_size_base,
640 mutable_cf_options_.max_compaction_bytes, 0, kNoCompression,
641 cfd->GetLatestMutableCFOptions()->compression_opts,
642 Temperature::kUnknown, max_subcompactions, grandparents, true);
643 compaction.SetInputVersion(cfd->current());
644
645 assert(db_options_.info_log);
646 LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
647 mutex_.Lock();
648 EventLogger event_logger(db_options_.info_log.get());
649 // TODO(yiwu) add a mock snapshot checker and add test for it.
650 SnapshotChecker* snapshot_checker = nullptr;
651 ASSERT_TRUE(full_history_ts_low_.empty() ||
652 ucmp_->timestamp_size() == full_history_ts_low_.size());
653 const std::atomic<bool> kManualCompactionCanceledFalse{false};
654 CompactionJob compaction_job(
655 0, &compaction, db_options_, mutable_db_options_, env_options_,
656 versions_.get(), &shutting_down_, &log_buffer, nullptr, nullptr,
657 nullptr, nullptr, &mutex_, &error_handler_, snapshots,
658 earliest_write_conflict_snapshot, snapshot_checker, nullptr,
659 table_cache_, &event_logger, false, false, dbname_,
660 &compaction_job_stats_, Env::Priority::USER, nullptr /* IOTracer */,
661 /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
662 env_->GenerateUniqueId(), DBImpl::GenerateDbSessionId(nullptr),
663 full_history_ts_low_);
664 VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
665
666 compaction_job.Prepare();
667 mutex_.Unlock();
668 Status s = compaction_job.Run();
669 ASSERT_OK(s);
670 ASSERT_OK(compaction_job.io_status());
671 mutex_.Lock();
672 ASSERT_OK(compaction_job.Install(*cfd->GetLatestMutableCFOptions()));
673 ASSERT_OK(compaction_job.io_status());
674 mutex_.Unlock();
675 log_buffer.FlushBufferToLog();
676
677 if (verify) {
678 ASSERT_GE(compaction_job_stats_.elapsed_micros, 0U);
679 ASSERT_EQ(compaction_job_stats_.num_input_files, num_input_files);
680
681 VerifyTables(output_level, expected_results,
682 expected_oldest_blob_file_numbers);
683 }
684
685 if (check_get_priority) {
686 CheckGetRateLimiterPriority(compaction_job);
687 }
688
689 if (verify_per_key_placement_) {
690 // Verify per_key_placement compaction
691 assert(compaction.SupportsPerKeyPlacement());
692 verify_per_key_placement_(compaction);
693 }
694 }
695
696 void CheckGetRateLimiterPriority(CompactionJob& compaction_job) {
697 // When the state from WriteController is normal.
698 ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_LOW);
699
700 WriteController* write_controller =
701 compaction_job.versions_->GetColumnFamilySet()->write_controller();
702
703 {
704 // When the state from WriteController is Delayed.
705 std::unique_ptr<WriteControllerToken> delay_token =
706 write_controller->GetDelayToken(1000000);
707 ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
708 }
709
710 {
711 // When the state from WriteController is Stopped.
712 std::unique_ptr<WriteControllerToken> stop_token =
713 write_controller->GetStopToken();
714 ASSERT_EQ(compaction_job.GetRateLimiterPriority(), Env::IO_USER);
715 }
716 }
717
718 std::shared_ptr<Env> env_guard_;
719 Env* env_;
720 std::shared_ptr<FileSystem> fs_;
721 std::string dbname_;
722 const Comparator* const ucmp_;
723 EnvOptions env_options_;
724 ImmutableDBOptions db_options_;
725 ColumnFamilyOptions cf_options_;
726 MutableCFOptions mutable_cf_options_;
727 MutableDBOptions mutable_db_options_;
728 std::shared_ptr<Cache> table_cache_;
729 WriteController write_controller_;
730 WriteBufferManager write_buffer_manager_;
731 std::unique_ptr<VersionSet> versions_;
732 InstrumentedMutex mutex_;
733 std::atomic<bool> shutting_down_;
734 std::shared_ptr<mock::MockTableFactory> mock_table_factory_;
735 CompactionJobStats compaction_job_stats_;
736 ColumnFamilyData* cfd_;
737 std::unique_ptr<CompactionFilter> compaction_filter_;
738 std::shared_ptr<MergeOperator> merge_op_;
739 ErrorHandler error_handler_;
740 std::string full_history_ts_low_;
741 const std::function<std::string(uint64_t)> encode_u64_ts_;
742 const bool test_io_priority_;
743 std::function<void(Compaction& comp)> verify_per_key_placement_;
744 const TableTypeForTest table_type_ = kMockTable;
745 };
746
747 // TODO(icanadi) Make it simpler once we mock out VersionSet
748 class CompactionJobTest : public CompactionJobTestBase {
749 public:
750 CompactionJobTest()
751 : CompactionJobTestBase(
752 test::PerThreadDBPath("compaction_job_test"), BytewiseComparator(),
753 [](uint64_t /*ts*/) { return ""; }, /*test_io_priority=*/false,
754 TableTypeForTest::kMockTable) {}
755 };
756
757 TEST_F(CompactionJobTest, Simple) {
758 NewDB();
759
760 auto expected_results = CreateTwoFiles(false);
761 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
762 constexpr int input_level = 0;
763 auto files = cfd->current()->storage_info()->LevelFiles(input_level);
764 ASSERT_EQ(2U, files.size());
765 RunCompaction({files}, {input_level}, {expected_results});
766 }
767
768 TEST_F(CompactionJobTest, DISABLED_SimpleCorrupted) {
769 NewDB();
770
771 auto expected_results = CreateTwoFiles(true);
772 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
773 constexpr int input_level = 0;
774 auto files = cfd->current()->storage_info()->LevelFiles(input_level);
775 RunCompaction({files}, {input_level}, {expected_results});
776 ASSERT_EQ(compaction_job_stats_.num_corrupt_keys, 400U);
777 }
778
779 TEST_F(CompactionJobTest, SimpleDeletion) {
780 NewDB();
781
782 auto file1 = mock::MakeMockFile({{KeyStr("c", 4U, kTypeDeletion), ""},
783 {KeyStr("c", 3U, kTypeValue), "val"}});
784 AddMockFile(file1);
785
786 auto file2 = mock::MakeMockFile({{KeyStr("b", 2U, kTypeValue), "val"},
787 {KeyStr("b", 1U, kTypeValue), "val"}});
788 AddMockFile(file2);
789
790 auto expected_results =
791 mock::MakeMockFile({{KeyStr("b", 0U, kTypeValue), "val"}});
792
793 SetLastSequence(4U);
794 constexpr int input_level = 0;
795 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
796 RunCompaction({files}, {input_level}, {expected_results});
797 }
798
799 TEST_F(CompactionJobTest, OutputNothing) {
800 NewDB();
801
802 auto file1 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"}});
803
804 AddMockFile(file1);
805
806 auto file2 = mock::MakeMockFile({{KeyStr("a", 2U, kTypeDeletion), ""}});
807
808 AddMockFile(file2);
809
810 auto expected_results = mock::MakeMockFile();
811
812 SetLastSequence(4U);
813
814 constexpr int input_level = 0;
815 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
816 RunCompaction({files}, {input_level}, {expected_results});
817 }
818
819 TEST_F(CompactionJobTest, SimpleOverwrite) {
820 NewDB();
821
822 auto file1 = mock::MakeMockFile({
823 {KeyStr("a", 3U, kTypeValue), "val2"},
824 {KeyStr("b", 4U, kTypeValue), "val3"},
825 });
826 AddMockFile(file1);
827
828 auto file2 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
829 {KeyStr("b", 2U, kTypeValue), "val"}});
830 AddMockFile(file2);
831
832 auto expected_results =
833 mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "val2"},
834 {KeyStr("b", 0U, kTypeValue), "val3"}});
835
836 SetLastSequence(4U);
837 constexpr int input_level = 0;
838 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
839 RunCompaction({files}, {input_level}, {expected_results});
840 }
841
842 TEST_F(CompactionJobTest, SimpleNonLastLevel) {
843 NewDB();
844
845 auto file1 = mock::MakeMockFile({
846 {KeyStr("a", 5U, kTypeValue), "val2"},
847 {KeyStr("b", 6U, kTypeValue), "val3"},
848 });
849 AddMockFile(file1);
850
851 auto file2 = mock::MakeMockFile({{KeyStr("a", 3U, kTypeValue), "val"},
852 {KeyStr("b", 4U, kTypeValue), "val"}});
853 AddMockFile(file2, 1);
854
855 auto file3 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
856 {KeyStr("b", 2U, kTypeValue), "val"}});
857 AddMockFile(file3, 2);
858
859 // Because level 1 is not the last level, the sequence numbers of a and b
860 // cannot be set to 0
861 auto expected_results =
862 mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
863 {KeyStr("b", 6U, kTypeValue), "val3"}});
864
865 SetLastSequence(6U);
866 const std::vector<int> input_levels = {0, 1};
867 auto lvl0_files =
868 cfd_->current()->storage_info()->LevelFiles(input_levels[0]);
869 auto lvl1_files =
870 cfd_->current()->storage_info()->LevelFiles(input_levels[1]);
871 RunCompaction({lvl0_files, lvl1_files}, input_levels, {expected_results});
872 }
873
874 TEST_F(CompactionJobTest, SimpleMerge) {
875 merge_op_ = MergeOperators::CreateStringAppendOperator();
876 NewDB();
877
878 auto file1 = mock::MakeMockFile({
879 {KeyStr("a", 5U, kTypeMerge), "5"},
880 {KeyStr("a", 4U, kTypeMerge), "4"},
881 {KeyStr("a", 3U, kTypeValue), "3"},
882 });
883 AddMockFile(file1);
884
885 auto file2 = mock::MakeMockFile(
886 {{KeyStr("b", 2U, kTypeMerge), "2"}, {KeyStr("b", 1U, kTypeValue), "1"}});
887 AddMockFile(file2);
888
889 auto expected_results =
890 mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "3,4,5"},
891 {KeyStr("b", 0U, kTypeValue), "1,2"}});
892
893 SetLastSequence(5U);
894 constexpr int input_level = 0;
895 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
896 RunCompaction({files}, {input_level}, {expected_results});
897 }
898
899 TEST_F(CompactionJobTest, NonAssocMerge) {
900 merge_op_ = MergeOperators::CreateStringAppendTESTOperator();
901 NewDB();
902
903 auto file1 = mock::MakeMockFile({
904 {KeyStr("a", 5U, kTypeMerge), "5"},
905 {KeyStr("a", 4U, kTypeMerge), "4"},
906 {KeyStr("a", 3U, kTypeMerge), "3"},
907 });
908 AddMockFile(file1);
909
910 auto file2 = mock::MakeMockFile(
911 {{KeyStr("b", 2U, kTypeMerge), "2"}, {KeyStr("b", 1U, kTypeMerge), "1"}});
912 AddMockFile(file2);
913
914 auto expected_results =
915 mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), "3,4,5"},
916 {KeyStr("b", 0U, kTypeValue), "1,2"}});
917
918 SetLastSequence(5U);
919 constexpr int input_level = 0;
920 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
921 RunCompaction({files}, {input_level}, {expected_results});
922 }
923
924 // Filters merge operands with value 10.
925 TEST_F(CompactionJobTest, MergeOperandFilter) {
926 merge_op_ = MergeOperators::CreateUInt64AddOperator();
927 compaction_filter_.reset(new test::FilterNumber(10U));
928 NewDB();
929
930 auto file1 = mock::MakeMockFile(
931 {{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)},
932 {KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)}, // Filtered
933 {KeyStr("a", 3U, kTypeMerge), test::EncodeInt(3U)}});
934 AddMockFile(file1);
935
936 auto file2 = mock::MakeMockFile({
937 {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(2U)},
938 {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)} // Filtered
939 });
940 AddMockFile(file2);
941
942 auto expected_results =
943 mock::MakeMockFile({{KeyStr("a", 0U, kTypeValue), test::EncodeInt(8U)},
944 {KeyStr("b", 0U, kTypeValue), test::EncodeInt(2U)}});
945
946 SetLastSequence(5U);
947 constexpr int input_level = 0;
948 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
949 RunCompaction({files}, {input_level}, {expected_results});
950 }
951
952 TEST_F(CompactionJobTest, FilterSomeMergeOperands) {
953 merge_op_ = MergeOperators::CreateUInt64AddOperator();
954 compaction_filter_.reset(new test::FilterNumber(10U));
955 NewDB();
956
957 auto file1 = mock::MakeMockFile(
958 {{KeyStr("a", 5U, kTypeMerge), test::EncodeInt(5U)},
959 {KeyStr("a", 4U, kTypeMerge), test::EncodeInt(10U)}, // Filtered
960 {KeyStr("a", 3U, kTypeValue), test::EncodeInt(5U)},
961 {KeyStr("d", 8U, kTypeMerge), test::EncodeInt(10U)}});
962 AddMockFile(file1);
963
964 auto file2 =
965 mock::MakeMockFile({{KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)},
966 {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)},
967 {KeyStr("c", 2U, kTypeMerge), test::EncodeInt(3U)},
968 {KeyStr("c", 1U, kTypeValue), test::EncodeInt(7U)},
969 {KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}});
970 AddMockFile(file2);
971
972 auto file3 =
973 mock::MakeMockFile({{KeyStr("a", 1U, kTypeMerge), test::EncodeInt(3U)}});
974 AddMockFile(file3, 2);
975
976 auto expected_results = mock::MakeMockFile({
977 {KeyStr("a", 5U, kTypeValue), test::EncodeInt(10U)},
978 {KeyStr("c", 2U, kTypeValue), test::EncodeInt(10U)},
979 {KeyStr("d", 1U, kTypeValue), test::EncodeInt(6U)}
980 // b does not appear because the operands are filtered
981 });
982
983 SetLastSequence(5U);
984 constexpr int input_level = 0;
985 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
986 RunCompaction({files}, {input_level}, {expected_results});
987 }
988
989 // Test where all operands/merge results are filtered out.
990 TEST_F(CompactionJobTest, FilterAllMergeOperands) {
991 merge_op_ = MergeOperators::CreateUInt64AddOperator();
992 compaction_filter_.reset(new test::FilterNumber(10U));
993 NewDB();
994
995 auto file1 =
996 mock::MakeMockFile({{KeyStr("a", 11U, kTypeMerge), test::EncodeInt(10U)},
997 {KeyStr("a", 10U, kTypeMerge), test::EncodeInt(10U)},
998 {KeyStr("a", 9U, kTypeMerge), test::EncodeInt(10U)}});
999 AddMockFile(file1);
1000
1001 auto file2 =
1002 mock::MakeMockFile({{KeyStr("b", 8U, kTypeMerge), test::EncodeInt(10U)},
1003 {KeyStr("b", 7U, kTypeMerge), test::EncodeInt(10U)},
1004 {KeyStr("b", 6U, kTypeMerge), test::EncodeInt(10U)},
1005 {KeyStr("b", 5U, kTypeMerge), test::EncodeInt(10U)},
1006 {KeyStr("b", 4U, kTypeMerge), test::EncodeInt(10U)},
1007 {KeyStr("b", 3U, kTypeMerge), test::EncodeInt(10U)},
1008 {KeyStr("b", 2U, kTypeMerge), test::EncodeInt(10U)},
1009 {KeyStr("c", 2U, kTypeMerge), test::EncodeInt(10U)},
1010 {KeyStr("c", 1U, kTypeMerge), test::EncodeInt(10U)}});
1011 AddMockFile(file2);
1012
1013 auto file3 =
1014 mock::MakeMockFile({{KeyStr("a", 2U, kTypeMerge), test::EncodeInt(10U)},
1015 {KeyStr("b", 1U, kTypeMerge), test::EncodeInt(10U)}});
1016 AddMockFile(file3, 2);
1017
1018 SetLastSequence(11U);
1019 constexpr int input_level = 0;
1020 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1021
1022 mock::KVVector empty_map;
1023 RunCompaction({files}, {input_level}, {empty_map});
1024 }
1025
1026 TEST_F(CompactionJobTest, SimpleSingleDelete) {
1027 NewDB();
1028
1029 auto file1 = mock::MakeMockFile({
1030 {KeyStr("a", 5U, kTypeDeletion), ""},
1031 {KeyStr("b", 6U, kTypeSingleDeletion), ""},
1032 });
1033 AddMockFile(file1);
1034
1035 auto file2 = mock::MakeMockFile({{KeyStr("a", 3U, kTypeValue), "val"},
1036 {KeyStr("b", 4U, kTypeValue), "val"}});
1037 AddMockFile(file2);
1038
1039 auto file3 = mock::MakeMockFile({
1040 {KeyStr("a", 1U, kTypeValue), "val"},
1041 });
1042 AddMockFile(file3, 2);
1043
1044 auto expected_results =
1045 mock::MakeMockFile({{KeyStr("a", 5U, kTypeDeletion), ""}});
1046
1047 SetLastSequence(6U);
1048 constexpr int input_level = 0;
1049 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1050 RunCompaction({files}, {input_level}, {expected_results});
1051 }
1052
1053 TEST_F(CompactionJobTest, SingleDeleteSnapshots) {
1054 NewDB();
1055
1056 auto file1 = mock::MakeMockFile({
1057 {KeyStr("A", 12U, kTypeSingleDeletion), ""},
1058 {KeyStr("a", 12U, kTypeSingleDeletion), ""},
1059 {KeyStr("b", 21U, kTypeSingleDeletion), ""},
1060 {KeyStr("c", 22U, kTypeSingleDeletion), ""},
1061 {KeyStr("d", 9U, kTypeSingleDeletion), ""},
1062 {KeyStr("f", 21U, kTypeSingleDeletion), ""},
1063 {KeyStr("j", 11U, kTypeSingleDeletion), ""},
1064 {KeyStr("j", 9U, kTypeSingleDeletion), ""},
1065 {KeyStr("k", 12U, kTypeSingleDeletion), ""},
1066 {KeyStr("k", 11U, kTypeSingleDeletion), ""},
1067 {KeyStr("l", 3U, kTypeSingleDeletion), ""},
1068 {KeyStr("l", 2U, kTypeSingleDeletion), ""},
1069 });
1070 AddMockFile(file1);
1071
1072 auto file2 = mock::MakeMockFile({
1073 {KeyStr("0", 2U, kTypeSingleDeletion), ""},
1074 {KeyStr("a", 11U, kTypeValue), "val1"},
1075 {KeyStr("b", 11U, kTypeValue), "val2"},
1076 {KeyStr("c", 21U, kTypeValue), "val3"},
1077 {KeyStr("d", 8U, kTypeValue), "val4"},
1078 {KeyStr("e", 2U, kTypeSingleDeletion), ""},
1079 {KeyStr("f", 1U, kTypeValue), "val1"},
1080 {KeyStr("g", 11U, kTypeSingleDeletion), ""},
1081 {KeyStr("h", 2U, kTypeSingleDeletion), ""},
1082 {KeyStr("m", 12U, kTypeValue), "val1"},
1083 {KeyStr("m", 11U, kTypeSingleDeletion), ""},
1084 {KeyStr("m", 8U, kTypeValue), "val2"},
1085 });
1086 AddMockFile(file2);
1087
1088 auto file3 = mock::MakeMockFile({
1089 {KeyStr("A", 1U, kTypeValue), "val"},
1090 {KeyStr("e", 1U, kTypeValue), "val"},
1091 });
1092 AddMockFile(file3, 2);
1093
1094 auto expected_results = mock::MakeMockFile({
1095 {KeyStr("A", 12U, kTypeSingleDeletion), ""},
1096 {KeyStr("a", 12U, kTypeSingleDeletion), ""},
1097 {KeyStr("a", 11U, kTypeValue), ""},
1098 {KeyStr("b", 21U, kTypeSingleDeletion), ""},
1099 {KeyStr("b", 11U, kTypeValue), "val2"},
1100 {KeyStr("c", 22U, kTypeSingleDeletion), ""},
1101 {KeyStr("c", 21U, kTypeValue), ""},
1102 {KeyStr("e", 2U, kTypeSingleDeletion), ""},
1103 {KeyStr("f", 21U, kTypeSingleDeletion), ""},
1104 {KeyStr("f", 1U, kTypeValue), "val1"},
1105 {KeyStr("g", 11U, kTypeSingleDeletion), ""},
1106 {KeyStr("j", 11U, kTypeSingleDeletion), ""},
1107 {KeyStr("k", 11U, kTypeSingleDeletion), ""},
1108 {KeyStr("m", 12U, kTypeValue), "val1"},
1109 {KeyStr("m", 11U, kTypeSingleDeletion), ""},
1110 {KeyStr("m", 8U, kTypeValue), "val2"},
1111 });
1112
1113 SetLastSequence(22U);
1114 constexpr int input_level = 0;
1115 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1116 RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U}, 10U);
1117 }
1118
1119 TEST_F(CompactionJobTest, EarliestWriteConflictSnapshot) {
1120 NewDB();
1121
1122 // Test multiple snapshots where the earliest snapshot is not a
1123 // write-conflic-snapshot.
1124
1125 auto file1 = mock::MakeMockFile({
1126 {KeyStr("A", 24U, kTypeSingleDeletion), ""},
1127 {KeyStr("A", 23U, kTypeValue), "val"},
1128 {KeyStr("B", 24U, kTypeSingleDeletion), ""},
1129 {KeyStr("B", 23U, kTypeValue), "val"},
1130 {KeyStr("D", 24U, kTypeSingleDeletion), ""},
1131 {KeyStr("G", 32U, kTypeSingleDeletion), ""},
1132 {KeyStr("G", 31U, kTypeValue), "val"},
1133 {KeyStr("G", 24U, kTypeSingleDeletion), ""},
1134 {KeyStr("G", 23U, kTypeValue), "val2"},
1135 {KeyStr("H", 31U, kTypeValue), "val"},
1136 {KeyStr("H", 24U, kTypeSingleDeletion), ""},
1137 {KeyStr("H", 23U, kTypeValue), "val"},
1138 {KeyStr("I", 35U, kTypeSingleDeletion), ""},
1139 {KeyStr("I", 34U, kTypeValue), "val2"},
1140 {KeyStr("I", 33U, kTypeSingleDeletion), ""},
1141 {KeyStr("I", 32U, kTypeValue), "val3"},
1142 {KeyStr("I", 31U, kTypeSingleDeletion), ""},
1143 {KeyStr("J", 34U, kTypeValue), "val"},
1144 {KeyStr("J", 33U, kTypeSingleDeletion), ""},
1145 {KeyStr("J", 25U, kTypeValue), "val2"},
1146 {KeyStr("J", 24U, kTypeSingleDeletion), ""},
1147 });
1148 AddMockFile(file1);
1149
1150 auto file2 = mock::MakeMockFile({
1151 {KeyStr("A", 14U, kTypeSingleDeletion), ""},
1152 {KeyStr("A", 13U, kTypeValue), "val2"},
1153 {KeyStr("C", 14U, kTypeSingleDeletion), ""},
1154 {KeyStr("C", 13U, kTypeValue), "val"},
1155 {KeyStr("E", 12U, kTypeSingleDeletion), ""},
1156 {KeyStr("F", 4U, kTypeSingleDeletion), ""},
1157 {KeyStr("F", 3U, kTypeValue), "val"},
1158 {KeyStr("G", 14U, kTypeSingleDeletion), ""},
1159 {KeyStr("G", 13U, kTypeValue), "val3"},
1160 {KeyStr("H", 14U, kTypeSingleDeletion), ""},
1161 {KeyStr("H", 13U, kTypeValue), "val2"},
1162 {KeyStr("I", 13U, kTypeValue), "val4"},
1163 {KeyStr("I", 12U, kTypeSingleDeletion), ""},
1164 {KeyStr("I", 11U, kTypeValue), "val5"},
1165 {KeyStr("J", 15U, kTypeValue), "val3"},
1166 {KeyStr("J", 14U, kTypeSingleDeletion), ""},
1167 });
1168 AddMockFile(file2);
1169
1170 auto expected_results = mock::MakeMockFile({
1171 {KeyStr("A", 24U, kTypeSingleDeletion), ""},
1172 {KeyStr("A", 23U, kTypeValue), ""},
1173 {KeyStr("B", 24U, kTypeSingleDeletion), ""},
1174 {KeyStr("B", 23U, kTypeValue), ""},
1175 {KeyStr("D", 24U, kTypeSingleDeletion), ""},
1176 {KeyStr("E", 12U, kTypeSingleDeletion), ""},
1177 {KeyStr("G", 32U, kTypeSingleDeletion), ""},
1178 {KeyStr("G", 31U, kTypeValue), ""},
1179 {KeyStr("H", 31U, kTypeValue), "val"},
1180 {KeyStr("I", 35U, kTypeSingleDeletion), ""},
1181 {KeyStr("I", 34U, kTypeValue), ""},
1182 {KeyStr("I", 31U, kTypeSingleDeletion), ""},
1183 {KeyStr("I", 13U, kTypeValue), "val4"},
1184 {KeyStr("J", 34U, kTypeValue), "val"},
1185 {KeyStr("J", 33U, kTypeSingleDeletion), ""},
1186 {KeyStr("J", 25U, kTypeValue), "val2"},
1187 {KeyStr("J", 24U, kTypeSingleDeletion), ""},
1188 {KeyStr("J", 15U, kTypeValue), "val3"},
1189 {KeyStr("J", 14U, kTypeSingleDeletion), ""},
1190 });
1191
1192 SetLastSequence(24U);
1193 constexpr int input_level = 0;
1194 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1195 RunCompaction({files}, {input_level}, {expected_results}, {10U, 20U, 30U},
1196 20U);
1197 }
1198
1199 TEST_F(CompactionJobTest, SingleDeleteZeroSeq) {
1200 NewDB();
1201
1202 auto file1 = mock::MakeMockFile({
1203 {KeyStr("A", 10U, kTypeSingleDeletion), ""},
1204 {KeyStr("dummy", 5U, kTypeValue), "val2"},
1205 });
1206 AddMockFile(file1);
1207
1208 auto file2 = mock::MakeMockFile({
1209 {KeyStr("A", 0U, kTypeValue), "val"},
1210 });
1211 AddMockFile(file2);
1212
1213 auto expected_results = mock::MakeMockFile({
1214 {KeyStr("dummy", 0U, kTypeValue), "val2"},
1215 });
1216
1217 SetLastSequence(22U);
1218 constexpr int input_level = 0;
1219 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1220 RunCompaction({files}, {input_level}, {expected_results}, {});
1221 }
1222
1223 TEST_F(CompactionJobTest, MultiSingleDelete) {
1224 // Tests three scenarios involving multiple single delete/put pairs:
1225 //
1226 // A: Put Snapshot SDel Put SDel -> Put Snapshot SDel
1227 // B: Snapshot Put SDel Put SDel Snapshot -> Snapshot SDel Snapshot
1228 // C: SDel Put SDel Snapshot Put -> Snapshot Put
1229 // D: (Put) SDel Snapshot Put SDel -> (Put) SDel Snapshot SDel
1230 // E: Put SDel Snapshot Put SDel -> Snapshot SDel
1231 // F: Put SDel Put Sdel Snapshot -> removed
1232 // G: Snapshot SDel Put SDel Put -> Snapshot Put SDel
1233 // H: (Put) Put SDel Put Sdel Snapshot -> Removed
1234 // I: (Put) Snapshot Put SDel Put SDel -> SDel
1235 // J: Put Put SDel Put SDel SDel Snapshot Put Put SDel SDel Put
1236 // -> Snapshot Put
1237 // K: SDel SDel Put SDel Put Put Snapshot SDel Put SDel SDel Put SDel
1238 // -> Snapshot Put Snapshot SDel
1239 // L: SDel Put SDel Put SDel Snapshot SDel Put SDel SDel Put SDel
1240 // -> Snapshot SDel Put SDel
1241 // M: (Put) SDel Put SDel Put SDel Snapshot Put SDel SDel Put SDel SDel
1242 // -> SDel Snapshot Put SDel
1243 NewDB();
1244
1245 auto file1 = mock::MakeMockFile({
1246 {KeyStr("A", 14U, kTypeSingleDeletion), ""},
1247 {KeyStr("A", 13U, kTypeValue), "val5"},
1248 {KeyStr("A", 12U, kTypeSingleDeletion), ""},
1249 {KeyStr("B", 14U, kTypeSingleDeletion), ""},
1250 {KeyStr("B", 13U, kTypeValue), "val2"},
1251 {KeyStr("C", 14U, kTypeValue), "val3"},
1252 {KeyStr("D", 12U, kTypeSingleDeletion), ""},
1253 {KeyStr("D", 11U, kTypeValue), "val4"},
1254 {KeyStr("G", 15U, kTypeValue), "val"},
1255 {KeyStr("G", 14U, kTypeSingleDeletion), ""},
1256 {KeyStr("G", 13U, kTypeValue), "val"},
1257 {KeyStr("I", 14U, kTypeSingleDeletion), ""},
1258 {KeyStr("I", 13U, kTypeValue), "val"},
1259 {KeyStr("J", 15U, kTypeValue), "val"},
1260 {KeyStr("J", 14U, kTypeSingleDeletion), ""},
1261 {KeyStr("J", 13U, kTypeSingleDeletion), ""},
1262 {KeyStr("J", 12U, kTypeValue), "val"},
1263 {KeyStr("J", 11U, kTypeValue), "val"},
1264 {KeyStr("K", 16U, kTypeSingleDeletion), ""},
1265 {KeyStr("K", 15U, kTypeValue), "val1"},
1266 {KeyStr("K", 14U, kTypeSingleDeletion), ""},
1267 {KeyStr("K", 13U, kTypeSingleDeletion), ""},
1268 {KeyStr("K", 12U, kTypeValue), "val2"},
1269 {KeyStr("K", 11U, kTypeSingleDeletion), ""},
1270 {KeyStr("L", 16U, kTypeSingleDeletion), ""},
1271 {KeyStr("L", 15U, kTypeValue), "val"},
1272 {KeyStr("L", 14U, kTypeSingleDeletion), ""},
1273 {KeyStr("L", 13U, kTypeSingleDeletion), ""},
1274 {KeyStr("L", 12U, kTypeValue), "val"},
1275 {KeyStr("L", 11U, kTypeSingleDeletion), ""},
1276 {KeyStr("M", 16U, kTypeSingleDeletion), ""},
1277 {KeyStr("M", 15U, kTypeSingleDeletion), ""},
1278 {KeyStr("M", 14U, kTypeValue), "val"},
1279 {KeyStr("M", 13U, kTypeSingleDeletion), ""},
1280 {KeyStr("M", 12U, kTypeSingleDeletion), ""},
1281 {KeyStr("M", 11U, kTypeValue), "val"},
1282 });
1283 AddMockFile(file1);
1284
1285 auto file2 = mock::MakeMockFile({
1286 {KeyStr("A", 10U, kTypeValue), "val"},
1287 {KeyStr("B", 12U, kTypeSingleDeletion), ""},
1288 {KeyStr("B", 11U, kTypeValue), "val2"},
1289 {KeyStr("C", 10U, kTypeSingleDeletion), ""},
1290 {KeyStr("C", 9U, kTypeValue), "val6"},
1291 {KeyStr("C", 8U, kTypeSingleDeletion), ""},
1292 {KeyStr("D", 10U, kTypeSingleDeletion), ""},
1293 {KeyStr("E", 12U, kTypeSingleDeletion), ""},
1294 {KeyStr("E", 11U, kTypeValue), "val"},
1295 {KeyStr("E", 5U, kTypeSingleDeletion), ""},
1296 {KeyStr("E", 4U, kTypeValue), "val"},
1297 {KeyStr("F", 6U, kTypeSingleDeletion), ""},
1298 {KeyStr("F", 5U, kTypeValue), "val"},
1299 {KeyStr("F", 4U, kTypeSingleDeletion), ""},
1300 {KeyStr("F", 3U, kTypeValue), "val"},
1301 {KeyStr("G", 12U, kTypeSingleDeletion), ""},
1302 {KeyStr("H", 6U, kTypeSingleDeletion), ""},
1303 {KeyStr("H", 5U, kTypeValue), "val"},
1304 {KeyStr("H", 4U, kTypeSingleDeletion), ""},
1305 {KeyStr("H", 3U, kTypeValue), "val"},
1306 {KeyStr("I", 12U, kTypeSingleDeletion), ""},
1307 {KeyStr("I", 11U, kTypeValue), "val"},
1308 {KeyStr("J", 6U, kTypeSingleDeletion), ""},
1309 {KeyStr("J", 5U, kTypeSingleDeletion), ""},
1310 {KeyStr("J", 4U, kTypeValue), "val"},
1311 {KeyStr("J", 3U, kTypeSingleDeletion), ""},
1312 {KeyStr("J", 2U, kTypeValue), "val"},
1313 {KeyStr("K", 8U, kTypeValue), "val3"},
1314 {KeyStr("K", 7U, kTypeValue), "val4"},
1315 {KeyStr("K", 6U, kTypeSingleDeletion), ""},
1316 {KeyStr("K", 5U, kTypeValue), "val5"},
1317 {KeyStr("K", 2U, kTypeSingleDeletion), ""},
1318 {KeyStr("K", 1U, kTypeSingleDeletion), ""},
1319 {KeyStr("L", 5U, kTypeSingleDeletion), ""},
1320 {KeyStr("L", 4U, kTypeValue), "val"},
1321 {KeyStr("L", 3U, kTypeSingleDeletion), ""},
1322 {KeyStr("L", 2U, kTypeValue), "val"},
1323 {KeyStr("L", 1U, kTypeSingleDeletion), ""},
1324 {KeyStr("M", 10U, kTypeSingleDeletion), ""},
1325 {KeyStr("M", 7U, kTypeValue), "val"},
1326 {KeyStr("M", 5U, kTypeSingleDeletion), ""},
1327 {KeyStr("M", 4U, kTypeValue), "val"},
1328 {KeyStr("M", 3U, kTypeSingleDeletion), ""},
1329 });
1330 AddMockFile(file2);
1331
1332 auto file3 = mock::MakeMockFile({
1333 {KeyStr("D", 1U, kTypeValue), "val"},
1334 {KeyStr("H", 1U, kTypeValue), "val"},
1335 {KeyStr("I", 2U, kTypeValue), "val"},
1336 });
1337 AddMockFile(file3, 2);
1338
1339 auto file4 = mock::MakeMockFile({
1340 {KeyStr("M", 1U, kTypeValue), "val"},
1341 });
1342 AddMockFile(file4, 2);
1343
1344 auto expected_results =
1345 mock::MakeMockFile({{KeyStr("A", 14U, kTypeSingleDeletion), ""},
1346 {KeyStr("A", 13U, kTypeValue), ""},
1347 {KeyStr("A", 12U, kTypeSingleDeletion), ""},
1348 {KeyStr("A", 10U, kTypeValue), "val"},
1349 {KeyStr("B", 14U, kTypeSingleDeletion), ""},
1350 {KeyStr("B", 13U, kTypeValue), ""},
1351 {KeyStr("C", 14U, kTypeValue), "val3"},
1352 {KeyStr("D", 12U, kTypeSingleDeletion), ""},
1353 {KeyStr("D", 11U, kTypeValue), ""},
1354 {KeyStr("D", 10U, kTypeSingleDeletion), ""},
1355 {KeyStr("E", 12U, kTypeSingleDeletion), ""},
1356 {KeyStr("E", 11U, kTypeValue), ""},
1357 {KeyStr("G", 15U, kTypeValue), "val"},
1358 {KeyStr("G", 12U, kTypeSingleDeletion), ""},
1359 {KeyStr("I", 14U, kTypeSingleDeletion), ""},
1360 {KeyStr("I", 13U, kTypeValue), ""},
1361 {KeyStr("J", 15U, kTypeValue), "val"},
1362 {KeyStr("K", 16U, kTypeSingleDeletion), ""},
1363 {KeyStr("K", 15U, kTypeValue), ""},
1364 {KeyStr("K", 11U, kTypeSingleDeletion), ""},
1365 {KeyStr("K", 8U, kTypeValue), "val3"},
1366 {KeyStr("L", 16U, kTypeSingleDeletion), ""},
1367 {KeyStr("L", 15U, kTypeValue), ""},
1368 {KeyStr("L", 11U, kTypeSingleDeletion), ""},
1369 {KeyStr("M", 15U, kTypeSingleDeletion), ""},
1370 {KeyStr("M", 14U, kTypeValue), ""},
1371 {KeyStr("M", 3U, kTypeSingleDeletion), ""}});
1372
1373 SetLastSequence(22U);
1374 constexpr int input_level = 0;
1375 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1376 RunCompaction({files}, {input_level}, {expected_results}, {10U}, 10U);
1377 }
1378
1379 // This test documents the behavior where a corrupt key follows a deletion or a
1380 // single deletion and the (single) deletion gets removed while the corrupt key
1381 // gets written out. TODO(noetzli): We probably want a better way to treat
1382 // corrupt keys.
1383 TEST_F(CompactionJobTest, DISABLED_CorruptionAfterDeletion) {
1384 NewDB();
1385
1386 auto file1 =
1387 mock::MakeMockFile({{test::KeyStr("A", 6U, kTypeValue), "val3"},
1388 {test::KeyStr("a", 5U, kTypeDeletion), ""},
1389 {test::KeyStr("a", 4U, kTypeValue, true), "val"}});
1390 AddMockFile(file1);
1391
1392 auto file2 =
1393 mock::MakeMockFile({{test::KeyStr("b", 3U, kTypeSingleDeletion), ""},
1394 {test::KeyStr("b", 2U, kTypeValue, true), "val"},
1395 {test::KeyStr("c", 1U, kTypeValue), "val2"}});
1396 AddMockFile(file2);
1397
1398 auto expected_results =
1399 mock::MakeMockFile({{test::KeyStr("A", 0U, kTypeValue), "val3"},
1400 {test::KeyStr("a", 0U, kTypeValue, true), "val"},
1401 {test::KeyStr("b", 0U, kTypeValue, true), "val"},
1402 {test::KeyStr("c", 0U, kTypeValue), "val2"}});
1403
1404 SetLastSequence(6U);
1405 constexpr int input_level = 0;
1406 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1407 RunCompaction({files}, {input_level}, {expected_results});
1408 }
1409
1410 TEST_F(CompactionJobTest, OldestBlobFileNumber) {
1411 NewDB();
1412
1413 // Note: blob1 is inlined TTL, so it will not be considered for the purposes
1414 // of identifying the oldest referenced blob file. Similarly, blob6 will be
1415 // ignored because it has TTL and hence refers to a TTL blob file.
1416 const stl_wrappers::KVMap::value_type blob1(
1417 KeyStr("a", 1U, kTypeBlobIndex), BlobStrInlinedTTL("foo", 1234567890ULL));
1418 const stl_wrappers::KVMap::value_type blob2(KeyStr("b", 2U, kTypeBlobIndex),
1419 BlobStr(59, 123456, 999));
1420 const stl_wrappers::KVMap::value_type blob3(KeyStr("c", 3U, kTypeBlobIndex),
1421 BlobStr(138, 1000, 1 << 8));
1422 auto file1 = mock::MakeMockFile({blob1, blob2, blob3});
1423 AddMockFile(file1);
1424
1425 const stl_wrappers::KVMap::value_type blob4(KeyStr("d", 4U, kTypeBlobIndex),
1426 BlobStr(199, 3 << 10, 1 << 20));
1427 const stl_wrappers::KVMap::value_type blob5(KeyStr("e", 5U, kTypeBlobIndex),
1428 BlobStr(19, 6789, 333));
1429 const stl_wrappers::KVMap::value_type blob6(
1430 KeyStr("f", 6U, kTypeBlobIndex),
1431 BlobStrTTL(5, 2048, 1 << 7, 1234567890ULL));
1432 auto file2 = mock::MakeMockFile({blob4, blob5, blob6});
1433 AddMockFile(file2);
1434
1435 const stl_wrappers::KVMap::value_type expected_blob1(
1436 KeyStr("a", 0U, kTypeBlobIndex), blob1.second);
1437 const stl_wrappers::KVMap::value_type expected_blob2(
1438 KeyStr("b", 0U, kTypeBlobIndex), blob2.second);
1439 const stl_wrappers::KVMap::value_type expected_blob3(
1440 KeyStr("c", 0U, kTypeBlobIndex), blob3.second);
1441 const stl_wrappers::KVMap::value_type expected_blob4(
1442 KeyStr("d", 0U, kTypeBlobIndex), blob4.second);
1443 const stl_wrappers::KVMap::value_type expected_blob5(
1444 KeyStr("e", 0U, kTypeBlobIndex), blob5.second);
1445 const stl_wrappers::KVMap::value_type expected_blob6(
1446 KeyStr("f", 0U, kTypeBlobIndex), blob6.second);
1447 auto expected_results =
1448 mock::MakeMockFile({expected_blob1, expected_blob2, expected_blob3,
1449 expected_blob4, expected_blob5, expected_blob6});
1450
1451 SetLastSequence(6U);
1452 constexpr int input_level = 0;
1453 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1454 RunCompaction({files}, {input_level}, {expected_results},
1455 std::vector<SequenceNumber>(), kMaxSequenceNumber,
1456 /* output_level */ 1, /* verify */ true,
1457 /* expected_oldest_blob_file_numbers */ {19});
1458 }
1459
1460 TEST_F(CompactionJobTest, VerifyPenultimateLevelOutput) {
1461 cf_options_.bottommost_temperature = Temperature::kCold;
1462 SyncPoint::GetInstance()->SetCallBack(
1463 "Compaction::SupportsPerKeyPlacement:Enabled", [&](void* arg) {
1464 auto supports_per_key_placement = static_cast<bool*>(arg);
1465 *supports_per_key_placement = true;
1466 });
1467
1468 std::atomic_uint64_t latest_cold_seq = 0;
1469
1470 SyncPoint::GetInstance()->SetCallBack(
1471 "CompactionIterator::PrepareOutput.context", [&](void* arg) {
1472 auto context = static_cast<PerKeyPlacementContext*>(arg);
1473 context->output_to_penultimate_level =
1474 context->seq_num > latest_cold_seq;
1475 });
1476 SyncPoint::GetInstance()->EnableProcessing();
1477
1478 NewDB();
1479
1480 // Add files on different levels that may overlap
1481 auto file0_1 = mock::MakeMockFile({{KeyStr("z", 12U, kTypeValue), "val"}});
1482 AddMockFile(file0_1);
1483
1484 auto file1_1 = mock::MakeMockFile({{KeyStr("b", 10U, kTypeValue), "val"},
1485 {KeyStr("f", 11U, kTypeValue), "val"}});
1486 AddMockFile(file1_1, 1);
1487 auto file1_2 = mock::MakeMockFile({{KeyStr("j", 12U, kTypeValue), "val"},
1488 {KeyStr("k", 13U, kTypeValue), "val"}});
1489 AddMockFile(file1_2, 1);
1490 auto file1_3 = mock::MakeMockFile({{KeyStr("p", 14U, kTypeValue), "val"},
1491 {KeyStr("u", 15U, kTypeValue), "val"}});
1492 AddMockFile(file1_3, 1);
1493
1494 auto file2_1 = mock::MakeMockFile({{KeyStr("f", 8U, kTypeValue), "val"},
1495 {KeyStr("h", 9U, kTypeValue), "val"}});
1496 AddMockFile(file2_1, 2);
1497 auto file2_2 = mock::MakeMockFile({{KeyStr("m", 6U, kTypeValue), "val"},
1498 {KeyStr("p", 7U, kTypeValue), "val"}});
1499 AddMockFile(file2_2, 2);
1500
1501 auto file3_1 = mock::MakeMockFile({{KeyStr("g", 2U, kTypeValue), "val"},
1502 {KeyStr("k", 3U, kTypeValue), "val"}});
1503 AddMockFile(file3_1, 3);
1504 auto file3_2 = mock::MakeMockFile({{KeyStr("v", 4U, kTypeValue), "val"},
1505 {KeyStr("x", 5U, kTypeValue), "val"}});
1506 AddMockFile(file3_2, 3);
1507
1508 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
1509 const std::vector<int> input_levels = {0, 1, 2, 3};
1510 auto files0 = cfd->current()->storage_info()->LevelFiles(input_levels[0]);
1511 auto files1 = cfd->current()->storage_info()->LevelFiles(input_levels[1]);
1512 auto files2 = cfd->current()->storage_info()->LevelFiles(input_levels[2]);
1513 auto files3 = cfd->current()->storage_info()->LevelFiles(input_levels[3]);
1514
1515 RunLastLevelCompaction(
1516 {files0, files1, files2, files3}, input_levels,
1517 /*verify_func=*/[&](Compaction& comp) {
1518 for (char c = 'a'; c <= 'z'; c++) {
1519 std::string c_str;
1520 c_str = c;
1521 const Slice key(c_str);
1522 if (c == 'a') {
1523 ASSERT_FALSE(comp.WithinPenultimateLevelOutputRange(key));
1524 } else {
1525 ASSERT_TRUE(comp.WithinPenultimateLevelOutputRange(key));
1526 }
1527 }
1528 });
1529 }
1530
1531 TEST_F(CompactionJobTest, NoEnforceSingleDeleteContract) {
1532 db_options_.enforce_single_del_contracts = false;
1533 NewDB();
1534
1535 auto file =
1536 mock::MakeMockFile({{KeyStr("a", 4U, kTypeSingleDeletion), ""},
1537 {KeyStr("a", 3U, kTypeDeletion), "dontcare"}});
1538 AddMockFile(file);
1539 SetLastSequence(4U);
1540
1541 auto expected_results = mock::MakeMockFile();
1542 constexpr int input_level = 0;
1543 auto files = cfd_->current()->storage_info()->LevelFiles(input_level);
1544 RunCompaction({files}, {input_level}, {expected_results});
1545 }
1546
1547 TEST_F(CompactionJobTest, InputSerialization) {
1548 // Setup a random CompactionServiceInput
1549 CompactionServiceInput input;
1550 const int kStrMaxLen = 1000;
1551 Random rnd(static_cast<uint32_t>(time(nullptr)));
1552 Random64 rnd64(time(nullptr));
1553 input.column_family.name = rnd.RandomString(rnd.Uniform(kStrMaxLen));
1554 input.column_family.options.comparator = ReverseBytewiseComparator();
1555 input.column_family.options.max_bytes_for_level_base =
1556 rnd64.Uniform(UINT64_MAX);
1557 input.column_family.options.disable_auto_compactions = rnd.OneIn(2);
1558 input.column_family.options.compression = kZSTD;
1559 input.column_family.options.compression_opts.level = 4;
1560 input.db_options.max_background_flushes = 10;
1561 input.db_options.paranoid_checks = rnd.OneIn(2);
1562 input.db_options.statistics = CreateDBStatistics();
1563 input.db_options.env = env_;
1564 while (!rnd.OneIn(10)) {
1565 input.snapshots.emplace_back(rnd64.Uniform(UINT64_MAX));
1566 }
1567 while (!rnd.OneIn(10)) {
1568 input.input_files.emplace_back(rnd.RandomString(
1569 rnd.Uniform(kStrMaxLen - 1) +
1570 1)); // input file name should have at least one character
1571 }
1572 input.output_level = 4;
1573 input.has_begin = rnd.OneIn(2);
1574 if (input.has_begin) {
1575 input.begin = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
1576 }
1577 input.has_end = rnd.OneIn(2);
1578 if (input.has_end) {
1579 input.end = rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen));
1580 }
1581
1582 std::string output;
1583 ASSERT_OK(input.Write(&output));
1584
1585 // Test deserialization
1586 CompactionServiceInput deserialized1;
1587 ASSERT_OK(CompactionServiceInput::Read(output, &deserialized1));
1588 ASSERT_TRUE(deserialized1.TEST_Equals(&input));
1589
1590 // Test mismatch
1591 deserialized1.db_options.max_background_flushes += 10;
1592 std::string mismatch;
1593 ASSERT_FALSE(deserialized1.TEST_Equals(&input, &mismatch));
1594 ASSERT_EQ(mismatch, "db_options.max_background_flushes");
1595
1596 // Test unknown field
1597 CompactionServiceInput deserialized2;
1598 output.clear();
1599 ASSERT_OK(input.Write(&output));
1600 output.append("new_field=123;");
1601
1602 ASSERT_OK(CompactionServiceInput::Read(output, &deserialized2));
1603 ASSERT_TRUE(deserialized2.TEST_Equals(&input));
1604
1605 // Test missing field
1606 CompactionServiceInput deserialized3;
1607 deserialized3.output_level = 0;
1608 std::string to_remove = "output_level=4;";
1609 size_t pos = output.find(to_remove);
1610 ASSERT_TRUE(pos != std::string::npos);
1611 output.erase(pos, to_remove.length());
1612 ASSERT_OK(CompactionServiceInput::Read(output, &deserialized3));
1613 mismatch.clear();
1614 ASSERT_FALSE(deserialized3.TEST_Equals(&input, &mismatch));
1615 ASSERT_EQ(mismatch, "output_level");
1616
1617 // manually set the value back, should match the original structure
1618 deserialized3.output_level = 4;
1619 ASSERT_TRUE(deserialized3.TEST_Equals(&input));
1620
1621 // Test invalid version
1622 output.clear();
1623 ASSERT_OK(input.Write(&output));
1624
1625 uint32_t data_version = DecodeFixed32(output.data());
1626 const size_t kDataVersionSize = sizeof(data_version);
1627 ASSERT_EQ(data_version,
1628 1U); // Update once the default data version is changed
1629 char buf[kDataVersionSize];
1630 EncodeFixed32(buf, data_version + 10); // make sure it's not valid
1631 output.replace(0, kDataVersionSize, buf, kDataVersionSize);
1632 Status s = CompactionServiceInput::Read(output, &deserialized3);
1633 ASSERT_TRUE(s.IsNotSupported());
1634 }
1635
1636 TEST_F(CompactionJobTest, ResultSerialization) {
1637 // Setup a random CompactionServiceResult
1638 CompactionServiceResult result;
1639 const int kStrMaxLen = 1000;
1640 Random rnd(static_cast<uint32_t>(time(nullptr)));
1641 Random64 rnd64(time(nullptr));
1642 std::vector<Status> status_list = {
1643 Status::OK(),
1644 Status::InvalidArgument("invalid option"),
1645 Status::Aborted("failed to run"),
1646 Status::NotSupported("not supported option"),
1647 };
1648 result.status =
1649 status_list.at(rnd.Uniform(static_cast<int>(status_list.size())));
1650 while (!rnd.OneIn(10)) {
1651 UniqueId64x2 id{rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX)};
1652 result.output_files.emplace_back(
1653 rnd.RandomString(rnd.Uniform(kStrMaxLen)), rnd64.Uniform(UINT64_MAX),
1654 rnd64.Uniform(UINT64_MAX),
1655 rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
1656 rnd.RandomBinaryString(rnd.Uniform(kStrMaxLen)),
1657 rnd64.Uniform(UINT64_MAX), rnd64.Uniform(UINT64_MAX),
1658 rnd64.Uniform(UINT64_MAX), rnd.OneIn(2), id);
1659 }
1660 result.output_level = rnd.Uniform(10);
1661 result.output_path = rnd.RandomString(rnd.Uniform(kStrMaxLen));
1662 result.num_output_records = rnd64.Uniform(UINT64_MAX);
1663 result.total_bytes = rnd64.Uniform(UINT64_MAX);
1664 result.bytes_read = 123;
1665 result.bytes_written = rnd64.Uniform(UINT64_MAX);
1666 result.stats.elapsed_micros = rnd64.Uniform(UINT64_MAX);
1667 result.stats.num_output_files = rnd.Uniform(1000);
1668 result.stats.is_full_compaction = rnd.OneIn(2);
1669 result.stats.num_single_del_mismatch = rnd64.Uniform(UINT64_MAX);
1670 result.stats.num_input_files = 9;
1671
1672 std::string output;
1673 ASSERT_OK(result.Write(&output));
1674
1675 // Test deserialization
1676 CompactionServiceResult deserialized1;
1677 ASSERT_OK(CompactionServiceResult::Read(output, &deserialized1));
1678 ASSERT_TRUE(deserialized1.TEST_Equals(&result));
1679
1680 // Test mismatch
1681 deserialized1.stats.num_input_files += 10;
1682 std::string mismatch;
1683 ASSERT_FALSE(deserialized1.TEST_Equals(&result, &mismatch));
1684 ASSERT_EQ(mismatch, "stats.num_input_files");
1685
1686 // Test unique id mismatch
1687 if (!result.output_files.empty()) {
1688 CompactionServiceResult deserialized_tmp;
1689 ASSERT_OK(CompactionServiceResult::Read(output, &deserialized_tmp));
1690 deserialized_tmp.output_files[0].unique_id[0] += 1;
1691 ASSERT_FALSE(deserialized_tmp.TEST_Equals(&result, &mismatch));
1692 ASSERT_EQ(mismatch, "output_files.unique_id");
1693 deserialized_tmp.status.PermitUncheckedError();
1694 }
1695
1696 // Test unknown field
1697 CompactionServiceResult deserialized2;
1698 output.clear();
1699 ASSERT_OK(result.Write(&output));
1700 output.append("new_field=123;");
1701
1702 ASSERT_OK(CompactionServiceResult::Read(output, &deserialized2));
1703 ASSERT_TRUE(deserialized2.TEST_Equals(&result));
1704
1705 // Test missing field
1706 CompactionServiceResult deserialized3;
1707 deserialized3.bytes_read = 0;
1708 std::string to_remove = "bytes_read=123;";
1709 size_t pos = output.find(to_remove);
1710 ASSERT_TRUE(pos != std::string::npos);
1711 output.erase(pos, to_remove.length());
1712 ASSERT_OK(CompactionServiceResult::Read(output, &deserialized3));
1713 mismatch.clear();
1714 ASSERT_FALSE(deserialized3.TEST_Equals(&result, &mismatch));
1715 ASSERT_EQ(mismatch, "bytes_read");
1716
1717 deserialized3.bytes_read = 123;
1718 ASSERT_TRUE(deserialized3.TEST_Equals(&result));
1719
1720 // Test invalid version
1721 output.clear();
1722 ASSERT_OK(result.Write(&output));
1723
1724 uint32_t data_version = DecodeFixed32(output.data());
1725 const size_t kDataVersionSize = sizeof(data_version);
1726 ASSERT_EQ(data_version,
1727 1U); // Update once the default data version is changed
1728 char buf[kDataVersionSize];
1729 EncodeFixed32(buf, data_version + 10); // make sure it's not valid
1730 output.replace(0, kDataVersionSize, buf, kDataVersionSize);
1731 Status s = CompactionServiceResult::Read(output, &deserialized3);
1732 ASSERT_TRUE(s.IsNotSupported());
1733 for (const auto& item : status_list) {
1734 item.PermitUncheckedError();
1735 }
1736 }
1737
1738 class CompactionJobDynamicFileSizeTest
1739 : public CompactionJobTestBase,
1740 public ::testing::WithParamInterface<bool> {
1741 public:
1742 CompactionJobDynamicFileSizeTest()
1743 : CompactionJobTestBase(
1744 test::PerThreadDBPath("compaction_job_dynamic_file_size_test"),
1745 BytewiseComparator(), [](uint64_t /*ts*/) { return ""; },
1746 /*test_io_priority=*/false, TableTypeForTest::kMockTable) {}
1747 };
1748
1749 TEST_P(CompactionJobDynamicFileSizeTest, CutForMaxCompactionBytes) {
1750 // dynamic_file_size option should have no impact on cutting for max
1751 // compaction bytes.
1752 bool enable_dyanmic_file_size = GetParam();
1753 cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
1754
1755 NewDB();
1756 mutable_cf_options_.target_file_size_base = 80;
1757 mutable_cf_options_.max_compaction_bytes = 21;
1758
1759 auto file1 = mock::MakeMockFile({
1760 {KeyStr("c", 5U, kTypeValue), "val2"},
1761 {KeyStr("n", 6U, kTypeValue), "val3"},
1762 });
1763 AddMockFile(file1);
1764
1765 auto file2 = mock::MakeMockFile({{KeyStr("h", 3U, kTypeValue), "val"},
1766 {KeyStr("j", 4U, kTypeValue), "val"}});
1767 AddMockFile(file2, 1);
1768
1769 // Create three L2 files, each size 10.
1770 // max_compaction_bytes 21 means the compaction output in L1 will
1771 // be cut to at least two files.
1772 auto file3 = mock::MakeMockFile({{KeyStr("b", 1U, kTypeValue), "val"},
1773 {KeyStr("c", 1U, kTypeValue), "val"},
1774 {KeyStr("c1", 1U, kTypeValue), "val"},
1775 {KeyStr("c2", 1U, kTypeValue), "val"},
1776 {KeyStr("c3", 1U, kTypeValue), "val"},
1777 {KeyStr("c4", 1U, kTypeValue), "val"},
1778 {KeyStr("d", 1U, kTypeValue), "val"},
1779 {KeyStr("e", 2U, kTypeValue), "val"}});
1780 AddMockFile(file3, 2);
1781
1782 auto file4 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
1783 {KeyStr("i", 1U, kTypeValue), "val"},
1784 {KeyStr("i1", 1U, kTypeValue), "val"},
1785 {KeyStr("i2", 1U, kTypeValue), "val"},
1786 {KeyStr("i3", 1U, kTypeValue), "val"},
1787 {KeyStr("i4", 1U, kTypeValue), "val"},
1788 {KeyStr("j", 1U, kTypeValue), "val"},
1789 {KeyStr("k", 2U, kTypeValue), "val"}});
1790 AddMockFile(file4, 2);
1791
1792 auto file5 = mock::MakeMockFile({{KeyStr("l", 1U, kTypeValue), "val"},
1793 {KeyStr("m", 1U, kTypeValue), "val"},
1794 {KeyStr("m1", 1U, kTypeValue), "val"},
1795 {KeyStr("m2", 1U, kTypeValue), "val"},
1796 {KeyStr("m3", 1U, kTypeValue), "val"},
1797 {KeyStr("m4", 1U, kTypeValue), "val"},
1798 {KeyStr("n", 1U, kTypeValue), "val"},
1799 {KeyStr("o", 2U, kTypeValue), "val"}});
1800 AddMockFile(file5, 2);
1801
1802 // The expected output should be:
1803 // L1: [c, h, j] [n]
1804 // L2: [b ... e] [h ... k] [l ... o]
1805 // It's better to have "j" in the first file, because anyway it's overlapping
1806 // with the second file on L2.
1807 // (Note: before this PR, it was cut at "h" because it's using the internal
1808 // comparator which think L1 "h" with seqno 3 is smaller than L2 "h" with
1809 // seqno 1, but actually they're overlapped with the compaction picker).
1810
1811 auto expected_file1 =
1812 mock::MakeMockFile({{KeyStr("c", 5U, kTypeValue), "val2"},
1813 {KeyStr("h", 3U, kTypeValue), "val"},
1814 {KeyStr("j", 4U, kTypeValue), "val"}});
1815 auto expected_file2 =
1816 mock::MakeMockFile({{KeyStr("n", 6U, kTypeValue), "val3"}});
1817
1818 SetLastSequence(6U);
1819
1820 const std::vector<int> input_levels = {0, 1};
1821 auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
1822 auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
1823
1824 RunCompaction({lvl0_files, lvl1_files}, input_levels,
1825 {expected_file1, expected_file2});
1826 }
1827
1828 TEST_P(CompactionJobDynamicFileSizeTest, CutToSkipGrandparentFile) {
1829 bool enable_dyanmic_file_size = GetParam();
1830 cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
1831
1832 NewDB();
1833 // Make sure the grandparent level file size (10) qualifies skipping.
1834 // Currently, it has to be > 1/8 of target file size.
1835 mutable_cf_options_.target_file_size_base = 70;
1836
1837 auto file1 = mock::MakeMockFile({
1838 {KeyStr("a", 5U, kTypeValue), "val2"},
1839 {KeyStr("z", 6U, kTypeValue), "val3"},
1840 });
1841 AddMockFile(file1);
1842
1843 auto file2 = mock::MakeMockFile({{KeyStr("c", 3U, kTypeValue), "val"},
1844 {KeyStr("x", 4U, kTypeValue), "val"}});
1845 AddMockFile(file2, 1);
1846
1847 auto file3 = mock::MakeMockFile({{KeyStr("b", 1U, kTypeValue), "val"},
1848 {KeyStr("d", 2U, kTypeValue), "val"}});
1849 AddMockFile(file3, 2);
1850
1851 auto file4 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
1852 {KeyStr("i", 2U, kTypeValue), "val"}});
1853 AddMockFile(file4, 2);
1854
1855 auto file5 = mock::MakeMockFile({{KeyStr("v", 1U, kTypeValue), "val"},
1856 {KeyStr("y", 2U, kTypeValue), "val"}});
1857 AddMockFile(file5, 2);
1858
1859 auto expected_file1 =
1860 mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
1861 {KeyStr("c", 3U, kTypeValue), "val"}});
1862 auto expected_file2 =
1863 mock::MakeMockFile({{KeyStr("x", 4U, kTypeValue), "val"},
1864 {KeyStr("z", 6U, kTypeValue), "val3"}});
1865
1866 auto expected_file_disable_dynamic_file_size =
1867 mock::MakeMockFile({{KeyStr("a", 5U, kTypeValue), "val2"},
1868 {KeyStr("c", 3U, kTypeValue), "val"},
1869 {KeyStr("x", 4U, kTypeValue), "val"},
1870 {KeyStr("z", 6U, kTypeValue), "val3"}});
1871
1872 SetLastSequence(6U);
1873 const std::vector<int> input_levels = {0, 1};
1874 auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
1875 auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
1876 if (enable_dyanmic_file_size) {
1877 RunCompaction({lvl0_files, lvl1_files}, input_levels,
1878 {expected_file1, expected_file2});
1879 } else {
1880 RunCompaction({lvl0_files, lvl1_files}, input_levels,
1881 {expected_file_disable_dynamic_file_size});
1882 }
1883 }
1884
1885 TEST_P(CompactionJobDynamicFileSizeTest, CutToAlignGrandparentBoundary) {
1886 bool enable_dyanmic_file_size = GetParam();
1887 cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
1888 NewDB();
1889
1890 // MockTable has 1 byte per entry by default and each file is 10 bytes.
1891 // When the file size is smaller than 100, it won't cut file earlier to align
1892 // with its grandparent boundary.
1893 const size_t kKeyValueSize = 10000;
1894 mock_table_factory_->SetKeyValueSize(kKeyValueSize);
1895
1896 mutable_cf_options_.target_file_size_base = 10 * kKeyValueSize;
1897
1898 mock::KVVector file1;
1899 char ch = 'd';
1900 // Add value from d -> o
1901 for (char i = 0; i < 12; i++) {
1902 file1.emplace_back(KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
1903 "val" + std::to_string(i));
1904 }
1905
1906 AddMockFile(file1);
1907
1908 auto file2 = mock::MakeMockFile({{KeyStr("e", 3U, kTypeValue), "val"},
1909 {KeyStr("s", 4U, kTypeValue), "val"}});
1910 AddMockFile(file2, 1);
1911
1912 // the 1st grandparent file should be skipped
1913 auto file3 = mock::MakeMockFile({{KeyStr("a", 1U, kTypeValue), "val"},
1914 {KeyStr("b", 2U, kTypeValue), "val"}});
1915 AddMockFile(file3, 2);
1916
1917 auto file4 = mock::MakeMockFile({{KeyStr("c", 1U, kTypeValue), "val"},
1918 {KeyStr("e", 2U, kTypeValue), "val"}});
1919 AddMockFile(file4, 2);
1920
1921 auto file5 = mock::MakeMockFile({{KeyStr("h", 1U, kTypeValue), "val"},
1922 {KeyStr("j", 2U, kTypeValue), "val"}});
1923 AddMockFile(file5, 2);
1924
1925 auto file6 = mock::MakeMockFile({{KeyStr("k", 1U, kTypeValue), "val"},
1926 {KeyStr("n", 2U, kTypeValue), "val"}});
1927 AddMockFile(file6, 2);
1928
1929 auto file7 = mock::MakeMockFile({{KeyStr("q", 1U, kTypeValue), "val"},
1930 {KeyStr("t", 2U, kTypeValue), "val"}});
1931 AddMockFile(file7, 2);
1932
1933 // The expected outputs are:
1934 // L1: [d,e,f,g,h,i,j] [k,l,m,n,o,s]
1935 // L2: [a, b] [c, e] [h, j] [k, n] [q, t]
1936 // The first output cut earlier at "j", so it could be aligned with L2 files.
1937 // If dynamic_file_size is not enabled, it will be cut based on the
1938 // target_file_size
1939 mock::KVVector expected_file1;
1940 for (char i = 0; i < 7; i++) {
1941 expected_file1.emplace_back(
1942 KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
1943 "val" + std::to_string(i));
1944 }
1945
1946 mock::KVVector expected_file2;
1947 for (char i = 7; i < 12; i++) {
1948 expected_file2.emplace_back(
1949 KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
1950 "val" + std::to_string(i));
1951 }
1952 expected_file2.emplace_back(KeyStr("s", 4U, kTypeValue), "val");
1953
1954 mock::KVVector expected_file_disable_dynamic_file_size1;
1955 for (char i = 0; i < 10; i++) {
1956 expected_file_disable_dynamic_file_size1.emplace_back(
1957 KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
1958 "val" + std::to_string(i));
1959 }
1960
1961 mock::KVVector expected_file_disable_dynamic_file_size2;
1962 for (char i = 10; i < 12; i++) {
1963 expected_file_disable_dynamic_file_size2.emplace_back(
1964 KeyStr(std::string(1, ch + i), i + 10, kTypeValue),
1965 "val" + std::to_string(i));
1966 }
1967
1968 expected_file_disable_dynamic_file_size2.emplace_back(
1969 KeyStr("s", 4U, kTypeValue), "val");
1970
1971 SetLastSequence(22U);
1972 const std::vector<int> input_levels = {0, 1};
1973 auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
1974 auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
1975 if (enable_dyanmic_file_size) {
1976 RunCompaction({lvl0_files, lvl1_files}, input_levels,
1977 {expected_file1, expected_file2});
1978 } else {
1979 RunCompaction({lvl0_files, lvl1_files}, input_levels,
1980 {expected_file_disable_dynamic_file_size1,
1981 expected_file_disable_dynamic_file_size2});
1982 }
1983 }
1984
1985 TEST_P(CompactionJobDynamicFileSizeTest, CutToAlignGrandparentBoundarySameKey) {
1986 bool enable_dyanmic_file_size = GetParam();
1987 cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
1988 NewDB();
1989
1990 // MockTable has 1 byte per entry by default and each file is 10 bytes.
1991 // When the file size is smaller than 100, it won't cut file earlier to align
1992 // with its grandparent boundary.
1993 const size_t kKeyValueSize = 10000;
1994 mock_table_factory_->SetKeyValueSize(kKeyValueSize);
1995
1996 mutable_cf_options_.target_file_size_base = 10 * kKeyValueSize;
1997
1998 mock::KVVector file1;
1999 for (int i = 0; i < 7; i++) {
2000 file1.emplace_back(KeyStr("a", 100 - i, kTypeValue),
2001 "val" + std::to_string(100 - i));
2002 }
2003 file1.emplace_back(KeyStr("b", 90, kTypeValue), "valb");
2004
2005 AddMockFile(file1);
2006
2007 auto file2 = mock::MakeMockFile({{KeyStr("a", 93U, kTypeValue), "val93"},
2008 {KeyStr("b", 90U, kTypeValue), "valb"}});
2009 AddMockFile(file2, 1);
2010
2011 auto file3 = mock::MakeMockFile({{KeyStr("a", 89U, kTypeValue), "val"},
2012 {KeyStr("a", 88U, kTypeValue), "val"}});
2013 AddMockFile(file3, 2);
2014
2015 auto file4 = mock::MakeMockFile({{KeyStr("a", 87U, kTypeValue), "val"},
2016 {KeyStr("a", 86U, kTypeValue), "val"}});
2017 AddMockFile(file4, 2);
2018
2019 auto file5 = mock::MakeMockFile({{KeyStr("b", 85U, kTypeValue), "val"},
2020 {KeyStr("b", 84U, kTypeValue), "val"}});
2021 AddMockFile(file5, 2);
2022
2023 mock::KVVector expected_file1;
2024 mock::KVVector expected_file_disable_dynamic_file_size;
2025
2026 for (int i = 0; i < 8; i++) {
2027 expected_file1.emplace_back(KeyStr("a", 100 - i, kTypeValue),
2028 "val" + std::to_string(100 - i));
2029 expected_file_disable_dynamic_file_size.emplace_back(
2030 KeyStr("a", 100 - i, kTypeValue), "val" + std::to_string(100 - i));
2031 }
2032
2033 // make sure `b` is cut in a separated file (so internally it's not using
2034 // internal comparator, which will think the "b:90" (seqno 90) here is smaller
2035 // than "b:85" on L2.)
2036 auto expected_file2 =
2037 mock::MakeMockFile({{KeyStr("b", 90U, kTypeValue), "valb"}});
2038
2039 expected_file_disable_dynamic_file_size.emplace_back(
2040 KeyStr("b", 90U, kTypeValue), "valb");
2041
2042 SetLastSequence(122U);
2043 const std::vector<int> input_levels = {0, 1};
2044 auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
2045 auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
2046
2047 // Just keep all the history
2048 std::vector<SequenceNumber> snapshots;
2049 for (int i = 80; i <= 100; i++) {
2050 snapshots.emplace_back(i);
2051 }
2052 if (enable_dyanmic_file_size) {
2053 RunCompaction({lvl0_files, lvl1_files}, input_levels,
2054 {expected_file1, expected_file2}, snapshots);
2055 } else {
2056 RunCompaction({lvl0_files, lvl1_files}, input_levels,
2057 {expected_file_disable_dynamic_file_size}, snapshots);
2058 }
2059 }
2060
2061 TEST_P(CompactionJobDynamicFileSizeTest, CutForMaxCompactionBytesSameKey) {
2062 // dynamic_file_size option should have no impact on cutting for max
2063 // compaction bytes.
2064 bool enable_dyanmic_file_size = GetParam();
2065 cf_options_.level_compaction_dynamic_file_size = enable_dyanmic_file_size;
2066
2067 NewDB();
2068 mutable_cf_options_.target_file_size_base = 80;
2069 mutable_cf_options_.max_compaction_bytes = 20;
2070
2071 auto file1 = mock::MakeMockFile({{KeyStr("a", 104U, kTypeValue), "val1"},
2072 {KeyStr("b", 103U, kTypeValue), "val"}});
2073 AddMockFile(file1);
2074
2075 auto file2 = mock::MakeMockFile({{KeyStr("a", 102U, kTypeValue), "val2"},
2076 {KeyStr("c", 101U, kTypeValue), "val"}});
2077 AddMockFile(file2, 1);
2078
2079 for (int i = 0; i < 10; i++) {
2080 auto file =
2081 mock::MakeMockFile({{KeyStr("a", 100 - (i * 2), kTypeValue), "val"},
2082 {KeyStr("a", 99 - (i * 2), kTypeValue), "val"}});
2083 AddMockFile(file, 2);
2084 }
2085
2086 for (int i = 0; i < 10; i++) {
2087 auto file =
2088 mock::MakeMockFile({{KeyStr("b", 80 - (i * 2), kTypeValue), "val"},
2089 {KeyStr("b", 79 - (i * 2), kTypeValue), "val"}});
2090 AddMockFile(file, 2);
2091 }
2092
2093 auto file5 = mock::MakeMockFile({{KeyStr("c", 60U, kTypeValue), "valc"},
2094 {KeyStr("c", 59U, kTypeValue), "valc"}});
2095
2096 // "a" has 10 overlapped grandparent files (each size 10), which is far
2097 // exceeded the `max_compaction_bytes`, but make sure 2 "a" are not separated,
2098 // as splitting them won't help reducing the compaction size.
2099 // also make sure "b" and "c" are cut separately.
2100 mock::KVVector expected_file1 =
2101 mock::MakeMockFile({{KeyStr("a", 104U, kTypeValue), "val1"},
2102 {KeyStr("a", 102U, kTypeValue), "val2"}});
2103 mock::KVVector expected_file2 =
2104 mock::MakeMockFile({{KeyStr("b", 103U, kTypeValue), "val"}});
2105 mock::KVVector expected_file3 =
2106 mock::MakeMockFile({{KeyStr("c", 101U, kTypeValue), "val"}});
2107
2108 SetLastSequence(122U);
2109 const std::vector<int> input_levels = {0, 1};
2110 auto lvl0_files = cfd_->current()->storage_info()->LevelFiles(0);
2111 auto lvl1_files = cfd_->current()->storage_info()->LevelFiles(1);
2112
2113 // Just keep all the history
2114 std::vector<SequenceNumber> snapshots;
2115 for (int i = 80; i <= 105; i++) {
2116 snapshots.emplace_back(i);
2117 }
2118 RunCompaction({lvl0_files, lvl1_files}, input_levels,
2119 {expected_file1, expected_file2, expected_file3}, snapshots);
2120 }
2121
2122 INSTANTIATE_TEST_CASE_P(CompactionJobDynamicFileSizeTest,
2123 CompactionJobDynamicFileSizeTest, testing::Bool());
2124
2125 class CompactionJobTimestampTest : public CompactionJobTestBase {
2126 public:
2127 CompactionJobTimestampTest()
2128 : CompactionJobTestBase(test::PerThreadDBPath("compaction_job_ts_test"),
2129 test::BytewiseComparatorWithU64TsWrapper(),
2130 test::EncodeInt, /*test_io_priority=*/false,
2131 TableTypeForTest::kMockTable) {}
2132 };
2133
2134 TEST_F(CompactionJobTimestampTest, GCDisabled) {
2135 NewDB();
2136
2137 auto file1 =
2138 mock::MakeMockFile({{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
2139 {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
2140 {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
2141 {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"}});
2142
2143 AddMockFile(file1);
2144
2145 auto file2 = mock::MakeMockFile(
2146 {{KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
2147 {KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
2148 {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
2149 {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
2150 AddMockFile(file2);
2151
2152 SetLastSequence(10);
2153
2154 auto expected_results = mock::MakeMockFile(
2155 {{KeyStr("a", 10, ValueType::kTypeValue, 100), "a10"},
2156 {KeyStr("a", 9, ValueType::kTypeValue, 99), "a9"},
2157 {KeyStr("b", 8, ValueType::kTypeValue, 98), "b8"},
2158 {KeyStr("b", 6, ValueType::kTypeDeletionWithTimestamp, 96), ""},
2159 {KeyStr("c", 5, ValueType::kTypeDeletionWithTimestamp, 95), ""},
2160 {KeyStr("c", 4, ValueType::kTypeValue, 94), "c5"},
2161 {KeyStr("d", 7, ValueType::kTypeValue, 97), "d7"},
2162 {KeyStr("d", 3, ValueType::kTypeSingleDeletion, 93), ""}});
2163 constexpr int input_level = 0;
2164 const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
2165 RunCompaction({files}, {input_level}, {expected_results});
2166 }
2167
2168 TEST_F(CompactionJobTimestampTest, NoKeyExpired) {
2169 NewDB();
2170
2171 auto file1 =
2172 mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
2173 {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
2174 {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"}});
2175 AddMockFile(file1);
2176
2177 auto file2 =
2178 mock::MakeMockFile({{KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
2179 {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
2180 AddMockFile(file2);
2181
2182 SetLastSequence(101);
2183
2184 auto expected_results =
2185 mock::MakeMockFile({{KeyStr("a", 6, ValueType::kTypeValue, 100), "a6"},
2186 {KeyStr("a", 4, ValueType::kTypeValue, 98), "a4"},
2187 {KeyStr("b", 7, ValueType::kTypeValue, 101), "b7"},
2188 {KeyStr("c", 5, ValueType::kTypeValue, 99), "c5"},
2189 {KeyStr("c", 3, ValueType::kTypeValue, 97), "c3"}});
2190 constexpr int input_level = 0;
2191 const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
2192
2193 full_history_ts_low_ = encode_u64_ts_(0);
2194 RunCompaction({files}, {input_level}, {expected_results});
2195 }
2196
2197 TEST_F(CompactionJobTimestampTest, AllKeysExpired) {
2198 NewDB();
2199
2200 auto file1 = mock::MakeMockFile(
2201 {{KeyStr("a", 5, ValueType::kTypeDeletionWithTimestamp, 100), ""},
2202 {KeyStr("b", 6, ValueType::kTypeSingleDeletion, 99), ""},
2203 {KeyStr("c", 7, ValueType::kTypeValue, 98), "c7"}});
2204 AddMockFile(file1);
2205
2206 auto file2 = mock::MakeMockFile(
2207 {{KeyStr("a", 4, ValueType::kTypeValue, 97), "a4"},
2208 {KeyStr("b", 3, ValueType::kTypeValue, 96), "b3"},
2209 {KeyStr("c", 2, ValueType::kTypeDeletionWithTimestamp, 95), ""},
2210 {KeyStr("c", 1, ValueType::kTypeValue, 94), "c1"}});
2211 AddMockFile(file2);
2212
2213 SetLastSequence(7);
2214
2215 auto expected_results =
2216 mock::MakeMockFile({{KeyStr("c", 0, ValueType::kTypeValue, 0), "c7"}});
2217 constexpr int input_level = 0;
2218 const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
2219
2220 full_history_ts_low_ = encode_u64_ts_(std::numeric_limits<uint64_t>::max());
2221 RunCompaction({files}, {input_level}, {expected_results});
2222 }
2223
2224 TEST_F(CompactionJobTimestampTest, SomeKeysExpired) {
2225 NewDB();
2226
2227 auto file1 =
2228 mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
2229 {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
2230 AddMockFile(file1);
2231
2232 auto file2 = mock::MakeMockFile(
2233 {{KeyStr("a", 3, ValueType::kTypeValue, 48), "a3"},
2234 {KeyStr("a", 2, ValueType::kTypeValue, 46), "a2"},
2235 {KeyStr("b", 4, ValueType::kTypeDeletionWithTimestamp, 47), ""}});
2236 AddMockFile(file2);
2237
2238 SetLastSequence(6);
2239
2240 auto expected_results =
2241 mock::MakeMockFile({{KeyStr("a", 5, ValueType::kTypeValue, 50), "a5"},
2242 {KeyStr("a", 0, ValueType::kTypeValue, 0), "a3"},
2243 {KeyStr("b", 6, ValueType::kTypeValue, 49), "b6"}});
2244 constexpr int input_level = 0;
2245 const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
2246
2247 full_history_ts_low_ = encode_u64_ts_(49);
2248 RunCompaction({files}, {input_level}, {expected_results});
2249 }
2250
2251 class CompactionJobTimestampTestWithBbTable : public CompactionJobTestBase {
2252 public:
2253 // Block-based table is needed if we want to test subcompaction partitioning
2254 // with anchors.
2255 explicit CompactionJobTimestampTestWithBbTable()
2256 : CompactionJobTestBase(
2257 test::PerThreadDBPath("compaction_job_ts_bbt_test"),
2258 test::BytewiseComparatorWithU64TsWrapper(), test::EncodeInt,
2259 /*test_io_priority=*/false, TableTypeForTest::kBlockBasedTable) {}
2260 };
2261
2262 TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionAnchorL1) {
2263 cf_options_.target_file_size_base = 20;
2264 mutable_cf_options_.target_file_size_base = 20;
2265 NewDB();
2266
2267 const std::vector<std::string> keys = {
2268 KeyStr("a", 20, ValueType::kTypeValue, 200),
2269 KeyStr("b", 21, ValueType::kTypeValue, 210),
2270 KeyStr("b", 20, ValueType::kTypeValue, 200),
2271 KeyStr("b", 18, ValueType::kTypeValue, 180),
2272 KeyStr("c", 17, ValueType::kTypeValue, 170),
2273 KeyStr("c", 16, ValueType::kTypeValue, 160),
2274 KeyStr("c", 15, ValueType::kTypeValue, 150)};
2275 const std::vector<std::string> values = {"a20", "b21", "b20", "b18",
2276 "c17", "c16", "c15"};
2277
2278 constexpr int input_level = 1;
2279
2280 auto file1 = mock::MakeMockFile(
2281 {{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}});
2282 AddMockFile(file1, input_level);
2283
2284 auto file2 = mock::MakeMockFile(
2285 {{keys[3], values[3]}, {keys[4], values[4]}, {keys[5], values[5]}});
2286 AddMockFile(file2, input_level);
2287
2288 auto file3 = mock::MakeMockFile({{keys[6], values[6]}});
2289 AddMockFile(file3, input_level);
2290
2291 SetLastSequence(20);
2292
2293 auto output1 = mock::MakeMockFile({{keys[0], values[0]}});
2294 auto output2 = mock::MakeMockFile(
2295 {{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}});
2296 auto output3 = mock::MakeMockFile(
2297 {{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}});
2298
2299 auto expected_results =
2300 std::vector<mock::KVVector>{output1, output2, output3};
2301 const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
2302
2303 constexpr int output_level = 2;
2304 constexpr int max_subcompactions = 4;
2305 RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{},
2306 /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
2307 output_level, /*verify=*/true, {kInvalidBlobFileNumber},
2308 /*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL,
2309 max_subcompactions);
2310 }
2311
2312 TEST_F(CompactionJobTimestampTestWithBbTable, SubcompactionL0) {
2313 cf_options_.target_file_size_base = 20;
2314 mutable_cf_options_.target_file_size_base = 20;
2315 NewDB();
2316
2317 const std::vector<std::string> keys = {
2318 KeyStr("a", 20, ValueType::kTypeValue, 200),
2319 KeyStr("b", 20, ValueType::kTypeValue, 200),
2320 KeyStr("b", 19, ValueType::kTypeValue, 190),
2321 KeyStr("b", 18, ValueType::kTypeValue, 180),
2322 KeyStr("c", 17, ValueType::kTypeValue, 170),
2323 KeyStr("c", 16, ValueType::kTypeValue, 160),
2324 KeyStr("c", 15, ValueType::kTypeValue, 150)};
2325 const std::vector<std::string> values = {"a20", "b20", "b19", "b18",
2326 "c17", "c16", "c15"};
2327
2328 constexpr int input_level = 0;
2329
2330 auto file1 = mock::MakeMockFile({{keys[5], values[5]}, {keys[6], values[6]}});
2331 AddMockFile(file1, input_level);
2332
2333 auto file2 = mock::MakeMockFile({{keys[3], values[3]}, {keys[4], values[4]}});
2334 AddMockFile(file2, input_level);
2335
2336 auto file3 = mock::MakeMockFile(
2337 {{keys[0], values[0]}, {keys[1], values[1]}, {keys[2], values[2]}});
2338 AddMockFile(file3, input_level);
2339
2340 SetLastSequence(20);
2341
2342 auto output1 = mock::MakeMockFile({{keys[0], values[0]}});
2343 auto output2 = mock::MakeMockFile(
2344 {{keys[1], values[1]}, {keys[2], values[2]}, {keys[3], values[3]}});
2345 auto output3 = mock::MakeMockFile(
2346 {{keys[4], values[4]}, {keys[5], values[5]}, {keys[6], values[6]}});
2347
2348 auto expected_results =
2349 std::vector<mock::KVVector>{output1, output2, output3};
2350 const auto& files = cfd_->current()->storage_info()->LevelFiles(input_level);
2351
2352 constexpr int output_level = 1;
2353 constexpr int max_subcompactions = 4;
2354 RunCompaction({files}, {input_level}, expected_results, /*snapshots=*/{},
2355 /*earliest_write_conflict_snapshot=*/kMaxSequenceNumber,
2356 output_level, /*verify=*/true, {kInvalidBlobFileNumber},
2357 /*check_get_priority=*/false, Env::IO_TOTAL, Env::IO_TOTAL,
2358 max_subcompactions);
2359 }
2360
2361 // The io priority of the compaction reads and writes are different from
2362 // other DB reads and writes. To prepare the compaction input files, use the
2363 // default filesystem from Env. To test the io priority of the compaction
2364 // reads and writes, db_options_.fs is set as MockTestFileSystem.
2365 class CompactionJobIOPriorityTest : public CompactionJobTestBase {
2366 public:
2367 CompactionJobIOPriorityTest()
2368 : CompactionJobTestBase(
2369 test::PerThreadDBPath("compaction_job_io_priority_test"),
2370 BytewiseComparator(), [](uint64_t /*ts*/) { return ""; },
2371 /*test_io_priority=*/true, TableTypeForTest::kBlockBasedTable) {}
2372 };
2373
2374 TEST_F(CompactionJobIOPriorityTest, WriteControllerStateNormal) {
2375 // When the state from WriteController is normal.
2376 NewDB();
2377 mock::KVVector expected_results = CreateTwoFiles(false);
2378 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
2379 constexpr int input_level = 0;
2380 auto files = cfd->current()->storage_info()->LevelFiles(input_level);
2381 ASSERT_EQ(2U, files.size());
2382 RunCompaction({files}, {input_level}, {expected_results}, {},
2383 kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
2384 Env::IO_LOW, Env::IO_LOW);
2385 }
2386
2387 TEST_F(CompactionJobIOPriorityTest, WriteControllerStateDelayed) {
2388 // When the state from WriteController is Delayed.
2389 NewDB();
2390 mock::KVVector expected_results = CreateTwoFiles(false);
2391 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
2392 constexpr int input_level = 0;
2393 auto files = cfd->current()->storage_info()->LevelFiles(input_level);
2394 ASSERT_EQ(2U, files.size());
2395 {
2396 std::unique_ptr<WriteControllerToken> delay_token =
2397 write_controller_.GetDelayToken(1000000);
2398 RunCompaction({files}, {input_level}, {expected_results}, {},
2399 kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
2400 Env::IO_USER, Env::IO_USER);
2401 }
2402 }
2403
2404 TEST_F(CompactionJobIOPriorityTest, WriteControllerStateStalled) {
2405 // When the state from WriteController is Stalled.
2406 NewDB();
2407 mock::KVVector expected_results = CreateTwoFiles(false);
2408 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
2409 constexpr int input_level = 0;
2410 auto files = cfd->current()->storage_info()->LevelFiles(input_level);
2411 ASSERT_EQ(2U, files.size());
2412 {
2413 std::unique_ptr<WriteControllerToken> stop_token =
2414 write_controller_.GetStopToken();
2415 RunCompaction({files}, {input_level}, {expected_results}, {},
2416 kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, false,
2417 Env::IO_USER, Env::IO_USER);
2418 }
2419 }
2420
2421 TEST_F(CompactionJobIOPriorityTest, GetRateLimiterPriority) {
2422 NewDB();
2423 mock::KVVector expected_results = CreateTwoFiles(false);
2424 auto cfd = versions_->GetColumnFamilySet()->GetDefault();
2425 constexpr int input_level = 0;
2426 auto files = cfd->current()->storage_info()->LevelFiles(input_level);
2427 ASSERT_EQ(2U, files.size());
2428 RunCompaction({files}, {input_level}, {expected_results}, {},
2429 kMaxSequenceNumber, 1, false, {kInvalidBlobFileNumber}, true,
2430 Env::IO_LOW, Env::IO_LOW);
2431 }
2432
2433 } // namespace ROCKSDB_NAMESPACE
2434
2435 int main(int argc, char** argv) {
2436 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2437 ::testing::InitGoogleTest(&argc, argv);
2438 RegisterCustomObjects(argc, argv);
2439 return RUN_ALL_TESTS();
2440 }
2441
2442 #else
2443 #include <stdio.h>
2444
// ROCKSDB_LITE build: nothing to run, so report the skip on stderr and exit
// successfully.
int main(int /*argc*/, char** /*argv*/) {
  fputs("SKIPPED as CompactionJobStats is not supported in ROCKSDB_LITE\n",
        stderr);
  return 0;
}
2450
2451 #endif // ROCKSDB_LITE