]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/blob_db/blob_db_impl.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_db_impl.cc
1
2 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7
8 #include "utilities/blob_db/blob_db_impl.h"
9 #include <algorithm>
10 #include <cinttypes>
11 #include <iomanip>
12 #include <memory>
13 #include <sstream>
14
15 #include "db/blob/blob_index.h"
16 #include "db/db_impl/db_impl.h"
17 #include "db/write_batch_internal.h"
18 #include "env/composite_env_wrapper.h"
19 #include "file/file_util.h"
20 #include "file/filename.h"
21 #include "file/random_access_file_reader.h"
22 #include "file/sst_file_manager_impl.h"
23 #include "file/writable_file_writer.h"
24 #include "logging/logging.h"
25 #include "monitoring/instrumented_mutex.h"
26 #include "monitoring/statistics.h"
27 #include "rocksdb/convenience.h"
28 #include "rocksdb/env.h"
29 #include "rocksdb/iterator.h"
30 #include "rocksdb/utilities/stackable_db.h"
31 #include "rocksdb/utilities/transaction.h"
32 #include "table/block_based/block.h"
33 #include "table/block_based/block_based_table_builder.h"
34 #include "table/block_based/block_builder.h"
35 #include "table/meta_blocks.h"
36 #include "test_util/sync_point.h"
37 #include "util/cast_util.h"
38 #include "util/crc32c.h"
39 #include "util/mutexlock.h"
40 #include "util/random.h"
41 #include "util/stop_watch.h"
42 #include "util/timer_queue.h"
43 #include "utilities/blob_db/blob_compaction_filter.h"
44 #include "utilities/blob_db/blob_db_iterator.h"
45 #include "utilities/blob_db/blob_db_listener.h"
46
// File-local state: the anonymous namespace gives the variable internal
// linkage, so it is visible only inside this translation unit.
namespace {
// Initialized to 2. NOTE(review): no use of this variable is visible in the
// part of the file reviewed here - confirm it is still needed.
int kBlockBasedTableVersionFormat{2};
}  // namespace
50
51 namespace ROCKSDB_NAMESPACE {
52 namespace blob_db {
53
54 bool BlobFileComparator::operator()(
55 const std::shared_ptr<BlobFile>& lhs,
56 const std::shared_ptr<BlobFile>& rhs) const {
57 return lhs->BlobFileNumber() > rhs->BlobFileNumber();
58 }
59
60 bool BlobFileComparatorTTL::operator()(
61 const std::shared_ptr<BlobFile>& lhs,
62 const std::shared_ptr<BlobFile>& rhs) const {
63 assert(lhs->HasTTL() && rhs->HasTTL());
64 if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
65 return true;
66 }
67 if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
68 return false;
69 }
70 return lhs->BlobFileNumber() < rhs->BlobFileNumber();
71 }
72
73 BlobDBImpl::BlobDBImpl(const std::string& dbname,
74 const BlobDBOptions& blob_db_options,
75 const DBOptions& db_options,
76 const ColumnFamilyOptions& cf_options)
77 : BlobDB(),
78 dbname_(dbname),
79 db_impl_(nullptr),
80 env_(db_options.env),
81 bdb_options_(blob_db_options),
82 db_options_(db_options),
83 cf_options_(cf_options),
84 env_options_(db_options),
85 statistics_(db_options_.statistics.get()),
86 next_file_number_(1),
87 flush_sequence_(0),
88 closed_(true),
89 open_file_count_(0),
90 total_blob_size_(0),
91 live_sst_size_(0),
92 fifo_eviction_seq_(0),
93 evict_expiration_up_to_(0),
94 debug_level_(0) {
95 blob_dir_ = (bdb_options_.path_relative)
96 ? dbname + "/" + bdb_options_.blob_dir
97 : bdb_options_.blob_dir;
98 env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
99 }
100
101 BlobDBImpl::~BlobDBImpl() {
102 tqueue_.shutdown();
103 // CancelAllBackgroundWork(db_, true);
104 Status s __attribute__((__unused__)) = Close();
105 assert(s.ok());
106 }
107
108 Status BlobDBImpl::Close() {
109 if (closed_) {
110 return Status::OK();
111 }
112 closed_ = true;
113
114 // Close base DB before BlobDBImpl destructs to stop event listener and
115 // compaction filter call.
116 Status s = db_->Close();
117 // delete db_ anyway even if close failed.
118 delete db_;
119 // Reset pointers to avoid StackableDB delete the pointer again.
120 db_ = nullptr;
121 db_impl_ = nullptr;
122 if (!s.ok()) {
123 return s;
124 }
125
126 s = SyncBlobFiles();
127 return s;
128 }
129
130 BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
131
132 Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
133 assert(handles != nullptr);
134 assert(db_ == nullptr);
135
136 if (blob_dir_.empty()) {
137 return Status::NotSupported("No blob directory in options");
138 }
139
140 if (bdb_options_.garbage_collection_cutoff < 0.0 ||
141 bdb_options_.garbage_collection_cutoff > 1.0) {
142 return Status::InvalidArgument(
143 "Garbage collection cutoff must be in the interval [0.0, 1.0]");
144 }
145
146 // Temporarily disable compactions in the base DB during open; save the user
147 // defined value beforehand so we can restore it once BlobDB is initialized.
148 // Note: this is only needed if garbage collection is enabled.
149 const bool disable_auto_compactions = cf_options_.disable_auto_compactions;
150
151 if (bdb_options_.enable_garbage_collection) {
152 cf_options_.disable_auto_compactions = true;
153 }
154
155 Status s;
156
157 // Create info log.
158 if (db_options_.info_log == nullptr) {
159 s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
160 if (!s.ok()) {
161 return s;
162 }
163 }
164
165 ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
166
167 if ((cf_options_.compaction_filter != nullptr ||
168 cf_options_.compaction_filter_factory != nullptr)) {
169 ROCKS_LOG_INFO(db_options_.info_log,
170 "BlobDB only support compaction filter on non-TTL values.");
171 }
172
173 // Open blob directory.
174 s = env_->CreateDirIfMissing(blob_dir_);
175 if (!s.ok()) {
176 ROCKS_LOG_ERROR(db_options_.info_log,
177 "Failed to create blob_dir %s, status: %s",
178 blob_dir_.c_str(), s.ToString().c_str());
179 }
180 s = env_->NewDirectory(blob_dir_, &dir_ent_);
181 if (!s.ok()) {
182 ROCKS_LOG_ERROR(db_options_.info_log,
183 "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
184 s.ToString().c_str());
185 return s;
186 }
187
188 // Open blob files.
189 s = OpenAllBlobFiles();
190 if (!s.ok()) {
191 return s;
192 }
193
194 // Update options
195 if (bdb_options_.enable_garbage_collection) {
196 db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
197 cf_options_.compaction_filter_factory =
198 std::make_shared<BlobIndexCompactionFilterFactoryGC>(
199 this, env_, cf_options_, statistics_);
200 } else {
201 db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
202 cf_options_.compaction_filter_factory =
203 std::make_shared<BlobIndexCompactionFilterFactory>(
204 this, env_, cf_options_, statistics_);
205 }
206
207 // Reset user compaction filter after building into compaction factory.
208 cf_options_.compaction_filter = nullptr;
209
210 // Open base db.
211 ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
212 s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
213 if (!s.ok()) {
214 return s;
215 }
216 db_impl_ = static_cast_with_check<DBImpl>(db_->GetRootDB());
217
218 // Sanitize the blob_dir provided. Using a directory where the
219 // base DB stores its files for the default CF is not supported.
220 const ColumnFamilyData* const cfd =
221 static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
222 assert(cfd);
223
224 const ImmutableCFOptions* const ioptions = cfd->ioptions();
225 assert(ioptions);
226
227 assert(env_);
228
229 for (const auto& cf_path : ioptions->cf_paths) {
230 bool blob_dir_same_as_cf_dir = false;
231 s = env_->AreFilesSame(blob_dir_, cf_path.path, &blob_dir_same_as_cf_dir);
232 if (!s.ok()) {
233 ROCKS_LOG_ERROR(db_options_.info_log,
234 "Error while sanitizing blob_dir %s, status: %s",
235 blob_dir_.c_str(), s.ToString().c_str());
236 return s;
237 }
238
239 if (blob_dir_same_as_cf_dir) {
240 return Status::NotSupported(
241 "Using the base DB's storage directories for BlobDB files is not "
242 "supported.");
243 }
244 }
245
246 // Initialize SST file <-> oldest blob file mapping if garbage collection
247 // is enabled.
248 if (bdb_options_.enable_garbage_collection) {
249 std::vector<LiveFileMetaData> live_files;
250 db_->GetLiveFilesMetaData(&live_files);
251
252 InitializeBlobFileToSstMapping(live_files);
253
254 MarkUnreferencedBlobFilesObsoleteDuringOpen();
255
256 if (!disable_auto_compactions) {
257 s = db_->EnableAutoCompaction(*handles);
258 if (!s.ok()) {
259 ROCKS_LOG_ERROR(
260 db_options_.info_log,
261 "Failed to enable automatic compactions during open, status: %s",
262 s.ToString().c_str());
263 return s;
264 }
265 }
266 }
267
268 // Add trash files in blob dir to file delete scheduler.
269 SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
270 db_impl_->immutable_db_options().sst_file_manager.get());
271 DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
272
273 UpdateLiveSSTSize();
274
275 // Start background jobs.
276 if (!bdb_options_.disable_background_tasks) {
277 StartBackgroundTasks();
278 }
279
280 ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
281 bdb_options_.Dump(db_options_.info_log.get());
282 closed_ = false;
283 return s;
284 }
285
286 void BlobDBImpl::StartBackgroundTasks() {
287 // store a call to a member function and object
288 tqueue_.add(
289 kReclaimOpenFilesPeriodMillisecs,
290 std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
291 tqueue_.add(
292 kDeleteObsoleteFilesPeriodMillisecs,
293 std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
294 tqueue_.add(kSanityCheckPeriodMillisecs,
295 std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
296 tqueue_.add(
297 kEvictExpiredFilesPeriodMillisecs,
298 std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
299 }
300
301 Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
302 assert(file_numbers != nullptr);
303 std::vector<std::string> all_files;
304 Status s = env_->GetChildren(blob_dir_, &all_files);
305 if (!s.ok()) {
306 ROCKS_LOG_ERROR(db_options_.info_log,
307 "Failed to get list of blob files, status: %s",
308 s.ToString().c_str());
309 return s;
310 }
311
312 for (const auto& file_name : all_files) {
313 uint64_t file_number;
314 FileType type;
315 bool success = ParseFileName(file_name, &file_number, &type);
316 if (success && type == kBlobFile) {
317 file_numbers->insert(file_number);
318 } else {
319 ROCKS_LOG_WARN(db_options_.info_log,
320 "Skipping file in blob directory: %s", file_name.c_str());
321 }
322 }
323
324 return s;
325 }
326
327 Status BlobDBImpl::OpenAllBlobFiles() {
328 std::set<uint64_t> file_numbers;
329 Status s = GetAllBlobFiles(&file_numbers);
330 if (!s.ok()) {
331 return s;
332 }
333
334 if (!file_numbers.empty()) {
335 next_file_number_.store(*file_numbers.rbegin() + 1);
336 }
337
338 std::ostringstream blob_file_oss;
339 std::ostringstream live_imm_oss;
340 std::ostringstream obsolete_file_oss;
341
342 for (auto& file_number : file_numbers) {
343 std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
344 this, blob_dir_, file_number, db_options_.info_log.get());
345 blob_file->MarkImmutable(/* sequence */ 0);
346
347 // Read file header and footer
348 Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
349 if (read_metadata_status.IsCorruption()) {
350 // Remove incomplete file.
351 if (!obsolete_files_.empty()) {
352 obsolete_file_oss << ", ";
353 }
354 obsolete_file_oss << file_number;
355
356 ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
357 continue;
358 } else if (!read_metadata_status.ok()) {
359 ROCKS_LOG_ERROR(db_options_.info_log,
360 "Unable to read metadata of blob file %" PRIu64
361 ", status: '%s'",
362 file_number, read_metadata_status.ToString().c_str());
363 return read_metadata_status;
364 }
365
366 total_blob_size_ += blob_file->GetFileSize();
367
368 if (!blob_files_.empty()) {
369 blob_file_oss << ", ";
370 }
371 blob_file_oss << file_number;
372
373 blob_files_[file_number] = blob_file;
374
375 if (!blob_file->HasTTL()) {
376 if (!live_imm_non_ttl_blob_files_.empty()) {
377 live_imm_oss << ", ";
378 }
379 live_imm_oss << file_number;
380
381 live_imm_non_ttl_blob_files_[file_number] = blob_file;
382 }
383 }
384
385 ROCKS_LOG_INFO(db_options_.info_log,
386 "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
387 blob_file_oss.str().c_str());
388 ROCKS_LOG_INFO(
389 db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
390 live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
391 ROCKS_LOG_INFO(db_options_.info_log,
392 "Found %" ROCKSDB_PRIszt
393 " incomplete or corrupted blob files: %s",
394 obsolete_files_.size(), obsolete_file_oss.str().c_str());
395 return s;
396 }
397
398 template <typename Linker>
399 void BlobDBImpl::LinkSstToBlobFileImpl(uint64_t sst_file_number,
400 uint64_t blob_file_number,
401 Linker linker) {
402 assert(bdb_options_.enable_garbage_collection);
403 assert(blob_file_number != kInvalidBlobFileNumber);
404
405 auto it = blob_files_.find(blob_file_number);
406 if (it == blob_files_.end()) {
407 ROCKS_LOG_WARN(db_options_.info_log,
408 "Blob file %" PRIu64
409 " not found while trying to link "
410 "SST file %" PRIu64,
411 blob_file_number, sst_file_number);
412 return;
413 }
414
415 BlobFile* const blob_file = it->second.get();
416 assert(blob_file);
417
418 linker(blob_file, sst_file_number);
419
420 ROCKS_LOG_INFO(db_options_.info_log,
421 "Blob file %" PRIu64 " linked to SST file %" PRIu64,
422 blob_file_number, sst_file_number);
423 }
424
425 void BlobDBImpl::LinkSstToBlobFile(uint64_t sst_file_number,
426 uint64_t blob_file_number) {
427 auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
428 WriteLock file_lock(&blob_file->mutex_);
429 blob_file->LinkSstFile(sst_file);
430 };
431
432 LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
433 }
434
435 void BlobDBImpl::LinkSstToBlobFileNoLock(uint64_t sst_file_number,
436 uint64_t blob_file_number) {
437 auto linker = [](BlobFile* blob_file, uint64_t sst_file) {
438 blob_file->LinkSstFile(sst_file);
439 };
440
441 LinkSstToBlobFileImpl(sst_file_number, blob_file_number, linker);
442 }
443
444 void BlobDBImpl::UnlinkSstFromBlobFile(uint64_t sst_file_number,
445 uint64_t blob_file_number) {
446 assert(bdb_options_.enable_garbage_collection);
447 assert(blob_file_number != kInvalidBlobFileNumber);
448
449 auto it = blob_files_.find(blob_file_number);
450 if (it == blob_files_.end()) {
451 ROCKS_LOG_WARN(db_options_.info_log,
452 "Blob file %" PRIu64
453 " not found while trying to unlink "
454 "SST file %" PRIu64,
455 blob_file_number, sst_file_number);
456 return;
457 }
458
459 BlobFile* const blob_file = it->second.get();
460 assert(blob_file);
461
462 {
463 WriteLock file_lock(&blob_file->mutex_);
464 blob_file->UnlinkSstFile(sst_file_number);
465 }
466
467 ROCKS_LOG_INFO(db_options_.info_log,
468 "Blob file %" PRIu64 " unlinked from SST file %" PRIu64,
469 blob_file_number, sst_file_number);
470 }
471
472 void BlobDBImpl::InitializeBlobFileToSstMapping(
473 const std::vector<LiveFileMetaData>& live_files) {
474 assert(bdb_options_.enable_garbage_collection);
475
476 for (const auto& live_file : live_files) {
477 const uint64_t sst_file_number = live_file.file_number;
478 const uint64_t blob_file_number = live_file.oldest_blob_file_number;
479
480 if (blob_file_number == kInvalidBlobFileNumber) {
481 continue;
482 }
483
484 LinkSstToBlobFileNoLock(sst_file_number, blob_file_number);
485 }
486 }
487
488 void BlobDBImpl::ProcessFlushJobInfo(const FlushJobInfo& info) {
489 assert(bdb_options_.enable_garbage_collection);
490
491 WriteLock lock(&mutex_);
492
493 if (info.oldest_blob_file_number != kInvalidBlobFileNumber) {
494 LinkSstToBlobFile(info.file_number, info.oldest_blob_file_number);
495 }
496
497 assert(flush_sequence_ < info.largest_seqno);
498 flush_sequence_ = info.largest_seqno;
499
500 MarkUnreferencedBlobFilesObsolete();
501 }
502
503 void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
504 assert(bdb_options_.enable_garbage_collection);
505
506 if (!info.status.ok()) {
507 return;
508 }
509
510 // Note: the same SST file may appear in both the input and the output
511 // file list in case of a trivial move. We walk through the two lists
512 // below in a fashion that's similar to merge sort to detect this.
513
514 auto cmp = [](const CompactionFileInfo& lhs, const CompactionFileInfo& rhs) {
515 return lhs.file_number < rhs.file_number;
516 };
517
518 auto inputs = info.input_file_infos;
519 auto iit = inputs.begin();
520 const auto iit_end = inputs.end();
521
522 std::sort(iit, iit_end, cmp);
523
524 auto outputs = info.output_file_infos;
525 auto oit = outputs.begin();
526 const auto oit_end = outputs.end();
527
528 std::sort(oit, oit_end, cmp);
529
530 WriteLock lock(&mutex_);
531
532 while (iit != iit_end && oit != oit_end) {
533 const auto& input = *iit;
534 const auto& output = *oit;
535
536 if (input.file_number == output.file_number) {
537 ++iit;
538 ++oit;
539 } else if (input.file_number < output.file_number) {
540 if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
541 UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
542 }
543
544 ++iit;
545 } else {
546 assert(output.file_number < input.file_number);
547
548 if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
549 LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
550 }
551
552 ++oit;
553 }
554 }
555
556 while (iit != iit_end) {
557 const auto& input = *iit;
558
559 if (input.oldest_blob_file_number != kInvalidBlobFileNumber) {
560 UnlinkSstFromBlobFile(input.file_number, input.oldest_blob_file_number);
561 }
562
563 ++iit;
564 }
565
566 while (oit != oit_end) {
567 const auto& output = *oit;
568
569 if (output.oldest_blob_file_number != kInvalidBlobFileNumber) {
570 LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
571 }
572
573 ++oit;
574 }
575
576 MarkUnreferencedBlobFilesObsolete();
577 }
578
579 bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
580 const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
581 assert(blob_file);
582 assert(!blob_file->HasTTL());
583 assert(blob_file->Immutable());
584 assert(bdb_options_.enable_garbage_collection);
585
586 // Note: FIFO eviction could have marked this file obsolete already.
587 if (blob_file->Obsolete()) {
588 return true;
589 }
590
591 // We cannot mark this file (or any higher-numbered files for that matter)
592 // obsolete if it is referenced by any memtables or SSTs. We keep track of
593 // the SSTs explicitly. To account for memtables, we keep track of the highest
594 // sequence number received in flush notifications, and we do not mark the
595 // blob file obsolete if there are still unflushed memtables from before
596 // the time the blob file was closed.
597 if (blob_file->GetImmutableSequence() > flush_sequence_ ||
598 !blob_file->GetLinkedSstFiles().empty()) {
599 return false;
600 }
601
602 ROCKS_LOG_INFO(db_options_.info_log,
603 "Blob file %" PRIu64 " is no longer needed, marking obsolete",
604 blob_file->BlobFileNumber());
605
606 ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
607 return true;
608 }
609
610 template <class Functor>
611 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
612 assert(bdb_options_.enable_garbage_collection);
613
614 // Iterate through all live immutable non-TTL blob files, and mark them
615 // obsolete assuming no SST files or memtables rely on the blobs in them.
616 // Note: we need to stop as soon as we find a blob file that has any
617 // linked SSTs (or one potentially referenced by memtables).
618
619 uint64_t obsoleted_files = 0;
620
621 auto it = live_imm_non_ttl_blob_files_.begin();
622 while (it != live_imm_non_ttl_blob_files_.end()) {
623 const auto& blob_file = it->second;
624 assert(blob_file);
625 assert(blob_file->BlobFileNumber() == it->first);
626 assert(!blob_file->HasTTL());
627 assert(blob_file->Immutable());
628
629 // Small optimization: Obsolete() does an atomic read, so we can do
630 // this check without taking a lock on the blob file's mutex.
631 if (blob_file->Obsolete()) {
632 it = live_imm_non_ttl_blob_files_.erase(it);
633 continue;
634 }
635
636 if (!mark_if_needed(blob_file)) {
637 break;
638 }
639
640 it = live_imm_non_ttl_blob_files_.erase(it);
641
642 ++obsoleted_files;
643 }
644
645 if (obsoleted_files > 0) {
646 ROCKS_LOG_INFO(db_options_.info_log,
647 "%" PRIu64 " blob file(s) marked obsolete by GC",
648 obsoleted_files);
649 RecordTick(statistics_, BLOB_DB_GC_NUM_FILES, obsoleted_files);
650 }
651 }
652
653 void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
654 const SequenceNumber obsolete_seq = GetLatestSequenceNumber();
655
656 MarkUnreferencedBlobFilesObsoleteImpl(
657 [this, obsolete_seq](const std::shared_ptr<BlobFile>& blob_file) {
658 WriteLock file_lock(&blob_file->mutex_);
659 return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
660 });
661 }
662
663 void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
664 MarkUnreferencedBlobFilesObsoleteImpl(
665 [this](const std::shared_ptr<BlobFile>& blob_file) {
666 return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
667 });
668 }
669
670 void BlobDBImpl::CloseRandomAccessLocked(
671 const std::shared_ptr<BlobFile>& bfile) {
672 bfile->CloseRandomAccessLocked();
673 open_file_count_--;
674 }
675
676 Status BlobDBImpl::GetBlobFileReader(
677 const std::shared_ptr<BlobFile>& blob_file,
678 std::shared_ptr<RandomAccessFileReader>* reader) {
679 assert(reader != nullptr);
680 bool fresh_open = false;
681 Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
682 if (s.ok() && fresh_open) {
683 assert(*reader != nullptr);
684 open_file_count_++;
685 }
686 return s;
687 }
688
689 std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(
690 bool has_ttl, const ExpirationRange& expiration_range,
691 const std::string& reason) {
692 assert(has_ttl == (expiration_range.first || expiration_range.second));
693
694 uint64_t file_num = next_file_number_++;
695
696 const uint32_t column_family_id =
697 static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
698 auto blob_file = std::make_shared<BlobFile>(
699 this, blob_dir_, file_num, db_options_.info_log.get(), column_family_id,
700 bdb_options_.compression, has_ttl, expiration_range);
701
702 ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
703 blob_file->PathName().c_str(), reason.c_str());
704 LogFlush(db_options_.info_log);
705
706 return blob_file;
707 }
708
709 void BlobDBImpl::RegisterBlobFile(std::shared_ptr<BlobFile> blob_file) {
710 const uint64_t blob_file_number = blob_file->BlobFileNumber();
711
712 auto it = blob_files_.lower_bound(blob_file_number);
713 assert(it == blob_files_.end() || it->first != blob_file_number);
714
715 blob_files_.insert(it,
716 std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
717 blob_file_number, std::move(blob_file)));
718 }
719
720 Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
721 std::string fpath(bfile->PathName());
722 std::unique_ptr<WritableFile> wfile;
723
724 Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_);
725 if (!s.ok()) {
726 ROCKS_LOG_ERROR(db_options_.info_log,
727 "Failed to open blob file for write: %s status: '%s'"
728 " exists: '%s'",
729 fpath.c_str(), s.ToString().c_str(),
730 env_->FileExists(fpath).ToString().c_str());
731 return s;
732 }
733
734 std::unique_ptr<WritableFileWriter> fwriter;
735 fwriter.reset(new WritableFileWriter(
736 NewLegacyWritableFileWrapper(std::move(wfile)), fpath, env_options_));
737
738 uint64_t boffset = bfile->GetFileSize();
739 if (debug_level_ >= 2 && boffset) {
740 ROCKS_LOG_DEBUG(db_options_.info_log,
741 "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
742 boffset);
743 }
744
745 BlobLogWriter::ElemType et = BlobLogWriter::kEtNone;
746 if (bfile->file_size_ == BlobLogHeader::kSize) {
747 et = BlobLogWriter::kEtFileHdr;
748 } else if (bfile->file_size_ > BlobLogHeader::kSize) {
749 et = BlobLogWriter::kEtRecord;
750 } else if (bfile->file_size_) {
751 ROCKS_LOG_WARN(db_options_.info_log,
752 "Open blob file: %s with wrong size: %" PRIu64,
753 fpath.c_str(), boffset);
754 return Status::Corruption("Invalid blob file size");
755 }
756
757 bfile->log_writer_ = std::make_shared<BlobLogWriter>(
758 std::move(fwriter), env_, statistics_, bfile->file_number_,
759 db_options_.use_fsync, boffset);
760 bfile->log_writer_->last_elem_type_ = et;
761
762 return s;
763 }
764
765 std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
766 uint64_t expiration) const {
767 if (open_ttl_files_.empty()) {
768 return nullptr;
769 }
770
771 std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
772 tmp->SetHasTTL(true);
773 tmp->expiration_range_ = std::make_pair(expiration, 0);
774 tmp->file_number_ = std::numeric_limits<uint64_t>::max();
775
776 auto citr = open_ttl_files_.equal_range(tmp);
777 if (citr.first == open_ttl_files_.end()) {
778 assert(citr.second == open_ttl_files_.end());
779
780 std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
781 return (check->expiration_range_.second <= expiration) ? nullptr : check;
782 }
783
784 if (citr.first != citr.second) {
785 return *(citr.first);
786 }
787
788 auto finditr = citr.second;
789 if (finditr != open_ttl_files_.begin()) {
790 --finditr;
791 }
792
793 bool b2 = (*finditr)->expiration_range_.second <= expiration;
794 bool b1 = (*finditr)->expiration_range_.first > expiration;
795
796 return (b1 || b2) ? nullptr : (*finditr);
797 }
798
799 Status BlobDBImpl::CheckOrCreateWriterLocked(
800 const std::shared_ptr<BlobFile>& blob_file,
801 std::shared_ptr<BlobLogWriter>* writer) {
802 assert(writer != nullptr);
803 *writer = blob_file->GetWriter();
804 if (*writer != nullptr) {
805 return Status::OK();
806 }
807 Status s = CreateWriterLocked(blob_file);
808 if (s.ok()) {
809 *writer = blob_file->GetWriter();
810 }
811 return s;
812 }
813
814 Status BlobDBImpl::CreateBlobFileAndWriter(
815 bool has_ttl, const ExpirationRange& expiration_range,
816 const std::string& reason, std::shared_ptr<BlobFile>* blob_file,
817 std::shared_ptr<BlobLogWriter>* writer) {
818 TEST_SYNC_POINT("BlobDBImpl::CreateBlobFileAndWriter");
819 assert(has_ttl == (expiration_range.first || expiration_range.second));
820 assert(blob_file);
821 assert(writer);
822
823 *blob_file = NewBlobFile(has_ttl, expiration_range, reason);
824 assert(*blob_file);
825
826 // file not visible, hence no lock
827 Status s = CheckOrCreateWriterLocked(*blob_file, writer);
828 if (!s.ok()) {
829 ROCKS_LOG_ERROR(db_options_.info_log,
830 "Failed to get writer for blob file: %s, error: %s",
831 (*blob_file)->PathName().c_str(), s.ToString().c_str());
832 return s;
833 }
834
835 assert(*writer);
836
837 s = (*writer)->WriteHeader((*blob_file)->header_);
838 if (!s.ok()) {
839 ROCKS_LOG_ERROR(db_options_.info_log,
840 "Failed to write header to new blob file: %s"
841 " status: '%s'",
842 (*blob_file)->PathName().c_str(), s.ToString().c_str());
843 return s;
844 }
845
846 (*blob_file)->SetFileSize(BlobLogHeader::kSize);
847 total_blob_size_ += BlobLogHeader::kSize;
848
849 return s;
850 }
851
852 Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
853 assert(blob_file);
854
855 {
856 ReadLock rl(&mutex_);
857
858 if (open_non_ttl_file_) {
859 assert(!open_non_ttl_file_->Immutable());
860 *blob_file = open_non_ttl_file_;
861 return Status::OK();
862 }
863 }
864
865 // Check again
866 WriteLock wl(&mutex_);
867
868 if (open_non_ttl_file_) {
869 assert(!open_non_ttl_file_->Immutable());
870 *blob_file = open_non_ttl_file_;
871 return Status::OK();
872 }
873
874 std::shared_ptr<BlobLogWriter> writer;
875 const Status s = CreateBlobFileAndWriter(
876 /* has_ttl */ false, ExpirationRange(),
877 /* reason */ "SelectBlobFile", blob_file, &writer);
878 if (!s.ok()) {
879 return s;
880 }
881
882 RegisterBlobFile(*blob_file);
883 open_non_ttl_file_ = *blob_file;
884
885 return s;
886 }
887
888 Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
889 std::shared_ptr<BlobFile>* blob_file) {
890 assert(blob_file);
891 assert(expiration != kNoExpiration);
892
893 {
894 ReadLock rl(&mutex_);
895
896 *blob_file = FindBlobFileLocked(expiration);
897 if (*blob_file != nullptr) {
898 assert(!(*blob_file)->Immutable());
899 return Status::OK();
900 }
901 }
902
903 // Check again
904 WriteLock wl(&mutex_);
905
906 *blob_file = FindBlobFileLocked(expiration);
907 if (*blob_file != nullptr) {
908 assert(!(*blob_file)->Immutable());
909 return Status::OK();
910 }
911
912 const uint64_t exp_low =
913 (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
914 const uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
915 const ExpirationRange expiration_range(exp_low, exp_high);
916
917 std::ostringstream oss;
918 oss << "SelectBlobFileTTL range: [" << exp_low << ',' << exp_high << ')';
919
920 std::shared_ptr<BlobLogWriter> writer;
921 const Status s =
922 CreateBlobFileAndWriter(/* has_ttl */ true, expiration_range,
923 /* reason */ oss.str(), blob_file, &writer);
924 if (!s.ok()) {
925 return s;
926 }
927
928 RegisterBlobFile(*blob_file);
929 open_ttl_files_.insert(*blob_file);
930
931 return s;
932 }
933
934 class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
935 private:
936 const WriteOptions& options_;
937 BlobDBImpl* blob_db_impl_;
938 uint32_t default_cf_id_;
939 WriteBatch batch_;
940
941 public:
942 BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
943 uint32_t default_cf_id)
944 : options_(options),
945 blob_db_impl_(blob_db_impl),
946 default_cf_id_(default_cf_id) {}
947
948 WriteBatch* batch() { return &batch_; }
949
950 Status PutCF(uint32_t column_family_id, const Slice& key,
951 const Slice& value) override {
952 if (column_family_id != default_cf_id_) {
953 return Status::NotSupported(
954 "Blob DB doesn't support non-default column family.");
955 }
956 Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
957 &batch_);
958 return s;
959 }
960
961 Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
962 if (column_family_id != default_cf_id_) {
963 return Status::NotSupported(
964 "Blob DB doesn't support non-default column family.");
965 }
966 Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
967 return s;
968 }
969
970 virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
971 const Slice& end_key) {
972 if (column_family_id != default_cf_id_) {
973 return Status::NotSupported(
974 "Blob DB doesn't support non-default column family.");
975 }
976 Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
977 begin_key, end_key);
978 return s;
979 }
980
981 Status SingleDeleteCF(uint32_t /*column_family_id*/,
982 const Slice& /*key*/) override {
983 return Status::NotSupported("Not supported operation in blob db.");
984 }
985
986 Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
987 const Slice& /*value*/) override {
988 return Status::NotSupported("Not supported operation in blob db.");
989 }
990
991 void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
992 };
993
994 Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
995 StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
996 RecordTick(statistics_, BLOB_DB_NUM_WRITE);
997 uint32_t default_cf_id =
998 static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
999 ->GetID();
1000 Status s;
1001 BlobInserter blob_inserter(options, this, default_cf_id);
1002 {
1003 // Release write_mutex_ before DB write to avoid race condition with
1004 // flush begin listener, which also require write_mutex_ to sync
1005 // blob files.
1006 MutexLock l(&write_mutex_);
1007 s = updates->Iterate(&blob_inserter);
1008 }
1009 if (!s.ok()) {
1010 return s;
1011 }
1012 return db_->Write(options, blob_inserter.batch());
1013 }
1014
1015 Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
1016 const Slice& value) {
1017 return PutUntil(options, key, value, kNoExpiration);
1018 }
1019
1020 Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
1021 const Slice& key, const Slice& value,
1022 uint64_t ttl) {
1023 uint64_t now = EpochNow();
1024 uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
1025 return PutUntil(options, key, value, expiration);
1026 }
1027
1028 Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
1029 const Slice& value, uint64_t expiration) {
1030 StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
1031 RecordTick(statistics_, BLOB_DB_NUM_PUT);
1032 Status s;
1033 WriteBatch batch;
1034 {
1035 // Release write_mutex_ before DB write to avoid race condition with
1036 // flush begin listener, which also require write_mutex_ to sync
1037 // blob files.
1038 MutexLock l(&write_mutex_);
1039 s = PutBlobValue(options, key, value, expiration, &batch);
1040 }
1041 if (s.ok()) {
1042 s = db_->Write(options, &batch);
1043 }
1044 return s;
1045 }
1046
1047 Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
1048 const Slice& key, const Slice& value,
1049 uint64_t expiration, WriteBatch* batch) {
1050 write_mutex_.AssertHeld();
1051 Status s;
1052 std::string index_entry;
1053 uint32_t column_family_id =
1054 static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
1055 ->GetID();
1056 if (value.size() < bdb_options_.min_blob_size) {
1057 if (expiration == kNoExpiration) {
1058 // Put as normal value
1059 s = batch->Put(key, value);
1060 RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
1061 } else {
1062 // Inlined with TTL
1063 BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
1064 s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1065 index_entry);
1066 RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
1067 }
1068 } else {
1069 std::string compression_output;
1070 Slice value_compressed = GetCompressedSlice(value, &compression_output);
1071
1072 std::string headerbuf;
1073 BlobLogWriter::ConstructBlobHeader(&headerbuf, key, value_compressed,
1074 expiration);
1075
1076 // Check DB size limit before selecting blob file to
1077 // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
1078 // done before calling SelectBlobFile().
1079 s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
1080 value_compressed.size());
1081 if (!s.ok()) {
1082 return s;
1083 }
1084
1085 std::shared_ptr<BlobFile> blob_file;
1086 if (expiration != kNoExpiration) {
1087 s = SelectBlobFileTTL(expiration, &blob_file);
1088 } else {
1089 s = SelectBlobFile(&blob_file);
1090 }
1091 if (s.ok()) {
1092 assert(blob_file != nullptr);
1093 assert(blob_file->GetCompressionType() == bdb_options_.compression);
1094 s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
1095 &index_entry);
1096 }
1097 if (s.ok()) {
1098 if (expiration != kNoExpiration) {
1099 WriteLock file_lock(&blob_file->mutex_);
1100 blob_file->ExtendExpirationRange(expiration);
1101 }
1102 s = CloseBlobFileIfNeeded(blob_file);
1103 }
1104 if (s.ok()) {
1105 s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
1106 index_entry);
1107 }
1108 if (s.ok()) {
1109 if (expiration == kNoExpiration) {
1110 RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
1111 } else {
1112 RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
1113 }
1114 } else {
1115 ROCKS_LOG_ERROR(
1116 db_options_.info_log,
1117 "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
1118 " status: '%s' blob_file: '%s'",
1119 blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
1120 s.ToString().c_str(), blob_file->DumpState().c_str());
1121 }
1122 }
1123
1124 RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
1125 RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
1126 RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
1127 RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
1128
1129 return s;
1130 }
1131
1132 Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
1133 std::string* compression_output) const {
1134 if (bdb_options_.compression == kNoCompression) {
1135 return raw;
1136 }
1137 StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
1138 CompressionType type = bdb_options_.compression;
1139 CompressionOptions opts;
1140 CompressionContext context(type);
1141 CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
1142 0 /* sample_for_compression */);
1143 CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
1144 compression_output, nullptr, nullptr);
1145 return *compression_output;
1146 }
1147
1148 Status BlobDBImpl::DecompressSlice(const Slice& compressed_value,
1149 CompressionType compression_type,
1150 PinnableSlice* value_output) const {
1151 assert(compression_type != kNoCompression);
1152
1153 BlockContents contents;
1154 auto cfh = static_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
1155
1156 {
1157 StopWatch decompression_sw(env_, statistics_, BLOB_DB_DECOMPRESSION_MICROS);
1158 UncompressionContext context(compression_type);
1159 UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
1160 compression_type);
1161 Status s = UncompressBlockContentsForCompressionType(
1162 info, compressed_value.data(), compressed_value.size(), &contents,
1163 kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
1164 if (!s.ok()) {
1165 return Status::Corruption("Unable to decompress blob.");
1166 }
1167 }
1168
1169 value_output->PinSelf(contents.data);
1170
1171 return Status::OK();
1172 }
1173
1174 Status BlobDBImpl::CompactFiles(
1175 const CompactionOptions& compact_options,
1176 const std::vector<std::string>& input_file_names, const int output_level,
1177 const int output_path_id, std::vector<std::string>* const output_file_names,
1178 CompactionJobInfo* compaction_job_info) {
1179 // Note: we need CompactionJobInfo to be able to track updates to the
1180 // blob file <-> SST mappings, so we provide one if the user hasn't,
1181 // assuming that GC is enabled.
1182 CompactionJobInfo info{};
1183 if (bdb_options_.enable_garbage_collection && !compaction_job_info) {
1184 compaction_job_info = &info;
1185 }
1186
1187 const Status s =
1188 db_->CompactFiles(compact_options, input_file_names, output_level,
1189 output_path_id, output_file_names, compaction_job_info);
1190 if (!s.ok()) {
1191 return s;
1192 }
1193
1194 if (bdb_options_.enable_garbage_collection) {
1195 assert(compaction_job_info);
1196 ProcessCompactionJobInfo(*compaction_job_info);
1197 }
1198
1199 return s;
1200 }
1201
1202 void BlobDBImpl::GetCompactionContextCommon(BlobCompactionContext* context) {
1203 assert(context);
1204
1205 context->blob_db_impl = this;
1206 context->next_file_number = next_file_number_.load();
1207 context->current_blob_files.clear();
1208 for (auto& p : blob_files_) {
1209 context->current_blob_files.insert(p.first);
1210 }
1211 context->fifo_eviction_seq = fifo_eviction_seq_;
1212 context->evict_expiration_up_to = evict_expiration_up_to_;
1213 }
1214
1215 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
1216 assert(context);
1217
1218 ReadLock l(&mutex_);
1219 GetCompactionContextCommon(context);
1220 }
1221
1222 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context,
1223 BlobCompactionContextGC* context_gc) {
1224 assert(context);
1225 assert(context_gc);
1226
1227 ReadLock l(&mutex_);
1228 GetCompactionContextCommon(context);
1229
1230 if (!live_imm_non_ttl_blob_files_.empty()) {
1231 auto it = live_imm_non_ttl_blob_files_.begin();
1232 std::advance(it, bdb_options_.garbage_collection_cutoff *
1233 live_imm_non_ttl_blob_files_.size());
1234 context_gc->cutoff_file_number = it != live_imm_non_ttl_blob_files_.end()
1235 ? it->first
1236 : std::numeric_limits<uint64_t>::max();
1237 }
1238 }
1239
1240 void BlobDBImpl::UpdateLiveSSTSize() {
1241 uint64_t live_sst_size = 0;
1242 bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
1243 if (ok) {
1244 live_sst_size_.store(live_sst_size);
1245 ROCKS_LOG_INFO(db_options_.info_log,
1246 "Updated total SST file size: %" PRIu64 " bytes.",
1247 live_sst_size);
1248 } else {
1249 ROCKS_LOG_ERROR(
1250 db_options_.info_log,
1251 "Failed to update total SST file size after flush or compaction.");
1252 }
1253 {
1254 // Trigger FIFO eviction if needed.
1255 MutexLock l(&write_mutex_);
1256 Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
1257 if (s.IsNoSpace()) {
1258 ROCKS_LOG_WARN(db_options_.info_log,
1259 "DB grow out-of-space after SST size updated. Current live"
1260 " SST size: %" PRIu64
1261 " , current blob files size: %" PRIu64 ".",
1262 live_sst_size_.load(), total_blob_size_.load());
1263 }
1264 }
1265 }
1266
1267 Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
1268 bool force_evict) {
1269 write_mutex_.AssertHeld();
1270
1271 uint64_t live_sst_size = live_sst_size_.load();
1272 if (bdb_options_.max_db_size == 0 ||
1273 live_sst_size + total_blob_size_.load() + blob_size <=
1274 bdb_options_.max_db_size) {
1275 return Status::OK();
1276 }
1277
1278 if (bdb_options_.is_fifo == false ||
1279 (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
1280 // FIFO eviction is disabled, or no space to insert new blob even we evict
1281 // all blob files.
1282 return Status::NoSpace(
1283 "Write failed, as writing it would exceed max_db_size limit.");
1284 }
1285
1286 std::vector<std::shared_ptr<BlobFile>> candidate_files;
1287 CopyBlobFiles(&candidate_files);
1288 std::sort(candidate_files.begin(), candidate_files.end(),
1289 BlobFileComparator());
1290 fifo_eviction_seq_ = GetLatestSequenceNumber();
1291
1292 WriteLock l(&mutex_);
1293
1294 while (!candidate_files.empty() &&
1295 live_sst_size + total_blob_size_.load() + blob_size >
1296 bdb_options_.max_db_size) {
1297 std::shared_ptr<BlobFile> blob_file = candidate_files.back();
1298 candidate_files.pop_back();
1299 WriteLock file_lock(&blob_file->mutex_);
1300 if (blob_file->Obsolete()) {
1301 // File already obsoleted by someone else.
1302 assert(blob_file->Immutable());
1303 continue;
1304 }
1305 // FIFO eviction can evict open blob files.
1306 if (!blob_file->Immutable()) {
1307 Status s = CloseBlobFile(blob_file);
1308 if (!s.ok()) {
1309 return s;
1310 }
1311 }
1312 assert(blob_file->Immutable());
1313 auto expiration_range = blob_file->GetExpirationRange();
1314 ROCKS_LOG_INFO(db_options_.info_log,
1315 "Evict oldest blob file since DB out of space. Current "
1316 "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
1317 ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
1318 ".",
1319 live_sst_size, total_blob_size_.load(),
1320 bdb_options_.max_db_size, blob_file->BlobFileNumber());
1321 ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
1322 evict_expiration_up_to_ = expiration_range.first;
1323 RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
1324 RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
1325 blob_file->BlobCount());
1326 RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
1327 blob_file->GetFileSize());
1328 TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
1329 }
1330 if (live_sst_size + total_blob_size_.load() + blob_size >
1331 bdb_options_.max_db_size) {
1332 return Status::NoSpace(
1333 "Write failed, as writing it would exceed max_db_size limit.");
1334 }
1335 return Status::OK();
1336 }
1337
1338 Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
1339 const std::string& headerbuf, const Slice& key,
1340 const Slice& value, uint64_t expiration,
1341 std::string* index_entry) {
1342 Status s;
1343 uint64_t blob_offset = 0;
1344 uint64_t key_offset = 0;
1345 {
1346 WriteLock lockbfile_w(&bfile->mutex_);
1347 std::shared_ptr<BlobLogWriter> writer;
1348 s = CheckOrCreateWriterLocked(bfile, &writer);
1349 if (!s.ok()) {
1350 return s;
1351 }
1352
1353 // write the blob to the blob log.
1354 s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
1355 &blob_offset);
1356 }
1357
1358 if (!s.ok()) {
1359 ROCKS_LOG_ERROR(db_options_.info_log,
1360 "Invalid status in AppendBlob: %s status: '%s'",
1361 bfile->PathName().c_str(), s.ToString().c_str());
1362 return s;
1363 }
1364
1365 uint64_t size_put = headerbuf.size() + key.size() + value.size();
1366 bfile->BlobRecordAdded(size_put);
1367 total_blob_size_ += size_put;
1368
1369 if (expiration == kNoExpiration) {
1370 BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
1371 value.size(), bdb_options_.compression);
1372 } else {
1373 BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
1374 blob_offset, value.size(),
1375 bdb_options_.compression);
1376 }
1377
1378 return s;
1379 }
1380
1381 std::vector<Status> BlobDBImpl::MultiGet(
1382 const ReadOptions& read_options,
1383 const std::vector<Slice>& keys, std::vector<std::string>* values) {
1384 StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
1385 RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
1386 // Get a snapshot to avoid blob file get deleted between we
1387 // fetch and index entry and reading from the file.
1388 ReadOptions ro(read_options);
1389 bool snapshot_created = SetSnapshotIfNeeded(&ro);
1390
1391 std::vector<Status> statuses;
1392 statuses.reserve(keys.size());
1393 values->clear();
1394 values->reserve(keys.size());
1395 PinnableSlice value;
1396 for (size_t i = 0; i < keys.size(); i++) {
1397 statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
1398 values->push_back(value.ToString());
1399 value.Reset();
1400 }
1401 if (snapshot_created) {
1402 db_->ReleaseSnapshot(ro.snapshot);
1403 }
1404 return statuses;
1405 }
1406
1407 bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
1408 assert(read_options != nullptr);
1409 if (read_options->snapshot != nullptr) {
1410 return false;
1411 }
1412 read_options->snapshot = db_->GetSnapshot();
1413 return true;
1414 }
1415
1416 Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
1417 PinnableSlice* value, uint64_t* expiration) {
1418 assert(value);
1419
1420 BlobIndex blob_index;
1421 Status s = blob_index.DecodeFrom(index_entry);
1422 if (!s.ok()) {
1423 return s;
1424 }
1425
1426 if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
1427 return Status::NotFound("Key expired");
1428 }
1429
1430 if (expiration != nullptr) {
1431 if (blob_index.HasTTL()) {
1432 *expiration = blob_index.expiration();
1433 } else {
1434 *expiration = kNoExpiration;
1435 }
1436 }
1437
1438 if (blob_index.IsInlined()) {
1439 // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
1440 // memory buffer to avoid extra copy.
1441 value->PinSelf(blob_index.value());
1442 return Status::OK();
1443 }
1444
1445 CompressionType compression_type = kNoCompression;
1446 s = GetRawBlobFromFile(key, blob_index.file_number(), blob_index.offset(),
1447 blob_index.size(), value, &compression_type);
1448 if (!s.ok()) {
1449 return s;
1450 }
1451
1452 if (compression_type != kNoCompression) {
1453 s = DecompressSlice(*value, compression_type, value);
1454 if (!s.ok()) {
1455 if (debug_level_ >= 2) {
1456 ROCKS_LOG_ERROR(
1457 db_options_.info_log,
1458 "Uncompression error during blob read from file: %" PRIu64
1459 " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1460 " key: %s status: '%s'",
1461 blob_index.file_number(), blob_index.offset(), blob_index.size(),
1462 key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1463 }
1464 return s;
1465 }
1466 }
1467
1468 return Status::OK();
1469 }
1470
1471 Status BlobDBImpl::GetRawBlobFromFile(const Slice& key, uint64_t file_number,
1472 uint64_t offset, uint64_t size,
1473 PinnableSlice* value,
1474 CompressionType* compression_type) {
1475 assert(value);
1476 assert(compression_type);
1477 assert(*compression_type == kNoCompression);
1478
1479 if (!size) {
1480 value->PinSelf("");
1481 return Status::OK();
1482 }
1483
1484 // offset has to have certain min, as we will read CRC
1485 // later from the Blob Header, which needs to be also a
1486 // valid offset.
1487 if (offset <
1488 (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
1489 if (debug_level_ >= 2) {
1490 ROCKS_LOG_ERROR(db_options_.info_log,
1491 "Invalid blob index file_number: %" PRIu64
1492 " blob_offset: %" PRIu64 " blob_size: %" PRIu64
1493 " key: %s",
1494 file_number, offset, size,
1495 key.ToString(/* output_hex */ true).c_str());
1496 }
1497
1498 return Status::NotFound("Invalid blob offset");
1499 }
1500
1501 std::shared_ptr<BlobFile> blob_file;
1502
1503 {
1504 ReadLock rl(&mutex_);
1505 auto it = blob_files_.find(file_number);
1506
1507 // file was deleted
1508 if (it == blob_files_.end()) {
1509 return Status::NotFound("Blob Not Found as blob file missing");
1510 }
1511
1512 blob_file = it->second;
1513 }
1514
1515 *compression_type = blob_file->GetCompressionType();
1516
1517 // takes locks when called
1518 std::shared_ptr<RandomAccessFileReader> reader;
1519 Status s = GetBlobFileReader(blob_file, &reader);
1520 if (!s.ok()) {
1521 return s;
1522 }
1523
1524 assert(offset >= key.size() + sizeof(uint32_t));
1525 const uint64_t record_offset = offset - key.size() - sizeof(uint32_t);
1526 const uint64_t record_size = sizeof(uint32_t) + key.size() + size;
1527
1528 // Allocate the buffer. This is safe in C++11
1529 std::string buf;
1530 AlignedBuf aligned_buf;
1531
1532 // A partial blob record contain checksum, key and value.
1533 Slice blob_record;
1534
1535 {
1536 StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
1537 if (reader->use_direct_io()) {
1538 s = reader->Read(IOOptions(), record_offset,
1539 static_cast<size_t>(record_size), &blob_record, nullptr,
1540 &aligned_buf);
1541 } else {
1542 buf.reserve(static_cast<size_t>(record_size));
1543 s = reader->Read(IOOptions(), record_offset,
1544 static_cast<size_t>(record_size), &blob_record, &buf[0],
1545 nullptr);
1546 }
1547 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
1548 }
1549
1550 if (!s.ok()) {
1551 ROCKS_LOG_DEBUG(
1552 db_options_.info_log,
1553 "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1554 ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
1555 file_number, offset, size, key.size(), s.ToString().c_str());
1556 return s;
1557 }
1558
1559 if (blob_record.size() != record_size) {
1560 ROCKS_LOG_DEBUG(
1561 db_options_.info_log,
1562 "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
1563 ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
1564 ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
1565 file_number, offset, size, key.size(), blob_record.size(), record_size);
1566
1567 return Status::Corruption("Failed to retrieve blob from blob index.");
1568 }
1569
1570 Slice crc_slice(blob_record.data(), sizeof(uint32_t));
1571 Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
1572 static_cast<size_t>(size));
1573
1574 uint32_t crc_exp = 0;
1575 if (!GetFixed32(&crc_slice, &crc_exp)) {
1576 ROCKS_LOG_DEBUG(
1577 db_options_.info_log,
1578 "Unable to decode CRC from blob file %" PRIu64 ", blob_offset: %" PRIu64
1579 ", blob_size: %" PRIu64 ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
1580 file_number, offset, size, key.size(), s.ToString().c_str());
1581 return Status::Corruption("Unable to decode checksum.");
1582 }
1583
1584 uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
1585 blob_record.size() - sizeof(uint32_t));
1586 crc = crc32c::Mask(crc); // Adjust for storage
1587 if (crc != crc_exp) {
1588 if (debug_level_ >= 2) {
1589 ROCKS_LOG_ERROR(
1590 db_options_.info_log,
1591 "Blob crc mismatch file: %" PRIu64 " blob_offset: %" PRIu64
1592 " blob_size: %" PRIu64 " key: %s status: '%s'",
1593 file_number, offset, size,
1594 key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
1595 }
1596
1597 return Status::Corruption("Corruption. Blob CRC mismatch");
1598 }
1599
1600 value->PinSelf(blob_value);
1601
1602 return Status::OK();
1603 }
1604
1605 Status BlobDBImpl::Get(const ReadOptions& read_options,
1606 ColumnFamilyHandle* column_family, const Slice& key,
1607 PinnableSlice* value) {
1608 return Get(read_options, column_family, key, value,
1609 static_cast<uint64_t*>(nullptr) /*expiration*/);
1610 }
1611
1612 Status BlobDBImpl::Get(const ReadOptions& read_options,
1613 ColumnFamilyHandle* column_family, const Slice& key,
1614 PinnableSlice* value, uint64_t* expiration) {
1615 StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
1616 RecordTick(statistics_, BLOB_DB_NUM_GET);
1617 return GetImpl(read_options, column_family, key, value, expiration);
1618 }
1619
1620 Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
1621 ColumnFamilyHandle* column_family, const Slice& key,
1622 PinnableSlice* value, uint64_t* expiration) {
1623 if (column_family->GetID() != DefaultColumnFamily()->GetID()) {
1624 return Status::NotSupported(
1625 "Blob DB doesn't support non-default column family.");
1626 }
1627 // Get a snapshot to avoid blob file get deleted between we
1628 // fetch and index entry and reading from the file.
1629 // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
1630 ReadOptions ro(read_options);
1631 bool snapshot_created = SetSnapshotIfNeeded(&ro);
1632
1633 PinnableSlice index_entry;
1634 Status s;
1635 bool is_blob_index = false;
1636 DBImpl::GetImplOptions get_impl_options;
1637 get_impl_options.column_family = column_family;
1638 get_impl_options.value = &index_entry;
1639 get_impl_options.is_blob_index = &is_blob_index;
1640 s = db_impl_->GetImpl(ro, key, get_impl_options);
1641 if (expiration != nullptr) {
1642 *expiration = kNoExpiration;
1643 }
1644 RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
1645 if (s.ok()) {
1646 if (is_blob_index) {
1647 s = GetBlobValue(key, index_entry, value, expiration);
1648 } else {
1649 // The index entry is the value itself in this case.
1650 value->PinSelf(index_entry);
1651 }
1652 RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
1653 }
1654 if (snapshot_created) {
1655 db_->ReleaseSnapshot(ro.snapshot);
1656 }
1657 return s;
1658 }
1659
1660 std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
1661 if (aborted) {
1662 return std::make_pair(false, -1);
1663 }
1664
1665 ReadLock rl(&mutex_);
1666
1667 ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
1668 ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
1669 blob_files_.size());
1670 ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
1671 open_ttl_files_.size());
1672
1673 for (const auto& blob_file : open_ttl_files_) {
1674 (void)blob_file;
1675 assert(!blob_file->Immutable());
1676 }
1677
1678 for (const auto& pair : live_imm_non_ttl_blob_files_) {
1679 const auto& blob_file = pair.second;
1680 (void)blob_file;
1681 assert(!blob_file->HasTTL());
1682 assert(blob_file->Immutable());
1683 }
1684
1685 uint64_t now = EpochNow();
1686
1687 for (auto blob_file_pair : blob_files_) {
1688 auto blob_file = blob_file_pair.second;
1689 char buf[1000];
1690 int pos = snprintf(buf, sizeof(buf),
1691 "Blob file %" PRIu64 ", size %" PRIu64
1692 ", blob count %" PRIu64 ", immutable %d",
1693 blob_file->BlobFileNumber(), blob_file->GetFileSize(),
1694 blob_file->BlobCount(), blob_file->Immutable());
1695 if (blob_file->HasTTL()) {
1696 ExpirationRange expiration_range;
1697
1698 {
1699 ReadLock file_lock(&blob_file->mutex_);
1700 expiration_range = blob_file->GetExpirationRange();
1701 }
1702
1703 pos += snprintf(buf + pos, sizeof(buf) - pos,
1704 ", expiration range (%" PRIu64 ", %" PRIu64 ")",
1705 expiration_range.first, expiration_range.second);
1706 if (!blob_file->Obsolete()) {
1707 pos += snprintf(buf + pos, sizeof(buf) - pos,
1708 ", expire in %" PRIu64 " seconds",
1709 expiration_range.second - now);
1710 }
1711 }
1712 if (blob_file->Obsolete()) {
1713 pos += snprintf(buf + pos, sizeof(buf) - pos, ", obsolete at %" PRIu64,
1714 blob_file->GetObsoleteSequence());
1715 }
1716 snprintf(buf + pos, sizeof(buf) - pos, ".");
1717 ROCKS_LOG_INFO(db_options_.info_log, "%s", buf);
1718 }
1719
1720 // reschedule
1721 return std::make_pair(true, -1);
1722 }
1723
1724 Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
1725 TEST_SYNC_POINT("BlobDBImpl::CloseBlobFile");
1726 assert(bfile);
1727 assert(!bfile->Immutable());
1728 assert(!bfile->Obsolete());
1729
1730 if (bfile->HasTTL() || bfile == open_non_ttl_file_) {
1731 write_mutex_.AssertHeld();
1732 }
1733
1734 ROCKS_LOG_INFO(db_options_.info_log,
1735 "Closing blob file %" PRIu64 ". Path: %s",
1736 bfile->BlobFileNumber(), bfile->PathName().c_str());
1737
1738 const SequenceNumber sequence = GetLatestSequenceNumber();
1739
1740 const Status s = bfile->WriteFooterAndCloseLocked(sequence);
1741
1742 if (s.ok()) {
1743 total_blob_size_ += BlobLogFooter::kSize;
1744 } else {
1745 bfile->MarkImmutable(sequence);
1746
1747 ROCKS_LOG_ERROR(db_options_.info_log,
1748 "Failed to close blob file %" PRIu64 "with error: %s",
1749 bfile->BlobFileNumber(), s.ToString().c_str());
1750 }
1751
1752 if (bfile->HasTTL()) {
1753 size_t erased __attribute__((__unused__));
1754 erased = open_ttl_files_.erase(bfile);
1755 } else {
1756 if (bfile == open_non_ttl_file_) {
1757 open_non_ttl_file_ = nullptr;
1758 }
1759
1760 const uint64_t blob_file_number = bfile->BlobFileNumber();
1761 auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
1762 assert(it == live_imm_non_ttl_blob_files_.end() ||
1763 it->first != blob_file_number);
1764 live_imm_non_ttl_blob_files_.insert(
1765 it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
1766 blob_file_number, bfile));
1767 }
1768
1769 return s;
1770 }
1771
1772 Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
1773 write_mutex_.AssertHeld();
1774
1775 // atomic read
1776 if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
1777 return Status::OK();
1778 }
1779
1780 WriteLock lock(&mutex_);
1781 WriteLock file_lock(&bfile->mutex_);
1782
1783 assert(!bfile->Obsolete() || bfile->Immutable());
1784 if (bfile->Immutable()) {
1785 return Status::OK();
1786 }
1787
1788 return CloseBlobFile(bfile);
1789 }
1790
1791 void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
1792 SequenceNumber obsolete_seq,
1793 bool update_size) {
1794 assert(blob_file->Immutable());
1795 assert(!blob_file->Obsolete());
1796
1797 // Should hold write lock of mutex_ or during DB open.
1798 blob_file->MarkObsolete(obsolete_seq);
1799 obsolete_files_.push_back(blob_file);
1800 assert(total_blob_size_.load() >= blob_file->GetFileSize());
1801 if (update_size) {
1802 total_blob_size_ -= blob_file->GetFileSize();
1803 }
1804 }
1805
1806 bool BlobDBImpl::VisibleToActiveSnapshot(
1807 const std::shared_ptr<BlobFile>& bfile) {
1808 assert(bfile->Obsolete());
1809
1810 // We check whether the oldest snapshot is no less than the last sequence
1811 // by the time the blob file become obsolete. If so, the blob file is not
1812 // visible to all existing snapshots.
1813 //
1814 // If we keep track of the earliest sequence of the keys in the blob file,
1815 // we could instead check if there's a snapshot falls in range
1816 // [earliest_sequence, obsolete_sequence). But doing so will make the
1817 // implementation more complicated.
1818 SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
1819 SequenceNumber oldest_snapshot = kMaxSequenceNumber;
1820 {
1821 // Need to lock DBImpl mutex before access snapshot list.
1822 InstrumentedMutexLock l(db_impl_->mutex());
1823 auto& snapshots = db_impl_->snapshots();
1824 if (!snapshots.empty()) {
1825 oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
1826 }
1827 }
1828 bool visible = oldest_snapshot < obsolete_sequence;
1829 if (visible) {
1830 ROCKS_LOG_INFO(db_options_.info_log,
1831 "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
1832 ") visible to oldest snapshot %" PRIu64 ".",
1833 bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
1834 }
1835 return visible;
1836 }
1837
1838 std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
1839 if (aborted) {
1840 return std::make_pair(false, -1);
1841 }
1842
1843 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
1844 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
1845
1846 std::vector<std::shared_ptr<BlobFile>> process_files;
1847 uint64_t now = EpochNow();
1848 {
1849 ReadLock rl(&mutex_);
1850 for (auto p : blob_files_) {
1851 auto& blob_file = p.second;
1852 ReadLock file_lock(&blob_file->mutex_);
1853 if (blob_file->HasTTL() && !blob_file->Obsolete() &&
1854 blob_file->GetExpirationRange().second <= now) {
1855 process_files.push_back(blob_file);
1856 }
1857 }
1858 }
1859
1860 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
1861 TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
1862 TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
1863
1864 SequenceNumber seq = GetLatestSequenceNumber();
1865 {
1866 MutexLock l(&write_mutex_);
1867 WriteLock lock(&mutex_);
1868 for (auto& blob_file : process_files) {
1869 WriteLock file_lock(&blob_file->mutex_);
1870
1871 // Need to double check if the file is obsolete.
1872 if (blob_file->Obsolete()) {
1873 assert(blob_file->Immutable());
1874 continue;
1875 }
1876
1877 if (!blob_file->Immutable()) {
1878 CloseBlobFile(blob_file);
1879 }
1880
1881 assert(blob_file->Immutable());
1882
1883 ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
1884 }
1885 }
1886
1887 return std::make_pair(true, -1);
1888 }
1889
1890 Status BlobDBImpl::SyncBlobFiles() {
1891 MutexLock l(&write_mutex_);
1892
1893 std::vector<std::shared_ptr<BlobFile>> process_files;
1894 {
1895 ReadLock rl(&mutex_);
1896 for (auto fitr : open_ttl_files_) {
1897 process_files.push_back(fitr);
1898 }
1899 if (open_non_ttl_file_ != nullptr) {
1900 process_files.push_back(open_non_ttl_file_);
1901 }
1902 }
1903
1904 Status s;
1905 for (auto& blob_file : process_files) {
1906 s = blob_file->Fsync();
1907 if (!s.ok()) {
1908 ROCKS_LOG_ERROR(db_options_.info_log,
1909 "Failed to sync blob file %" PRIu64 ", status: %s",
1910 blob_file->BlobFileNumber(), s.ToString().c_str());
1911 return s;
1912 }
1913 }
1914
1915 s = dir_ent_->Fsync();
1916 if (!s.ok()) {
1917 ROCKS_LOG_ERROR(db_options_.info_log,
1918 "Failed to sync blob directory, status: %s",
1919 s.ToString().c_str());
1920 }
1921 return s;
1922 }
1923
1924 std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
1925 if (aborted) return std::make_pair(false, -1);
1926
1927 if (open_file_count_.load() < kOpenFilesTrigger) {
1928 return std::make_pair(true, -1);
1929 }
1930
1931 // in the future, we should sort by last_access_
1932 // instead of closing every file
1933 ReadLock rl(&mutex_);
1934 for (auto const& ent : blob_files_) {
1935 auto bfile = ent.second;
1936 if (bfile->last_access_.load() == -1) continue;
1937
1938 WriteLock lockbfile_w(&bfile->mutex_);
1939 CloseRandomAccessLocked(bfile);
1940 }
1941
1942 return std::make_pair(true, -1);
1943 }
1944
1945 std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
1946 if (aborted) {
1947 return std::make_pair(false, -1);
1948 }
1949
1950 MutexLock delete_file_lock(&delete_file_mutex_);
1951 if (disable_file_deletions_ > 0) {
1952 return std::make_pair(true, -1);
1953 }
1954
1955 std::list<std::shared_ptr<BlobFile>> tobsolete;
1956 {
1957 WriteLock wl(&mutex_);
1958 if (obsolete_files_.empty()) {
1959 return std::make_pair(true, -1);
1960 }
1961 tobsolete.swap(obsolete_files_);
1962 }
1963
1964 bool file_deleted = false;
1965 for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
1966 auto bfile = *iter;
1967 {
1968 ReadLock lockbfile_r(&bfile->mutex_);
1969 if (VisibleToActiveSnapshot(bfile)) {
1970 ROCKS_LOG_INFO(db_options_.info_log,
1971 "Could not delete file due to snapshot failure %s",
1972 bfile->PathName().c_str());
1973 ++iter;
1974 continue;
1975 }
1976 }
1977 ROCKS_LOG_INFO(db_options_.info_log,
1978 "Will delete file due to snapshot success %s",
1979 bfile->PathName().c_str());
1980
1981 {
1982 WriteLock wl(&mutex_);
1983 blob_files_.erase(bfile->BlobFileNumber());
1984 }
1985
1986 Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
1987 bfile->PathName(), blob_dir_, true,
1988 /*force_fg=*/false);
1989 if (!s.ok()) {
1990 ROCKS_LOG_ERROR(db_options_.info_log,
1991 "File failed to be deleted as obsolete %s",
1992 bfile->PathName().c_str());
1993 ++iter;
1994 continue;
1995 }
1996
1997 file_deleted = true;
1998 ROCKS_LOG_INFO(db_options_.info_log,
1999 "File deleted as obsolete from blob dir %s",
2000 bfile->PathName().c_str());
2001
2002 iter = tobsolete.erase(iter);
2003 }
2004
2005 // directory change. Fsync
2006 if (file_deleted) {
2007 Status s = dir_ent_->Fsync();
2008 if (!s.ok()) {
2009 ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
2010 blob_dir_.c_str(), s.ToString().c_str());
2011 }
2012 }
2013
2014 // put files back into obsolete if for some reason, delete failed
2015 if (!tobsolete.empty()) {
2016 WriteLock wl(&mutex_);
2017 for (auto bfile : tobsolete) {
2018 blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
2019 obsolete_files_.push_front(bfile);
2020 }
2021 }
2022
2023 return std::make_pair(!aborted, -1);
2024 }
2025
2026 void BlobDBImpl::CopyBlobFiles(
2027 std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
2028 ReadLock rl(&mutex_);
2029 for (auto const& p : blob_files_) {
2030 bfiles_copy->push_back(p.second);
2031 }
2032 }
2033
2034 Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
2035 auto* cfd =
2036 static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
2037 ->cfd();
2038 // Get a snapshot to avoid blob file get deleted between we
2039 // fetch and index entry and reading from the file.
2040 ManagedSnapshot* own_snapshot = nullptr;
2041 const Snapshot* snapshot = read_options.snapshot;
2042 if (snapshot == nullptr) {
2043 own_snapshot = new ManagedSnapshot(db_);
2044 snapshot = own_snapshot->snapshot();
2045 }
2046 auto* iter = db_impl_->NewIteratorImpl(
2047 read_options, cfd, snapshot->GetSequenceNumber(),
2048 nullptr /*read_callback*/, true /*allow_blob*/);
2049 return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_);
2050 }
2051
2052 Status DestroyBlobDB(const std::string& dbname, const Options& options,
2053 const BlobDBOptions& bdb_options) {
2054 const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
2055 Env* env = soptions.env;
2056
2057 Status status;
2058 std::string blobdir;
2059 blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
2060 : bdb_options.blob_dir;
2061
2062 std::vector<std::string> filenames;
2063 env->GetChildren(blobdir, &filenames);
2064
2065 for (const auto& f : filenames) {
2066 uint64_t number;
2067 FileType type;
2068 if (ParseFileName(f, &number, &type) && type == kBlobFile) {
2069 Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true,
2070 /*force_fg=*/false);
2071 if (status.ok() && !del.ok()) {
2072 status = del;
2073 }
2074 }
2075 }
2076 env->DeleteDir(blobdir);
2077
2078 Status destroy = DestroyDB(dbname, options);
2079 if (status.ok() && !destroy.ok()) {
2080 status = destroy;
2081 }
2082
2083 return status;
2084 }
2085
2086 #ifndef NDEBUG
2087 Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
2088 PinnableSlice* value) {
2089 return GetBlobValue(key, index_entry, value);
2090 }
2091
2092 void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number,
2093 SequenceNumber immutable_sequence) {
2094 auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
2095 db_options_.info_log.get());
2096 blob_file->MarkImmutable(immutable_sequence);
2097
2098 blob_files_[blob_file_number] = blob_file;
2099 live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
2100 }
2101
2102 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
2103 ReadLock l(&mutex_);
2104 std::vector<std::shared_ptr<BlobFile>> blob_files;
2105 for (auto& p : blob_files_) {
2106 blob_files.emplace_back(p.second);
2107 }
2108 return blob_files;
2109 }
2110
2111 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
2112 const {
2113 ReadLock l(&mutex_);
2114 std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
2115 for (const auto& pair : live_imm_non_ttl_blob_files_) {
2116 live_imm_non_ttl_files.emplace_back(pair.second);
2117 }
2118 return live_imm_non_ttl_files;
2119 }
2120
2121 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
2122 const {
2123 ReadLock l(&mutex_);
2124 std::vector<std::shared_ptr<BlobFile>> obsolete_files;
2125 for (auto& bfile : obsolete_files_) {
2126 obsolete_files.emplace_back(bfile);
2127 }
2128 return obsolete_files;
2129 }
2130
2131 void BlobDBImpl::TEST_DeleteObsoleteFiles() {
2132 DeleteObsoleteFiles(false /*abort*/);
2133 }
2134
2135 Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
2136 MutexLock l(&write_mutex_);
2137 WriteLock lock(&mutex_);
2138 WriteLock file_lock(&bfile->mutex_);
2139
2140 return CloseBlobFile(bfile);
2141 }
2142
2143 void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
2144 SequenceNumber obsolete_seq,
2145 bool update_size) {
2146 return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
2147 }
2148
2149 void BlobDBImpl::TEST_EvictExpiredFiles() {
2150 EvictExpiredFiles(false /*abort*/);
2151 }
2152
2153 uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
2154
2155 void BlobDBImpl::TEST_InitializeBlobFileToSstMapping(
2156 const std::vector<LiveFileMetaData>& live_files) {
2157 InitializeBlobFileToSstMapping(live_files);
2158 }
2159
2160 void BlobDBImpl::TEST_ProcessFlushJobInfo(const FlushJobInfo& info) {
2161 ProcessFlushJobInfo(info);
2162 }
2163
2164 void BlobDBImpl::TEST_ProcessCompactionJobInfo(const CompactionJobInfo& info) {
2165 ProcessCompactionJobInfo(info);
2166 }
2167
2168 #endif // !NDEBUG
2169
2170 } // namespace blob_db
2171 } // namespace ROCKSDB_NAMESPACE
2172 #endif // ROCKSDB_LITE