1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl.h"
11 #ifndef __STDC_FORMAT_MACROS
12 #define __STDC_FORMAT_MACROS
16 #include "db/builder.h"
17 #include "options/options_helper.h"
18 #include "rocksdb/wal_filter.h"
19 #include "util/sst_file_manager_impl.h"
20 #include "util/sync_point.h"
23 Options
SanitizeOptions(const std::string
& dbname
,
25 auto db_options
= SanitizeOptions(dbname
, DBOptions(src
));
26 ImmutableDBOptions
immutable_db_options(db_options
);
28 SanitizeOptions(immutable_db_options
, ColumnFamilyOptions(src
));
29 return Options(db_options
, cf_options
);
32 DBOptions
SanitizeOptions(const std::string
& dbname
, const DBOptions
& src
) {
33 DBOptions
result(src
);
35 // result.max_open_files means an "infinite" open files.
36 if (result
.max_open_files
!= -1) {
37 int max_max_open_files
= port::GetMaxOpenFiles();
38 if (max_max_open_files
== -1) {
39 max_max_open_files
= 1000000;
41 ClipToRange(&result
.max_open_files
, 20, max_max_open_files
);
44 if (result
.info_log
== nullptr) {
45 Status s
= CreateLoggerFromOptions(dbname
, result
, &result
.info_log
);
47 // No place suitable for logging
48 result
.info_log
= nullptr;
52 if (!result
.write_buffer_manager
) {
53 result
.write_buffer_manager
.reset(
54 new WriteBufferManager(result
.db_write_buffer_size
));
56 if (result
.base_background_compactions
== -1) {
57 result
.base_background_compactions
= result
.max_background_compactions
;
59 if (result
.base_background_compactions
> result
.max_background_compactions
) {
60 result
.base_background_compactions
= result
.max_background_compactions
;
62 result
.env
->IncBackgroundThreadsIfNeeded(src
.max_background_compactions
,
64 result
.env
->IncBackgroundThreadsIfNeeded(src
.max_background_flushes
,
67 if (result
.rate_limiter
.get() != nullptr) {
68 if (result
.bytes_per_sync
== 0) {
69 result
.bytes_per_sync
= 1024 * 1024;
73 if (result
.WAL_ttl_seconds
> 0 || result
.WAL_size_limit_MB
> 0) {
74 result
.recycle_log_file_num
= false;
77 if (result
.recycle_log_file_num
&&
78 (result
.wal_recovery_mode
== WALRecoveryMode::kPointInTimeRecovery
||
79 result
.wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
)) {
80 // kPointInTimeRecovery is indistinguishable from
81 // kTolerateCorruptedTailRecords in recycle mode since we define
82 // the "end" of the log as the first corrupt record we encounter.
83 // kAbsoluteConsistency doesn't make sense because even a clean
84 // shutdown leaves old junk at the end of the log file.
85 result
.wal_recovery_mode
= WALRecoveryMode::kTolerateCorruptedTailRecords
;
88 if (result
.wal_dir
.empty()) {
89 // Use dbname as default
90 result
.wal_dir
= dbname
;
92 if (result
.wal_dir
.back() == '/') {
93 result
.wal_dir
= result
.wal_dir
.substr(0, result
.wal_dir
.size() - 1);
96 if (result
.db_paths
.size() == 0) {
97 result
.db_paths
.emplace_back(dbname
, std::numeric_limits
<uint64_t>::max());
100 if (result
.use_direct_reads
&& result
.compaction_readahead_size
== 0) {
101 result
.compaction_readahead_size
= 1024 * 1024 * 2;
104 if (result
.compaction_readahead_size
> 0 ||
105 result
.use_direct_io_for_flush_and_compaction
) {
106 result
.new_table_reader_for_compaction_inputs
= true;
109 // Force flush on DB open if 2PC is enabled, since with 2PC we have no
110 // guarantee that consecutive log files have consecutive sequence id, which
111 // make recovery complicated.
112 if (result
.allow_2pc
) {
113 result
.avoid_flush_during_recovery
= false;
121 Status
SanitizeOptionsByTable(
122 const DBOptions
& db_opts
,
123 const std::vector
<ColumnFamilyDescriptor
>& column_families
) {
125 for (auto cf
: column_families
) {
126 s
= cf
.options
.table_factory
->SanitizeOptions(db_opts
, cf
.options
);
134 static Status
ValidateOptions(
135 const DBOptions
& db_options
,
136 const std::vector
<ColumnFamilyDescriptor
>& column_families
) {
139 for (auto& cfd
: column_families
) {
140 s
= CheckCompressionSupported(cfd
.options
);
141 if (s
.ok() && db_options
.allow_concurrent_memtable_write
) {
142 s
= CheckConcurrentWritesSupported(cfd
.options
);
147 if (db_options
.db_paths
.size() > 1) {
148 if ((cfd
.options
.compaction_style
!= kCompactionStyleUniversal
) &&
149 (cfd
.options
.compaction_style
!= kCompactionStyleLevel
)) {
150 return Status::NotSupported(
151 "More than one DB paths are only supported in "
152 "universal and level compaction styles. ");
157 if (db_options
.db_paths
.size() > 4) {
158 return Status::NotSupported(
159 "More than four DB paths are not supported yet. ");
162 if (db_options
.allow_mmap_reads
&& db_options
.use_direct_reads
) {
163 // Protect against assert in PosixMMapReadableFile constructor
164 return Status::NotSupported(
165 "If memory mapped reads (allow_mmap_reads) are enabled "
166 "then direct I/O reads (use_direct_reads) must be disabled. ");
169 if (db_options
.allow_mmap_writes
&&
170 db_options
.use_direct_io_for_flush_and_compaction
) {
171 return Status::NotSupported(
172 "If memory mapped writes (allow_mmap_writes) are enabled "
173 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
177 if (db_options
.keep_log_file_num
== 0) {
178 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
184 Status
DBImpl::NewDB() {
186 new_db
.SetLogNumber(0);
187 new_db
.SetNextFile(2);
188 new_db
.SetLastSequence(0);
192 ROCKS_LOG_INFO(immutable_db_options_
.info_log
, "Creating manifest 1 \n");
193 const std::string manifest
= DescriptorFileName(dbname_
, 1);
195 unique_ptr
<WritableFile
> file
;
196 EnvOptions env_options
= env_
->OptimizeForManifestWrite(env_options_
);
197 s
= NewWritableFile(env_
, manifest
, &file
, env_options
);
201 file
->SetPreallocationBlockSize(
202 immutable_db_options_
.manifest_preallocation_size
);
203 unique_ptr
<WritableFileWriter
> file_writer(
204 new WritableFileWriter(std::move(file
), env_options
));
205 log::Writer
log(std::move(file_writer
), 0, false);
207 new_db
.EncodeTo(&record
);
208 s
= log
.AddRecord(record
);
210 s
= SyncManifest(env_
, &immutable_db_options_
, log
.file());
214 // Make "CURRENT" file that points to the new manifest file.
215 s
= SetCurrentFile(env_
, dbname_
, 1, directories_
.GetDbDir());
217 env_
->DeleteFile(manifest
);
222 Status
DBImpl::Directories::CreateAndNewDirectory(
223 Env
* env
, const std::string
& dirname
,
224 std::unique_ptr
<Directory
>* directory
) const {
225 // We call CreateDirIfMissing() as the directory may already exist (if we
226 // are reopening a DB), when this happens we don't want creating the
227 // directory to cause an error. However, we need to check if creating the
228 // directory fails or else we may get an obscure message about the lock
229 // file not existing. One real-world example of this occurring is if
230 // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
231 // when dbname_ is "dir/db" but when "dir" doesn't exist.
232 Status s
= env
->CreateDirIfMissing(dirname
);
236 return env
->NewDirectory(dirname
, directory
);
239 Status
DBImpl::Directories::SetDirectories(
240 Env
* env
, const std::string
& dbname
, const std::string
& wal_dir
,
241 const std::vector
<DbPath
>& data_paths
) {
242 Status s
= CreateAndNewDirectory(env
, dbname
, &db_dir_
);
246 if (!wal_dir
.empty() && dbname
!= wal_dir
) {
247 s
= CreateAndNewDirectory(env
, wal_dir
, &wal_dir_
);
254 for (auto& p
: data_paths
) {
255 const std::string db_path
= p
.path
;
256 if (db_path
== dbname
) {
257 data_dirs_
.emplace_back(nullptr);
259 std::unique_ptr
<Directory
> path_directory
;
260 s
= CreateAndNewDirectory(env
, db_path
, &path_directory
);
264 data_dirs_
.emplace_back(path_directory
.release());
267 assert(data_dirs_
.size() == data_paths
.size());
271 Status
DBImpl::Recover(
272 const std::vector
<ColumnFamilyDescriptor
>& column_families
, bool read_only
,
273 bool error_if_log_file_exist
, bool error_if_data_exists_in_logs
) {
276 bool is_new_db
= false;
277 assert(db_lock_
== nullptr);
279 Status s
= directories_
.SetDirectories(env_
, dbname_
,
280 immutable_db_options_
.wal_dir
,
281 immutable_db_options_
.db_paths
);
286 s
= env_
->LockFile(LockFileName(dbname_
), &db_lock_
);
291 s
= env_
->FileExists(CurrentFileName(dbname_
));
292 if (s
.IsNotFound()) {
293 if (immutable_db_options_
.create_if_missing
) {
300 return Status::InvalidArgument(
301 dbname_
, "does not exist (create_if_missing is false)");
304 if (immutable_db_options_
.error_if_exists
) {
305 return Status::InvalidArgument(
306 dbname_
, "exists (error_if_exists is true)");
309 // Unexpected error reading file
310 assert(s
.IsIOError());
313 // Check for the IDENTITY file and create it if not there
314 s
= env_
->FileExists(IdentityFileName(dbname_
));
315 if (s
.IsNotFound()) {
316 s
= SetIdentityFile(env_
, dbname_
);
320 } else if (!s
.ok()) {
321 assert(s
.IsIOError());
326 Status s
= versions_
->Recover(column_families
, read_only
);
327 if (immutable_db_options_
.paranoid_checks
&& s
.ok()) {
328 s
= CheckConsistency();
331 SequenceNumber
next_sequence(kMaxSequenceNumber
);
332 default_cf_handle_
= new ColumnFamilyHandleImpl(
333 versions_
->GetColumnFamilySet()->GetDefault(), this, &mutex_
);
334 default_cf_internal_stats_
= default_cf_handle_
->cfd()->internal_stats();
335 single_column_family_mode_
=
336 versions_
->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
338 // Recover from all newer log files than the ones named in the
339 // descriptor (new log files may have been added by the previous
340 // incarnation without registering them in the descriptor).
342 // Note that prev_log_number() is no longer used, but we pay
343 // attention to it in case we are recovering a database
344 // produced by an older version of rocksdb.
345 std::vector
<std::string
> filenames
;
346 s
= env_
->GetChildren(immutable_db_options_
.wal_dir
, &filenames
);
351 std::vector
<uint64_t> logs
;
352 for (size_t i
= 0; i
< filenames
.size(); i
++) {
355 if (ParseFileName(filenames
[i
], &number
, &type
) && type
== kLogFile
) {
357 return Status::Corruption(
358 "While creating a new Db, wal_dir contains "
359 "existing log file: ",
362 logs
.push_back(number
);
367 if (logs
.size() > 0) {
368 if (error_if_log_file_exist
) {
369 return Status::Corruption(
370 "The db was opened in readonly mode with error_if_log_file_exist"
371 "flag but a log file already exists");
372 } else if (error_if_data_exists_in_logs
) {
373 for (auto& log
: logs
) {
374 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log
);
376 s
= env_
->GetFileSize(fname
, &bytes
);
379 return Status::Corruption(
380 "error_if_data_exists_in_logs is set but there are data "
389 // Recover in the order in which the logs were generated
390 std::sort(logs
.begin(), logs
.end());
391 s
= RecoverLogFiles(logs
, &next_sequence
, read_only
);
393 // Clear memtables if recovery failed
394 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
395 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
403 max_total_in_memory_state_
= 0;
404 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
405 auto* mutable_cf_options
= cfd
->GetLatestMutableCFOptions();
406 max_total_in_memory_state_
+= mutable_cf_options
->write_buffer_size
*
407 mutable_cf_options
->max_write_buffer_number
;
413 // REQUIRES: log_numbers are sorted in ascending order
414 Status
DBImpl::RecoverLogFiles(const std::vector
<uint64_t>& log_numbers
,
415 SequenceNumber
* next_sequence
, bool read_only
) {
416 struct LogReporter
: public log::Reader::Reporter
{
420 Status
* status
; // nullptr if immutable_db_options_.paranoid_checks==false
421 virtual void Corruption(size_t bytes
, const Status
& s
) override
{
422 ROCKS_LOG_WARN(info_log
, "%s%s: dropping %d bytes; %s",
423 (this->status
== nullptr ? "(ignoring error) " : ""),
424 fname
, static_cast<int>(bytes
), s
.ToString().c_str());
425 if (this->status
!= nullptr && this->status
->ok()) {
433 std::unordered_map
<int, VersionEdit
> version_edits
;
434 // no need to refcount because iteration is under mutex
435 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
437 edit
.SetColumnFamily(cfd
->GetID());
438 version_edits
.insert({cfd
->GetID(), edit
});
440 int job_id
= next_job_id_
.fetch_add(1);
442 auto stream
= event_logger_
.Log();
443 stream
<< "job" << job_id
<< "event"
444 << "recovery_started";
445 stream
<< "log_files";
447 for (auto log_number
: log_numbers
) {
448 stream
<< log_number
;
454 if (immutable_db_options_
.wal_filter
!= nullptr) {
455 std::map
<std::string
, uint32_t> cf_name_id_map
;
456 std::map
<uint32_t, uint64_t> cf_lognumber_map
;
457 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
458 cf_name_id_map
.insert(
459 std::make_pair(cfd
->GetName(), cfd
->GetID()));
460 cf_lognumber_map
.insert(
461 std::make_pair(cfd
->GetID(), cfd
->GetLogNumber()));
464 immutable_db_options_
.wal_filter
->ColumnFamilyLogNumberMap(cf_lognumber_map
,
469 bool stop_replay_by_wal_filter
= false;
470 bool stop_replay_for_corruption
= false;
471 bool flushed
= false;
472 for (auto log_number
: log_numbers
) {
473 // The previous incarnation may not have written any MANIFEST
474 // records after allocating this log number. So we manually
475 // update the file number allocation counter in VersionSet.
476 versions_
->MarkFileNumberUsedDuringRecovery(log_number
);
478 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log_number
);
480 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
481 "Recovering log #%" PRIu64
" mode %d", log_number
,
482 immutable_db_options_
.wal_recovery_mode
);
483 auto logFileDropped
= [this, &fname
]() {
485 if (env_
->GetFileSize(fname
, &bytes
).ok()) {
486 auto info_log
= immutable_db_options_
.info_log
.get();
487 ROCKS_LOG_WARN(info_log
, "%s: dropping %d bytes", fname
.c_str(),
488 static_cast<int>(bytes
));
491 if (stop_replay_by_wal_filter
) {
496 unique_ptr
<SequentialFileReader
> file_reader
;
498 unique_ptr
<SequentialFile
> file
;
499 status
= env_
->NewSequentialFile(fname
, &file
, env_options_
);
501 MaybeIgnoreError(&status
);
505 // Fail with one log file, but that's ok.
510 file_reader
.reset(new SequentialFileReader(std::move(file
)));
513 // Create the log reader.
514 LogReporter reporter
;
516 reporter
.info_log
= immutable_db_options_
.info_log
.get();
517 reporter
.fname
= fname
.c_str();
518 if (!immutable_db_options_
.paranoid_checks
||
519 immutable_db_options_
.wal_recovery_mode
==
520 WALRecoveryMode::kSkipAnyCorruptedRecords
) {
521 reporter
.status
= nullptr;
523 reporter
.status
= &status
;
525 // We intentially make log::Reader do checksumming even if
526 // paranoid_checks==false so that corruptions cause entire commits
527 // to be skipped instead of propagating bad information (like overly
528 // large sequence numbers).
529 log::Reader
reader(immutable_db_options_
.info_log
, std::move(file_reader
),
530 &reporter
, true /*checksum*/, 0 /*initial_offset*/,
533 // Determine if we should tolerate incomplete records at the tail end of the
534 // Read all the records and add to a memtable
539 while (!stop_replay_by_wal_filter
&&
540 reader
.ReadRecord(&record
, &scratch
,
541 immutable_db_options_
.wal_recovery_mode
) &&
543 if (record
.size() < WriteBatchInternal::kHeader
) {
544 reporter
.Corruption(record
.size(),
545 Status::Corruption("log record too small"));
548 WriteBatchInternal::SetContents(&batch
, record
);
549 SequenceNumber sequence
= WriteBatchInternal::Sequence(&batch
);
551 if (immutable_db_options_
.wal_recovery_mode
==
552 WALRecoveryMode::kPointInTimeRecovery
) {
553 // In point-in-time recovery mode, if sequence id of log files are
554 // consecutive, we continue recovery despite corruption. This could
555 // happen when we open and write to a corrupted DB, where sequence id
556 // will start from the last sequence id we recovered.
557 if (sequence
== *next_sequence
) {
558 stop_replay_for_corruption
= false;
560 if (stop_replay_for_corruption
) {
567 if (immutable_db_options_
.wal_filter
!= nullptr) {
568 WriteBatch new_batch
;
569 bool batch_changed
= false;
571 WalFilter::WalProcessingOption wal_processing_option
=
572 immutable_db_options_
.wal_filter
->LogRecordFound(
573 log_number
, fname
, batch
, &new_batch
, &batch_changed
);
575 switch (wal_processing_option
) {
576 case WalFilter::WalProcessingOption::kContinueProcessing
:
577 // do nothing, proceeed normally
579 case WalFilter::WalProcessingOption::kIgnoreCurrentRecord
:
580 // skip current record
582 case WalFilter::WalProcessingOption::kStopReplay
:
583 // skip current record and stop replay
584 stop_replay_by_wal_filter
= true;
586 case WalFilter::WalProcessingOption::kCorruptedRecord
: {
588 Status::Corruption("Corruption reported by Wal Filter ",
589 immutable_db_options_
.wal_filter
->Name());
590 MaybeIgnoreError(&status
);
592 reporter
.Corruption(record
.size(), status
);
598 assert(false); // unhandled case
599 status
= Status::NotSupported(
600 "Unknown WalProcessingOption returned"
602 immutable_db_options_
.wal_filter
->Name());
603 MaybeIgnoreError(&status
);
607 // Ignore the error with current record processing.
614 // Make sure that the count in the new batch is
615 // within the orignal count.
616 int new_count
= WriteBatchInternal::Count(&new_batch
);
617 int original_count
= WriteBatchInternal::Count(&batch
);
618 if (new_count
> original_count
) {
620 immutable_db_options_
.info_log
,
621 "Recovering log #%" PRIu64
622 " mode %d log filter %s returned "
623 "more records (%d) than original (%d) which is not allowed. "
624 "Aborting recovery.",
625 log_number
, immutable_db_options_
.wal_recovery_mode
,
626 immutable_db_options_
.wal_filter
->Name(), new_count
,
628 status
= Status::NotSupported(
629 "More than original # of records "
630 "returned by Wal Filter ",
631 immutable_db_options_
.wal_filter
->Name());
634 // Set the same sequence number in the new_batch
635 // as the original batch.
636 WriteBatchInternal::SetSequence(&new_batch
,
637 WriteBatchInternal::Sequence(&batch
));
641 #endif // ROCKSDB_LITE
643 // If column family was not found, it might mean that the WAL write
644 // batch references to the column family that was dropped after the
645 // insert. We don't want to fail the whole write batch in that case --
646 // we just ignore the update.
647 // That's why we set ignore missing column families to true
648 bool has_valid_writes
= false;
649 status
= WriteBatchInternal::InsertInto(
650 &batch
, column_family_memtables_
.get(), &flush_scheduler_
, true,
651 log_number
, this, false /* concurrent_memtable_writes */,
652 next_sequence
, &has_valid_writes
);
653 MaybeIgnoreError(&status
);
655 // We are treating this as a failure while reading since we read valid
656 // blocks that do not form coherent data
657 reporter
.Corruption(record
.size(), status
);
661 if (has_valid_writes
&& !read_only
) {
662 // we can do this because this is called before client has access to the
663 // DB and there is only a single thread operating on DB
664 ColumnFamilyData
* cfd
;
666 while ((cfd
= flush_scheduler_
.TakeNextColumnFamily()) != nullptr) {
668 // If this asserts, it means that InsertInto failed in
669 // filtering updates to already-flushed column families
670 assert(cfd
->GetLogNumber() <= log_number
);
671 auto iter
= version_edits
.find(cfd
->GetID());
672 assert(iter
!= version_edits
.end());
673 VersionEdit
* edit
= &iter
->second
;
674 status
= WriteLevel0TableForRecovery(job_id
, cfd
, cfd
->mem(), edit
);
676 // Reflect errors immediately so that conditions like full
677 // file-systems cause the DB::Open() to fail.
682 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
689 if (immutable_db_options_
.wal_recovery_mode
==
690 WALRecoveryMode::kSkipAnyCorruptedRecords
) {
691 // We should ignore all errors unconditionally
692 status
= Status::OK();
693 } else if (immutable_db_options_
.wal_recovery_mode
==
694 WALRecoveryMode::kPointInTimeRecovery
) {
695 // We should ignore the error but not continue replaying
696 status
= Status::OK();
697 stop_replay_for_corruption
= true;
698 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
699 "Point in time recovered to log #%" PRIu64
701 log_number
, *next_sequence
);
703 assert(immutable_db_options_
.wal_recovery_mode
==
704 WALRecoveryMode::kTolerateCorruptedTailRecords
||
705 immutable_db_options_
.wal_recovery_mode
==
706 WALRecoveryMode::kAbsoluteConsistency
);
711 flush_scheduler_
.Clear();
712 auto last_sequence
= *next_sequence
- 1;
713 if ((*next_sequence
!= kMaxSequenceNumber
) &&
714 (versions_
->LastSequence() <= last_sequence
)) {
715 versions_
->SetLastSequence(last_sequence
);
719 // True if there's any data in the WALs; if not, we can skip re-processing
721 bool data_seen
= false;
723 // no need to refcount since client still doesn't have access
724 // to the DB and can not drop column families while we iterate
725 auto max_log_number
= log_numbers
.back();
726 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
727 auto iter
= version_edits
.find(cfd
->GetID());
728 assert(iter
!= version_edits
.end());
729 VersionEdit
* edit
= &iter
->second
;
731 if (cfd
->GetLogNumber() > max_log_number
) {
732 // Column family cfd has already flushed the data
733 // from all logs. Memtable has to be empty because
734 // we filter the updates based on log_number
735 // (in WriteBatch::InsertInto)
736 assert(cfd
->mem()->GetFirstSequenceNumber() == 0);
737 assert(edit
->NumEntries() == 0);
741 // flush the final memtable (if non-empty)
742 if (cfd
->mem()->GetFirstSequenceNumber() != 0) {
743 // If flush happened in the middle of recovery (e.g. due to memtable
744 // being full), we flush at the end. Otherwise we'll need to record
745 // where we were on last flush, which make the logic complicated.
746 if (flushed
|| !immutable_db_options_
.avoid_flush_during_recovery
) {
747 status
= WriteLevel0TableForRecovery(job_id
, cfd
, cfd
->mem(), edit
);
754 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
755 versions_
->LastSequence());
760 // write MANIFEST with update
761 // writing log_number in the manifest means that any log file
762 // with number strongly less than (log_number + 1) is already
763 // recovered and should be ignored on next reincarnation.
764 // Since we already recovered max_log_number, we want all logs
765 // with numbers `<= max_log_number` (includes this one) to be ignored
766 if (flushed
|| cfd
->mem()->GetFirstSequenceNumber() == 0) {
767 edit
->SetLogNumber(max_log_number
+ 1);
769 // we must mark the next log number as used, even though it's
770 // not actually used. that is because VersionSet assumes
771 // VersionSet::next_file_number_ always to be strictly greater than any
773 versions_
->MarkFileNumberUsedDuringRecovery(max_log_number
+ 1);
774 status
= versions_
->LogAndApply(
775 cfd
, *cfd
->GetLatestMutableCFOptions(), edit
, &mutex_
);
783 if (data_seen
&& !flushed
) {
784 // Mark these as alive so they'll be considered for deletion later by
785 // FindObsoleteFiles()
786 for (auto log_number
: log_numbers
) {
787 alive_log_files_
.push_back(LogFileNumberSize(log_number
));
791 event_logger_
.Log() << "job" << job_id
<< "event"
792 << "recovery_finished";
797 Status
DBImpl::WriteLevel0TableForRecovery(int job_id
, ColumnFamilyData
* cfd
,
798 MemTable
* mem
, VersionEdit
* edit
) {
800 const uint64_t start_micros
= env_
->NowMicros();
802 auto pending_outputs_inserted_elem
=
803 CaptureCurrentFileNumberInPendingOutputs();
804 meta
.fd
= FileDescriptor(versions_
->NewFileNumber(), 0, 0);
806 ro
.total_order_seek
= true;
809 TableProperties table_properties
;
811 ScopedArenaIterator
iter(mem
->NewIterator(ro
, &arena
));
812 ROCKS_LOG_DEBUG(immutable_db_options_
.info_log
,
813 "[%s] [WriteLevel0TableForRecovery]"
814 " Level-0 table #%" PRIu64
": started",
815 cfd
->GetName().c_str(), meta
.fd
.GetNumber());
817 // Get the latest mutable cf options while the mutex is still locked
818 const MutableCFOptions mutable_cf_options
=
819 *cfd
->GetLatestMutableCFOptions();
820 bool paranoid_file_checks
=
821 cfd
->GetLatestMutableCFOptions()->paranoid_file_checks
;
825 SequenceNumber earliest_write_conflict_snapshot
;
826 std::vector
<SequenceNumber
> snapshot_seqs
=
827 snapshots_
.GetAll(&earliest_write_conflict_snapshot
);
829 EnvOptions optimized_env_options
=
830 env_
->OptimizeForCompactionTableWrite(env_options_
, immutable_db_options_
);
832 dbname_
, env_
, *cfd
->ioptions(), mutable_cf_options
,
833 optimized_env_options
, cfd
->table_cache(), iter
.get(),
834 std::unique_ptr
<InternalIterator
>(mem
->NewRangeTombstoneIterator(ro
)),
835 &meta
, cfd
->internal_comparator(),
836 cfd
->int_tbl_prop_collector_factories(), cfd
->GetID(), cfd
->GetName(),
837 snapshot_seqs
, earliest_write_conflict_snapshot
,
838 GetCompressionFlush(*cfd
->ioptions(), mutable_cf_options
),
839 cfd
->ioptions()->compression_opts
, paranoid_file_checks
,
840 cfd
->internal_stats(), TableFileCreationReason::kRecovery
,
841 &event_logger_
, job_id
);
842 LogFlush(immutable_db_options_
.info_log
);
843 ROCKS_LOG_DEBUG(immutable_db_options_
.info_log
,
844 "[%s] [WriteLevel0TableForRecovery]"
845 " Level-0 table #%" PRIu64
": %" PRIu64
" bytes %s",
846 cfd
->GetName().c_str(), meta
.fd
.GetNumber(),
847 meta
.fd
.GetFileSize(), s
.ToString().c_str());
851 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem
);
853 // Note that if file_size is zero, the file has been deleted and
854 // should not be added to the manifest.
856 if (s
.ok() && meta
.fd
.GetFileSize() > 0) {
857 edit
->AddFile(level
, meta
.fd
.GetNumber(), meta
.fd
.GetPathId(),
858 meta
.fd
.GetFileSize(), meta
.smallest
, meta
.largest
,
859 meta
.smallest_seqno
, meta
.largest_seqno
,
860 meta
.marked_for_compaction
);
863 InternalStats::CompactionStats
stats(1);
864 stats
.micros
= env_
->NowMicros() - start_micros
;
865 stats
.bytes_written
= meta
.fd
.GetFileSize();
866 stats
.num_output_files
= 1;
867 cfd
->internal_stats()->AddCompactionStats(level
, stats
);
868 cfd
->internal_stats()->AddCFStats(
869 InternalStats::BYTES_FLUSHED
, meta
.fd
.GetFileSize());
870 RecordTick(stats_
, COMPACT_WRITE_BYTES
, meta
.fd
.GetFileSize());
874 Status
DB::Open(const Options
& options
, const std::string
& dbname
, DB
** dbptr
) {
875 DBOptions
db_options(options
);
876 ColumnFamilyOptions
cf_options(options
);
877 std::vector
<ColumnFamilyDescriptor
> column_families
;
878 column_families
.push_back(
879 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, cf_options
));
880 std::vector
<ColumnFamilyHandle
*> handles
;
881 Status s
= DB::Open(db_options
, dbname
, column_families
, &handles
, dbptr
);
883 assert(handles
.size() == 1);
884 // i can delete the handle since DBImpl is always holding a reference to
885 // default column family
891 Status
DB::Open(const DBOptions
& db_options
, const std::string
& dbname
,
892 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
893 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
) {
894 Status s
= SanitizeOptionsByTable(db_options
, column_families
);
899 s
= ValidateOptions(db_options
, column_families
);
907 size_t max_write_buffer_size
= 0;
908 for (auto cf
: column_families
) {
909 max_write_buffer_size
=
910 std::max(max_write_buffer_size
, cf
.options
.write_buffer_size
);
913 DBImpl
* impl
= new DBImpl(db_options
, dbname
);
914 s
= impl
->env_
->CreateDirIfMissing(impl
->immutable_db_options_
.wal_dir
);
916 for (auto db_path
: impl
->immutable_db_options_
.db_paths
) {
917 s
= impl
->env_
->CreateDirIfMissing(db_path
.path
);
929 s
= impl
->CreateArchivalDirectory();
935 // Handles create_if_missing, error_if_exists
936 s
= impl
->Recover(column_families
);
938 uint64_t new_log_number
= impl
->versions_
->NewFileNumber();
939 unique_ptr
<WritableFile
> lfile
;
940 EnvOptions
soptions(db_options
);
941 EnvOptions opt_env_options
=
942 impl
->immutable_db_options_
.env
->OptimizeForLogWrite(
943 soptions
, BuildDBOptions(impl
->immutable_db_options_
,
944 impl
->mutable_db_options_
));
946 impl
->immutable_db_options_
.env
,
947 LogFileName(impl
->immutable_db_options_
.wal_dir
, new_log_number
),
948 &lfile
, opt_env_options
);
950 lfile
->SetPreallocationBlockSize(
951 impl
->GetWalPreallocateBlockSize(max_write_buffer_size
));
952 impl
->logfile_number_
= new_log_number
;
953 unique_ptr
<WritableFileWriter
> file_writer(
954 new WritableFileWriter(std::move(lfile
), opt_env_options
));
955 impl
->logs_
.emplace_back(
958 std::move(file_writer
), new_log_number
,
959 impl
->immutable_db_options_
.recycle_log_file_num
> 0));
961 // set column family handles
962 for (auto cf
: column_families
) {
964 impl
->versions_
->GetColumnFamilySet()->GetColumnFamily(cf
.name
);
965 if (cfd
!= nullptr) {
967 new ColumnFamilyHandleImpl(cfd
, impl
, &impl
->mutex_
));
968 impl
->NewThreadStatusCfInfo(cfd
);
970 if (db_options
.create_missing_column_families
) {
971 // missing column family, create it
972 ColumnFamilyHandle
* handle
;
973 impl
->mutex_
.Unlock();
974 s
= impl
->CreateColumnFamily(cf
.options
, cf
.name
, &handle
);
977 handles
->push_back(handle
);
982 s
= Status::InvalidArgument("Column family not found: ", cf
.name
);
989 for (auto cfd
: *impl
->versions_
->GetColumnFamilySet()) {
990 delete impl
->InstallSuperVersionAndScheduleWork(
991 cfd
, nullptr, *cfd
->GetLatestMutableCFOptions());
993 impl
->alive_log_files_
.push_back(
994 DBImpl::LogFileNumberSize(impl
->logfile_number_
));
995 impl
->DeleteObsoleteFiles();
996 s
= impl
->directories_
.GetDbDir()->Fsync();
1001 for (auto cfd
: *impl
->versions_
->GetColumnFamilySet()) {
1002 if (cfd
->ioptions()->compaction_style
== kCompactionStyleFIFO
) {
1003 auto* vstorage
= cfd
->current()->storage_info();
1004 for (int i
= 1; i
< vstorage
->num_levels(); ++i
) {
1005 int num_files
= vstorage
->NumLevelFiles(i
);
1006 if (num_files
> 0) {
1007 s
= Status::InvalidArgument(
1008 "Not all files are at level 0. Cannot "
1009 "open with FIFO compaction style.");
1014 if (!cfd
->mem()->IsSnapshotSupported()) {
1015 impl
->is_snapshot_supported_
= false;
1017 if (cfd
->ioptions()->merge_operator
!= nullptr &&
1018 !cfd
->mem()->IsMergeOperatorSupported()) {
1019 s
= Status::InvalidArgument(
1020 "The memtable of column family %s does not support merge operator "
1021 "its options.merge_operator is non-null", cfd
->GetName().c_str());
1028 TEST_SYNC_POINT("DBImpl::Open:Opened");
1029 Status persist_options_status
;
1031 // Persist RocksDB Options before scheduling the compaction.
1032 // The WriteOptionsFile() will release and lock the mutex internally.
1033 persist_options_status
= impl
->WriteOptionsFile();
1036 impl
->opened_successfully_
= true;
1037 impl
->MaybeScheduleFlushOrCompaction();
1039 impl
->mutex_
.Unlock();
1041 #ifndef ROCKSDB_LITE
1042 auto sfm
= static_cast<SstFileManagerImpl
*>(
1043 impl
->immutable_db_options_
.sst_file_manager
.get());
1044 if (s
.ok() && sfm
) {
1045 // Notify SstFileManager about all sst files that already exist in
1046 // db_paths[0] when the DB is opened.
1047 auto& db_path
= impl
->immutable_db_options_
.db_paths
[0];
1048 std::vector
<std::string
> existing_files
;
1049 impl
->immutable_db_options_
.env
->GetChildren(db_path
.path
, &existing_files
);
1050 for (auto& file_name
: existing_files
) {
1051 uint64_t file_number
;
1053 std::string file_path
= db_path
.path
+ "/" + file_name
;
1054 if (ParseFileName(file_name
, &file_number
, &file_type
) &&
1055 file_type
== kTableFile
) {
1056 sfm
->OnAddFile(file_path
);
1060 #endif // !ROCKSDB_LITE
1063 ROCKS_LOG_INFO(impl
->immutable_db_options_
.info_log
, "DB pointer %p", impl
);
1064 LogFlush(impl
->immutable_db_options_
.info_log
);
1065 if (!persist_options_status
.ok()) {
1066 if (db_options
.fail_if_options_file_error
) {
1067 s
= Status::IOError(
1068 "DB::Open() failed --- Unable to persist Options file",
1069 persist_options_status
.ToString());
1071 ROCKS_LOG_WARN(impl
->immutable_db_options_
.info_log
,
1072 "Unable to persist options in DB::Open() -- %s",
1073 persist_options_status
.ToString().c_str());
1077 for (auto* h
: *handles
) {
1086 } // namespace rocksdb