1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 #include "utilities/blob_db/blob_db_impl.h"
13 #include "db/db_impl.h"
14 #include "db/write_batch_internal.h"
15 #include "monitoring/instrumented_mutex.h"
16 #include "monitoring/statistics.h"
17 #include "rocksdb/convenience.h"
18 #include "rocksdb/env.h"
19 #include "rocksdb/iterator.h"
20 #include "rocksdb/utilities/stackable_db.h"
21 #include "rocksdb/utilities/transaction.h"
22 #include "table/block.h"
23 #include "table/block_based_table_builder.h"
24 #include "table/block_builder.h"
25 #include "table/meta_blocks.h"
26 #include "util/cast_util.h"
27 #include "util/crc32c.h"
28 #include "util/file_reader_writer.h"
29 #include "util/filename.h"
30 #include "util/logging.h"
31 #include "util/mutexlock.h"
32 #include "util/random.h"
33 #include "util/stop_watch.h"
34 #include "util/sync_point.h"
35 #include "util/timer_queue.h"
36 #include "utilities/blob_db/blob_compaction_filter.h"
37 #include "utilities/blob_db/blob_db_iterator.h"
38 #include "utilities/blob_db/blob_db_listener.h"
39 #include "utilities/blob_db/blob_index.h"
// Format version handed to the block compression / uncompression helpers
// called in this file (see GetCompressedSlice and GetBlobValue).
42 int kBlockBasedTableVersionFormat
= 2;
// WAL filter callback. Every argument is ignored (all parameter names are
// commented out) and processing is always told to continue.
48 WalFilter::WalProcessingOption
BlobReconcileWalFilter::LogRecordFound(
49 unsigned long long /*log_number*/, const std::string
& /*log_file_name*/,
50 const WriteBatch
& /*batch*/, WriteBatch
* /*new_batch*/,
51 bool* /*batch_changed*/) {
52 return WalFilter::WalProcessingOption::kContinueProcessing
;
// Orders blob files by descending file number: lhs sorts before rhs when
// lhs has the larger BlobFileNumber().
55 bool BlobFileComparator::operator()(
56 const std::shared_ptr
<BlobFile
>& lhs
,
57 const std::shared_ptr
<BlobFile
>& rhs
) const {
58 return lhs
->BlobFileNumber() > rhs
->BlobFileNumber();
// Ordering for blob files that carry a TTL (both operands are asserted to
// have one). The start of the expiration range is compared first; when the
// starts are equal the tie is broken by ascending file number.
// NOTE(review): the bodies of the two range comparisons are not visible in
// this view -- presumably `return true` / `return false`; confirm.
61 bool BlobFileComparatorTTL::operator()(
62 const std::shared_ptr
<BlobFile
>& lhs
,
63 const std::shared_ptr
<BlobFile
>& rhs
) const {
64 assert(lhs
->HasTTL() && rhs
->HasTTL());
65 if (lhs
->expiration_range_
.first
< rhs
->expiration_range_
.first
) {
68 if (lhs
->expiration_range_
.first
> rhs
->expiration_range_
.first
) {
// Equal range starts: fall back to file number.
71 return lhs
->BlobFileNumber() < rhs
->BlobFileNumber();
// Constructs the blob DB from its option structs. The options are copied
// into members, env_options_ is derived from db_options, and statistics_
// caches the raw pointer held by db_options_.statistics.
// NOTE(review): part of the initializer list is not visible in this view.
74 BlobDBImpl::BlobDBImpl(const std::string
& dbname
,
75 const BlobDBOptions
& blob_db_options
,
76 const DBOptions
& db_options
,
77 const ColumnFamilyOptions
& cf_options
)
82 bdb_options_(blob_db_options
),
83 db_options_(db_options
),
84 cf_options_(cf_options
),
85 env_options_(db_options
),
86 statistics_(db_options_
.statistics
.get()),
93 fifo_eviction_seq_(0),
94 evict_expiration_up_to_(0),
// When path_relative is set the blob directory lives under the DB directory
// ("<dbname>/<blob_dir>"); otherwise blob_dir is used exactly as given.
96 blob_dir_
= (bdb_options_
.path_relative
)
97 ? dbname
+ "/" + bdb_options_
.blob_dir
98 : bdb_options_
.blob_dir
;
// Blob files use the blob-specific bytes_per_sync setting.
99 env_options_
.bytes_per_sync
= blob_db_options
.bytes_per_sync
;
// Destructor: calls Close(). The resulting status is stored in a variable
// marked unused, i.e. it is intentionally not acted upon here.
102 BlobDBImpl::~BlobDBImpl() {
103 // CancelAllBackgroundWork(db_, true);
104 Status s
__attribute__((__unused__
)) = Close();
// Closes the wrapped base DB through db_->Close().
// NOTE(review): the statements that delete db_ and reset the pointers (see
// the comments below) are not visible in this view.
108 Status
BlobDBImpl::Close() {
114 // Close base DB before BlobDBImpl destructs to stop event listener and
115 // compaction filter call.
116 Status s
= db_
->Close();
117 // delete db_ anyway even if close failed.
119 // Reset pointers so that StackableDB does not delete the pointer again.
// Returns a copy of the blob DB options this instance was built with.
130 BlobDBOptions
BlobDBImpl::GetBlobDBOptions() const { return bdb_options_
; }
// Opens the blob DB. In order, the visible steps are:
//   1. reject a missing blob directory and user-supplied compaction filters
//      (both return NotSupported);
//   2. create an info logger when db_options_ has none;
//   3. create the blob directory if missing and open a handle to it;
//   4. load the blob files that already exist (OpenAllBlobFiles);
//   5. install the BlobDB listener and the blob index compaction filter
//      factory, then open the base DB on the default column family;
//   6. start the background tasks unless they are disabled.
// `handles` must be non-null and receives the column family handles from
// DB::Open. Must be called while db_ is still null (asserted).
// NOTE(review): the status checks between steps are not visible in this view.
132 Status
BlobDBImpl::Open(std::vector
<ColumnFamilyHandle
*>* handles
) {
133 assert(handles
!= nullptr);
134 assert(db_
== nullptr);
135 if (blob_dir_
.empty()) {
136 return Status::NotSupported("No blob directory in options");
// The compaction filter slot is needed by BlobDB itself (a factory is
// installed further down), so a caller-provided one is refused.
138 if (cf_options_
.compaction_filter
!= nullptr ||
139 cf_options_
.compaction_filter_factory
!= nullptr) {
140 return Status::NotSupported("Blob DB doesn't support compaction filter.");
146 if (db_options_
.info_log
== nullptr) {
147 s
= CreateLoggerFromOptions(dbname_
, db_options_
, &db_options_
.info_log
);
153 ROCKS_LOG_INFO(db_options_
.info_log
, "Opening BlobDB...");
155 // Open blob directory.
156 s
= env_
->CreateDirIfMissing(blob_dir_
);
158 ROCKS_LOG_ERROR(db_options_
.info_log
,
159 "Failed to create blob_dir %s, status: %s",
160 blob_dir_
.c_str(), s
.ToString().c_str());
162 s
= env_
->NewDirectory(blob_dir_
, &dir_ent_
);
164 ROCKS_LOG_ERROR(db_options_
.info_log
,
165 "Failed to open blob_dir %s, status: %s", blob_dir_
.c_str(),
166 s
.ToString().c_str());
171 s
= OpenAllBlobFiles();
// Hook BlobDB into the base DB: event listener plus compaction filter factory.
177 db_options_
.listeners
.push_back(std::make_shared
<BlobDBListener
>(this));
178 cf_options_
.compaction_filter_factory
.reset(
179 new BlobIndexCompactionFilterFactory(this, env_
, statistics_
));
182 ColumnFamilyDescriptor
cf_descriptor(kDefaultColumnFamilyName
, cf_options_
);
183 s
= DB::Open(db_options_
, dbname_
, {cf_descriptor
}, handles
, &db_
);
187 db_impl_
= static_cast_with_check
<DBImpl
, DB
>(db_
->GetRootDB());
190 // Start background jobs.
191 if (!bdb_options_
.disable_background_tasks
) {
192 StartBackgroundTasks();
195 ROCKS_LOG_INFO(db_options_
.info_log
, "BlobDB pointer %p", this);
196 bdb_options_
.Dump(db_options_
.info_log
.get());
// Registers the periodic background jobs. Each job is a member function
// bound to `this` with one placeholder argument, paired with a period in
// milliseconds. The garbage collection period comes from
// bdb_options_.garbage_collection_interval_secs (converted to ms); the other
// periods are named constants.
// NOTE(review): some of the `tqueue_.add(` call openers are not visible in
// this view; the argument lists below presumably all belong to such calls.
201 void BlobDBImpl::StartBackgroundTasks() {
202 // store a call to a member function and object
204 kReclaimOpenFilesPeriodMillisecs
,
205 std::bind(&BlobDBImpl::ReclaimOpenFiles
, this, std::placeholders::_1
));
206 tqueue_
.add(static_cast<int64_t>(
207 bdb_options_
.garbage_collection_interval_secs
* 1000),
208 std::bind(&BlobDBImpl::RunGC
, this, std::placeholders::_1
));
210 kDeleteObsoleteFilesPeriodMillisecs
,
211 std::bind(&BlobDBImpl::DeleteObsoleteFiles
, this, std::placeholders::_1
));
212 tqueue_
.add(kSanityCheckPeriodMillisecs
,
213 std::bind(&BlobDBImpl::SanityCheck
, this, std::placeholders::_1
));
215 kEvictExpiredFilesPeriodMillisecs
,
216 std::bind(&BlobDBImpl::EvictExpiredFiles
, this, std::placeholders::_1
));
// Lists the blob directory and collects the file numbers of the blob files
// found there into *file_numbers (must be non-null). A directory entry is
// accepted when ParseFileName succeeds and reports type kBlobFile.
// A failure to list the directory is logged.
219 Status
BlobDBImpl::GetAllBlobFiles(std::set
<uint64_t>* file_numbers
) {
220 assert(file_numbers
!= nullptr);
221 std::vector
<std::string
> all_files
;
222 Status s
= env_
->GetChildren(blob_dir_
, &all_files
);
224 ROCKS_LOG_ERROR(db_options_
.info_log
,
225 "Failed to get list of blob files, status: %s",
226 s
.ToString().c_str());
230 for (const auto& file_name
: all_files
) {
231 uint64_t file_number
;
233 bool success
= ParseFileName(file_name
, &file_number
, &type
);
234 if (success
&& type
== kBlobFile
) {
235 file_numbers
->insert(file_number
);
// Anything else found in the blob directory is only reported, not used.
237 ROCKS_LOG_WARN(db_options_
.info_log
,
238 "Skipping file in blob directory: %s", file_name
.c_str());
// Discovers the blob files that already exist on disk and registers them.
//   - next_file_number_ is set to one past the largest file number found.
//   - Each file is wrapped in a BlobFile, marked immutable, and its metadata
//     (header and footer) is read.
//   - A file whose metadata read reports corruption is handed to
//     ObsoleteBlobFile and listed in the "incomplete or corrupted" log line.
//   - Any other metadata read failure is logged and returned to the caller.
//   - Remaining files are added to blob_files_ and their size is added to
//     total_blob_size_.
245 Status
BlobDBImpl::OpenAllBlobFiles() {
246 std::set
<uint64_t> file_numbers
;
247 Status s
= GetAllBlobFiles(&file_numbers
);
252 if (!file_numbers
.empty()) {
// std::set is ordered, so rbegin() is the largest existing file number.
253 next_file_number_
.store(*file_numbers
.rbegin() + 1);
// Comma-separated file number lists, built only for the log lines below.
256 std::string blob_file_list
;
257 std::string obsolete_file_list
;
259 for (auto& file_number
: file_numbers
) {
260 std::shared_ptr
<BlobFile
> blob_file
= std::make_shared
<BlobFile
>(
261 this, blob_dir_
, file_number
, db_options_
.info_log
.get());
262 blob_file
->MarkImmutable();
264 // Read file header and footer
265 Status read_metadata_status
= blob_file
->ReadMetadata(env_
, env_options_
);
266 if (read_metadata_status
.IsCorruption()) {
267 // Remove incomplete file.
268 ObsoleteBlobFile(blob_file
, 0 /*obsolete_seq*/, false /*update_size*/);
269 if (!obsolete_file_list
.empty()) {
270 obsolete_file_list
.append(", ");
272 obsolete_file_list
.append(ToString(file_number
));
274 } else if (!read_metadata_status
.ok()) {
275 ROCKS_LOG_ERROR(db_options_
.info_log
,
// The conversion is "%" PRIu64 with no space: a space flag is meaningless
// for an unsigned conversion and triggers -Wformat.
276 "Unable to read metadata of blob file %" PRIu64
278 file_number
, read_metadata_status
.ToString().c_str());
279 return read_metadata_status
;
282 total_blob_size_
+= blob_file
->GetFileSize();
284 blob_files_
[file_number
] = blob_file
;
285 if (!blob_file_list
.empty()) {
286 blob_file_list
.append(", ");
288 blob_file_list
.append(ToString(file_number
));
291 ROCKS_LOG_INFO(db_options_
.info_log
,
292 "Found %" ROCKSDB_PRIszt
" blob files: %s", blob_files_
.size(),
293 blob_file_list
.c_str());
294 ROCKS_LOG_INFO(db_options_
.info_log
,
295 "Found %" ROCKSDB_PRIszt
296 " incomplete or corrupted blob files: %s",
297 obsolete_files_
.size(), obsolete_file_list
.c_str());
// Closes the random-access reader of `bfile` by delegating to
// BlobFile::CloseRandomAccessLocked().
// NOTE(review): the "Locked" suffix suggests the caller must already hold
// the file's lock -- confirm against the callers.
301 void BlobDBImpl::CloseRandomAccessLocked(
302 const std::shared_ptr
<BlobFile
>& bfile
) {
303 bfile
->CloseRandomAccessLocked();
// Obtains the random-access reader for `blob_file` via BlobFile::GetReader
// and stores it in *reader (must be non-null).
// `fresh_open` is an out-flag filled in by GetReader; when the call succeeds
// with the flag set, *reader is asserted to be non-null.
// NOTE(review): presumably fresh_open means the file was newly opened by
// this call and some bookkeeping follows -- that part is not visible here.
307 Status
BlobDBImpl::GetBlobFileReader(
308 const std::shared_ptr
<BlobFile
>& blob_file
,
309 std::shared_ptr
<RandomAccessFileReader
>* reader
) {
310 assert(reader
!= nullptr);
311 bool fresh_open
= false;
312 Status s
= blob_file
->GetReader(env_
, env_options_
, reader
, &fresh_open
);
313 if (s
.ok() && fresh_open
) {
314 assert(*reader
!= nullptr);
// Creates the in-memory BlobFile object for a brand-new blob file in
// blob_dir_. The file number is taken from next_file_number_, which is
// post-incremented. `reason` is only used in the debug log line.
320 std::shared_ptr
<BlobFile
> BlobDBImpl::NewBlobFile(const std::string
& reason
) {
321 uint64_t file_num
= next_file_number_
++;
322 auto bfile
= std::make_shared
<BlobFile
>(this, blob_dir_
, file_num
,
323 db_options_
.info_log
.get());
324 ROCKS_LOG_DEBUG(db_options_
.info_log
, "New blob file created: %s reason='%s'",
325 bfile
->PathName().c_str(), reason
.c_str());
326 LogFlush(db_options_
.info_log
);
// Opens `bfile` for appending and attaches a blob log Writer to it.
//   - The file is (re)opened with Env::ReopenWritableFile; a failure is
//     logged together with the result of FileExists for the same path.
//   - The writer starts at the file's current size (boffset).
//   - The writer's last element type is derived from the current file size:
//     exactly one header -> kEtFileHdr, more than a header -> kEtRecord,
//     zero -> kEtNone. A non-zero size smaller than the header is reported
//     as Corruption.
330 Status
BlobDBImpl::CreateWriterLocked(const std::shared_ptr
<BlobFile
>& bfile
) {
331 std::string
fpath(bfile
->PathName());
332 std::unique_ptr
<WritableFile
> wfile
;
334 Status s
= env_
->ReopenWritableFile(fpath
, &wfile
, env_options_
);
336 ROCKS_LOG_ERROR(db_options_
.info_log
,
337 "Failed to open blob file for write: %s status: '%s'"
339 fpath
.c_str(), s
.ToString().c_str(),
340 env_
->FileExists(fpath
).ToString().c_str());
344 std::unique_ptr
<WritableFileWriter
> fwriter
;
345 fwriter
.reset(new WritableFileWriter(std::move(wfile
), fpath
, env_options_
));
347 uint64_t boffset
= bfile
->GetFileSize();
348 if (debug_level_
>= 2 && boffset
) {
// boffset is a uint64_t, so it must be printed with PRIu64; "%d" would read
// it as an int, which is undefined behavior.
349 ROCKS_LOG_DEBUG(db_options_
.info_log
, "Open blob file: %s with offset: %" PRIu64,
350 fpath
.c_str(), boffset
);
353 Writer::ElemType et
= Writer::kEtNone
;
354 if (bfile
->file_size_
== BlobLogHeader::kSize
) {
355 et
= Writer::kEtFileHdr
;
356 } else if (bfile
->file_size_
> BlobLogHeader::kSize
) {
357 et
= Writer::kEtRecord
;
358 } else if (bfile
->file_size_
) {
// NOTE(review): the argument matching this "%d" is not visible in this view;
// if it is the 64-bit file size, the specifier needs the same PRIu64 fix.
359 ROCKS_LOG_WARN(db_options_
.info_log
,
360 "Open blob file: %s with wrong size: %d", fpath
.c_str(),
362 return Status::Corruption("Invalid blob file size");
365 bfile
->log_writer_
= std::make_shared
<Writer
>(
366 std::move(fwriter
), env_
, statistics_
, bfile
->file_number_
,
367 bdb_options_
.bytes_per_sync
, db_options_
.use_fsync
, boffset
);
368 bfile
->log_writer_
->last_elem_type_
= et
;
// Searches open_ttl_files_ for an open TTL blob file whose expiration range
// contains `expiration`. A temporary BlobFile is used as the search key: it
// carries `expiration` as the range start and the maximum file number.
// Where the outcome is visible, nullptr is returned when the candidate's
// range does not contain `expiration` (range end <= expiration, or range
// start > expiration).
// NOTE(review): the early return for an empty open_ttl_files_ and the
// iterator step before the final check are not visible in this view.
373 std::shared_ptr
<BlobFile
> BlobDBImpl::FindBlobFileLocked(
374 uint64_t expiration
) const {
375 if (open_ttl_files_
.empty()) {
// Build the probe used for the ordered lookup.
379 std::shared_ptr
<BlobFile
> tmp
= std::make_shared
<BlobFile
>();
380 tmp
->SetHasTTL(true);
381 tmp
->expiration_range_
= std::make_pair(expiration
, 0);
382 tmp
->file_number_
= std::numeric_limits
<uint64_t>::max();
384 auto citr
= open_ttl_files_
.equal_range(tmp
);
// Nothing sorts at or after the probe: only the last file can still match.
385 if (citr
.first
== open_ttl_files_
.end()) {
386 assert(citr
.second
== open_ttl_files_
.end());
388 std::shared_ptr
<BlobFile
> check
= *(open_ttl_files_
.rbegin());
389 return (check
->expiration_range_
.second
<= expiration
) ? nullptr : check
;
// A non-empty equal range means an equivalent entry exists; return it.
392 if (citr
.first
!= citr
.second
) {
393 return *(citr
.first
);
396 auto finditr
= citr
.second
;
397 if (finditr
!= open_ttl_files_
.begin()) {
// b2: range ends at or before `expiration`; b1: range starts after it.
401 bool b2
= (*finditr
)->expiration_range_
.second
<= expiration
;
402 bool b1
= (*finditr
)->expiration_range_
.first
> expiration
;
404 return (b1
|| b2
) ? nullptr : (*finditr
);
// Returns the blob log writer of `bfile`, creating it with
// CreateWriterLocked when the file does not have one yet.
// Returns nullptr when the writer cannot be created.
407 std::shared_ptr
<Writer
> BlobDBImpl::CheckOrCreateWriterLocked(
408 const std::shared_ptr
<BlobFile
>& bfile
) {
409 std::shared_ptr
<Writer
> writer
= bfile
->GetWriter();
410 if (writer
) return writer
;
412 Status s
= CreateWriterLocked(bfile
);
413 if (!s
.ok()) return nullptr;
// Creation succeeded: fetch the writer that was just attached to the file.
415 writer
= bfile
->GetWriter();
// Returns the blob file that non-TTL blobs are appended to, creating it on
// first use. The existing file is looked up under a read lock first and,
// if absent, again under the write lock before a new file is created.
// A new file gets a header (compression from bdb_options_, has_ttl = false,
// default column family id), the header is written through the file's
// writer, and the file is then published in blob_files_ and
// open_non_ttl_file_. total_blob_size_ grows by the header size.
// NOTE(review): the failure branches after the two error logs are not
// visible in this view.
419 std::shared_ptr
<BlobFile
> BlobDBImpl::SelectBlobFile() {
421 ReadLock
rl(&mutex_
);
422 if (open_non_ttl_file_
!= nullptr) {
423 return open_non_ttl_file_
;
// Re-check under the write lock before creating a new file.
428 WriteLock
wl(&mutex_
);
429 if (open_non_ttl_file_
!= nullptr) {
430 return open_non_ttl_file_
;
433 std::shared_ptr
<BlobFile
> bfile
= NewBlobFile("SelectBlobFile");
436 // file not visible, hence no lock
437 std::shared_ptr
<Writer
> writer
= CheckOrCreateWriterLocked(bfile
);
439 ROCKS_LOG_ERROR(db_options_
.info_log
,
440 "Failed to get writer from blob file: %s",
441 bfile
->PathName().c_str());
// Fill in the in-memory header and mirror its fields onto the BlobFile.
445 bfile
->file_size_
= BlobLogHeader::kSize
;
446 bfile
->header_
.compression
= bdb_options_
.compression
;
447 bfile
->header_
.has_ttl
= false;
448 bfile
->header_
.column_family_id
=
449 reinterpret_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily())->GetID();
450 bfile
->header_valid_
= true;
451 bfile
->SetColumnFamilyId(bfile
->header_
.column_family_id
);
452 bfile
->SetHasTTL(false);
453 bfile
->SetCompression(bdb_options_
.compression
);
455 Status s
= writer
->WriteHeader(bfile
->header_
);
457 ROCKS_LOG_ERROR(db_options_
.info_log
,
458 "Failed to write header to new blob file: %s"
460 bfile
->PathName().c_str(), s
.ToString().c_str());
// Publish the new file.
464 blob_files_
.insert(std::make_pair(bfile
->BlobFileNumber(), bfile
));
465 open_non_ttl_file_
= bfile
;
466 total_blob_size_
+= BlobLogHeader::kSize
;
// Returns the open blob file that a blob expiring at `expiration` should be
// appended to, creating a new TTL file when no open file covers it.
// `expiration` must not be kNoExpiration (asserted).
//   - An existing file is looked up under a read lock and the current value
//     of epoch_of_ is remembered.
//   - A new file covers [exp_low, exp_high), where exp_low is `expiration`
//     rounded down to a multiple of bdb_options_.ttl_range_secs and
//     exp_high = exp_low + ttl_range_secs.
//   - Before publishing, the epoch is compared again under the write lock;
//     if it changed and a suitable file now exists, that file is returned.
//   - Otherwise the header is written and the file is added to blob_files_
//     and open_ttl_files_; total_blob_size_ grows by the header size.
// NOTE(review): the failure branches after the two error logs are not
// visible in this view.
470 std::shared_ptr
<BlobFile
> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration
) {
471 assert(expiration
!= kNoExpiration
);
472 uint64_t epoch_read
= 0;
473 std::shared_ptr
<BlobFile
> bfile
;
475 ReadLock
rl(&mutex_
);
476 bfile
= FindBlobFileLocked(expiration
);
477 epoch_read
= epoch_of_
.load();
481 assert(!bfile
->Immutable());
// Round `expiration` down to the start of its TTL bucket.
486 (expiration
/ bdb_options_
.ttl_range_secs
) * bdb_options_
.ttl_range_secs
;
487 uint64_t exp_high
= exp_low
+ bdb_options_
.ttl_range_secs
;
488 ExpirationRange expiration_range
= std::make_pair(exp_low
, exp_high
);
490 bfile
= NewBlobFile("SelectBlobFileTTL");
// The range bounds are 64-bit unsigned values (exp_high is declared
// uint64_t above), so they are printed with PRIu64 instead of "%d".
// NOTE(review): exp_low's declaration is not visible here; it is assumed
// to be uint64_t like exp_high -- confirm.
493 ROCKS_LOG_INFO(db_options_
.info_log
, "New blob file TTL range: %s %" PRIu64 " %" PRIu64,
494 bfile
->PathName().c_str(), exp_low
, exp_high
);
495 LogFlush(db_options_
.info_log
);
497 // we don't need to take lock as no other thread is seeing bfile yet
498 std::shared_ptr
<Writer
> writer
= CheckOrCreateWriterLocked(bfile
);
500 ROCKS_LOG_ERROR(db_options_
.info_log
,
501 "Failed to get writer from blob file with TTL: %s",
502 bfile
->PathName().c_str());
// Fill in the in-memory header and mirror its fields onto the BlobFile.
506 bfile
->header_
.expiration_range
= expiration_range
;
507 bfile
->header_
.compression
= bdb_options_
.compression
;
508 bfile
->header_
.has_ttl
= true;
509 bfile
->header_
.column_family_id
=
510 reinterpret_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily())->GetID();
512 bfile
->header_valid_
= true;
513 bfile
->SetColumnFamilyId(bfile
->header_
.column_family_id
);
514 bfile
->SetHasTTL(true);
515 bfile
->SetCompression(bdb_options_
.compression
);
516 bfile
->file_size_
= BlobLogHeader::kSize
;
518 // set the first value of the range, since that is
519 // concrete at this time. also necessary to add to open_ttl_files_
520 bfile
->expiration_range_
= expiration_range
;
522 WriteLock
wl(&mutex_
);
523 // in case the epoch has shifted in the interim, then
524 // check the condition again - should be rare.
525 if (epoch_of_
.load() != epoch_read
) {
526 auto bfile2
= FindBlobFileLocked(expiration
);
527 if (bfile2
) return bfile2
;
530 Status s
= writer
->WriteHeader(bfile
->header_
);
532 ROCKS_LOG_ERROR(db_options_
.info_log
,
533 "Failed to write header to new blob file: %s"
535 bfile
->PathName().c_str(), s
.ToString().c_str());
// Publish the new file.
539 blob_files_
.insert(std::make_pair(bfile
->BlobFileNumber(), bfile
));
540 open_ttl_files_
.insert(bfile
);
541 total_blob_size_
+= BlobLogHeader::kSize
;
// WriteBatch handler that rebuilds a user batch into an internal batch
// (batch_) suitable for the base DB.
//   - Puts are routed through BlobDBImpl::PutBlobValue with kNoExpiration.
//   - Deletes and range deletes are copied into batch_ unchanged.
//   - Log data is copied into batch_ unchanged.
//   - SingleDelete and Merge are rejected with NotSupported.
//   - Any operation on a column family other than the default one is
//     rejected with NotSupported.
547 class BlobDBImpl::BlobInserter
: public WriteBatch::Handler
{
549 const WriteOptions
& options_
;
550 BlobDBImpl
* blob_db_impl_
;
551 uint32_t default_cf_id_
;
555 BlobInserter(const WriteOptions
& options
, BlobDBImpl
* blob_db_impl
,
556 uint32_t default_cf_id
)
558 blob_db_impl_(blob_db_impl
),
559 default_cf_id_(default_cf_id
) {}
// Accessor for the rewritten batch.
561 WriteBatch
* batch() { return &batch_
; }
563 virtual Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
564 const Slice
& value
) override
{
565 if (column_family_id
!= default_cf_id_
) {
566 return Status::NotSupported(
567 "Blob DB doesn't support non-default column family.");
569 Status s
= blob_db_impl_
->PutBlobValue(options_
, key
, value
, kNoExpiration
,
574 virtual Status
DeleteCF(uint32_t column_family_id
,
575 const Slice
& key
) override
{
576 if (column_family_id
!= default_cf_id_
) {
577 return Status::NotSupported(
578 "Blob DB doesn't support non-default column family.");
580 Status s
= WriteBatchInternal::Delete(&batch_
, column_family_id
, key
);
// NOTE(review): unlike its siblings this method has no `CF` suffix and is
// not marked `override`, so it may not actually override the handler's
// range-delete callback -- confirm against WriteBatch::Handler.
584 virtual Status
DeleteRange(uint32_t column_family_id
, const Slice
& begin_key
,
585 const Slice
& end_key
) {
586 if (column_family_id
!= default_cf_id_
) {
587 return Status::NotSupported(
588 "Blob DB doesn't support non-default column family.");
590 Status s
= WriteBatchInternal::DeleteRange(&batch_
, column_family_id
,
595 virtual Status
SingleDeleteCF(uint32_t /*column_family_id*/,
596 const Slice
& /*key*/) override
{
597 return Status::NotSupported("Not supported operation in blob db.");
600 virtual Status
MergeCF(uint32_t /*column_family_id*/, const Slice
& /*key*/,
601 const Slice
& /*value*/) override
{
602 return Status::NotSupported("Not supported operation in blob db.");
605 virtual void LogData(const Slice
& blob
) override
{ batch_
.PutLogData(blob
); }
// Applies a user write batch. The batch is replayed through a BlobInserter
// while write_mutex_ is held, and the rewritten batch is then written to the
// base DB. The call is timed (BLOB_DB_WRITE_MICROS) and counted
// (BLOB_DB_NUM_WRITE).
608 Status
BlobDBImpl::Write(const WriteOptions
& options
, WriteBatch
* updates
) {
609 StopWatch
write_sw(env_
, statistics_
, BLOB_DB_WRITE_MICROS
);
610 RecordTick(statistics_
, BLOB_DB_NUM_WRITE
);
611 uint32_t default_cf_id
=
612 reinterpret_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily())->GetID();
614 BlobInserter
blob_inserter(options
, this, default_cf_id
);
616 // Release write_mutex_ before DB write to avoid race condition with
617 // flush begin listener, which also requires write_mutex_ to sync
619 MutexLock
l(&write_mutex_
);
620 s
= updates
->Iterate(&blob_inserter
);
625 return db_
->Write(options
, blob_inserter
.batch());
// Stores `value` under `key` without an expiration; forwards to PutUntil
// with kNoExpiration.
628 Status
BlobDBImpl::Put(const WriteOptions
& options
, const Slice
& key
,
629 const Slice
& value
) {
630 return PutUntil(options
, key
, value
, kNoExpiration
);
// Stores `value` under `key` with a time-to-live relative to now.
// The TTL is converted to an absolute expiration of now + ttl. When that sum
// would reach or exceed kNoExpiration, kNoExpiration is used instead, which
// also avoids unsigned overflow of now + ttl.
633 Status
BlobDBImpl::PutWithTTL(const WriteOptions
& options
,
634 const Slice
& key
, const Slice
& value
,
636 uint64_t now
= EpochNow();
637 uint64_t expiration
= kNoExpiration
- now
> ttl
? now
+ ttl
: kNoExpiration
;
638 return PutUntil(options
, key
, value
, expiration
);
// Stores `value` under `key` with an absolute `expiration` (kNoExpiration
// for none). The blob/index entry is prepared by PutBlobValue into a local
// batch while write_mutex_ is held, and the batch is then written to the
// base DB. The call is timed (BLOB_DB_WRITE_MICROS) and counted
// (BLOB_DB_NUM_PUT); sync points mark its start and finish for tests.
641 Status
BlobDBImpl::PutUntil(const WriteOptions
& options
, const Slice
& key
,
642 const Slice
& value
, uint64_t expiration
) {
643 StopWatch
write_sw(env_
, statistics_
, BLOB_DB_WRITE_MICROS
);
644 RecordTick(statistics_
, BLOB_DB_NUM_PUT
);
645 TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
649 // Release write_mutex_ before DB write to avoid race condition with
650 // flush begin listener, which also requires write_mutex_ to sync
652 MutexLock
l(&write_mutex_
);
653 s
= PutBlobValue(options
, key
, value
, expiration
, &batch
);
656 s
= db_
->Write(options
, &batch
);
658 TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
// Adds one key/value to `batch`, deciding how the value is stored.
// Requires write_mutex_ to be held (asserted).
//   - Values smaller than bdb_options_.min_blob_size are inlined: as a plain
//     Put when there is no expiration, otherwise as an inlined-TTL blob index.
//   - Larger values are compressed, the DB size limit is checked (which may
//     evict blob files), a blob file is selected (TTL or non-TTL), the blob
//     is appended to it, and a blob index entry is added to `batch`.
// Statistics are recorded for the path taken and for key/value sizes.
662 Status
BlobDBImpl::PutBlobValue(const WriteOptions
& /*options*/,
663 const Slice
& key
, const Slice
& value
,
664 uint64_t expiration
, WriteBatch
* batch
) {
665 write_mutex_
.AssertHeld();
667 std::string index_entry
;
668 uint32_t column_family_id
=
669 reinterpret_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily())->GetID();
670 if (value
.size() < bdb_options_
.min_blob_size
) {
671 if (expiration
== kNoExpiration
) {
672 // Put as normal value
673 s
= batch
->Put(key
, value
);
674 RecordTick(statistics_
, BLOB_DB_WRITE_INLINED
);
// Small value with a TTL: the value is embedded in the index entry itself.
677 BlobIndex::EncodeInlinedTTL(&index_entry
, expiration
, value
);
678 s
= WriteBatchInternal::PutBlobIndex(batch
, column_family_id
, key
,
680 RecordTick(statistics_
, BLOB_DB_WRITE_INLINED_TTL
);
683 std::string compression_output
;
684 Slice value_compressed
= GetCompressedSlice(value
, &compression_output
);
686 std::string headerbuf
;
687 Writer::ConstructBlobHeader(&headerbuf
, key
, value_compressed
, expiration
);
689 // Check the DB size limit before selecting a blob file.
690 // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
691 // done before calling SelectBlobFile().
692 s
= CheckSizeAndEvictBlobFiles(headerbuf
.size() + key
.size() +
693 value_compressed
.size());
698 std::shared_ptr
<BlobFile
> bfile
= (expiration
!= kNoExpiration
)
699 ? SelectBlobFileTTL(expiration
)
701 assert(bfile
!= nullptr);
702 assert(bfile
->compression() == bdb_options_
.compression
);
704 s
= AppendBlob(bfile
, headerbuf
, key
, value_compressed
, expiration
,
706 if (expiration
== kNoExpiration
) {
707 RecordTick(statistics_
, BLOB_DB_WRITE_BLOB
);
709 RecordTick(statistics_
, BLOB_DB_WRITE_BLOB_TTL
);
713 if (expiration
!= kNoExpiration
) {
714 bfile
->ExtendExpirationRange(expiration
);
716 s
= CloseBlobFileIfNeeded(bfile
);
718 s
= WriteBatchInternal::PutBlobIndex(batch
, column_family_id
, key
,
// value.size() is a size quantity, so it is printed with ROCKSDB_PRIszt (as
// the other size log lines in this file are) rather than "%d".
722 ROCKS_LOG_ERROR(db_options_
.info_log
,
723 "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
724 " status: '%s' blob_file: '%s'",
725 bfile
->PathName().c_str(), key
.ToString().c_str(),
726 value
.size(), s
.ToString().c_str(),
727 bfile
->DumpState().c_str());
731 RecordTick(statistics_
, BLOB_DB_NUM_KEYS_WRITTEN
);
732 RecordTick(statistics_
, BLOB_DB_BYTES_WRITTEN
, key
.size() + value
.size());
733 MeasureTime(statistics_
, BLOB_DB_KEY_SIZE
, key
.size());
734 MeasureTime(statistics_
, BLOB_DB_VALUE_SIZE
, value
.size());
// Compresses `raw` with the compression type configured in bdb_options_.
// The compressed bytes are written into *compression_output and the returned
// Slice refers to that string, so the string must outlive the Slice.
// The compression step is timed (BLOB_DB_COMPRESSION_MICROS).
// NOTE(review): the kNoCompression branch body is not visible in this view;
// presumably it returns `raw` unchanged -- confirm.
739 Slice
BlobDBImpl::GetCompressedSlice(const Slice
& raw
,
740 std::string
* compression_output
) const {
741 if (bdb_options_
.compression
== kNoCompression
) {
744 StopWatch
compression_sw(env_
, statistics_
, BLOB_DB_COMPRESSION_MICROS
);
745 CompressionType ct
= bdb_options_
.compression
;
746 CompressionContext
compression_ctx(ct
);
747 CompressBlock(raw
, compression_ctx
, &ct
, kBlockBasedTableVersionFormat
,
749 return *compression_output
;
// Fills *context with the state a blob compaction filter needs: the next
// file number, the set of current blob file numbers, the FIFO eviction
// sequence number, and the expiration bound up to which files were evicted.
// NOTE(review): any locking around these reads is not visible in this view.
752 void BlobDBImpl::GetCompactionContext(BlobCompactionContext
* context
) {
755 context
->next_file_number
= next_file_number_
.load();
756 context
->current_blob_files
.clear();
757 for (auto& p
: blob_files_
) {
758 context
->current_blob_files
.insert(p
.first
);
760 context
->fifo_eviction_seq
= fifo_eviction_seq_
;
761 context
->evict_expiration_up_to
= evict_expiration_up_to_
;
// Refreshes the cached live SST file size from the
// DB::Properties::kLiveSstFilesSize property and stores it in
// live_sst_size_. Afterwards, with write_mutex_ held, it forces a size check
// (CheckSizeAndEvictBlobFiles(0, true)) so that FIFO eviction can react to
// the new size. The log messages indicate it runs after a flush or
// compaction.
764 void BlobDBImpl::UpdateLiveSSTSize() {
765 uint64_t live_sst_size
= 0;
766 bool ok
= GetIntProperty(DB::Properties::kLiveSstFilesSize
, &live_sst_size
);
768 live_sst_size_
.store(live_sst_size
);
769 ROCKS_LOG_INFO(db_options_
.info_log
,
770 "Updated total SST file size: %" PRIu64
" bytes.",
774 db_options_
.info_log
,
775 "Failed to update total SST file size after flush or compaction.");
778 // Trigger FIFO eviction if needed.
779 MutexLock
l(&write_mutex_
);
780 Status s
= CheckSizeAndEvictBlobFiles(0, true /*force*/);
782 ROCKS_LOG_WARN(db_options_
.info_log
,
783 "DB grow out-of-space after SST size updated. Current live"
784 " SST size: %" PRIu64
785 " , current blob files size: %" PRIu64
".",
786 live_sst_size_
.load(), total_blob_size_
.load());
// Checks whether `blob_size` more bytes fit under bdb_options_.max_db_size
// and, in FIFO mode, evicts blob files until they do.
// Requires write_mutex_ to be held (asserted).
//   - No limit (max_db_size == 0) or enough room: nothing to evict.
//   - FIFO disabled, or (without force_evict) the SST data plus the new blob
//     alone already exceed the limit: returns NoSpace.
//   - Otherwise blob files are sorted with BlobFileComparator and taken from
//     the back of the sorted list one by one. A file that is still open is
//     closed first, then marked obsolete at fifo_eviction_seq_. This repeats
//     until the size fits or no candidates remain.
//   - Returns NoSpace if the size still does not fit afterwards.
// NOTE(review): the second parameter (force_evict) is declared on a line that
// is not visible in this view.
791 Status
BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size
,
793 write_mutex_
.AssertHeld();
795 uint64_t live_sst_size
= live_sst_size_
.load();
796 if (bdb_options_
.max_db_size
== 0 ||
797 live_sst_size
+ total_blob_size_
.load() + blob_size
<=
798 bdb_options_
.max_db_size
) {
802 if (bdb_options_
.is_fifo
== false ||
803 (!force_evict
&& live_sst_size
+ blob_size
> bdb_options_
.max_db_size
)) {
804 // FIFO eviction is disabled, or no space to insert new blob even we evict
806 return Status::NoSpace(
807 "Write failed, as writing it would exceed max_db_size limit.");
// BlobFileComparator sorts by descending file number, so back() yields the
// lowest-numbered remaining file.
810 std::vector
<std::shared_ptr
<BlobFile
>> candidate_files
;
811 CopyBlobFiles(&candidate_files
);
812 std::sort(candidate_files
.begin(), candidate_files
.end(),
813 BlobFileComparator());
814 fifo_eviction_seq_
= GetLatestSequenceNumber();
816 WriteLock
l(&mutex_
);
818 while (!candidate_files
.empty() &&
819 live_sst_size
+ total_blob_size_
.load() + blob_size
>
820 bdb_options_
.max_db_size
) {
821 std::shared_ptr
<BlobFile
> blob_file
= candidate_files
.back();
822 candidate_files
.pop_back();
823 WriteLock
file_lock(&blob_file
->mutex_
);
824 if (blob_file
->Obsolete()) {
825 // File already obsoleted by someone else.
828 // FIFO eviction can evict open blob files.
829 if (!blob_file
->Immutable()) {
830 Status s
= CloseBlobFile(blob_file
, false /*need_lock*/);
835 assert(blob_file
->Immutable());
836 auto expiration_range
= blob_file
->GetExpirationRange();
837 ROCKS_LOG_INFO(db_options_
.info_log
,
838 "Evict oldest blob file since DB out of space. Current "
839 "live SST file size: %" PRIu64
", total blob size: %" PRIu64
840 ", max db size: %" PRIu64
", evicted blob file #%" PRIu64
842 live_sst_size
, total_blob_size_
.load(),
843 bdb_options_
.max_db_size
, blob_file
->BlobFileNumber());
844 ObsoleteBlobFile(blob_file
, fifo_eviction_seq_
, true /*update_size*/);
// Remember the start of the evicted file's expiration range.
845 evict_expiration_up_to_
= expiration_range
.first
;
846 RecordTick(statistics_
, BLOB_DB_FIFO_NUM_FILES_EVICTED
);
847 RecordTick(statistics_
, BLOB_DB_FIFO_NUM_KEYS_EVICTED
,
848 blob_file
->BlobCount());
849 RecordTick(statistics_
, BLOB_DB_FIFO_BYTES_EVICTED
,
850 blob_file
->GetFileSize());
851 TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
// Eviction finished (or ran out of candidates): final verdict.
853 if (live_sst_size
+ total_blob_size_
.load() + blob_size
>
854 bdb_options_
.max_db_size
) {
855 return Status::NoSpace(
856 "Write failed, as writing it would exceed max_db_size limit.");
// Appends one blob record (header + key + value) to `bfile` and produces the
// matching blob index entry in *index_entry.
//   - The file's write lock is held for the duration of the append.
//   - The file's writer is obtained or created first; the visible error path
//     returns IOError("Failed to create blob writer").
//   - On success blob_count_, file_size_ and total_blob_size_ are increased,
//     the sizes by headerbuf.size() + key.size() + value.size().
//   - The index entry is encoded with or without TTL depending on whether
//     `expiration` is kNoExpiration.
// `value` is the (possibly compressed) payload as it is written to the file;
// the caller in this file passes the compressed slice.
861 Status
BlobDBImpl::AppendBlob(const std::shared_ptr
<BlobFile
>& bfile
,
862 const std::string
& headerbuf
, const Slice
& key
,
863 const Slice
& value
, uint64_t expiration
,
864 std::string
* index_entry
) {
866 uint64_t blob_offset
= 0;
867 uint64_t key_offset
= 0;
869 WriteLock
lockbfile_w(&bfile
->mutex_
);
870 std::shared_ptr
<Writer
> writer
= CheckOrCreateWriterLocked(bfile
);
872 return Status::IOError("Failed to create blob writer");
875 // write the blob to the blob log.
876 s
= writer
->EmitPhysicalRecord(headerbuf
, key
, value
, &key_offset
,
881 ROCKS_LOG_ERROR(db_options_
.info_log
,
882 "Invalid status in AppendBlob: %s status: '%s'",
883 bfile
->PathName().c_str(), s
.ToString().c_str());
887 // increment blob count
888 bfile
->blob_count_
++;
890 uint64_t size_put
= headerbuf
.size() + key
.size() + value
.size();
891 bfile
->file_size_
+= size_put
;
892 total_blob_size_
+= size_put
;
894 if (expiration
== kNoExpiration
) {
895 BlobIndex::EncodeBlob(index_entry
, bfile
->BlobFileNumber(), blob_offset
,
896 value
.size(), bdb_options_
.compression
);
898 BlobIndex::EncodeBlobTTL(index_entry
, expiration
, bfile
->BlobFileNumber(),
899 blob_offset
, value
.size(),
900 bdb_options_
.compression
);
// Looks up several keys in the default column family. Each key is read with
// Get() using a copy of `read_options`; one Status per key is collected in
// `statuses` and each value is appended to *values, both in key order.
// All reads share one snapshot: if the caller did not supply one, a snapshot
// is taken here and released before returning.
// The call is timed (BLOB_DB_MULTIGET_MICROS) and counted
// (BLOB_DB_NUM_MULTIGET).
906 std::vector
<Status
> BlobDBImpl::MultiGet(
907 const ReadOptions
& read_options
,
908 const std::vector
<Slice
>& keys
, std::vector
<std::string
>* values
) {
909 StopWatch
multiget_sw(env_
, statistics_
, BLOB_DB_MULTIGET_MICROS
);
910 RecordTick(statistics_
, BLOB_DB_NUM_MULTIGET
);
911 // Get a snapshot so that a blob file cannot be deleted between fetching
912 // the index entry and reading from the file.
913 ReadOptions
ro(read_options
);
914 bool snapshot_created
= SetSnapshotIfNeeded(&ro
);
916 std::vector
<Status
> statuses
;
917 statuses
.reserve(keys
.size());
919 values
->reserve(keys
.size());
921 for (size_t i
= 0; i
< keys
.size(); i
++) {
922 statuses
.push_back(Get(ro
, DefaultColumnFamily(), keys
[i
], &value
));
923 values
->push_back(value
.ToString());
// Only release a snapshot that this call created.
926 if (snapshot_created
) {
927 db_
->ReleaseSnapshot(ro
.snapshot
);
// Ensures *read_options (must be non-null) carries a snapshot: when none is
// set, a new one is taken from db_ and stored in read_options->snapshot.
// NOTE(review): the return statements are not visible in this view. Judging
// by the use in MultiGet, the result presumably tells the caller whether a
// snapshot was created here and must be released -- confirm.
932 bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions
* read_options
) {
933 assert(read_options
!= nullptr);
934 if (read_options
->snapshot
!= nullptr) {
937 read_options
->snapshot
= db_
->GetSnapshot();
// Resolves a blob index entry to the user value.
//   key         - user key; its size is needed to locate the record.
//   index_entry - encoded BlobIndex read from the base DB.
//   value       - receives the value (must be non-null).
//   expiration  - optional; receives the entry's expiration, or
//                 kNoExpiration when the entry has no TTL.
// Visible outcomes:
//   - NotFound("Key expired") when the entry has a TTL that is not in the
//     future.
//   - Inlined entries are served directly from the index entry.
//   - NotFound when the offset is too small to be valid or the blob file is
//     not known.
//   - Corruption when fewer bytes than expected are read, the checksum
//     cannot be decoded, or the CRC does not match.
//   - Otherwise the value is pinned into *value, after decompression when
//     the blob file is compressed.
// The bytes read start at offset - key.size() - sizeof(uint32_t) and hold
// the checksum, the key and the value, in that order.
941 Status
BlobDBImpl::GetBlobValue(const Slice
& key
, const Slice
& index_entry
,
942 PinnableSlice
* value
, uint64_t* expiration
) {
943 assert(value
!= nullptr);
944 BlobIndex blob_index
;
945 Status s
= blob_index
.DecodeFrom(index_entry
);
949 if (blob_index
.HasTTL() && blob_index
.expiration() <= EpochNow()) {
950 return Status::NotFound("Key expired");
952 if (expiration
!= nullptr) {
953 if (blob_index
.HasTTL()) {
954 *expiration
= blob_index
.expiration();
956 *expiration
= kNoExpiration
;
959 if (blob_index
.IsInlined()) {
960 // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
961 // memory buffer to avoid extra copy.
962 value
->PinSelf(blob_index
.value());
965 if (blob_index
.size() == 0) {
970 // offset has to have certain min, as we will read CRC
971 // later from the Blob Header, which needs to be also a
973 if (blob_index
.offset() <
974 (BlobLogHeader::kSize
+ BlobLogRecord::kHeaderSize
+ key
.size())) {
975 if (debug_level_
>= 2) {
// NOTE(review): the tail of this format string is not visible in this view.
// If it prints the key with "%s", key.data() should become
// key.ToString().c_str(), as in the CRC mismatch log below.
976 ROCKS_LOG_ERROR(db_options_
.info_log
,
977 "Invalid blob index file_number: %" PRIu64
978 " blob_offset: %" PRIu64
" blob_size: %" PRIu64
980 blob_index
.file_number(), blob_index
.offset(),
981 blob_index
.size(), key
.data());
983 return Status::NotFound("Invalid blob offset");
986 std::shared_ptr
<BlobFile
> bfile
;
988 ReadLock
rl(&mutex_
);
989 auto hitr
= blob_files_
.find(blob_index
.file_number());
992 if (hitr
== blob_files_
.end()) {
993 return Status::NotFound("Blob Not Found as blob file missing");
996 bfile
= hitr
->second
;
999 if (blob_index
.size() == 0 && value
!= nullptr) {
1001 return Status::OK();
1004 // takes locks when called
1005 std::shared_ptr
<RandomAccessFileReader
> reader
;
1006 s
= GetBlobFileReader(bfile
, &reader
);
1011 assert(blob_index
.offset() > key
.size() + sizeof(uint32_t));
1012 uint64_t record_offset
= blob_index
.offset() - key
.size() - sizeof(uint32_t);
1013 uint64_t record_size
= sizeof(uint32_t) + key
.size() + blob_index
.size();
1015 // Allocate the buffer. This is safe in C++11
1016 std::string
buffer_str(static_cast<size_t>(record_size
), static_cast<char>(0));
1017 char* buffer
= &buffer_str
[0];
1019 // A partial blob record contains checksum, key and value.
1022 StopWatch
read_sw(env_
, statistics_
, BLOB_DB_BLOB_FILE_READ_MICROS
);
1023 s
= reader
->Read(record_offset
, static_cast<size_t>(record_size
), &blob_record
, buffer
);
1024 RecordTick(statistics_
, BLOB_DB_BLOB_FILE_BYTES_READ
, blob_record
.size());
// Every conversion carries its '%', and there is one argument per
// conversion: the key size and the number of bytes actually read are cast to
// uint64_t to match PRIu64. Without this, "%s" would consume key.size().
1027 ROCKS_LOG_DEBUG(db_options_
.info_log
,
1028 "Failed to read blob from blob file %" PRIu64
1029 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64
1030 ", key_size: %" PRIu64
", read %" PRIu64
1031 " bytes, status: '%s'",
1032 bfile
->BlobFileNumber(), blob_index
.offset(),
1033 blob_index
.size(), static_cast<uint64_t>(key
.size()),
static_cast<uint64_t>(blob_record
.size()), s
.ToString().c_str());
1036 if (blob_record
.size() != record_size
) {
// Same format and argument fix as the log statement above.
1037 ROCKS_LOG_DEBUG(db_options_
.info_log
,
1038 "Failed to read blob from blob file %" PRIu64
1039 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64
1040 ", key_size: %" PRIu64
", read %" PRIu64
1041 " bytes, status: '%s'",
1042 bfile
->BlobFileNumber(), blob_index
.offset(),
1043 blob_index
.size(), static_cast<uint64_t>(key
.size()),
static_cast<uint64_t>(blob_record
.size()), s
.ToString().c_str());
1045 return Status::Corruption("Failed to retrieve blob from blob index.");
// Split the bytes read: the leading checksum, then the value after the key.
1047 Slice
crc_slice(blob_record
.data(), sizeof(uint32_t));
1048 Slice
blob_value(blob_record
.data() + sizeof(uint32_t) + key
.size(),
1049 static_cast<size_t>(blob_index
.size()));
1051 if (!GetFixed32(&crc_slice
, &crc_exp
)) {
1052 ROCKS_LOG_DEBUG(db_options_
.info_log
,
1053 "Unable to decode CRC from blob file %" PRIu64
1054 ", blob_offset: %" PRIu64
", blob_size: %" PRIu64
1055 ", key size: %" PRIu64
", status: '%s'",
1056 bfile
->BlobFileNumber(), blob_index
.offset(),
1057 blob_index
.size(), static_cast<uint64_t>(key
.size()), s
.ToString().c_str());
1058 return Status::Corruption("Unable to decode checksum.");
// The stored checksum covers everything after the checksum field itself.
1061 uint32_t crc
= crc32c::Value(blob_record
.data() + sizeof(uint32_t),
1062 blob_record
.size() - sizeof(uint32_t));
1063 crc
= crc32c::Mask(crc
); // Adjust for storage
1064 if (crc
!= crc_exp
) {
1065 if (debug_level_
>= 2) {
// A Slice is a pointer plus a length, so key.data() need not be
// NUL-terminated; key.ToString().c_str() gives "%s" a terminated string.
1066 ROCKS_LOG_ERROR(db_options_
.info_log
,
1067 "Blob crc mismatch file: %s blob_offset: %" PRIu64
1068 " blob_size: %" PRIu64
" key: %s status: '%s'",
1069 bfile
->PathName().c_str(), blob_index
.offset(),
1070 blob_index
.size(), key
.ToString().c_str(), s
.ToString().c_str());
1072 return Status::Corruption("Corruption. Blob CRC mismatch");
1075 if (bfile
->compression() == kNoCompression
) {
1076 value
->PinSelf(blob_value
);
1078 BlockContents contents
;
1079 auto cfh
= reinterpret_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily());
1081 StopWatch
decompression_sw(env_
, statistics_
,
1082 BLOB_DB_DECOMPRESSION_MICROS
);
1083 UncompressionContext
uncompression_ctx(bfile
->compression());
1084 s
= UncompressBlockContentsForCompressionType(
1085 uncompression_ctx
, blob_value
.data(), blob_value
.size(), &contents
,
1086 kBlockBasedTableVersionFormat
, *(cfh
->cfd()->ioptions()));
1088 value
->PinSelf(contents
.data
);
1094 Status
BlobDBImpl::Get(const ReadOptions
& read_options
,
1095 ColumnFamilyHandle
* column_family
, const Slice
& key
,
1096 PinnableSlice
* value
) {
1097 return Get(read_options
, column_family
, key
, value
, nullptr /*expiration*/);
1100 Status
BlobDBImpl::Get(const ReadOptions
& read_options
,
1101 ColumnFamilyHandle
* column_family
, const Slice
& key
,
1102 PinnableSlice
* value
, uint64_t* expiration
) {
1103 StopWatch
get_sw(env_
, statistics_
, BLOB_DB_GET_MICROS
);
1104 RecordTick(statistics_
, BLOB_DB_NUM_GET
);
1105 return GetImpl(read_options
, column_family
, key
, value
, expiration
);
1108 Status
BlobDBImpl::GetImpl(const ReadOptions
& read_options
,
1109 ColumnFamilyHandle
* column_family
, const Slice
& key
,
1110 PinnableSlice
* value
, uint64_t* expiration
) {
1111 if (column_family
!= DefaultColumnFamily()) {
1112 return Status::NotSupported(
1113 "Blob DB doesn't support non-default column family.");
1115 // Get a snapshot to avoid blob file get deleted between we
1116 // fetch and index entry and reading from the file.
1117 // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
1118 ReadOptions
ro(read_options
);
1119 bool snapshot_created
= SetSnapshotIfNeeded(&ro
);
1121 PinnableSlice index_entry
;
1123 bool is_blob_index
= false;
1124 s
= db_impl_
->GetImpl(ro
, column_family
, key
, &index_entry
,
1125 nullptr /*value_found*/, nullptr /*read_callback*/,
1127 TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
1128 TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
1129 if (expiration
!= nullptr) {
1130 *expiration
= kNoExpiration
;
1132 RecordTick(statistics_
, BLOB_DB_NUM_KEYS_READ
);
1134 if (is_blob_index
) {
1135 s
= GetBlobValue(key
, index_entry
, value
, expiration
);
1137 // The index entry is the value itself in this case.
1138 value
->PinSelf(index_entry
);
1140 RecordTick(statistics_
, BLOB_DB_BYTES_READ
, value
->size());
1142 if (snapshot_created
) {
1143 db_
->ReleaseSnapshot(ro
.snapshot
);
1148 std::pair
<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted
) {
1150 return std::make_pair(false, -1);
1153 ROCKS_LOG_INFO(db_options_
.info_log
, "Starting Sanity Check");
1154 ROCKS_LOG_INFO(db_options_
.info_log
, "Number of files %" PRIu64
,
1155 blob_files_
.size());
1156 ROCKS_LOG_INFO(db_options_
.info_log
, "Number of open files %" PRIu64
,
1157 open_ttl_files_
.size());
1159 for (auto bfile
: open_ttl_files_
) {
1160 assert(!bfile
->Immutable());
1163 uint64_t now
= EpochNow();
1165 for (auto blob_file_pair
: blob_files_
) {
1166 auto blob_file
= blob_file_pair
.second
;
1168 int pos
= snprintf(buf
, sizeof(buf
),
1169 "Blob file %" PRIu64
", size %" PRIu64
1170 ", blob count %" PRIu64
", immutable %d",
1171 blob_file
->BlobFileNumber(), blob_file
->GetFileSize(),
1172 blob_file
->BlobCount(), blob_file
->Immutable());
1173 if (blob_file
->HasTTL()) {
1174 auto expiration_range
= blob_file
->GetExpirationRange();
1175 pos
+= snprintf(buf
+ pos
, sizeof(buf
) - pos
,
1176 ", expiration range (%" PRIu64
", %" PRIu64
")",
1177 expiration_range
.first
, expiration_range
.second
);
1178 if (!blob_file
->Obsolete()) {
1179 pos
+= snprintf(buf
+ pos
, sizeof(buf
) - pos
,
1180 ", expire in %" PRIu64
" seconds",
1181 expiration_range
.second
- now
);
1184 if (blob_file
->Obsolete()) {
1185 pos
+= snprintf(buf
+ pos
, sizeof(buf
) - pos
, ", obsolete at %" PRIu64
,
1186 blob_file
->GetObsoleteSequence());
1188 snprintf(buf
+ pos
, sizeof(buf
) - pos
, ".");
1189 ROCKS_LOG_INFO(db_options_
.info_log
, "%s", buf
);
1193 return std::make_pair(true, -1);
1196 Status
BlobDBImpl::CloseBlobFile(std::shared_ptr
<BlobFile
> bfile
,
1198 assert(bfile
!= nullptr);
1199 write_mutex_
.AssertHeld();
1201 ROCKS_LOG_INFO(db_options_
.info_log
,
1202 "Closing blob file %" PRIu64
". Path: %s",
1203 bfile
->BlobFileNumber(), bfile
->PathName().c_str());
1205 std::unique_ptr
<WriteLock
> lock
;
1207 lock
.reset(new WriteLock(&mutex_
));
1210 if (bfile
->HasTTL()) {
1211 size_t erased
__attribute__((__unused__
));
1212 erased
= open_ttl_files_
.erase(bfile
);
1213 } else if (bfile
== open_non_ttl_file_
) {
1214 open_non_ttl_file_
= nullptr;
1218 if (!bfile
->closed_
.load()) {
1219 std::unique_ptr
<WriteLock
> file_lock
;
1221 file_lock
.reset(new WriteLock(&bfile
->mutex_
));
1223 s
= bfile
->WriteFooterAndCloseLocked();
1227 total_blob_size_
+= BlobLogFooter::kSize
;
1229 ROCKS_LOG_ERROR(db_options_
.info_log
,
1230 "Failed to close blob file %" PRIu64
"with error: %s",
1231 bfile
->BlobFileNumber(), s
.ToString().c_str());
1237 Status
BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr
<BlobFile
>& bfile
) {
1239 if (bfile
->GetFileSize() < bdb_options_
.blob_file_size
) {
1240 return Status::OK();
1242 return CloseBlobFile(bfile
);
1245 void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr
<BlobFile
> blob_file
,
1246 SequenceNumber obsolete_seq
,
1248 // Should hold write lock of mutex_ or during DB open.
1249 blob_file
->MarkObsolete(obsolete_seq
);
1250 obsolete_files_
.push_back(blob_file
);
1251 assert(total_blob_size_
.load() >= blob_file
->GetFileSize());
1253 total_blob_size_
-= blob_file
->GetFileSize();
1257 bool BlobDBImpl::VisibleToActiveSnapshot(
1258 const std::shared_ptr
<BlobFile
>& bfile
) {
1259 assert(bfile
->Obsolete());
1261 // We check whether the oldest snapshot is no less than the last sequence
1262 // by the time the blob file become obsolete. If so, the blob file is not
1263 // visible to all existing snapshots.
1265 // If we keep track of the earliest sequence of the keys in the blob file,
1266 // we could instead check if there's a snapshot falls in range
1267 // [earliest_sequence, obsolete_sequence). But doing so will make the
1268 // implementation more complicated.
1269 SequenceNumber obsolete_sequence
= bfile
->GetObsoleteSequence();
1270 SequenceNumber oldest_snapshot
= kMaxSequenceNumber
;
1272 // Need to lock DBImpl mutex before access snapshot list.
1273 InstrumentedMutexLock
l(db_impl_
->mutex());
1274 auto& snapshots
= db_impl_
->snapshots();
1275 if (!snapshots
.empty()) {
1276 oldest_snapshot
= snapshots
.oldest()->GetSequenceNumber();
1279 bool visible
= oldest_snapshot
< obsolete_sequence
;
1281 ROCKS_LOG_INFO(db_options_
.info_log
,
1282 "Obsolete blob file %" PRIu64
" (obsolete at %" PRIu64
1283 ") visible to oldest snapshot %" PRIu64
".",
1284 bfile
->BlobFileNumber(), obsolete_sequence
, oldest_snapshot
);
1289 std::pair
<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted
) {
1291 return std::make_pair(false, -1);
1294 std::vector
<std::shared_ptr
<BlobFile
>> process_files
;
1295 uint64_t now
= EpochNow();
1297 ReadLock
rl(&mutex_
);
1298 for (auto p
: blob_files_
) {
1299 auto& blob_file
= p
.second
;
1300 ReadLock
file_lock(&blob_file
->mutex_
);
1301 if (blob_file
->HasTTL() && !blob_file
->Obsolete() &&
1302 blob_file
->GetExpirationRange().second
<= now
) {
1303 process_files
.push_back(blob_file
);
1308 SequenceNumber seq
= GetLatestSequenceNumber();
1310 MutexLock
l(&write_mutex_
);
1311 for (auto& blob_file
: process_files
) {
1312 WriteLock
file_lock(&blob_file
->mutex_
);
1313 if (!blob_file
->Immutable()) {
1314 CloseBlobFile(blob_file
, false /*need_lock*/);
1316 // Need to double check if the file is obsolete.
1317 if (!blob_file
->Obsolete()) {
1318 ObsoleteBlobFile(blob_file
, seq
, true /*update_size*/);
1323 return std::make_pair(true, -1);
1326 Status
BlobDBImpl::SyncBlobFiles() {
1327 MutexLock
l(&write_mutex_
);
1329 std::vector
<std::shared_ptr
<BlobFile
>> process_files
;
1331 ReadLock
rl(&mutex_
);
1332 for (auto fitr
: open_ttl_files_
) {
1333 process_files
.push_back(fitr
);
1335 if (open_non_ttl_file_
!= nullptr) {
1336 process_files
.push_back(open_non_ttl_file_
);
1341 for (auto& blob_file
: process_files
) {
1342 s
= blob_file
->Fsync();
1344 ROCKS_LOG_ERROR(db_options_
.info_log
,
1345 "Failed to sync blob file %" PRIu64
", status: %s",
1346 blob_file
->BlobFileNumber(), s
.ToString().c_str());
1351 s
= dir_ent_
->Fsync();
1353 ROCKS_LOG_ERROR(db_options_
.info_log
,
1354 "Failed to sync blob directory, status: %s",
1355 s
.ToString().c_str());
1360 std::pair
<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted
) {
1361 if (aborted
) return std::make_pair(false, -1);
1363 if (open_file_count_
.load() < kOpenFilesTrigger
) {
1364 return std::make_pair(true, -1);
1367 // in the future, we should sort by last_access_
1368 // instead of closing every file
1369 ReadLock
rl(&mutex_
);
1370 for (auto const& ent
: blob_files_
) {
1371 auto bfile
= ent
.second
;
1372 if (bfile
->last_access_
.load() == -1) continue;
1374 WriteLock
lockbfile_w(&bfile
->mutex_
);
1375 CloseRandomAccessLocked(bfile
);
1378 return std::make_pair(true, -1);
1381 // Write callback for garbage collection to check if key has been updated
1382 // since last read. Similar to how OptimisticTransaction works. See inline
1383 // comment in GCFileAndUpdateLSM().
1384 class BlobDBImpl::GarbageCollectionWriteCallback
: public WriteCallback
{
1386 GarbageCollectionWriteCallback(ColumnFamilyData
* cfd
, const Slice
& key
,
1387 SequenceNumber upper_bound
)
1388 : cfd_(cfd
), key_(key
), upper_bound_(upper_bound
) {}
1390 virtual Status
Callback(DB
* db
) override
{
1391 auto* db_impl
= reinterpret_cast<DBImpl
*>(db
);
1392 auto* sv
= db_impl
->GetAndRefSuperVersion(cfd_
);
1393 SequenceNumber latest_seq
= 0;
1394 bool found_record_for_key
= false;
1395 bool is_blob_index
= false;
1396 Status s
= db_impl
->GetLatestSequenceForKey(
1397 sv
, key_
, false /*cache_only*/, &latest_seq
, &found_record_for_key
,
1399 db_impl
->ReturnAndCleanupSuperVersion(cfd_
, sv
);
1400 if (!s
.ok() && !s
.IsNotFound()) {
1402 assert(!s
.IsBusy());
1405 if (s
.IsNotFound()) {
1406 assert(!found_record_for_key
);
1407 return Status::Busy("Key deleted");
1409 assert(found_record_for_key
);
1410 assert(is_blob_index
);
1411 if (latest_seq
> upper_bound_
) {
1412 return Status::Busy("Key overwritten");
1417 virtual bool AllowWriteBatching() override
{ return false; }
1420 ColumnFamilyData
* cfd_
;
1423 // Upper bound of sequence number to proceed.
1424 SequenceNumber upper_bound_
;
1427 // iterate over the blobs sequentially and check if the blob sequence number
1428 // is the latest. If it is the latest, preserve it, otherwise delete it
1429 // if it is TTL based, and the TTL has expired, then
1430 // we can blow the entity if the key is still the latest or the Key is not
1432 // WHAT HAPPENS IF THE KEY HAS BEEN OVERRIDEN. Then we can drop the blob
1433 // without doing anything if the earliest snapshot is not
1434 // referring to that sequence number, i.e. it is later than the sequence number
1437 // if it is not TTL based, then we can blow the key if the key has been
1438 // DELETED in the LSM
1439 Status
BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr
<BlobFile
>& bfptr
,
1440 GCStats
* gc_stats
) {
1441 StopWatch
gc_sw(env_
, statistics_
, BLOB_DB_GC_MICROS
);
1442 uint64_t now
= EpochNow();
1444 std::shared_ptr
<Reader
> reader
=
1445 bfptr
->OpenRandomAccessReader(env_
, db_options_
, env_options_
);
1447 ROCKS_LOG_ERROR(db_options_
.info_log
,
1448 "File sequential reader could not be opened",
1449 bfptr
->PathName().c_str());
1450 return Status::IOError("failed to create sequential reader");
1453 BlobLogHeader header
;
1454 Status s
= reader
->ReadHeader(&header
);
1456 ROCKS_LOG_ERROR(db_options_
.info_log
,
1457 "Failure to read header for blob-file %s",
1458 bfptr
->PathName().c_str());
1463 db_impl_
->GetColumnFamilyHandleUnlocked(bfptr
->column_family_id());
1464 auto* cfd
= reinterpret_cast<ColumnFamilyHandleImpl
*>(cfh
)->cfd();
1465 auto column_family_id
= cfd
->GetID();
1466 bool has_ttl
= header
.has_ttl
;
1468 // this reads the key but skips the blob
1469 Reader::ReadLevel shallow
= Reader::kReadHeaderKey
;
1471 bool file_expired
= has_ttl
&& now
>= bfptr
->GetExpirationRange().second
;
1473 if (!file_expired
) {
1474 // read the blob because you have to write it back to new file
1475 shallow
= Reader::kReadHeaderKeyBlob
;
1478 BlobLogRecord record
;
1479 std::shared_ptr
<BlobFile
> newfile
;
1480 std::shared_ptr
<Writer
> new_writer
;
1481 uint64_t blob_offset
= 0;
1486 // Read the next blob record.
1487 Status read_record_status
=
1488 reader
->ReadRecord(&record
, shallow
, &blob_offset
);
1489 // Exit if we reach the end of blob file.
1490 // TODO(yiwu): properly handle ReadRecord error.
1491 if (!read_record_status
.ok()) {
1494 gc_stats
->blob_count
++;
1496 // Similar to OptimisticTransaction, we obtain latest_seq from
1497 // base DB, which is guaranteed to be no smaller than the sequence of
1498 // current key. We use a WriteCallback on write to check the key sequence
1499 // on write. If the key sequence is larger than latest_seq, we know
1500 // a new versions is inserted and the old blob can be disgard.
1502 // We cannot use OptimisticTransaction because we need to pass
1503 // is_blob_index flag to GetImpl.
1504 SequenceNumber latest_seq
= GetLatestSequenceNumber();
1505 bool is_blob_index
= false;
1506 PinnableSlice index_entry
;
1507 Status get_status
= db_impl_
->GetImpl(
1508 ReadOptions(), cfh
, record
.key
, &index_entry
, nullptr /*value_found*/,
1509 nullptr /*read_callback*/, &is_blob_index
);
1510 TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
1511 if (!get_status
.ok() && !get_status
.IsNotFound()) {
1514 ROCKS_LOG_ERROR(db_options_
.info_log
,
1515 "Error while getting index entry: %s",
1516 s
.ToString().c_str());
1519 if (get_status
.IsNotFound() || !is_blob_index
) {
1520 // Either the key is deleted or updated with a newer version whish is
1522 gc_stats
->num_keys_overwritten
++;
1523 gc_stats
->bytes_overwritten
+= record
.record_size();
1527 BlobIndex blob_index
;
1528 s
= blob_index
.DecodeFrom(index_entry
);
1530 ROCKS_LOG_ERROR(db_options_
.info_log
,
1531 "Error while decoding index entry: %s",
1532 s
.ToString().c_str());
1535 if (blob_index
.IsInlined() ||
1536 blob_index
.file_number() != bfptr
->BlobFileNumber() ||
1537 blob_index
.offset() != blob_offset
) {
1538 // Key has been overwritten. Drop the blob record.
1539 gc_stats
->num_keys_overwritten
++;
1540 gc_stats
->bytes_overwritten
+= record
.record_size();
1544 GarbageCollectionWriteCallback
callback(cfd
, record
.key
, latest_seq
);
1546 // If key has expired, remove it from base DB.
1547 // TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter.
1548 // We can just drop the blob record.
1549 if (file_expired
|| (has_ttl
&& now
>= record
.expiration
)) {
1550 gc_stats
->num_keys_expired
++;
1551 gc_stats
->bytes_expired
+= record
.record_size();
1552 TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
1553 WriteBatch delete_batch
;
1554 Status delete_status
= delete_batch
.Delete(record
.key
);
1555 if (delete_status
.ok()) {
1556 delete_status
= db_impl_
->WriteWithCallback(WriteOptions(),
1557 &delete_batch
, &callback
);
1559 if (!delete_status
.ok() && !delete_status
.IsBusy()) {
1562 ROCKS_LOG_ERROR(db_options_
.info_log
,
1563 "Error while deleting expired key: %s",
1564 s
.ToString().c_str());
1567 // Continue to next blob record or retry.
1571 // Relocate the blob record to new file.
1574 std::string
reason("GC of ");
1575 reason
+= bfptr
->PathName();
1576 newfile
= NewBlobFile(reason
);
1578 new_writer
= CheckOrCreateWriterLocked(newfile
);
1579 // Can't use header beyond this point
1580 newfile
->header_
= std::move(header
);
1581 newfile
->header_valid_
= true;
1582 newfile
->file_size_
= BlobLogHeader::kSize
;
1583 newfile
->SetColumnFamilyId(bfptr
->column_family_id());
1584 newfile
->SetHasTTL(bfptr
->HasTTL());
1585 newfile
->SetCompression(bfptr
->compression());
1586 newfile
->expiration_range_
= bfptr
->expiration_range_
;
1588 s
= new_writer
->WriteHeader(newfile
->header_
);
1590 ROCKS_LOG_ERROR(db_options_
.info_log
,
1591 "File: %s - header writing failed",
1592 newfile
->PathName().c_str());
1596 // We don't add the file to open_ttl_files_ or open_non_ttl_files_, to
1597 // avoid user writes writing to the file, and avoid
1598 // EvictExpiredFiles close the file by mistake.
1599 WriteLock
wl(&mutex_
);
1600 blob_files_
.insert(std::make_pair(newfile
->BlobFileNumber(), newfile
));
1603 std::string new_index_entry
;
1604 uint64_t new_blob_offset
= 0;
1605 uint64_t new_key_offset
= 0;
1606 // write the blob to the blob log.
1607 s
= new_writer
->AddRecord(record
.key
, record
.value
, record
.expiration
,
1608 &new_key_offset
, &new_blob_offset
);
1610 BlobIndex::EncodeBlob(&new_index_entry
, newfile
->BlobFileNumber(),
1611 new_blob_offset
, record
.value
.size(),
1612 bdb_options_
.compression
);
1614 newfile
->blob_count_
++;
1615 newfile
->file_size_
+=
1616 BlobLogRecord::kHeaderSize
+ record
.key
.size() + record
.value
.size();
1618 TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
1619 WriteBatch rewrite_batch
;
1620 Status rewrite_status
= WriteBatchInternal::PutBlobIndex(
1621 &rewrite_batch
, column_family_id
, record
.key
, new_index_entry
);
1622 if (rewrite_status
.ok()) {
1623 rewrite_status
= db_impl_
->WriteWithCallback(WriteOptions(),
1624 &rewrite_batch
, &callback
);
1626 if (rewrite_status
.ok()) {
1627 gc_stats
->num_keys_relocated
++;
1628 gc_stats
->bytes_relocated
+= record
.record_size();
1629 } else if (rewrite_status
.IsBusy()) {
1630 // The key is overwritten in the meanwhile. Drop the blob record.
1631 gc_stats
->num_keys_overwritten
++;
1632 gc_stats
->bytes_overwritten
+= record
.record_size();
1636 ROCKS_LOG_ERROR(db_options_
.info_log
, "Error while relocating key: %s",
1637 s
.ToString().c_str());
1640 } // end of ReadRecord loop
1643 WriteLock
wl(&mutex_
);
1644 ObsoleteBlobFile(bfptr
, GetLatestSequenceNumber(), true /*update_size*/);
1648 db_options_
.info_log
,
1649 "%s blob file %" PRIu64
". Total blob records: %" PRIu64
1650 ", expired: %" PRIu64
" keys/%" PRIu64
1651 " bytes, updated or deleted by user: %" PRIu64
" keys/%" PRIu64
1652 " bytes, rewrite to new file: %" PRIu64
" keys/%" PRIu64
" bytes.",
1653 s
.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
1654 bfptr
->BlobFileNumber(), gc_stats
->blob_count
, gc_stats
->num_keys_expired
,
1655 gc_stats
->bytes_expired
, gc_stats
->num_keys_overwritten
,
1656 gc_stats
->bytes_overwritten
, gc_stats
->num_keys_relocated
,
1657 gc_stats
->bytes_relocated
);
1658 RecordTick(statistics_
, BLOB_DB_GC_NUM_FILES
);
1659 RecordTick(statistics_
, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN
,
1660 gc_stats
->num_keys_overwritten
);
1661 RecordTick(statistics_
, BLOB_DB_GC_NUM_KEYS_EXPIRED
,
1662 gc_stats
->num_keys_expired
);
1663 RecordTick(statistics_
, BLOB_DB_GC_BYTES_OVERWRITTEN
,
1664 gc_stats
->bytes_overwritten
);
1665 RecordTick(statistics_
, BLOB_DB_GC_BYTES_EXPIRED
, gc_stats
->bytes_expired
);
1666 if (newfile
!= nullptr) {
1668 MutexLock
l(&write_mutex_
);
1669 CloseBlobFile(newfile
);
1671 total_blob_size_
+= newfile
->file_size_
;
1672 ROCKS_LOG_INFO(db_options_
.info_log
, "New blob file %" PRIu64
".",
1673 newfile
->BlobFileNumber());
1674 RecordTick(statistics_
, BLOB_DB_GC_NUM_NEW_FILES
);
1675 RecordTick(statistics_
, BLOB_DB_GC_NUM_KEYS_RELOCATED
,
1676 gc_stats
->num_keys_relocated
);
1677 RecordTick(statistics_
, BLOB_DB_GC_BYTES_RELOCATED
,
1678 gc_stats
->bytes_relocated
);
1681 RecordTick(statistics_
, BLOB_DB_GC_FAILURES
);
1686 std::pair
<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted
) {
1688 return std::make_pair(false, -1);
1691 MutexLock
delete_file_lock(&delete_file_mutex_
);
1692 if (disable_file_deletions_
> 0) {
1693 return std::make_pair(true, -1);
1696 std::list
<std::shared_ptr
<BlobFile
>> tobsolete
;
1698 WriteLock
wl(&mutex_
);
1699 if (obsolete_files_
.empty()) {
1700 return std::make_pair(true, -1);
1702 tobsolete
.swap(obsolete_files_
);
1705 bool file_deleted
= false;
1706 for (auto iter
= tobsolete
.begin(); iter
!= tobsolete
.end();) {
1709 ReadLock
lockbfile_r(&bfile
->mutex_
);
1710 if (VisibleToActiveSnapshot(bfile
)) {
1711 ROCKS_LOG_INFO(db_options_
.info_log
,
1712 "Could not delete file due to snapshot failure %s",
1713 bfile
->PathName().c_str());
1718 ROCKS_LOG_INFO(db_options_
.info_log
,
1719 "Will delete file due to snapshot success %s",
1720 bfile
->PathName().c_str());
1722 blob_files_
.erase(bfile
->BlobFileNumber());
1723 Status s
= env_
->DeleteFile(bfile
->PathName());
1725 ROCKS_LOG_ERROR(db_options_
.info_log
,
1726 "File failed to be deleted as obsolete %s",
1727 bfile
->PathName().c_str());
1732 file_deleted
= true;
1733 ROCKS_LOG_INFO(db_options_
.info_log
,
1734 "File deleted as obsolete from blob dir %s",
1735 bfile
->PathName().c_str());
1737 iter
= tobsolete
.erase(iter
);
1740 // directory change. Fsync
1742 Status s
= dir_ent_
->Fsync();
1744 ROCKS_LOG_ERROR(db_options_
.info_log
, "Failed to sync dir %s: %s",
1745 blob_dir_
.c_str(), s
.ToString().c_str());
1749 // put files back into obsolete if for some reason, delete failed
1750 if (!tobsolete
.empty()) {
1751 WriteLock
wl(&mutex_
);
1752 for (auto bfile
: tobsolete
) {
1753 obsolete_files_
.push_front(bfile
);
1757 return std::make_pair(!aborted
, -1);
1760 void BlobDBImpl::CopyBlobFiles(
1761 std::vector
<std::shared_ptr
<BlobFile
>>* bfiles_copy
) {
1762 ReadLock
rl(&mutex_
);
1763 for (auto const& p
: blob_files_
) {
1764 bfiles_copy
->push_back(p
.second
);
1768 std::pair
<bool, int64_t> BlobDBImpl::RunGC(bool aborted
) {
1770 return std::make_pair(false, -1);
1773 // TODO(yiwu): Garbage collection implementation.
1776 return std::make_pair(true, -1);
1779 Iterator
* BlobDBImpl::NewIterator(const ReadOptions
& read_options
) {
1781 reinterpret_cast<ColumnFamilyHandleImpl
*>(DefaultColumnFamily())->cfd();
1782 // Get a snapshot to avoid blob file get deleted between we
1783 // fetch and index entry and reading from the file.
1784 ManagedSnapshot
* own_snapshot
= nullptr;
1785 const Snapshot
* snapshot
= read_options
.snapshot
;
1786 if (snapshot
== nullptr) {
1787 own_snapshot
= new ManagedSnapshot(db_
);
1788 snapshot
= own_snapshot
->snapshot();
1790 auto* iter
= db_impl_
->NewIteratorImpl(
1791 read_options
, cfd
, snapshot
->GetSequenceNumber(),
1792 nullptr /*read_callback*/, true /*allow_blob*/);
1793 return new BlobDBIterator(own_snapshot
, iter
, this, env_
, statistics_
);
1796 Status
DestroyBlobDB(const std::string
& dbname
, const Options
& options
,
1797 const BlobDBOptions
& bdb_options
) {
1798 const ImmutableDBOptions
soptions(SanitizeOptions(dbname
, options
));
1799 Env
* env
= soptions
.env
;
1802 std::string blobdir
;
1803 blobdir
= (bdb_options
.path_relative
) ? dbname
+ "/" + bdb_options
.blob_dir
1804 : bdb_options
.blob_dir
;
1806 std::vector
<std::string
> filenames
;
1807 env
->GetChildren(blobdir
, &filenames
);
1809 for (const auto& f
: filenames
) {
1812 if (ParseFileName(f
, &number
, &type
) && type
== kBlobFile
) {
1813 Status del
= env
->DeleteFile(blobdir
+ "/" + f
);
1814 if (status
.ok() && !del
.ok()) {
1819 env
->DeleteDir(blobdir
);
1821 Status destroy
= DestroyDB(dbname
, options
);
1822 if (status
.ok() && !destroy
.ok()) {
1830 Status
BlobDBImpl::TEST_GetBlobValue(const Slice
& key
, const Slice
& index_entry
,
1831 PinnableSlice
* value
) {
1832 return GetBlobValue(key
, index_entry
, value
);
1835 std::vector
<std::shared_ptr
<BlobFile
>> BlobDBImpl::TEST_GetBlobFiles() const {
1836 ReadLock
l(&mutex_
);
1837 std::vector
<std::shared_ptr
<BlobFile
>> blob_files
;
1838 for (auto& p
: blob_files_
) {
1839 blob_files
.emplace_back(p
.second
);
1844 std::vector
<std::shared_ptr
<BlobFile
>> BlobDBImpl::TEST_GetObsoleteFiles()
1846 ReadLock
l(&mutex_
);
1847 std::vector
<std::shared_ptr
<BlobFile
>> obsolete_files
;
1848 for (auto& bfile
: obsolete_files_
) {
1849 obsolete_files
.emplace_back(bfile
);
1851 return obsolete_files
;
1854 void BlobDBImpl::TEST_DeleteObsoleteFiles() {
1855 DeleteObsoleteFiles(false /*abort*/);
1858 Status
BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr
<BlobFile
>& bfile
) {
1859 MutexLock
l(&write_mutex_
);
1860 return CloseBlobFile(bfile
);
1863 void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr
<BlobFile
>& blob_file
,
1864 SequenceNumber obsolete_seq
,
1866 return ObsoleteBlobFile(blob_file
, obsolete_seq
, update_size
);
1869 Status
BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr
<BlobFile
>& bfile
,
1870 GCStats
* gc_stats
) {
1871 return GCFileAndUpdateLSM(bfile
, gc_stats
);
1874 void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
1876 void BlobDBImpl::TEST_EvictExpiredFiles() {
1877 EvictExpiredFiles(false /*abort*/);
1880 uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_
.load(); }
1883 } // namespace blob_db
1884 } // namespace rocksdb
1885 #endif // ROCKSDB_LITE