2 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
8 #include "utilities/blob_db/blob_db_impl.h"
15 #include "db/blob/blob_index.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/write_batch_internal.h"
18 #include "env/composite_env_wrapper.h"
19 #include "file/file_util.h"
20 #include "file/filename.h"
21 #include "file/random_access_file_reader.h"
22 #include "file/sst_file_manager_impl.h"
23 #include "file/writable_file_writer.h"
24 #include "logging/logging.h"
25 #include "monitoring/instrumented_mutex.h"
26 #include "monitoring/statistics.h"
27 #include "rocksdb/convenience.h"
28 #include "rocksdb/env.h"
29 #include "rocksdb/iterator.h"
30 #include "rocksdb/utilities/stackable_db.h"
31 #include "rocksdb/utilities/transaction.h"
32 #include "table/block_based/block.h"
33 #include "table/block_based/block_based_table_builder.h"
34 #include "table/block_based/block_builder.h"
35 #include "table/meta_blocks.h"
36 #include "test_util/sync_point.h"
37 #include "util/cast_util.h"
38 #include "util/crc32c.h"
39 #include "util/mutexlock.h"
40 #include "util/random.h"
41 #include "util/stop_watch.h"
42 #include "util/timer_queue.h"
43 #include "utilities/blob_db/blob_compaction_filter.h"
44 #include "utilities/blob_db/blob_db_iterator.h"
45 #include "utilities/blob_db/blob_db_listener.h"
// Block-based table format version passed to CompressBlock() /
// UncompressBlockContentsForCompressionType() when (de)compressing blob
// values. This is a fixed file-local constant, so make it constexpr rather
// than a mutable global with external linkage.
constexpr int kBlockBasedTableVersionFormat = 2;
51 namespace ROCKSDB_NAMESPACE
{
54 bool BlobFileComparator::operator()(
55 const std::shared_ptr
<BlobFile
>& lhs
,
56 const std::shared_ptr
<BlobFile
>& rhs
) const {
57 return lhs
->BlobFileNumber() > rhs
->BlobFileNumber();
60 bool BlobFileComparatorTTL::operator()(
61 const std::shared_ptr
<BlobFile
>& lhs
,
62 const std::shared_ptr
<BlobFile
>& rhs
) const {
63 assert(lhs
->HasTTL() && rhs
->HasTTL());
64 if (lhs
->expiration_range_
.first
< rhs
->expiration_range_
.first
) {
67 if (lhs
->expiration_range_
.first
> rhs
->expiration_range_
.first
) {
70 return lhs
->BlobFileNumber() < rhs
->BlobFileNumber();
73 BlobDBImpl::BlobDBImpl(const std::string
& dbname
,
74 const BlobDBOptions
& blob_db_options
,
75 const DBOptions
& db_options
,
76 const ColumnFamilyOptions
& cf_options
)
81 bdb_options_(blob_db_options
),
82 db_options_(db_options
),
83 cf_options_(cf_options
),
84 env_options_(db_options
),
85 statistics_(db_options_
.statistics
.get()),
92 fifo_eviction_seq_(0),
93 evict_expiration_up_to_(0),
95 blob_dir_
= (bdb_options_
.path_relative
)
96 ? dbname
+ "/" + bdb_options_
.blob_dir
97 : bdb_options_
.blob_dir
;
98 env_options_
.bytes_per_sync
= blob_db_options
.bytes_per_sync
;
101 BlobDBImpl::~BlobDBImpl() {
103 // CancelAllBackgroundWork(db_, true);
104 Status s
__attribute__((__unused__
)) = Close();
108 Status
BlobDBImpl::Close() {
114 // Close base DB before BlobDBImpl destructs to stop event listener and
115 // compaction filter call.
116 Status s
= db_
->Close();
117 // delete db_ anyway even if close failed.
119 // Reset pointers to avoid StackableDB delete the pointer again.
130 BlobDBOptions
BlobDBImpl::GetBlobDBOptions() const { return bdb_options_
; }
132 Status
BlobDBImpl::Open(std::vector
<ColumnFamilyHandle
*>* handles
) {
133 assert(handles
!= nullptr);
134 assert(db_
== nullptr);
136 if (blob_dir_
.empty()) {
137 return Status::NotSupported("No blob directory in options");
140 if (bdb_options_
.garbage_collection_cutoff
< 0.0 ||
141 bdb_options_
.garbage_collection_cutoff
> 1.0) {
142 return Status::InvalidArgument(
143 "Garbage collection cutoff must be in the interval [0.0, 1.0]");
146 // Temporarily disable compactions in the base DB during open; save the user
147 // defined value beforehand so we can restore it once BlobDB is initialized.
148 // Note: this is only needed if garbage collection is enabled.
149 const bool disable_auto_compactions
= cf_options_
.disable_auto_compactions
;
151 if (bdb_options_
.enable_garbage_collection
) {
152 cf_options_
.disable_auto_compactions
= true;
158 if (db_options_
.info_log
== nullptr) {
159 s
= CreateLoggerFromOptions(dbname_
, db_options_
, &db_options_
.info_log
);
165 ROCKS_LOG_INFO(db_options_
.info_log
, "Opening BlobDB...");
167 if ((cf_options_
.compaction_filter
!= nullptr ||
168 cf_options_
.compaction_filter_factory
!= nullptr)) {
169 ROCKS_LOG_INFO(db_options_
.info_log
,
170 "BlobDB only support compaction filter on non-TTL values.");
173 // Open blob directory.
174 s
= env_
->CreateDirIfMissing(blob_dir_
);
176 ROCKS_LOG_ERROR(db_options_
.info_log
,
177 "Failed to create blob_dir %s, status: %s",
178 blob_dir_
.c_str(), s
.ToString().c_str());
180 s
= env_
->NewDirectory(blob_dir_
, &dir_ent_
);
182 ROCKS_LOG_ERROR(db_options_
.info_log
,
183 "Failed to open blob_dir %s, status: %s", blob_dir_
.c_str(),
184 s
.ToString().c_str());
189 s
= OpenAllBlobFiles();
195 if (bdb_options_
.enable_garbage_collection
) {
196 db_options_
.listeners
.push_back(std::make_shared
<BlobDBListenerGC
>(this));
197 cf_options_
.compaction_filter_factory
=
198 std::make_shared
<BlobIndexCompactionFilterFactoryGC
>(
199 this, env_
, cf_options_
, statistics_
);
201 db_options_
.listeners
.push_back(std::make_shared
<BlobDBListener
>(this));
202 cf_options_
.compaction_filter_factory
=
203 std::make_shared
<BlobIndexCompactionFilterFactory
>(
204 this, env_
, cf_options_
, statistics_
);
207 // Reset user compaction filter after building into compaction factory.
208 cf_options_
.compaction_filter
= nullptr;
211 ColumnFamilyDescriptor
cf_descriptor(kDefaultColumnFamilyName
, cf_options_
);
212 s
= DB::Open(db_options_
, dbname_
, {cf_descriptor
}, handles
, &db_
);
216 db_impl_
= static_cast_with_check
<DBImpl
>(db_
->GetRootDB());
218 // Sanitize the blob_dir provided. Using a directory where the
219 // base DB stores its files for the default CF is not supported.
220 const ColumnFamilyData
* const cfd
=
221 static_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily())->cfd();
224 const ImmutableCFOptions
* const ioptions
= cfd
->ioptions();
229 for (const auto& cf_path
: ioptions
->cf_paths
) {
230 bool blob_dir_same_as_cf_dir
= false;
231 s
= env_
->AreFilesSame(blob_dir_
, cf_path
.path
, &blob_dir_same_as_cf_dir
);
233 ROCKS_LOG_ERROR(db_options_
.info_log
,
234 "Error while sanitizing blob_dir %s, status: %s",
235 blob_dir_
.c_str(), s
.ToString().c_str());
239 if (blob_dir_same_as_cf_dir
) {
240 return Status::NotSupported(
241 "Using the base DB's storage directories for BlobDB files is not "
246 // Initialize SST file <-> oldest blob file mapping if garbage collection
248 if (bdb_options_
.enable_garbage_collection
) {
249 std::vector
<LiveFileMetaData
> live_files
;
250 db_
->GetLiveFilesMetaData(&live_files
);
252 InitializeBlobFileToSstMapping(live_files
);
254 MarkUnreferencedBlobFilesObsoleteDuringOpen();
256 if (!disable_auto_compactions
) {
257 s
= db_
->EnableAutoCompaction(*handles
);
260 db_options_
.info_log
,
261 "Failed to enable automatic compactions during open, status: %s",
262 s
.ToString().c_str());
268 // Add trash files in blob dir to file delete scheduler.
269 SstFileManagerImpl
* sfm
= static_cast<SstFileManagerImpl
*>(
270 db_impl_
->immutable_db_options().sst_file_manager
.get());
271 DeleteScheduler::CleanupDirectory(env_
, sfm
, blob_dir_
);
275 // Start background jobs.
276 if (!bdb_options_
.disable_background_tasks
) {
277 StartBackgroundTasks();
280 ROCKS_LOG_INFO(db_options_
.info_log
, "BlobDB pointer %p", this);
281 bdb_options_
.Dump(db_options_
.info_log
.get());
286 void BlobDBImpl::StartBackgroundTasks() {
287 // store a call to a member function and object
289 kReclaimOpenFilesPeriodMillisecs
,
290 std::bind(&BlobDBImpl::ReclaimOpenFiles
, this, std::placeholders::_1
));
292 kDeleteObsoleteFilesPeriodMillisecs
,
293 std::bind(&BlobDBImpl::DeleteObsoleteFiles
, this, std::placeholders::_1
));
294 tqueue_
.add(kSanityCheckPeriodMillisecs
,
295 std::bind(&BlobDBImpl::SanityCheck
, this, std::placeholders::_1
));
297 kEvictExpiredFilesPeriodMillisecs
,
298 std::bind(&BlobDBImpl::EvictExpiredFiles
, this, std::placeholders::_1
));
301 Status
BlobDBImpl::GetAllBlobFiles(std::set
<uint64_t>* file_numbers
) {
302 assert(file_numbers
!= nullptr);
303 std::vector
<std::string
> all_files
;
304 Status s
= env_
->GetChildren(blob_dir_
, &all_files
);
306 ROCKS_LOG_ERROR(db_options_
.info_log
,
307 "Failed to get list of blob files, status: %s",
308 s
.ToString().c_str());
312 for (const auto& file_name
: all_files
) {
313 uint64_t file_number
;
315 bool success
= ParseFileName(file_name
, &file_number
, &type
);
316 if (success
&& type
== kBlobFile
) {
317 file_numbers
->insert(file_number
);
319 ROCKS_LOG_WARN(db_options_
.info_log
,
320 "Skipping file in blob directory: %s", file_name
.c_str());
327 Status
BlobDBImpl::OpenAllBlobFiles() {
328 std::set
<uint64_t> file_numbers
;
329 Status s
= GetAllBlobFiles(&file_numbers
);
334 if (!file_numbers
.empty()) {
335 next_file_number_
.store(*file_numbers
.rbegin() + 1);
338 std::ostringstream blob_file_oss
;
339 std::ostringstream live_imm_oss
;
340 std::ostringstream obsolete_file_oss
;
342 for (auto& file_number
: file_numbers
) {
343 std::shared_ptr
<BlobFile
> blob_file
= std::make_shared
<BlobFile
>(
344 this, blob_dir_
, file_number
, db_options_
.info_log
.get());
345 blob_file
->MarkImmutable(/* sequence */ 0);
347 // Read file header and footer
348 Status read_metadata_status
= blob_file
->ReadMetadata(env_
, env_options_
);
349 if (read_metadata_status
.IsCorruption()) {
350 // Remove incomplete file.
351 if (!obsolete_files_
.empty()) {
352 obsolete_file_oss
<< ", ";
354 obsolete_file_oss
<< file_number
;
356 ObsoleteBlobFile(blob_file
, 0 /*obsolete_seq*/, false /*update_size*/);
358 } else if (!read_metadata_status
.ok()) {
359 ROCKS_LOG_ERROR(db_options_
.info_log
,
360 "Unable to read metadata of blob file %" PRIu64
362 file_number
, read_metadata_status
.ToString().c_str());
363 return read_metadata_status
;
366 total_blob_size_
+= blob_file
->GetFileSize();
368 if (!blob_files_
.empty()) {
369 blob_file_oss
<< ", ";
371 blob_file_oss
<< file_number
;
373 blob_files_
[file_number
] = blob_file
;
375 if (!blob_file
->HasTTL()) {
376 if (!live_imm_non_ttl_blob_files_
.empty()) {
377 live_imm_oss
<< ", ";
379 live_imm_oss
<< file_number
;
381 live_imm_non_ttl_blob_files_
[file_number
] = blob_file
;
385 ROCKS_LOG_INFO(db_options_
.info_log
,
386 "Found %" ROCKSDB_PRIszt
" blob files: %s", blob_files_
.size(),
387 blob_file_oss
.str().c_str());
389 db_options_
.info_log
, "Found %" ROCKSDB_PRIszt
" non-TTL blob files: %s",
390 live_imm_non_ttl_blob_files_
.size(), live_imm_oss
.str().c_str());
391 ROCKS_LOG_INFO(db_options_
.info_log
,
392 "Found %" ROCKSDB_PRIszt
393 " incomplete or corrupted blob files: %s",
394 obsolete_files_
.size(), obsolete_file_oss
.str().c_str());
398 template <typename Linker
>
399 void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number
,
400 uint64_t blob_file_number
,
402 assert(bdb_options_
.enable_garbage_collection
);
403 assert(blob_file_number
!= kInvalidBlobFileNumber
);
405 auto it
= blob_files_
.find(blob_file_number
);
406 if (it
== blob_files_
.end()) {
407 ROCKS_LOG_WARN(db_options_
.info_log
,
409 " not found while trying to link "
411 blob_file_number
, sst_file_number
);
415 BlobFile
* const blob_file
= it
->second
.get();
418 linker(blob_file
, sst_file_number
);
420 ROCKS_LOG_INFO(db_options_
.info_log
,
421 "Blob file %" PRIu64
" linked to SST file %" PRIu64
,
422 blob_file_number
, sst_file_number
);
425 void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number
,
426 uint64_t blob_file_number
) {
427 auto linker
= [](BlobFile
* blob_file
, uint64_t sst_file
) {
428 WriteLock
file_lock(&blob_file
->mutex_
);
429 blob_file
->LinkSstFile(sst_file
);
432 LinkSstToBlobFileImpl(sst_file_number
, blob_file_number
, linker
);
435 void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number
,
436 uint64_t blob_file_number
) {
437 auto linker
= [](BlobFile
* blob_file
, uint64_t sst_file
) {
438 blob_file
->LinkSstFile(sst_file
);
441 LinkSstToBlobFileImpl(sst_file_number
, blob_file_number
, linker
);
444 void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number
,
445 uint64_t blob_file_number
) {
446 assert(bdb_options_
.enable_garbage_collection
);
447 assert(blob_file_number
!= kInvalidBlobFileNumber
);
449 auto it
= blob_files_
.find(blob_file_number
);
450 if (it
== blob_files_
.end()) {
451 ROCKS_LOG_WARN(db_options_
.info_log
,
453 " not found while trying to unlink "
455 blob_file_number
, sst_file_number
);
459 BlobFile
* const blob_file
= it
->second
.get();
463 WriteLock
file_lock(&blob_file
->mutex_
);
464 blob_file
->UnlinkSstFile(sst_file_number
);
467 ROCKS_LOG_INFO(db_options_
.info_log
,
468 "Blob file %" PRIu64
" unlinked from SST file %" PRIu64
,
469 blob_file_number
, sst_file_number
);
472 void BlobDBImpl::InitializeBlobFileToSstMapping(
473 const std::vector
<LiveFileMetaData
>& live_files
) {
474 assert(bdb_options_
.enable_garbage_collection
);
476 for (const auto& live_file
: live_files
) {
477 const uint64_t sst_file_number
= live_file
.file_number
;
478 const uint64_t blob_file_number
= live_file
.oldest_blob_file_number
;
480 if (blob_file_number
== kInvalidBlobFileNumber
) {
484 LinkSstToBlobFileNoLock(sst_file_number
, blob_file_number
);
488 void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo
& info
) {
489 assert(bdb_options_
.enable_garbage_collection
);
491 WriteLock
lock(&mutex_
);
493 if (info
.oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
494 LinkSstToBlobFile(info
.file_number
, info
.oldest_blob_file_number
);
497 assert(flush_sequence_
< info
.largest_seqno
);
498 flush_sequence_
= info
.largest_seqno
;
500 MarkUnreferencedBlobFilesObsolete();
503 void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo
& info
) {
504 assert(bdb_options_
.enable_garbage_collection
);
506 if (!info
.status
.ok()) {
510 // Note: the same SST file may appear in both the input and the output
511 // file list in case of a trivial move. We walk through the two lists
512 // below in a fashion that's similar to merge sort to detect this.
514 auto cmp
= [](const CompactionFileInfo
& lhs
, const CompactionFileInfo
& rhs
) {
515 return lhs
.file_number
< rhs
.file_number
;
518 auto inputs
= info
.input_file_infos
;
519 auto iit
= inputs
.begin();
520 const auto iit_end
= inputs
.end();
522 std::sort(iit
, iit_end
, cmp
);
524 auto outputs
= info
.output_file_infos
;
525 auto oit
= outputs
.begin();
526 const auto oit_end
= outputs
.end();
528 std::sort(oit
, oit_end
, cmp
);
530 WriteLock
lock(&mutex_
);
532 while (iit
!= iit_end
&& oit
!= oit_end
) {
533 const auto& input
= *iit
;
534 const auto& output
= *oit
;
536 if (input
.file_number
== output
.file_number
) {
539 } else if (input
.file_number
< output
.file_number
) {
540 if (input
.oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
541 UnlinkSstFromBlobFile(input
.file_number
, input
.oldest_blob_file_number
);
546 assert(output
.file_number
< input
.file_number
);
548 if (output
.oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
549 LinkSstToBlobFile(output
.file_number
, output
.oldest_blob_file_number
);
556 while (iit
!= iit_end
) {
557 const auto& input
= *iit
;
559 if (input
.oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
560 UnlinkSstFromBlobFile(input
.file_number
, input
.oldest_blob_file_number
);
566 while (oit
!= oit_end
) {
567 const auto& output
= *oit
;
569 if (output
.oldest_blob_file_number
!= kInvalidBlobFileNumber
) {
570 LinkSstToBlobFile(output
.file_number
, output
.oldest_blob_file_number
);
576 MarkUnreferencedBlobFilesObsolete();
579 bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
580 const std::shared_ptr
<BlobFile
>& blob_file
, SequenceNumber obsolete_seq
) {
582 assert(!blob_file
->HasTTL());
583 assert(blob_file
->Immutable());
584 assert(bdb_options_
.enable_garbage_collection
);
586 // Note: FIFO eviction could have marked this file obsolete already.
587 if (blob_file
->Obsolete()) {
591 // We cannot mark this file (or any higher-numbered files for that matter)
592 // obsolete if it is referenced by any memtables or SSTs. We keep track of
593 // the SSTs explicitly. To account for memtables, we keep track of the highest
594 // sequence number received in flush notifications, and we do not mark the
595 // blob file obsolete if there are still unflushed memtables from before
596 // the time the blob file was closed.
597 if (blob_file
->GetImmutableSequence() > flush_sequence_
||
598 !blob_file
->GetLinkedSstFiles().empty()) {
602 ROCKS_LOG_INFO(db_options_
.info_log
,
603 "Blob file %" PRIu64
" is no longer needed, marking obsolete",
604 blob_file
->BlobFileNumber());
606 ObsoleteBlobFile(blob_file
, obsolete_seq
, /* update_size */ true);
610 template <class Functor
>
611 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed
) {
612 assert(bdb_options_
.enable_garbage_collection
);
614 // Iterate through all live immutable non-TTL blob files, and mark them
615 // obsolete assuming no SST files or memtables rely on the blobs in them.
616 // Note: we need to stop as soon as we find a blob file that has any
617 // linked SSTs (or one potentially referenced by memtables).
619 uint64_t obsoleted_files
= 0;
621 auto it
= live_imm_non_ttl_blob_files_
.begin();
622 while (it
!= live_imm_non_ttl_blob_files_
.end()) {
623 const auto& blob_file
= it
->second
;
625 assert(blob_file
->BlobFileNumber() == it
->first
);
626 assert(!blob_file
->HasTTL());
627 assert(blob_file
->Immutable());
629 // Small optimization: Obsolete() does an atomic read, so we can do
630 // this check without taking a lock on the blob file's mutex.
631 if (blob_file
->Obsolete()) {
632 it
= live_imm_non_ttl_blob_files_
.erase(it
);
636 if (!mark_if_needed(blob_file
)) {
640 it
= live_imm_non_ttl_blob_files_
.erase(it
);
645 if (obsoleted_files
> 0) {
646 ROCKS_LOG_INFO(db_options_
.info_log
,
647 "%" PRIu64
" blob file(s) marked obsolete by GC",
649 RecordTick(statistics_
, BLOB_DB_GC_NUM_FILES
, obsoleted_files
);
653 void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
654 const SequenceNumber obsolete_seq
= GetLatestSequenceNumber();
656 MarkUnreferencedBlobFilesObsoleteImpl(
657 [this, obsolete_seq
](const std::shared_ptr
<BlobFile
>& blob_file
) {
658 WriteLock
file_lock(&blob_file
->mutex_
);
659 return MarkBlobFileObsoleteIfNeeded(blob_file
, obsolete_seq
);
663 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
664 MarkUnreferencedBlobFilesObsoleteImpl(
665 [this](const std::shared_ptr
<BlobFile
>& blob_file
) {
666 return MarkBlobFileObsoleteIfNeeded(blob_file
, /* obsolete_seq */ 0);
670 void BlobDBImpl::CloseRandomAccessLocked(
671 const std::shared_ptr
<BlobFile
>& bfile
) {
672 bfile
->CloseRandomAccessLocked();
676 Status
BlobDBImpl::GetBlobFileReader(
677 const std::shared_ptr
<BlobFile
>& blob_file
,
678 std::shared_ptr
<RandomAccessFileReader
>* reader
) {
679 assert(reader
!= nullptr);
680 bool fresh_open
= false;
681 Status s
= blob_file
->GetReader(env_
, env_options_
, reader
, &fresh_open
);
682 if (s
.ok() && fresh_open
) {
683 assert(*reader
!= nullptr);
689 std::shared_ptr
<BlobFile
> BlobDBImpl::NewBlobFile(
690 bool has_ttl
, const ExpirationRange
& expiration_range
,
691 const std::string
& reason
) {
692 assert(has_ttl
== (expiration_range
.first
|| expiration_range
.second
));
694 uint64_t file_num
= next_file_number_
++;
696 const uint32_t column_family_id
=
697 static_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily())->GetID();
698 auto blob_file
= std::make_shared
<BlobFile
>(
699 this, blob_dir_
, file_num
, db_options_
.info_log
.get(), column_family_id
,
700 bdb_options_
.compression
, has_ttl
, expiration_range
);
702 ROCKS_LOG_DEBUG(db_options_
.info_log
, "New blob file created: %s reason='%s'",
703 blob_file
->PathName().c_str(), reason
.c_str());
704 LogFlush(db_options_
.info_log
);
709 void BlobDBImpl::RegisterBlobFile(std::shared_ptr
<BlobFile
> blob_file
) {
710 const uint64_t blob_file_number
= blob_file
->BlobFileNumber();
712 auto it
= blob_files_
.lower_bound(blob_file_number
);
713 assert(it
== blob_files_
.end() || it
->first
!= blob_file_number
);
715 blob_files_
.insert(it
,
716 std::map
<uint64_t, std::shared_ptr
<BlobFile
>>::value_type(
717 blob_file_number
, std::move(blob_file
)));
720 Status
BlobDBImpl::CreateWriterLocked(const std::shared_ptr
<BlobFile
>& bfile
) {
721 std::string
fpath(bfile
->PathName());
722 std::unique_ptr
<WritableFile
> wfile
;
724 Status s
= env_
->ReopenWritableFile(fpath
, &wfile
, env_options_
);
726 ROCKS_LOG_ERROR(db_options_
.info_log
,
727 "Failed to open blob file for write: %s status: '%s'"
729 fpath
.c_str(), s
.ToString().c_str(),
730 env_
->FileExists(fpath
).ToString().c_str());
734 std::unique_ptr
<WritableFileWriter
> fwriter
;
735 fwriter
.reset(new WritableFileWriter(
736 NewLegacyWritableFileWrapper(std::move(wfile
)), fpath
, env_options_
));
738 uint64_t boffset
= bfile
->GetFileSize();
739 if (debug_level_
>= 2 && boffset
) {
740 ROCKS_LOG_DEBUG(db_options_
.info_log
,
741 "Open blob file: %s with offset: %" PRIu64
, fpath
.c_str(),
745 BlobLogWriter::ElemType et
= BlobLogWriter::kEtNone
;
746 if (bfile
->file_size_
== BlobLogHeader::kSize
) {
747 et
= BlobLogWriter::kEtFileHdr
;
748 } else if (bfile
->file_size_
> BlobLogHeader::kSize
) {
749 et
= BlobLogWriter::kEtRecord
;
750 } else if (bfile
->file_size_
) {
751 ROCKS_LOG_WARN(db_options_
.info_log
,
752 "Open blob file: %s with wrong size: %" PRIu64
,
753 fpath
.c_str(), boffset
);
754 return Status::Corruption("Invalid blob file size");
757 bfile
->log_writer_
= std::make_shared
<BlobLogWriter
>(
758 std::move(fwriter
), env_
, statistics_
, bfile
->file_number_
,
759 db_options_
.use_fsync
, boffset
);
760 bfile
->log_writer_
->last_elem_type_
= et
;
765 std::shared_ptr
<BlobFile
> BlobDBImpl::FindBlobFileLocked(
766 uint64_t expiration
) const {
767 if (open_ttl_files_
.empty()) {
771 std::shared_ptr
<BlobFile
> tmp
= std::make_shared
<BlobFile
>();
772 tmp
->SetHasTTL(true);
773 tmp
->expiration_range_
= std::make_pair(expiration
, 0);
774 tmp
->file_number_
= std::numeric_limits
<uint64_t>::max();
776 auto citr
= open_ttl_files_
.equal_range(tmp
);
777 if (citr
.first
== open_ttl_files_
.end()) {
778 assert(citr
.second
== open_ttl_files_
.end());
780 std::shared_ptr
<BlobFile
> check
= *(open_ttl_files_
.rbegin());
781 return (check
->expiration_range_
.second
<= expiration
) ? nullptr : check
;
784 if (citr
.first
!= citr
.second
) {
785 return *(citr
.first
);
788 auto finditr
= citr
.second
;
789 if (finditr
!= open_ttl_files_
.begin()) {
793 bool b2
= (*finditr
)->expiration_range_
.second
<= expiration
;
794 bool b1
= (*finditr
)->expiration_range_
.first
> expiration
;
796 return (b1
|| b2
) ? nullptr : (*finditr
);
799 Status
BlobDBImpl::CheckOrCreateWriterLocked(
800 const std::shared_ptr
<BlobFile
>& blob_file
,
801 std::shared_ptr
<BlobLogWriter
>* writer
) {
802 assert(writer
!= nullptr);
803 *writer
= blob_file
->GetWriter();
804 if (*writer
!= nullptr) {
807 Status s
= CreateWriterLocked(blob_file
);
809 *writer
= blob_file
->GetWriter();
814 Status
BlobDBImpl::CreateBlobFileAndWriter(
815 bool has_ttl
, const ExpirationRange
& expiration_range
,
816 const std::string
& reason
, std::shared_ptr
<BlobFile
>* blob_file
,
817 std::shared_ptr
<BlobLogWriter
>* writer
) {
818 TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
819 assert(has_ttl
== (expiration_range
.first
|| expiration_range
.second
));
823 *blob_file
= NewBlobFile(has_ttl
, expiration_range
, reason
);
826 // file not visible, hence no lock
827 Status s
= CheckOrCreateWriterLocked(*blob_file
, writer
);
829 ROCKS_LOG_ERROR(db_options_
.info_log
,
830 "Failed to get writer for blob file: %s, error: %s",
831 (*blob_file
)->PathName().c_str(), s
.ToString().c_str());
837 s
= (*writer
)->WriteHeader((*blob_file
)->header_
);
839 ROCKS_LOG_ERROR(db_options_
.info_log
,
840 "Failed to write header to new blob file: %s"
842 (*blob_file
)->PathName().c_str(), s
.ToString().c_str());
846 (*blob_file
)->SetFileSize(BlobLogHeader::kSize
);
847 total_blob_size_
+= BlobLogHeader::kSize
;
852 Status
BlobDBImpl::SelectBlobFile(std::shared_ptr
<BlobFile
>* blob_file
) {
856 ReadLock
rl(&mutex_
);
858 if (open_non_ttl_file_
) {
859 assert(!open_non_ttl_file_
->Immutable());
860 *blob_file
= open_non_ttl_file_
;
866 WriteLock
wl(&mutex_
);
868 if (open_non_ttl_file_
) {
869 assert(!open_non_ttl_file_
->Immutable());
870 *blob_file
= open_non_ttl_file_
;
874 std::shared_ptr
<BlobLogWriter
> writer
;
875 const Status s
= CreateBlobFileAndWriter(
876 /* has_ttl */ false, ExpirationRange(),
877 /* reason */ "SelectBlobFile", blob_file
, &writer
);
882 RegisterBlobFile(*blob_file
);
883 open_non_ttl_file_
= *blob_file
;
888 Status
BlobDBImpl::SelectBlobFileTTL(uint64_t expiration
,
889 std::shared_ptr
<BlobFile
>* blob_file
) {
891 assert(expiration
!= kNoExpiration
);
894 ReadLock
rl(&mutex_
);
896 *blob_file
= FindBlobFileLocked(expiration
);
897 if (*blob_file
!= nullptr) {
898 assert(!(*blob_file
)->Immutable());
904 WriteLock
wl(&mutex_
);
906 *blob_file
= FindBlobFileLocked(expiration
);
907 if (*blob_file
!= nullptr) {
908 assert(!(*blob_file
)->Immutable());
912 const uint64_t exp_low
=
913 (expiration
/ bdb_options_
.ttl_range_secs
) * bdb_options_
.ttl_range_secs
;
914 const uint64_t exp_high
= exp_low
+ bdb_options_
.ttl_range_secs
;
915 const ExpirationRange
expiration_range(exp_low
, exp_high
);
917 std::ostringstream oss
;
918 oss
<< "SelectBlobFileTTL range: [" << exp_low
<< ',' << exp_high
<< ')';
920 std::shared_ptr
<BlobLogWriter
> writer
;
922 CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range
,
923 /* reason */ oss
.str(), blob_file
, &writer
);
928 RegisterBlobFile(*blob_file
);
929 open_ttl_files_
.insert(*blob_file
);
934 class BlobDBImpl::BlobInserter
: public WriteBatch::Handler
{
936 const WriteOptions
& options_
;
937 BlobDBImpl
* blob_db_impl_
;
938 uint32_t default_cf_id_
;
942 BlobInserter(const WriteOptions
& options
, BlobDBImpl
* blob_db_impl
,
943 uint32_t default_cf_id
)
945 blob_db_impl_(blob_db_impl
),
946 default_cf_id_(default_cf_id
) {}
948 WriteBatch
* batch() { return &batch_
; }
950 Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
951 const Slice
& value
) override
{
952 if (column_family_id
!= default_cf_id_
) {
953 return Status::NotSupported(
954 "Blob DB doesn't support non-default column family.");
956 Status s
= blob_db_impl_
->PutBlobValue(options_
, key
, value
, kNoExpiration
,
961 Status
DeleteCF(uint32_t column_family_id
, const Slice
& key
) override
{
962 if (column_family_id
!= default_cf_id_
) {
963 return Status::NotSupported(
964 "Blob DB doesn't support non-default column family.");
966 Status s
= WriteBatchInternal::Delete(&batch_
, column_family_id
, key
);
970 virtual Status
DeleteRange(uint32_t column_family_id
, const Slice
& begin_key
,
971 const Slice
& end_key
) {
972 if (column_family_id
!= default_cf_id_
) {
973 return Status::NotSupported(
974 "Blob DB doesn't support non-default column family.");
976 Status s
= WriteBatchInternal::DeleteRange(&batch_
, column_family_id
,
981 Status
SingleDeleteCF(uint32_t /*column_family_id*/,
982 const Slice
& /*key*/) override
{
983 return Status::NotSupported("Not supported operation in blob db.");
986 Status
MergeCF(uint32_t /*column_family_id*/, const Slice
& /*key*/,
987 const Slice
& /*value*/) override
{
988 return Status::NotSupported("Not supported operation in blob db.");
991 void LogData(const Slice
& blob
) override
{ batch_
.PutLogData(blob
); }
994 Status
BlobDBImpl::Write(const WriteOptions
& options
, WriteBatch
* updates
) {
995 StopWatch
write_sw(env_
, statistics_
, BLOB_DB_WRITE_MICROS
);
996 RecordTick(statistics_
, BLOB_DB_NUM_WRITE
);
997 uint32_t default_cf_id
=
998 static_cast_with_check
<ColumnFamilyHandleImpl
>(DefaultColumnFamily())
1001 BlobInserter
blob_inserter(options
, this, default_cf_id
);
1003 // Release write_mutex_ before DB write to avoid race condition with
1004 // flush begin listener, which also require write_mutex_ to sync
1006 MutexLock
l(&write_mutex_
);
1007 s
= updates
->Iterate(&blob_inserter
);
1012 return db_
->Write(options
, blob_inserter
.batch());
1015 Status
BlobDBImpl::Put(const WriteOptions
& options
, const Slice
& key
,
1016 const Slice
& value
) {
1017 return PutUntil(options
, key
, value
, kNoExpiration
);
1020 Status
BlobDBImpl::PutWithTTL(const WriteOptions
& options
,
1021 const Slice
& key
, const Slice
& value
,
1023 uint64_t now
= EpochNow();
1024 uint64_t expiration
= kNoExpiration
- now
> ttl
? now
+ ttl
: kNoExpiration
;
1025 return PutUntil(options
, key
, value
, expiration
);
1028 Status
BlobDBImpl::PutUntil(const WriteOptions
& options
, const Slice
& key
,
1029 const Slice
& value
, uint64_t expiration
) {
1030 StopWatch
write_sw(env_
, statistics_
, BLOB_DB_WRITE_MICROS
);
1031 RecordTick(statistics_
, BLOB_DB_NUM_PUT
);
1035 // Release write_mutex_ before DB write to avoid race condition with
1036 // flush begin listener, which also require write_mutex_ to sync
1038 MutexLock
l(&write_mutex_
);
1039 s
= PutBlobValue(options
, key
, value
, expiration
, &batch
);
1042 s
= db_
->Write(options
, &batch
);
1047 Status
BlobDBImpl::PutBlobValue(const WriteOptions
& /*options*/,
1048 const Slice
& key
, const Slice
& value
,
1049 uint64_t expiration
, WriteBatch
* batch
) {
1050 write_mutex_
.AssertHeld();
1052 std::string index_entry
;
1053 uint32_t column_family_id
=
1054 static_cast_with_check
<ColumnFamilyHandleImpl
>(DefaultColumnFamily())
1056 if (value
.size() < bdb_options_
.min_blob_size
) {
1057 if (expiration
== kNoExpiration
) {
1058 // Put as normal value
1059 s
= batch
->Put(key
, value
);
1060 RecordTick(statistics_
, BLOB_DB_WRITE_INLINED
);
1063 BlobIndex::EncodeInlinedTTL(&index_entry
, expiration
, value
);
1064 s
= WriteBatchInternal::PutBlobIndex(batch
, column_family_id
, key
,
1066 RecordTick(statistics_
, BLOB_DB_WRITE_INLINED_TTL
);
1069 std::string compression_output
;
1070 Slice value_compressed
= GetCompressedSlice(value
, &compression_output
);
1072 std::string headerbuf
;
1073 BlobLogWriter::ConstructBlobHeader(&headerbuf
, key
, value_compressed
,
1076 // Check DB size limit before selecting blob file to
1077 // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
1078 // done before calling SelectBlobFile().
1079 s
= CheckSizeAndEvictBlobFiles(headerbuf
.size() + key
.size() +
1080 value_compressed
.size());
1085 std::shared_ptr
<BlobFile
> blob_file
;
1086 if (expiration
!= kNoExpiration
) {
1087 s
= SelectBlobFileTTL(expiration
, &blob_file
);
1089 s
= SelectBlobFile(&blob_file
);
1092 assert(blob_file
!= nullptr);
1093 assert(blob_file
->GetCompressionType() == bdb_options_
.compression
);
1094 s
= AppendBlob(blob_file
, headerbuf
, key
, value_compressed
, expiration
,
1098 if (expiration
!= kNoExpiration
) {
1099 WriteLock
file_lock(&blob_file
->mutex_
);
1100 blob_file
->ExtendExpirationRange(expiration
);
1102 s
= CloseBlobFileIfNeeded(blob_file
);
1105 s
= WriteBatchInternal::PutBlobIndex(batch
, column_family_id
, key
,
1109 if (expiration
== kNoExpiration
) {
1110 RecordTick(statistics_
, BLOB_DB_WRITE_BLOB
);
1112 RecordTick(statistics_
, BLOB_DB_WRITE_BLOB_TTL
);
1116 db_options_
.info_log
,
1117 "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
1118 " status: '%s' blob_file: '%s'",
1119 blob_file
->PathName().c_str(), key
.ToString().c_str(), value
.size(),
1120 s
.ToString().c_str(), blob_file
->DumpState().c_str());
1124 RecordTick(statistics_
, BLOB_DB_NUM_KEYS_WRITTEN
);
1125 RecordTick(statistics_
, BLOB_DB_BYTES_WRITTEN
, key
.size() + value
.size());
1126 RecordInHistogram(statistics_
, BLOB_DB_KEY_SIZE
, key
.size());
1127 RecordInHistogram(statistics_
, BLOB_DB_VALUE_SIZE
, value
.size());
1132 Slice
BlobDBImpl::GetCompressedSlice(const Slice
& raw
,
1133 std::string
* compression_output
) const {
1134 if (bdb_options_
.compression
== kNoCompression
) {
1137 StopWatch
compression_sw(env_
, statistics_
, BLOB_DB_COMPRESSION_MICROS
);
1138 CompressionType type
= bdb_options_
.compression
;
1139 CompressionOptions opts
;
1140 CompressionContext
context(type
);
1141 CompressionInfo
info(opts
, context
, CompressionDict::GetEmptyDict(), type
,
1142 0 /* sample_for_compression */);
1143 CompressBlock(raw
, info
, &type
, kBlockBasedTableVersionFormat
, false,
1144 compression_output
, nullptr, nullptr);
1145 return *compression_output
;
1148 Status
BlobDBImpl::DecompressSlice(const Slice
& compressed_value
,
1149 CompressionType compression_type
,
1150 PinnableSlice
* value_output
) const {
1151 assert(compression_type
!= kNoCompression
);
1153 BlockContents contents
;
1154 auto cfh
= static_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily());
1157 StopWatch
decompression_sw(env_
, statistics_
, BLOB_DB_DECOMPRESSION_MICROS
);
1158 UncompressionContext
context(compression_type
);
1159 UncompressionInfo
info(context
, UncompressionDict::GetEmptyDict(),
1161 Status s
= UncompressBlockContentsForCompressionType(
1162 info
, compressed_value
.data(), compressed_value
.size(), &contents
,
1163 kBlockBasedTableVersionFormat
, *(cfh
->cfd()->ioptions()));
1165 return Status::Corruption("Unable to decompress blob.");
1169 value_output
->PinSelf(contents
.data
);
1171 return Status::OK();
1174 Status
BlobDBImpl::CompactFiles(
1175 const CompactionOptions
& compact_options
,
1176 const std::vector
<std::string
>& input_file_names
, const int output_level
,
1177 const int output_path_id
, std::vector
<std::string
>* const output_file_names
,
1178 CompactionJobInfo
* compaction_job_info
) {
1179 // Note: we need CompactionJobInfo to be able to track updates to the
1180 // blob file <-> SST mappings, so we provide one if the user hasn't,
1181 // assuming that GC is enabled.
1182 CompactionJobInfo info
{};
1183 if (bdb_options_
.enable_garbage_collection
&& !compaction_job_info
) {
1184 compaction_job_info
= &info
;
1188 db_
->CompactFiles(compact_options
, input_file_names
, output_level
,
1189 output_path_id
, output_file_names
, compaction_job_info
);
1194 if (bdb_options_
.enable_garbage_collection
) {
1195 assert(compaction_job_info
);
1196 ProcessCompactionJobInfo(*compaction_job_info
);
1202 void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext
* context
) {
1205 context
->blob_db_impl
= this;
1206 context
->next_file_number
= next_file_number_
.load();
1207 context
->current_blob_files
.clear();
1208 for (auto& p
: blob_files_
) {
1209 context
->current_blob_files
.insert(p
.first
);
1211 context
->fifo_eviction_seq
= fifo_eviction_seq_
;
1212 context
->evict_expiration_up_to
= evict_expiration_up_to_
;
1215 void BlobDBImpl::GetCompactionContext(BlobCompactionContext
* context
) {
1218 ReadLock
l(&mutex_
);
1219 GetCompactionContextCommon(context
);
1222 void BlobDBImpl::GetCompactionContext(BlobCompactionContext
* context
,
1223 BlobCompactionContextGC
* context_gc
) {
1227 ReadLock
l(&mutex_
);
1228 GetCompactionContextCommon(context
);
1230 if (!live_imm_non_ttl_blob_files_
.empty()) {
1231 auto it
= live_imm_non_ttl_blob_files_
.begin();
1232 std::advance(it
, bdb_options_
.garbage_collection_cutoff
*
1233 live_imm_non_ttl_blob_files_
.size());
1234 context_gc
->cutoff_file_number
= it
!= live_imm_non_ttl_blob_files_
.end()
1236 : std::numeric_limits
<uint64_t>::max();
1240 void BlobDBImpl::UpdateLiveSSTSize() {
1241 uint64_t live_sst_size
= 0;
1242 bool ok
= GetIntProperty(DB::Properties::kLiveSstFilesSize
, &live_sst_size
);
1244 live_sst_size_
.store(live_sst_size
);
1245 ROCKS_LOG_INFO(db_options_
.info_log
,
1246 "Updated total SST file size: %" PRIu64
" bytes.",
1250 db_options_
.info_log
,
1251 "Failed to update total SST file size after flush or compaction.");
1254 // Trigger FIFO eviction if needed.
1255 MutexLock
l(&write_mutex_
);
1256 Status s
= CheckSizeAndEvictBlobFiles(0, true /*force*/);
1257 if (s
.IsNoSpace()) {
1258 ROCKS_LOG_WARN(db_options_
.info_log
,
1259 "DB grow out-of-space after SST size updated. Current live"
1260 " SST size: %" PRIu64
1261 " , current blob files size: %" PRIu64
".",
1262 live_sst_size_
.load(), total_blob_size_
.load());
1267 Status
BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size
,
1269 write_mutex_
.AssertHeld();
1271 uint64_t live_sst_size
= live_sst_size_
.load();
1272 if (bdb_options_
.max_db_size
== 0 ||
1273 live_sst_size
+ total_blob_size_
.load() + blob_size
<=
1274 bdb_options_
.max_db_size
) {
1275 return Status::OK();
1278 if (bdb_options_
.is_fifo
== false ||
1279 (!force_evict
&& live_sst_size
+ blob_size
> bdb_options_
.max_db_size
)) {
1280 // FIFO eviction is disabled, or no space to insert new blob even we evict
1282 return Status::NoSpace(
1283 "Write failed, as writing it would exceed max_db_size limit.");
1286 std::vector
<std::shared_ptr
<BlobFile
>> candidate_files
;
1287 CopyBlobFiles(&candidate_files
);
1288 std::sort(candidate_files
.begin(), candidate_files
.end(),
1289 BlobFileComparator());
1290 fifo_eviction_seq_
= GetLatestSequenceNumber();
1292 WriteLock
l(&mutex_
);
1294 while (!candidate_files
.empty() &&
1295 live_sst_size
+ total_blob_size_
.load() + blob_size
>
1296 bdb_options_
.max_db_size
) {
1297 std::shared_ptr
<BlobFile
> blob_file
= candidate_files
.back();
1298 candidate_files
.pop_back();
1299 WriteLock
file_lock(&blob_file
->mutex_
);
1300 if (blob_file
->Obsolete()) {
1301 // File already obsoleted by someone else.
1302 assert(blob_file
->Immutable());
1305 // FIFO eviction can evict open blob files.
1306 if (!blob_file
->Immutable()) {
1307 Status s
= CloseBlobFile(blob_file
);
1312 assert(blob_file
->Immutable());
1313 auto expiration_range
= blob_file
->GetExpirationRange();
1314 ROCKS_LOG_INFO(db_options_
.info_log
,
1315 "Evict oldest blob file since DB out of space. Current "
1316 "live SST file size: %" PRIu64
", total blob size: %" PRIu64
1317 ", max db size: %" PRIu64
", evicted blob file #%" PRIu64
1319 live_sst_size
, total_blob_size_
.load(),
1320 bdb_options_
.max_db_size
, blob_file
->BlobFileNumber());
1321 ObsoleteBlobFile(blob_file
, fifo_eviction_seq_
, true /*update_size*/);
1322 evict_expiration_up_to_
= expiration_range
.first
;
1323 RecordTick(statistics_
, BLOB_DB_FIFO_NUM_FILES_EVICTED
);
1324 RecordTick(statistics_
, BLOB_DB_FIFO_NUM_KEYS_EVICTED
,
1325 blob_file
->BlobCount());
1326 RecordTick(statistics_
, BLOB_DB_FIFO_BYTES_EVICTED
,
1327 blob_file
->GetFileSize());
1328 TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
1330 if (live_sst_size
+ total_blob_size_
.load() + blob_size
>
1331 bdb_options_
.max_db_size
) {
1332 return Status::NoSpace(
1333 "Write failed, as writing it would exceed max_db_size limit.");
1335 return Status::OK();
1338 Status
BlobDBImpl::AppendBlob(const std::shared_ptr
<BlobFile
>& bfile
,
1339 const std::string
& headerbuf
, const Slice
& key
,
1340 const Slice
& value
, uint64_t expiration
,
1341 std::string
* index_entry
) {
1343 uint64_t blob_offset
= 0;
1344 uint64_t key_offset
= 0;
1346 WriteLock
lockbfile_w(&bfile
->mutex_
);
1347 std::shared_ptr
<BlobLogWriter
> writer
;
1348 s
= CheckOrCreateWriterLocked(bfile
, &writer
);
1353 // write the blob to the blob log.
1354 s
= writer
->EmitPhysicalRecord(headerbuf
, key
, value
, &key_offset
,
1359 ROCKS_LOG_ERROR(db_options_
.info_log
,
1360 "Invalid status in AppendBlob: %s status: '%s'",
1361 bfile
->PathName().c_str(), s
.ToString().c_str());
1365 uint64_t size_put
= headerbuf
.size() + key
.size() + value
.size();
1366 bfile
->BlobRecordAdded(size_put
);
1367 total_blob_size_
+= size_put
;
1369 if (expiration
== kNoExpiration
) {
1370 BlobIndex::EncodeBlob(index_entry
, bfile
->BlobFileNumber(), blob_offset
,
1371 value
.size(), bdb_options_
.compression
);
1373 BlobIndex::EncodeBlobTTL(index_entry
, expiration
, bfile
->BlobFileNumber(),
1374 blob_offset
, value
.size(),
1375 bdb_options_
.compression
);
1381 std::vector
<Status
> BlobDBImpl::MultiGet(
1382 const ReadOptions
& read_options
,
1383 const std::vector
<Slice
>& keys
, std::vector
<std::string
>* values
) {
1384 StopWatch
multiget_sw(env_
, statistics_
, BLOB_DB_MULTIGET_MICROS
);
1385 RecordTick(statistics_
, BLOB_DB_NUM_MULTIGET
);
1386 // Get a snapshot to avoid blob file get deleted between we
1387 // fetch and index entry and reading from the file.
1388 ReadOptions
ro(read_options
);
1389 bool snapshot_created
= SetSnapshotIfNeeded(&ro
);
1391 std::vector
<Status
> statuses
;
1392 statuses
.reserve(keys
.size());
1394 values
->reserve(keys
.size());
1395 PinnableSlice value
;
1396 for (size_t i
= 0; i
< keys
.size(); i
++) {
1397 statuses
.push_back(Get(ro
, DefaultColumnFamily(), keys
[i
], &value
));
1398 values
->push_back(value
.ToString());
1401 if (snapshot_created
) {
1402 db_
->ReleaseSnapshot(ro
.snapshot
);
1407 bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions
* read_options
) {
1408 assert(read_options
!= nullptr);
1409 if (read_options
->snapshot
!= nullptr) {
1412 read_options
->snapshot
= db_
->GetSnapshot();
1416 Status
BlobDBImpl::GetBlobValue(const Slice
& key
, const Slice
& index_entry
,
1417 PinnableSlice
* value
, uint64_t* expiration
) {
1420 BlobIndex blob_index
;
1421 Status s
= blob_index
.DecodeFrom(index_entry
);
1426 if (blob_index
.HasTTL() && blob_index
.expiration() <= EpochNow()) {
1427 return Status::NotFound("Key expired");
1430 if (expiration
!= nullptr) {
1431 if (blob_index
.HasTTL()) {
1432 *expiration
= blob_index
.expiration();
1434 *expiration
= kNoExpiration
;
1438 if (blob_index
.IsInlined()) {
1439 // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
1440 // memory buffer to avoid extra copy.
1441 value
->PinSelf(blob_index
.value());
1442 return Status::OK();
1445 CompressionType compression_type
= kNoCompression
;
1446 s
= GetRawBlobFromFile(key
, blob_index
.file_number(), blob_index
.offset(),
1447 blob_index
.size(), value
, &compression_type
);
1452 if (compression_type
!= kNoCompression
) {
1453 s
= DecompressSlice(*value
, compression_type
, value
);
1455 if (debug_level_
>= 2) {
1457 db_options_
.info_log
,
1458 "Uncompression error during blob read from file: %" PRIu64
1459 " blob_offset: %" PRIu64
" blob_size: %" PRIu64
1460 " key: %s status: '%s'",
1461 blob_index
.file_number(), blob_index
.offset(), blob_index
.size(),
1462 key
.ToString(/* output_hex */ true).c_str(), s
.ToString().c_str());
1468 return Status::OK();
1471 Status
BlobDBImpl::GetRawBlobFromFile(const Slice
& key
, uint64_t file_number
,
1472 uint64_t offset
, uint64_t size
,
1473 PinnableSlice
* value
,
1474 CompressionType
* compression_type
) {
1476 assert(compression_type
);
1477 assert(*compression_type
== kNoCompression
);
1481 return Status::OK();
1484 // offset has to have certain min, as we will read CRC
1485 // later from the Blob Header, which needs to be also a
1488 (BlobLogHeader::kSize
+ BlobLogRecord::kHeaderSize
+ key
.size())) {
1489 if (debug_level_
>= 2) {
1490 ROCKS_LOG_ERROR(db_options_
.info_log
,
1491 "Invalid blob index file_number: %" PRIu64
1492 " blob_offset: %" PRIu64
" blob_size: %" PRIu64
1494 file_number
, offset
, size
,
1495 key
.ToString(/* output_hex */ true).c_str());
1498 return Status::NotFound("Invalid blob offset");
1501 std::shared_ptr
<BlobFile
> blob_file
;
1504 ReadLock
rl(&mutex_
);
1505 auto it
= blob_files_
.find(file_number
);
1508 if (it
== blob_files_
.end()) {
1509 return Status::NotFound("Blob Not Found as blob file missing");
1512 blob_file
= it
->second
;
1515 *compression_type
= blob_file
->GetCompressionType();
1517 // takes locks when called
1518 std::shared_ptr
<RandomAccessFileReader
> reader
;
1519 Status s
= GetBlobFileReader(blob_file
, &reader
);
1524 assert(offset
>= key
.size() + sizeof(uint32_t));
1525 const uint64_t record_offset
= offset
- key
.size() - sizeof(uint32_t);
1526 const uint64_t record_size
= sizeof(uint32_t) + key
.size() + size
;
1528 // Allocate the buffer. This is safe in C++11
1530 AlignedBuf aligned_buf
;
1532 // A partial blob record contain checksum, key and value.
1536 StopWatch
read_sw(env_
, statistics_
, BLOB_DB_BLOB_FILE_READ_MICROS
);
1537 if (reader
->use_direct_io()) {
1538 s
= reader
->Read(IOOptions(), record_offset
,
1539 static_cast<size_t>(record_size
), &blob_record
, nullptr,
1542 buf
.reserve(static_cast<size_t>(record_size
));
1543 s
= reader
->Read(IOOptions(), record_offset
,
1544 static_cast<size_t>(record_size
), &blob_record
, &buf
[0],
1547 RecordTick(statistics_
, BLOB_DB_BLOB_FILE_BYTES_READ
, blob_record
.size());
1552 db_options_
.info_log
,
1553 "Failed to read blob from blob file %" PRIu64
", blob_offset: %" PRIu64
1554 ", blob_size: %" PRIu64
", key_size: %" ROCKSDB_PRIszt
", status: '%s'",
1555 file_number
, offset
, size
, key
.size(), s
.ToString().c_str());
1559 if (blob_record
.size() != record_size
) {
1561 db_options_
.info_log
,
1562 "Failed to read blob from blob file %" PRIu64
", blob_offset: %" PRIu64
1563 ", blob_size: %" PRIu64
", key_size: %" ROCKSDB_PRIszt
1564 ", read %" ROCKSDB_PRIszt
" bytes, expected %" PRIu64
" bytes",
1565 file_number
, offset
, size
, key
.size(), blob_record
.size(), record_size
);
1567 return Status::Corruption("Failed to retrieve blob from blob index.");
1570 Slice
crc_slice(blob_record
.data(), sizeof(uint32_t));
1571 Slice
blob_value(blob_record
.data() + sizeof(uint32_t) + key
.size(),
1572 static_cast<size_t>(size
));
1574 uint32_t crc_exp
= 0;
1575 if (!GetFixed32(&crc_slice
, &crc_exp
)) {
1577 db_options_
.info_log
,
1578 "Unable to decode CRC from blob file %" PRIu64
", blob_offset: %" PRIu64
1579 ", blob_size: %" PRIu64
", key size: %" ROCKSDB_PRIszt
", status: '%s'",
1580 file_number
, offset
, size
, key
.size(), s
.ToString().c_str());
1581 return Status::Corruption("Unable to decode checksum.");
1584 uint32_t crc
= crc32c::Value(blob_record
.data() + sizeof(uint32_t),
1585 blob_record
.size() - sizeof(uint32_t));
1586 crc
= crc32c::Mask(crc
); // Adjust for storage
1587 if (crc
!= crc_exp
) {
1588 if (debug_level_
>= 2) {
1590 db_options_
.info_log
,
1591 "Blob crc mismatch file: %" PRIu64
" blob_offset: %" PRIu64
1592 " blob_size: %" PRIu64
" key: %s status: '%s'",
1593 file_number
, offset
, size
,
1594 key
.ToString(/* output_hex */ true).c_str(), s
.ToString().c_str());
1597 return Status::Corruption("Corruption. Blob CRC mismatch");
1600 value
->PinSelf(blob_value
);
1602 return Status::OK();
1605 Status
BlobDBImpl::Get(const ReadOptions
& read_options
,
1606 ColumnFamilyHandle
* column_family
, const Slice
& key
,
1607 PinnableSlice
* value
) {
1608 return Get(read_options
, column_family
, key
, value
,
1609 static_cast<uint64_t*>(nullptr) /*expiration*/);
1612 Status
BlobDBImpl::Get(const ReadOptions
& read_options
,
1613 ColumnFamilyHandle
* column_family
, const Slice
& key
,
1614 PinnableSlice
* value
, uint64_t* expiration
) {
1615 StopWatch
get_sw(env_
, statistics_
, BLOB_DB_GET_MICROS
);
1616 RecordTick(statistics_
, BLOB_DB_NUM_GET
);
1617 return GetImpl(read_options
, column_family
, key
, value
, expiration
);
1620 Status
BlobDBImpl::GetImpl(const ReadOptions
& read_options
,
1621 ColumnFamilyHandle
* column_family
, const Slice
& key
,
1622 PinnableSlice
* value
, uint64_t* expiration
) {
1623 if (column_family
->GetID() != DefaultColumnFamily()->GetID()) {
1624 return Status::NotSupported(
1625 "Blob DB doesn't support non-default column family.");
1627 // Get a snapshot to avoid blob file get deleted between we
1628 // fetch and index entry and reading from the file.
1629 // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
1630 ReadOptions
ro(read_options
);
1631 bool snapshot_created
= SetSnapshotIfNeeded(&ro
);
1633 PinnableSlice index_entry
;
1635 bool is_blob_index
= false;
1636 DBImpl::GetImplOptions get_impl_options
;
1637 get_impl_options
.column_family
= column_family
;
1638 get_impl_options
.value
= &index_entry
;
1639 get_impl_options
.is_blob_index
= &is_blob_index
;
1640 s
= db_impl_
->GetImpl(ro
, key
, get_impl_options
);
1641 if (expiration
!= nullptr) {
1642 *expiration
= kNoExpiration
;
1644 RecordTick(statistics_
, BLOB_DB_NUM_KEYS_READ
);
1646 if (is_blob_index
) {
1647 s
= GetBlobValue(key
, index_entry
, value
, expiration
);
1649 // The index entry is the value itself in this case.
1650 value
->PinSelf(index_entry
);
1652 RecordTick(statistics_
, BLOB_DB_BYTES_READ
, value
->size());
1654 if (snapshot_created
) {
1655 db_
->ReleaseSnapshot(ro
.snapshot
);
1660 std::pair
<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted
) {
1662 return std::make_pair(false, -1);
1665 ReadLock
rl(&mutex_
);
1667 ROCKS_LOG_INFO(db_options_
.info_log
, "Starting Sanity Check");
1668 ROCKS_LOG_INFO(db_options_
.info_log
, "Number of files %" ROCKSDB_PRIszt
,
1669 blob_files_
.size());
1670 ROCKS_LOG_INFO(db_options_
.info_log
, "Number of open files %" ROCKSDB_PRIszt
,
1671 open_ttl_files_
.size());
1673 for (const auto& blob_file
: open_ttl_files_
) {
1675 assert(!blob_file
->Immutable());
1678 for (const auto& pair
: live_imm_non_ttl_blob_files_
) {
1679 const auto& blob_file
= pair
.second
;
1681 assert(!blob_file
->HasTTL());
1682 assert(blob_file
->Immutable());
1685 uint64_t now
= EpochNow();
1687 for (auto blob_file_pair
: blob_files_
) {
1688 auto blob_file
= blob_file_pair
.second
;
1690 int pos
= snprintf(buf
, sizeof(buf
),
1691 "Blob file %" PRIu64
", size %" PRIu64
1692 ", blob count %" PRIu64
", immutable %d",
1693 blob_file
->BlobFileNumber(), blob_file
->GetFileSize(),
1694 blob_file
->BlobCount(), blob_file
->Immutable());
1695 if (blob_file
->HasTTL()) {
1696 ExpirationRange expiration_range
;
1699 ReadLock
file_lock(&blob_file
->mutex_
);
1700 expiration_range
= blob_file
->GetExpirationRange();
1703 pos
+= snprintf(buf
+ pos
, sizeof(buf
) - pos
,
1704 ", expiration range (%" PRIu64
", %" PRIu64
")",
1705 expiration_range
.first
, expiration_range
.second
);
1706 if (!blob_file
->Obsolete()) {
1707 pos
+= snprintf(buf
+ pos
, sizeof(buf
) - pos
,
1708 ", expire in %" PRIu64
" seconds",
1709 expiration_range
.second
- now
);
1712 if (blob_file
->Obsolete()) {
1713 pos
+= snprintf(buf
+ pos
, sizeof(buf
) - pos
, ", obsolete at %" PRIu64
,
1714 blob_file
->GetObsoleteSequence());
1716 snprintf(buf
+ pos
, sizeof(buf
) - pos
, ".");
1717 ROCKS_LOG_INFO(db_options_
.info_log
, "%s", buf
);
1721 return std::make_pair(true, -1);
1724 Status
BlobDBImpl::CloseBlobFile(std::shared_ptr
<BlobFile
> bfile
) {
1725 TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
1727 assert(!bfile
->Immutable());
1728 assert(!bfile
->Obsolete());
1730 if (bfile
->HasTTL() || bfile
== open_non_ttl_file_
) {
1731 write_mutex_
.AssertHeld();
1734 ROCKS_LOG_INFO(db_options_
.info_log
,
1735 "Closing blob file %" PRIu64
". Path: %s",
1736 bfile
->BlobFileNumber(), bfile
->PathName().c_str());
1738 const SequenceNumber sequence
= GetLatestSequenceNumber();
1740 const Status s
= bfile
->WriteFooterAndCloseLocked(sequence
);
1743 total_blob_size_
+= BlobLogFooter::kSize
;
1745 bfile
->MarkImmutable(sequence
);
1747 ROCKS_LOG_ERROR(db_options_
.info_log
,
1748 "Failed to close blob file %" PRIu64
"with error: %s",
1749 bfile
->BlobFileNumber(), s
.ToString().c_str());
1752 if (bfile
->HasTTL()) {
1753 size_t erased
__attribute__((__unused__
));
1754 erased
= open_ttl_files_
.erase(bfile
);
1756 if (bfile
== open_non_ttl_file_
) {
1757 open_non_ttl_file_
= nullptr;
1760 const uint64_t blob_file_number
= bfile
->BlobFileNumber();
1761 auto it
= live_imm_non_ttl_blob_files_
.lower_bound(blob_file_number
);
1762 assert(it
== live_imm_non_ttl_blob_files_
.end() ||
1763 it
->first
!= blob_file_number
);
1764 live_imm_non_ttl_blob_files_
.insert(
1765 it
, std::map
<uint64_t, std::shared_ptr
<BlobFile
>>::value_type(
1766 blob_file_number
, bfile
));
1772 Status
BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr
<BlobFile
>& bfile
) {
1773 write_mutex_
.AssertHeld();
1776 if (bfile
->GetFileSize() < bdb_options_
.blob_file_size
) {
1777 return Status::OK();
1780 WriteLock
lock(&mutex_
);
1781 WriteLock
file_lock(&bfile
->mutex_
);
1783 assert(!bfile
->Obsolete() || bfile
->Immutable());
1784 if (bfile
->Immutable()) {
1785 return Status::OK();
1788 return CloseBlobFile(bfile
);
1791 void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr
<BlobFile
> blob_file
,
1792 SequenceNumber obsolete_seq
,
1794 assert(blob_file
->Immutable());
1795 assert(!blob_file
->Obsolete());
1797 // Should hold write lock of mutex_ or during DB open.
1798 blob_file
->MarkObsolete(obsolete_seq
);
1799 obsolete_files_
.push_back(blob_file
);
1800 assert(total_blob_size_
.load() >= blob_file
->GetFileSize());
1802 total_blob_size_
-= blob_file
->GetFileSize();
1806 bool BlobDBImpl::VisibleToActiveSnapshot(
1807 const std::shared_ptr
<BlobFile
>& bfile
) {
1808 assert(bfile
->Obsolete());
1810 // We check whether the oldest snapshot is no less than the last sequence
1811 // by the time the blob file become obsolete. If so, the blob file is not
1812 // visible to all existing snapshots.
1814 // If we keep track of the earliest sequence of the keys in the blob file,
1815 // we could instead check if there's a snapshot falls in range
1816 // [earliest_sequence, obsolete_sequence). But doing so will make the
1817 // implementation more complicated.
1818 SequenceNumber obsolete_sequence
= bfile
->GetObsoleteSequence();
1819 SequenceNumber oldest_snapshot
= kMaxSequenceNumber
;
1821 // Need to lock DBImpl mutex before access snapshot list.
1822 InstrumentedMutexLock
l(db_impl_
->mutex());
1823 auto& snapshots
= db_impl_
->snapshots();
1824 if (!snapshots
.empty()) {
1825 oldest_snapshot
= snapshots
.oldest()->GetSequenceNumber();
1828 bool visible
= oldest_snapshot
< obsolete_sequence
;
1830 ROCKS_LOG_INFO(db_options_
.info_log
,
1831 "Obsolete blob file %" PRIu64
" (obsolete at %" PRIu64
1832 ") visible to oldest snapshot %" PRIu64
".",
1833 bfile
->BlobFileNumber(), obsolete_sequence
, oldest_snapshot
);
1838 std::pair
<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted
) {
1840 return std::make_pair(false, -1);
1843 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
1844 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
1846 std::vector
<std::shared_ptr
<BlobFile
>> process_files
;
1847 uint64_t now
= EpochNow();
1849 ReadLock
rl(&mutex_
);
1850 for (auto p
: blob_files_
) {
1851 auto& blob_file
= p
.second
;
1852 ReadLock
file_lock(&blob_file
->mutex_
);
1853 if (blob_file
->HasTTL() && !blob_file
->Obsolete() &&
1854 blob_file
->GetExpirationRange().second
<= now
) {
1855 process_files
.push_back(blob_file
);
1860 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
1861 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
1862 TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
1864 SequenceNumber seq
= GetLatestSequenceNumber();
1866 MutexLock
l(&write_mutex_
);
1867 WriteLock
lock(&mutex_
);
1868 for (auto& blob_file
: process_files
) {
1869 WriteLock
file_lock(&blob_file
->mutex_
);
1871 // Need to double check if the file is obsolete.
1872 if (blob_file
->Obsolete()) {
1873 assert(blob_file
->Immutable());
1877 if (!blob_file
->Immutable()) {
1878 CloseBlobFile(blob_file
);
1881 assert(blob_file
->Immutable());
1883 ObsoleteBlobFile(blob_file
, seq
, true /*update_size*/);
1887 return std::make_pair(true, -1);
1890 Status
BlobDBImpl::SyncBlobFiles() {
1891 MutexLock
l(&write_mutex_
);
1893 std::vector
<std::shared_ptr
<BlobFile
>> process_files
;
1895 ReadLock
rl(&mutex_
);
1896 for (auto fitr
: open_ttl_files_
) {
1897 process_files
.push_back(fitr
);
1899 if (open_non_ttl_file_
!= nullptr) {
1900 process_files
.push_back(open_non_ttl_file_
);
1905 for (auto& blob_file
: process_files
) {
1906 s
= blob_file
->Fsync();
1908 ROCKS_LOG_ERROR(db_options_
.info_log
,
1909 "Failed to sync blob file %" PRIu64
", status: %s",
1910 blob_file
->BlobFileNumber(), s
.ToString().c_str());
1915 s
= dir_ent_
->Fsync();
1917 ROCKS_LOG_ERROR(db_options_
.info_log
,
1918 "Failed to sync blob directory, status: %s",
1919 s
.ToString().c_str());
1924 std::pair
<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted
) {
1925 if (aborted
) return std::make_pair(false, -1);
1927 if (open_file_count_
.load() < kOpenFilesTrigger
) {
1928 return std::make_pair(true, -1);
1931 // in the future, we should sort by last_access_
1932 // instead of closing every file
1933 ReadLock
rl(&mutex_
);
1934 for (auto const& ent
: blob_files_
) {
1935 auto bfile
= ent
.second
;
1936 if (bfile
->last_access_
.load() == -1) continue;
1938 WriteLock
lockbfile_w(&bfile
->mutex_
);
1939 CloseRandomAccessLocked(bfile
);
1942 return std::make_pair(true, -1);
1945 std::pair
<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted
) {
1947 return std::make_pair(false, -1);
1950 MutexLock
delete_file_lock(&delete_file_mutex_
);
1951 if (disable_file_deletions_
> 0) {
1952 return std::make_pair(true, -1);
1955 std::list
<std::shared_ptr
<BlobFile
>> tobsolete
;
1957 WriteLock
wl(&mutex_
);
1958 if (obsolete_files_
.empty()) {
1959 return std::make_pair(true, -1);
1961 tobsolete
.swap(obsolete_files_
);
1964 bool file_deleted
= false;
1965 for (auto iter
= tobsolete
.begin(); iter
!= tobsolete
.end();) {
1968 ReadLock
lockbfile_r(&bfile
->mutex_
);
1969 if (VisibleToActiveSnapshot(bfile
)) {
1970 ROCKS_LOG_INFO(db_options_
.info_log
,
1971 "Could not delete file due to snapshot failure %s",
1972 bfile
->PathName().c_str());
1977 ROCKS_LOG_INFO(db_options_
.info_log
,
1978 "Will delete file due to snapshot success %s",
1979 bfile
->PathName().c_str());
1982 WriteLock
wl(&mutex_
);
1983 blob_files_
.erase(bfile
->BlobFileNumber());
1986 Status s
= DeleteDBFile(&(db_impl_
->immutable_db_options()),
1987 bfile
->PathName(), blob_dir_
, true,
1988 /*force_fg=*/false);
1990 ROCKS_LOG_ERROR(db_options_
.info_log
,
1991 "File failed to be deleted as obsolete %s",
1992 bfile
->PathName().c_str());
1997 file_deleted
= true;
1998 ROCKS_LOG_INFO(db_options_
.info_log
,
1999 "File deleted as obsolete from blob dir %s",
2000 bfile
->PathName().c_str());
2002 iter
= tobsolete
.erase(iter
);
2005 // directory change. Fsync
2007 Status s
= dir_ent_
->Fsync();
2009 ROCKS_LOG_ERROR(db_options_
.info_log
, "Failed to sync dir %s: %s",
2010 blob_dir_
.c_str(), s
.ToString().c_str());
2014 // put files back into obsolete if for some reason, delete failed
2015 if (!tobsolete
.empty()) {
2016 WriteLock
wl(&mutex_
);
2017 for (auto bfile
: tobsolete
) {
2018 blob_files_
.insert(std::make_pair(bfile
->BlobFileNumber(), bfile
));
2019 obsolete_files_
.push_front(bfile
);
2023 return std::make_pair(!aborted
, -1);
2026 void BlobDBImpl::CopyBlobFiles(
2027 std::vector
<std::shared_ptr
<BlobFile
>>* bfiles_copy
) {
2028 ReadLock
rl(&mutex_
);
2029 for (auto const& p
: blob_files_
) {
2030 bfiles_copy
->push_back(p
.second
);
2034 Iterator
* BlobDBImpl::NewIterator(const ReadOptions
& read_options
) {
2036 static_cast_with_check
<ColumnFamilyHandleImpl
>(DefaultColumnFamily())
2038 // Get a snapshot to avoid blob file get deleted between we
2039 // fetch and index entry and reading from the file.
2040 ManagedSnapshot
* own_snapshot
= nullptr;
2041 const Snapshot
* snapshot
= read_options
.snapshot
;
2042 if (snapshot
== nullptr) {
2043 own_snapshot
= new ManagedSnapshot(db_
);
2044 snapshot
= own_snapshot
->snapshot();
2046 auto* iter
= db_impl_
->NewIteratorImpl(
2047 read_options
, cfd
, snapshot
->GetSequenceNumber(),
2048 nullptr /*read_callback*/, true /*allow_blob*/);
2049 return new BlobDBIterator(own_snapshot
, iter
, this, env_
, statistics_
);
2052 Status
DestroyBlobDB(const std::string
& dbname
, const Options
& options
,
2053 const BlobDBOptions
& bdb_options
) {
2054 const ImmutableDBOptions
soptions(SanitizeOptions(dbname
, options
));
2055 Env
* env
= soptions
.env
;
2058 std::string blobdir
;
2059 blobdir
= (bdb_options
.path_relative
) ? dbname
+ "/" + bdb_options
.blob_dir
2060 : bdb_options
.blob_dir
;
2062 std::vector
<std::string
> filenames
;
2063 env
->GetChildren(blobdir
, &filenames
);
2065 for (const auto& f
: filenames
) {
2068 if (ParseFileName(f
, &number
, &type
) && type
== kBlobFile
) {
2069 Status del
= DeleteDBFile(&soptions
, blobdir
+ "/" + f
, blobdir
, true,
2070 /*force_fg=*/false);
2071 if (status
.ok() && !del
.ok()) {
2076 env
->DeleteDir(blobdir
);
2078 Status destroy
= DestroyDB(dbname
, options
);
2079 if (status
.ok() && !destroy
.ok()) {
2087 Status
BlobDBImpl::TEST_GetBlobValue(const Slice
& key
, const Slice
& index_entry
,
2088 PinnableSlice
* value
) {
2089 return GetBlobValue(key
, index_entry
, value
);
2092 void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number
,
2093 SequenceNumber immutable_sequence
) {
2094 auto blob_file
= std::make_shared
<BlobFile
>(this, blob_dir_
, blob_file_number
,
2095 db_options_
.info_log
.get());
2096 blob_file
->MarkImmutable(immutable_sequence
);
2098 blob_files_
[blob_file_number
] = blob_file
;
2099 live_imm_non_ttl_blob_files_
[blob_file_number
] = blob_file
;
2102 std::vector
<std::shared_ptr
<BlobFile
>> BlobDBImpl::TEST_GetBlobFiles() const {
2103 ReadLock
l(&mutex_
);
2104 std::vector
<std::shared_ptr
<BlobFile
>> blob_files
;
2105 for (auto& p
: blob_files_
) {
2106 blob_files
.emplace_back(p
.second
);
2111 std::vector
<std::shared_ptr
<BlobFile
>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
2113 ReadLock
l(&mutex_
);
2114 std::vector
<std::shared_ptr
<BlobFile
>> live_imm_non_ttl_files
;
2115 for (const auto& pair
: live_imm_non_ttl_blob_files_
) {
2116 live_imm_non_ttl_files
.emplace_back(pair
.second
);
2118 return live_imm_non_ttl_files
;
2121 std::vector
<std::shared_ptr
<BlobFile
>> BlobDBImpl::TEST_GetObsoleteFiles()
2123 ReadLock
l(&mutex_
);
2124 std::vector
<std::shared_ptr
<BlobFile
>> obsolete_files
;
2125 for (auto& bfile
: obsolete_files_
) {
2126 obsolete_files
.emplace_back(bfile
);
2128 return obsolete_files
;
2131 void BlobDBImpl::TEST_DeleteObsoleteFiles() {
2132 DeleteObsoleteFiles(false /*abort*/);
2135 Status
BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr
<BlobFile
>& bfile
) {
2136 MutexLock
l(&write_mutex_
);
2137 WriteLock
lock(&mutex_
);
2138 WriteLock
file_lock(&bfile
->mutex_
);
2140 return CloseBlobFile(bfile
);
2143 void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr
<BlobFile
>& blob_file
,
2144 SequenceNumber obsolete_seq
,
2146 return ObsoleteBlobFile(blob_file
, obsolete_seq
, update_size
);
2149 void BlobDBImpl::TEST_EvictExpiredFiles() {
2150 EvictExpiredFiles(false /*abort*/);
2153 uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_
.load(); }
2155 void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
2156 const std::vector
<LiveFileMetaData
>& live_files
) {
2157 InitializeBlobFileToSstMapping(live_files
);
2160 void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo
& info
) {
2161 ProcessFlushJobInfo(info
);
2164 void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo
& info
) {
2165 ProcessCompactionJobInfo(info
);
2170 } // namespace blob_db
2171 } // namespace ROCKSDB_NAMESPACE
2172 #endif // ROCKSDB_LITE