1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #include "db/db_impl/db_impl.h"
13 #include "db/builder.h"
14 #include "db/error_handler.h"
15 #include "env/composite_env_wrapper.h"
16 #include "file/read_write_util.h"
17 #include "file/sst_file_manager_impl.h"
18 #include "file/writable_file_writer.h"
19 #include "monitoring/persistent_stats_history.h"
20 #include "options/options_helper.h"
21 #include "rocksdb/wal_filter.h"
22 #include "table/block_based/block_based_table_factory.h"
23 #include "test_util/sync_point.h"
24 #include "util/rate_limiter.h"
26 namespace ROCKSDB_NAMESPACE
{
27 Options
SanitizeOptions(const std::string
& dbname
, const Options
& src
) {
28 auto db_options
= SanitizeOptions(dbname
, DBOptions(src
));
29 ImmutableDBOptions
immutable_db_options(db_options
);
31 SanitizeOptions(immutable_db_options
, ColumnFamilyOptions(src
));
32 return Options(db_options
, cf_options
);
35 DBOptions
SanitizeOptions(const std::string
& dbname
, const DBOptions
& src
) {
36 DBOptions
result(src
);
38 if (result
.file_system
== nullptr) {
39 if (result
.env
== Env::Default()) {
40 result
.file_system
= FileSystem::Default();
42 result
.file_system
.reset(new LegacyFileSystemWrapper(result
.env
));
45 if (result
.env
== nullptr) {
46 result
.env
= Env::Default();
50 // result.max_open_files means an "infinite" open files.
51 if (result
.max_open_files
!= -1) {
52 int max_max_open_files
= port::GetMaxOpenFiles();
53 if (max_max_open_files
== -1) {
54 max_max_open_files
= 0x400000;
56 ClipToRange(&result
.max_open_files
, 20, max_max_open_files
);
57 TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
58 &result
.max_open_files
);
61 if (result
.info_log
== nullptr) {
62 Status s
= CreateLoggerFromOptions(dbname
, result
, &result
.info_log
);
64 // No place suitable for logging
65 result
.info_log
= nullptr;
69 if (!result
.write_buffer_manager
) {
70 result
.write_buffer_manager
.reset(
71 new WriteBufferManager(result
.db_write_buffer_size
));
73 auto bg_job_limits
= DBImpl::GetBGJobLimits(
74 result
.max_background_flushes
, result
.max_background_compactions
,
75 result
.max_background_jobs
, true /* parallelize_compactions */);
76 result
.env
->IncBackgroundThreadsIfNeeded(bg_job_limits
.max_compactions
,
78 result
.env
->IncBackgroundThreadsIfNeeded(bg_job_limits
.max_flushes
,
81 if (result
.rate_limiter
.get() != nullptr) {
82 if (result
.bytes_per_sync
== 0) {
83 result
.bytes_per_sync
= 1024 * 1024;
87 if (result
.delayed_write_rate
== 0) {
88 if (result
.rate_limiter
.get() != nullptr) {
89 result
.delayed_write_rate
= result
.rate_limiter
->GetBytesPerSecond();
91 if (result
.delayed_write_rate
== 0) {
92 result
.delayed_write_rate
= 16 * 1024 * 1024;
96 if (result
.WAL_ttl_seconds
> 0 || result
.WAL_size_limit_MB
> 0) {
97 result
.recycle_log_file_num
= false;
100 if (result
.recycle_log_file_num
&&
101 (result
.wal_recovery_mode
== WALRecoveryMode::kPointInTimeRecovery
||
102 result
.wal_recovery_mode
== WALRecoveryMode::kAbsoluteConsistency
)) {
103 // kPointInTimeRecovery is inconsistent with recycle log file feature since
104 // we define the "end" of the log as the first corrupt record we encounter.
105 // kAbsoluteConsistency doesn't make sense because even a clean
106 // shutdown leaves old junk at the end of the log file.
107 result
.recycle_log_file_num
= 0;
110 if (result
.wal_dir
.empty()) {
111 // Use dbname as default
112 result
.wal_dir
= dbname
;
114 if (result
.wal_dir
.back() == '/') {
115 result
.wal_dir
= result
.wal_dir
.substr(0, result
.wal_dir
.size() - 1);
118 if (result
.db_paths
.size() == 0) {
119 result
.db_paths
.emplace_back(dbname
, std::numeric_limits
<uint64_t>::max());
122 if (result
.use_direct_reads
&& result
.compaction_readahead_size
== 0) {
123 TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
124 result
.compaction_readahead_size
= 1024 * 1024 * 2;
127 if (result
.compaction_readahead_size
> 0 || result
.use_direct_reads
) {
128 result
.new_table_reader_for_compaction_inputs
= true;
131 // Force flush on DB open if 2PC is enabled, since with 2PC we have no
132 // guarantee that consecutive log files have consecutive sequence id, which
133 // make recovery complicated.
134 if (result
.allow_2pc
) {
135 result
.avoid_flush_during_recovery
= false;
139 ImmutableDBOptions
immutable_db_options(result
);
140 if (!IsWalDirSameAsDBPath(&immutable_db_options
)) {
141 // Either the WAL dir and db_paths[0]/db_name are not the same, or we
142 // cannot tell for sure. In either case, assume they're different and
143 // explicitly cleanup the trash log files (bypass DeleteScheduler)
144 // Do this first so even if we end up calling
145 // DeleteScheduler::CleanupDirectory on the same dir later, it will be
147 std::vector
<std::string
> filenames
;
148 result
.env
->GetChildren(result
.wal_dir
, &filenames
);
149 for (std::string
& filename
: filenames
) {
150 if (filename
.find(".log.trash", filename
.length() -
151 std::string(".log.trash").length()) !=
153 std::string trash_file
= result
.wal_dir
+ "/" + filename
;
154 result
.env
->DeleteFile(trash_file
);
158 // When the DB is stopped, it's possible that there are some .trash files that
159 // were not deleted yet, when we open the DB we will find these .trash files
160 // and schedule them to be deleted (or delete immediately if SstFileManager
162 auto sfm
= static_cast<SstFileManagerImpl
*>(result
.sst_file_manager
.get());
163 for (size_t i
= 0; i
< result
.db_paths
.size(); i
++) {
164 DeleteScheduler::CleanupDirectory(result
.env
, sfm
, result
.db_paths
[i
].path
);
167 // Create a default SstFileManager for purposes of tracking compaction size
168 // and facilitating recovery from out of space errors.
169 if (result
.sst_file_manager
.get() == nullptr) {
170 std::shared_ptr
<SstFileManager
> sst_file_manager(
171 NewSstFileManager(result
.env
, result
.info_log
));
172 result
.sst_file_manager
= sst_file_manager
;
176 if (!result
.paranoid_checks
) {
177 result
.skip_checking_sst_file_sizes_on_db_open
= true;
178 ROCKS_LOG_INFO(result
.info_log
,
179 "file size check will be skipped during open.");
186 Status
SanitizeOptionsByTable(
187 const DBOptions
& db_opts
,
188 const std::vector
<ColumnFamilyDescriptor
>& column_families
) {
190 for (auto cf
: column_families
) {
191 s
= cf
.options
.table_factory
->SanitizeOptions(db_opts
, cf
.options
);
200 Status
DBImpl::ValidateOptions(
201 const DBOptions
& db_options
,
202 const std::vector
<ColumnFamilyDescriptor
>& column_families
) {
204 for (auto& cfd
: column_families
) {
205 s
= ColumnFamilyData::ValidateOptions(db_options
, cfd
.options
);
210 s
= ValidateOptions(db_options
);
214 Status
DBImpl::ValidateOptions(const DBOptions
& db_options
) {
215 if (db_options
.db_paths
.size() > 4) {
216 return Status::NotSupported(
217 "More than four DB paths are not supported yet. ");
220 if (db_options
.allow_mmap_reads
&& db_options
.use_direct_reads
) {
221 // Protect against assert in PosixMMapReadableFile constructor
222 return Status::NotSupported(
223 "If memory mapped reads (allow_mmap_reads) are enabled "
224 "then direct I/O reads (use_direct_reads) must be disabled. ");
227 if (db_options
.allow_mmap_writes
&&
228 db_options
.use_direct_io_for_flush_and_compaction
) {
229 return Status::NotSupported(
230 "If memory mapped writes (allow_mmap_writes) are enabled "
231 "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
235 if (db_options
.keep_log_file_num
== 0) {
236 return Status::InvalidArgument("keep_log_file_num must be greater than 0");
239 if (db_options
.unordered_write
&&
240 !db_options
.allow_concurrent_memtable_write
) {
241 return Status::InvalidArgument(
242 "unordered_write is incompatible with !allow_concurrent_memtable_write");
245 if (db_options
.unordered_write
&& db_options
.enable_pipelined_write
) {
246 return Status::InvalidArgument(
247 "unordered_write is incompatible with enable_pipelined_write");
250 if (db_options
.atomic_flush
&& db_options
.enable_pipelined_write
) {
251 return Status::InvalidArgument(
252 "atomic_flush is incompatible with enable_pipelined_write");
258 Status
DBImpl::NewDB() {
260 Status s
= SetIdentityFile(env_
, dbname_
);
264 if (immutable_db_options_
.write_dbid_to_manifest
) {
265 std::string temp_db_id
;
266 GetDbIdentityFromIdentityFile(&temp_db_id
);
267 new_db
.SetDBId(temp_db_id
);
269 new_db
.SetLogNumber(0);
270 new_db
.SetNextFile(2);
271 new_db
.SetLastSequence(0);
273 ROCKS_LOG_INFO(immutable_db_options_
.info_log
, "Creating manifest 1 \n");
274 const std::string manifest
= DescriptorFileName(dbname_
, 1);
276 std::unique_ptr
<FSWritableFile
> file
;
277 FileOptions file_options
= fs_
->OptimizeForManifestWrite(file_options_
);
278 s
= NewWritableFile(fs_
.get(), manifest
, &file
, file_options
);
282 file
->SetPreallocationBlockSize(
283 immutable_db_options_
.manifest_preallocation_size
);
284 std::unique_ptr
<WritableFileWriter
> file_writer(new WritableFileWriter(
285 std::move(file
), manifest
, file_options
, env_
, nullptr /* stats */,
286 immutable_db_options_
.listeners
));
287 log::Writer
log(std::move(file_writer
), 0, false);
289 new_db
.EncodeTo(&record
);
290 s
= log
.AddRecord(record
);
292 s
= SyncManifest(env_
, &immutable_db_options_
, log
.file());
296 // Make "CURRENT" file that points to the new manifest file.
297 s
= SetCurrentFile(env_
, dbname_
, 1, directories_
.GetDbDir());
299 fs_
->DeleteFile(manifest
, IOOptions(), nullptr);
304 Status
DBImpl::CreateAndNewDirectory(Env
* env
, const std::string
& dirname
,
305 std::unique_ptr
<Directory
>* directory
) {
306 // We call CreateDirIfMissing() as the directory may already exist (if we
307 // are reopening a DB), when this happens we don't want creating the
308 // directory to cause an error. However, we need to check if creating the
309 // directory fails or else we may get an obscure message about the lock
310 // file not existing. One real-world example of this occurring is if
311 // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
312 // when dbname_ is "dir/db" but when "dir" doesn't exist.
313 Status s
= env
->CreateDirIfMissing(dirname
);
317 return env
->NewDirectory(dirname
, directory
);
320 Status
Directories::SetDirectories(Env
* env
, const std::string
& dbname
,
321 const std::string
& wal_dir
,
322 const std::vector
<DbPath
>& data_paths
) {
323 Status s
= DBImpl::CreateAndNewDirectory(env
, dbname
, &db_dir_
);
327 if (!wal_dir
.empty() && dbname
!= wal_dir
) {
328 s
= DBImpl::CreateAndNewDirectory(env
, wal_dir
, &wal_dir_
);
335 for (auto& p
: data_paths
) {
336 const std::string db_path
= p
.path
;
337 if (db_path
== dbname
) {
338 data_dirs_
.emplace_back(nullptr);
340 std::unique_ptr
<Directory
> path_directory
;
341 s
= DBImpl::CreateAndNewDirectory(env
, db_path
, &path_directory
);
345 data_dirs_
.emplace_back(path_directory
.release());
348 assert(data_dirs_
.size() == data_paths
.size());
352 Status
DBImpl::Recover(
353 const std::vector
<ColumnFamilyDescriptor
>& column_families
, bool read_only
,
354 bool error_if_log_file_exist
, bool error_if_data_exists_in_logs
,
355 uint64_t* recovered_seq
) {
358 bool is_new_db
= false;
359 assert(db_lock_
== nullptr);
361 Status s
= directories_
.SetDirectories(env_
, dbname_
,
362 immutable_db_options_
.wal_dir
,
363 immutable_db_options_
.db_paths
);
368 s
= env_
->LockFile(LockFileName(dbname_
), &db_lock_
);
373 std::string current_fname
= CurrentFileName(dbname_
);
374 s
= env_
->FileExists(current_fname
);
375 if (s
.IsNotFound()) {
376 if (immutable_db_options_
.create_if_missing
) {
383 return Status::InvalidArgument(
384 current_fname
, "does not exist (create_if_missing is false)");
387 if (immutable_db_options_
.error_if_exists
) {
388 return Status::InvalidArgument(dbname_
,
389 "exists (error_if_exists is true)");
392 // Unexpected error reading file
393 assert(s
.IsIOError());
396 // Verify compatibility of file_options_ and filesystem
398 std::unique_ptr
<FSRandomAccessFile
> idfile
;
399 FileOptions
customized_fs(file_options_
);
400 customized_fs
.use_direct_reads
|=
401 immutable_db_options_
.use_direct_io_for_flush_and_compaction
;
402 s
= fs_
->NewRandomAccessFile(current_fname
, customized_fs
, &idfile
,
405 std::string error_str
= s
.ToString();
406 // Check if unsupported Direct I/O is the root cause
407 customized_fs
.use_direct_reads
= false;
408 s
= fs_
->NewRandomAccessFile(current_fname
, customized_fs
, &idfile
,
411 return Status::InvalidArgument(
412 "Direct I/O is not supported by the specified DB.");
414 return Status::InvalidArgument(
415 "Found options incompatible with filesystem", error_str
.c_str());
420 assert(db_id_
.empty());
421 Status s
= versions_
->Recover(column_families
, read_only
, &db_id_
);
425 // Happens when immutable_db_options_.write_dbid_to_manifest is set to true
426 // the very first time.
427 if (db_id_
.empty()) {
428 // Check for the IDENTITY file and create it if not there.
429 s
= fs_
->FileExists(IdentityFileName(dbname_
), IOOptions(), nullptr);
430 // Typically Identity file is created in NewDB() and for some reason if
431 // it is no longer available then at this point DB ID is not in Identity
433 if (s
.IsNotFound()) {
434 s
= SetIdentityFile(env_
, dbname_
);
438 } else if (!s
.ok()) {
439 assert(s
.IsIOError());
442 s
= GetDbIdentityFromIdentityFile(&db_id_
);
443 if (immutable_db_options_
.write_dbid_to_manifest
&& s
.ok()) {
445 edit
.SetDBId(db_id_
);
447 MutableCFOptions
mutable_cf_options(options
);
448 versions_
->db_id_
= db_id_
;
449 s
= versions_
->LogAndApply(versions_
->GetColumnFamilySet()->GetDefault(),
450 mutable_cf_options
, &edit
, &mutex_
, nullptr,
454 s
= SetIdentityFile(env_
, dbname_
, db_id_
);
457 if (immutable_db_options_
.paranoid_checks
&& s
.ok()) {
458 s
= CheckConsistency();
460 if (s
.ok() && !read_only
) {
461 std::map
<std::string
, std::shared_ptr
<Directory
>> created_dirs
;
462 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
463 s
= cfd
->AddDirectories(&created_dirs
);
469 // DB mutex is already held
470 if (s
.ok() && immutable_db_options_
.persist_stats_to_disk
) {
471 s
= InitPersistStatsColumnFamily();
475 // Initial max_total_in_memory_state_ before recovery logs. Log recovery
476 // may check this value to decide whether to flush.
477 max_total_in_memory_state_
= 0;
478 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
479 auto* mutable_cf_options
= cfd
->GetLatestMutableCFOptions();
480 max_total_in_memory_state_
+= mutable_cf_options
->write_buffer_size
*
481 mutable_cf_options
->max_write_buffer_number
;
484 SequenceNumber
next_sequence(kMaxSequenceNumber
);
485 default_cf_handle_
= new ColumnFamilyHandleImpl(
486 versions_
->GetColumnFamilySet()->GetDefault(), this, &mutex_
);
487 default_cf_internal_stats_
= default_cf_handle_
->cfd()->internal_stats();
488 // TODO(Zhongyi): handle single_column_family_mode_ when
489 // persistent_stats is enabled
490 single_column_family_mode_
=
491 versions_
->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
493 // Recover from all newer log files than the ones named in the
494 // descriptor (new log files may have been added by the previous
495 // incarnation without registering them in the descriptor).
497 // Note that prev_log_number() is no longer used, but we pay
498 // attention to it in case we are recovering a database
499 // produced by an older version of rocksdb.
500 std::vector
<std::string
> filenames
;
501 s
= env_
->GetChildren(immutable_db_options_
.wal_dir
, &filenames
);
502 if (s
.IsNotFound()) {
503 return Status::InvalidArgument("wal_dir not found",
504 immutable_db_options_
.wal_dir
);
505 } else if (!s
.ok()) {
509 std::vector
<uint64_t> logs
;
510 for (size_t i
= 0; i
< filenames
.size(); i
++) {
513 if (ParseFileName(filenames
[i
], &number
, &type
) && type
== kLogFile
) {
515 return Status::Corruption(
516 "While creating a new Db, wal_dir contains "
517 "existing log file: ",
520 logs
.push_back(number
);
525 if (logs
.size() > 0) {
526 if (error_if_log_file_exist
) {
527 return Status::Corruption(
528 "The db was opened in readonly mode with error_if_log_file_exist"
529 "flag but a log file already exists");
530 } else if (error_if_data_exists_in_logs
) {
531 for (auto& log
: logs
) {
532 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log
);
534 s
= env_
->GetFileSize(fname
, &bytes
);
537 return Status::Corruption(
538 "error_if_data_exists_in_logs is set but there are data "
547 // Recover in the order in which the logs were generated
548 std::sort(logs
.begin(), logs
.end());
549 bool corrupted_log_found
= false;
550 s
= RecoverLogFiles(logs
, &next_sequence
, read_only
,
551 &corrupted_log_found
);
552 if (corrupted_log_found
&& recovered_seq
!= nullptr) {
553 *recovered_seq
= next_sequence
;
556 // Clear memtables if recovery failed
557 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
558 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
566 // If we are opening as read-only, we need to update options_file_number_
567 // to reflect the most recent OPTIONS file. It does not matter for regular
568 // read-write db instance because options_file_number_ will later be
569 // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
570 std::vector
<std::string
> file_names
;
572 s
= env_
->GetChildren(GetName(), &file_names
);
576 uint64_t options_file_number
= 0;
578 for (const auto& fname
: file_names
) {
579 if (ParseFileName(fname
, &number
, &type
) && type
== kOptionsFile
) {
580 options_file_number
= std::max(number
, options_file_number
);
583 versions_
->options_file_number_
= options_file_number
;
590 Status
DBImpl::PersistentStatsProcessFormatVersion() {
593 // persist version when stats CF doesn't exist
594 bool should_persist_format_version
= !persistent_stats_cfd_exists_
;
596 if (persistent_stats_cfd_exists_
) {
597 // Check persistent stats format version compatibility. Drop and recreate
598 // persistent stats CF if format version is incompatible
599 uint64_t format_version_recovered
= 0;
600 Status s_format
= DecodePersistentStatsVersionNumber(
601 this, StatsVersionKeyType::kFormatVersion
, &format_version_recovered
);
602 uint64_t compatible_version_recovered
= 0;
603 Status s_compatible
= DecodePersistentStatsVersionNumber(
604 this, StatsVersionKeyType::kCompatibleVersion
,
605 &compatible_version_recovered
);
606 // abort reading from existing stats CF if any of following is true:
607 // 1. failed to read format version or compatible version from disk
608 // 2. sst's format version is greater than current format version, meaning
609 // this sst is encoded with a newer RocksDB release, and current compatible
610 // version is below the sst's compatible version
611 if (!s_format
.ok() || !s_compatible
.ok() ||
612 (kStatsCFCurrentFormatVersion
< format_version_recovered
&&
613 kStatsCFCompatibleFormatVersion
< compatible_version_recovered
)) {
614 if (!s_format
.ok() || !s_compatible
.ok()) {
616 immutable_db_options_
.info_log
,
617 "Reading persistent stats version key failed. Format key: %s, "
618 "compatible key: %s",
619 s_format
.ToString().c_str(), s_compatible
.ToString().c_str());
622 immutable_db_options_
.info_log
,
623 "Disable persistent stats due to corrupted or incompatible format "
626 DropColumnFamily(persist_stats_cf_handle_
);
627 DestroyColumnFamilyHandle(persist_stats_cf_handle_
);
628 ColumnFamilyHandle
* handle
= nullptr;
629 ColumnFamilyOptions cfo
;
630 OptimizeForPersistentStats(&cfo
);
631 s
= CreateColumnFamily(cfo
, kPersistentStatsColumnFamilyName
, &handle
);
632 persist_stats_cf_handle_
= static_cast<ColumnFamilyHandleImpl
*>(handle
);
633 // should also persist version here because old stats CF is discarded
634 should_persist_format_version
= true;
637 if (s
.ok() && should_persist_format_version
) {
638 // Persistent stats CF being created for the first time, need to write
639 // format version key
641 batch
.Put(persist_stats_cf_handle_
, kFormatVersionKeyString
,
642 ToString(kStatsCFCurrentFormatVersion
));
643 batch
.Put(persist_stats_cf_handle_
, kCompatibleVersionKeyString
,
644 ToString(kStatsCFCompatibleFormatVersion
));
647 wo
.no_slowdown
= true;
649 s
= Write(wo
, &batch
);
655 Status
DBImpl::InitPersistStatsColumnFamily() {
657 assert(!persist_stats_cf_handle_
);
658 ColumnFamilyData
* persistent_stats_cfd
=
659 versions_
->GetColumnFamilySet()->GetColumnFamily(
660 kPersistentStatsColumnFamilyName
);
661 persistent_stats_cfd_exists_
= persistent_stats_cfd
!= nullptr;
664 if (persistent_stats_cfd
!= nullptr) {
665 // We are recovering from a DB which already contains persistent stats CF,
666 // the CF is already created in VersionSet::ApplyOneVersionEdit, but
667 // column family handle was not. Need to explicitly create handle here.
668 persist_stats_cf_handle_
=
669 new ColumnFamilyHandleImpl(persistent_stats_cfd
, this, &mutex_
);
672 ColumnFamilyHandle
* handle
= nullptr;
673 ColumnFamilyOptions cfo
;
674 OptimizeForPersistentStats(&cfo
);
675 s
= CreateColumnFamily(cfo
, kPersistentStatsColumnFamilyName
, &handle
);
676 persist_stats_cf_handle_
= static_cast<ColumnFamilyHandleImpl
*>(handle
);
682 // REQUIRES: log_numbers are sorted in ascending order
683 Status
DBImpl::RecoverLogFiles(const std::vector
<uint64_t>& log_numbers
,
684 SequenceNumber
* next_sequence
, bool read_only
,
685 bool* corrupted_log_found
) {
686 struct LogReporter
: public log::Reader::Reporter
{
690 Status
* status
; // nullptr if immutable_db_options_.paranoid_checks==false
691 void Corruption(size_t bytes
, const Status
& s
) override
{
692 ROCKS_LOG_WARN(info_log
, "%s%s: dropping %d bytes; %s",
693 (this->status
== nullptr ? "(ignoring error) " : ""),
694 fname
, static_cast<int>(bytes
), s
.ToString().c_str());
695 if (this->status
!= nullptr && this->status
->ok()) {
703 std::unordered_map
<int, VersionEdit
> version_edits
;
704 // no need to refcount because iteration is under mutex
705 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
707 edit
.SetColumnFamily(cfd
->GetID());
708 version_edits
.insert({cfd
->GetID(), edit
});
710 int job_id
= next_job_id_
.fetch_add(1);
712 auto stream
= event_logger_
.Log();
713 stream
<< "job" << job_id
<< "event"
714 << "recovery_started";
715 stream
<< "log_files";
717 for (auto log_number
: log_numbers
) {
718 stream
<< log_number
;
724 if (immutable_db_options_
.wal_filter
!= nullptr) {
725 std::map
<std::string
, uint32_t> cf_name_id_map
;
726 std::map
<uint32_t, uint64_t> cf_lognumber_map
;
727 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
728 cf_name_id_map
.insert(std::make_pair(cfd
->GetName(), cfd
->GetID()));
729 cf_lognumber_map
.insert(
730 std::make_pair(cfd
->GetID(), cfd
->GetLogNumber()));
733 immutable_db_options_
.wal_filter
->ColumnFamilyLogNumberMap(cf_lognumber_map
,
738 bool stop_replay_by_wal_filter
= false;
739 bool stop_replay_for_corruption
= false;
740 bool flushed
= false;
741 uint64_t corrupted_log_number
= kMaxSequenceNumber
;
742 uint64_t min_log_number
= MinLogNumberToKeep();
743 for (auto log_number
: log_numbers
) {
744 if (log_number
< min_log_number
) {
745 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
746 "Skipping log #%" PRIu64
747 " since it is older than min log to keep #%" PRIu64
,
748 log_number
, min_log_number
);
751 // The previous incarnation may not have written any MANIFEST
752 // records after allocating this log number. So we manually
753 // update the file number allocation counter in VersionSet.
754 versions_
->MarkFileNumberUsed(log_number
);
756 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log_number
);
758 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
759 "Recovering log #%" PRIu64
" mode %d", log_number
,
760 static_cast<int>(immutable_db_options_
.wal_recovery_mode
));
761 auto logFileDropped
= [this, &fname
]() {
763 if (env_
->GetFileSize(fname
, &bytes
).ok()) {
764 auto info_log
= immutable_db_options_
.info_log
.get();
765 ROCKS_LOG_WARN(info_log
, "%s: dropping %d bytes", fname
.c_str(),
766 static_cast<int>(bytes
));
769 if (stop_replay_by_wal_filter
) {
774 std::unique_ptr
<SequentialFileReader
> file_reader
;
776 std::unique_ptr
<FSSequentialFile
> file
;
777 status
= fs_
->NewSequentialFile(fname
,
778 fs_
->OptimizeForLogRead(file_options_
),
781 MaybeIgnoreError(&status
);
785 // Fail with one log file, but that's ok.
790 file_reader
.reset(new SequentialFileReader(
791 std::move(file
), fname
, immutable_db_options_
.log_readahead_size
));
794 // Create the log reader.
795 LogReporter reporter
;
797 reporter
.info_log
= immutable_db_options_
.info_log
.get();
798 reporter
.fname
= fname
.c_str();
799 if (!immutable_db_options_
.paranoid_checks
||
800 immutable_db_options_
.wal_recovery_mode
==
801 WALRecoveryMode::kSkipAnyCorruptedRecords
) {
802 reporter
.status
= nullptr;
804 reporter
.status
= &status
;
806 // We intentially make log::Reader do checksumming even if
807 // paranoid_checks==false so that corruptions cause entire commits
808 // to be skipped instead of propagating bad information (like overly
809 // large sequence numbers).
810 log::Reader
reader(immutable_db_options_
.info_log
, std::move(file_reader
),
811 &reporter
, true /*checksum*/, log_number
);
813 // Determine if we should tolerate incomplete records at the tail end of the
814 // Read all the records and add to a memtable
819 while (!stop_replay_by_wal_filter
&&
820 reader
.ReadRecord(&record
, &scratch
,
821 immutable_db_options_
.wal_recovery_mode
) &&
823 if (record
.size() < WriteBatchInternal::kHeader
) {
824 reporter
.Corruption(record
.size(),
825 Status::Corruption("log record too small"));
828 WriteBatchInternal::SetContents(&batch
, record
);
829 SequenceNumber sequence
= WriteBatchInternal::Sequence(&batch
);
831 if (immutable_db_options_
.wal_recovery_mode
==
832 WALRecoveryMode::kPointInTimeRecovery
) {
833 // In point-in-time recovery mode, if sequence id of log files are
834 // consecutive, we continue recovery despite corruption. This could
835 // happen when we open and write to a corrupted DB, where sequence id
836 // will start from the last sequence id we recovered.
837 if (sequence
== *next_sequence
) {
838 stop_replay_for_corruption
= false;
840 if (stop_replay_for_corruption
) {
847 if (immutable_db_options_
.wal_filter
!= nullptr) {
848 WriteBatch new_batch
;
849 bool batch_changed
= false;
851 WalFilter::WalProcessingOption wal_processing_option
=
852 immutable_db_options_
.wal_filter
->LogRecordFound(
853 log_number
, fname
, batch
, &new_batch
, &batch_changed
);
855 switch (wal_processing_option
) {
856 case WalFilter::WalProcessingOption::kContinueProcessing
:
857 // do nothing, proceeed normally
859 case WalFilter::WalProcessingOption::kIgnoreCurrentRecord
:
860 // skip current record
862 case WalFilter::WalProcessingOption::kStopReplay
:
863 // skip current record and stop replay
864 stop_replay_by_wal_filter
= true;
866 case WalFilter::WalProcessingOption::kCorruptedRecord
: {
868 Status::Corruption("Corruption reported by Wal Filter ",
869 immutable_db_options_
.wal_filter
->Name());
870 MaybeIgnoreError(&status
);
872 reporter
.Corruption(record
.size(), status
);
878 assert(false); // unhandled case
879 status
= Status::NotSupported(
880 "Unknown WalProcessingOption returned"
882 immutable_db_options_
.wal_filter
->Name());
883 MaybeIgnoreError(&status
);
887 // Ignore the error with current record processing.
894 // Make sure that the count in the new batch is
895 // within the orignal count.
896 int new_count
= WriteBatchInternal::Count(&new_batch
);
897 int original_count
= WriteBatchInternal::Count(&batch
);
898 if (new_count
> original_count
) {
900 immutable_db_options_
.info_log
,
901 "Recovering log #%" PRIu64
902 " mode %d log filter %s returned "
903 "more records (%d) than original (%d) which is not allowed. "
904 "Aborting recovery.",
906 static_cast<int>(immutable_db_options_
.wal_recovery_mode
),
907 immutable_db_options_
.wal_filter
->Name(), new_count
,
909 status
= Status::NotSupported(
910 "More than original # of records "
911 "returned by Wal Filter ",
912 immutable_db_options_
.wal_filter
->Name());
915 // Set the same sequence number in the new_batch
916 // as the original batch.
917 WriteBatchInternal::SetSequence(&new_batch
,
918 WriteBatchInternal::Sequence(&batch
));
922 #endif // ROCKSDB_LITE
924 // If column family was not found, it might mean that the WAL write
925 // batch references to the column family that was dropped after the
926 // insert. We don't want to fail the whole write batch in that case --
927 // we just ignore the update.
928 // That's why we set ignore missing column families to true
929 bool has_valid_writes
= false;
930 status
= WriteBatchInternal::InsertInto(
931 &batch
, column_family_memtables_
.get(), &flush_scheduler_
,
932 &trim_history_scheduler_
, true, log_number
, this,
933 false /* concurrent_memtable_writes */, next_sequence
,
934 &has_valid_writes
, seq_per_batch_
, batch_per_txn_
);
935 MaybeIgnoreError(&status
);
937 // We are treating this as a failure while reading since we read valid
938 // blocks that do not form coherent data
939 reporter
.Corruption(record
.size(), status
);
943 if (has_valid_writes
&& !read_only
) {
944 // we can do this because this is called before client has access to the
945 // DB and there is only a single thread operating on DB
946 ColumnFamilyData
* cfd
;
948 while ((cfd
= flush_scheduler_
.TakeNextColumnFamily()) != nullptr) {
949 cfd
->UnrefAndTryDelete();
950 // If this asserts, it means that InsertInto failed in
951 // filtering updates to already-flushed column families
952 assert(cfd
->GetLogNumber() <= log_number
);
953 auto iter
= version_edits
.find(cfd
->GetID());
954 assert(iter
!= version_edits
.end());
955 VersionEdit
* edit
= &iter
->second
;
956 status
= WriteLevel0TableForRecovery(job_id
, cfd
, cfd
->mem(), edit
);
958 // Reflect errors immediately so that conditions like full
959 // file-systems cause the DB::Open() to fail.
964 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
971 if (status
.IsNotSupported()) {
972 // We should not treat NotSupported as corruption. It is rather a clear
973 // sign that we are processing a WAL that is produced by an incompatible
974 // version of the code.
977 if (immutable_db_options_
.wal_recovery_mode
==
978 WALRecoveryMode::kSkipAnyCorruptedRecords
) {
979 // We should ignore all errors unconditionally
980 status
= Status::OK();
981 } else if (immutable_db_options_
.wal_recovery_mode
==
982 WALRecoveryMode::kPointInTimeRecovery
) {
983 // We should ignore the error but not continue replaying
984 status
= Status::OK();
985 stop_replay_for_corruption
= true;
986 corrupted_log_number
= log_number
;
987 if (corrupted_log_found
!= nullptr) {
988 *corrupted_log_found
= true;
990 ROCKS_LOG_INFO(immutable_db_options_
.info_log
,
991 "Point in time recovered to log #%" PRIu64
993 log_number
, *next_sequence
);
995 assert(immutable_db_options_
.wal_recovery_mode
==
996 WALRecoveryMode::kTolerateCorruptedTailRecords
||
997 immutable_db_options_
.wal_recovery_mode
==
998 WALRecoveryMode::kAbsoluteConsistency
);
1003 flush_scheduler_
.Clear();
1004 trim_history_scheduler_
.Clear();
1005 auto last_sequence
= *next_sequence
- 1;
1006 if ((*next_sequence
!= kMaxSequenceNumber
) &&
1007 (versions_
->LastSequence() <= last_sequence
)) {
1008 versions_
->SetLastAllocatedSequence(last_sequence
);
1009 versions_
->SetLastPublishedSequence(last_sequence
);
1010 versions_
->SetLastSequence(last_sequence
);
1013 // Compare the corrupted log number to all columnfamily's current log number.
1014 // Abort Open() if any column family's log number is greater than
1015 // the corrupted log number, which means CF contains data beyond the point of
1016 // corruption. This could happen during PIT recovery when the WAL is corrupted and
1017 // some (but not all) CFs are flushed
1018 // Exclude the PIT case where no log is dropped after the corruption point.
1019 // This is to cover the case for empty logs after corrupted log, in which we
1020 // don't reset stop_replay_for_corruption.
1021 if (stop_replay_for_corruption
== true &&
1022 (immutable_db_options_
.wal_recovery_mode
==
1023 WALRecoveryMode::kPointInTimeRecovery
||
1024 immutable_db_options_
.wal_recovery_mode
==
1025 WALRecoveryMode::kTolerateCorruptedTailRecords
)) {
1026 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
1027 if (cfd
->GetLogNumber() > corrupted_log_number
) {
1028 ROCKS_LOG_ERROR(immutable_db_options_
.info_log
,
1029 "Column family inconsistency: SST file contains data"
1030 " beyond the point of corruption.");
1031 return Status::Corruption("SST file is ahead of WALs");
1036 // True if there's any data in the WALs; if not, we can skip re-processing
1038 bool data_seen
= false;
1040 // no need to refcount since client still doesn't have access
1041 // to the DB and can not drop column families while we iterate
1042 auto max_log_number
= log_numbers
.back();
1043 for (auto cfd
: *versions_
->GetColumnFamilySet()) {
1044 auto iter
= version_edits
.find(cfd
->GetID());
1045 assert(iter
!= version_edits
.end());
1046 VersionEdit
* edit
= &iter
->second
;
1048 if (cfd
->GetLogNumber() > max_log_number
) {
1049 // Column family cfd has already flushed the data
1050 // from all logs. Memtable has to be empty because
1051 // we filter the updates based on log_number
1052 // (in WriteBatch::InsertInto)
1053 assert(cfd
->mem()->GetFirstSequenceNumber() == 0);
1054 assert(edit
->NumEntries() == 0);
1058 TEST_SYNC_POINT_CALLBACK(
1059 "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1061 // flush the final memtable (if non-empty)
1062 if (cfd
->mem()->GetFirstSequenceNumber() != 0) {
1063 // If flush happened in the middle of recovery (e.g. due to memtable
1064 // being full), we flush at the end. Otherwise we'll need to record
1065 // where we were on last flush, which make the logic complicated.
1066 if (flushed
|| !immutable_db_options_
.avoid_flush_during_recovery
) {
1067 status
= WriteLevel0TableForRecovery(job_id
, cfd
, cfd
->mem(), edit
);
1074 cfd
->CreateNewMemtable(*cfd
->GetLatestMutableCFOptions(),
1075 versions_
->LastSequence());
1080 // Update the log number info in the version edit corresponding to this
1081 // column family. Note that the version edits will be written to MANIFEST
1083 // writing log_number in the manifest means that any log file
1084 // with number strongly less than (log_number + 1) is already
1085 // recovered and should be ignored on next reincarnation.
1086 // Since we already recovered max_log_number, we want all logs
1087 // with numbers `<= max_log_number` (includes this one) to be ignored
1088 if (flushed
|| cfd
->mem()->GetFirstSequenceNumber() == 0) {
1089 edit
->SetLogNumber(max_log_number
+ 1);
1093 // we must mark the next log number as used, even though it's
1094 // not actually used. that is because VersionSet assumes
1095 // VersionSet::next_file_number_ always to be strictly greater than any
1097 versions_
->MarkFileNumberUsed(max_log_number
+ 1);
1099 autovector
<ColumnFamilyData
*> cfds
;
1100 autovector
<const MutableCFOptions
*> cf_opts
;
1101 autovector
<autovector
<VersionEdit
*>> edit_lists
;
1102 for (auto* cfd
: *versions_
->GetColumnFamilySet()) {
1103 cfds
.push_back(cfd
);
1104 cf_opts
.push_back(cfd
->GetLatestMutableCFOptions());
1105 auto iter
= version_edits
.find(cfd
->GetID());
1106 assert(iter
!= version_edits
.end());
1107 edit_lists
.push_back({&iter
->second
});
1109 // write MANIFEST with update
1110 status
= versions_
->LogAndApply(cfds
, cf_opts
, edit_lists
, &mutex_
,
1111 directories_
.GetDbDir(),
1112 /*new_descriptor_log=*/true);
1116 if (status
.ok() && data_seen
&& !flushed
) {
1117 status
= RestoreAliveLogFiles(log_numbers
);
1120 event_logger_
.Log() << "job" << job_id
<< "event"
1121 << "recovery_finished";
1126 Status
DBImpl::RestoreAliveLogFiles(const std::vector
<uint64_t>& log_numbers
) {
1127 if (log_numbers
.empty()) {
1128 return Status::OK();
1131 mutex_
.AssertHeld();
1132 assert(immutable_db_options_
.avoid_flush_during_recovery
);
1133 if (two_write_queues_
) {
1134 log_write_mutex_
.Lock();
1136 // Mark these as alive so they'll be considered for deletion later by
1137 // FindObsoleteFiles()
1138 total_log_size_
= 0;
1140 for (auto log_number
: log_numbers
) {
1141 LogFileNumberSize
log(log_number
);
1142 std::string fname
= LogFileName(immutable_db_options_
.wal_dir
, log_number
);
1143 // This gets the appear size of the logs, not including preallocated space.
1144 s
= env_
->GetFileSize(fname
, &log
.size
);
1148 total_log_size_
+= log
.size
;
1149 alive_log_files_
.push_back(log
);
1150 // We preallocate space for logs, but then after a crash and restart, those
1151 // preallocated space are not needed anymore. It is likely only the last
1152 // log has such preallocated space, so we only truncate for the last log.
1153 if (log_number
== log_numbers
.back()) {
1154 std::unique_ptr
<FSWritableFile
> last_log
;
1155 Status truncate_status
= fs_
->ReopenWritableFile(
1157 fs_
->OptimizeForLogWrite(
1159 BuildDBOptions(immutable_db_options_
, mutable_db_options_
)),
1160 &last_log
, nullptr);
1161 if (truncate_status
.ok()) {
1162 truncate_status
= last_log
->Truncate(log
.size
, IOOptions(), nullptr);
1164 if (truncate_status
.ok()) {
1165 truncate_status
= last_log
->Close(IOOptions(), nullptr);
1167 // Not a critical error if fail to truncate.
1168 if (!truncate_status
.ok()) {
1169 ROCKS_LOG_WARN(immutable_db_options_
.info_log
,
1170 "Failed to truncate log #%" PRIu64
": %s", log_number
,
1171 truncate_status
.ToString().c_str());
1175 if (two_write_queues_
) {
1176 log_write_mutex_
.Unlock();
// Builds one level-0 SST from memtable `mem` during WAL recovery and records
// the resulting file in `edit`. The produced file's number is protected from
// premature deletion via the pending-outputs mechanism for the duration of
// the build, and the I/O is charged to flush statistics.
// Requires: DB mutex held.
// NOTE(review): this extract is elided — `FileMetaData meta;`,
// `ReadOptions ro;`, `Arena arena;`, `Status s;`, the `int level`
// declaration, the `s = BuildTable(` call header, and some braces from the
// full source are not visible here.
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                           MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  const uint64_t start_micros = env_->NowMicros();
  // Reserve the about-to-be-allocated file number so FindObsoleteFiles()
  // cannot delete the half-written table file.
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  // Full-order iteration is required to write a properly sorted SST.
  ro.total_order_seek = true;
  TableProperties table_properties;
  ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                  "[%s] [WriteLevel0TableForRecovery]"
                  " Level-0 table #%" PRIu64 ": started",
                  cfd->GetName().c_str(), meta.fd.GetNumber());
  // Get the latest mutable cf options while the mutex is still locked
  const MutableCFOptions mutable_cf_options =
      *cfd->GetLatestMutableCFOptions();
  bool paranoid_file_checks =
      cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
  int64_t _current_time = 0;
  env_->GetCurrentTime(&_current_time);  // ignore error
  const uint64_t current_time = static_cast<uint64_t>(_current_time);
  meta.oldest_ancester_time = current_time;
  auto write_hint = cfd->CalculateSSTWriteHint(0);
  // Snapshot state feeds compaction-style GC decisions inside table building.
  SequenceNumber earliest_write_conflict_snapshot;
  std::vector<SequenceNumber> snapshot_seqs =
      snapshots_.GetAll(&earliest_write_conflict_snapshot);
  auto snapshot_checker = snapshot_checker_.get();
  if (use_custom_gc_ && snapshot_checker == nullptr) {
    snapshot_checker = DisableGCSnapshotChecker::Instance();
  // Collect range tombstones from the memtable so they are written too.
  std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
  auto range_del_iter =
      mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
  if (range_del_iter != nullptr) {
    range_del_iters.emplace_back(range_del_iter);
  // Arguments to the (elided) BuildTable(...) call that writes the SST:
      dbname_, env_, fs_.get(), *cfd->ioptions(), mutable_cf_options,
      file_options_for_compaction_, cfd->table_cache(), iter.get(),
      std::move(range_del_iters), &meta, cfd->internal_comparator(),
      cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
      snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
      GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
      mutable_cf_options.sample_for_compression,
      cfd->ioptions()->compression_opts, paranoid_file_checks,
      cfd->internal_stats(), TableFileCreationReason::kRecovery,
      &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
      -1 /* level */, current_time, write_hint);
  LogFlush(immutable_db_options_.info_log);
  ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                  "[%s] [WriteLevel0TableForRecovery]"
                  " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
                  cfd->GetName().c_str(), meta.fd.GetNumber(),
                  meta.fd.GetFileSize(), s.ToString().c_str());
  // Build finished (or failed) — file number no longer needs protection.
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  if (s.ok() && meta.fd.GetFileSize() > 0) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.fd.smallest_seqno, meta.fd.largest_seqno,
                  meta.marked_for_compaction, meta.oldest_blob_file_number,
                  meta.oldest_ancester_time, meta.file_creation_time,
                  meta.file_checksum, meta.file_checksum_func_name);
  // Account the recovery write as a flush for stats purposes.
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = env_->NowMicros() - start_micros;
  stats.bytes_written = meta.fd.GetFileSize();
  stats.num_output_files = 1;
  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
  cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                    meta.fd.GetFileSize());
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
// Convenience overload: opens a DB with only the default column family
// (plus the hidden persistent-stats column family when
// persist_stats_to_disk is set), delegating to the column-family-aware
// DB::Open overload.
// NOTE(review): this extract is elided — the `if (s.ok())` guards, the
// else branch between the two asserts, the `delete handles[...]` calls,
// and `return s;` from the full source are not visible here.
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.push_back(
      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
  if (db_options.persist_stats_to_disk) {
    column_families.push_back(
        ColumnFamilyDescriptor(kPersistentStatsColumnFamilyName, cf_options));
  std::vector<ColumnFamilyHandle*> handles;
  Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
  // One handle per opened column family is expected.
  if (db_options.persist_stats_to_disk) {
    assert(handles.size() == 2);
    assert(handles.size() == 1);
  // i can delete the handle since DBImpl is always holding a reference to
  // default column family
  if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
1305 Status
DB::Open(const DBOptions
& db_options
, const std::string
& dbname
,
1306 const std::vector
<ColumnFamilyDescriptor
>& column_families
,
1307 std::vector
<ColumnFamilyHandle
*>* handles
, DB
** dbptr
) {
1308 const bool kSeqPerBatch
= true;
1309 const bool kBatchPerTxn
= true;
1310 return DBImpl::Open(db_options
, dbname
, column_families
, handles
, dbptr
,
1311 !kSeqPerBatch
, kBatchPerTxn
);
// Creates a new WAL with number `log_file_num` and returns the writer in
// `*new_log` (caller owns it). When `recycle_log_number` is non-zero, an
// existing log file from the recycle list is renamed and reused instead of
// creating a fresh file.
// NOTE(review): this extract is elided — `Status s;`, the `} else {`
// separating the reuse and fresh-file paths, the `if (s.ok())` guard, and
// `return s;` from the full source are not visible here.
Status DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
                         size_t preallocate_block_size, log::Writer** new_log) {
  std::unique_ptr<FSWritableFile> lfile;
  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  // Tune file options for append-only WAL writes.
  FileOptions opt_file_options =
      fs_->OptimizeForLogWrite(file_options_, db_options);
  std::string log_fname =
      LogFileName(immutable_db_options_.wal_dir, log_file_num);
  if (recycle_log_number) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "reusing log %" PRIu64 " from recycle list\n",
                   recycle_log_number);
    std::string old_log_fname =
        LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
    // Rename + reopen the recycled file under the new log name.
    s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
                               &lfile, /*dbg=*/nullptr);
    // Fresh-file path (original `else` branch is elided in this extract).
    s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
    lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
    // Preallocate to reduce fragmentation / metadata updates during appends.
    lfile->SetPreallocationBlockSize(preallocate_block_size);
    const auto& listeners = immutable_db_options_.listeners;
    std::unique_ptr<WritableFileWriter> file_writer(
        new WritableFileWriter(std::move(lfile), log_fname, opt_file_options,
                               env_, nullptr /* stats */, listeners));
    *new_log = new log::Writer(std::move(file_writer), log_file_num,
                               immutable_db_options_.recycle_log_file_num > 0,
                               immutable_db_options_.manual_wal_flush);
// Core open routine shared by all DB::Open overloads (and transactional
// wrappers via seq_per_batch/batch_per_txn): sanitizes/validates options,
// creates directories, recovers from MANIFEST + WALs, creates a fresh WAL,
// wires up column family handles, persists the options file, and registers
// existing SSTs with the SstFileManager.
// NOTE(review): this extract is heavily elided — most `if (s.ok())` guards,
// error-cleanup paths, `else` branches, closing braces, and the final
// return from the full source are not visible here.
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
                    const std::vector<ColumnFamilyDescriptor>& column_families,
                    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
                    const bool seq_per_batch, const bool batch_per_txn) {
  Status s = SanitizeOptionsByTable(db_options, column_families);
  s = ValidateOptions(db_options, column_families);
  // Track the largest write buffer across CFs; used later for WAL
  // preallocation sizing and disk-buffer reservation.
  size_t max_write_buffer_size = 0;
  for (auto cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
  DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
  // Create every data directory (db_paths plus each CF's cf_paths).
  std::vector<std::string> paths;
  for (auto& db_path : impl->immutable_db_options_.db_paths) {
    paths.emplace_back(db_path.path);
  for (auto& cf : column_families) {
    for (auto& cf_path : cf.options.cf_paths) {
      paths.emplace_back(cf_path.path);
  for (auto& path : paths) {
    s = impl->env_->CreateDirIfMissing(path);
  // For recovery from NoSpace() error, we can only handle
  // the case where the database is stored in a single path
  if (paths.size() <= 1) {
    impl->error_handler_.EnableAutoRecovery();
  s = impl->CreateArchivalDirectory();
  impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
  impl->mutex_.Lock();
  // Handles create_if_missing, error_if_exists
  uint64_t recovered_seq(kMaxSequenceNumber);
  s = impl->Recover(column_families, false, false, false, &recovered_seq);
  // Start a brand-new WAL for post-recovery writes.
  uint64_t new_log_number = impl->versions_->NewFileNumber();
  log::Writer* new_log = nullptr;
  const size_t preallocate_block_size =
      impl->GetWalPreallocateBlockSize(max_write_buffer_size);
  s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,
                      preallocate_block_size, &new_log);
  InstrumentedMutexLock wl(&impl->log_write_mutex_);
  impl->logfile_number_ = new_log_number;
  assert(new_log != nullptr);
  impl->logs_.emplace_back(new_log_number, new_log);
  // set column family handles
  for (auto cf : column_families) {
    // (elided: `auto cfd =` receiving this lookup result)
    impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
    if (cfd != nullptr) {
      // (elided: `handles->push_back(` wrapping this construction)
      new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
      impl->NewThreadStatusCfInfo(cfd);
      if (db_options.create_missing_column_families) {
        // missing column family, create it
        ColumnFamilyHandle* handle;
        // CreateColumnFamily takes the mutex itself.
        impl->mutex_.Unlock();
        s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
        impl->mutex_.Lock();
        handles->push_back(handle);
        s = Status::InvalidArgument("Column family not found: ", cf.name);
  // Install a fresh SuperVersion for every CF and schedule pending work.
  SuperVersionContext sv_context(/* create_superversion */ true);
  for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
    impl->InstallSuperVersionAndScheduleWork(
        cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
  // alive_log_files_ is protected by log_write_mutex_ under two write queues.
  if (impl->two_write_queues_) {
    impl->log_write_mutex_.Lock();
  impl->alive_log_files_.push_back(
      DBImpl::LogFileNumberSize(impl->logfile_number_));
  if (impl->two_write_queues_) {
    impl->log_write_mutex_.Unlock();
  impl->DeleteObsoleteFiles();
  s = impl->directories_.GetDbDir()->Fsync();
  // In WritePrepared there could be gap in sequence numbers. This breaks
  // the trick we use in kPointInTimeRecovery which assumes the first seq in
  // the log right after the corrupted log is one larger than the last seq
  // we read from the logs. To let this trick keep working, we add a dummy
  // entry with the expected sequence to the first log right after recovery.
  // In non-WritePrepared case also the new log after recovery could be
  // empty, and thus missing the consecutive seq hint to distinguish
  // middle-log corruption to corrupted-log-remained-after-recovery. This
  // case also will be addressed by a dummy write.
  if (recovered_seq != kMaxSequenceNumber) {
    WriteBatch empty_batch;
    WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
    WriteOptions write_options;
    uint64_t log_used, log_size;
    log::Writer* log_writer = impl->logs_.back().writer;
    s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
    // Need to fsync, otherwise it might get lost after a power reset.
    s = impl->FlushWAL(false);
    s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
    // try to read format version but no need to fail Open() even if it fails
    s = impl->PersistentStatsProcessFormatVersion();
  // Per-CF sanity checks that can only be done after recovery.
  for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
    if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
      // FIFO compaction requires that every file lives on level 0.
      auto* vstorage = cfd->current()->storage_info();
      for (int i = 1; i < vstorage->num_levels(); ++i) {
        int num_files = vstorage->NumLevelFiles(i);
        if (num_files > 0) {
          s = Status::InvalidArgument(
              "Not all files are at level 0. Cannot "
              "open with FIFO compaction style.");
    if (!cfd->mem()->IsSnapshotSupported()) {
      impl->is_snapshot_supported_ = false;
    if (cfd->ioptions()->merge_operator != nullptr &&
        !cfd->mem()->IsMergeOperatorSupported()) {
      s = Status::InvalidArgument(
          "The memtable of column family %s does not support merge operator "
          "its options.merge_operator is non-null",
          cfd->GetName().c_str());
  TEST_SYNC_POINT("DBImpl::Open:Opened");
  Status persist_options_status;
  // Persist RocksDB Options before scheduling the compaction.
  // The WriteOptionsFile() will release and lock the mutex internally.
  persist_options_status = impl->WriteOptionsFile(
      false /*need_mutex_lock*/, false /*need_enter_write_thread*/);
  impl->opened_successfully_ = true;
  impl->MaybeScheduleFlushOrCompaction();
  impl->mutex_.Unlock();
#ifndef ROCKSDB_LITE
  auto sfm = static_cast<SstFileManagerImpl*>(
      impl->immutable_db_options_.sst_file_manager.get());
  if (s.ok() && sfm) {
    // Notify SstFileManager about all sst files that already exist in
    // db_paths[0] and cf_paths[0] when the DB is opened.
    // SstFileManagerImpl needs to know sizes of the files. For files whose
    // size we already know (sst files that appear in manifest - typically
    // that's the vast majority of all files), we'll pass the size to
    // SstFileManager. For all other files SstFileManager will query the size
    // from filesystem.
    std::vector<LiveFileMetaData> metadata;
    impl->mutex_.Lock();
    impl->versions_->GetLiveFilesMetaData(&metadata);
    impl->mutex_.Unlock();
    std::unordered_map<std::string, uint64_t> known_file_sizes;
    for (const auto& md : metadata) {
      std::string name = md.name;
      // Manifest names carry a leading '/'; strip it to match GetChildren().
      if (!name.empty() && name[0] == '/') {
        name = name.substr(1);
      known_file_sizes[name] = md.size;
    std::vector<std::string> paths;
    paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
    for (auto& cf : column_families) {
      if (!cf.options.cf_paths.empty()) {
        paths.emplace_back(cf.options.cf_paths[0].path);
    // Remove duplicate paths.
    std::sort(paths.begin(), paths.end());
    paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
    for (auto& path : paths) {
      std::vector<std::string> existing_files;
      impl->immutable_db_options_.env->GetChildren(path, &existing_files);
      for (auto& file_name : existing_files) {
        uint64_t file_number;
        std::string file_path = path + "/" + file_name;
        if (ParseFileName(file_name, &file_number, &file_type) &&
            file_type == kTableFile) {
          if (known_file_sizes.count(file_name)) {
            // We're assuming that each sst file name exists in at most one of
            // the paths.
            sfm->OnAddFile(file_path, known_file_sizes.at(file_name),
                           /* compaction */ false);
            // Size unknown; SstFileManager will stat the file itself.
            sfm->OnAddFile(file_path);
    // Reserve some disk buffer space. This is a heuristic - when we run out
    // of disk space, this ensures that there is atleast write_buffer_size
    // amount of free space before we resume DB writes. In low disk space
    // conditions, we want to avoid a lot of small L0 files due to frequent
    // WAL write failures and resultant forced flushes
    sfm->ReserveDiskBuffer(max_write_buffer_size,
                           impl->immutable_db_options_.db_paths[0].path);
#endif  // !ROCKSDB_LITE
  ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
  LogFlush(impl->immutable_db_options_.info_log);
  assert(impl->TEST_WALBufferIsEmpty());
  // If the assert above fails then we need to FlushWAL before returning
  // control back to the user.
  if (!persist_options_status.ok()) {
    s = Status::IOError(
        "DB::Open() failed --- Unable to persist Options file",
        persist_options_status.ToString());
  impl->StartTimedTasks();
  for (auto* h : *handles) {
1651 } // namespace ROCKSDB_NAMESPACE