1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/blob_db/blob_db.h"
20 #include "db/blob/blob_index.h"
21 #include "db/db_test_util.h"
22 #include "env/composite_env_wrapper.h"
23 #include "file/file_util.h"
24 #include "file/sst_file_manager_impl.h"
25 #include "port/port.h"
26 #include "rocksdb/utilities/debug.h"
27 #include "test_util/mock_time_env.h"
28 #include "test_util/sync_point.h"
29 #include "test_util/testharness.h"
30 #include "util/random.h"
31 #include "util/string_util.h"
32 #include "utilities/blob_db/blob_db_impl.h"
33 #include "utilities/fault_injection_env.h"
35 namespace ROCKSDB_NAMESPACE
{
38 class BlobDBTest
: public testing::Test
{
// Largest value length produced by the PutRandom* helpers: they draw
// (rnd->Next() % kMaxBlobSize) + 1 bytes, i.e. between 1 and 16384.
40 const int kMaxBlobSize
= 1 << 14;
// Expected state of a single key in the base DB, as checked by
// VerifyBaseDBBlobIndex(): user key, blob file number, expiration, sequence
// number and value type. The in-class defaults describe an entry with no blob
// file (kInvalidBlobFileNumber), no TTL (kNoExpiration), sequence 0 and type
// kTypeValue.
42 struct BlobIndexVersion
{
43 BlobIndexVersion() = default;
// Field-wise constructor; the user key is taken by value and moved in.
44 BlobIndexVersion(std::string _user_key
, uint64_t _file_number
,
45 uint64_t _expiration
, SequenceNumber _sequence
,
47 : user_key(std::move(_user_key
)),
48 file_number(_file_number
),
49 expiration(_expiration
),
54 uint64_t file_number
= kInvalidBlobFileNumber
;
55 uint64_t expiration
= kNoExpiration
;
56 SequenceNumber sequence
= 0;
57 ValueType type
= kTypeValue
;
// Fixture construction: picks a per-thread DB path and starts with no open
// DB, then builds the environments the tests choose from:
//   - mock_clock_: a MockSystemClock layered on the default system clock,
//   - mock_env_: the default Env combined with mock_clock_,
//   - fault_injection_env_: a FaultInjectionTestEnv over the default Env.
// Finally issues DestroyBlobDB() on dbname_ with default options so each test
// starts from a clean directory.
61 : dbname_(test::PerThreadDBPath("blob_db_test")), blob_db_(nullptr) {
62 mock_clock_
= std::make_shared
<MockSystemClock
>(SystemClock::Default());
63 mock_env_
.reset(new CompositeEnvWrapper(Env::Default(), mock_clock_
));
64 fault_injection_env_
.reset(new FaultInjectionTestEnv(Env::Default()));
66 Status s
= DestroyBlobDB(dbname_
, Options(), BlobDBOptions());
// Fixture teardown: removes every SyncPoint callback so that callbacks
// registered by one test (several capture locals by reference) cannot fire
// during a later test.
70 ~BlobDBTest() override
{
71 SyncPoint::GetInstance()->ClearAllCallBacks();
// Opens the blob DB at dbname_ into blob_db_ and returns the resulting
// Status instead of asserting, so callers can test failure paths.
//
// bdb_options: BlobDB-specific options (taken by value).
// options:     base DB options (taken by value); create_if_missing is always
//              forced to true, and when the caller selected mock_env_ the
//              periodic stats dump/persist tasks are switched off (see the
//              explanation inside the branch).
75 Status
TryOpen(BlobDBOptions bdb_options
= BlobDBOptions(),
76 Options options
= Options()) {
77 options
.create_if_missing
= true;
78 if (options
.env
== mock_env_
.get()) {
79 // Need to disable stats dumping and persisting which also use
80 // RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
81 // With mocked time, this can hang on some platforms (MacOS)
82 // because (a) on some platforms, pthread_cond_timedwait does not appear
83 // to release the lock for other threads to operate if the deadline time
84 // is already passed, and (b) TimedWait calls are currently a bad
85 // abstraction because the deadline parameter is usually computed from
86 // Env time, but is interpreted in real clock time.
87 options
.stats_dump_period_sec
= 0;
88 options
.stats_persist_period_sec
= 0;
90 return BlobDB::Open(options
, bdb_options
, dbname_
, &blob_db_
);
// Asserting wrapper around TryOpen(): opens the blob DB with the given
// options and fails the current test if the open does not succeed.
93 void Open(BlobDBOptions bdb_options
= BlobDBOptions(),
94 Options options
= Options()) {
95 ASSERT_OK(TryOpen(bdb_options
, options
));
// Re-opens the blob DB with (possibly different) options. Requires that a DB
// is currently open (asserted) and ends by calling Open() with the supplied
// options.
98 void Reopen(BlobDBOptions bdb_options
= BlobDBOptions(),
99 Options options
= Options()) {
100 assert(blob_db_
!= nullptr);
103 Open(bdb_options
, options
);
// Destroys the on-disk blob DB. Requires an open DB (asserted); the base and
// BlobDB options are read from the live instance so that DestroyBlobDB() is
// invoked with the same configuration the DB was opened with, and the
// destroy is asserted to succeed.
107 assert(blob_db_
!= nullptr);
114 Options options
= blob_db_
->GetOptions();
115 BlobDBOptions bdb_options
= blob_db_
->GetBlobDBOptions();
118 ASSERT_OK(DestroyBlobDB(dbname_
, options
, bdb_options
));
// Returns blob_db_ viewed as the concrete BlobDBImpl so tests can reach the
// TEST_* hooks. No null check is performed here.
// NOTE(review): other helpers in this fixture perform the same conversion
// with static_cast<BlobDBImpl*>; consider using static_cast here as well.
122 BlobDBImpl
*blob_db_impl() {
123 return reinterpret_cast<BlobDBImpl
*>(blob_db_
);
// Writes key -> value to the blob DB with default WriteOptions.
//
// data: optional shadow map of expected contents; when non-null the
//       key/value pair is recorded in it.
126 Status
Put(const Slice
&key
, const Slice
&value
,
127 std::map
<std::string
, std::string
> *data
= nullptr) {
128 Status s
= blob_db_
->Put(WriteOptions(), key
, value
);
129 if (data
!= nullptr) {
130 (*data
)[key
.ToString()] = value
.ToString();
// Deletes `key` from the blob DB with default WriteOptions and asserts that
// the delete succeeded.
//
// data: optional shadow map of expected contents, consulted only when
//       non-null.
135 void Delete(const std::string
&key
,
136 std::map
<std::string
, std::string
> *data
= nullptr) {
137 ASSERT_OK(blob_db_
->Delete(WriteOptions(), key
));
138 if (data
!= nullptr) {
// Writes key -> value with a time-to-live of `ttl` (forwarded unchanged to
// BlobDB::PutWithTTL) using default WriteOptions.
//
// data: optional shadow map of expected contents; when non-null the
//       key/value pair is recorded in it.
143 Status
PutWithTTL(const Slice
&key
, const Slice
&value
, uint64_t ttl
,
144 std::map
<std::string
, std::string
> *data
= nullptr) {
145 Status s
= blob_db_
->PutWithTTL(WriteOptions(), key
, value
, ttl
);
146 if (data
!= nullptr) {
147 (*data
)[key
.ToString()] = value
.ToString();
// Writes key -> value with an absolute expiration (forwarded unchanged to
// BlobDB::PutUntil) using default WriteOptions, and returns the resulting
// Status to the caller.
152 Status
PutUntil(const Slice
&key
, const Slice
&value
, uint64_t expiration
) {
153 return blob_db_
->PutUntil(WriteOptions(), key
, value
, expiration
);
// Stores a randomly generated value under `key` with the given TTL.
//
// rnd:  random source; consumed once for the length and again for the
//       contents, so the sequence of calls determines the generated data.
// data: optional shadow map; when non-null, receives key -> generated value.
156 void PutRandomWithTTL(const std::string
&key
, uint64_t ttl
, Random
*rnd
,
157 std::map
<std::string
, std::string
> *data
= nullptr) {
// Length is in [1, kMaxBlobSize].
158 int len
= rnd
->Next() % kMaxBlobSize
+ 1;
159 std::string value
= rnd
->HumanReadableString(len
);
161 blob_db_
->PutWithTTL(WriteOptions(), Slice(key
), Slice(value
), ttl
));
162 if (data
!= nullptr) {
163 (*data
)[key
] = value
;
// Stores a randomly generated value under `key` via BlobDB::PutUntil and
// asserts that the write succeeded.
//
// rnd:  random source; consumed once for the length and again for the
//       contents.
// data: optional shadow map; when non-null, receives key -> generated value.
167 void PutRandomUntil(const std::string
&key
, uint64_t expiration
, Random
*rnd
,
168 std::map
<std::string
, std::string
> *data
= nullptr) {
// Length is in [1, kMaxBlobSize].
169 int len
= rnd
->Next() % kMaxBlobSize
+ 1;
170 std::string value
= rnd
->HumanReadableString(len
);
171 ASSERT_OK(blob_db_
->PutUntil(WriteOptions(), Slice(key
), Slice(value
),
173 if (data
!= nullptr) {
174 (*data
)[key
] = value
;
// Convenience overload: stores a random value under `key` in the fixture's
// blob DB by delegating to the PutRandom(DB*, ...) overload.
178 void PutRandom(const std::string
&key
, Random
*rnd
,
179 std::map
<std::string
, std::string
> *data
= nullptr) {
180 PutRandom(blob_db_
, key
, rnd
, data
);
// Stores a randomly generated value under `key` in an arbitrary DB (not
// necessarily the fixture's blob DB) and asserts that the write succeeded.
//
// rnd:  random source; consumed once for the length and again for the
//       contents.
// data: optional shadow map; when non-null, receives key -> generated value.
183 void PutRandom(DB
*db
, const std::string
&key
, Random
*rnd
,
184 std::map
<std::string
, std::string
> *data
= nullptr) {
// Length is in [1, kMaxBlobSize].
185 int len
= rnd
->Next() % kMaxBlobSize
+ 1;
186 std::string value
= rnd
->HumanReadableString(len
);
187 ASSERT_OK(db
->Put(WriteOptions(), Slice(key
), Slice(value
)));
188 if (data
!= nullptr) {
189 (*data
)[key
] = value
;
// Appends a Put of a randomly generated value for `key` to `batch` (nothing
// is written to the DB here) and asserts that the append succeeded.
//
// rnd:  random source; consumed once for the length and again for the
//       contents.
// data: optional shadow map; when non-null, receives key -> generated value
//       immediately, i.e. before the batch is actually written.
193 void PutRandomToWriteBatch(
194 const std::string
&key
, Random
*rnd
, WriteBatch
*batch
,
195 std::map
<std::string
, std::string
> *data
= nullptr) {
// Length is in [1, kMaxBlobSize].
196 int len
= rnd
->Next() % kMaxBlobSize
+ 1;
197 std::string value
= rnd
->HumanReadableString(len
);
198 ASSERT_OK(batch
->Put(key
, value
));
199 if (data
!= nullptr) {
200 (*data
)[key
] = value
;
204 // Verify that the blob DB contains the expected data and nothing more.
// Delegates to the VerifyDB(DB*, ...) overload with the fixture's blob DB.
205 void VerifyDB(const std::map
<std::string
, std::string
> &data
) {
206 VerifyDB(blob_db_
, data
);
// Checks that `db` holds exactly the key/value pairs in `data`.
//
// Two passes over `data` (a std::map, hence in ascending key order):
//   1. point lookups on the default column family, once through the
//      PinnableSlice overload of Get() and once through a second Get() into
//      `value`; both must return the expected value;
//   2. a scan with a fresh iterator, which must yield the same keys and
//      values in the same order, then be exhausted with an OK status.
209 void VerifyDB(DB
*db
, const std::map
<std::string
, std::string
> &data
) {
211 auto *cfh
= db
->DefaultColumnFamily();
212 for (auto &p
: data
) {
213 PinnableSlice value_slice
;
214 ASSERT_OK(db
->Get(ReadOptions(), cfh
, p
.first
, &value_slice
));
215 ASSERT_EQ(p
.second
, value_slice
.ToString());
217 ASSERT_OK(db
->Get(ReadOptions(), cfh
, p
.first
, &value
));
218 ASSERT_EQ(p
.second
, value
);
222 Iterator
*iter
= db
->NewIterator(ReadOptions());
224 for (auto &p
: data
) {
225 ASSERT_TRUE(iter
->Valid());
226 ASSERT_EQ(p
.first
, iter
->key().ToString());
227 ASSERT_EQ(p
.second
, iter
->value().ToString());
// No extra entries may remain, and the scan itself must not have failed.
230 ASSERT_FALSE(iter
->Valid());
231 ASSERT_OK(iter
->status());
// Compares every key version stored in the root (base) DB against
// `expected_versions`.
//
// Up to kMaxKeys versions are fetched with GetAllKeyVersions() over the
// whole key range; the count must match, and for each entry the user key,
// sequence number and type must match. Plain values (kTypeValue) are
// compared directly; otherwise the entry must be a blob index
// (kTypeBlobIndex), which is resolved through TEST_GetBlobValue() before the
// value comparison.
236 const std::map
<std::string
, KeyVersion
> &expected_versions
) {
237 auto *bdb_impl
= static_cast<BlobDBImpl
*>(blob_db_
);
238 DB
*db
= blob_db_
->GetRootDB();
239 const size_t kMaxKeys
= 10000;
240 std::vector
<KeyVersion
> versions
;
241 ASSERT_OK(GetAllKeyVersions(db
, "", "", kMaxKeys
, &versions
));
242 ASSERT_EQ(expected_versions
.size(), versions
.size());
244 for (auto &key_version
: expected_versions
) {
245 const KeyVersion
&expected_version
= key_version
.second
;
246 ASSERT_EQ(expected_version
.user_key
, versions
[i
].user_key
);
247 ASSERT_EQ(expected_version
.sequence
, versions
[i
].sequence
);
248 ASSERT_EQ(expected_version
.type
, versions
[i
].type
);
249 if (versions
[i
].type
== kTypeValue
) {
250 ASSERT_EQ(expected_version
.value
, versions
[i
].value
);
252 ASSERT_EQ(kTypeBlobIndex
, versions
[i
].type
);
254 ASSERT_OK(bdb_impl
->TEST_GetBlobValue(versions
[i
].user_key
,
255 versions
[i
].value
, &value
));
256 ASSERT_EQ(expected_version
.value
, value
.ToString());
// Compares the blob-index metadata of every key version in the root (base)
// DB against `expected_versions`.
//
// Up to kMaxKeys versions are fetched over the whole key range; the count,
// user key, sequence number and type of each entry must match. For entries
// that are not blob indexes the expectation itself must carry the "none"
// markers (kInvalidBlobFileNumber / kNoExpiration). For blob indexes the
// stored value is decoded and its file number and expiration are compared,
// with inlined blobs mapping to kInvalidBlobFileNumber and blobs without
// TTL mapping to kNoExpiration.
262 void VerifyBaseDBBlobIndex(
263 const std::map
<std::string
, BlobIndexVersion
> &expected_versions
) {
264 const size_t kMaxKeys
= 10000;
265 std::vector
<KeyVersion
> versions
;
267 GetAllKeyVersions(blob_db_
->GetRootDB(), "", "", kMaxKeys
, &versions
));
268 ASSERT_EQ(versions
.size(), expected_versions
.size());
271 for (const auto &expected_pair
: expected_versions
) {
272 const BlobIndexVersion
&expected_version
= expected_pair
.second
;
274 ASSERT_EQ(versions
[i
].user_key
, expected_version
.user_key
);
275 ASSERT_EQ(versions
[i
].sequence
, expected_version
.sequence
);
276 ASSERT_EQ(versions
[i
].type
, expected_version
.type
);
277 if (versions
[i
].type
!= kTypeBlobIndex
) {
278 ASSERT_EQ(kInvalidBlobFileNumber
, expected_version
.file_number
);
279 ASSERT_EQ(kNoExpiration
, expected_version
.expiration
);
285 BlobIndex blob_index
;
286 ASSERT_OK(blob_index
.DecodeFrom(versions
[i
].value
));
// Inlined blobs live in the index itself and have no backing file.
288 const uint64_t file_number
= !blob_index
.IsInlined()
289 ? blob_index
.file_number()
290 : kInvalidBlobFileNumber
;
291 ASSERT_EQ(file_number
, expected_version
.file_number
);
293 const uint64_t expiration
=
294 blob_index
.HasTTL() ? blob_index
.expiration() : kNoExpiration
;
295 ASSERT_EQ(expiration
, expected_version
.expiration
);
// Bulk-load: performs 100000 random writes spread over only 500 distinct
// keys ("key0".."key499"), each with a random TTL below 86400, so every key
// is overwritten many times. None of the writes is tracked in a shadow map.
306 for (size_t i
= 0; i
< 100000; i
++) {
307 uint64_t ttl
= rnd
.Next() % 86400;
308 PutRandomWithTTL("key" + std::to_string(i
% 500), ttl
, &rnd
, nullptr);
// Then delete the first ten keys ("key0".."key9").
311 for (size_t i
= 0; i
< 10; i
++) {
312 Delete("key" + std::to_string(i
% 500));
// Path of the DB under test; fixed at construction.
316 const std::string dbname_
;
// Controllable clock backing mock_env_; tests move it with SetCurrentTime().
317 std::shared_ptr
<MockSystemClock
> mock_clock_
;
// Env built from the default Env plus mock_clock_.
318 std::unique_ptr
<Env
> mock_env_
;
// Env used by the I/O error tests to switch the file system on and off.
319 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env_
;
321 }; // class BlobDBTest
// Basic write path: with min_blob_size = 0 and background tasks disabled,
// stores 100 random values under "key0".."key99", tracking them in `data`.
323 TEST_F(BlobDBTest
, Put
) {
325 BlobDBOptions bdb_options
;
326 bdb_options
.min_blob_size
= 0;
327 bdb_options
.disable_background_tasks
= true;
329 std::map
<std::string
, std::string
> data
;
330 for (size_t i
= 0; i
< 100; i
++) {
331 PutRandom("key" + std::to_string(i
), &rnd
, &data
);
// TTL writes under a mocked clock. At time 50, 100 keys are written with a
// random TTL in [0, 100); keys whose TTL is <= 50 are deliberately left out
// of `data`. The clock is then moved to 100, and the test checks that all
// writes landed in a single blob file flagged as having TTL, and closes it.
336 TEST_F(BlobDBTest
, PutWithTTL
) {
339 options
.env
= mock_env_
.get();
340 BlobDBOptions bdb_options
;
341 bdb_options
.ttl_range_secs
= 1000;
342 bdb_options
.min_blob_size
= 0;
343 bdb_options
.blob_file_size
= 256 * 1000 * 1000;
344 bdb_options
.disable_background_tasks
= true;
345 Open(bdb_options
, options
);
346 std::map
<std::string
, std::string
> data
;
347 mock_clock_
->SetCurrentTime(50);
348 for (size_t i
= 0; i
< 100; i
++) {
349 uint64_t ttl
= rnd
.Next() % 100;
350 PutRandomWithTTL("key" + std::to_string(i
), ttl
, &rnd
,
351 (ttl
<= 50 ? nullptr : &data
));
353 mock_clock_
->SetCurrentTime(100);
354 auto *bdb_impl
= static_cast<BlobDBImpl
*>(blob_db_
);
355 auto blob_files
= bdb_impl
->TEST_GetBlobFiles();
356 ASSERT_EQ(1, blob_files
.size());
357 ASSERT_TRUE(blob_files
[0]->HasTTL());
358 ASSERT_OK(bdb_impl
->TEST_CloseBlobFile(blob_files
[0]));
// Absolute-expiration writes under a mocked clock. At time 50, 100 keys are
// written with a random expiration in [50, 150); keys whose expiration is
// <= 100 are deliberately left out of `data`. The clock is then moved to
// 100, and the test checks that all writes landed in a single blob file
// flagged as having TTL, and closes it.
362 TEST_F(BlobDBTest
, PutUntil
) {
365 options
.env
= mock_env_
.get();
366 BlobDBOptions bdb_options
;
367 bdb_options
.ttl_range_secs
= 1000;
368 bdb_options
.min_blob_size
= 0;
369 bdb_options
.blob_file_size
= 256 * 1000 * 1000;
370 bdb_options
.disable_background_tasks
= true;
371 Open(bdb_options
, options
);
372 std::map
<std::string
, std::string
> data
;
373 mock_clock_
->SetCurrentTime(50);
374 for (size_t i
= 0; i
< 100; i
++) {
375 uint64_t expiration
= rnd
.Next() % 100 + 50;
376 PutRandomUntil("key" + std::to_string(i
), expiration
, &rnd
,
377 (expiration
<= 100 ? nullptr : &data
));
379 mock_clock_
->SetCurrentTime(100);
380 auto *bdb_impl
= static_cast<BlobDBImpl
*>(blob_db_
);
381 auto blob_files
= bdb_impl
->TEST_GetBlobFiles();
382 ASSERT_EQ(1, blob_files
.size());
383 ASSERT_TRUE(blob_files
[0]->HasTTL());
384 ASSERT_OK(bdb_impl
->TEST_CloseBlobFile(blob_files
[0]));
// Reads through the StackableDB interface. After writing 100 random values,
// each key is fetched via a StackableDB* with both the PinnableSlice and the
// std::string overloads of Get(); the two results must agree with each other
// and with the tracked value.
388 TEST_F(BlobDBTest
, StackableDBGet
) {
390 BlobDBOptions bdb_options
;
391 bdb_options
.min_blob_size
= 0;
392 bdb_options
.disable_background_tasks
= true;
394 std::map
<std::string
, std::string
> data
;
395 for (size_t i
= 0; i
< 100; i
++) {
396 PutRandom("key" + std::to_string(i
), &rnd
, &data
);
398 for (size_t i
= 0; i
< 100; i
++) {
399 StackableDB
*db
= blob_db_
;
400 ColumnFamilyHandle
*column_family
= db
->DefaultColumnFamily();
401 std::string key
= "key" + std::to_string(i
);
402 PinnableSlice pinnable_value
;
403 ASSERT_OK(db
->Get(ReadOptions(), column_family
, key
, &pinnable_value
));
404 std::string string_value
;
405 ASSERT_OK(db
->Get(ReadOptions(), column_family
, key
, &string_value
));
406 ASSERT_EQ(string_value
, pinnable_value
.ToString());
407 ASSERT_EQ(string_value
, data
[key
]);
// Get() with an expiration out-parameter. With the mock clock at 100, a
// plain Put must report kNoExpiration and a PutWithTTL of 200 must report
// an expiration of 300 (current time + TTL).
411 TEST_F(BlobDBTest
, GetExpiration
) {
413 options
.env
= mock_env_
.get();
414 BlobDBOptions bdb_options
;
415 bdb_options
.disable_background_tasks
= true;
416 mock_clock_
->SetCurrentTime(100);
417 Open(bdb_options
, options
);
418 ASSERT_OK(Put("key1", "value1"));
419 ASSERT_OK(PutWithTTL("key2", "value2", 200));
422 ASSERT_OK(blob_db_
->Get(ReadOptions(), "key1", &value
, &expiration
));
423 ASSERT_EQ("value1", value
.ToString());
424 ASSERT_EQ(kNoExpiration
, expiration
);
425 ASSERT_OK(blob_db_
->Get(ReadOptions(), "key2", &value
, &expiration
));
426 ASSERT_EQ("value2", value
.ToString());
427 ASSERT_EQ(300 /* = 100 + 200 */, expiration
);
// Read-side I/O failure. A value is written while the file system is
// healthy, then the fault-injection env is deactivated with an IOError and
// the subsequent Get() must surface an IOError status.
430 TEST_F(BlobDBTest
, GetIOError
) {
432 options
.env
= fault_injection_env_
.get();
433 BlobDBOptions bdb_options
;
434 bdb_options
.min_blob_size
= 0; // Make sure value write to blob file
435 bdb_options
.disable_background_tasks
= true;
436 Open(bdb_options
, options
);
437 ColumnFamilyHandle
*column_family
= blob_db_
->DefaultColumnFamily();
439 ASSERT_OK(Put("foo", "bar"));
440 fault_injection_env_
->SetFilesystemActive(false, Status::IOError());
441 Status s
= blob_db_
->Get(ReadOptions(), column_family
, "foo", &value
);
442 ASSERT_TRUE(s
.IsIOError());
443 // Reactivate file system to allow test to close DB.
444 fault_injection_env_
->SetFilesystemActive(true);
// Write-side I/O failure and recovery. With the file system deactivated a
// Put() must fail with IOError; once it is reactivated a Put() must succeed
// again.
447 TEST_F(BlobDBTest
, PutIOError
) {
449 options
.env
= fault_injection_env_
.get();
450 BlobDBOptions bdb_options
;
451 bdb_options
.min_blob_size
= 0; // Make sure value write to blob file
452 bdb_options
.disable_background_tasks
= true;
453 Open(bdb_options
, options
);
454 fault_injection_env_
->SetFilesystemActive(false, Status::IOError());
455 ASSERT_TRUE(Put("foo", "v1").IsIOError());
456 fault_injection_env_
->SetFilesystemActive(true, Status::IOError());
457 ASSERT_OK(Put("bar", "v1"));
// Batched writes: 100 iterations, each filling a batch with 10 random puts
// (keys "key<j*100+i>", so 1000 distinct keys overall) and writing that
// batch to the blob DB.
460 TEST_F(BlobDBTest
, WriteBatch
) {
462 BlobDBOptions bdb_options
;
463 bdb_options
.min_blob_size
= 0;
464 bdb_options
.disable_background_tasks
= true;
466 std::map
<std::string
, std::string
> data
;
467 for (size_t i
= 0; i
< 100; i
++) {
469 for (size_t j
= 0; j
< 10; j
++) {
470 PutRandomToWriteBatch("key" + std::to_string(j
* 100 + i
), &rnd
, &batch
,
474 ASSERT_OK(blob_db_
->Write(WriteOptions(), &batch
));
// Point deletes: writes 100 random values, then deletes every fifth key
// ("key0", "key5", ...), passing `data` to both helpers so the shadow map
// follows the DB.
479 TEST_F(BlobDBTest
, Delete
) {
481 BlobDBOptions bdb_options
;
482 bdb_options
.min_blob_size
= 0;
483 bdb_options
.disable_background_tasks
= true;
485 std::map
<std::string
, std::string
> data
;
486 for (size_t i
= 0; i
< 100; i
++) {
487 PutRandom("key" + std::to_string(i
), &rnd
, &data
);
489 for (size_t i
= 0; i
< 100; i
+= 5) {
490 Delete("key" + std::to_string(i
), &data
);
// Batched deletes: writes 100 untracked random values, then removes all of
// them with a single write batch of Delete operations.
495 TEST_F(BlobDBTest
, DeleteBatch
) {
497 BlobDBOptions bdb_options
;
498 bdb_options
.min_blob_size
= 0;
499 bdb_options
.disable_background_tasks
= true;
501 for (size_t i
= 0; i
< 100; i
++) {
502 PutRandom("key" + std::to_string(i
), &rnd
);
505 for (size_t i
= 0; i
< 100; i
++) {
506 ASSERT_OK(batch
.Delete("key" + std::to_string(i
)));
508 ASSERT_OK(blob_db_
->Write(WriteOptions(), &batch
));
509 // DB should be empty.
// Overwrites: writes 10000 keys without tracking them, then writes all
// 10000 keys again with fresh random values, this time recording the values
// in `data` so only the second generation is expected.
513 TEST_F(BlobDBTest
, Override
) {
515 BlobDBOptions bdb_options
;
516 bdb_options
.min_blob_size
= 0;
517 bdb_options
.disable_background_tasks
= true;
519 std::map
<std::string
, std::string
> data
;
520 for (int i
= 0; i
< 10000; i
++) {
521 PutRandom("key" + std::to_string(i
), &rnd
, nullptr);
523 // override all the keys
524 for (int i
= 0; i
< 10000; i
++) {
525 PutRandom("key" + std::to_string(i
), &rnd
, &data
);
// Snappy-compressed blobs through both write paths: 100 direct puts
// ("put-key<i>") followed by 100 write batches of 10 puts each
// ("write-batch-key<j*100+i>"), all tracked in `data`.
531 TEST_F(BlobDBTest
, Compression
) {
533 BlobDBOptions bdb_options
;
534 bdb_options
.min_blob_size
= 0;
535 bdb_options
.disable_background_tasks
= true;
536 bdb_options
.compression
= CompressionType::kSnappyCompression
;
538 std::map
<std::string
, std::string
> data
;
539 for (size_t i
= 0; i
< 100; i
++) {
540 PutRandom("put-key" + std::to_string(i
), &rnd
, &data
);
542 for (int i
= 0; i
< 100; i
++) {
544 for (size_t j
= 0; j
< 10; j
++) {
545 PutRandomToWriteBatch("write-batch-key" + std::to_string(j
* 100 + i
),
546 &rnd
, &batch
, &data
);
548 ASSERT_OK(blob_db_
->Write(WriteOptions(), &batch
));
// Writes 100 values with Snappy compression configured, then switches the
// configured compression to kNoCompression; per the test name, the
// previously compressed blobs are expected to stay readable afterwards.
553 TEST_F(BlobDBTest
, DecompressAfterReopen
) {
555 BlobDBOptions bdb_options
;
556 bdb_options
.min_blob_size
= 0;
557 bdb_options
.disable_background_tasks
= true;
558 bdb_options
.compression
= CompressionType::kSnappyCompression
;
560 std::map
<std::string
, std::string
> data
;
561 for (size_t i
= 0; i
< 100; i
++) {
562 PutRandom("put-key" + std::to_string(i
), &rnd
, &data
);
565 bdb_options
.compression
= CompressionType::kNoCompression
;
// Toggles compression Snappy -> none -> Snappy across three write phases of
// 100 keys each, and checks after each compaction + obsolete-file cleanup
// that every remaining blob file carries the compression type that was
// configured at that point.
570 TEST_F(BlobDBTest
, EnableDisableCompressionGC
) {
572 BlobDBOptions bdb_options
;
573 bdb_options
.min_blob_size
= 0;
574 bdb_options
.garbage_collection_cutoff
= 1.0;
575 bdb_options
.disable_background_tasks
= true;
576 bdb_options
.compression
= kSnappyCompression
;
578 std::map
<std::string
, std::string
> data
;
580 for (; data_idx
< 100; data_idx
++) {
581 PutRandom("put-key" + std::to_string(data_idx
), &rnd
, &data
);
584 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
585 ASSERT_EQ(1, blob_files
.size());
586 ASSERT_EQ(kSnappyCompression
, blob_files
[0]->GetCompressionType());
588 // disable compression
589 bdb_options
.compression
= kNoCompression
;
592 // Add more data with new compression type
593 for (; data_idx
< 200; data_idx
++) {
594 PutRandom("put-key" + std::to_string(data_idx
), &rnd
, &data
);
598 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
599 ASSERT_EQ(2, blob_files
.size());
600 ASSERT_EQ(kNoCompression
, blob_files
[1]->GetCompressionType());
602 // Enable GC. If we do it earlier the snapshot release triggered compaction
603 // may compact files and trigger GC before we can verify there are two files.
604 bdb_options
.enable_garbage_collection
= true;
607 // Trigger compaction
608 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
609 blob_db_impl()->TEST_DeleteObsoleteFiles();
612 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
613 for (auto bfile
: blob_files
) {
614 ASSERT_EQ(kNoCompression
, bfile
->GetCompressionType());
617 // enabling the compression again
618 bdb_options
.compression
= kSnappyCompression
;
621 // Add more data with new compression type
622 for (; data_idx
< 300; data_idx
++) {
623 PutRandom("put-key" + std::to_string(data_idx
), &rnd
, &data
);
627 // Trigger compaction
628 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
629 blob_db_impl()->TEST_DeleteObsoleteFiles();
632 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
633 for (auto bfile
: blob_files
) {
634 ASSERT_EQ(kSnappyCompression
, bfile
->GetCompressionType());
639 // Test switch compression types and run GC, it needs both Snappy and LZ4
// Phases (100 new keys each): LZ4 -> Snappy -> none -> Snappy -> LZ4. After
// each compaction + obsolete-file cleanup, every remaining blob file must
// carry the compression type configured at that point.
641 TEST_F(BlobDBTest
, ChangeCompressionGC
) {
643 BlobDBOptions bdb_options
;
644 bdb_options
.min_blob_size
= 0;
645 bdb_options
.garbage_collection_cutoff
= 1.0;
646 bdb_options
.disable_background_tasks
= true;
647 bdb_options
.compression
= kLZ4Compression
;
649 std::map
<std::string
, std::string
> data
;
651 for (; data_idx
< 100; data_idx
++) {
652 PutRandom("put-key" + std::to_string(data_idx
), &rnd
, &data
);
655 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
656 ASSERT_EQ(1, blob_files
.size());
657 ASSERT_EQ(kLZ4Compression
, blob_files
[0]->GetCompressionType());
659 // Change compression type
660 bdb_options
.compression
= kSnappyCompression
;
663 // Add more data with Snappy compression type
664 for (; data_idx
< 200; data_idx
++) {
665 PutRandom("put-key" + std::to_string(data_idx
), &rnd
, &data
);
669 // Verify blob file compression type
670 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
671 ASSERT_EQ(2, blob_files
.size());
672 ASSERT_EQ(kSnappyCompression
, blob_files
[1]->GetCompressionType());
674 // Enable GC. If we do it earlier the snapshot release triggered compaction
675 // may compact files and trigger GC before we can verify there are two files.
676 bdb_options
.enable_garbage_collection
= true;
679 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
682 blob_db_impl()->TEST_DeleteObsoleteFiles();
683 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
684 for (auto bfile
: blob_files
) {
685 ASSERT_EQ(kSnappyCompression
, bfile
->GetCompressionType());
688 // Disable compression
689 bdb_options
.compression
= kNoCompression
;
691 for (; data_idx
< 300; data_idx
++) {
692 PutRandom("put-key" + std::to_string(data_idx
), &rnd
, &data
);
696 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
699 blob_db_impl()->TEST_DeleteObsoleteFiles();
700 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
701 for (auto bfile
: blob_files
) {
702 ASSERT_EQ(kNoCompression
, bfile
->GetCompressionType());
705 // switching different compression types to generate mixed compression types
706 bdb_options
.compression
= kSnappyCompression
;
708 for (; data_idx
< 400; data_idx
++) {
709 PutRandom("put-key" + std::to_string(data_idx
), &rnd
, &data
);
713 bdb_options
.compression
= kLZ4Compression
;
715 for (; data_idx
< 500; data_idx
++) {
716 PutRandom("put-key" + std::to_string(data_idx
), &rnd
, &data
);
720 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
723 blob_db_impl()->TEST_DeleteObsoleteFiles();
724 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
725 for (auto bfile
: blob_files
) {
726 ASSERT_EQ(kLZ4Compression
, bfile
->GetCompressionType());
// Concurrent writers. Ten worker threads each own a Random seeded with
// 301 + id and a private shadow map (data_set[id]); each writes 100 keys of
// the form "key<id>_<j>", using either a direct PutRandom() or a write
// batch. Because the id is part of the key, the workers never touch the
// same key. The per-thread maps are finally merged into a single map.
732 TEST_F(BlobDBTest
, MultipleWriters
) {
733 Open(BlobDBOptions());
735 std::vector
<port::Thread
> workers
;
736 std::vector
<std::map
<std::string
, std::string
>> data_set(10);
737 for (uint32_t i
= 0; i
< 10; i
++)
738 workers
.push_back(port::Thread(
740 Random
rnd(301 + id
);
741 for (int j
= 0; j
< 100; j
++) {
743 "key" + std::to_string(id
) + "_" + std::to_string(j
);
745 PutRandom(key
, &rnd
, &data_set
[id
]);
748 PutRandomToWriteBatch(key
, &rnd
, &batch
, &data_set
[id
]);
749 ASSERT_OK(blob_db_
->Write(WriteOptions(), &batch
));
754 std::map
<std::string
, std::string
> data
;
755 for (size_t i
= 0; i
< 10; i
++) {
757 data
.insert(data_set
[i
].begin(), data_set
[i
].end());
// Blob file deletion must be routed through the SstFileManager. A SyncPoint
// callback counts every ".blob" path handed to
// SstFileManagerImpl::ScheduleFileDeletion; the count must be 1 after one
// obsolete blob file is cleaned up and 2 by the end of the test.
762 TEST_F(BlobDBTest
, SstFileManager
) {
763 // run the same test for Get(), MultiGet() and Iterator each.
// NOTE(review): the comment above does not match anything this test does;
// it looks copied from elsewhere - confirm and remove.
764 std::shared_ptr
<SstFileManager
> sst_file_manager(
765 NewSstFileManager(mock_env_
.get()));
766 sst_file_manager
->SetDeleteRateBytesPerSecond(1);
767 SstFileManagerImpl
*sfm
=
768 static_cast<SstFileManagerImpl
*>(sst_file_manager
.get());
770 BlobDBOptions bdb_options
;
771 bdb_options
.min_blob_size
= 0;
772 bdb_options
.enable_garbage_collection
= true;
773 bdb_options
.garbage_collection_cutoff
= 1.0;
776 int files_scheduled_to_delete
= 0;
777 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
778 "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg
) {
780 const std::string
*const file_path
=
781 static_cast<const std::string
*>(arg
);
782 if (file_path
->find(".blob") != std::string::npos
) {
783 ++files_scheduled_to_delete
;
786 SyncPoint::GetInstance()->EnableProcessing();
787 db_options
.sst_file_manager
= sst_file_manager
;
789 Open(bdb_options
, db_options
);
791 // Create one obsolete file and clean it.
792 ASSERT_OK(blob_db_
->Put(WriteOptions(), "foo", "bar"));
793 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
794 ASSERT_EQ(1, blob_files
.size());
795 std::shared_ptr
<BlobFile
> bfile
= blob_files
[0];
796 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile
));
797 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
798 blob_db_impl()->TEST_DeleteObsoleteFiles();
800 // Even if SSTFileManager is not set, DB is creating a dummy one.
801 ASSERT_EQ(1, files_scheduled_to_delete
);
803 // Make sure that DestroyBlobDB() also goes through delete scheduler.
804 ASSERT_EQ(2, files_scheduled_to_delete
);
805 SyncPoint::GetInstance()->DisableProcessing();
806 sfm
->WaitForEmptyTrash();
// Leftover trash files must be picked up when the DB is opened again. Three
// ".trash" files are created in the blob directory, two of which are named
// like blob files; after reopening, exactly two ".blob" deletions must have
// been scheduled, and once the trash is drained the test asserts that one
// file is counted under the blob directory.
809 TEST_F(BlobDBTest
, SstFileManagerRestart
) {
810 int files_scheduled_to_delete
= 0;
811 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
812 "SstFileManagerImpl::ScheduleFileDeletion", [&](void *arg
) {
814 const std::string
*const file_path
=
815 static_cast<const std::string
*>(arg
);
816 if (file_path
->find(".blob") != std::string::npos
) {
817 ++files_scheduled_to_delete
;
821 // run the same test for Get(), MultiGet() and Iterator each.
// NOTE(review): the comment above does not match anything this test does;
// it looks copied from elsewhere - confirm and remove.
822 std::shared_ptr
<SstFileManager
> sst_file_manager(
823 NewSstFileManager(mock_env_
.get()));
824 sst_file_manager
->SetDeleteRateBytesPerSecond(1);
825 SstFileManagerImpl
*sfm
=
826 static_cast<SstFileManagerImpl
*>(sst_file_manager
.get());
828 BlobDBOptions bdb_options
;
829 bdb_options
.min_blob_size
= 0;
832 SyncPoint::GetInstance()->EnableProcessing();
833 db_options
.sst_file_manager
= sst_file_manager
;
835 Open(bdb_options
, db_options
);
836 std::string blob_dir
= blob_db_impl()->TEST_blob_dir();
837 ASSERT_OK(blob_db_
->Put(WriteOptions(), "foo", "bar"));
840 // Create 3 dummy trash files under the blob_dir
841 const auto &fs
= db_options
.env
->GetFileSystem();
842 ASSERT_OK(CreateFile(fs
, blob_dir
+ "/000666.blob.trash", "", false));
843 ASSERT_OK(CreateFile(fs
, blob_dir
+ "/000888.blob.trash", "", true));
844 ASSERT_OK(CreateFile(fs
, blob_dir
+ "/something_not_match.trash", "", false));
846 // Make sure that reopening the DB rescans the existing trash files
847 Open(bdb_options
, db_options
);
848 ASSERT_EQ(files_scheduled_to_delete
, 2);
850 sfm
->WaitForEmptyTrash();
852 // There should be exactly one file under the blob dir now.
853 std::vector
<std::string
> all_files
;
854 ASSERT_OK(db_options
.env
->GetChildren(blob_dir
, &all_files
));
856 for (const auto &f
: all_files
) {
863 ASSERT_EQ(nfiles
, 1);
865 SyncPoint::GetInstance()->DisableProcessing();
// Interaction between snapshots and deletion of obsolete blob files. The
// scenario is repeated four times; the loop index selects the point at
// which the snapshot is taken (after key1, after key2, after key3, or after
// the compaction that makes `bfile` obsolete). Depending on that point, the
// obsolete file is either removed immediately or must survive until the
// snapshot has been released.
868 TEST_F(BlobDBTest
, SnapshotAndGarbageCollection
) {
869 BlobDBOptions bdb_options
;
870 bdb_options
.min_blob_size
= 0;
871 bdb_options
.enable_garbage_collection
= true;
872 bdb_options
.garbage_collection_cutoff
= 1.0;
873 bdb_options
.disable_background_tasks
= true;
876 options
.disable_auto_compactions
= true;
878 // i = when to take snapshot
879 for (int i
= 0; i
< 4; i
++) {
881 Open(bdb_options
, options
);
883 const Snapshot
*snapshot
= nullptr;
886 ASSERT_OK(Put("key1", "value"));
888 snapshot
= blob_db_
->GetSnapshot();
891 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
892 ASSERT_EQ(1, blob_files
.size());
893 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files
[0]));
896 ASSERT_OK(Put("key2", "value"));
898 snapshot
= blob_db_
->GetSnapshot();
901 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
902 ASSERT_EQ(2, blob_files
.size());
903 auto bfile
= blob_files
[1];
904 ASSERT_FALSE(bfile
->Immutable());
905 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile
));
908 ASSERT_OK(Put("key3", "value"));
910 snapshot
= blob_db_
->GetSnapshot();
// The compaction must mark bfile obsolete as of the latest sequence number.
913 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
914 ASSERT_TRUE(bfile
->Obsolete());
915 ASSERT_EQ(blob_db_
->GetLatestSequenceNumber(),
916 bfile
->GetObsoleteSequence());
920 snapshot
= blob_db_
->GetSnapshot();
923 ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
924 blob_db_impl()->TEST_DeleteObsoleteFiles();
927 // The snapshot shouldn't see data in bfile
928 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
929 blob_db_
->ReleaseSnapshot(snapshot
);
931 // The snapshot will see data in bfile, so the file shouldn't be deleted
932 ASSERT_EQ(4, blob_db_impl()->TEST_GetBlobFiles().size());
933 blob_db_
->ReleaseSnapshot(snapshot
);
934 blob_db_impl()->TEST_DeleteObsoleteFiles();
935 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
// BlobDB only operates on the default column family. After creating a
// second column family "foo", every read or write that names its handle
// (Put, PutWithTTL, PutUntil, Get, a batch containing such a Put, and every
// entry of a MultiGet that includes it) must report NotSupported, and
// NewIterator on it must return nullptr. The rejected batch must not be
// partially applied: "k1" stays NotFound.
940 TEST_F(BlobDBTest
, ColumnFamilyNotSupported
) {
942 options
.env
= mock_env_
.get();
943 mock_clock_
->SetCurrentTime(0);
944 Open(BlobDBOptions(), options
);
945 ColumnFamilyHandle
*default_handle
= blob_db_
->DefaultColumnFamily();
946 ColumnFamilyHandle
*handle
= nullptr;
948 std::vector
<std::string
> values
;
949 // The call simply passes through to the base db. It should succeed.
951 blob_db_
->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle
));
952 ASSERT_TRUE(blob_db_
->Put(WriteOptions(), handle
, "k", "v").IsNotSupported());
953 ASSERT_TRUE(blob_db_
->PutWithTTL(WriteOptions(), handle
, "k", "v", 60)
955 ASSERT_TRUE(blob_db_
->PutUntil(WriteOptions(), handle
, "k", "v", 100)
958 ASSERT_OK(batch
.Put("k1", "v1"));
959 ASSERT_OK(batch
.Put(handle
, "k2", "v2"));
960 ASSERT_TRUE(blob_db_
->Write(WriteOptions(), &batch
).IsNotSupported());
961 ASSERT_TRUE(blob_db_
->Get(ReadOptions(), "k1", &value
).IsNotFound());
963 blob_db_
->Get(ReadOptions(), handle
, "k", &value
).IsNotSupported());
964 auto statuses
= blob_db_
->MultiGet(ReadOptions(), {default_handle
, handle
},
965 {"k1", "k2"}, &values
);
966 ASSERT_EQ(2, statuses
.size());
967 ASSERT_TRUE(statuses
[0].IsNotSupported());
968 ASSERT_TRUE(statuses
[1].IsNotSupported());
969 ASSERT_EQ(nullptr, blob_db_
->NewIterator(ReadOptions(), handle
));
// Blob files must show up in the live-file APIs. 100 values without TTL and
// one value with expiration 1000 are written, which the test expects to
// produce two blob files. GetLiveFilesMetaData() must list both with paths
// relative to the DB directory, file numbers 1 and 2, the default column
// family name, and oldest_ancester_time of 0 and `expiration` respectively.
// GetLiveFiles() must return five entries with the two blob files last.
973 TEST_F(BlobDBTest
, GetLiveFilesMetaData
) {
976 BlobDBOptions bdb_options
;
977 bdb_options
.blob_dir
= "blob_dir";
978 bdb_options
.path_relative
= true;
979 bdb_options
.ttl_range_secs
= 10;
980 bdb_options
.min_blob_size
= 0;
981 bdb_options
.disable_background_tasks
= true;
984 options
.env
= mock_env_
.get();
986 Open(bdb_options
, options
);
988 std::map
<std::string
, std::string
> data
;
989 for (size_t i
= 0; i
< 100; i
++) {
990 PutRandom("key" + std::to_string(i
), &rnd
, &data
);
993 constexpr uint64_t expiration
= 1000ULL;
994 PutRandomUntil("key100", expiration
, &rnd
, &data
);
996 std::vector
<LiveFileMetaData
> metadata
;
997 blob_db_
->GetLiveFilesMetaData(&metadata
);
999 ASSERT_EQ(2U, metadata
.size());
1000 // Path should be relative to db_name, but begin with slash.
1001 const std::string
filename1("/blob_dir/000001.blob");
1002 ASSERT_EQ(filename1
, metadata
[0].name
);
1003 ASSERT_EQ(1, metadata
[0].file_number
);
1004 ASSERT_EQ(0, metadata
[0].oldest_ancester_time
);
1005 ASSERT_EQ(kDefaultColumnFamilyName
, metadata
[0].column_family_name
);
1007 const std::string
filename2("/blob_dir/000002.blob");
1008 ASSERT_EQ(filename2
, metadata
[1].name
);
1009 ASSERT_EQ(2, metadata
[1].file_number
);
1010 ASSERT_EQ(expiration
, metadata
[1].oldest_ancester_time
);
1011 ASSERT_EQ(kDefaultColumnFamilyName
, metadata
[1].column_family_name
);
1013 std::vector
<std::string
> livefile
;
1015 ASSERT_OK(blob_db_
->GetLiveFiles(livefile
, &mfs
, false));
1016 ASSERT_EQ(5U, livefile
.size());
1017 ASSERT_EQ(filename1
, livefile
[3]);
1018 ASSERT_EQ(filename2
, livefile
[4]);
// Migration from a plain DB to a blob DB and back.
//   1. Writes kNumIteration random keys (drawn from kNumKey candidates)
//      through a plain DB.
//   2. Verifies the blob DB can read that data, then overwrites another
//      kNumIteration random keys through the blob DB, remembering in
//      `is_blob` which keys were written that way.
//   3. Reads every candidate key through a plain DB again: keys never
//      written are NotFound, keys written by the blob DB report Corruption,
//      and the remaining keys still return their original value.
1022 TEST_F(BlobDBTest
, MigrateFromPlainRocksDB
) {
1023 constexpr size_t kNumKey
= 20;
1024 constexpr size_t kNumIteration
= 10;
1026 std::map
<std::string
, std::string
> data
;
1027 std::vector
<bool> is_blob(kNumKey
, false);
1029 // Write to plain rocksdb.
1031 options
.create_if_missing
= true;
1033 ASSERT_OK(DB::Open(options
, dbname_
, &db
));
1034 for (size_t i
= 0; i
< kNumIteration
; i
++) {
1035 auto key_index
= rnd
.Next() % kNumKey
;
1036 std::string key
= "key" + std::to_string(key_index
);
1037 PutRandom(db
, key
, &rnd
, &data
);
1043 // Open as blob db. Verify it can read existing data.
1045 VerifyDB(blob_db_
, data
);
1046 for (size_t i
= 0; i
< kNumIteration
; i
++) {
1047 auto key_index
= rnd
.Next() % kNumKey
;
1048 std::string key
= "key" + std::to_string(key_index
);
1049 is_blob
[key_index
] = true;
1050 PutRandom(blob_db_
, key
, &rnd
, &data
);
1052 VerifyDB(blob_db_
, data
);
1056 // Verify plain db return error for keys written by blob db.
1057 ASSERT_OK(DB::Open(options
, dbname_
, &db
));
1059 for (size_t i
= 0; i
< kNumKey
; i
++) {
1060 std::string key
= "key" + std::to_string(i
);
1061 Status s
= db
->Get(ReadOptions(), key
, &value
);
1062 if (data
.count(key
) == 0) {
1063 ASSERT_TRUE(s
.IsNotFound());
1064 } else if (is_blob
[i
]) {
1065 ASSERT_TRUE(s
.IsCorruption());
1068 ASSERT_EQ(data
[key
], value
);
1074 // Test to verify that a NoSpace IOError Status is returned on reaching
1075 // the max_db_size limit.
1076 TEST_F(BlobDBTest
, OutOfSpace
) {
1077 // Use mock env to stop wall clock.
1079 options
.env
= mock_env_
.get();
1080 BlobDBOptions bdb_options
;
1081 bdb_options
.max_db_size
= 200;
// FIFO is switched off here; the second put below is expected to report
// an IOError / NoSpace status.
1082 bdb_options
.is_fifo
= false;
1083 bdb_options
.disable_background_tasks
= true;
1086 // Each stored blob has an overhead of about 42 bytes currently.
1087 // So a small key + a 100 byte blob should take up ~150 bytes in the db.
1088 std::string
value(100, 'v');
1089 ASSERT_OK(blob_db_
->PutWithTTL(WriteOptions(), "key1", value
, 60));
1091 // Putting another blob should fail as adding it would exceed the max_db_size
1093 Status s
= blob_db_
->PutWithTTL(WriteOptions(), "key2", value
, 60);
1094 ASSERT_TRUE(s
.IsIOError());
1095 ASSERT_TRUE(s
.IsNoSpace());
// Exercises FIFO eviction with max_db_size = 200 and blob_file_size = 100.
// A callback on the "BlobDBImpl::EvictOldestBlobFile:Evicted" sync point
// counts evictions while four 100-byte values are written; the test then
// checks which blob files are marked obsolete and that only key4 is left
// once TEST_DeleteObsoleteFiles() has run.
1098 TEST_F(BlobDBTest
, FIFOEviction
) {
1099 BlobDBOptions bdb_options
;
1100 bdb_options
.max_db_size
= 200;
1101 bdb_options
.blob_file_size
= 100;
1102 bdb_options
.is_fifo
= true;
1103 bdb_options
.disable_background_tasks
= true;
1106 std::atomic
<int> evict_count
{0};
1107 SyncPoint::GetInstance()->SetCallBack(
1108 "BlobDBImpl::EvictOldestBlobFile:Evicted",
1109 [&](void *) { evict_count
++; });
1110 SyncPoint::GetInstance()->EnableProcessing();
1112 // Each stored blob has an overhead of 32 bytes currently.
1113 // So a 100 byte blob should take up 132 bytes.
1114 std::string
value(100, 'v');
1115 ASSERT_OK(blob_db_
->PutWithTTL(WriteOptions(), "key1", value
, 10));
1116 VerifyDB({{"key1", value
}});
1118 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1120 // Adding another 100-byte blob would take the total size to 264 bytes
1121 // (2*132), which is larger
1122 // than max_db_size and triggers FIFO eviction.
1123 ASSERT_OK(blob_db_
->PutWithTTL(WriteOptions(), "key2", value
, 60));
1124 ASSERT_EQ(1, evict_count
);
1125 // key1 will exist until its corresponding file is deleted.
1126 VerifyDB({{"key1", value
}, {"key2", value
}});
1128 // Adding another 100 bytes blob without TTL.
1129 ASSERT_OK(blob_db_
->Put(WriteOptions(), "key3", value
));
1130 ASSERT_EQ(2, evict_count
);
1131 // key1 and key2 will exist until their corresponding files are deleted.
1132 VerifyDB({{"key1", value
}, {"key2", value
}, {"key3", value
}});
1134 // The fourth blob file, without TTL.
1135 ASSERT_OK(blob_db_
->Put(WriteOptions(), "key4", value
));
1136 ASSERT_EQ(3, evict_count
);
1138 {{"key1", value
}, {"key2", value
}, {"key3", value
}, {"key4", value
}});
// The first three files must be obsolete and listed, in the same order,
// by TEST_GetObsoleteFiles(); the fourth must still be live.
1140 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
1141 ASSERT_EQ(4, blob_files
.size());
1142 ASSERT_TRUE(blob_files
[0]->Obsolete());
1143 ASSERT_TRUE(blob_files
[1]->Obsolete());
1144 ASSERT_TRUE(blob_files
[2]->Obsolete());
1145 ASSERT_FALSE(blob_files
[3]->Obsolete());
1146 auto obsolete_files
= blob_db_impl()->TEST_GetObsoleteFiles();
1147 ASSERT_EQ(3, obsolete_files
.size());
1148 ASSERT_EQ(blob_files
[0], obsolete_files
[0]);
1149 ASSERT_EQ(blob_files
[1], obsolete_files
[1]);
1150 ASSERT_EQ(blob_files
[2], obsolete_files
[2]);
1152 blob_db_impl()->TEST_DeleteObsoleteFiles();
1153 obsolete_files
= blob_db_impl()->TEST_GetObsoleteFiles();
1154 ASSERT_TRUE(obsolete_files
.empty());
1155 VerifyDB({{"key4", value
}});
// With FIFO enabled and max_db_size = 1000, a single 2000-byte Put must
// fail with NoSpace, and the eviction sync point must never fire
// (evict_count stays 0).
1158 TEST_F(BlobDBTest
, FIFOEviction_NoOldestFileToEvict
) {
1160 BlobDBOptions bdb_options
;
1161 bdb_options
.max_db_size
= 1000;
1162 bdb_options
.blob_file_size
= 5000;
1163 bdb_options
.is_fifo
= true;
1164 bdb_options
.disable_background_tasks
= true;
1167 std::atomic
<int> evict_count
{0};
1168 SyncPoint::GetInstance()->SetCallBack(
1169 "BlobDBImpl::EvictOldestBlobFile:Evicted",
1170 [&](void *) { evict_count
++; });
1171 SyncPoint::GetInstance()->EnableProcessing();
1173 std::string
value(2000, 'v');
1174 ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
1175 ASSERT_EQ(0, evict_count
);
// Fills the LSM tree with small values, sets max_db_size to
// live_sst_size + 2000, and then checks the BLOB_DB_FIFO_NUM_FILES_EVICTED
// ticker: a second 1000-byte TTL value bumps it to 1, while a 2000-byte
// value fails with NoSpace and leaves the ticker at 1.
1178 TEST_F(BlobDBTest
, FIFOEviction_NoEnoughBlobFilesToEvict
) {
1179 BlobDBOptions bdb_options
;
1180 bdb_options
.is_fifo
= true;
1181 bdb_options
.min_blob_size
= 100;
1182 bdb_options
.disable_background_tasks
= true;
1184 // Use mock env to stop wall clock.
1185 options
.env
= mock_env_
.get();
1186 options
.disable_auto_compactions
= true;
1187 auto statistics
= CreateDBStatistics();
1188 options
.statistics
= statistics
;
1189 Open(bdb_options
, options
);
1191 ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
1192 std::string
small_value(50, 'v');
1193 std::map
<std::string
, std::string
> data
;
1194 // Insert some data into the LSM tree to make sure FIFO eviction takes SST
1195 // file size into account.
1196 for (int i
= 0; i
< 1000; i
++) {
1197 ASSERT_OK(Put("key" + std::to_string(i
), small_value
, &data
));
1199 ASSERT_OK(blob_db_
->Flush(FlushOptions()));
1200 uint64_t live_sst_size
= 0;
1201 ASSERT_TRUE(blob_db_
->GetIntProperty(DB::Properties::kTotalSstFilesSize
,
1203 ASSERT_TRUE(live_sst_size
> 0);
1204 ASSERT_EQ(live_sst_size
, blob_db_impl()->TEST_live_sst_size());
1206 bdb_options
.max_db_size
= live_sst_size
+ 2000;
1207 Reopen(bdb_options
, options
);
1208 ASSERT_EQ(live_sst_size
, blob_db_impl()->TEST_live_sst_size());
1210 std::string
value_1k(1000, 'v');
1211 ASSERT_OK(PutWithTTL("large_key1", value_1k
, 60, &data
));
1212 ASSERT_EQ(0, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
1214 // large_key2 evicts large_key1
1215 ASSERT_OK(PutWithTTL("large_key2", value_1k
, 60, &data
));
1216 ASSERT_EQ(1, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
1217 blob_db_impl()->TEST_DeleteObsoleteFiles();
1218 data
.erase("large_key1");
1220 // large_key3 does not get enough space even after evicting large_key2, so
1221 // it returns a no-space error instead.
1222 std::string
value_2k(2000, 'v');
1223 ASSERT_TRUE(PutWithTTL("large_key3", value_2k
, 60).IsNoSpace());
1224 ASSERT_EQ(1, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
1225 // Verify large_key2 still exists.
1229 // Test that a flush or compaction triggers FIFO eviction, since they update
1230 // the total SST file size.
1231 TEST_F(BlobDBTest
, FIFOEviction_TriggerOnSSTSizeChange
) {
1232 BlobDBOptions bdb_options
;
1233 bdb_options
.max_db_size
= 1000;
1234 bdb_options
.is_fifo
= true;
1235 bdb_options
.min_blob_size
= 100;
1236 bdb_options
.disable_background_tasks
= true;
1238 // Use mock env to stop wall clock.
1239 options
.env
= mock_env_
.get();
1240 auto statistics
= CreateDBStatistics();
1241 options
.statistics
= statistics
;
1242 options
.compression
= kNoCompression
;
1243 Open(bdb_options
, options
);
// A single 800-byte value fits under max_db_size: one blob file, no eviction.
1245 std::string
value(800, 'v');
1246 ASSERT_OK(PutWithTTL("large_key", value
, 60));
1247 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1248 ASSERT_EQ(0, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
1249 VerifyDB({{"large_key", value
}});
1251 // Insert some small keys and flush to bring DB out of space.
1252 std::map
<std::string
, std::string
> data
;
1253 for (int i
= 0; i
< 10; i
++) {
1254 ASSERT_OK(Put("key" + std::to_string(i
), "v", &data
));
1256 ASSERT_OK(blob_db_
->Flush(FlushOptions()));
1258 // Verify large_key is deleted by FIFO eviction.
1259 blob_db_impl()->TEST_DeleteObsoleteFiles();
1260 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
1261 ASSERT_EQ(1, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
// Writes 1000 keys whose value length is randomly 50 or 200 bytes (against
// min_blob_size = 100) via Put or PutUntil, and builds a KeyVersion per key
// whose type is kTypeValue only for small values without TTL and
// kTypeBlobIndex otherwise. After VerifyBaseDB, the test expects exactly two
// blob files: one with TTL and one without.
1265 TEST_F(BlobDBTest
, InlineSmallValues
) {
1266 constexpr uint64_t kMaxExpiration
= 1000;
1268 BlobDBOptions bdb_options
;
1269 bdb_options
.ttl_range_secs
= kMaxExpiration
;
1270 bdb_options
.min_blob_size
= 100;
1271 bdb_options
.blob_file_size
= 256 * 1000 * 1000;
1272 bdb_options
.disable_background_tasks
= true;
1274 options
.env
= mock_env_
.get();
1275 mock_clock_
->SetCurrentTime(0);
1276 Open(bdb_options
, options
);
1277 std::map
<std::string
, std::string
> data
;
1278 std::map
<std::string
, KeyVersion
> versions
;
1279 for (size_t i
= 0; i
< 1000; i
++) {
1280 bool is_small_value
= rnd
.Next() % 2;
1281 bool has_ttl
= rnd
.Next() % 2;
1282 uint64_t expiration
= rnd
.Next() % kMaxExpiration
;
1283 int len
= is_small_value
? 50 : 200;
1284 std::string key
= "key" + std::to_string(i
);
1285 std::string value
= rnd
.HumanReadableString(len
);
1286 std::string blob_index
;
// Each write is expected to advance the latest sequence number by one.
1288 SequenceNumber sequence
= blob_db_
->GetLatestSequenceNumber() + 1;
1290 ASSERT_OK(blob_db_
->Put(WriteOptions(), key
, value
));
1292 ASSERT_OK(blob_db_
->PutUntil(WriteOptions(), key
, value
, expiration
));
1294 ASSERT_EQ(blob_db_
->GetLatestSequenceNumber(), sequence
);
1296 KeyVersion(key
, value
, sequence
,
1297 (is_small_value
&& !has_ttl
) ? kTypeValue
: kTypeBlobIndex
);
1300 VerifyBaseDB(versions
);
1301 auto *bdb_impl
= static_cast<BlobDBImpl
*>(blob_db_
);
1302 auto blob_files
= bdb_impl
->TEST_GetBlobFiles();
1303 ASSERT_EQ(2, blob_files
.size());
1304 std::shared_ptr
<BlobFile
> non_ttl_file
;
1305 std::shared_ptr
<BlobFile
> ttl_file
;
// The order of the two files is not assumed; pick them by HasTTL().
1306 if (blob_files
[0]->HasTTL()) {
1307 ttl_file
= blob_files
[0];
1308 non_ttl_file
= blob_files
[1];
1310 non_ttl_file
= blob_files
[0];
1311 ttl_file
= blob_files
[1];
1313 ASSERT_FALSE(non_ttl_file
->HasTTL());
1314 ASSERT_TRUE(ttl_file
->HasTTL());
// Runs a user-supplied compaction filter (case 0) and a user-supplied
// compaction filter factory (case 1) over values whose sizes cycle between
// kMinValueSize and kMaxValueSize, so that both inlined and blob values are
// produced. The expected post-compaction contents are tracked in
// data_after_compact and compared after CompactRange, together with the
// COMPACTION_KEY_DROP_USER ticker.
1317 TEST_F(BlobDBTest
, UserCompactionFilter
) {
1318 class CustomerFilter
: public CompactionFilter
{
1320 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
&value
,
1321 std::string
*new_value
, bool *value_changed
) const override
{
1322 *value_changed
= false;
1323 // changing value size to test value transitions between inlined data
1324 // and stored-in-blob data
1325 if (value
.size() % 4 == 1) {
1326 *new_value
= value
.ToString();
1327 // double size by duplicating value
1328 *new_value
+= *new_value
;
1329 *value_changed
= true;
1331 } else if (value
.size() % 3 == 1) {
1332 *new_value
= value
.ToString();
1333 // truncate value size by half
1334 *new_value
= new_value
->substr(0, new_value
->size() / 2);
1335 *value_changed
= true;
1337 } else if (value
.size() % 2 == 1) {
1342 bool IgnoreSnapshots() const override
{ return true; }
1343 const char *Name() const override
{ return "CustomerFilter"; }
1345 class CustomerFilterFactory
: public CompactionFilterFactory
{
1346 const char *Name() const override
{ return "CustomerFilterFactory"; }
1347 std::unique_ptr
<CompactionFilter
> CreateCompactionFilter(
1348 const CompactionFilter::Context
& /*context*/) override
{
1349 return std::unique_ptr
<CompactionFilter
>(new CustomerFilter());
1353 constexpr size_t kNumPuts
= 1 << 10;
1354 // Generate both inlined and blob value
1355 constexpr uint64_t kMinValueSize
= 1 << 6;
1356 constexpr uint64_t kMaxValueSize
= 1 << 8;
1357 constexpr uint64_t kMinBlobSize
= 1 << 7;
1358 static_assert(kMinValueSize
< kMinBlobSize
, "");
1359 static_assert(kMaxValueSize
> kMinBlobSize
, "");
1361 BlobDBOptions bdb_options
;
1362 bdb_options
.min_blob_size
= kMinBlobSize
;
1363 bdb_options
.blob_file_size
= kMaxValueSize
* 10;
1364 bdb_options
.disable_background_tasks
= true;
1365 if (Snappy_Supported()) {
1366 bdb_options
.compression
= CompressionType::kSnappyCompression
;
1368 // case_num == 0: Test user defined compaction filter
1369 // case_num == 1: Test user defined compaction filter factory
1370 for (int case_num
= 0; case_num
< 2; case_num
++) {
1372 if (case_num
== 0) {
1373 options
.compaction_filter
= new CustomerFilter();
1375 options
.compaction_filter_factory
.reset(new CustomerFilterFactory());
1377 options
.disable_auto_compactions
= true;
1378 options
.env
= mock_env_
.get();
1379 options
.statistics
= CreateDBStatistics();
1380 Open(bdb_options
, options
);
1382 std::map
<std::string
, std::string
> data
;
1383 std::map
<std::string
, std::string
> data_after_compact
;
1385 uint64_t value_size
= kMinValueSize
;
1386 int drop_record
= 0;
1387 for (size_t i
= 0; i
< kNumPuts
; ++i
) {
1388 std::ostringstream oss
;
1389 oss
<< "key" << std::setw(4) << std::setfill('0') << i
;
1391 const std::string
key(oss
.str());
1392 const std::string value
= rnd
.HumanReadableString((int)value_size
);
1393 const SequenceNumber sequence
= blob_db_
->GetLatestSequenceNumber() + 1;
1395 ASSERT_OK(Put(key
, value
));
1396 ASSERT_EQ(blob_db_
->GetLatestSequenceNumber(), sequence
);
// Mirror the filter's size-based branches to predict the stored value.
1399 if (value
.length() % 4 == 1) {
1400 data_after_compact
[key
] = value
+ value
;
1401 } else if (value
.length() % 3 == 1) {
1402 data_after_compact
[key
] = value
.substr(0, value
.size() / 2);
1403 } else if (value
.length() % 2 == 1) {
1406 data_after_compact
[key
] = value
;
// Cycle the value size back to the minimum once it passes the maximum.
1409 if (++value_size
> kMaxValueSize
) {
1410 value_size
= kMinValueSize
;
1413 // Verify full data set
1415 // Applying compaction filter for records
1416 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1417 // Verify data after compaction, only value with even length left.
1418 VerifyDB(data_after_compact
);
1419 ASSERT_EQ(drop_record
,
1420 options
.statistics
->getTickerCount(COMPACTION_KEY_DROP_USER
));
1421 delete options
.compaction_filter
;
1426 // Test user compaction filter when there is an IO error on blob data.
1427 TEST_F(BlobDBTest
, UserCompactionFilter_BlobIOError
) {
1428 class CustomerFilter
: public CompactionFilter
{
1430 bool Filter(int /*level*/, const Slice
& /*key*/, const Slice
&value
,
1431 std::string
*new_value
, bool *value_changed
) const override
{
1432 *new_value
= value
.ToString() + "_new";
1433 *value_changed
= true;
1436 bool IgnoreSnapshots() const override
{ return true; }
1437 const char *Name() const override
{ return "CustomerFilter"; }
1440 constexpr size_t kNumPuts
= 100;
1441 constexpr int kValueSize
= 100;
1443 BlobDBOptions bdb_options
;
1444 bdb_options
.min_blob_size
= 0;
1445 bdb_options
.blob_file_size
= kValueSize
* 10;
1446 bdb_options
.disable_background_tasks
= true;
1447 bdb_options
.compression
= CompressionType::kNoCompression
;
// Sync points at which the fault-injection filesystem is deactivated,
// one per loop iteration below.
1449 std::vector
<std::string
> io_failure_cases
= {
1450 "BlobDBImpl::CreateBlobFileAndWriter",
1451 "BlobIndexCompactionFilterBase::WriteBlobToNewFile",
1452 "BlobDBImpl::CloseBlobFile"};
1454 for (size_t case_num
= 0; case_num
< io_failure_cases
.size(); case_num
++) {
1456 options
.compaction_filter
= new CustomerFilter();
1457 options
.disable_auto_compactions
= true;
1458 options
.env
= fault_injection_env_
.get();
1459 options
.statistics
= CreateDBStatistics();
1460 Open(bdb_options
, options
);
1462 std::map
<std::string
, std::string
> data
;
1464 for (size_t i
= 0; i
< kNumPuts
; ++i
) {
1465 std::ostringstream oss
;
1466 oss
<< "key" << std::setw(4) << std::setfill('0') << i
;
1468 const std::string
key(oss
.str());
1469 const std::string value
= rnd
.HumanReadableString(kValueSize
);
1470 const SequenceNumber sequence
= blob_db_
->GetLatestSequenceNumber() + 1;
1472 ASSERT_OK(Put(key
, value
));
1473 ASSERT_EQ(blob_db_
->GetLatestSequenceNumber(), sequence
);
1477 // Verify full data set
// When the selected sync point fires, turn the filesystem inactive with an
// IOError status so that the compaction below is expected to fail.
1480 SyncPoint::GetInstance()->SetCallBack(
1481 io_failure_cases
[case_num
], [&](void * /*arg*/) {
1482 fault_injection_env_
->SetFilesystemActive(false, Status::IOError());
1484 SyncPoint::GetInstance()->EnableProcessing();
1485 auto s
= blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr);
1486 ASSERT_TRUE(s
.IsIOError());
1488 // Reactivate file system to allow test to verify and close DB.
1489 fault_injection_env_
->SetFilesystemActive(true);
1490 SyncPoint::GetInstance()->DisableProcessing();
1491 SyncPoint::GetInstance()->ClearAllCallBacks();
1493 // Verify full data set after compaction failure
1496 delete options
.compaction_filter
;
1501 // Test that the compaction filter removes any expired blob index.
1502 TEST_F(BlobDBTest
, FilterExpiredBlobIndex
) {
1503 constexpr size_t kNumKeys
= 100;
1504 constexpr size_t kNumPuts
= 1000;
1505 constexpr uint64_t kMaxExpiration
= 1000;
1506 constexpr uint64_t kCompactTime
= 500;
1507 constexpr uint64_t kMinBlobSize
= 100;
1509 mock_clock_
->SetCurrentTime(0);
1510 BlobDBOptions bdb_options
;
1511 bdb_options
.min_blob_size
= kMinBlobSize
;
1512 bdb_options
.disable_background_tasks
= true;
1514 options
.env
= mock_env_
.get();
1515 Open(bdb_options
, options
);
1517 std::map
<std::string
, std::string
> data
;
1518 std::map
<std::string
, std::string
> data_after_compact
;
1519 for (size_t i
= 0; i
< kNumPuts
; i
++) {
1520 bool is_small_value
= rnd
.Next() % 2;
1521 bool has_ttl
= rnd
.Next() % 2;
1522 uint64_t expiration
= rnd
.Next() % kMaxExpiration
;
1523 int len
= is_small_value
? 10 : 200;
1524 std::string key
= "key" + std::to_string(rnd
.Next() % kNumKeys
);
1525 std::string value
= rnd
.HumanReadableString(len
);
1527 if (is_small_value
) {
1528 std::string blob_entry
;
1529 BlobIndex::EncodeInlinedTTL(&blob_entry
, expiration
, value
);
1530 // Fake blob index with TTL. See what it will do.
1531 ASSERT_GT(kMinBlobSize
, blob_entry
.size());
1534 ASSERT_OK(Put(key
, value
));
1535 data_after_compact
[key
] = value
;
1537 ASSERT_OK(PutUntil(key
, value
, expiration
));
// Keys expiring at or before kCompactTime are expected to be gone after
// the compaction; later expirations keep their value.
1538 if (expiration
<= kCompactTime
) {
1539 data_after_compact
.erase(key
);
1541 data_after_compact
[key
] = value
;
1548 mock_clock_
->SetCurrentTime(kCompactTime
);
1549 // Take a snapshot before compaction. Make sure expired blob indexes are
1550 // filtered regardless of snapshot.
1551 const Snapshot
*snapshot
= blob_db_
->GetSnapshot();
1552 // Issue manual compaction to trigger compaction filter.
1553 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1554 blob_db_
->ReleaseSnapshot(snapshot
);
1555 // Verify expired blob indexes are filtered.
1556 std::vector
<KeyVersion
> versions
;
1557 const size_t kMaxKeys
= 10000;
1558 ASSERT_OK(GetAllKeyVersions(blob_db_
, "", "", kMaxKeys
, &versions
));
1559 ASSERT_EQ(data_after_compact
.size(), versions
.size());
1560 for (auto &version
: versions
) {
1561 ASSERT_TRUE(data_after_compact
.count(version
.user_key
) > 0);
1563 VerifyDB(data_after_compact
);
1566 // Test that the compaction filter removes any blob index whose corresponding
1567 // blob file has been removed.
1568 TEST_F(BlobDBTest
, FilterFileNotAvailable
) {
1569 BlobDBOptions bdb_options
;
1570 bdb_options
.min_blob_size
= 0;
1571 bdb_options
.disable_background_tasks
= true;
1573 options
.disable_auto_compactions
= true;
1574 Open(bdb_options
, options
);
// Write "foo" and "bar", closing the blob file after each put so that they
// end up in blob files number 1 and 2 respectively.
1576 ASSERT_OK(Put("foo", "v1"));
1577 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
1578 ASSERT_EQ(1, blob_files
.size());
1579 ASSERT_EQ(1, blob_files
[0]->BlobFileNumber());
1580 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files
[0]));
1582 ASSERT_OK(Put("bar", "v2"));
1583 blob_files
= blob_db_impl()->TEST_GetBlobFiles();
1584 ASSERT_EQ(2, blob_files
.size());
1585 ASSERT_EQ(2, blob_files
[1]->BlobFileNumber());
1586 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files
[1]));
1588 const size_t kMaxKeys
= 10000;
1590 DB
*base_db
= blob_db_
->GetRootDB();
1591 std::vector
<KeyVersion
> versions
;
1592 ASSERT_OK(GetAllKeyVersions(base_db
, "", "", kMaxKeys
, &versions
));
1593 ASSERT_EQ(2, versions
.size());
1594 ASSERT_EQ("bar", versions
[0].user_key
);
1595 ASSERT_EQ("foo", versions
[1].user_key
);
1596 VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
// Compacting while both blob files are still present must keep both keys.
1598 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1599 ASSERT_OK(GetAllKeyVersions(base_db
, "", "", kMaxKeys
, &versions
));
1600 ASSERT_EQ(2, versions
.size());
1601 ASSERT_EQ("bar", versions
[0].user_key
);
1602 ASSERT_EQ("foo", versions
[1].user_key
);
1603 VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
1605 // Remove the first blob file and compact. foo should be removed from base db.
1606 blob_db_impl()->TEST_ObsoleteBlobFile(blob_files
[0]);
1607 blob_db_impl()->TEST_DeleteObsoleteFiles();
1608 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1609 ASSERT_OK(GetAllKeyVersions(base_db
, "", "", kMaxKeys
, &versions
));
1610 ASSERT_EQ(1, versions
.size());
1611 ASSERT_EQ("bar", versions
[0].user_key
);
1612 VerifyDB({{"bar", "v2"}});
1614 // Remove the second blob file and compact. bar should be removed from base db.
1615 blob_db_impl()->TEST_ObsoleteBlobFile(blob_files
[1]);
1616 blob_db_impl()->TEST_DeleteObsoleteFiles();
1617 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1618 ASSERT_OK(GetAllKeyVersions(base_db
, "", "", kMaxKeys
, &versions
));
1619 ASSERT_EQ(0, versions
.size());
1623 // Test that the compaction filter filters any inlined TTL keys that would
1624 // have been dropped by the last FIFO eviction if they were stored out-of-line.
1625 TEST_F(BlobDBTest
, FilterForFIFOEviction
) {
1627 BlobDBOptions bdb_options
;
1628 bdb_options
.min_blob_size
= 100;
1629 bdb_options
.ttl_range_secs
= 60;
1630 bdb_options
.max_db_size
= 0;
1631 bdb_options
.disable_background_tasks
= true;
1633 // Use mock env to stop wall clock.
1634 mock_clock_
->SetCurrentTime(0);
1635 options
.env
= mock_env_
.get();
1636 auto statistics
= CreateDBStatistics();
1637 options
.statistics
= statistics
;
1638 options
.disable_auto_compactions
= true;
1639 Open(bdb_options
, options
);
1641 std::map
<std::string
, std::string
> data
;
1642 std::map
<std::string
, std::string
> data_after_compact
;
1643 // Insert some small values that will be inlined.
1644 for (int i
= 0; i
< 1000; i
++) {
1645 std::string key
= "key" + std::to_string(i
);
1646 std::string value
= rnd
.HumanReadableString(50);
1647 uint64_t ttl
= rnd
.Next() % 120 + 1;
1648 ASSERT_OK(PutWithTTL(key
, value
, ttl
, &data
));
1650 data_after_compact
[key
] = value
;
// Keys present in data but not in data_after_compact are the ones the
// compaction filter is expected to evict.
1653 uint64_t num_keys_to_evict
= data
.size() - data_after_compact
.size();
1654 ASSERT_OK(blob_db_
->Flush(FlushOptions()));
1655 uint64_t live_sst_size
= blob_db_impl()->TEST_live_sst_size();
1656 ASSERT_GT(live_sst_size
, 0);
1659 bdb_options
.max_db_size
= live_sst_size
+ 30000;
1660 bdb_options
.is_fifo
= true;
1661 Reopen(bdb_options
, options
);
1664 // Put two large values, each on a different blob file.
1665 std::string
large_value(10000, 'v');
1666 ASSERT_OK(PutWithTTL("large_key1", large_value
, 90));
1667 ASSERT_OK(PutWithTTL("large_key2", large_value
, 150));
1668 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
1669 ASSERT_EQ(0, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
1670 data
["large_key1"] = large_value
;
1671 data
["large_key2"] = large_value
;
1674 // Put a third large value which will bring the DB out of space.
1675 // FIFO eviction will evict the file of large_key1.
1676 ASSERT_OK(PutWithTTL("large_key3", large_value
, 150));
1677 ASSERT_EQ(1, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
1678 ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
1679 blob_db_impl()->TEST_DeleteObsoleteFiles();
1680 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1681 data
.erase("large_key1");
1682 data
["large_key3"] = large_value
;
1685 // Putting some more small values. These values shouldn't be evicted by
1686 // compaction filter since they are inserted after FIFO eviction.
1687 ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact
));
1688 ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact
));
1690 // FIFO eviction doesn't trigger again since there is enough room for the flush.
1691 ASSERT_OK(blob_db_
->Flush(FlushOptions()));
1692 ASSERT_EQ(1, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
1694 // Manual compact and check if compaction filter evict those keys with
1696 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1697 // All keys with expiration < 60, plus large_key1 is filtered by
1698 // compaction filter.
1699 ASSERT_EQ(num_keys_to_evict
+ 1,
1700 statistics
->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT
));
1701 ASSERT_EQ(1, statistics
->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED
));
1702 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1703 data_after_compact
["large_key2"] = large_value
;
1704 data_after_compact
["large_key3"] = large_value
;
1705 VerifyDB(data_after_compact
);
// Writes kNumPuts large non-TTL values (kBlobsPerFile per blob file), then
// one large TTL value, one small TTL value and one small non-TTL value.
// After CompactRange with garbage_collection_cutoff = 0.25 it checks the
// expected blob index file numbers, the BLOB_DB_GC_* tickers, and the file
// numbers and sizes of the live immutable non-TTL blob files.
1708 TEST_F(BlobDBTest
, GarbageCollection
) {
1709 constexpr size_t kNumPuts
= 1 << 10;
1711 constexpr uint64_t kExpiration
= 1000;
1712 constexpr uint64_t kCompactTime
= 500;
1714 constexpr uint64_t kKeySize
= 7; // "key" + 4 digits
1716 constexpr uint64_t kSmallValueSize
= 1 << 6;
1717 constexpr uint64_t kLargeValueSize
= 1 << 8;
1718 constexpr uint64_t kMinBlobSize
= 1 << 7;
1719 static_assert(kSmallValueSize
< kMinBlobSize
, "");
1720 static_assert(kLargeValueSize
> kMinBlobSize
, "");
1722 constexpr size_t kBlobsPerFile
= 8;
1723 constexpr size_t kNumBlobFiles
= kNumPuts
/ kBlobsPerFile
;
// Size the blob file so that it holds the log header plus exactly
// kBlobsPerFile records of a 7-byte key and a large value.
1724 constexpr uint64_t kBlobFileSize
=
1725 BlobLogHeader::kSize
+
1726 (BlobLogRecord::kHeaderSize
+ kKeySize
+ kLargeValueSize
) * kBlobsPerFile
;
1728 BlobDBOptions bdb_options
;
1729 bdb_options
.min_blob_size
= kMinBlobSize
;
1730 bdb_options
.blob_file_size
= kBlobFileSize
;
1731 bdb_options
.enable_garbage_collection
= true;
1732 bdb_options
.garbage_collection_cutoff
= 0.25;
1733 bdb_options
.disable_background_tasks
= true;
1736 options
.env
= mock_env_
.get();
1737 options
.statistics
= CreateDBStatistics();
1739 Open(bdb_options
, options
);
1741 std::map
<std::string
, std::string
> data
;
1742 std::map
<std::string
, KeyVersion
> blob_value_versions
;
1743 std::map
<std::string
, BlobIndexVersion
> blob_index_versions
;
1747 // Add a bunch of large non-TTL values. These will be written to non-TTL
1748 // blob files and will be subject to GC.
1749 for (size_t i
= 0; i
< kNumPuts
; ++i
) {
1750 std::ostringstream oss
;
1751 oss
<< "key" << std::setw(4) << std::setfill('0') << i
;
1753 const std::string
key(oss
.str());
1754 const std::string value
= rnd
.HumanReadableString(kLargeValueSize
);
1755 const SequenceNumber sequence
= blob_db_
->GetLatestSequenceNumber() + 1;
1757 ASSERT_OK(Put(key
, value
));
1758 ASSERT_EQ(blob_db_
->GetLatestSequenceNumber(), sequence
);
1761 blob_value_versions
[key
] = KeyVersion(key
, value
, sequence
, kTypeBlobIndex
);
// (i >> 3) + 1 maps every group of eight consecutive puts to one file
// number, starting at 1.
1762 blob_index_versions
[key
] =
1763 BlobIndexVersion(key
, /* file_number */ (i
>> 3) + 1, kNoExpiration
,
1764 sequence
, kTypeBlobIndex
);
1767 // Add some small and/or TTL values that will be ignored during GC.
1768 // First, add a large TTL value, which will be written to its own TTL blob file.
1770 const std::string
key("key2000");
1771 const std::string value
= rnd
.HumanReadableString(kLargeValueSize
);
1772 const SequenceNumber sequence
= blob_db_
->GetLatestSequenceNumber() + 1;
1774 ASSERT_OK(PutUntil(key
, value
, kExpiration
));
1775 ASSERT_EQ(blob_db_
->GetLatestSequenceNumber(), sequence
);
1778 blob_value_versions
[key
] = KeyVersion(key
, value
, sequence
, kTypeBlobIndex
);
1779 blob_index_versions
[key
] =
1780 BlobIndexVersion(key
, /* file_number */ kNumBlobFiles
+ 1, kExpiration
,
1781 sequence
, kTypeBlobIndex
);
1784 // Now add a small TTL value (which will be inlined).
1786 const std::string
key("key3000");
1787 const std::string value
= rnd
.HumanReadableString(kSmallValueSize
);
1788 const SequenceNumber sequence
= blob_db_
->GetLatestSequenceNumber() + 1;
1790 ASSERT_OK(PutUntil(key
, value
, kExpiration
));
1791 ASSERT_EQ(blob_db_
->GetLatestSequenceNumber(), sequence
);
1794 blob_value_versions
[key
] = KeyVersion(key
, value
, sequence
, kTypeBlobIndex
);
1795 blob_index_versions
[key
] = BlobIndexVersion(
1796 key
, kInvalidBlobFileNumber
, kExpiration
, sequence
, kTypeBlobIndex
);
1799 // Finally, add a small non-TTL value (which will be stored as a regular
1802 const std::string
key("key4000");
1803 const std::string value
= rnd
.HumanReadableString(kSmallValueSize
);
1804 const SequenceNumber sequence
= blob_db_
->GetLatestSequenceNumber() + 1;
1806 ASSERT_OK(Put(key
, value
));
1807 ASSERT_EQ(blob_db_
->GetLatestSequenceNumber(), sequence
);
1810 blob_value_versions
[key
] = KeyVersion(key
, value
, sequence
, kTypeValue
);
1811 blob_index_versions
[key
] = BlobIndexVersion(
1812 key
, kInvalidBlobFileNumber
, kNoExpiration
, sequence
, kTypeValue
);
1816 VerifyBaseDB(blob_value_versions
);
1817 VerifyBaseDBBlobIndex(blob_index_versions
);
1819 // At this point, we should have 128 immutable non-TTL files with file numbers
1822 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1823 ASSERT_EQ(live_imm_files
.size(), kNumBlobFiles
);
1824 for (size_t i
= 0; i
< kNumBlobFiles
; ++i
) {
1825 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 1);
1826 ASSERT_EQ(live_imm_files
[i
]->GetFileSize(),
1827 kBlobFileSize
+ BlobLogFooter::kSize
);
1831 mock_clock_
->SetCurrentTime(kCompactTime
);
1833 ASSERT_OK(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
1835 // We expect the data to remain the same and the blobs from the oldest N files
1836 // to be moved to new files. Sequence numbers get zeroed out during the
1840 for (auto &pair
: blob_value_versions
) {
1841 KeyVersion
&version
= pair
.second
;
1842 version
.sequence
= 0;
1845 VerifyBaseDB(blob_value_versions
);
// Number of oldest blob files covered by the GC cutoff fraction.
1847 const uint64_t cutoff
= static_cast<uint64_t>(
1848 bdb_options
.garbage_collection_cutoff
* kNumBlobFiles
);
1849 for (auto &pair
: blob_index_versions
) {
1850 BlobIndexVersion
&version
= pair
.second
;
1852 version
.sequence
= 0;
1854 if (version
.file_number
== kInvalidBlobFileNumber
) {
1858 if (version
.file_number
> cutoff
) {
1862 version
.file_number
+= kNumBlobFiles
+ 1;
1865 VerifyBaseDBBlobIndex(blob_index_versions
);
1867 const Statistics
*const statistics
= options
.statistics
.get();
1870 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_NUM_FILES
), cutoff
);
1871 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES
), cutoff
);
1872 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_FAILURES
), 0);
1873 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED
),
1874 cutoff
* kBlobsPerFile
);
1875 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED
),
1876 cutoff
* kBlobsPerFile
* kLargeValueSize
);
1878 // At this point, we should have 128 immutable non-TTL files with file numbers
1879 // 33..128 and 130..161. (129 was taken by the TTL blob file.)
1881 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
1882 ASSERT_EQ(live_imm_files
.size(), kNumBlobFiles
);
1883 for (size_t i
= 0; i
< kNumBlobFiles
; ++i
) {
1884 uint64_t expected_file_number
= i
+ cutoff
+ 1;
// Skip over the file number used by the TTL blob file.
1885 if (expected_file_number
> kNumBlobFiles
) {
1886 ++expected_file_number
;
1889 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), expected_file_number
);
1890 ASSERT_EQ(live_imm_files
[i
]->GetFileSize(),
1891 kBlobFileSize
+ BlobLogFooter::kSize
);
// Writes two valid blobs plus a hand-encoded blob index for file number
// 1000 directly into the root DB, closes the single blob file, runs
// CompactRange, and then checks the BLOB_DB_GC_* tickers: no files counted
// as collected, one new file, one failure, two keys and seven bytes
// relocated.
1896 TEST_F(BlobDBTest
, GarbageCollectionFailure
) {
1897 BlobDBOptions bdb_options
;
1898 bdb_options
.min_blob_size
= 0;
1899 bdb_options
.enable_garbage_collection
= true;
1900 bdb_options
.garbage_collection_cutoff
= 1.0;
1901 bdb_options
.disable_background_tasks
= true;
1904 db_options
.statistics
= CreateDBStatistics();
1906 Open(bdb_options
, db_options
);
1908 // Write a couple of valid blobs.
1909 ASSERT_OK(Put("foo", "bar"));
1910 ASSERT_OK(Put("dead", "beef"));
1912 // Write a fake blob reference into the base DB that points to a non-existing
1914 std::string blob_index
;
1915 BlobIndex::EncodeBlob(&blob_index
, /* file_number */ 1000, /* offset */ 1234,
1916 /* size */ 5678, kNoCompression
);
1919 ASSERT_OK(WriteBatchInternal::PutBlobIndex(
1920 &batch
, blob_db_
->DefaultColumnFamily()->GetID(), "key", blob_index
));
1921 ASSERT_OK(blob_db_
->GetRootDB()->Write(WriteOptions(), &batch
));
1923 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
1924 ASSERT_EQ(blob_files
.size(), 1);
1925 auto blob_file
= blob_files
[0];
1926 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file
));
1928 ASSERT_TRUE(blob_db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr)
1931 const Statistics
*const statistics
= db_options
.statistics
.get();
1934 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_NUM_FILES
), 0);
1935 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_NUM_NEW_FILES
), 1);
1936 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_FAILURES
), 1);
1937 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_NUM_KEYS_RELOCATED
), 2);
1938 ASSERT_EQ(statistics
->getTickerCount(BLOB_DB_GC_BYTES_RELOCATED
), 7);
1941 // File should be evicted after expiration.
1942 TEST_F(BlobDBTest
, EvictExpiredFile
) {
1943 BlobDBOptions bdb_options
;
1944 bdb_options
.ttl_range_secs
= 100;
1945 bdb_options
.min_blob_size
= 0;
1946 bdb_options
.disable_background_tasks
= true;
1948 options
.env
= mock_env_
.get();
1949 Open(bdb_options
, options
);
1950 mock_clock_
->SetCurrentTime(50);
1951 std::map
<std::string
, std::string
> data
;
1952 ASSERT_OK(PutWithTTL("foo", "bar", 100, &data
));
1953 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
1954 ASSERT_EQ(1, blob_files
.size());
1955 auto blob_file
= blob_files
[0];
1956 ASSERT_FALSE(blob_file
->Immutable());
1957 ASSERT_FALSE(blob_file
->Obsolete());
1959 mock_clock_
->SetCurrentTime(250);
1960 // The key should expired now.
1961 blob_db_impl()->TEST_EvictExpiredFiles();
1962 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1963 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1964 ASSERT_TRUE(blob_file
->Immutable());
1965 ASSERT_TRUE(blob_file
->Obsolete());
1966 blob_db_impl()->TEST_DeleteObsoleteFiles();
1967 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
1968 ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
1969 // Make sure we don't return garbage value after blob file being evicted,
1970 // but the blob index still exists in the LSM tree.
1971 std::string val
= "";
1972 ASSERT_TRUE(blob_db_
->Get(ReadOptions(), "foo", &val
).IsNotFound());
1976 TEST_F(BlobDBTest
, DisableFileDeletions
) {
1977 BlobDBOptions bdb_options
;
1978 bdb_options
.disable_background_tasks
= true;
1980 std::map
<std::string
, std::string
> data
;
1981 for (bool force
: {true, false}) {
1982 ASSERT_OK(Put("foo", "v", &data
));
1983 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
1984 ASSERT_EQ(1, blob_files
.size());
1985 auto blob_file
= blob_files
[0];
1986 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file
));
1987 blob_db_impl()->TEST_ObsoleteBlobFile(blob_file
);
1988 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1989 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1990 // Call DisableFileDeletions twice.
1991 ASSERT_OK(blob_db_
->DisableFileDeletions());
1992 ASSERT_OK(blob_db_
->DisableFileDeletions());
1993 // File deletions should be disabled.
1994 blob_db_impl()->TEST_DeleteObsoleteFiles();
1995 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
1996 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
1998 // Enable file deletions once. If force=true, file deletion is enabled.
1999 // Otherwise it needs to enable it for a second time.
2000 ASSERT_OK(blob_db_
->EnableFileDeletions(force
));
2001 blob_db_impl()->TEST_DeleteObsoleteFiles();
2003 ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
2004 ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
2006 // Call EnableFileDeletions a second time.
2007 ASSERT_OK(blob_db_
->EnableFileDeletions(false));
2008 blob_db_impl()->TEST_DeleteObsoleteFiles();
2010 // Regardless of value of `force`, file should be deleted by now.
2011 ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
2012 ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
2017 TEST_F(BlobDBTest
, MaintainBlobFileToSstMapping
) {
2018 BlobDBOptions bdb_options
;
2019 bdb_options
.enable_garbage_collection
= true;
2020 bdb_options
.disable_background_tasks
= true;
2023 // Register some dummy blob files.
2024 blob_db_impl()->TEST_AddDummyBlobFile(1, /* immutable_sequence */ 200);
2025 blob_db_impl()->TEST_AddDummyBlobFile(2, /* immutable_sequence */ 300);
2026 blob_db_impl()->TEST_AddDummyBlobFile(3, /* immutable_sequence */ 400);
2027 blob_db_impl()->TEST_AddDummyBlobFile(4, /* immutable_sequence */ 500);
2028 blob_db_impl()->TEST_AddDummyBlobFile(5, /* immutable_sequence */ 600);
2030 // Initialize the blob <-> SST file mapping. First, add some SST files with
2031 // blob file references, then some without.
2032 std::vector
<LiveFileMetaData
> live_files
;
2034 for (uint64_t i
= 1; i
<= 10; ++i
) {
2035 LiveFileMetaData live_file
;
2036 live_file
.file_number
= i
;
2037 live_file
.oldest_blob_file_number
= ((i
- 1) % 5) + 1;
2039 live_files
.emplace_back(live_file
);
2042 for (uint64_t i
= 11; i
<= 20; ++i
) {
2043 LiveFileMetaData live_file
;
2044 live_file
.file_number
= i
;
2046 live_files
.emplace_back(live_file
);
2049 blob_db_impl()->TEST_InitializeBlobFileToSstMapping(live_files
);
2051 // Check that the blob <-> SST mappings have been correctly initialized.
2052 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
2054 ASSERT_EQ(blob_files
.size(), 5);
2057 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2058 ASSERT_EQ(live_imm_files
.size(), 5);
2059 for (size_t i
= 0; i
< 5; ++i
) {
2060 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 1);
2063 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2067 const std::vector
<std::unordered_set
<uint64_t>> expected_sst_files
{
2068 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
2069 const std::vector
<bool> expected_obsolete
{false, false, false, false,
2071 for (size_t i
= 0; i
< 5; ++i
) {
2072 const auto &blob_file
= blob_files
[i
];
2073 ASSERT_EQ(blob_file
->GetLinkedSstFiles(), expected_sst_files
[i
]);
2074 ASSERT_EQ(blob_file
->Obsolete(), expected_obsolete
[i
]);
2077 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2078 ASSERT_EQ(live_imm_files
.size(), 5);
2079 for (size_t i
= 0; i
< 5; ++i
) {
2080 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 1);
2083 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2086 // Simulate a flush where the SST does not reference any blob files.
2088 FlushJobInfo info
{};
2089 info
.file_number
= 21;
2090 info
.smallest_seqno
= 1;
2091 info
.largest_seqno
= 100;
2093 blob_db_impl()->TEST_ProcessFlushJobInfo(info
);
2095 const std::vector
<std::unordered_set
<uint64_t>> expected_sst_files
{
2096 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10}};
2097 const std::vector
<bool> expected_obsolete
{false, false, false, false,
2099 for (size_t i
= 0; i
< 5; ++i
) {
2100 const auto &blob_file
= blob_files
[i
];
2101 ASSERT_EQ(blob_file
->GetLinkedSstFiles(), expected_sst_files
[i
]);
2102 ASSERT_EQ(blob_file
->Obsolete(), expected_obsolete
[i
]);
2105 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2106 ASSERT_EQ(live_imm_files
.size(), 5);
2107 for (size_t i
= 0; i
< 5; ++i
) {
2108 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 1);
2111 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2114 // Simulate a flush where the SST references a blob file.
2116 FlushJobInfo info
{};
2117 info
.file_number
= 22;
2118 info
.oldest_blob_file_number
= 5;
2119 info
.smallest_seqno
= 101;
2120 info
.largest_seqno
= 200;
2122 blob_db_impl()->TEST_ProcessFlushJobInfo(info
);
2124 const std::vector
<std::unordered_set
<uint64_t>> expected_sst_files
{
2125 {1, 6}, {2, 7}, {3, 8}, {4, 9}, {5, 10, 22}};
2126 const std::vector
<bool> expected_obsolete
{false, false, false, false,
2128 for (size_t i
= 0; i
< 5; ++i
) {
2129 const auto &blob_file
= blob_files
[i
];
2130 ASSERT_EQ(blob_file
->GetLinkedSstFiles(), expected_sst_files
[i
]);
2131 ASSERT_EQ(blob_file
->Obsolete(), expected_obsolete
[i
]);
2134 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2135 ASSERT_EQ(live_imm_files
.size(), 5);
2136 for (size_t i
= 0; i
< 5; ++i
) {
2137 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 1);
2140 ASSERT_TRUE(blob_db_impl()->TEST_GetObsoleteFiles().empty());
2143 // Simulate a compaction. Some inputs and outputs have blob file references,
2144 // some don't. There is also a trivial move (which means the SST appears on
2145 // both the input and the output list). Blob file 1 loses all its linked SSTs,
2146 // and since it got marked immutable at sequence number 200 which has already
2147 // been flushed, it can be marked obsolete.
2149 CompactionJobInfo info
{};
2150 info
.input_file_infos
.emplace_back(CompactionFileInfo
{1, 1, 1});
2151 info
.input_file_infos
.emplace_back(CompactionFileInfo
{1, 2, 2});
2152 info
.input_file_infos
.emplace_back(CompactionFileInfo
{1, 6, 1});
2153 info
.input_file_infos
.emplace_back(
2154 CompactionFileInfo
{1, 11, kInvalidBlobFileNumber
});
2155 info
.input_file_infos
.emplace_back(CompactionFileInfo
{1, 22, 5});
2156 info
.output_file_infos
.emplace_back(CompactionFileInfo
{2, 22, 5});
2157 info
.output_file_infos
.emplace_back(CompactionFileInfo
{2, 23, 3});
2158 info
.output_file_infos
.emplace_back(
2159 CompactionFileInfo
{2, 24, kInvalidBlobFileNumber
});
2161 blob_db_impl()->TEST_ProcessCompactionJobInfo(info
);
2163 const std::vector
<std::unordered_set
<uint64_t>> expected_sst_files
{
2164 {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
2165 const std::vector
<bool> expected_obsolete
{true, false, false, false, false};
2166 for (size_t i
= 0; i
< 5; ++i
) {
2167 const auto &blob_file
= blob_files
[i
];
2168 ASSERT_EQ(blob_file
->GetLinkedSstFiles(), expected_sst_files
[i
]);
2169 ASSERT_EQ(blob_file
->Obsolete(), expected_obsolete
[i
]);
2172 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2173 ASSERT_EQ(live_imm_files
.size(), 4);
2174 for (size_t i
= 0; i
< 4; ++i
) {
2175 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 2);
2178 auto obsolete_files
= blob_db_impl()->TEST_GetObsoleteFiles();
2179 ASSERT_EQ(obsolete_files
.size(), 1);
2180 ASSERT_EQ(obsolete_files
[0]->BlobFileNumber(), 1);
2183 // Simulate a failed compaction. No mappings should be updated.
2185 CompactionJobInfo info
{};
2186 info
.input_file_infos
.emplace_back(CompactionFileInfo
{1, 7, 2});
2187 info
.input_file_infos
.emplace_back(CompactionFileInfo
{2, 22, 5});
2188 info
.output_file_infos
.emplace_back(CompactionFileInfo
{2, 25, 3});
2189 info
.status
= Status::Corruption();
2191 blob_db_impl()->TEST_ProcessCompactionJobInfo(info
);
2193 const std::vector
<std::unordered_set
<uint64_t>> expected_sst_files
{
2194 {}, {7}, {3, 8, 23}, {4, 9}, {5, 10, 22}};
2195 const std::vector
<bool> expected_obsolete
{true, false, false, false, false};
2196 for (size_t i
= 0; i
< 5; ++i
) {
2197 const auto &blob_file
= blob_files
[i
];
2198 ASSERT_EQ(blob_file
->GetLinkedSstFiles(), expected_sst_files
[i
]);
2199 ASSERT_EQ(blob_file
->Obsolete(), expected_obsolete
[i
]);
2202 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2203 ASSERT_EQ(live_imm_files
.size(), 4);
2204 for (size_t i
= 0; i
< 4; ++i
) {
2205 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 2);
2208 auto obsolete_files
= blob_db_impl()->TEST_GetObsoleteFiles();
2209 ASSERT_EQ(obsolete_files
.size(), 1);
2210 ASSERT_EQ(obsolete_files
[0]->BlobFileNumber(), 1);
2213 // Simulate another compaction. Blob file 2 loses all its linked SSTs
2214 // but since it got marked immutable at sequence number 300 which hasn't
2215 // been flushed yet, it cannot be marked obsolete at this point.
2217 CompactionJobInfo info
{};
2218 info
.input_file_infos
.emplace_back(CompactionFileInfo
{1, 7, 2});
2219 info
.input_file_infos
.emplace_back(CompactionFileInfo
{2, 22, 5});
2220 info
.output_file_infos
.emplace_back(CompactionFileInfo
{2, 25, 3});
2222 blob_db_impl()->TEST_ProcessCompactionJobInfo(info
);
2224 const std::vector
<std::unordered_set
<uint64_t>> expected_sst_files
{
2225 {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
2226 const std::vector
<bool> expected_obsolete
{true, false, false, false, false};
2227 for (size_t i
= 0; i
< 5; ++i
) {
2228 const auto &blob_file
= blob_files
[i
];
2229 ASSERT_EQ(blob_file
->GetLinkedSstFiles(), expected_sst_files
[i
]);
2230 ASSERT_EQ(blob_file
->Obsolete(), expected_obsolete
[i
]);
2233 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2234 ASSERT_EQ(live_imm_files
.size(), 4);
2235 for (size_t i
= 0; i
< 4; ++i
) {
2236 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 2);
2239 auto obsolete_files
= blob_db_impl()->TEST_GetObsoleteFiles();
2240 ASSERT_EQ(obsolete_files
.size(), 1);
2241 ASSERT_EQ(obsolete_files
[0]->BlobFileNumber(), 1);
2244 // Simulate a flush with largest sequence number 300. This will make it
2245 // possible to mark blob file 2 obsolete.
2247 FlushJobInfo info
{};
2248 info
.file_number
= 26;
2249 info
.smallest_seqno
= 201;
2250 info
.largest_seqno
= 300;
2252 blob_db_impl()->TEST_ProcessFlushJobInfo(info
);
2254 const std::vector
<std::unordered_set
<uint64_t>> expected_sst_files
{
2255 {}, {}, {3, 8, 23, 25}, {4, 9}, {5, 10}};
2256 const std::vector
<bool> expected_obsolete
{true, true, false, false, false};
2257 for (size_t i
= 0; i
< 5; ++i
) {
2258 const auto &blob_file
= blob_files
[i
];
2259 ASSERT_EQ(blob_file
->GetLinkedSstFiles(), expected_sst_files
[i
]);
2260 ASSERT_EQ(blob_file
->Obsolete(), expected_obsolete
[i
]);
2263 auto live_imm_files
= blob_db_impl()->TEST_GetLiveImmNonTTLFiles();
2264 ASSERT_EQ(live_imm_files
.size(), 3);
2265 for (size_t i
= 0; i
< 3; ++i
) {
2266 ASSERT_EQ(live_imm_files
[i
]->BlobFileNumber(), i
+ 3);
2269 auto obsolete_files
= blob_db_impl()->TEST_GetObsoleteFiles();
2270 ASSERT_EQ(obsolete_files
.size(), 2);
2271 ASSERT_EQ(obsolete_files
[0]->BlobFileNumber(), 1);
2272 ASSERT_EQ(obsolete_files
[1]->BlobFileNumber(), 2);
2276 TEST_F(BlobDBTest
, ShutdownWait
) {
2277 BlobDBOptions bdb_options
;
2278 bdb_options
.ttl_range_secs
= 100;
2279 bdb_options
.min_blob_size
= 0;
2280 bdb_options
.disable_background_tasks
= false;
2282 options
.env
= mock_env_
.get();
2284 SyncPoint::GetInstance()->LoadDependency({
2285 {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
2286 {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
2287 {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
2288 {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
2290 // Force all tasks to be scheduled immediately.
2291 SyncPoint::GetInstance()->SetCallBack(
2292 "TimeQueue::Add:item.end", [&](void *arg
) {
2293 std::chrono::steady_clock::time_point
*tp
=
2294 static_cast<std::chrono::steady_clock::time_point
*>(arg
);
2296 std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
2299 SyncPoint::GetInstance()->SetCallBack(
2300 "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
2301 // Sleep 3 ms to increase the chance of data race.
2302 // We've synced up the code so that EvictExpiredFiles()
2303 // is called concurrently with ~BlobDBImpl().
2304 // ~BlobDBImpl() is supposed to wait for all background
2305 // task to shutdown before doing anything else. In order
2306 // to use the same test to reproduce a bug of the waiting
2307 // logic, we wait a little bit here, so that TSAN can
2308 // catch the data race.
2309 // We should improve the test if we find a better way.
2310 Env::Default()->SleepForMicroseconds(3000);
2313 SyncPoint::GetInstance()->EnableProcessing();
2315 Open(bdb_options
, options
);
2316 mock_clock_
->SetCurrentTime(50);
2317 std::map
<std::string
, std::string
> data
;
2318 ASSERT_OK(PutWithTTL("foo", "bar", 100, &data
));
2319 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
2320 ASSERT_EQ(1, blob_files
.size());
2321 auto blob_file
= blob_files
[0];
2322 ASSERT_FALSE(blob_file
->Immutable());
2323 ASSERT_FALSE(blob_file
->Obsolete());
2326 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
2327 mock_clock_
->SetCurrentTime(250);
2328 // The key should expired now.
2329 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
2331 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
2332 TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
2335 SyncPoint::GetInstance()->DisableProcessing();
2338 TEST_F(BlobDBTest
, SyncBlobFileBeforeClose
) {
2340 options
.statistics
= CreateDBStatistics();
2342 BlobDBOptions blob_options
;
2343 blob_options
.min_blob_size
= 0;
2344 blob_options
.bytes_per_sync
= 1 << 20;
2345 blob_options
.disable_background_tasks
= true;
2347 Open(blob_options
, options
);
2349 ASSERT_OK(Put("foo", "bar"));
2351 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
2352 ASSERT_EQ(blob_files
.size(), 1);
2354 ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files
[0]));
2355 ASSERT_EQ(options
.statistics
->getTickerCount(BLOB_DB_BLOB_FILE_SYNCED
), 1);
2358 TEST_F(BlobDBTest
, SyncBlobFileBeforeCloseIOError
) {
2360 options
.env
= fault_injection_env_
.get();
2362 BlobDBOptions blob_options
;
2363 blob_options
.min_blob_size
= 0;
2364 blob_options
.bytes_per_sync
= 1 << 20;
2365 blob_options
.disable_background_tasks
= true;
2367 Open(blob_options
, options
);
2369 ASSERT_OK(Put("foo", "bar"));
2371 auto blob_files
= blob_db_impl()->TEST_GetBlobFiles();
2372 ASSERT_EQ(blob_files
.size(), 1);
2374 SyncPoint::GetInstance()->SetCallBack(
2375 "BlobLogWriter::Sync", [this](void * /* arg */) {
2376 fault_injection_env_
->SetFilesystemActive(false, Status::IOError());
2378 SyncPoint::GetInstance()->EnableProcessing();
2380 const Status s
= blob_db_impl()->TEST_CloseBlobFile(blob_files
[0]);
2382 fault_injection_env_
->SetFilesystemActive(true);
2383 SyncPoint::GetInstance()->DisableProcessing();
2384 SyncPoint::GetInstance()->ClearAllCallBacks();
2386 ASSERT_TRUE(s
.IsIOError());
2389 } // namespace blob_db
2390 } // namespace ROCKSDB_NAMESPACE
2392 // A black-box test for the ttl wrapper around rocksdb
2393 int main(int argc
, char **argv
) {
2394 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
2395 ::testing::InitGoogleTest(&argc
, argv
);
2396 return RUN_ALL_TESTS();
2402 int main(int /*argc*/, char** /*argv*/) {
2403 fprintf(stderr
, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n");
2407 #endif // !ROCKSDB_LITE