1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"

#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif

#include <inttypes.h>

#include "db/builder.h"
#include "db/error_handler.h"
#include "options/options_helper.h"
#include "rocksdb/wal_filter.h"
#include "table/block_based_table_factory.h"
#include "util/rate_limiter.h"
#include "util/sst_file_manager_impl.h"
#include "util/sync_point.h"
26 Options
SanitizeOptions(const std::string
& dbname
, const Options
& src
) {
27 auto db_options
= SanitizeOptions(dbname
, DBOptions(src
));
28 ImmutableDBOptions
immutable_db_options(db_options
);
30 SanitizeOptions(immutable_db_options
, ColumnFamilyOptions(src
));
31 return Options(db_options
, cf_options
);
34 DBOptions
SanitizeOptions(const std::string
& dbname
, const DBOptions
& src
) {
35 DBOptions
result(src
);
37 // result.max_open_files means an "infinite" open files.
38 if (result
.max_open_files
!= -1) {
39 int max_max_open_files
= port::GetMaxOpenFiles();
40 if (max_max_open_files
== -1) {
41 max_max_open_files
= 0x400000;
43 ClipToRange(&result
.max_open_files
, 20, max_max_open_files
);
44 TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
45 &result
.max_open_files
);
48 if (result
.info_log
== nullptr) {
49 Status s
= CreateLoggerFromOptions(dbname
, result
, &result
.info_log
);
51 // No place suitable for logging
52 result
.info_log
= nullptr;
56 if (!result
.write_buffer_manager
) {
57 result
.write_buffer_manager
.reset(
58 new WriteBufferManager(result
.db_write_buffer_size
));
60 auto bg_job_limits
= DBImpl::GetBGJobLimits(
61 result
.max_background_flushes
, result
.max_background_compactions
,
62 result
.max_background_jobs
, true /* parallelize_compactions */);
63 result
.env
->IncBackgroundThreadsIfNeeded(bg_job_limits
.max_compactions
,
65 result
.env
->IncBackgroundThreadsIfNeeded(bg_job_limits
.max_flushes
,
68 if (result
.rate_limiter
.get() != nullptr) {
69 if (result
.bytes_per_sync
== 0) {
70 result
.bytes_per_sync
= 1024 * 1024;
74 if (result
.delayed_write_rate
== 0) {
75 if (result
.rate_limiter
.get() != nullptr) {
76 result
.delayed_write_rate
= result
.rate_limiter
->GetBytesPerSecond();
78 if (result
.delayed_write_rate
== 0) {
79 result
.delayed_write_rate
= 16 * 1024 * 1024;
83 if (result
.WAL_ttl_seconds
> 0 || result
.WAL_size_limit_MB
> 0) {
84 result
.recycle_log_file_num
= false;
87 if (result
.recycle_log_file_num
&&
88 (result
.wal_recovery_mode
== WALRecoveryMode::kPointInTimeRecovery
||
89 result
.wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
)) {
90 // kPointInTimeRecovery is indistinguishable from
91 // kTolerateCorruptedTailRecords in recycle mode since we define
92 // the "end" of the log as the first corrupt record we encounter.
93 // kAbsoluteConsistency doesn't make sense because even a clean
94 // shutdown leaves old junk at the end of the log file.
95 result
.wal_recovery_mode
= WALRecoveryMode::kTolerateCorruptedTailRecords
;
98 if (result
.wal_dir
.empty()) {
99 // Use dbname as default
100 result
.wal_dir
= dbname
;
102 if (result
.wal_dir
.back() == '/') {
103 result
.wal_dir
= result
.wal_dir
.substr(0, result
.wal_dir
.size() - 1);
106 if (result
.db_paths
.size() == 0) {
107 result
.db_paths
.emplace_back(dbname
, std::numeric_limits
<uint64_t>::max());
110 if (result
.use_direct_reads
&& result
.compaction_readahead_size
== 0) {
111 TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
112 result
.compaction_readahead_size
= 1024 * 1024 * 2;
115 if (result
.compaction_readahead_size
> 0 || result
.use_direct_reads
) {
116 result
.new_table_reader_for_compaction_inputs
= true;
119 // Force flush on DB open if 2PC is enabled, since with 2PC we have no
120 // guarantee that consecutive log files have consecutive sequence id, which
121 // make recovery complicated.
122 if (result
.allow_2pc
) {
123 result
.avoid_flush_during_recovery
= false;
127 // When the DB is stopped, it's possible that there are some .trash files that
128 // were not deleted yet, when we open the DB we will find these .trash files
129 // and schedule them to be deleted (or delete immediately if SstFileManager
131 auto sfm
= static_cast<SstFileManagerImpl
*>(result
.sst_file_manager
.get());
132 for (size_t i
= 0; i
< result
.db_paths
.size(); i
++) {
133 DeleteScheduler::CleanupDirectory(result
.env
, sfm
, result
.db_paths
[i
].path
);
136 // Create a default SstFileManager for purposes of tracking compaction size
137 // and facilitating recovery from out of space errors.
138 if (result
.sst_file_manager
.get() == nullptr) {
139 std::shared_ptr
<SstFileManager
> sst_file_manager(
140 NewSstFileManager(result
.env
, result
.info_log
));
141 result
.sst_file_manager
= sst_file_manager
;
149 Status
SanitizeOptionsByTable(
150 const DBOptions
& db_opts
,
151 const std::vector
<ColumnFamilyDescriptor
>& column_families
) {
153 for (auto cf
: column_families
) {
154 s
= cf
.options
.table_factory
->SanitizeOptions(db_opts
, cf
.options
);
162 static Status
ValidateOptions(
163 const DBOptions
& db_options
,
164 const std::vector
<ColumnFamilyDescriptor
>& column_families
) {
167 for (auto& cfd
: column_families
) {
168 s
= CheckCompressionSupported(cfd
.options
);
169 if (s
.ok() && db_options
.allow_concurrent_memtable_write
) {
170 s
= CheckConcurrentWritesSupported(cfd
.options
);
173 s
= CheckCFPathsSupported(db_options
, cfd
.options
);
179 if (cfd
.options
.ttl
> 0) {
180 if (db_options
.max_open_files
!= -1) {
181 return Status::NotSupported(
182 "TTL is only supported when files are always "
183 "kept open (set max_open_files = -1). ");
185 if (cfd
.options
.table_factory
->Name() !=
186 BlockBasedTableFactory().Name()) {
187 return Status::NotSupported(
188 "TTL is only supported in Block-Based Table format. ");
193 if (db_options
.db_paths
.size() > 4) {
194 return Status::NotSupported(
195 "More than four DB paths are not supported yet. ");
198 if (db_options
.allow_mmap_reads
&& db_options
.use_direct_reads
) {
199 // Protect against assert in PosixMMapReadableFile constructor
200 return Status::NotSupported(
201 "If memory mapped reads (allow_mmap_reads) are enabled "
202 "then direct I/O reads (use_direct_reads) must be disabled. ");
205 if (db_options
.allow_mmap_writes
&&
206 db_options
.use_direct_io_for_flush_and_compaction
) {
207 return Status::NotSupported(
208 "If memory mapped writes (allow_mmap_writes) are enabled "
209 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
213 if (db_options
.keep_log_file_num
== 0) {
214 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
220 Status
DBImpl::NewDB() {
222 new_db
.SetLogNumber(0);
223 new_db
.SetNextFile(2);
224 new_db
.SetLastSequence(0);
228 ROCKS_LOG_INFO(immutable_db_options_
.info_log
, "Creating manifest 1 \n");
229 const std::string manifest
= DescriptorFileName(dbname_
, 1);
231 std::unique_ptr
<WritableFile
> file
;
232 EnvOptions env_options
= env_
->OptimizeForManifestWrite(env_options_
);
233 s
= NewWritableFile(env_
, manifest
, &file
, env_options
);
237 file
->SetPreallocationBlockSize(
238 immutable_db_options_
.manifest_preallocation_size
);
239 std::unique_ptr
<WritableFileWriter
> file_writer(new WritableFileWriter(
240 std::move(file
), manifest
, env_options
, env_
, nullptr /* stats */,
241 immutable_db_options_
.listeners
));
242 log::Writer
log(std::move(file_writer
), 0, false);
244 new_db
.EncodeTo(&record
);
245 s
= log
.AddRecord(record
);
247 s
= SyncManifest(env_
, &immutable_db_options_
, log
.file());
251 // Make "CURRENT" file that points to the new manifest file.
252 s
= SetCurrentFile(env_
, dbname_
, 1, directories_
.GetDbDir());
254 env_
->DeleteFile(manifest
);
259 Status
DBImpl::CreateAndNewDirectory(Env
* env
, const std::string
& dirname
,
260 std::unique_ptr
<Directory
>* directory
) {
261 // We call CreateDirIfMissing() as the directory may already exist (if we
262 // are reopening a DB), when this happens we don't want creating the
263 // directory to cause an error. However, we need to check if creating the
264 // directory fails or else we may get an obscure message about the lock
265 // file not existing. One real-world example of this occurring is if
266 // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
267 // when dbname_ is "dir/db" but when "dir" doesn't exist.
268 Status s
= env
->CreateDirIfMissing(dirname
);
272 return env
->NewDirectory(dirname
, directory
);
275 Status
DBImpl::Directories::SetDirectories(
276 Env
* env
, const std::string
& dbname
, const std::string
& wal_dir
,
277 const std::vector
<DbPath
>& data_paths
) {
278 Status s
= DBImpl::CreateAndNewDirectory(env
, dbname
, &db_dir_
);
282 if (!wal_dir
.empty() && dbname
!= wal_dir
) {
283 s
= DBImpl::CreateAndNewDirectory(env
, wal_dir
, &wal_dir_
);
290 for (auto& p
: data_paths
) {
291 const std::string db_path
= p
.path
;
292 if (db_path
== dbname
) {
293 data_dirs_
.emplace_back(nullptr);
295 std::unique_ptr
<Directory
> path_directory
;
296 s
= DBImpl::CreateAndNewDirectory(env
, db_path
, &path_directory
);
300 data_dirs_
.emplace_back(path_directory
.release());
303 assert(data_dirs_
.size() == data_paths
.size());
307 Status
DBImpl::Recover(
308 const std::vector
<ColumnFamilyDescriptor
>& column_families
, bool read_only
,
309 bool error_if_log_file_exist
, bool error_if_data_exists_in_logs
) {
312 bool is_new_db
= false;
313 assert(db_lock_
== nullptr);
315 Status s
= directories_
.SetDirectories(env_
, dbname_
,
316 immutable_db_options_
.wal_dir
,
317 immutable_db_options_
.db_paths
);
322 s
= env_
->LockFile(LockFileName(dbname_
), &db_lock_
);
327 s
= env_
->FileExists(CurrentFileName(dbname_
));
328 if (s
.IsNotFound()) {
329 if (immutable_db_options_
.create_if_missing
) {
336 return Status::InvalidArgument(
337 dbname_
, "does not exist (create_if_missing is false)");
340 if (immutable_db_options_
.error_if_exists
) {
341 return Status::InvalidArgument(dbname_
,
342 "exists (error_if_exists is true)");
345 // Unexpected error reading file
346 assert(s
.IsIOError());
349 // Check for the IDENTITY file and create it if not there
350 s
= env_
->FileExists(IdentityFileName(dbname_
));
351 if (s
.IsNotFound()) {
352 s
= SetIdentityFile(env_
, dbname_
);
356 } else if (!s
.ok()) {
357 assert(s
.IsIOError());
360 // Verify compatibility of env_options_ and filesystem
362 std::unique_ptr
<RandomAccessFile
> idfile
;
363 EnvOptions
customized_env(env_options_
);
364 customized_env
.use_direct_reads
|=
365 immutable_db_options_
.use_direct_io_for_flush_and_compaction
;
366 s
= env_
->NewRandomAccessFile(IdentityFileName(dbname_
), &idfile
,
369 std::string error_str
= s
.ToString();
370 // Check if unsupported Direct I/O is the root cause
371 customized_env
.use_direct_reads
= false;
372 s
= env_
->NewRandomAccessFile(IdentityFileName(dbname_
), &idfile
,
375 return Status::InvalidArgument(
376 "Direct I/O is not supported by the specified DB.");
378 return Status::InvalidArgument(
379 "Found options incompatible with filesystem", error_str
.c_str());
385 Status s
= versions_
->Recover(column_families
, read_only
);
386 if (immutable_db_options_
.paranoid_checks
&& s
.ok()) {
387 s
= CheckConsistency();
389 if (s
.ok() && !read_only
) {
390 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
391 s
= cfd
->AddDirectories();
398 // Initial max_total_in_memory_state_ before recovery logs. Log recovery
399 // may check this value to decide whether to flush.
400 max_total_in_memory_state_
= 0;
401 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
402 auto* mutable_cf_options
= cfd
->GetLatestMutableCFOptions();
403 max_total_in_memory_state_
+= mutable_cf_options
->write_buffer_size
*
404 mutable_cf_options
->max_write_buffer_number
;
408 SequenceNumber
next_sequence(kMaxSequenceNumber
);
409 default_cf_handle_
= new ColumnFamilyHandleImpl(
410 versions_
->GetColumnFamilySet()->GetDefault(), this, &mutex_
);
411 default_cf_internal_stats_
= default_cf_handle_
->cfd()->internal_stats();
412 single_column_family_mode_
=
413 versions_
->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
415 // Recover from all newer log files than the ones named in the
416 // descriptor (new log files may have been added by the previous
417 // incarnation without registering them in the descriptor).
419 // Note that prev_log_number() is no longer used, but we pay
420 // attention to it in case we are recovering a database
421 // produced by an older version of rocksdb.
422 std::vector
<std::string
> filenames
;
423 s
= env_
->GetChildren(immutable_db_options_
.wal_dir
, &filenames
);
424 if (s
.IsNotFound()) {
425 return Status::InvalidArgument("wal_dir not found",
426 immutable_db_options_
.wal_dir
);
427 } else if (!s
.ok()) {
431 std::vector
<uint64_t> logs
;
432 for (size_t i
= 0; i
< filenames
.size(); i
++) {
435 if (ParseFileName(filenames
[i
], &number
, &type
) && type
== kLogFile
) {
437 return Status::Corruption(
438 "While creating a new Db, wal_dir contains "
439 "existing log file: ",
442 logs
.push_back(number
);
447 if (logs
.size() > 0) {
448 if (error_if_log_file_exist
) {
449 return Status::Corruption(
450 "The db was opened in readonly mode with error_if_log_file_exist"
451 "flag but a log file already exists");
452 } else if (error_if_data_exists_in_logs
) {
453 for (auto& log
: logs
) {
454 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log
);
456 s
= env_
->GetFileSize(fname
, &bytes
);
459 return Status::Corruption(
460 "error_if_data_exists_in_logs is set but there are data "
469 // Recover in the order in which the logs were generated
470 std::sort(logs
.begin(), logs
.end());
471 s
= RecoverLogFiles(logs
, &next_sequence
, read_only
);
473 // Clear memtables if recovery failed
474 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
475 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
483 // If we are opening as read-only, we need to update options_file_number_
484 // to reflect the most recent OPTIONS file. It does not matter for regular
485 // read-write db instance because options_file_number_ will later be
486 // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
487 std::vector
<std::string
> file_names
;
489 s
= env_
->GetChildren(GetName(), &file_names
);
493 uint64_t options_file_number
= 0;
495 for (const auto& fname
: file_names
) {
496 if (ParseFileName(fname
, &number
, &type
) && type
== kOptionsFile
) {
497 options_file_number
= std::max(number
, options_file_number
);
500 versions_
->options_file_number_
= options_file_number
;
507 // REQUIRES: log_numbers are sorted in ascending order
508 Status
DBImpl::RecoverLogFiles(const std::vector
<uint64_t>& log_numbers
,
509 SequenceNumber
* next_sequence
, bool read_only
) {
510 struct LogReporter
: public log::Reader::Reporter
{
514 Status
* status
; // nullptr if immutable_db_options_.paranoid_checks==false
515 void Corruption(size_t bytes
, const Status
& s
) override
{
516 ROCKS_LOG_WARN(info_log
, "%s%s: dropping %d bytes; %s",
517 (this->status
== nullptr ? "(ignoring error) " : ""),
518 fname
, static_cast<int>(bytes
), s
.ToString().c_str());
519 if (this->status
!= nullptr && this->status
->ok()) {
527 std::unordered_map
<int, VersionEdit
> version_edits
;
528 // no need to refcount because iteration is under mutex
529 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
531 edit
.SetColumnFamily(cfd
->GetID());
532 version_edits
.insert({cfd
->GetID(), edit
});
534 int job_id
= next_job_id_
.fetch_add(1);
536 auto stream
= event_logger_
.Log();
537 stream
<< "job" << job_id
<< "event"
538 << "recovery_started";
539 stream
<< "log_files";
541 for (auto log_number
: log_numbers
) {
542 stream
<< log_number
;
548 if (immutable_db_options_
.wal_filter
!= nullptr) {
549 std::map
<std::string
, uint32_t> cf_name_id_map
;
550 std::map
<uint32_t, uint64_t> cf_lognumber_map
;
551 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
552 cf_name_id_map
.insert(std::make_pair(cfd
->GetName(), cfd
->GetID()));
553 cf_lognumber_map
.insert(
554 std::make_pair(cfd
->GetID(), cfd
->GetLogNumber()));
557 immutable_db_options_
.wal_filter
->ColumnFamilyLogNumberMap(cf_lognumber_map
,
562 bool stop_replay_by_wal_filter
= false;
563 bool stop_replay_for_corruption
= false;
564 bool flushed
= false;
565 uint64_t corrupted_log_number
= kMaxSequenceNumber
;
566 for (auto log_number
: log_numbers
) {
567 if (log_number
< versions_
->min_log_number_to_keep_2pc()) {
568 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
569 "Skipping log #%" PRIu64
570 " since it is older than min log to keep #%" PRIu64
,
571 log_number
, versions_
->min_log_number_to_keep_2pc());
574 // The previous incarnation may not have written any MANIFEST
575 // records after allocating this log number. So we manually
576 // update the file number allocation counter in VersionSet.
577 versions_
->MarkFileNumberUsed(log_number
);
579 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log_number
);
581 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
582 "Recovering log #%" PRIu64
" mode %d", log_number
,
583 static_cast<int>(immutable_db_options_
.wal_recovery_mode
));
584 auto logFileDropped
= [this, &fname
]() {
586 if (env_
->GetFileSize(fname
, &bytes
).ok()) {
587 auto info_log
= immutable_db_options_
.info_log
.get();
588 ROCKS_LOG_WARN(info_log
, "%s: dropping %d bytes", fname
.c_str(),
589 static_cast<int>(bytes
));
592 if (stop_replay_by_wal_filter
) {
597 std::unique_ptr
<SequentialFileReader
> file_reader
;
599 std::unique_ptr
<SequentialFile
> file
;
600 status
= env_
->NewSequentialFile(fname
, &file
,
601 env_
->OptimizeForLogRead(env_options_
));
603 MaybeIgnoreError(&status
);
607 // Fail with one log file, but that's ok.
612 file_reader
.reset(new SequentialFileReader(std::move(file
), fname
));
615 // Create the log reader.
616 LogReporter reporter
;
618 reporter
.info_log
= immutable_db_options_
.info_log
.get();
619 reporter
.fname
= fname
.c_str();
620 if (!immutable_db_options_
.paranoid_checks
||
621 immutable_db_options_
.wal_recovery_mode
==
622 WALRecoveryMode::kSkipAnyCorruptedRecords
) {
623 reporter
.status
= nullptr;
625 reporter
.status
= &status
;
627 // We intentially make log::Reader do checksumming even if
628 // paranoid_checks==false so that corruptions cause entire commits
629 // to be skipped instead of propagating bad information (like overly
630 // large sequence numbers).
631 log::Reader
reader(immutable_db_options_
.info_log
, std::move(file_reader
),
632 &reporter
, true /*checksum*/, log_number
);
634 // Determine if we should tolerate incomplete records at the tail end of the
635 // Read all the records and add to a memtable
640 while (!stop_replay_by_wal_filter
&&
641 reader
.ReadRecord(&record
, &scratch
,
642 immutable_db_options_
.wal_recovery_mode
) &&
644 if (record
.size() < WriteBatchInternal::kHeader
) {
645 reporter
.Corruption(record
.size(),
646 Status::Corruption("log record too small"));
649 WriteBatchInternal::SetContents(&batch
, record
);
650 SequenceNumber sequence
= WriteBatchInternal::Sequence(&batch
);
652 if (immutable_db_options_
.wal_recovery_mode
==
653 WALRecoveryMode::kPointInTimeRecovery
) {
654 // In point-in-time recovery mode, if sequence id of log files are
655 // consecutive, we continue recovery despite corruption. This could
656 // happen when we open and write to a corrupted DB, where sequence id
657 // will start from the last sequence id we recovered.
658 if (sequence
== *next_sequence
) {
659 stop_replay_for_corruption
= false;
661 if (stop_replay_for_corruption
) {
668 if (immutable_db_options_
.wal_filter
!= nullptr) {
669 WriteBatch new_batch
;
670 bool batch_changed
= false;
672 WalFilter::WalProcessingOption wal_processing_option
=
673 immutable_db_options_
.wal_filter
->LogRecordFound(
674 log_number
, fname
, batch
, &new_batch
, &batch_changed
);
676 switch (wal_processing_option
) {
677 case WalFilter::WalProcessingOption::kContinueProcessing
:
678 // do nothing, proceeed normally
680 case WalFilter::WalProcessingOption::kIgnoreCurrentRecord
:
681 // skip current record
683 case WalFilter::WalProcessingOption::kStopReplay
:
684 // skip current record and stop replay
685 stop_replay_by_wal_filter
= true;
687 case WalFilter::WalProcessingOption::kCorruptedRecord
: {
689 Status::Corruption("Corruption reported by Wal Filter ",
690 immutable_db_options_
.wal_filter
->Name());
691 MaybeIgnoreError(&status
);
693 reporter
.Corruption(record
.size(), status
);
699 assert(false); // unhandled case
700 status
= Status::NotSupported(
701 "Unknown WalProcessingOption returned"
703 immutable_db_options_
.wal_filter
->Name());
704 MaybeIgnoreError(&status
);
708 // Ignore the error with current record processing.
715 // Make sure that the count in the new batch is
716 // within the orignal count.
717 int new_count
= WriteBatchInternal::Count(&new_batch
);
718 int original_count
= WriteBatchInternal::Count(&batch
);
719 if (new_count
> original_count
) {
721 immutable_db_options_
.info_log
,
722 "Recovering log #%" PRIu64
723 " mode %d log filter %s returned "
724 "more records (%d) than original (%d) which is not allowed. "
725 "Aborting recovery.",
727 static_cast<int>(immutable_db_options_
.wal_recovery_mode
),
728 immutable_db_options_
.wal_filter
->Name(), new_count
,
730 status
= Status::NotSupported(
731 "More than original # of records "
732 "returned by Wal Filter ",
733 immutable_db_options_
.wal_filter
->Name());
736 // Set the same sequence number in the new_batch
737 // as the original batch.
738 WriteBatchInternal::SetSequence(&new_batch
,
739 WriteBatchInternal::Sequence(&batch
));
743 #endif // ROCKSDB_LITE
745 // If column family was not found, it might mean that the WAL write
746 // batch references to the column family that was dropped after the
747 // insert. We don't want to fail the whole write batch in that case --
748 // we just ignore the update.
749 // That's why we set ignore missing column families to true
750 bool has_valid_writes
= false;
751 status
= WriteBatchInternal::InsertInto(
752 &batch
, column_family_memtables_
.get(), &flush_scheduler_
, true,
753 log_number
, this, false /* concurrent_memtable_writes */,
754 next_sequence
, &has_valid_writes
, seq_per_batch_
, batch_per_txn_
);
755 MaybeIgnoreError(&status
);
757 // We are treating this as a failure while reading since we read valid
758 // blocks that do not form coherent data
759 reporter
.Corruption(record
.size(), status
);
763 if (has_valid_writes
&& !read_only
) {
764 // we can do this because this is called before client has access to the
765 // DB and there is only a single thread operating on DB
766 ColumnFamilyData
* cfd
;
768 while ((cfd
= flush_scheduler_
.TakeNextColumnFamily()) != nullptr) {
770 // If this asserts, it means that InsertInto failed in
771 // filtering updates to already-flushed column families
772 assert(cfd
->GetLogNumber() <= log_number
);
773 auto iter
= version_edits
.find(cfd
->GetID());
774 assert(iter
!= version_edits
.end());
775 VersionEdit
* edit
= &iter
->second
;
776 status
= WriteLevel0TableForRecovery(job_id
, cfd
, cfd
->mem(), edit
);
778 // Reflect errors immediately so that conditions like full
779 // file-systems cause the DB::Open() to fail.
784 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
791 if (status
.IsNotSupported()) {
792 // We should not treat NotSupported as corruption. It is rather a clear
793 // sign that we are processing a WAL that is produced by an incompatible
794 // version of the code.
797 if (immutable_db_options_
.wal_recovery_mode
==
798 WALRecoveryMode::kSkipAnyCorruptedRecords
) {
799 // We should ignore all errors unconditionally
800 status
= Status::OK();
801 } else if (immutable_db_options_
.wal_recovery_mode
==
802 WALRecoveryMode::kPointInTimeRecovery
) {
803 // We should ignore the error but not continue replaying
804 status
= Status::OK();
805 stop_replay_for_corruption
= true;
806 corrupted_log_number
= log_number
;
807 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
808 "Point in time recovered to log #%" PRIu64
810 log_number
, *next_sequence
);
812 assert(immutable_db_options_
.wal_recovery_mode
==
813 WALRecoveryMode::kTolerateCorruptedTailRecords
||
814 immutable_db_options_
.wal_recovery_mode
==
815 WALRecoveryMode::kAbsoluteConsistency
);
820 flush_scheduler_
.Clear();
821 auto last_sequence
= *next_sequence
- 1;
822 if ((*next_sequence
!= kMaxSequenceNumber
) &&
823 (versions_
->LastSequence() <= last_sequence
)) {
824 versions_
->SetLastAllocatedSequence(last_sequence
);
825 versions_
->SetLastPublishedSequence(last_sequence
);
826 versions_
->SetLastSequence(last_sequence
);
829 // Compare the corrupted log number to all columnfamily's current log number.
830 // Abort Open() if any column family's log number is greater than
831 // the corrupted log number, which means CF contains data beyond the point of
832 // corruption. This could during PIT recovery when the WAL is corrupted and
833 // some (but not all) CFs are flushed
834 if (stop_replay_for_corruption
== true &&
835 (immutable_db_options_
.wal_recovery_mode
==
836 WALRecoveryMode::kPointInTimeRecovery
||
837 immutable_db_options_
.wal_recovery_mode
==
838 WALRecoveryMode::kTolerateCorruptedTailRecords
)) {
839 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
840 if (cfd
->GetLogNumber() > corrupted_log_number
) {
841 ROCKS_LOG_ERROR(immutable_db_options_
.info_log
,
842 "Column family inconsistency: SST file contains data"
843 " beyond the point of corruption.");
844 return Status::Corruption("SST file is ahead of WALs");
849 // True if there's any data in the WALs; if not, we can skip re-processing
851 bool data_seen
= false;
853 // no need to refcount since client still doesn't have access
854 // to the DB and can not drop column families while we iterate
855 auto max_log_number
= log_numbers
.back();
856 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
857 auto iter
= version_edits
.find(cfd
->GetID());
858 assert(iter
!= version_edits
.end());
859 VersionEdit
* edit
= &iter
->second
;
861 if (cfd
->GetLogNumber() > max_log_number
) {
862 // Column family cfd has already flushed the data
863 // from all logs. Memtable has to be empty because
864 // we filter the updates based on log_number
865 // (in WriteBatch::InsertInto)
866 assert(cfd
->mem()->GetFirstSequenceNumber() == 0);
867 assert(edit
->NumEntries() == 0);
871 // flush the final memtable (if non-empty)
872 if (cfd
->mem()->GetFirstSequenceNumber() != 0) {
873 // If flush happened in the middle of recovery (e.g. due to memtable
874 // being full), we flush at the end. Otherwise we'll need to record
875 // where we were on last flush, which make the logic complicated.
876 if (flushed
|| !immutable_db_options_
.avoid_flush_during_recovery
) {
877 status
= WriteLevel0TableForRecovery(job_id
, cfd
, cfd
->mem(), edit
);
884 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
885 versions_
->LastSequence());
890 // write MANIFEST with update
891 // writing log_number in the manifest means that any log file
892 // with number strongly less than (log_number + 1) is already
893 // recovered and should be ignored on next reincarnation.
894 // Since we already recovered max_log_number, we want all logs
895 // with numbers `<= max_log_number` (includes this one) to be ignored
896 if (flushed
|| cfd
->mem()->GetFirstSequenceNumber() == 0) {
897 edit
->SetLogNumber(max_log_number
+ 1);
899 // we must mark the next log number as used, even though it's
900 // not actually used. that is because VersionSet assumes
901 // VersionSet::next_file_number_ always to be strictly greater than any
903 versions_
->MarkFileNumberUsed(max_log_number
+ 1);
904 status
= versions_
->LogAndApply(cfd
, *cfd
->GetLatestMutableCFOptions(),
913 if (status
.ok() && data_seen
&& !flushed
) {
914 status
= RestoreAliveLogFiles(log_numbers
);
917 event_logger_
.Log() << "job" << job_id
<< "event"
918 << "recovery_finished";
923 Status
DBImpl::RestoreAliveLogFiles(const std::vector
<uint64_t>& log_numbers
) {
924 if (log_numbers
.empty()) {
929 assert(immutable_db_options_
.avoid_flush_during_recovery
);
930 if (two_write_queues_
) {
931 log_write_mutex_
.Lock();
933 // Mark these as alive so they'll be considered for deletion later by
934 // FindObsoleteFiles()
937 for (auto log_number
: log_numbers
) {
938 LogFileNumberSize
log(log_number
);
939 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log_number
);
940 // This gets the appear size of the logs, not including preallocated space.
941 s
= env_
->GetFileSize(fname
, &log
.size
);
945 total_log_size_
+= log
.size
;
946 alive_log_files_
.push_back(log
);
947 // We preallocate space for logs, but then after a crash and restart, those
948 // preallocated space are not needed anymore. It is likely only the last
949 // log has such preallocated space, so we only truncate for the last log.
950 if (log_number
== log_numbers
.back()) {
951 std::unique_ptr
<WritableFile
> last_log
;
952 Status truncate_status
= env_
->ReopenWritableFile(
954 env_
->OptimizeForLogWrite(
956 BuildDBOptions(immutable_db_options_
, mutable_db_options_
)));
957 if (truncate_status
.ok()) {
958 truncate_status
= last_log
->Truncate(log
.size
);
960 if (truncate_status
.ok()) {
961 truncate_status
= last_log
->Close();
963 // Not a critical error if fail to truncate.
964 if (!truncate_status
.ok()) {
965 ROCKS_LOG_WARN(immutable_db_options_
.info_log
,
966 "Failed to truncate log #%" PRIu64
": %s", log_number
,
967 truncate_status
.ToString().c_str());
971 if (two_write_queues_
) {
972 log_write_mutex_
.Unlock();
977 Status
DBImpl::WriteLevel0TableForRecovery(int job_id
, ColumnFamilyData
* cfd
,
978 MemTable
* mem
, VersionEdit
* edit
) {
980 const uint64_t start_micros
= env_
->NowMicros();
982 auto pending_outputs_inserted_elem
=
983 CaptureCurrentFileNumberInPendingOutputs();
984 meta
.fd
= FileDescriptor(versions_
->NewFileNumber(), 0, 0);
986 ro
.total_order_seek
= true;
989 TableProperties table_properties
;
991 ScopedArenaIterator
iter(mem
->NewIterator(ro
, &arena
));
992 ROCKS_LOG_DEBUG(immutable_db_options_
.info_log
,
993 "[%s] [WriteLevel0TableForRecovery]"
994 " Level-0 table #%" PRIu64
": started",
995 cfd
->GetName().c_str(), meta
.fd
.GetNumber());
997 // Get the latest mutable cf options while the mutex is still locked
998 const MutableCFOptions mutable_cf_options
=
999 *cfd
->GetLatestMutableCFOptions();
1000 bool paranoid_file_checks
=
1001 cfd
->GetLatestMutableCFOptions()->paranoid_file_checks
;
1003 int64_t _current_time
= 0;
1004 env_
->GetCurrentTime(&_current_time
); // ignore error
1005 const uint64_t current_time
= static_cast<uint64_t>(_current_time
);
1008 auto write_hint
= cfd
->CalculateSSTWriteHint(0);
1011 SequenceNumber earliest_write_conflict_snapshot
;
1012 std::vector
<SequenceNumber
> snapshot_seqs
=
1013 snapshots_
.GetAll(&earliest_write_conflict_snapshot
);
1014 auto snapshot_checker
= snapshot_checker_
.get();
1015 if (use_custom_gc_
&& snapshot_checker
== nullptr) {
1016 snapshot_checker
= DisableGCSnapshotChecker::Instance();
1018 std::vector
<std::unique_ptr
<FragmentedRangeTombstoneIterator
>>
1020 auto range_del_iter
=
1021 mem
->NewRangeTombstoneIterator(ro
, kMaxSequenceNumber
);
1022 if (range_del_iter
!= nullptr) {
1023 range_del_iters
.emplace_back(range_del_iter
);
1026 dbname_
, env_
, *cfd
->ioptions(), mutable_cf_options
,
1027 env_options_for_compaction_
, cfd
->table_cache(), iter
.get(),
1028 std::move(range_del_iters
), &meta
, cfd
->internal_comparator(),
1029 cfd
->int_tbl_prop_collector_factories(), cfd
->GetID(), cfd
->GetName(),
1030 snapshot_seqs
, earliest_write_conflict_snapshot
, snapshot_checker
,
1031 GetCompressionFlush(*cfd
->ioptions(), mutable_cf_options
),
1032 mutable_cf_options
.sample_for_compression
,
1033 cfd
->ioptions()->compression_opts
, paranoid_file_checks
,
1034 cfd
->internal_stats(), TableFileCreationReason::kRecovery
,
1035 &event_logger_
, job_id
, Env::IO_HIGH
, nullptr /* table_properties */,
1036 -1 /* level */, current_time
, write_hint
);
1037 LogFlush(immutable_db_options_
.info_log
);
1038 ROCKS_LOG_DEBUG(immutable_db_options_
.info_log
,
1039 "[%s] [WriteLevel0TableForRecovery]"
1040 " Level-0 table #%" PRIu64
": %" PRIu64
" bytes %s",
1041 cfd
->GetName().c_str(), meta
.fd
.GetNumber(),
1042 meta
.fd
.GetFileSize(), s
.ToString().c_str());
1046 ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem
);
1048 // Note that if file_size is zero, the file has been deleted and
1049 // should not be added to the manifest.
1051 if (s
.ok() && meta
.fd
.GetFileSize() > 0) {
1052 edit
->AddFile(level
, meta
.fd
.GetNumber(), meta
.fd
.GetPathId(),
1053 meta
.fd
.GetFileSize(), meta
.smallest
, meta
.largest
,
1054 meta
.fd
.smallest_seqno
, meta
.fd
.largest_seqno
,
1055 meta
.marked_for_compaction
);
1058 InternalStats::CompactionStats
stats(CompactionReason::kFlush
, 1);
1059 stats
.micros
= env_
->NowMicros() - start_micros
;
1060 stats
.bytes_written
= meta
.fd
.GetFileSize();
1061 stats
.num_output_files
= 1;
1062 cfd
->internal_stats()->AddCompactionStats(level
, Env::Priority::USER
, stats
);
1063 cfd
->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED
,
1064 meta
.fd
.GetFileSize());
1065 RecordTick(stats_
, COMPACT_WRITE_BYTES
, meta
.fd
.GetFileSize());
1069 Status
DB::Open(const Options
& options
, const std::string
& dbname
, DB
** dbptr
) {
1070 DBOptions
db_options(options
);
1071 ColumnFamilyOptions
cf_options(options
);
1072 std::vector
<ColumnFamilyDescriptor
> column_families
;
1073 column_families
.push_back(
1074 ColumnFamilyDescriptor(kDefaultColumnFamilyName
, cf_options
));
1075 std::vector
<ColumnFamilyHandle
*> handles
;
1076 Status s
= DB::Open(db_options
, dbname
, column_families
, &handles
, dbptr
);
1078 assert(handles
.size() == 1);
1079 // i can delete the handle since DBImpl is always holding a reference to
1080 // default column family
1086 Status
DB::Open(const DBOptions
& db_options
, const std::string
& dbname
,
1087 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
1088 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
) {
1089 const bool kSeqPerBatch
= true;
1090 const bool kBatchPerTxn
= true;
1091 return DBImpl::Open(db_options
, dbname
, column_families
, handles
, dbptr
,
1092 !kSeqPerBatch
, kBatchPerTxn
);
1095 Status
DBImpl::Open(const DBOptions
& db_options
, const std::string
& dbname
,
1096 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
1097 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
,
1098 const bool seq_per_batch
, const bool batch_per_txn
) {
1099 Status s
= SanitizeOptionsByTable(db_options
, column_families
);
1104 s
= ValidateOptions(db_options
, column_families
);
1112 size_t max_write_buffer_size
= 0;
1113 for (auto cf
: column_families
) {
1114 max_write_buffer_size
=
1115 std::max(max_write_buffer_size
, cf
.options
.write_buffer_size
);
1118 DBImpl
* impl
= new DBImpl(db_options
, dbname
, seq_per_batch
, batch_per_txn
);
1119 s
= impl
->env_
->CreateDirIfMissing(impl
->immutable_db_options_
.wal_dir
);
1121 std::vector
<std::string
> paths
;
1122 for (auto& db_path
: impl
->immutable_db_options_
.db_paths
) {
1123 paths
.emplace_back(db_path
.path
);
1125 for (auto& cf
: column_families
) {
1126 for (auto& cf_path
: cf
.options
.cf_paths
) {
1127 paths
.emplace_back(cf_path
.path
);
1130 for (auto& path
: paths
) {
1131 s
= impl
->env_
->CreateDirIfMissing(path
);
1137 // For recovery from NoSpace() error, we can only handle
1138 // the case where the database is stored in a single path
1139 if (paths
.size() <= 1) {
1140 impl
->error_handler_
.EnableAutoRecovery();
1149 s
= impl
->CreateArchivalDirectory();
1154 impl
->mutex_
.Lock();
1155 auto write_hint
= impl
->CalculateWALWriteHint();
1156 // Handles create_if_missing, error_if_exists
1157 s
= impl
->Recover(column_families
);
1159 uint64_t new_log_number
= impl
->versions_
->NewFileNumber();
1160 std::unique_ptr
<WritableFile
> lfile
;
1161 EnvOptions
soptions(db_options
);
1162 EnvOptions opt_env_options
=
1163 impl
->immutable_db_options_
.env
->OptimizeForLogWrite(
1164 soptions
, BuildDBOptions(impl
->immutable_db_options_
,
1165 impl
->mutable_db_options_
));
1166 std::string log_fname
=
1167 LogFileName(impl
->immutable_db_options_
.wal_dir
, new_log_number
);
1168 s
= NewWritableFile(impl
->immutable_db_options_
.env
, log_fname
, &lfile
,
1171 lfile
->SetWriteLifeTimeHint(write_hint
);
1172 lfile
->SetPreallocationBlockSize(
1173 impl
->GetWalPreallocateBlockSize(max_write_buffer_size
));
1175 InstrumentedMutexLock
wl(&impl
->log_write_mutex_
);
1176 impl
->logfile_number_
= new_log_number
;
1177 const auto& listeners
= impl
->immutable_db_options_
.listeners
;
1178 std::unique_ptr
<WritableFileWriter
> file_writer(
1179 new WritableFileWriter(std::move(lfile
), log_fname
, opt_env_options
,
1180 impl
->env_
, nullptr /* stats */, listeners
));
1181 impl
->logs_
.emplace_back(
1184 std::move(file_writer
), new_log_number
,
1185 impl
->immutable_db_options_
.recycle_log_file_num
> 0,
1186 impl
->immutable_db_options_
.manual_wal_flush
));
1189 // set column family handles
1190 for (auto cf
: column_families
) {
1192 impl
->versions_
->GetColumnFamilySet()->GetColumnFamily(cf
.name
);
1193 if (cfd
!= nullptr) {
1195 new ColumnFamilyHandleImpl(cfd
, impl
, &impl
->mutex_
));
1196 impl
->NewThreadStatusCfInfo(cfd
);
1198 if (db_options
.create_missing_column_families
) {
1199 // missing column family, create it
1200 ColumnFamilyHandle
* handle
;
1201 impl
->mutex_
.Unlock();
1202 s
= impl
->CreateColumnFamily(cf
.options
, cf
.name
, &handle
);
1203 impl
->mutex_
.Lock();
1205 handles
->push_back(handle
);
1210 s
= Status::InvalidArgument("Column family not found: ", cf
.name
);
1217 SuperVersionContext
sv_context(/* create_superversion */ true);
1218 for (auto cfd
: *impl
->versions_
->GetColumnFamilySet()) {
1219 impl
->InstallSuperVersionAndScheduleWork(
1220 cfd
, &sv_context
, *cfd
->GetLatestMutableCFOptions());
1223 if (impl
->two_write_queues_
) {
1224 impl
->log_write_mutex_
.Lock();
1226 impl
->alive_log_files_
.push_back(
1227 DBImpl::LogFileNumberSize(impl
->logfile_number_
));
1228 if (impl
->two_write_queues_
) {
1229 impl
->log_write_mutex_
.Unlock();
1231 impl
->DeleteObsoleteFiles();
1232 s
= impl
->directories_
.GetDbDir()->Fsync();
1237 for (auto cfd
: *impl
->versions_
->GetColumnFamilySet()) {
1238 if (cfd
->ioptions()->compaction_style
== kCompactionStyleFIFO
) {
1239 auto* vstorage
= cfd
->current()->storage_info();
1240 for (int i
= 1; i
< vstorage
->num_levels(); ++i
) {
1241 int num_files
= vstorage
->NumLevelFiles(i
);
1242 if (num_files
> 0) {
1243 s
= Status::InvalidArgument(
1244 "Not all files are at level 0. Cannot "
1245 "open with FIFO compaction style.");
1250 if (!cfd
->mem()->IsSnapshotSupported()) {
1251 impl
->is_snapshot_supported_
= false;
1253 if (cfd
->ioptions()->merge_operator
!= nullptr &&
1254 !cfd
->mem()->IsMergeOperatorSupported()) {
1255 s
= Status::InvalidArgument(
1256 "The memtable of column family %s does not support merge operator "
1257 "its options.merge_operator is non-null",
1258 cfd
->GetName().c_str());
1265 TEST_SYNC_POINT("DBImpl::Open:Opened");
1266 Status persist_options_status
;
1268 // Persist RocksDB Options before scheduling the compaction.
1269 // The WriteOptionsFile() will release and lock the mutex internally.
1270 persist_options_status
= impl
->WriteOptionsFile(
1271 false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
1274 impl
->opened_successfully_
= true;
1275 impl
->MaybeScheduleFlushOrCompaction();
1277 impl
->mutex_
.Unlock();
1279 #ifndef ROCKSDB_LITE
1280 auto sfm
= static_cast<SstFileManagerImpl
*>(
1281 impl
->immutable_db_options_
.sst_file_manager
.get());
1282 if (s
.ok() && sfm
) {
1283 // Notify SstFileManager about all sst files that already exist in
1284 // db_paths[0] and cf_paths[0] when the DB is opened.
1285 std::vector
<std::string
> paths
;
1286 paths
.emplace_back(impl
->immutable_db_options_
.db_paths
[0].path
);
1287 for (auto& cf
: column_families
) {
1288 if (!cf
.options
.cf_paths
.empty()) {
1289 paths
.emplace_back(cf
.options
.cf_paths
[0].path
);
1292 // Remove duplicate paths.
1293 std::sort(paths
.begin(), paths
.end());
1294 paths
.erase(std::unique(paths
.begin(), paths
.end()), paths
.end());
1295 for (auto& path
: paths
) {
1296 std::vector
<std::string
> existing_files
;
1297 impl
->immutable_db_options_
.env
->GetChildren(path
, &existing_files
);
1298 for (auto& file_name
: existing_files
) {
1299 uint64_t file_number
;
1301 std::string file_path
= path
+ "/" + file_name
;
1302 if (ParseFileName(file_name
, &file_number
, &file_type
) &&
1303 file_type
== kTableFile
) {
1304 sfm
->OnAddFile(file_path
);
1309 // Reserve some disk buffer space. This is a heuristic - when we run out
1310 // of disk space, this ensures that there is atleast write_buffer_size
1311 // amount of free space before we resume DB writes. In low disk space
1312 // conditions, we want to avoid a lot of small L0 files due to frequent
1313 // WAL write failures and resultant forced flushes
1314 sfm
->ReserveDiskBuffer(max_write_buffer_size
,
1315 impl
->immutable_db_options_
.db_paths
[0].path
);
1317 #endif // !ROCKSDB_LITE
1320 ROCKS_LOG_HEADER(impl
->immutable_db_options_
.info_log
, "DB pointer %p",
1322 LogFlush(impl
->immutable_db_options_
.info_log
);
1323 assert(impl
->TEST_WALBufferIsEmpty());
1324 // If the assert above fails then we need to FlushWAL before returning
1325 // control back to the user.
1326 if (!persist_options_status
.ok()) {
1327 s
= Status::IOError(
1328 "DB::Open() failed --- Unable to persist Options file",
1329 persist_options_status
.ToString());
1333 impl
->StartTimedTasks();
1336 for (auto* h
: *handles
) {
1345 } // namespace rocksdb