1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/blob/blob_index.h"
7 #include "db/blob/blob_log_format.h"
8 #include "db/db_test_util.h"
9 #include "port/stack_trace.h"
10 #include "test_util/sync_point.h"
12 namespace ROCKSDB_NAMESPACE
{
// Test fixture shared by the blob compaction tests in this file. Derives from
// DBTestBase and constructs it with the name "db_blob_compaction_test" and
// env_do_fsync=false.
14 class DBBlobCompactionTest
: public DBTestBase
{
16 explicit DBBlobCompactionTest()
17 : DBTestBase("db_blob_compaction_test", /*env_do_fsync=*/false) {}
// Returns the compaction stats exposed by the InternalStats object of the
// default column family (via TEST_GetCompactionStats()). The tests below
// index the result with [1] to check bytes_read_blob / bytes_written_blob.
20 const std::vector
<InternalStats::CompactionStats
>& GetCompactionStats() {
21 VersionSet
* const versions
= dbfull()->GetVersionSet();
23 assert(versions
->GetColumnFamilySet());
25 ColumnFamilyData
* const cfd
= versions
->GetColumnFamilySet()->GetDefault();
28 const InternalStats
* const internal_stats
= cfd
->internal_stats();
29 assert(internal_stats
);
31 return internal_stats
->TEST_GetCompactionStats();
33 #endif // ROCKSDB_LITE
// Compaction filter that decides purely on the key: FilterBlobByKey returns
// kRemove when the key is shorter than length_threshold_ and kKeep otherwise.
// The new_value and skip_until outputs are never touched.
38 class FilterByKeyLength
: public CompactionFilter
{
40 explicit FilterByKeyLength(size_t len
) : length_threshold_(len
) {}
41 const char* Name() const override
{
42 return "rocksdb.compaction.filter.by.key.length";
44 CompactionFilter::Decision
FilterBlobByKey(
45 int /*level*/, const Slice
& key
, std::string
* /*new_value*/,
46 std::string
* /*skip_until*/) const override
{
47 if (key
.size() < length_threshold_
) {
48 return CompactionFilter::Decision::kRemove
;
50 return CompactionFilter::Decision::kKeep
;
// Keys whose size is strictly below this value are removed.
54 size_t length_threshold_
;
// Compaction filter that decides on the value: FilterV2 returns kRemove when
// existing_value is shorter than length_threshold_ and kKeep otherwise. The
// level, key and value type are ignored.
57 class FilterByValueLength
: public CompactionFilter
{
59 explicit FilterByValueLength(size_t len
) : length_threshold_(len
) {}
60 const char* Name() const override
{
61 return "rocksdb.compaction.filter.by.value.length";
63 CompactionFilter::Decision
FilterV2(
64 int /*level*/, const Slice
& /*key*/, ValueType
/*value_type*/,
65 const Slice
& existing_value
, std::string
* /*new_value*/,
66 std::string
* /*skip_until*/) const override
{
67 if (existing_value
.size() < length_threshold_
) {
68 return CompactionFilter::Decision::kRemove
;
70 return CompactionFilter::Decision::kKeep
;
// Values whose size is strictly below this value are removed.
74 size_t length_threshold_
;
// Compaction filter that hands back caller-configured decisions. It is
// constructed from a key prefix plus one Decision for FilterBlobByKey and one
// for FilterV2 (stored in filter_blob_by_key_ and filter_v2_).
77 class BadBlobCompactionFilter
: public CompactionFilter
{
79 explicit BadBlobCompactionFilter(std::string prefix
,
80 CompactionFilter::Decision filter_by_key
,
81 CompactionFilter::Decision filter_v2
)
82 : prefix_(std::move(prefix
)),
83 filter_blob_by_key_(filter_by_key
),
84 filter_v2_(filter_v2
) {}
85 const char* Name() const override
{ return "rocksdb.compaction.filter.bad"; }
// Returns kUndetermined for keys that start with prefix_ (the size check
// guards the strncmp against reading past a shorter key); every other key
// gets the configured filter_blob_by_key_ decision.
86 CompactionFilter::Decision
FilterBlobByKey(
87 int /*level*/, const Slice
& key
, std::string
* /*new_value*/,
88 std::string
* /*skip_until*/) const override
{
89 if (key
.size() >= prefix_
.size() &&
90 0 == strncmp(prefix_
.data(), key
.data(), prefix_
.size())) {
91 return CompactionFilter::Decision::kUndetermined
;
93 return filter_blob_by_key_
;
// NOTE(review): the body of FilterV2 is not visible in this excerpt;
// presumably it returns filter_v2_ -- TODO confirm.
95 CompactionFilter::Decision
FilterV2(
96 int /*level*/, const Slice
& /*key*/, ValueType
/*value_type*/,
97 const Slice
& /*existing_value*/, std::string
* /*new_value*/,
98 std::string
* /*skip_until*/) const override
{
// Key prefix for which FilterBlobByKey answers kUndetermined.
103 const std::string prefix_
;
// Decision returned by FilterBlobByKey for keys without the prefix.
104 const CompactionFilter::Decision filter_blob_by_key_
;
// Decision supplied for FilterV2 at construction time.
105 const CompactionFilter::Decision filter_v2_
;
// Compaction filter holding a fixed replacement value (new_value_). Its
// FilterBlobByKey override is only declared here; the definition is out of
// line.
108 class ValueBlindWriteFilter
: public CompactionFilter
{
110 explicit ValueBlindWriteFilter(std::string new_val
)
111 : new_value_(std::move(new_val
)) {}
112 const char* Name() const override
{
113 return "rocksdb.compaction.filter.blind.write";
115 CompactionFilter::Decision
FilterBlobByKey(
116 int level
, const Slice
& key
, std::string
* new_value
,
117 std::string
* skip_until
) const override
;
// Replacement value, fixed at construction time.
120 const std::string new_value_
;
// Overwrites *new_value with new_value_ and returns kChangeValue. The level,
// key and skip_until parameters are unused, so the decision is made without
// looking at the key or the existing value.
123 CompactionFilter::Decision
ValueBlindWriteFilter::FilterBlobByKey(
124 int /*level*/, const Slice
& /*key*/, std::string
* new_value
,
125 std::string
* /*skip_until*/) const {
127 new_value
->assign(new_value_
);
128 return CompactionFilter::Decision::kChangeValue
;
// Compaction filter holding a suffix (padding_) that its FilterV2 override
// appends to existing values. FilterV2 is only declared here; the definition
// is out of line.
131 class ValueMutationFilter
: public CompactionFilter
{
133 explicit ValueMutationFilter(std::string padding
)
134 : padding_(std::move(padding
)) {}
135 const char* Name() const override
{
136 return "rocksdb.compaction.filter.value.mutation";
138 CompactionFilter::Decision
FilterV2(int level
, const Slice
& key
,
139 ValueType value_type
,
140 const Slice
& existing_value
,
141 std::string
* new_value
,
142 std::string
* skip_until
) const override
;
// Suffix appended to each rewritten value; may be empty.
145 const std::string padding_
;
// Rewrites plain values by appending padding_.
// - Asserts that value_type is never kBlobIndex.
// - For any value type other than kValue, returns kKeep and leaves *new_value
//   untouched.
// - For kValue, sets *new_value to existing_value followed by padding_ and
//   returns kChangeValue.
148 CompactionFilter::Decision
ValueMutationFilter::FilterV2(
149 int /*level*/, const Slice
& /*key*/, ValueType value_type
,
150 const Slice
& existing_value
, std::string
* new_value
,
151 std::string
* /*skip_until*/) const {
152 assert(CompactionFilter::ValueType::kBlobIndex
!= value_type
);
153 if (CompactionFilter::ValueType::kValue
!= value_type
) {
154 return CompactionFilter::Decision::kKeep
;
157 new_value
->assign(existing_value
.data(), existing_value
.size());
158 new_value
->append(padding_
);
159 return CompactionFilter::Decision::kChangeValue
;
// Compaction filter whose FilterV2 ignores all of its arguments and always
// returns kKeep.
162 class AlwaysKeepFilter
: public CompactionFilter
{
164 explicit AlwaysKeepFilter() = default;
165 const char* Name() const override
{
166 return "rocksdb.compaction.filter.always.keep";
168 CompactionFilter::Decision
FilterV2(
169 int /*level*/, const Slice
& /*key*/, ValueType
/*value_type*/,
170 const Slice
& /*existing_value*/, std::string
* /*new_value*/,
171 std::string
* /*skip_until*/) const override
{
172 return CompactionFilter::Decision::kKeep
;
// Compaction filter that answers every FilterV2 call with
// kRemoveAndSkipUntil, using the key given at construction time as the
// skip target.
176 class SkipUntilFilter
: public CompactionFilter
{
178 explicit SkipUntilFilter(std::string skip_until
)
179 : skip_until_(std::move(skip_until
)) {}
181 const char* Name() const override
{
182 return "rocksdb.compaction.filter.skip.until";
// Copies skip_until_ into *skip_until and returns kRemoveAndSkipUntil; the
// level, key, value type, existing value and new_value are ignored.
185 CompactionFilter::Decision
FilterV2(int /* level */, const Slice
& /* key */,
186 ValueType
/* value_type */,
187 const Slice
& /* existing_value */,
188 std::string
* /* new_value */,
189 std::string
* skip_until
) const override
{
191 *skip_until
= skip_until_
;
193 return CompactionFilter::Decision::kRemoveAndSkipUntil
;
// Skip target written to *skip_until on every FilterV2 call.
197 std::string skip_until_
;
200 } // anonymous namespace
// Value-parameterized fixture built on DBBlobCompactionTest. The parameter is
// a tuple of (std::string, Decision, Decision); the constructor forwards the
// three elements, in order, to BadBlobCompactionFilter and keeps the filter
// alive in compaction_filter_guard_.
202 class DBBlobBadCompactionFilterTest
203 : public DBBlobCompactionTest
,
204 public testing::WithParamInterface
<
205 std::tuple
<std::string
, CompactionFilter::Decision
,
206 CompactionFilter::Decision
>> {
208 explicit DBBlobBadCompactionFilterTest()
209 : compaction_filter_guard_(new BadBlobCompactionFilter(
210 std::get
<0>(GetParam()), std::get
<1>(GetParam()),
211 std::get
<2>(GetParam()))) {}
// Owns the filter; tests pass compaction_filter_guard_.get() to Options.
214 std::unique_ptr
<CompactionFilter
> compaction_filter_guard_
;
// Instantiates DBBlobBadCompactionFilterTest under the prefix
// BadCompactionFilter. The three value sets below line up with the fixture's
// tuple: key prefix {"a"}, FilterBlobByKey decision {kChangeBlobIndex,
// kIOError}, FilterV2 decision {kUndetermined, kChangeBlobIndex, kIOError}.
// NOTE(review): the line that combines the three sets (presumably
// testing::Combine) is not visible in this excerpt -- TODO confirm.
// NOTE(review): newer GoogleTest releases spell this macro
// INSTANTIATE_TEST_SUITE_P; check which one the bundled gtest expects.
217 INSTANTIATE_TEST_CASE_P(
218 BadCompactionFilter
, DBBlobBadCompactionFilterTest
,
220 testing::Values("a"),
221 testing::Values(CompactionFilter::Decision::kChangeBlobIndex
,
222 CompactionFilter::Decision::kIOError
),
223 testing::Values(CompactionFilter::Decision::kUndetermined
,
224 CompactionFilter::Decision::kChangeBlobIndex
,
225 CompactionFilter::Decision::kIOError
)));
// Runs a full-range compaction with a FilterByKeyLength filter (threshold 2)
// over a 1-byte key and a 3-byte key, with blob files enabled and
// min_blob_size = 0. Expects the short key to be gone and the long key to
// still read "value".
227 TEST_F(DBBlobCompactionTest
, FilterByKeyLength
) {
228 Options options
= GetDefaultOptions();
229 options
.enable_blob_files
= true;
230 options
.min_blob_size
= 0;
231 options
.create_if_missing
= true;
232 constexpr size_t kKeyLength
= 2;
233 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
234 new FilterByKeyLength(kKeyLength
));
// Options only stores the raw pointer; the unique_ptr above keeps the filter
// alive for the duration of the test.
235 options
.compaction_filter
= compaction_filter_guard
.get();
237 constexpr char short_key
[] = "a";
238 constexpr char long_key
[] = "abc";
239 constexpr char blob_value
[] = "value";
241 DestroyAndReopen(options
);
242 ASSERT_OK(Put(short_key
, blob_value
));
243 ASSERT_OK(Put(long_key
, blob_value
));
245 CompactRangeOptions cro
;
246 ASSERT_OK(db_
->CompactRange(cro
, /*begin=*/nullptr, /*end=*/nullptr));
// NOTE(review): the declaration of `value` is not visible in this excerpt.
248 ASSERT_TRUE(db_
->Get(ReadOptions(), short_key
, &value
).IsNotFound());
250 ASSERT_OK(db_
->Get(ReadOptions(), long_key
, &value
));
251 ASSERT_EQ("value", value
);
254 const auto& compaction_stats
= GetCompactionStats();
255 ASSERT_GE(compaction_stats
.size(), 2);
257 // Filter decides between kKeep and kRemove solely based on key;
258 // this involves neither reading nor writing blobs
259 ASSERT_EQ(compaction_stats
[1].bytes_read_blob
, 0);
260 ASSERT_EQ(compaction_stats
[1].bytes_written_blob
, 0);
261 #endif // ROCKSDB_LITE
// Runs a full-range compaction with a FilterByValueLength filter (threshold 5)
// over three keys holding the 3-byte value "val" and three keys holding the
// 10-byte value "valuevalue", with min_blob_size = 5. Expects the short-value
// keys to be gone and the long-value keys to keep their value.
266 TEST_F(DBBlobCompactionTest
, FilterByValueLength
) {
267 Options options
= GetDefaultOptions();
268 options
.enable_blob_files
= true;
269 options
.min_blob_size
= 5;
270 options
.create_if_missing
= true;
271 constexpr size_t kValueLength
= 5;
272 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
273 new FilterByValueLength(kValueLength
));
274 options
.compaction_filter
= compaction_filter_guard
.get();
276 const std::vector
<std::string
> short_value_keys
= {"a", "e", "j"};
277 constexpr char short_value
[] = "val";
278 const std::vector
<std::string
> long_value_keys
= {"b", "f", "k"};
279 constexpr char long_value
[] = "valuevalue";
281 DestroyAndReopen(options
);
282 for (size_t i
= 0; i
< short_value_keys
.size(); ++i
) {
283 ASSERT_OK(Put(short_value_keys
[i
], short_value
));
// NOTE(review): this loop indexes long_value_keys but is bounded by
// short_value_keys.size(). Both vectors hold three elements here, so it
// works, but the bound should presumably be long_value_keys.size().
285 for (size_t i
= 0; i
< short_value_keys
.size(); ++i
) {
286 ASSERT_OK(Put(long_value_keys
[i
], long_value
));
289 CompactRangeOptions cro
;
290 ASSERT_OK(db_
->CompactRange(cro
, /*begin=*/nullptr, /*end=*/nullptr));
// Short values must have been dropped by the filter.
// NOTE(review): the declaration of `value` and the opening of the assertion
// wrapping the Get() call below are not visible in this excerpt.
292 for (size_t i
= 0; i
< short_value_keys
.size(); ++i
) {
294 db_
->Get(ReadOptions(), short_value_keys
[i
], &value
).IsNotFound());
// Long values must have survived unchanged.
297 for (size_t i
= 0; i
< long_value_keys
.size(); ++i
) {
298 ASSERT_OK(db_
->Get(ReadOptions(), long_value_keys
[i
], &value
));
299 ASSERT_EQ(long_value
, value
);
303 const auto& compaction_stats
= GetCompactionStats();
304 ASSERT_GE(compaction_stats
.size(), 2);
306 // Filter decides between kKeep and kRemove based on value;
307 // this involves reading but not writing blobs
308 ASSERT_GT(compaction_stats
[1].bytes_read_blob
, 0);
309 ASSERT_EQ(compaction_stats
[1].bytes_written_blob
, 0);
310 #endif // ROCKSDB_LITE
// Checks the effect of blob_file_starting_level on compaction output. With
// the starting level set to 5, compacting four large values into level 1 is
// expected to produce no blob files; after reopening with the starting level
// set to 1, the same workload is expected to produce one blob file.
// NOTE(review): the visible assertions expect two L0 table files before each
// compaction, so flushes presumably happen between the Puts on lines not
// visible in this excerpt.
316 TEST_F(DBBlobCompactionTest
, BlobCompactWithStartingLevel
) {
317 Options options
= GetDefaultOptions();
319 options
.enable_blob_files
= true;
320 options
.min_blob_size
= 1000;
321 options
.blob_file_starting_level
= 5;
322 options
.create_if_missing
= true;
324 // Open DB with fixed-prefix sst-partitioner so that compaction will cut
325 // new table file when encountering a new key whose 1-byte prefix changes.
326 constexpr size_t key_len
= 1;
327 options
.sst_partitioner_factory
=
328 NewSstPartitionerFixedPrefixFactory(key_len
);
330 ASSERT_OK(TryReopen(options
));
// Every value below is at least blob_size (3000) bytes, i.e. above
// min_blob_size (1000).
332 constexpr size_t blob_size
= 3000;
334 constexpr char first_key
[] = "a";
335 const std::string
first_blob(blob_size
, 'a');
336 ASSERT_OK(Put(first_key
, first_blob
));
338 constexpr char second_key
[] = "b";
339 const std::string
second_blob(2 * blob_size
, 'b');
340 ASSERT_OK(Put(second_key
, second_blob
));
342 constexpr char third_key
[] = "d";
343 const std::string
third_blob(blob_size
, 'd');
344 ASSERT_OK(Put(third_key
, third_blob
));
348 constexpr char fourth_key
[] = "c";
349 const std::string
fourth_blob(blob_size
, 'c');
350 ASSERT_OK(Put(fourth_key
, fourth_blob
));
354 ASSERT_EQ(0, GetBlobFileNumbers().size());
355 ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
356 ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));
358 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
361 // No blob file should be created since blob_file_starting_level is 5.
362 ASSERT_EQ(0, GetBlobFileNumbers().size());
363 ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
// Four distinct 1-byte key prefixes -> four table files at level 1.
364 ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
// Second pass: same workload on a fresh DB with the starting level lowered
// to 1.
367 options
.blob_file_starting_level
= 1;
368 DestroyAndReopen(options
);
370 ASSERT_OK(Put(first_key
, first_blob
));
371 ASSERT_OK(Put(second_key
, second_blob
));
372 ASSERT_OK(Put(third_key
, third_blob
));
374 ASSERT_OK(Put(fourth_key
, fourth_blob
));
377 ASSERT_EQ(0, GetBlobFileNumbers().size());
378 ASSERT_EQ(2, NumTableFilesAtLevel(/*level=*/0));
379 ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/1));
381 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
383 // The compaction's output level equals blob_file_starting_level.
384 ASSERT_EQ(1, GetBlobFileNumbers().size());
385 ASSERT_EQ(0, NumTableFilesAtLevel(/*level=*/0));
386 ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
// Runs a compaction with a ValueBlindWriteFilter that replaces every value
// with "new_blob_value", then expects all three keys to read back that
// replacement value.
393 TEST_F(DBBlobCompactionTest
, BlindWriteFilter
) {
394 Options options
= GetDefaultOptions();
395 options
.enable_blob_files
= true;
396 options
.min_blob_size
= 0;
397 options
.create_if_missing
= true;
398 constexpr char new_blob_value
[] = "new_blob_value";
399 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
400 new ValueBlindWriteFilter(new_blob_value
));
401 options
.compaction_filter
= compaction_filter_guard
.get();
402 DestroyAndReopen(options
);
403 const std::vector
<std::string
> keys
= {"a", "b", "c"};
404 const std::vector
<std::string
> values
= {"a_value", "b_value", "c_value"};
// keys[i] is written with values[i], so the two vectors must be parallel.
405 assert(keys
.size() == values
.size());
406 for (size_t i
= 0; i
< keys
.size(); ++i
) {
407 ASSERT_OK(Put(keys
[i
], values
[i
]));
410 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
412 for (const auto& key
: keys
) {
413 ASSERT_EQ(new_blob_value
, Get(key
));
417 const auto& compaction_stats
= GetCompactionStats();
418 ASSERT_GE(compaction_stats
.size(), 2);
420 // Filter unconditionally changes value in FilterBlobByKey;
421 // this involves writing but not reading blobs
422 ASSERT_EQ(compaction_stats
[1].bytes_read_blob
, 0);
423 ASSERT_GT(compaction_stats
[1].bytes_written_blob
, 0);
424 #endif // ROCKSDB_LITE
// Runs a compaction with SkipUntilFilter("z") over keys "a", "b" and "c".
// Expects every key to be gone afterwards and the ProcessInFlow sync point
// to have fired once per key.
429 TEST_F(DBBlobCompactionTest
, SkipUntilFilter
) {
430 Options options
= GetDefaultOptions();
431 options
.enable_blob_files
= true;
433 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
434 new SkipUntilFilter("z"));
435 options
.compaction_filter
= compaction_filter_guard
.get();
439 const std::vector
<std::string
> keys
{"a", "b", "c"};
440 const std::vector
<std::string
> values
{"a_value", "b_value", "c_value"};
441 assert(keys
.size() == values
.size());
443 for (size_t i
= 0; i
< keys
.size(); ++i
) {
444 ASSERT_OK(Put(keys
[i
], values
[i
]));
// Incremented by the sync point callback registered below.
449 int process_in_flow_called
= 0;
451 SyncPoint::GetInstance()->SetCallBack(
452 "BlobCountingIterator::UpdateAndCountBlobIfNeeded:ProcessInFlow",
453 [&process_in_flow_called
](void* /* arg */) { ++process_in_flow_called
; });
454 SyncPoint::GetInstance()->EnableProcessing();
456 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), /* begin */ nullptr,
// Unregister the callback: it captures a local by reference.
459 SyncPoint::GetInstance()->DisableProcessing();
460 SyncPoint::GetInstance()->ClearAllCallBacks();
462 for (const auto& key
: keys
) {
463 ASSERT_EQ(Get(key
), "NOT_FOUND");
// NOTE(review): process_in_flow_called is an int while keys.size() is a
// size_t, so the comparison below mixes signed and unsigned operands.
466 // Make sure SkipUntil was performed using iteration rather than Seek
467 ASSERT_EQ(process_in_flow_called
, keys
.size());
// Runs a compaction with the parameterized BadBlobCompactionFilter twice on a
// fresh DB: first with key "b", then with the key taken from the first tuple
// element of the test parameter.
// NOTE(review): the status predicate checked by the two ASSERT_TRUE calls is
// on continuation lines not visible in this excerpt.
472 TEST_P(DBBlobBadCompactionFilterTest
, BadDecisionFromCompactionFilter
) {
473 Options options
= GetDefaultOptions();
474 options
.enable_blob_files
= true;
475 options
.min_blob_size
= 0;
476 options
.create_if_missing
= true;
477 options
.compaction_filter
= compaction_filter_guard_
.get();
478 DestroyAndReopen(options
);
479 ASSERT_OK(Put("b", "value"));
481 ASSERT_TRUE(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
// Second case: start over with a key equal to the parameter's first element.
486 DestroyAndReopen(options
);
487 std::string
key(std::get
<0>(GetParam()));
488 ASSERT_OK(Put(key
, "value"));
490 ASSERT_TRUE(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
// Writes a hand-encoded inlined-TTL blob index straight into the DB through
// WriteBatchInternal::PutBlobIndex, then runs a compaction with a
// ValueMutationFilter installed.
// NOTE(review): the declaration of `batch` and the status predicate checked
// by the final ASSERT_TRUE are on lines not visible in this excerpt.
496 TEST_F(DBBlobCompactionTest
, CompactionFilter_InlinedTTLIndex
) {
497 Options options
= GetDefaultOptions();
498 options
.create_if_missing
= true;
499 options
.enable_blob_files
= true;
500 options
.min_blob_size
= 0;
501 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
502 new ValueMutationFilter(""));
503 options
.compaction_filter
= compaction_filter_guard
.get();
504 DestroyAndReopen(options
);
505 constexpr char key
[] = "key";
506 constexpr char blob
[] = "blob";
507 // Fake an inlined TTL blob index.
508 std::string blob_index
;
509 constexpr uint64_t expiration
= 1234567890;
510 BlobIndex::EncodeInlinedTTL(&blob_index
, expiration
, blob
);
512 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch
, 0, key
, blob_index
));
513 ASSERT_OK(db_
->Write(WriteOptions(), &batch
));
515 ASSERT_TRUE(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
// Runs a compaction with ValueMutationFilter("_delta") over three key/value
// pairs and expects each key to read back its original value with "_delta"
// appended.
521 TEST_F(DBBlobCompactionTest
, CompactionFilter
) {
522 Options options
= GetDefaultOptions();
523 options
.create_if_missing
= true;
524 options
.enable_blob_files
= true;
525 options
.min_blob_size
= 0;
526 constexpr char padding
[] = "_delta";
527 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
528 new ValueMutationFilter(padding
));
529 options
.compaction_filter
= compaction_filter_guard
.get();
530 DestroyAndReopen(options
);
531 const std::vector
<std::pair
<std::string
, std::string
>> kvs
= {
532 {"a", "a_value"}, {"b", "b_value"}, {"c", "c_value"}};
533 for (const auto& kv
: kvs
) {
534 ASSERT_OK(Put(kv
.first
, kv
.second
));
537 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
// Each stored value must now be the original value plus the padding.
539 for (const auto& kv
: kvs
) {
540 ASSERT_EQ(kv
.second
+ std::string(padding
), Get(kv
.first
));
544 const auto& compaction_stats
= GetCompactionStats();
545 ASSERT_GE(compaction_stats
.size(), 2);
547 // Filter changes the value using the previous value in FilterV2;
548 // this involves reading and writing blobs
549 ASSERT_GT(compaction_stats
[1].bytes_read_blob
, 0);
550 ASSERT_GT(compaction_stats
[1].bytes_written_blob
, 0);
551 #endif // ROCKSDB_LITE
// Corrupts the blob index seen by the compaction filter path: a sync point
// callback strips the first byte from the blob index Slice before the
// compaction proceeds.
// NOTE(review): the lambda header that introduces `arg` and the status
// predicate checked by ASSERT_TRUE are on lines not visible in this excerpt.
556 TEST_F(DBBlobCompactionTest
, CorruptedBlobIndex
) {
557 Options options
= GetDefaultOptions();
558 options
.create_if_missing
= true;
559 options
.enable_blob_files
= true;
560 options
.min_blob_size
= 0;
561 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
562 new ValueMutationFilter(""));
563 options
.compaction_filter
= compaction_filter_guard
.get();
564 DestroyAndReopen(options
);
566 constexpr char key
[] = "key";
567 constexpr char blob
[] = "blob";
569 ASSERT_OK(Put(key
, blob
));
572 SyncPoint::GetInstance()->SetCallBack(
573 "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex",
// The sync point argument is treated as a Slice* pointing at the blob index.
575 Slice
* const blob_index
= static_cast<Slice
*>(arg
);
577 assert(!blob_index
->empty());
// Dropping the first byte makes the encoded index invalid.
578 blob_index
->remove_prefix(1);
580 SyncPoint::GetInstance()->EnableProcessing();
582 ASSERT_TRUE(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
586 SyncPoint::GetInstance()->DisableProcessing();
587 SyncPoint::GetInstance()->ClearAllCallBacks();
// Runs a compaction with AlwaysKeepFilter over a single key and expects the
// set of blob file numbers to be the same before and after the compaction.
592 TEST_F(DBBlobCompactionTest
, CompactionFilterReadBlobAndKeep
) {
593 Options options
= GetDefaultOptions();
594 options
.create_if_missing
= true;
595 options
.enable_blob_files
= true;
596 options
.min_blob_size
= 0;
597 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
598 new AlwaysKeepFilter());
599 options
.compaction_filter
= compaction_filter_guard
.get();
600 DestroyAndReopen(options
);
601 ASSERT_OK(Put("foo", "foo_value"));
// Snapshot of the blob file numbers before compaction, compared again below.
603 std::vector
<uint64_t> blob_files
= GetBlobFileNumbers();
604 ASSERT_EQ(1, blob_files
.size());
605 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
607 ASSERT_EQ(blob_files
, GetBlobFileNumbers());
610 const auto& compaction_stats
= GetCompactionStats();
611 ASSERT_GE(compaction_stats
.size(), 2);
613 // Filter decides to keep the existing value in FilterV2;
614 // this involves reading but not writing blobs
615 ASSERT_GT(compaction_stats
[1].bytes_read_blob
, 0);
616 ASSERT_EQ(compaction_stats
[1].bytes_written_blob
, 0);
617 #endif // ROCKSDB_LITE
// Verifies per-blob-file garbage accounting after a compaction. Four keys are
// written, two of them are then overwritten, and everything is compacted.
// The first blob file is expected to report 4 blobs with 2 of them garbage;
// the second is expected to report 2 blobs and no garbage.
// NOTE(review): the two `meta` declarations below presumably sit in separate
// scopes whose braces are not visible in this excerpt; likewise the tails of
// the *_expected_bytes initializers are cut off.
622 TEST_F(DBBlobCompactionTest
, TrackGarbage
) {
623 Options options
= GetDefaultOptions();
624 options
.enable_blob_files
= true;
628 // First table+blob file pair: 4 blobs with different keys
629 constexpr char first_key
[] = "first_key";
630 constexpr char first_value
[] = "first_value";
631 constexpr char second_key
[] = "second_key";
632 constexpr char second_value
[] = "second_value";
633 constexpr char third_key
[] = "third_key";
634 constexpr char third_value
[] = "third_value";
635 constexpr char fourth_key
[] = "fourth_key";
636 constexpr char fourth_value
[] = "fourth_value";
638 ASSERT_OK(Put(first_key
, first_value
));
639 ASSERT_OK(Put(second_key
, second_value
));
640 ASSERT_OK(Put(third_key
, third_value
));
641 ASSERT_OK(Put(fourth_key
, fourth_value
));
644 // Second table+blob file pair: overwrite 2 existing keys
645 constexpr char new_first_value
[] = "new_first_value";
646 constexpr char new_second_value
[] = "new_second_value";
648 ASSERT_OK(Put(first_key
, new_first_value
));
649 ASSERT_OK(Put(second_key
, new_second_value
));
652 // Compact them together. The first blob file should have 2 garbage blobs
653 // corresponding to the 2 overwritten keys.
654 constexpr Slice
* begin
= nullptr;
655 constexpr Slice
* end
= nullptr;
657 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), begin
, end
));
// Walk from the VersionSet to the current version's storage info to reach
// the blob file metadata.
659 VersionSet
* const versions
= dbfull()->GetVersionSet();
661 assert(versions
->GetColumnFamilySet());
663 ColumnFamilyData
* const cfd
= versions
->GetColumnFamilySet()->GetDefault();
666 Version
* const current
= cfd
->current();
669 const VersionStorageInfo
* const storage_info
= current
->storage_info();
670 assert(storage_info
);
672 const auto& blob_files
= storage_info
->GetBlobFiles();
673 ASSERT_EQ(blob_files
.size(), 2);
// First blob file: holds the four original values.
676 const auto& meta
= blob_files
.front();
// sizeof(x) - 1 is the string length of the char array without its
// terminating NUL.
679 constexpr uint64_t first_expected_bytes
=
680 sizeof(first_value
) - 1 +
681 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key
) -
683 constexpr uint64_t second_expected_bytes
=
684 sizeof(second_value
) - 1 +
685 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key
) -
687 constexpr uint64_t third_expected_bytes
=
688 sizeof(third_value
) - 1 +
689 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(third_key
) -
691 constexpr uint64_t fourth_expected_bytes
=
692 sizeof(fourth_value
) - 1 +
693 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(fourth_key
) -
696 ASSERT_EQ(meta
->GetTotalBlobCount(), 4);
697 ASSERT_EQ(meta
->GetTotalBlobBytes(),
698 first_expected_bytes
+ second_expected_bytes
+
699 third_expected_bytes
+ fourth_expected_bytes
);
// The two overwritten keys account for the garbage in the first file.
700 ASSERT_EQ(meta
->GetGarbageBlobCount(), 2);
701 ASSERT_EQ(meta
->GetGarbageBlobBytes(),
702 first_expected_bytes
+ second_expected_bytes
);
// Second blob file: holds the two overwriting values, none of them garbage.
706 const auto& meta
= blob_files
.back();
709 constexpr uint64_t new_first_expected_bytes
=
710 sizeof(new_first_value
) - 1 +
711 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(first_key
) -
713 constexpr uint64_t new_second_expected_bytes
=
714 sizeof(new_second_value
) - 1 +
715 BlobLogRecord::CalculateAdjustmentForRecordHeader(sizeof(second_key
) -
718 ASSERT_EQ(meta
->GetTotalBlobCount(), 2);
719 ASSERT_EQ(meta
->GetTotalBlobBytes(),
720 new_first_expected_bytes
+ new_second_expected_bytes
);
721 ASSERT_EQ(meta
->GetGarbageBlobCount(), 0);
722 ASSERT_EQ(meta
->GetGarbageBlobBytes(), 0);
// Checks that merge operands are applied on top of base values during
// compaction when blob files are enabled. Uses the string-append merge
// operator and expects the comma-joined results asserted at the end.
726 TEST_F(DBBlobCompactionTest
, MergeBlobWithBase
) {
727 Options options
= GetDefaultOptions();
728 options
.enable_blob_files
= true;
729 options
.min_blob_size
= 0;
730 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
731 options
.disable_auto_compactions
= true;
// Base values.
734 ASSERT_OK(Put("Key1", "v1_1"));
735 ASSERT_OK(Put("Key2", "v2_1"));
// First round of merge operands.
738 ASSERT_OK(Merge("Key1", "v1_2"));
739 ASSERT_OK(Merge("Key2", "v2_2"));
// A second operand for Key1 only.
742 ASSERT_OK(Merge("Key1", "v1_3"));
745 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
747 ASSERT_EQ(Get("Key1"), "v1_1,v1_2,v1_3");
748 ASSERT_EQ(Get("Key2"), "v2_1,v2_2");
// Runs a compaction with blob garbage collection enabled (age cutoff 1.0) and
// blob_compaction_readahead_size set to 1 KiB, counting how often the
// "BlobFileReader::GetBlob:ReadFromFile" sync point fires. Expects the latest
// values to be readable and the counter to stay at zero.
752 TEST_F(DBBlobCompactionTest
, CompactionReadaheadGarbageCollection
) {
753 Options options
= GetDefaultOptions();
754 options
.enable_blob_files
= true;
755 options
.min_blob_size
= 0;
756 options
.enable_blob_garbage_collection
= true;
757 options
.blob_garbage_collection_age_cutoff
= 1.0;
758 options
.blob_compaction_readahead_size
= 1 << 10;
759 options
.disable_auto_compactions
= true;
763 ASSERT_OK(Put("key", "lime"));
764 ASSERT_OK(Put("foo", "bar"));
// Overwrite both keys with new values.
767 ASSERT_OK(Put("key", "pie"));
768 ASSERT_OK(Put("foo", "baz"));
// Incremented each time the ReadFromFile sync point is hit.
771 size_t num_non_prefetch_reads
= 0;
772 SyncPoint::GetInstance()->SetCallBack(
773 "BlobFileReader::GetBlob:ReadFromFile",
774 [&num_non_prefetch_reads
](void* /* arg */) { ++num_non_prefetch_reads
; });
775 SyncPoint::GetInstance()->EnableProcessing();
777 constexpr Slice
* begin
= nullptr;
778 constexpr Slice
* end
= nullptr;
780 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), begin
, end
));
// Stop counting before the Get() calls below.
782 SyncPoint::GetInstance()->DisableProcessing();
783 SyncPoint::GetInstance()->ClearAllCallBacks();
785 ASSERT_EQ(Get("key"), "pie");
786 ASSERT_EQ(Get("foo"), "baz");
787 ASSERT_EQ(num_non_prefetch_reads
, 0);
// Runs a compaction with ValueMutationFilter("pie") and
// blob_compaction_readahead_size set to 1 KiB, counting how often the
// "BlobFileReader::GetBlob:ReadFromFile" sync point fires. Expects each value
// to come back with "pie" appended and the counter to stay at zero.
792 TEST_F(DBBlobCompactionTest
, CompactionReadaheadFilter
) {
793 Options options
= GetDefaultOptions();
795 std::unique_ptr
<CompactionFilter
> compaction_filter_guard(
796 new ValueMutationFilter("pie"));
798 options
.compaction_filter
= compaction_filter_guard
.get();
799 options
.enable_blob_files
= true;
800 options
.min_blob_size
= 0;
801 options
.blob_compaction_readahead_size
= 1 << 10;
802 options
.disable_auto_compactions
= true;
806 ASSERT_OK(Put("key", "lime"));
807 ASSERT_OK(Put("foo", "bar"));
// Incremented each time the ReadFromFile sync point is hit.
810 size_t num_non_prefetch_reads
= 0;
811 SyncPoint::GetInstance()->SetCallBack(
812 "BlobFileReader::GetBlob:ReadFromFile",
813 [&num_non_prefetch_reads
](void* /* arg */) { ++num_non_prefetch_reads
; });
814 SyncPoint::GetInstance()->EnableProcessing();
816 constexpr Slice
* begin
= nullptr;
817 constexpr Slice
* end
= nullptr;
819 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), begin
, end
));
// Stop counting before the Get() calls below.
821 SyncPoint::GetInstance()->DisableProcessing();
822 SyncPoint::GetInstance()->ClearAllCallBacks();
824 ASSERT_EQ(Get("key"), "limepie");
825 ASSERT_EQ(Get("foo"), "barpie");
826 ASSERT_EQ(num_non_prefetch_reads
, 0);
// Runs a compaction that merges operands onto base values with the
// string-append merge operator and blob_compaction_readahead_size set to
// 1 KiB, counting how often the "BlobFileReader::GetBlob:ReadFromFile" sync
// point fires. Expects the merged values asserted below and a zero counter.
831 TEST_F(DBBlobCompactionTest
, CompactionReadaheadMerge
) {
832 Options options
= GetDefaultOptions();
833 options
.enable_blob_files
= true;
834 options
.min_blob_size
= 0;
835 options
.blob_compaction_readahead_size
= 1 << 10;
836 options
.merge_operator
= MergeOperators::CreateStringAppendOperator();
837 options
.disable_auto_compactions
= true;
// Base values.
841 ASSERT_OK(Put("key", "lime"));
842 ASSERT_OK(Put("foo", "bar"));
// Merge operands applied on top of the base values.
845 ASSERT_OK(Merge("key", "pie"));
846 ASSERT_OK(Merge("foo", "baz"));
// Incremented each time the ReadFromFile sync point is hit.
849 size_t num_non_prefetch_reads
= 0;
850 SyncPoint::GetInstance()->SetCallBack(
851 "BlobFileReader::GetBlob:ReadFromFile",
852 [&num_non_prefetch_reads
](void* /* arg */) { ++num_non_prefetch_reads
; });
853 SyncPoint::GetInstance()->EnableProcessing();
855 constexpr Slice
* begin
= nullptr;
856 constexpr Slice
* end
= nullptr;
858 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), begin
, end
));
// Stop counting before the Get() calls below.
860 SyncPoint::GetInstance()->DisableProcessing();
861 SyncPoint::GetInstance()->ClearAllCallBacks();
863 ASSERT_EQ(Get("key"), "lime,pie");
864 ASSERT_EQ(Get("foo"), "bar,baz");
865 ASSERT_EQ(num_non_prefetch_reads
, 0);
// Runs a garbage-collecting compaction with a 1 MiB LRU blob cache and DB
// statistics enabled, then expects the BLOB_DB_CACHE_ADD ticker to be zero,
// i.e. the compaction added nothing to the blob cache.
870 TEST_F(DBBlobCompactionTest
, CompactionDoNotFillCache
) {
871 Options options
= GetDefaultOptions();
873 options
.enable_blob_files
= true;
874 options
.min_blob_size
= 0;
875 options
.enable_blob_garbage_collection
= true;
876 options
.blob_garbage_collection_age_cutoff
= 1.0;
877 options
.disable_auto_compactions
= true;
// Statistics are needed to read the cache ticker at the end of the test.
878 options
.statistics
= CreateDBStatistics();
880 LRUCacheOptions cache_options
;
881 cache_options
.capacity
= 1 << 20;
882 cache_options
.metadata_charge_policy
= kDontChargeCacheMetadata
;
884 options
.blob_cache
= NewLRUCache(cache_options
);
888 ASSERT_OK(Put("key", "lime"));
889 ASSERT_OK(Put("foo", "bar"));
// Overwrite both keys with new values.
892 ASSERT_OK(Put("key", "pie"));
893 ASSERT_OK(Put("foo", "baz"));
896 constexpr Slice
* begin
= nullptr;
897 constexpr Slice
* end
= nullptr;
899 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), begin
, end
));
901 ASSERT_EQ(options
.statistics
->getTickerCount(BLOB_DB_CACHE_ADD
), 0);
906 } // namespace ROCKSDB_NAMESPACE
// Test binary entry point: installs the stack trace handler, initializes
// GoogleTest with the command-line arguments, calls RegisterCustomObjects
// with the same arguments, and returns the result of RUN_ALL_TESTS().
908 int main(int argc
, char** argv
) {
909 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
910 ::testing::InitGoogleTest(&argc
, argv
);
911 RegisterCustomObjects(argc
, argv
);
912 return RUN_ALL_TESTS();