]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/blob/db_blob_compaction_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / blob / db_blob_compaction_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/blob/blob_index.h"
7 #include "db/blob/blob_log_format.h"
8 #include "db/db_test_util.h"
9 #include "port/stack_trace.h"
10 #include "test_util/sync_point.h"
11
12 namespace ROCKSDB_NAMESPACE {
13
14 class DBBlobCompactionTest : public DBTestBase {
15 public:
16 explicit DBBlobCompactionTest()
17 : DBTestBase("db_blob_compaction_test", /*env_do_fsync=*/false) {}
18
19 #ifndef ROCKSDB_LITE
20 const std::vector<InternalStats::CompactionStats>& GetCompactionStats() {
21 VersionSet* const versions = dbfull()->GetVersionSet();
22 assert(versions);
23 assert(versions->GetColumnFamilySet());
24
25 ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
26 assert(cfd);
27
28 const InternalStats* const internal_stats = cfd->internal_stats();
29 assert(internal_stats);
30
31 return internal_stats->TEST_GetCompactionStats();
32 }
33 #endif // ROCKSDB_LITE
34 };
35
36 namespace {
37
38 class FilterByKeyLength : public CompactionFilter {
39 public:
40 explicit FilterByKeyLength(size_t len) : length_threshold_(len) {}
41 const char* Name() const override {
42 return "rocksdb.compaction.filter.by.key.length";
43 }
44 CompactionFilter::Decision FilterBlobByKey(
45 int /*level*/, const Slice& key, std::string* /*new_value*/,
46 std::string* /*skip_until*/) const override {
47 if (key.size() < length_threshold_) {
48 return CompactionFilter::Decision::kRemove;
49 }
50 return CompactionFilter::Decision::kKeep;
51 }
52
53 private:
54 size_t length_threshold_;
55 };
56
57 class FilterByValueLength : public CompactionFilter {
58 public:
59 explicit FilterByValueLength(size_t len) : length_threshold_(len) {}
60 const char* Name() const override {
61 return "rocksdb.compaction.filter.by.value.length";
62 }
63 CompactionFilter::Decision FilterV2(
64 int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
65 const Slice& existing_value, std::string* /*new_value*/,
66 std::string* /*skip_until*/) const override {
67 if (existing_value.size() < length_threshold_) {
68 return CompactionFilter::Decision::kRemove;
69 }
70 return CompactionFilter::Decision::kKeep;
71 }
72
73 private:
74 size_t length_threshold_;
75 };
76
77 class BadBlobCompactionFilter : public CompactionFilter {
78 public:
79 explicit BadBlobCompactionFilter(std::string prefix,
80 CompactionFilter::Decision filter_by_key,
81 CompactionFilter::Decision filter_v2)
82 : prefix_(std::move(prefix)),
83 filter_blob_by_key_(filter_by_key),
84 filter_v2_(filter_v2) {}
85 const char* Name() const override { return "rocksdb.compaction.filter.bad"; }
86 CompactionFilter::Decision FilterBlobByKey(
87 int /*level*/, const Slice& key, std::string* /*new_value*/,
88 std::string* /*skip_until*/) const override {
89 if (key.size() >= prefix_.size() &&
90 0 == strncmp(prefix_.data(), key.data(), prefix_.size())) {
91 return CompactionFilter::Decision::kUndetermined;
92 }
93 return filter_blob_by_key_;
94 }
95 CompactionFilter::Decision FilterV2(
96 int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
97 const Slice& /*existing_value*/, std::string* /*new_value*/,
98 std::string* /*skip_until*/) const override {
99 return filter_v2_;
100 }
101
102 private:
103 const std::string prefix_;
104 const CompactionFilter::Decision filter_blob_by_key_;
105 const CompactionFilter::Decision filter_v2_;
106 };
107
108 class ValueBlindWriteFilter : public CompactionFilter {
109 public:
110 explicit ValueBlindWriteFilter(std::string new_val)
111 : new_value_(std::move(new_val)) {}
112 const char* Name() const override {
113 return "rocksdb.compaction.filter.blind.write";
114 }
115 CompactionFilter::Decision FilterBlobByKey(
116 int level, const Slice& key, std::string* new_value,
117 std::string* skip_until) const override;
118
119 private:
120 const std::string new_value_;
121 };
122
123 CompactionFilter::Decision ValueBlindWriteFilter::FilterBlobByKey(
124 int /*level*/, const Slice& /*key*/, std::string* new_value,
125 std::string* /*skip_until*/) const {
126 assert(new_value);
127 new_value->assign(new_value_);
128 return CompactionFilter::Decision::kChangeValue;
129 }
130
131 class ValueMutationFilter : public CompactionFilter {
132 public:
133 explicit ValueMutationFilter(std::string padding)
134 : padding_(std::move(padding)) {}
135 const char* Name() const override {
136 return "rocksdb.compaction.filter.value.mutation";
137 }
138 CompactionFilter::Decision FilterV2(int level, const Slice& key,
139 ValueType value_type,
140 const Slice& existing_value,
141 std::string* new_value,
142 std::string* skip_until) const override;
143
144 private:
145 const std::string padding_;
146 };
147
148 CompactionFilter::Decision ValueMutationFilter::FilterV2(
149 int /*level*/, const Slice& /*key*/, ValueType value_type,
150 const Slice& existing_value, std::string* new_value,
151 std::string* /*skip_until*/) const {
152 assert(CompactionFilter::ValueType::kBlobIndex != value_type);
153 if (CompactionFilter::ValueType::kValue != value_type) {
154 return CompactionFilter::Decision::kKeep;
155 }
156 assert(new_value);
157 new_value->assign(existing_value.data(), existing_value.size());
158 new_value->append(padding_);
159 return CompactionFilter::Decision::kChangeValue;
160 }
161
162 class AlwaysKeepFilter : public CompactionFilter {
163 public:
164 explicit AlwaysKeepFilter() = default;
165 const char* Name() const override {
166 return "rocksdb.compaction.filter.always.keep";
167 }
168 CompactionFilter::Decision FilterV2(
169 int /*level*/, const Slice& /*key*/, ValueType /*value_type*/,
170 const Slice& /*existing_value*/, std::string* /*new_value*/,
171 std::string* /*skip_until*/) const override {
172 return CompactionFilter::Decision::kKeep;
173 }
174 };
175
176 class SkipUntilFilter : public CompactionFilter {
177 public:
178 explicit SkipUntilFilter(std::string skip_until)
179 : skip_until_(std::move(skip_until)) {}
180
181 const char* Name() const override {
182 return "rocksdb.compaction.filter.skip.until";
183 }
184
185 CompactionFilter::Decision FilterV2(int /* level */, const Slice& /* key */,
186 ValueType /* value_type */,
187 const Slice& /* existing_value */,
188 std::string* /* new_value */,
189 std::string* skip_until) const override {
190 assert(skip_until);
191 *skip_until = skip_until_;
192
193 return CompactionFilter::Decision::kRemoveAndSkipUntil;
194 }
195
196 private:
197 std::string skip_until_;
198 };
199
200 } // anonymous namespace
201
202 class DBBlobBadCompactionFilterTest
203 : public DBBlobCompactionTest,
204 public testing::WithParamInterface<
205 std::tuple<std::string, CompactionFilter::Decision,
206 CompactionFilter::Decision>> {
207 public:
208 explicit DBBlobBadCompactionFilterTest()
209 : compaction_filter_guard_(new BadBlobCompactionFilter(
210 std::get<0>(GetParam()), std::get<1>(GetParam()),
211 std::get<2>(GetParam()))) {}
212
213 protected:
214 std::unique_ptr<CompactionFilter> compaction_filter_guard_;
215 };
216
217 INSTANTIATE_TEST_CASE_P(
218 BadCompactionFilter, DBBlobBadCompactionFilterTest,
219 testing::Combine(
220 testing::Values("a"),
221 testing::Values(CompactionFilter::Decision::kChangeBlobIndex,
222 CompactionFilter::Decision::kIOError),
223 testing::Values(CompactionFilter::Decision::kUndetermined,
224 CompactionFilter::Decision::kChangeBlobIndex,
225 CompactionFilter::Decision::kIOError)));
226
227 TEST_F(DBBlobCompactionTest, FilterByKeyLength) {
228 Options options = GetDefaultOptions();
229 options.enable_blob_files = true;
230 options.min_blob_size = 0;
231 options.create_if_missing = true;
232 constexpr size_t kKeyLength = 2;
233 std::unique_ptr<CompactionFilter> compaction_filter_guard(
234 new FilterByKeyLength(kKeyLength));
235 options.compaction_filter = compaction_filter_guard.get();
236
237 constexpr char short_key[] = "a";
238 constexpr char long_key[] = "abc";
239 constexpr char blob_value[] = "value";
240
241 DestroyAndReopen(options);
242 ASSERT_OK(Put(short_key, blob_value));
243 ASSERT_OK(Put(long_key, blob_value));
244 ASSERT_OK(Flush());
245 CompactRangeOptions cro;
246 ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
247 std::string value;
248 ASSERT_TRUE(db_->Get(ReadOptions(), short_key, &value).IsNotFound());
249 value.clear();
250 ASSERT_OK(db_->Get(ReadOptions(), long_key, &value));
251 ASSERT_EQ("value", value);
252
253 #ifndef ROCKSDB_LITE
254 const auto& compaction_stats = GetCompactionStats();
255 ASSERT_GE(compaction_stats.size(), 2);
256
257 // Filter decides between kKeep and kRemove solely based on key;
258 // this involves neither reading nor writing blobs
259 ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
260 ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
261 #endif // ROCKSDB_LITE
262
263 Close();
264 }
265
266 TEST_F(DBBlobCompactionTest, FilterByValueLength) {
267 Options options = GetDefaultOptions();
268 options.enable_blob_files = true;
269 options.min_blob_size = 5;
270 options.create_if_missing = true;
271 constexpr size_t kValueLength = 5;
272 std::unique_ptr<CompactionFilter> compaction_filter_guard(
273 new FilterByValueLength(kValueLength));
274 options.compaction_filter = compaction_filter_guard.get();
275
276 const std::vector<std::string> short_value_keys = {"a", "e", "j"};
277 constexpr char short_value[] = "val";
278 const std::vector<std::string> long_value_keys = {"b", "f", "k"};
279 constexpr char long_value[] = "valuevalue";
280
281 DestroyAndReopen(options);
282 for (size_t i = 0; i < short_value_keys.size(); ++i) {
283 ASSERT_OK(Put(short_value_keys[i], short_value));
284 }
285 for (size_t i = 0; i < short_value_keys.size(); ++i) {
286 ASSERT_OK(Put(long_value_keys[i], long_value));
287 }
288 ASSERT_OK(Flush());
289 CompactRangeOptions cro;
290 ASSERT_OK(db_->CompactRange(cro, /*begin=*/nullptr, /*end=*/nullptr));
291 std::string value;
292 for (size_t i = 0; i < short_value_keys.size(); ++i) {
293 ASSERT_TRUE(
294 db_->Get(ReadOptions(), short_value_keys[i], &value).IsNotFound());
295 value.clear();
296 }
297 for (size_t i = 0; i < long_value_keys.size(); ++i) {
298 ASSERT_OK(db_->Get(ReadOptions(), long_value_keys[i], &value));
299 ASSERT_EQ(long_value, value);
300 }
301
302 #ifndef ROCKSDB_LITE
303 const auto& compaction_stats = GetCompactionStats();
304 ASSERT_GE(compaction_stats.size(), 2);
305
306 // Filter decides between kKeep and kRemove based on value;
307 // this involves reading but not writing blobs
308 ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
309 ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
310 #endif // ROCKSDB_LITE
311
312 Close();
313 }
314
315 #ifndef ROCKSDB_LITE
316 TEST_F(DBBlobCompactionTest, BlobCompactWithStartingLevel) {
317 Options options = GetDefaultOptions();
318
319 options.enable_blob_files = true;
320 options.min_blob_size = 1000;
321 options.blob_file_starting_level = 5;
322 options.create_if_missing = true;
323
324 // Open DB with fixed-prefix sst-partitioner so that compaction will cut
325 // new table file when encountering a new key whose 1-byte prefix changes.
326 constexpr size_t key_len = 1;
327 options.sst_partitioner_factory =
328 NewSstPartitionerFixedPrefixFactory(key_len);
329
330 ASSERT_OK(TryReopen(options));
331
332 constexpr size_t blob_size = 3000;
333
334 constexpr char first_key[] = "a";
335 const std::string first_blob(blob_size, 'a');
336 ASSERT_OK(Put(first_key, first_blob));
337
338 constexpr char second_key[] = "b";
339 const std::string second_blob(2 * blob_size, 'b');
340 ASSERT_OK(Put(second_key, second_blob));
341
342 constexpr char third_key[] = "d";
343 const std::string third_blob(blob_size, 'd');
344 ASSERT_OK(Put(third_key, third_blob));
345
346 ASSERT_OK(Flush());
347
348 constexpr char fourth_key[] = "c";
349 const std::string fourth_blob(blob_size, 'c');
350 ASSERT_OK(Put(fourth_key, fourth_blob));
351
352 ASSERT_OK(Flush());
353
354 ASSERT_EQ(0, GetBlobFileNumbers().size());
355 ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
356 ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));
357
358 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
359 /*end=*/nullptr));
360
361 // No blob file should be created since blob_file_starting_level is 5.
362 ASSERT_EQ(0, GetBlobFileNumbers().size());
363 ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
364 ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
365
366 {
367 options.blob_file_starting_level = 1;
368 DestroyAndReopen(options);
369
370 ASSERT_OK(Put(first_key, first_blob));
371 ASSERT_OK(Put(second_key, second_blob));
372 ASSERT_OK(Put(third_key, third_blob));
373 ASSERT_OK(Flush());
374 ASSERT_OK(Put(fourth_key, fourth_blob));
375 ASSERT_OK(Flush());
376
377 ASSERT_EQ(0, GetBlobFileNumbers().size());
378 ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
379 ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));
380
381 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
382 /*end=*/nullptr));
383 // The compaction's output level equals to blob_file_starting_level.
384 ASSERT_EQ(1, GetBlobFileNumbers().size());
385 ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
386 ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
387 }
388
389 Close();
390 }
391 #endif
392
393 TEST_F(DBBlobCompactionTest, BlindWriteFilter) {
394 Options options = GetDefaultOptions();
395 options.enable_blob_files = true;
396 options.min_blob_size = 0;
397 options.create_if_missing = true;
398 constexpr char new_blob_value[] = "new_blob_value";
399 std::unique_ptr<CompactionFilter> compaction_filter_guard(
400 new ValueBlindWriteFilter(new_blob_value));
401 options.compaction_filter = compaction_filter_guard.get();
402 DestroyAndReopen(options);
403 const std::vector<std::string> keys = {"a", "b", "c"};
404 const std::vector<std::string> values = {"a_value", "b_value", "c_value"};
405 assert(keys.size() == values.size());
406 for (size_t i = 0; i < keys.size(); ++i) {
407 ASSERT_OK(Put(keys[i], values[i]));
408 }
409 ASSERT_OK(Flush());
410 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
411 /*end=*/nullptr));
412 for (const auto& key : keys) {
413 ASSERT_EQ(new_blob_value, Get(key));
414 }
415
416 #ifndef ROCKSDB_LITE
417 const auto& compaction_stats = GetCompactionStats();
418 ASSERT_GE(compaction_stats.size(), 2);
419
420 // Filter unconditionally changes value in FilterBlobByKey;
421 // this involves writing but not reading blobs
422 ASSERT_EQ(compaction_stats[1].bytes_read_blob, 0);
423 ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
424 #endif // ROCKSDB_LITE
425
426 Close();
427 }
428
429 TEST_F(DBBlobCompactionTest, SkipUntilFilter) {
430 Options options = GetDefaultOptions();
431 options.enable_blob_files = true;
432
433 std::unique_ptr<CompactionFilter> compaction_filter_guard(
434 new SkipUntilFilter("z"));
435 options.compaction_filter = compaction_filter_guard.get();
436
437 Reopen(options);
438
439 const std::vector<std::string> keys{"a", "b", "c"};
440 const std::vector<std::string> values{"a_value", "b_value", "c_value"};
441 assert(keys.size() == values.size());
442
443 for (size_t i = 0; i < keys.size(); ++i) {
444 ASSERT_OK(Put(keys[i], values[i]));
445 }
446
447 ASSERT_OK(Flush());
448
449 int process_in_flow_called = 0;
450
451 SyncPoint::GetInstance()->SetCallBack(
452 "BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow",
453 [&process_in_flow_called](void* /* arg */) { ++process_in_flow_called; });
454 SyncPoint::GetInstance()->EnableProcessing();
455
456 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
457 /* end */ nullptr));
458
459 SyncPoint::GetInstance()->DisableProcessing();
460 SyncPoint::GetInstance()->ClearAllCallBacks();
461
462 for (const auto& key : keys) {
463 ASSERT_EQ(Get(key), "NOT_FOUND");
464 }
465
466 // Make sure SkipUntil was performed using iteration rather than Seek
467 ASSERT_EQ(process_in_flow_called, keys.size());
468
469 Close();
470 }
471
472 TEST_P(DBBlobBadCompactionFilterTest, BadDecisionFromCompactionFilter) {
473 Options options = GetDefaultOptions();
474 options.enable_blob_files = true;
475 options.min_blob_size = 0;
476 options.create_if_missing = true;
477 options.compaction_filter = compaction_filter_guard_.get();
478 DestroyAndReopen(options);
479 ASSERT_OK(Put("b", "value"));
480 ASSERT_OK(Flush());
481 ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
482 /*end=*/nullptr)
483 .IsNotSupported());
484 Close();
485
486 DestroyAndReopen(options);
487 std::string key(std::get<0>(GetParam()));
488 ASSERT_OK(Put(key, "value"));
489 ASSERT_OK(Flush());
490 ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
491 /*end=*/nullptr)
492 .IsNotSupported());
493 Close();
494 }
495
496 TEST_F(DBBlobCompactionTest, CompactionFilter_InlinedTTLIndex) {
497 Options options = GetDefaultOptions();
498 options.create_if_missing = true;
499 options.enable_blob_files = true;
500 options.min_blob_size = 0;
501 std::unique_ptr<CompactionFilter> compaction_filter_guard(
502 new ValueMutationFilter(""));
503 options.compaction_filter = compaction_filter_guard.get();
504 DestroyAndReopen(options);
505 constexpr char key[] = "key";
506 constexpr char blob[] = "blob";
507 // Fake an inlined TTL blob index.
508 std::string blob_index;
509 constexpr uint64_t expiration = 1234567890;
510 BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
511 WriteBatch batch;
512 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
513 ASSERT_OK(db_->Write(WriteOptions(), &batch));
514 ASSERT_OK(Flush());
515 ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
516 /*end=*/nullptr)
517 .IsCorruption());
518 Close();
519 }
520
521 TEST_F(DBBlobCompactionTest, CompactionFilter) {
522 Options options = GetDefaultOptions();
523 options.create_if_missing = true;
524 options.enable_blob_files = true;
525 options.min_blob_size = 0;
526 constexpr char padding[] = "_delta";
527 std::unique_ptr<CompactionFilter> compaction_filter_guard(
528 new ValueMutationFilter(padding));
529 options.compaction_filter = compaction_filter_guard.get();
530 DestroyAndReopen(options);
531 const std::vector<std::pair<std::string, std::string>> kvs = {
532 {"a", "a_value"}, {"b", "b_value"}, {"c", "c_value"}};
533 for (const auto& kv : kvs) {
534 ASSERT_OK(Put(kv.first, kv.second));
535 }
536 ASSERT_OK(Flush());
537 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
538 /*end=*/nullptr));
539 for (const auto& kv : kvs) {
540 ASSERT_EQ(kv.second + std::string(padding), Get(kv.first));
541 }
542
543 #ifndef ROCKSDB_LITE
544 const auto& compaction_stats = GetCompactionStats();
545 ASSERT_GE(compaction_stats.size(), 2);
546
547 // Filter changes the value using the previous value in FilterV2;
548 // this involves reading and writing blobs
549 ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
550 ASSERT_GT(compaction_stats[1].bytes_written_blob, 0);
551 #endif // ROCKSDB_LITE
552
553 Close();
554 }
555
556 TEST_F(DBBlobCompactionTest, CorruptedBlobIndex) {
557 Options options = GetDefaultOptions();
558 options.create_if_missing = true;
559 options.enable_blob_files = true;
560 options.min_blob_size = 0;
561 std::unique_ptr<CompactionFilter> compaction_filter_guard(
562 new ValueMutationFilter(""));
563 options.compaction_filter = compaction_filter_guard.get();
564 DestroyAndReopen(options);
565
566 constexpr char key[] = "key";
567 constexpr char blob[] = "blob";
568
569 ASSERT_OK(Put(key, blob));
570 ASSERT_OK(Flush());
571
572 SyncPoint::GetInstance()->SetCallBack(
573 "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
574 [](void* arg) {
575 Slice* const blob_index = static_cast<Slice*>(arg);
576 assert(blob_index);
577 assert(!blob_index->empty());
578 blob_index->remove_prefix(1);
579 });
580 SyncPoint::GetInstance()->EnableProcessing();
581
582 ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
583 /*end=*/nullptr)
584 .IsCorruption());
585
586 SyncPoint::GetInstance()->DisableProcessing();
587 SyncPoint::GetInstance()->ClearAllCallBacks();
588
589 Close();
590 }
591
592 TEST_F(DBBlobCompactionTest, CompactionFilterReadBlobAndKeep) {
593 Options options = GetDefaultOptions();
594 options.create_if_missing = true;
595 options.enable_blob_files = true;
596 options.min_blob_size = 0;
597 std::unique_ptr<CompactionFilter> compaction_filter_guard(
598 new AlwaysKeepFilter());
599 options.compaction_filter = compaction_filter_guard.get();
600 DestroyAndReopen(options);
601 ASSERT_OK(Put("foo", "foo_value"));
602 ASSERT_OK(Flush());
603 std::vector<uint64_t> blob_files = GetBlobFileNumbers();
604 ASSERT_EQ(1, blob_files.size());
605 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
606 /*end=*/nullptr));
607 ASSERT_EQ(blob_files, GetBlobFileNumbers());
608
609 #ifndef ROCKSDB_LITE
610 const auto& compaction_stats = GetCompactionStats();
611 ASSERT_GE(compaction_stats.size(), 2);
612
613 // Filter decides to keep the existing value in FilterV2;
614 // this involves reading but not writing blobs
615 ASSERT_GT(compaction_stats[1].bytes_read_blob, 0);
616 ASSERT_EQ(compaction_stats[1].bytes_written_blob, 0);
617 #endif // ROCKSDB_LITE
618
619 Close();
620 }
621
622 TEST_F(DBBlobCompactionTest, TrackGarbage) {
623 Options options = GetDefaultOptions();
624 options.enable_blob_files = true;
625
626 Reopen(options);
627
628 // First table+blob file pair: 4 blobs with different keys
629 constexpr char first_key[] = "first_key";
630 constexpr char first_value[] = "first_value";
631 constexpr char second_key[] = "second_key";
632 constexpr char second_value[] = "second_value";
633 constexpr char third_key[] = "third_key";
634 constexpr char third_value[] = "third_value";
635 constexpr char fourth_key[] = "fourth_key";
636 constexpr char fourth_value[] = "fourth_value";
637
638 ASSERT_OK(Put(first_key, first_value));
639 ASSERT_OK(Put(second_key, second_value));
640 ASSERT_OK(Put(third_key, third_value));
641 ASSERT_OK(Put(fourth_key, fourth_value));
642 ASSERT_OK(Flush());
643
644 // Second table+blob file pair: overwrite 2 existing keys
645 constexpr char new_first_value[] = "new_first_value";
646 constexpr char new_second_value[] = "new_second_value";
647
648 ASSERT_OK(Put(first_key, new_first_value));
649 ASSERT_OK(Put(second_key, new_second_value));
650 ASSERT_OK(Flush());
651
652 // Compact them together. The first blob file should have 2 garbage blobs
653 // corresponding to the 2 overwritten keys.
654 constexpr Slice* begin = nullptr;
655 constexpr Slice* end = nullptr;
656
657 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
658
659 VersionSet* const versions = dbfull()->GetVersionSet();
660 assert(versions);
661 assert(versions->GetColumnFamilySet());
662
663 ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
664 assert(cfd);
665
666 Version* const current = cfd->current();
667 assert(current);
668
669 const VersionStorageInfo* const storage_info = current->storage_info();
670 assert(storage_info);
671
672 const auto& blob_files = storage_info->GetBlobFiles();
673 ASSERT_EQ(blob_files.size(), 2);
674
675 {
676 const auto& meta = blob_files.front();
677 assert(meta);
678
679 constexpr uint64_t first_expected_bytes =
680 sizeof(first_value) - 1 +
681 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
682 1);
683 constexpr uint64_t second_expected_bytes =
684 sizeof(second_value) - 1 +
685 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
686 1);
687 constexpr uint64_t third_expected_bytes =
688 sizeof(third_value) - 1 +
689 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(third_key) -
690 1);
691 constexpr uint64_t fourth_expected_bytes =
692 sizeof(fourth_value) - 1 +
693 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(fourth_key) -
694 1);
695
696 ASSERT_EQ(meta->GetTotalBlobCount(), 4);
697 ASSERT_EQ(meta->GetTotalBlobBytes(),
698 first_expected_bytes + second_expected_bytes +
699 third_expected_bytes + fourth_expected_bytes);
700 ASSERT_EQ(meta->GetGarbageBlobCount(), 2);
701 ASSERT_EQ(meta->GetGarbageBlobBytes(),
702 first_expected_bytes + second_expected_bytes);
703 }
704
705 {
706 const auto& meta = blob_files.back();
707 assert(meta);
708
709 constexpr uint64_t new_first_expected_bytes =
710 sizeof(new_first_value) - 1 +
711 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key) -
712 1);
713 constexpr uint64_t new_second_expected_bytes =
714 sizeof(new_second_value) - 1 +
715 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key) -
716 1);
717
718 ASSERT_EQ(meta->GetTotalBlobCount(), 2);
719 ASSERT_EQ(meta->GetTotalBlobBytes(),
720 new_first_expected_bytes + new_second_expected_bytes);
721 ASSERT_EQ(meta->GetGarbageBlobCount(), 0);
722 ASSERT_EQ(meta->GetGarbageBlobBytes(), 0);
723 }
724 }
725
726 TEST_F(DBBlobCompactionTest, MergeBlobWithBase) {
727 Options options = GetDefaultOptions();
728 options.enable_blob_files = true;
729 options.min_blob_size = 0;
730 options.merge_operator = MergeOperators::CreateStringAppendOperator();
731 options.disable_auto_compactions = true;
732
733 Reopen(options);
734 ASSERT_OK(Put("Key1", "v1_1"));
735 ASSERT_OK(Put("Key2", "v2_1"));
736 ASSERT_OK(Flush());
737
738 ASSERT_OK(Merge("Key1", "v1_2"));
739 ASSERT_OK(Merge("Key2", "v2_2"));
740 ASSERT_OK(Flush());
741
742 ASSERT_OK(Merge("Key1", "v1_3"));
743 ASSERT_OK(Flush());
744
745 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
746 /*end=*/nullptr));
747 ASSERT_EQ(Get("Key1"), "v1_1,v1_2,v1_3");
748 ASSERT_EQ(Get("Key2"), "v2_1,v2_2");
749 Close();
750 }
751
752 TEST_F(DBBlobCompactionTest, CompactionReadaheadGarbageCollection) {
753 Options options = GetDefaultOptions();
754 options.enable_blob_files = true;
755 options.min_blob_size = 0;
756 options.enable_blob_garbage_collection = true;
757 options.blob_garbage_collection_age_cutoff = 1.0;
758 options.blob_compaction_readahead_size = 1 << 10;
759 options.disable_auto_compactions = true;
760
761 Reopen(options);
762
763 ASSERT_OK(Put("key", "lime"));
764 ASSERT_OK(Put("foo", "bar"));
765 ASSERT_OK(Flush());
766
767 ASSERT_OK(Put("key", "pie"));
768 ASSERT_OK(Put("foo", "baz"));
769 ASSERT_OK(Flush());
770
771 size_t num_non_prefetch_reads = 0;
772 SyncPoint::GetInstance()->SetCallBack(
773 "BlobFileReader::GetBlob:ReadFromFile",
774 [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
775 SyncPoint::GetInstance()->EnableProcessing();
776
777 constexpr Slice* begin = nullptr;
778 constexpr Slice* end = nullptr;
779
780 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
781
782 SyncPoint::GetInstance()->DisableProcessing();
783 SyncPoint::GetInstance()->ClearAllCallBacks();
784
785 ASSERT_EQ(Get("key"), "pie");
786 ASSERT_EQ(Get("foo"), "baz");
787 ASSERT_EQ(num_non_prefetch_reads, 0);
788
789 Close();
790 }
791
792 TEST_F(DBBlobCompactionTest, CompactionReadaheadFilter) {
793 Options options = GetDefaultOptions();
794
795 std::unique_ptr<CompactionFilter> compaction_filter_guard(
796 new ValueMutationFilter("pie"));
797
798 options.compaction_filter = compaction_filter_guard.get();
799 options.enable_blob_files = true;
800 options.min_blob_size = 0;
801 options.blob_compaction_readahead_size = 1 << 10;
802 options.disable_auto_compactions = true;
803
804 Reopen(options);
805
806 ASSERT_OK(Put("key", "lime"));
807 ASSERT_OK(Put("foo", "bar"));
808 ASSERT_OK(Flush());
809
810 size_t num_non_prefetch_reads = 0;
811 SyncPoint::GetInstance()->SetCallBack(
812 "BlobFileReader::GetBlob:ReadFromFile",
813 [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
814 SyncPoint::GetInstance()->EnableProcessing();
815
816 constexpr Slice* begin = nullptr;
817 constexpr Slice* end = nullptr;
818
819 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
820
821 SyncPoint::GetInstance()->DisableProcessing();
822 SyncPoint::GetInstance()->ClearAllCallBacks();
823
824 ASSERT_EQ(Get("key"), "limepie");
825 ASSERT_EQ(Get("foo"), "barpie");
826 ASSERT_EQ(num_non_prefetch_reads, 0);
827
828 Close();
829 }
830
831 TEST_F(DBBlobCompactionTest, CompactionReadaheadMerge) {
832 Options options = GetDefaultOptions();
833 options.enable_blob_files = true;
834 options.min_blob_size = 0;
835 options.blob_compaction_readahead_size = 1 << 10;
836 options.merge_operator = MergeOperators::CreateStringAppendOperator();
837 options.disable_auto_compactions = true;
838
839 Reopen(options);
840
841 ASSERT_OK(Put("key", "lime"));
842 ASSERT_OK(Put("foo", "bar"));
843 ASSERT_OK(Flush());
844
845 ASSERT_OK(Merge("key", "pie"));
846 ASSERT_OK(Merge("foo", "baz"));
847 ASSERT_OK(Flush());
848
849 size_t num_non_prefetch_reads = 0;
850 SyncPoint::GetInstance()->SetCallBack(
851 "BlobFileReader::GetBlob:ReadFromFile",
852 [&num_non_prefetch_reads](void* /* arg */) { ++num_non_prefetch_reads; });
853 SyncPoint::GetInstance()->EnableProcessing();
854
855 constexpr Slice* begin = nullptr;
856 constexpr Slice* end = nullptr;
857
858 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
859
860 SyncPoint::GetInstance()->DisableProcessing();
861 SyncPoint::GetInstance()->ClearAllCallBacks();
862
863 ASSERT_EQ(Get("key"), "lime,pie");
864 ASSERT_EQ(Get("foo"), "bar,baz");
865 ASSERT_EQ(num_non_prefetch_reads, 0);
866
867 Close();
868 }
869
870 TEST_F(DBBlobCompactionTest, CompactionDoNotFillCache) {
871 Options options = GetDefaultOptions();
872
873 options.enable_blob_files = true;
874 options.min_blob_size = 0;
875 options.enable_blob_garbage_collection = true;
876 options.blob_garbage_collection_age_cutoff = 1.0;
877 options.disable_auto_compactions = true;
878 options.statistics = CreateDBStatistics();
879
880 LRUCacheOptions cache_options;
881 cache_options.capacity = 1 << 20;
882 cache_options.metadata_charge_policy = kDontChargeCacheMetadata;
883
884 options.blob_cache = NewLRUCache(cache_options);
885
886 Reopen(options);
887
888 ASSERT_OK(Put("key", "lime"));
889 ASSERT_OK(Put("foo", "bar"));
890 ASSERT_OK(Flush());
891
892 ASSERT_OK(Put("key", "pie"));
893 ASSERT_OK(Put("foo", "baz"));
894 ASSERT_OK(Flush());
895
896 constexpr Slice* begin = nullptr;
897 constexpr Slice* end = nullptr;
898
899 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
900
901 ASSERT_EQ(options.statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
902
903 Close();
904 }
905
906 } // namespace ROCKSDB_NAMESPACE
907
908 int main(int argc, char** argv) {
909 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
910 ::testing::InitGoogleTest(&argc, argv);
911 RegisterCustomObjects(argc, argv);
912 return RUN_ALL_TESTS();
913 }