]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/blob_db/blob_db_impl.cc
1a32bd562eb6b04592d2c33fb2d8df53963ebfe2
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_db_impl.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 #ifndef ROCKSDB_LITE
6
7 #include "utilities/blob_db/blob_db_impl.h"
8 #include <algorithm>
9 #include <cinttypes>
10 #include <iomanip>
11 #include <memory>
12
13 #include "db/db_impl.h"
14 #include "db/write_batch_internal.h"
15 #include "monitoring/instrumented_mutex.h"
16 #include "monitoring/statistics.h"
17 #include "rocksdb/convenience.h"
18 #include "rocksdb/env.h"
19 #include "rocksdb/iterator.h"
20 #include "rocksdb/utilities/stackable_db.h"
21 #include "rocksdb/utilities/transaction.h"
22 #include "table/block.h"
23 #include "table/block_based_table_builder.h"
24 #include "table/block_builder.h"
25 #include "table/meta_blocks.h"
26 #include "util/cast_util.h"
27 #include "util/crc32c.h"
28 #include "util/file_reader_writer.h"
29 #include "util/filename.h"
30 #include "util/logging.h"
31 #include "util/mutexlock.h"
32 #include "util/random.h"
33 #include "util/stop_watch.h"
34 #include "util/sync_point.h"
35 #include "util/timer_queue.h"
36 #include "utilities/blob_db/blob_compaction_filter.h"
37 #include "utilities/blob_db/blob_db_iterator.h"
38 #include "utilities/blob_db/blob_db_listener.h"
39 #include "utilities/blob_db/blob_index.h"
40
namespace {
// Format version handed to CompressBlock(); matches the block-based table
// format version this BlobDB build targets. constexpr: it is a compile-time
// constant and must never be mutated.
constexpr int kBlockBasedTableVersionFormat = 2;
}  // end namespace
44
45 namespace rocksdb {
46 namespace blob_db {
47
// WAL filter hook invoked for every log record during recovery. Blob DB does
// not rewrite or drop anything from the WAL, so every record is passed
// through unchanged.
WalFilter::WalProcessingOption BlobReconcileWalFilter::LogRecordFound(
    unsigned long long /*log_number*/, const std::string& /*log_file_name*/,
    const WriteBatch& /*batch*/, WriteBatch* /*new_batch*/,
    bool* /*batch_changed*/) {
  return WalFilter::WalProcessingOption::kContinueProcessing;
}
54
55 bool BlobFileComparator::operator()(
56 const std::shared_ptr<BlobFile>& lhs,
57 const std::shared_ptr<BlobFile>& rhs) const {
58 return lhs->BlobFileNumber() > rhs->BlobFileNumber();
59 }
60
61 bool BlobFileComparatorTTL::operator()(
62 const std::shared_ptr<BlobFile>& lhs,
63 const std::shared_ptr<BlobFile>& rhs) const {
64 assert(lhs->HasTTL() && rhs->HasTTL());
65 if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
66 return true;
67 }
68 if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
69 return false;
70 }
71 return lhs->BlobFileNumber() < rhs->BlobFileNumber();
72 }
73
74 BlobDBImpl::BlobDBImpl(const std::string& dbname,
75 const BlobDBOptions& blob_db_options,
76 const DBOptions& db_options,
77 const ColumnFamilyOptions& cf_options)
78 : BlobDB(),
79 dbname_(dbname),
80 db_impl_(nullptr),
81 env_(db_options.env),
82 bdb_options_(blob_db_options),
83 db_options_(db_options),
84 cf_options_(cf_options),
85 env_options_(db_options),
86 statistics_(db_options_.statistics.get()),
87 next_file_number_(1),
88 epoch_of_(0),
89 closed_(true),
90 open_file_count_(0),
91 total_blob_size_(0),
92 live_sst_size_(0),
93 fifo_eviction_seq_(0),
94 evict_expiration_up_to_(0),
95 debug_level_(0) {
96 blob_dir_ = (bdb_options_.path_relative)
97 ? dbname + "/" + bdb_options_.blob_dir
98 : bdb_options_.blob_dir;
99 env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
100 }
101
// Destructor: ensures the DB is closed. Close() is idempotent, so this is a
// no-op if the user already called it explicitly.
BlobDBImpl::~BlobDBImpl() {
  // CancelAllBackgroundWork(db_, true);
  // Status is only inspected by the assert in debug builds.
  Status s __attribute__((__unused__)) = Close();
  assert(s.ok());
}
107
108 Status BlobDBImpl::Close() {
109 if (closed_) {
110 return Status::OK();
111 }
112 closed_ = true;
113
114 // Close base DB before BlobDBImpl destructs to stop event listener and
115 // compaction filter call.
116 Status s = db_->Close();
117 // delete db_ anyway even if close failed.
118 delete db_;
119 // Reset pointers to avoid StackableDB delete the pointer again.
120 db_ = nullptr;
121 db_impl_ = nullptr;
122 if (!s.ok()) {
123 return s;
124 }
125
126 s = SyncBlobFiles();
127 return s;
128 }
129
130 BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
131
// Opens the blob directory, loads all existing blob files, installs Blob DB's
// listener and compaction filter factory into the options, then opens the
// underlying base DB. On success *handles receives the default column family
// handle and background tasks are started (unless disabled).
Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  assert(db_ == nullptr);
  if (blob_dir_.empty()) {
    return Status::NotSupported("No blob directory in options");
  }
  // Blob DB installs its own compaction filter factory below, so a
  // user-supplied filter cannot be honored.
  if (cf_options_.compaction_filter != nullptr ||
      cf_options_.compaction_filter_factory != nullptr) {
    return Status::NotSupported("Blob DB doesn't support compaction filter.");
  }

  Status s;

  // Create info log.
  if (db_options_.info_log == nullptr) {
    s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
    if (!s.ok()) {
      return s;
    }
  }

  ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");

  // Open blob directory.
  s = env_->CreateDirIfMissing(blob_dir_);
  if (!s.ok()) {
    // NOTE(review): failure here is only logged, not returned; the
    // NewDirectory() call below is expected to fail in that case and surface
    // the error — confirm this is intentional.
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to create blob_dir %s, status: %s",
                    blob_dir_.c_str(), s.ToString().c_str());
  }
  s = env_->NewDirectory(blob_dir_, &dir_ent_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
                    s.ToString().c_str());
    return s;
  }

  // Open blob files.
  s = OpenAllBlobFiles();
  if (!s.ok()) {
    return s;
  }

  // Update options
  db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
  cf_options_.compaction_filter_factory.reset(
      new BlobIndexCompactionFilterFactory(this, env_, statistics_));

  // Open base db.
  ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
  s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
  if (!s.ok()) {
    return s;
  }
  db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
  // Seed the live SST size so FIFO eviction decisions start from real data.
  UpdateLiveSSTSize();

  // Start background jobs.
  if (!bdb_options_.disable_background_tasks) {
    StartBackgroundTasks();
  }

  ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
  bdb_options_.Dump(db_options_.info_log.get());
  closed_ = false;
  return s;
}
200
// Registers the periodic maintenance jobs on the timer queue. Each entry
// binds a member function that the queue invokes on its own thread at the
// given millisecond interval.
void BlobDBImpl::StartBackgroundTasks() {
  // store a call to a member function and object
  // Close file readers that have been idle, to bound open fd count.
  tqueue_.add(
      kReclaimOpenFilesPeriodMillisecs,
      std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
  // GC interval is configured in seconds; convert to milliseconds here.
  tqueue_.add(static_cast<int64_t>(
                  bdb_options_.garbage_collection_interval_secs * 1000),
              std::bind(&BlobDBImpl::RunGC, this, std::placeholders::_1));
  tqueue_.add(
      kDeleteObsoleteFilesPeriodMillisecs,
      std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
  tqueue_.add(kSanityCheckPeriodMillisecs,
              std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
  tqueue_.add(
      kEvictExpiredFilesPeriodMillisecs,
      std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
}
218
219 Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
220 assert(file_numbers != nullptr);
221 std::vector<std::string> all_files;
222 Status s = env_->GetChildren(blob_dir_, &all_files);
223 if (!s.ok()) {
224 ROCKS_LOG_ERROR(db_options_.info_log,
225 "Failed to get list of blob files, status: %s",
226 s.ToString().c_str());
227 return s;
228 }
229
230 for (const auto& file_name : all_files) {
231 uint64_t file_number;
232 FileType type;
233 bool success = ParseFileName(file_name, &file_number, &type);
234 if (success && type == kBlobFile) {
235 file_numbers->insert(file_number);
236 } else {
237 ROCKS_LOG_WARN(db_options_.info_log,
238 "Skipping file in blob directory: %s", file_name.c_str());
239 }
240 }
241
242 return s;
243 }
244
245 Status BlobDBImpl::OpenAllBlobFiles() {
246 std::set<uint64_t> file_numbers;
247 Status s = GetAllBlobFiles(&file_numbers);
248 if (!s.ok()) {
249 return s;
250 }
251
252 if (!file_numbers.empty()) {
253 next_file_number_.store(*file_numbers.rbegin() + 1);
254 }
255
256 std::string blob_file_list;
257 std::string obsolete_file_list;
258
259 for (auto& file_number : file_numbers) {
260 std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
261 this, blob_dir_, file_number, db_options_.info_log.get());
262 blob_file->MarkImmutable();
263
264 // Read file header and footer
265 Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
266 if (read_metadata_status.IsCorruption()) {
267 // Remove incomplete file.
268 ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
269 if (!obsolete_file_list.empty()) {
270 obsolete_file_list.append(", ");
271 }
272 obsolete_file_list.append(ToString(file_number));
273 continue;
274 } else if (!read_metadata_status.ok()) {
275 ROCKS_LOG_ERROR(db_options_.info_log,
276 "Unable to read metadata of blob file % " PRIu64
277 ", status: '%s'",
278 file_number, read_metadata_status.ToString().c_str());
279 return read_metadata_status;
280 }
281
282 total_blob_size_ += blob_file->GetFileSize();
283
284 blob_files_[file_number] = blob_file;
285 if (!blob_file_list.empty()) {
286 blob_file_list.append(", ");
287 }
288 blob_file_list.append(ToString(file_number));
289 }
290
291 ROCKS_LOG_INFO(db_options_.info_log,
292 "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
293 blob_file_list.c_str());
294 ROCKS_LOG_INFO(db_options_.info_log,
295 "Found %" ROCKSDB_PRIszt
296 " incomplete or corrupted blob files: %s",
297 obsolete_files_.size(), obsolete_file_list.c_str());
298 return s;
299 }
300
301 void BlobDBImpl::CloseRandomAccessLocked(
302 const std::shared_ptr<BlobFile>& bfile) {
303 bfile->CloseRandomAccessLocked();
304 open_file_count_--;
305 }
306
307 Status BlobDBImpl::GetBlobFileReader(
308 const std::shared_ptr<BlobFile>& blob_file,
309 std::shared_ptr<RandomAccessFileReader>* reader) {
310 assert(reader != nullptr);
311 bool fresh_open = false;
312 Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
313 if (s.ok() && fresh_open) {
314 assert(*reader != nullptr);
315 open_file_count_++;
316 }
317 return s;
318 }
319
320 std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
321 uint64_t file_num = next_file_number_++;
322 auto bfile = std::make_shared<BlobFile>(this, blob_dir_, file_num,
323 db_options_.info_log.get());
324 ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
325 bfile->PathName().c_str(), reason.c_str());
326 LogFlush(db_options_.info_log);
327 return bfile;
328 }
329
330 Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
331 std::string fpath(bfile->PathName());
332 std::unique_ptr<WritableFile> wfile;
333
334 Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_);
335 if (!s.ok()) {
336 ROCKS_LOG_ERROR(db_options_.info_log,
337 "Failed to open blob file for write: %s status: '%s'"
338 " exists: '%s'",
339 fpath.c_str(), s.ToString().c_str(),
340 env_->FileExists(fpath).ToString().c_str());
341 return s;
342 }
343
344 std::unique_ptr<WritableFileWriter> fwriter;
345 fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, env_options_));
346
347 uint64_t boffset = bfile->GetFileSize();
348 if (debug_level_ >= 2 && boffset) {
349 ROCKS_LOG_DEBUG(db_options_.info_log, "Open blob file: %s with offset: %d",
350 fpath.c_str(), boffset);
351 }
352
353 Writer::ElemType et = Writer::kEtNone;
354 if (bfile->file_size_ == BlobLogHeader::kSize) {
355 et = Writer::kEtFileHdr;
356 } else if (bfile->file_size_ > BlobLogHeader::kSize) {
357 et = Writer::kEtRecord;
358 } else if (bfile->file_size_) {
359 ROCKS_LOG_WARN(db_options_.info_log,
360 "Open blob file: %s with wrong size: %d", fpath.c_str(),
361 boffset);
362 return Status::Corruption("Invalid blob file size");
363 }
364
365 bfile->log_writer_ = std::make_shared<Writer>(
366 std::move(fwriter), env_, statistics_, bfile->file_number_,
367 bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
368 bfile->log_writer_->last_elem_type_ = et;
369
370 return s;
371 }
372
// Finds an open TTL blob file whose expiration range covers `expiration`,
// or nullptr if none does. Caller must hold mutex_ (read or write).
std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
    uint64_t expiration) const {
  if (open_ttl_files_.empty()) {
    return nullptr;
  }

  // Probe key: a dummy file whose range starts at `expiration` with the
  // maximal file number, so equal_range lands at the right spot in the
  // comparator's (range-start, file-number) ordering.
  std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
  tmp->SetHasTTL(true);
  tmp->expiration_range_ = std::make_pair(expiration, 0);
  tmp->file_number_ = std::numeric_limits<uint64_t>::max();

  auto citr = open_ttl_files_.equal_range(tmp);
  if (citr.first == open_ttl_files_.end()) {
    assert(citr.second == open_ttl_files_.end());

    // Every open file starts before `expiration`; the last one matches only
    // if its range still extends past `expiration`.
    std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
    return (check->expiration_range_.second <= expiration) ? nullptr : check;
  }

  if (citr.first != citr.second) {
    // Exact hit: a file whose range starts exactly at `expiration`.
    return *(citr.first);
  }

  // Otherwise inspect the file just before the insertion point.
  auto finditr = citr.second;
  if (finditr != open_ttl_files_.begin()) {
    --finditr;
  }

  // Reject the candidate when `expiration` falls outside its half-open
  // [first, second) range.
  bool b2 = (*finditr)->expiration_range_.second <= expiration;
  bool b1 = (*finditr)->expiration_range_.first > expiration;

  return (b1 || b2) ? nullptr : (*finditr);
}
406
407 std::shared_ptr<Writer> BlobDBImpl::CheckOrCreateWriterLocked(
408 const std::shared_ptr<BlobFile>& bfile) {
409 std::shared_ptr<Writer> writer = bfile->GetWriter();
410 if (writer) return writer;
411
412 Status s = CreateWriterLocked(bfile);
413 if (!s.ok()) return nullptr;
414
415 writer = bfile->GetWriter();
416 return writer;
417 }
418
// Returns the open non-TTL blob file, creating it (and persisting its
// header) on first use. Uses a read-lock fast path and re-checks under the
// write lock so only one thread creates the file. Returns nullptr on any
// failure to create or write the header.
std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFile() {
  {
    ReadLock rl(&mutex_);
    if (open_non_ttl_file_ != nullptr) {
      return open_non_ttl_file_;
    }
  }

  // CHECK again
  WriteLock wl(&mutex_);
  if (open_non_ttl_file_ != nullptr) {
    return open_non_ttl_file_;
  }

  std::shared_ptr<BlobFile> bfile = NewBlobFile("SelectBlobFile");
  assert(bfile);

  // file not visible, hence no lock
  std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
  if (!writer) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to get writer from blob file: %s",
                    bfile->PathName().c_str());
    return nullptr;
  }

  // Initialize the in-memory header before persisting it below.
  bfile->file_size_ = BlobLogHeader::kSize;
  bfile->header_.compression = bdb_options_.compression;
  bfile->header_.has_ttl = false;
  bfile->header_.column_family_id =
      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  bfile->header_valid_ = true;
  bfile->SetColumnFamilyId(bfile->header_.column_family_id);
  bfile->SetHasTTL(false);
  bfile->SetCompression(bdb_options_.compression);

  Status s = writer->WriteHeader(bfile->header_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to write header to new blob file: %s"
                    " status: '%s'",
                    bfile->PathName().c_str(), s.ToString().c_str());
    return nullptr;
  }

  // Publish the file while still holding the write lock.
  blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
  open_non_ttl_file_ = bfile;
  total_blob_size_ += BlobLogHeader::kSize;
  return bfile;
}
469
470 std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint64_t expiration) {
471 assert(expiration != kNoExpiration);
472 uint64_t epoch_read = 0;
473 std::shared_ptr<BlobFile> bfile;
474 {
475 ReadLock rl(&mutex_);
476 bfile = FindBlobFileLocked(expiration);
477 epoch_read = epoch_of_.load();
478 }
479
480 if (bfile) {
481 assert(!bfile->Immutable());
482 return bfile;
483 }
484
485 uint64_t exp_low =
486 (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
487 uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
488 ExpirationRange expiration_range = std::make_pair(exp_low, exp_high);
489
490 bfile = NewBlobFile("SelectBlobFileTTL");
491 assert(bfile);
492
493 ROCKS_LOG_INFO(db_options_.info_log, "New blob file TTL range: %s %d %d",
494 bfile->PathName().c_str(), exp_low, exp_high);
495 LogFlush(db_options_.info_log);
496
497 // we don't need to take lock as no other thread is seeing bfile yet
498 std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
499 if (!writer) {
500 ROCKS_LOG_ERROR(db_options_.info_log,
501 "Failed to get writer from blob file with TTL: %s",
502 bfile->PathName().c_str());
503 return nullptr;
504 }
505
506 bfile->header_.expiration_range = expiration_range;
507 bfile->header_.compression = bdb_options_.compression;
508 bfile->header_.has_ttl = true;
509 bfile->header_.column_family_id =
510 reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
511 ;
512 bfile->header_valid_ = true;
513 bfile->SetColumnFamilyId(bfile->header_.column_family_id);
514 bfile->SetHasTTL(true);
515 bfile->SetCompression(bdb_options_.compression);
516 bfile->file_size_ = BlobLogHeader::kSize;
517
518 // set the first value of the range, since that is
519 // concrete at this time. also necessary to add to open_ttl_files_
520 bfile->expiration_range_ = expiration_range;
521
522 WriteLock wl(&mutex_);
523 // in case the epoch has shifted in the interim, then check
524 // check condition again - should be rare.
525 if (epoch_of_.load() != epoch_read) {
526 auto bfile2 = FindBlobFileLocked(expiration);
527 if (bfile2) return bfile2;
528 }
529
530 Status s = writer->WriteHeader(bfile->header_);
531 if (!s.ok()) {
532 ROCKS_LOG_ERROR(db_options_.info_log,
533 "Failed to write header to new blob file: %s"
534 " status: '%s'",
535 bfile->PathName().c_str(), s.ToString().c_str());
536 return nullptr;
537 }
538
539 blob_files_.insert(std::make_pair(bfile->BlobFileNumber(), bfile));
540 open_ttl_files_.insert(bfile);
541 total_blob_size_ += BlobLogHeader::kSize;
542 epoch_of_++;
543
544 return bfile;
545 }
546
// WriteBatch handler that rewrites a user batch into a new batch in which
// large values are extracted into blob files and replaced by blob-index
// entries. Only the default column family is supported; merges and single
// deletes are rejected.
class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
 private:
  const WriteOptions& options_;
  BlobDBImpl* blob_db_impl_;
  // ID of the default column family; operations on any other CF fail.
  uint32_t default_cf_id_;
  // Rewritten batch eventually handed to the base DB.
  WriteBatch batch_;

 public:
  BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
               uint32_t default_cf_id)
      : options_(options),
        blob_db_impl_(blob_db_impl),
        default_cf_id_(default_cf_id) {}

  WriteBatch* batch() { return &batch_; }

  // Puts go through PutBlobValue, which decides between inlining the value
  // and writing it out to a blob file.
  virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                       const Slice& value) override {
    if (column_family_id != default_cf_id_) {
      return Status::NotSupported(
          "Blob DB doesn't support non-default column family.");
    }
    Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
                                           &batch_);
    return s;
  }

  virtual Status DeleteCF(uint32_t column_family_id,
                          const Slice& key) override {
    if (column_family_id != default_cf_id_) {
      return Status::NotSupported(
          "Blob DB doesn't support non-default column family.");
    }
    Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
    return s;
  }

  // NOTE(review): unlike the other handlers this one is not marked
  // `override`; confirm it matches WriteBatch::Handler's range-delete
  // virtual and is actually dispatched during batch iteration.
  virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
                             const Slice& end_key) {
    if (column_family_id != default_cf_id_) {
      return Status::NotSupported(
          "Blob DB doesn't support non-default column family.");
    }
    Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
                                               begin_key, end_key);
    return s;
  }

  virtual Status SingleDeleteCF(uint32_t /*column_family_id*/,
                                const Slice& /*key*/) override {
    return Status::NotSupported("Not supported operation in blob db.");
  }

  virtual Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
                         const Slice& /*value*/) override {
    return Status::NotSupported("Not supported operation in blob db.");
  }

  // Log data carries no key/value payload; pass it through unchanged.
  virtual void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
};
607
608 Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
609 StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
610 RecordTick(statistics_, BLOB_DB_NUM_WRITE);
611 uint32_t default_cf_id =
612 reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
613 Status s;
614 BlobInserter blob_inserter(options, this, default_cf_id);
615 {
616 // Release write_mutex_ before DB write to avoid race condition with
617 // flush begin listener, which also require write_mutex_ to sync
618 // blob files.
619 MutexLock l(&write_mutex_);
620 s = updates->Iterate(&blob_inserter);
621 }
622 if (!s.ok()) {
623 return s;
624 }
625 return db_->Write(options, blob_inserter.batch());
626 }
627
628 Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
629 const Slice& value) {
630 return PutUntil(options, key, value, kNoExpiration);
631 }
632
633 Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
634 const Slice& key, const Slice& value,
635 uint64_t ttl) {
636 uint64_t now = EpochNow();
637 uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
638 return PutUntil(options, key, value, expiration);
639 }
640
641 Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
642 const Slice& value, uint64_t expiration) {
643 StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
644 RecordTick(statistics_, BLOB_DB_NUM_PUT);
645 TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
646 Status s;
647 WriteBatch batch;
648 {
649 // Release write_mutex_ before DB write to avoid race condition with
650 // flush begin listener, which also require write_mutex_ to sync
651 // blob files.
652 MutexLock l(&write_mutex_);
653 s = PutBlobValue(options, key, value, expiration, &batch);
654 }
655 if (s.ok()) {
656 s = db_->Write(options, &batch);
657 }
658 TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
659 return s;
660 }
661
// Core write path: stores `value` either inline in the write batch (values
// below min_blob_size) or in a blob file, with a blob-index entry placed in
// the batch. Caller must hold write_mutex_ and commits the batch afterwards.
Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
                                const Slice& key, const Slice& value,
                                uint64_t expiration, WriteBatch* batch) {
  write_mutex_.AssertHeld();
  Status s;
  std::string index_entry;
  uint32_t column_family_id =
      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
  if (value.size() < bdb_options_.min_blob_size) {
    if (expiration == kNoExpiration) {
      // Put as normal value
      s = batch->Put(key, value);
      RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
    } else {
      // Inlined with TTL
      BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
      s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
                                           index_entry);
      RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
    }
  } else {
    std::string compression_output;
    Slice value_compressed = GetCompressedSlice(value, &compression_output);

    std::string headerbuf;
    Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);

    // Check DB size limit before selecting blob file to
    // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
    // done before calling SelectBlobFile().
    s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
                                   value_compressed.size());
    if (!s.ok()) {
      return s;
    }

    // TTL writes go to a file bucketed by expiration window; others go to
    // the single open non-TTL file.
    std::shared_ptr<BlobFile> bfile = (expiration != kNoExpiration)
                                          ? SelectBlobFileTTL(expiration)
                                          : SelectBlobFile();
    assert(bfile != nullptr);
    assert(bfile->compression() == bdb_options_.compression);

    s = AppendBlob(bfile, headerbuf, key, value_compressed, expiration,
                   &index_entry);
    if (expiration == kNoExpiration) {
      RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
    } else {
      RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
    }

    if (s.ok()) {
      if (expiration != kNoExpiration) {
        // Widen the file's expiration range to cover this record.
        bfile->ExtendExpirationRange(expiration);
      }
      s = CloseBlobFileIfNeeded(bfile);
      if (s.ok()) {
        s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
                                             index_entry);
      }
    } else {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "Failed to append blob to FILE: %s: KEY: %s VALSZ: %d"
                      " status: '%s' blob_file: '%s'",
                      bfile->PathName().c_str(), key.ToString().c_str(),
                      value.size(), s.ToString().c_str(),
                      bfile->DumpState().c_str());
    }
  }

  // Statistics count the uncompressed user payload.
  RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
  RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
  MeasureTime(statistics_, BLOB_DB_KEY_SIZE, key.size());
  MeasureTime(statistics_, BLOB_DB_VALUE_SIZE, value.size());

  return s;
}
738
739 Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
740 std::string* compression_output) const {
741 if (bdb_options_.compression == kNoCompression) {
742 return raw;
743 }
744 StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
745 CompressionType ct = bdb_options_.compression;
746 CompressionContext compression_ctx(ct);
747 CompressBlock(raw, compression_ctx, &ct, kBlockBasedTableVersionFormat,
748 compression_output);
749 return *compression_output;
750 }
751
752 void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
753 ReadLock l(&mutex_);
754
755 context->next_file_number = next_file_number_.load();
756 context->current_blob_files.clear();
757 for (auto& p : blob_files_) {
758 context->current_blob_files.insert(p.first);
759 }
760 context->fifo_eviction_seq = fifo_eviction_seq_;
761 context->evict_expiration_up_to = evict_expiration_up_to_;
762 }
763
764 void BlobDBImpl::UpdateLiveSSTSize() {
765 uint64_t live_sst_size = 0;
766 bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
767 if (ok) {
768 live_sst_size_.store(live_sst_size);
769 ROCKS_LOG_INFO(db_options_.info_log,
770 "Updated total SST file size: %" PRIu64 " bytes.",
771 live_sst_size);
772 } else {
773 ROCKS_LOG_ERROR(
774 db_options_.info_log,
775 "Failed to update total SST file size after flush or compaction.");
776 }
777 {
778 // Trigger FIFO eviction if needed.
779 MutexLock l(&write_mutex_);
780 Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
781 if (s.IsNoSpace()) {
782 ROCKS_LOG_WARN(db_options_.info_log,
783 "DB grow out-of-space after SST size updated. Current live"
784 " SST size: %" PRIu64
785 " , current blob files size: %" PRIu64 ".",
786 live_sst_size_.load(), total_blob_size_.load());
787 }
788 }
789 }
790
// Checks whether writing `blob_size` more bytes would exceed max_db_size
// and, when FIFO eviction is enabled, obsoletes the oldest blob files until
// the write fits. Caller must hold write_mutex_. Returns NoSpace when the
// write cannot fit even after evicting.
Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
                                              bool force_evict) {
  write_mutex_.AssertHeld();

  uint64_t live_sst_size = live_sst_size_.load();
  // Fast path: no limit configured, or the write already fits.
  if (bdb_options_.max_db_size == 0 ||
      live_sst_size + total_blob_size_.load() + blob_size <=
          bdb_options_.max_db_size) {
    return Status::OK();
  }

  if (bdb_options_.is_fifo == false ||
      (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
    // FIFO eviction is disabled, or no space to insert new blob even we evict
    // all blob files.
    return Status::NoSpace(
        "Write failed, as writing it would exceed max_db_size limit.");
  }

  // candidate_files is sorted newest-first, so popping from the back evicts
  // oldest-first.
  std::vector<std::shared_ptr<BlobFile>> candidate_files;
  CopyBlobFiles(&candidate_files);
  std::sort(candidate_files.begin(), candidate_files.end(),
            BlobFileComparator());
  fifo_eviction_seq_ = GetLatestSequenceNumber();

  WriteLock l(&mutex_);

  while (!candidate_files.empty() &&
         live_sst_size + total_blob_size_.load() + blob_size >
             bdb_options_.max_db_size) {
    std::shared_ptr<BlobFile> blob_file = candidate_files.back();
    candidate_files.pop_back();
    WriteLock file_lock(&blob_file->mutex_);
    if (blob_file->Obsolete()) {
      // File already obsoleted by someone else.
      continue;
    }
    // FIFO eviction can evict open blob files.
    if (!blob_file->Immutable()) {
      Status s = CloseBlobFile(blob_file, false /*need_lock*/);
      if (!s.ok()) {
        return s;
      }
    }
    assert(blob_file->Immutable());
    auto expiration_range = blob_file->GetExpirationRange();
    ROCKS_LOG_INFO(db_options_.info_log,
                   "Evict oldest blob file since DB out of space. Current "
                   "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
                   ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
                   ".",
                   live_sst_size, total_blob_size_.load(),
                   bdb_options_.max_db_size, blob_file->BlobFileNumber());
    // Marking the file obsolete also subtracts its size from the total.
    ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
    evict_expiration_up_to_ = expiration_range.first;
    RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
    RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
               blob_file->BlobCount());
    RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
               blob_file->GetFileSize());
    TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
  }
  // Even after exhausting the candidates the write may still not fit.
  if (live_sst_size + total_blob_size_.load() + blob_size >
      bdb_options_.max_db_size) {
    return Status::NoSpace(
        "Write failed, as writing it would exceed max_db_size limit.");
  }
  return Status::OK();
}
860
// Appends one record (headerbuf + key + value) to `bfile` under the file's
// write lock, updates the size/count bookkeeping, and encodes a blob-index
// entry pointing at the written offset into *index_entry for the caller to
// place in its write batch.
Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
                              const std::string& headerbuf, const Slice& key,
                              const Slice& value, uint64_t expiration,
                              std::string* index_entry) {
  Status s;
  uint64_t blob_offset = 0;
  uint64_t key_offset = 0;
  {
    WriteLock lockbfile_w(&bfile->mutex_);
    std::shared_ptr<Writer> writer = CheckOrCreateWriterLocked(bfile);
    if (!writer) {
      return Status::IOError("Failed to create blob writer");
    }

    // write the blob to the blob log.
    s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
                                   &blob_offset);
  }

  if (!s.ok()) {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Invalid status in AppendBlob: %s status: '%s'",
                    bfile->PathName().c_str(), s.ToString().c_str());
    return s;
  }

  // increment blob count
  bfile->blob_count_++;

  // Account for the full on-disk footprint of the record.
  uint64_t size_put = headerbuf.size() + key.size() + value.size();
  bfile->file_size_ += size_put;
  total_blob_size_ += size_put;

  // Encode the index entry; the TTL variant additionally carries the
  // absolute expiration timestamp.
  if (expiration == kNoExpiration) {
    BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
                          value.size(), bdb_options_.compression);
  } else {
    BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
                             blob_offset, value.size(),
                             bdb_options_.compression);
  }

  return s;
}
905
906 std::vector<Status> BlobDBImpl::MultiGet(
907 const ReadOptions& read_options,
908 const std::vector<Slice>& keys, std::vector<std::string>* values) {
909 StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
910 RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
911 // Get a snapshot to avoid blob file get deleted between we
912 // fetch and index entry and reading from the file.
913 ReadOptions ro(read_options);
914 bool snapshot_created = SetSnapshotIfNeeded(&ro);
915
916 std::vector<Status> statuses;
917 statuses.reserve(keys.size());
918 values->clear();
919 values->reserve(keys.size());
920 PinnableSlice value;
921 for (size_t i = 0; i < keys.size(); i++) {
922 statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
923 values->push_back(value.ToString());
924 value.Reset();
925 }
926 if (snapshot_created) {
927 db_->ReleaseSnapshot(ro.snapshot);
928 }
929 return statuses;
930 }
931
932 bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
933 assert(read_options != nullptr);
934 if (read_options->snapshot != nullptr) {
935 return false;
936 }
937 read_options->snapshot = db_->GetSnapshot();
938 return true;
939 }
940
941 Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
942 PinnableSlice* value, uint64_t* expiration) {
943 assert(value != nullptr);
944 BlobIndex blob_index;
945 Status s = blob_index.DecodeFrom(index_entry);
946 if (!s.ok()) {
947 return s;
948 }
949 if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
950 return Status::NotFound("Key expired");
951 }
952 if (expiration != nullptr) {
953 if (blob_index.HasTTL()) {
954 *expiration = blob_index.expiration();
955 } else {
956 *expiration = kNoExpiration;
957 }
958 }
959 if (blob_index.IsInlined()) {
960 // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
961 // memory buffer to avoid extra copy.
962 value->PinSelf(blob_index.value());
963 return Status::OK();
964 }
965 if (blob_index.size() == 0) {
966 value->PinSelf("");
967 return Status::OK();
968 }
969
970 // offset has to have certain min, as we will read CRC
971 // later from the Blob Header, which needs to be also a
972 // valid offset.
973 if (blob_index.offset() <
974 (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
975 if (debug_level_ >= 2) {
976 ROCKS_LOG_ERROR(db_options_.info_log,
977 "Invalid blob index file_number: %" PRIu64
978 " blob_offset: %" PRIu64 " blob_size: %" PRIu64
979 " key: %s",
980 blob_index.file_number(), blob_index.offset(),
981 blob_index.size(), key.data());
982 }
983 return Status::NotFound("Invalid blob offset");
984 }
985
986 std::shared_ptr<BlobFile> bfile;
987 {
988 ReadLock rl(&mutex_);
989 auto hitr = blob_files_.find(blob_index.file_number());
990
991 // file was deleted
992 if (hitr == blob_files_.end()) {
993 return Status::NotFound("Blob Not Found as blob file missing");
994 }
995
996 bfile = hitr->second;
997 }
998
999 if (blob_index.size() == 0 && value != nullptr) {
1000 value->PinSelf("");
1001 return Status::OK();
1002 }
1003
1004 // takes locks when called
1005 std::shared_ptr<RandomAccessFileReader> reader;
1006 s = GetBlobFileReader(bfile, &reader);
1007 if (!s.ok()) {
1008 return s;
1009 }
1010
1011 assert(blob_index.offset() > key.size() + sizeof(uint32_t));
1012 uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
1013 uint64_t record_size = sizeof(uint32_t) + key.size() + blob_index.size();
1014
1015 // Allocate the buffer. This is safe in C++11
1016 std::string buffer_str(static_cast<size_t>(record_size), static_cast<char>(0));
1017 char* buffer = &buffer_str[0];
1018
1019 // A partial blob record contain checksum, key and value.
1020 Slice blob_record;
1021 {
1022 StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
1023 s = reader->Read(record_offset, static_cast<size_t>(record_size), &blob_record, buffer);
1024 RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
1025 }
1026 if (!s.ok()) {
1027 ROCKS_LOG_DEBUG(db_options_.info_log,
1028 "Failed to read blob from blob file %" PRIu64
1029 ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
1030 ", key_size: " PRIu64 ", read " PRIu64
1031 " bytes, status: '%s'",
1032 bfile->BlobFileNumber(), blob_index.offset(),
1033 blob_index.size(), key.size(), s.ToString().c_str());
1034 return s;
1035 }
1036 if (blob_record.size() != record_size) {
1037 ROCKS_LOG_DEBUG(db_options_.info_log,
1038 "Failed to read blob from blob file %" PRIu64
1039 ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
1040 ", key_size: " PRIu64 ", read " PRIu64
1041 " bytes, status: '%s'",
1042 bfile->BlobFileNumber(), blob_index.offset(),
1043 blob_index.size(), key.size(), s.ToString().c_str());
1044
1045 return Status::Corruption("Failed to retrieve blob from blob index.");
1046 }
1047 Slice crc_slice(blob_record.data(), sizeof(uint32_t));
1048 Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
1049 static_cast<size_t>(blob_index.size()));
1050 uint32_t crc_exp;
1051 if (!GetFixed32(&crc_slice, &crc_exp)) {
1052 ROCKS_LOG_DEBUG(db_options_.info_log,
1053 "Unable to decode CRC from blob file %" PRIu64
1054 ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
1055 ", key size: %" PRIu64 ", status: '%s'",
1056 bfile->BlobFileNumber(), blob_index.offset(),
1057 blob_index.size(), key.size(), s.ToString().c_str());
1058 return Status::Corruption("Unable to decode checksum.");
1059 }
1060
1061 uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
1062 blob_record.size() - sizeof(uint32_t));
1063 crc = crc32c::Mask(crc); // Adjust for storage
1064 if (crc != crc_exp) {
1065 if (debug_level_ >= 2) {
1066 ROCKS_LOG_ERROR(db_options_.info_log,
1067 "Blob crc mismatch file: %s blob_offset: %" PRIu64
1068 " blob_size: %" PRIu64 " key: %s status: '%s'",
1069 bfile->PathName().c_str(), blob_index.offset(),
1070 blob_index.size(), key.data(), s.ToString().c_str());
1071 }
1072 return Status::Corruption("Corruption. Blob CRC mismatch");
1073 }
1074
1075 if (bfile->compression() == kNoCompression) {
1076 value->PinSelf(blob_value);
1077 } else {
1078 BlockContents contents;
1079 auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
1080 {
1081 StopWatch decompression_sw(env_, statistics_,
1082 BLOB_DB_DECOMPRESSION_MICROS);
1083 UncompressionContext uncompression_ctx(bfile->compression());
1084 s = UncompressBlockContentsForCompressionType(
1085 uncompression_ctx, blob_value.data(), blob_value.size(), &contents,
1086 kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
1087 }
1088 value->PinSelf(contents.data);
1089 }
1090
1091 return s;
1092 }
1093
1094 Status BlobDBImpl::Get(const ReadOptions& read_options,
1095 ColumnFamilyHandle* column_family, const Slice& key,
1096 PinnableSlice* value) {
1097 return Get(read_options, column_family, key, value, nullptr /*expiration*/);
1098 }
1099
1100 Status BlobDBImpl::Get(const ReadOptions& read_options,
1101 ColumnFamilyHandle* column_family, const Slice& key,
1102 PinnableSlice* value, uint64_t* expiration) {
1103 StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
1104 RecordTick(statistics_, BLOB_DB_NUM_GET);
1105 return GetImpl(read_options, column_family, key, value, expiration);
1106 }
1107
// Core lookup: reads the index entry from the base DB and, when it is a
// blob index, resolves the actual value through GetBlobValue(). Only the
// default column family is supported. |expiration|, when non-null, is set
// to kNoExpiration up front and overwritten by GetBlobValue() for TTL keys.
Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           PinnableSlice* value, uint64_t* expiration) {
  if (column_family != DefaultColumnFamily()) {
    return Status::NotSupported(
        "Blob DB doesn't support non-default column family.");
  }
  // Get a snapshot to avoid blob file get deleted between we
  // fetch and index entry and reading from the file.
  // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
  ReadOptions ro(read_options);
  bool snapshot_created = SetSnapshotIfNeeded(&ro);

  PinnableSlice index_entry;
  Status s;
  // is_blob_index tells us whether the base DB returned a blob index entry
  // (needs resolution) or a plain value stored inline in the LSM.
  bool is_blob_index = false;
  s = db_impl_->GetImpl(ro, column_family, key, &index_entry,
                        nullptr /*value_found*/, nullptr /*read_callback*/,
                        &is_blob_index);
  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
  if (expiration != nullptr) {
    *expiration = kNoExpiration;
  }
  RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
  if (s.ok()) {
    if (is_blob_index) {
      s = GetBlobValue(key, index_entry, value, expiration);
    } else {
      // The index entry is the value itself in this case.
      value->PinSelf(index_entry);
    }
    RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
  }
  // Release the snapshot only if we created it ourselves above.
  if (snapshot_created) {
    db_->ReleaseSnapshot(ro.snapshot);
  }
  return s;
}
1147
1148 std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
1149 if (aborted) {
1150 return std::make_pair(false, -1);
1151 }
1152
1153 ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
1154 ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" PRIu64,
1155 blob_files_.size());
1156 ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" PRIu64,
1157 open_ttl_files_.size());
1158
1159 for (auto bfile : open_ttl_files_) {
1160 assert(!bfile->Immutable());
1161 }
1162
1163 uint64_t now = EpochNow();
1164
1165 for (auto blob_file_pair : blob_files_) {
1166 auto blob_file = blob_file_pair.second;
1167 char buf[1000];
1168 int pos = snprintf(buf, sizeof(buf),
1169 "Blob file %" PRIu64 ", size %" PRIu64
1170 ", blob count %" PRIu64 ", immutable %d",
1171 blob_file->BlobFileNumber(), blob_file->GetFileSize(),
1172 blob_file->BlobCount(), blob_file->Immutable());
1173 if (blob_file->HasTTL()) {
1174 auto expiration_range = blob_file->GetExpirationRange();
1175 pos += snprintf(buf + pos, sizeof(buf) - pos,
1176 ", expiration range (%" PRIu64 ", %" PRIu64 ")",
1177 expiration_range.first, expiration_range.second);
1178 if (!blob_file->Obsolete()) {
1179 pos += snprintf(buf + pos, sizeof(buf) - pos,
1180 ", expire in %" PRIu64 " seconds",
1181 expiration_range.second - now);
1182 }
1183 }
1184 if (blob_file->Obsolete()) {
1185 pos += snprintf(buf + pos, sizeof(buf) - pos, ", obsolete at %" PRIu64,
1186 blob_file->GetObsoleteSequence());
1187 }
1188 snprintf(buf + pos, sizeof(buf) - pos, ".");
1189 ROCKS_LOG_INFO(db_options_.info_log, "%s", buf);
1190 }
1191
1192 // reschedule
1193 return std::make_pair(true, -1);
1194 }
1195
// Close |bfile|: remove it from the open-for-write collections, then write
// its footer and mark it closed. Requires write_mutex_ to be held by the
// caller. When need_lock is true, this function acquires mutex_ and the
// file's own mutex itself; callers that already hold them pass false
// (see EvictExpiredFiles).
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile,
                                 bool need_lock) {
  assert(bfile != nullptr);
  write_mutex_.AssertHeld();
  Status s;
  ROCKS_LOG_INFO(db_options_.info_log,
                 "Closing blob file %" PRIu64 ". Path: %s",
                 bfile->BlobFileNumber(), bfile->PathName().c_str());
  {
    std::unique_ptr<WriteLock> lock;
    if (need_lock) {
      lock.reset(new WriteLock(&mutex_));
    }

    // Remove the file from whichever open-for-write collection holds it:
    // TTL files live in open_ttl_files_, the non-TTL file is a single slot.
    if (bfile->HasTTL()) {
      size_t erased __attribute__((__unused__));
      erased = open_ttl_files_.erase(bfile);
    } else if (bfile == open_non_ttl_file_) {
      open_non_ttl_file_ = nullptr;
    }
  }

  // Append the footer only once; closed_ flags files already finalized.
  if (!bfile->closed_.load()) {
    std::unique_ptr<WriteLock> file_lock;
    if (need_lock) {
      file_lock.reset(new WriteLock(&bfile->mutex_));
    }
    s = bfile->WriteFooterAndCloseLocked();
  }

  if (s.ok()) {
    // The footer just written counts toward the tracked total blob size.
    total_blob_size_ += BlobLogFooter::kSize;
  } else {
    ROCKS_LOG_ERROR(db_options_.info_log,
                    "Failed to close blob file %" PRIu64 "with error: %s",
                    bfile->BlobFileNumber(), s.ToString().c_str());
  }

  return s;
}
1236
1237 Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
1238 // atomic read
1239 if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
1240 return Status::OK();
1241 }
1242 return CloseBlobFile(bfile);
1243 }
1244
1245 void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
1246 SequenceNumber obsolete_seq,
1247 bool update_size) {
1248 // Should hold write lock of mutex_ or during DB open.
1249 blob_file->MarkObsolete(obsolete_seq);
1250 obsolete_files_.push_back(blob_file);
1251 assert(total_blob_size_.load() >= blob_file->GetFileSize());
1252 if (update_size) {
1253 total_blob_size_ -= blob_file->GetFileSize();
1254 }
1255 }
1256
1257 bool BlobDBImpl::VisibleToActiveSnapshot(
1258 const std::shared_ptr<BlobFile>& bfile) {
1259 assert(bfile->Obsolete());
1260
1261 // We check whether the oldest snapshot is no less than the last sequence
1262 // by the time the blob file become obsolete. If so, the blob file is not
1263 // visible to all existing snapshots.
1264 //
1265 // If we keep track of the earliest sequence of the keys in the blob file,
1266 // we could instead check if there's a snapshot falls in range
1267 // [earliest_sequence, obsolete_sequence). But doing so will make the
1268 // implementation more complicated.
1269 SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
1270 SequenceNumber oldest_snapshot = kMaxSequenceNumber;
1271 {
1272 // Need to lock DBImpl mutex before access snapshot list.
1273 InstrumentedMutexLock l(db_impl_->mutex());
1274 auto& snapshots = db_impl_->snapshots();
1275 if (!snapshots.empty()) {
1276 oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
1277 }
1278 }
1279 bool visible = oldest_snapshot < obsolete_sequence;
1280 if (visible) {
1281 ROCKS_LOG_INFO(db_options_.info_log,
1282 "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
1283 ") visible to oldest snapshot %" PRIu64 ".",
1284 bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
1285 }
1286 return visible;
1287 }
1288
// Timer task: find TTL blob files whose entire expiration range has passed,
// close them, and mark them obsolete so DeleteObsoleteFiles() can reclaim
// them. Always reschedules itself (returns {true, -1}) unless aborted.
std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
  if (aborted) {
    return std::make_pair(false, -1);
  }

  // Phase 1: collect candidates under the read lock only; mutation happens
  // in phase 2 under write_mutex_, per the CloseBlobFile locking contract.
  std::vector<std::shared_ptr<BlobFile>> process_files;
  uint64_t now = EpochNow();
  {
    ReadLock rl(&mutex_);
    for (auto p : blob_files_) {
      auto& blob_file = p.second;
      ReadLock file_lock(&blob_file->mutex_);
      if (blob_file->HasTTL() && !blob_file->Obsolete() &&
          blob_file->GetExpirationRange().second <= now) {
        process_files.push_back(blob_file);
      }
    }
  }

  // All expired files are marked obsolete at the same (latest) sequence.
  SequenceNumber seq = GetLatestSequenceNumber();
  {
    MutexLock l(&write_mutex_);
    for (auto& blob_file : process_files) {
      WriteLock file_lock(&blob_file->mutex_);
      if (!blob_file->Immutable()) {
        // We already hold the file lock; CloseBlobFile must not re-acquire.
        // NOTE(review): ObsoleteBlobFile's contract says mutex_ should be
        // write-locked; here only write_mutex_ and the file lock are held —
        // confirm this is intentional.
        CloseBlobFile(blob_file, false /*need_lock*/);
      }
      // Need to double check if the file is obsolete.
      if (!blob_file->Obsolete()) {
        ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
      }
    }
  }

  // reschedule
  return std::make_pair(true, -1);
}
1325
1326 Status BlobDBImpl::SyncBlobFiles() {
1327 MutexLock l(&write_mutex_);
1328
1329 std::vector<std::shared_ptr<BlobFile>> process_files;
1330 {
1331 ReadLock rl(&mutex_);
1332 for (auto fitr : open_ttl_files_) {
1333 process_files.push_back(fitr);
1334 }
1335 if (open_non_ttl_file_ != nullptr) {
1336 process_files.push_back(open_non_ttl_file_);
1337 }
1338 }
1339
1340 Status s;
1341 for (auto& blob_file : process_files) {
1342 s = blob_file->Fsync();
1343 if (!s.ok()) {
1344 ROCKS_LOG_ERROR(db_options_.info_log,
1345 "Failed to sync blob file %" PRIu64 ", status: %s",
1346 blob_file->BlobFileNumber(), s.ToString().c_str());
1347 return s;
1348 }
1349 }
1350
1351 s = dir_ent_->Fsync();
1352 if (!s.ok()) {
1353 ROCKS_LOG_ERROR(db_options_.info_log,
1354 "Failed to sync blob directory, status: %s",
1355 s.ToString().c_str());
1356 }
1357 return s;
1358 }
1359
1360 std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
1361 if (aborted) return std::make_pair(false, -1);
1362
1363 if (open_file_count_.load() < kOpenFilesTrigger) {
1364 return std::make_pair(true, -1);
1365 }
1366
1367 // in the future, we should sort by last_access_
1368 // instead of closing every file
1369 ReadLock rl(&mutex_);
1370 for (auto const& ent : blob_files_) {
1371 auto bfile = ent.second;
1372 if (bfile->last_access_.load() == -1) continue;
1373
1374 WriteLock lockbfile_w(&bfile->mutex_);
1375 CloseRandomAccessLocked(bfile);
1376 }
1377
1378 return std::make_pair(true, -1);
1379 }
1380
// Write callback for garbage collection to check if key has been updated
// since last read. Similar to how OptimisticTransaction works. See inline
// comment in GCFileAndUpdateLSM().
class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
 public:
  // NOTE: |key| is retained as a non-owning Slice; the caller must keep the
  // underlying key bytes alive for this callback's lifetime (in
  // GCFileAndUpdateLSM the record outlives the callback).
  GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key,
                                 SequenceNumber upper_bound)
      : cfd_(cfd), key_(key), upper_bound_(upper_bound) {}

  // Returns OK to let the GC write proceed, Status::Busy when the key was
  // deleted or rewritten after GC read it, or the lookup error otherwise.
  virtual Status Callback(DB* db) override {
    auto* db_impl = reinterpret_cast<DBImpl*>(db);
    auto* sv = db_impl->GetAndRefSuperVersion(cfd_);
    SequenceNumber latest_seq = 0;
    bool found_record_for_key = false;
    bool is_blob_index = false;
    Status s = db_impl->GetLatestSequenceForKey(
        sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key,
        &is_blob_index);
    db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
    if (!s.ok() && !s.IsNotFound()) {
      // Error.
      assert(!s.IsBusy());
      return s;
    }
    if (s.IsNotFound()) {
      assert(!found_record_for_key);
      return Status::Busy("Key deleted");
    }
    assert(found_record_for_key);
    assert(is_blob_index);
    // A sequence beyond upper_bound_ means a newer version was written
    // after GC read the key; abort this particular rewrite.
    if (latest_seq > upper_bound_) {
      return Status::Busy("Key overwritten");
    }
    return s;
  }

  // Each GC write must be validated individually, so disallow batching.
  virtual bool AllowWriteBatching() override { return false; }

 private:
  ColumnFamilyData* cfd_;
  // Key to check (non-owning; see constructor note).
  Slice key_;
  // Upper bound of sequence number to proceed.
  SequenceNumber upper_bound_;
};
1426
1427 // iterate over the blobs sequentially and check if the blob sequence number
1428 // is the latest. If it is the latest, preserve it, otherwise delete it
1429 // if it is TTL based, and the TTL has expired, then
1430 // we can blow the entity if the key is still the latest or the Key is not
1431 // found
1432 // WHAT HAPPENS IF THE KEY HAS BEEN OVERRIDEN. Then we can drop the blob
1433 // without doing anything if the earliest snapshot is not
1434 // referring to that sequence number, i.e. it is later than the sequence number
1435 // of the new key
1436 //
1437 // if it is not TTL based, then we can blow the key if the key has been
1438 // DELETED in the LSM
1439 Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
1440 GCStats* gc_stats) {
1441 StopWatch gc_sw(env_, statistics_, BLOB_DB_GC_MICROS);
1442 uint64_t now = EpochNow();
1443
1444 std::shared_ptr<Reader> reader =
1445 bfptr->OpenRandomAccessReader(env_, db_options_, env_options_);
1446 if (!reader) {
1447 ROCKS_LOG_ERROR(db_options_.info_log,
1448 "File sequential reader could not be opened",
1449 bfptr->PathName().c_str());
1450 return Status::IOError("failed to create sequential reader");
1451 }
1452
1453 BlobLogHeader header;
1454 Status s = reader->ReadHeader(&header);
1455 if (!s.ok()) {
1456 ROCKS_LOG_ERROR(db_options_.info_log,
1457 "Failure to read header for blob-file %s",
1458 bfptr->PathName().c_str());
1459 return s;
1460 }
1461
1462 auto* cfh =
1463 db_impl_->GetColumnFamilyHandleUnlocked(bfptr->column_family_id());
1464 auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
1465 auto column_family_id = cfd->GetID();
1466 bool has_ttl = header.has_ttl;
1467
1468 // this reads the key but skips the blob
1469 Reader::ReadLevel shallow = Reader::kReadHeaderKey;
1470
1471 bool file_expired = has_ttl && now >= bfptr->GetExpirationRange().second;
1472
1473 if (!file_expired) {
1474 // read the blob because you have to write it back to new file
1475 shallow = Reader::kReadHeaderKeyBlob;
1476 }
1477
1478 BlobLogRecord record;
1479 std::shared_ptr<BlobFile> newfile;
1480 std::shared_ptr<Writer> new_writer;
1481 uint64_t blob_offset = 0;
1482
1483 while (true) {
1484 assert(s.ok());
1485
1486 // Read the next blob record.
1487 Status read_record_status =
1488 reader->ReadRecord(&record, shallow, &blob_offset);
1489 // Exit if we reach the end of blob file.
1490 // TODO(yiwu): properly handle ReadRecord error.
1491 if (!read_record_status.ok()) {
1492 break;
1493 }
1494 gc_stats->blob_count++;
1495
1496 // Similar to OptimisticTransaction, we obtain latest_seq from
1497 // base DB, which is guaranteed to be no smaller than the sequence of
1498 // current key. We use a WriteCallback on write to check the key sequence
1499 // on write. If the key sequence is larger than latest_seq, we know
1500 // a new versions is inserted and the old blob can be disgard.
1501 //
1502 // We cannot use OptimisticTransaction because we need to pass
1503 // is_blob_index flag to GetImpl.
1504 SequenceNumber latest_seq = GetLatestSequenceNumber();
1505 bool is_blob_index = false;
1506 PinnableSlice index_entry;
1507 Status get_status = db_impl_->GetImpl(
1508 ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/,
1509 nullptr /*read_callback*/, &is_blob_index);
1510 TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
1511 if (!get_status.ok() && !get_status.IsNotFound()) {
1512 // error
1513 s = get_status;
1514 ROCKS_LOG_ERROR(db_options_.info_log,
1515 "Error while getting index entry: %s",
1516 s.ToString().c_str());
1517 break;
1518 }
1519 if (get_status.IsNotFound() || !is_blob_index) {
1520 // Either the key is deleted or updated with a newer version whish is
1521 // inlined in LSM.
1522 gc_stats->num_keys_overwritten++;
1523 gc_stats->bytes_overwritten += record.record_size();
1524 continue;
1525 }
1526
1527 BlobIndex blob_index;
1528 s = blob_index.DecodeFrom(index_entry);
1529 if (!s.ok()) {
1530 ROCKS_LOG_ERROR(db_options_.info_log,
1531 "Error while decoding index entry: %s",
1532 s.ToString().c_str());
1533 break;
1534 }
1535 if (blob_index.IsInlined() ||
1536 blob_index.file_number() != bfptr->BlobFileNumber() ||
1537 blob_index.offset() != blob_offset) {
1538 // Key has been overwritten. Drop the blob record.
1539 gc_stats->num_keys_overwritten++;
1540 gc_stats->bytes_overwritten += record.record_size();
1541 continue;
1542 }
1543
1544 GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq);
1545
1546 // If key has expired, remove it from base DB.
1547 // TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter.
1548 // We can just drop the blob record.
1549 if (file_expired || (has_ttl && now >= record.expiration)) {
1550 gc_stats->num_keys_expired++;
1551 gc_stats->bytes_expired += record.record_size();
1552 TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
1553 WriteBatch delete_batch;
1554 Status delete_status = delete_batch.Delete(record.key);
1555 if (delete_status.ok()) {
1556 delete_status = db_impl_->WriteWithCallback(WriteOptions(),
1557 &delete_batch, &callback);
1558 }
1559 if (!delete_status.ok() && !delete_status.IsBusy()) {
1560 // We hit an error.
1561 s = delete_status;
1562 ROCKS_LOG_ERROR(db_options_.info_log,
1563 "Error while deleting expired key: %s",
1564 s.ToString().c_str());
1565 break;
1566 }
1567 // Continue to next blob record or retry.
1568 continue;
1569 }
1570
1571 // Relocate the blob record to new file.
1572 if (!newfile) {
1573 // new file
1574 std::string reason("GC of ");
1575 reason += bfptr->PathName();
1576 newfile = NewBlobFile(reason);
1577
1578 new_writer = CheckOrCreateWriterLocked(newfile);
1579 // Can't use header beyond this point
1580 newfile->header_ = std::move(header);
1581 newfile->header_valid_ = true;
1582 newfile->file_size_ = BlobLogHeader::kSize;
1583 newfile->SetColumnFamilyId(bfptr->column_family_id());
1584 newfile->SetHasTTL(bfptr->HasTTL());
1585 newfile->SetCompression(bfptr->compression());
1586 newfile->expiration_range_ = bfptr->expiration_range_;
1587
1588 s = new_writer->WriteHeader(newfile->header_);
1589 if (!s.ok()) {
1590 ROCKS_LOG_ERROR(db_options_.info_log,
1591 "File: %s - header writing failed",
1592 newfile->PathName().c_str());
1593 break;
1594 }
1595
1596 // We don't add the file to open_ttl_files_ or open_non_ttl_files_, to
1597 // avoid user writes writing to the file, and avoid
1598 // EvictExpiredFiles close the file by mistake.
1599 WriteLock wl(&mutex_);
1600 blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
1601 }
1602
1603 std::string new_index_entry;
1604 uint64_t new_blob_offset = 0;
1605 uint64_t new_key_offset = 0;
1606 // write the blob to the blob log.
1607 s = new_writer->AddRecord(record.key, record.value, record.expiration,
1608 &new_key_offset, &new_blob_offset);
1609
1610 BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(),
1611 new_blob_offset, record.value.size(),
1612 bdb_options_.compression);
1613
1614 newfile->blob_count_++;
1615 newfile->file_size_ +=
1616 BlobLogRecord::kHeaderSize + record.key.size() + record.value.size();
1617
1618 TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
1619 WriteBatch rewrite_batch;
1620 Status rewrite_status = WriteBatchInternal::PutBlobIndex(
1621 &rewrite_batch, column_family_id, record.key, new_index_entry);
1622 if (rewrite_status.ok()) {
1623 rewrite_status = db_impl_->WriteWithCallback(WriteOptions(),
1624 &rewrite_batch, &callback);
1625 }
1626 if (rewrite_status.ok()) {
1627 gc_stats->num_keys_relocated++;
1628 gc_stats->bytes_relocated += record.record_size();
1629 } else if (rewrite_status.IsBusy()) {
1630 // The key is overwritten in the meanwhile. Drop the blob record.
1631 gc_stats->num_keys_overwritten++;
1632 gc_stats->bytes_overwritten += record.record_size();
1633 } else {
1634 // We hit an error.
1635 s = rewrite_status;
1636 ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
1637 s.ToString().c_str());
1638 break;
1639 }
1640 } // end of ReadRecord loop
1641
1642 {
1643 WriteLock wl(&mutex_);
1644 ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/);
1645 }
1646
1647 ROCKS_LOG_INFO(
1648 db_options_.info_log,
1649 "%s blob file %" PRIu64 ". Total blob records: %" PRIu64
1650 ", expired: %" PRIu64 " keys/%" PRIu64
1651 " bytes, updated or deleted by user: %" PRIu64 " keys/%" PRIu64
1652 " bytes, rewrite to new file: %" PRIu64 " keys/%" PRIu64 " bytes.",
1653 s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
1654 bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired,
1655 gc_stats->bytes_expired, gc_stats->num_keys_overwritten,
1656 gc_stats->bytes_overwritten, gc_stats->num_keys_relocated,
1657 gc_stats->bytes_relocated);
1658 RecordTick(statistics_, BLOB_DB_GC_NUM_FILES);
1659 RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
1660 gc_stats->num_keys_overwritten);
1661 RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED,
1662 gc_stats->num_keys_expired);
1663 RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN,
1664 gc_stats->bytes_overwritten);
1665 RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired);
1666 if (newfile != nullptr) {
1667 {
1668 MutexLock l(&write_mutex_);
1669 CloseBlobFile(newfile);
1670 }
1671 total_blob_size_ += newfile->file_size_;
1672 ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".",
1673 newfile->BlobFileNumber());
1674 RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES);
1675 RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
1676 gc_stats->num_keys_relocated);
1677 RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED,
1678 gc_stats->bytes_relocated);
1679 }
1680 if (!s.ok()) {
1681 RecordTick(statistics_, BLOB_DB_GC_FAILURES);
1682 }
1683 return s;
1684 }
1685
1686 std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
1687 if (aborted) {
1688 return std::make_pair(false, -1);
1689 }
1690
1691 MutexLock delete_file_lock(&delete_file_mutex_);
1692 if (disable_file_deletions_ > 0) {
1693 return std::make_pair(true, -1);
1694 }
1695
1696 std::list<std::shared_ptr<BlobFile>> tobsolete;
1697 {
1698 WriteLock wl(&mutex_);
1699 if (obsolete_files_.empty()) {
1700 return std::make_pair(true, -1);
1701 }
1702 tobsolete.swap(obsolete_files_);
1703 }
1704
1705 bool file_deleted = false;
1706 for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
1707 auto bfile = *iter;
1708 {
1709 ReadLock lockbfile_r(&bfile->mutex_);
1710 if (VisibleToActiveSnapshot(bfile)) {
1711 ROCKS_LOG_INFO(db_options_.info_log,
1712 "Could not delete file due to snapshot failure %s",
1713 bfile->PathName().c_str());
1714 ++iter;
1715 continue;
1716 }
1717 }
1718 ROCKS_LOG_INFO(db_options_.info_log,
1719 "Will delete file due to snapshot success %s",
1720 bfile->PathName().c_str());
1721
1722 blob_files_.erase(bfile->BlobFileNumber());
1723 Status s = env_->DeleteFile(bfile->PathName());
1724 if (!s.ok()) {
1725 ROCKS_LOG_ERROR(db_options_.info_log,
1726 "File failed to be deleted as obsolete %s",
1727 bfile->PathName().c_str());
1728 ++iter;
1729 continue;
1730 }
1731
1732 file_deleted = true;
1733 ROCKS_LOG_INFO(db_options_.info_log,
1734 "File deleted as obsolete from blob dir %s",
1735 bfile->PathName().c_str());
1736
1737 iter = tobsolete.erase(iter);
1738 }
1739
1740 // directory change. Fsync
1741 if (file_deleted) {
1742 Status s = dir_ent_->Fsync();
1743 if (!s.ok()) {
1744 ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
1745 blob_dir_.c_str(), s.ToString().c_str());
1746 }
1747 }
1748
1749 // put files back into obsolete if for some reason, delete failed
1750 if (!tobsolete.empty()) {
1751 WriteLock wl(&mutex_);
1752 for (auto bfile : tobsolete) {
1753 obsolete_files_.push_front(bfile);
1754 }
1755 }
1756
1757 return std::make_pair(!aborted, -1);
1758 }
1759
1760 void BlobDBImpl::CopyBlobFiles(
1761 std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
1762 ReadLock rl(&mutex_);
1763 for (auto const& p : blob_files_) {
1764 bfiles_copy->push_back(p.second);
1765 }
1766 }
1767
1768 std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
1769 if (aborted) {
1770 return std::make_pair(false, -1);
1771 }
1772
1773 // TODO(yiwu): Garbage collection implementation.
1774
1775 // reschedule
1776 return std::make_pair(true, -1);
1777 }
1778
1779 Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
1780 auto* cfd =
1781 reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
1782 // Get a snapshot to avoid blob file get deleted between we
1783 // fetch and index entry and reading from the file.
1784 ManagedSnapshot* own_snapshot = nullptr;
1785 const Snapshot* snapshot = read_options.snapshot;
1786 if (snapshot == nullptr) {
1787 own_snapshot = new ManagedSnapshot(db_);
1788 snapshot = own_snapshot->snapshot();
1789 }
1790 auto* iter = db_impl_->NewIteratorImpl(
1791 read_options, cfd, snapshot->GetSequenceNumber(),
1792 nullptr /*read_callback*/, true /*allow_blob*/);
1793 return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_);
1794 }
1795
1796 Status DestroyBlobDB(const std::string& dbname, const Options& options,
1797 const BlobDBOptions& bdb_options) {
1798 const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
1799 Env* env = soptions.env;
1800
1801 Status status;
1802 std::string blobdir;
1803 blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
1804 : bdb_options.blob_dir;
1805
1806 std::vector<std::string> filenames;
1807 env->GetChildren(blobdir, &filenames);
1808
1809 for (const auto& f : filenames) {
1810 uint64_t number;
1811 FileType type;
1812 if (ParseFileName(f, &number, &type) && type == kBlobFile) {
1813 Status del = env->DeleteFile(blobdir + "/" + f);
1814 if (status.ok() && !del.ok()) {
1815 status = del;
1816 }
1817 }
1818 }
1819 env->DeleteDir(blobdir);
1820
1821 Status destroy = DestroyDB(dbname, options);
1822 if (status.ok() && !destroy.ok()) {
1823 status = destroy;
1824 }
1825
1826 return status;
1827 }
1828
1829 #ifndef NDEBUG
1830 Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
1831 PinnableSlice* value) {
1832 return GetBlobValue(key, index_entry, value);
1833 }
1834
1835 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
1836 ReadLock l(&mutex_);
1837 std::vector<std::shared_ptr<BlobFile>> blob_files;
1838 for (auto& p : blob_files_) {
1839 blob_files.emplace_back(p.second);
1840 }
1841 return blob_files;
1842 }
1843
1844 std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
1845 const {
1846 ReadLock l(&mutex_);
1847 std::vector<std::shared_ptr<BlobFile>> obsolete_files;
1848 for (auto& bfile : obsolete_files_) {
1849 obsolete_files.emplace_back(bfile);
1850 }
1851 return obsolete_files;
1852 }
1853
// Test-only: run one round of obsolete-file deletion immediately.
void BlobDBImpl::TEST_DeleteObsoleteFiles() {
  DeleteObsoleteFiles(false /*abort*/);
}
1857
// Test-only: close a blob file while holding write_mutex_, matching the
// locking contract CloseBlobFile asserts on.
Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
  MutexLock l(&write_mutex_);
  return CloseBlobFile(bfile);
}
1862
1863 void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
1864 SequenceNumber obsolete_seq,
1865 bool update_size) {
1866 return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
1867 }
1868
// Test-only passthrough to GCFileAndUpdateLSM.
Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
                                           GCStats* gc_stats) {
  return GCFileAndUpdateLSM(bfile, gc_stats);
}
1873
1874 void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
1875
// Test-only: run one round of expired-file eviction immediately.
void BlobDBImpl::TEST_EvictExpiredFiles() {
  EvictExpiredFiles(false /*abort*/);
}
1879
1880 uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
1881 #endif // !NDEBUG
1882
1883 } // namespace blob_db
1884 } // namespace rocksdb
1885 #endif // ROCKSDB_LITE