#include "db/builder.h"
#include "db/db_impl/db_impl.h"
#include "db/error_handler.h"
-#include "db/periodic_work_scheduler.h"
+#include "db/periodic_task_scheduler.h"
#include "env/composite_env_wrapper.h"
+#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
+#include "logging/logging.h"
#include "monitoring/persistent_stats_history.h"
#include "options/options_helper.h"
#include "rocksdb/table.h"
#include "util/rate_limiter.h"
namespace ROCKSDB_NAMESPACE {
-Options SanitizeOptions(const std::string& dbname, const Options& src) {
- auto db_options = SanitizeOptions(dbname, DBOptions(src));
+Options SanitizeOptions(const std::string& dbname, const Options& src,
+ bool read_only, Status* logger_creation_s) {
+ auto db_options =
+ SanitizeOptions(dbname, DBOptions(src), read_only, logger_creation_s);
ImmutableDBOptions immutable_db_options(db_options);
auto cf_options =
SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
return Options(db_options, cf_options);
}
-DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
+DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
+ bool read_only, Status* logger_creation_s) {
DBOptions result(src);
if (result.env == nullptr) {
&result.max_open_files);
}
- if (result.info_log == nullptr) {
+ if (result.info_log == nullptr && !read_only) {
Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
if (!s.ok()) {
// No place suitable for logging
result.info_log = nullptr;
+ if (logger_creation_s) {
+ *logger_creation_s = s;
+ }
}
}
result.recycle_log_file_num = 0;
}
- if (result.wal_dir.empty()) {
+ if (result.db_paths.size() == 0) {
+ result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
+ } else if (result.wal_dir.empty()) {
// Use dbname as default
result.wal_dir = dbname;
}
- if (result.wal_dir.back() == '/') {
- result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
+ if (!result.wal_dir.empty()) {
+ // If there is a wal_dir already set, check to see if the wal_dir is the
+ // same as the dbname AND the same as the db_path[0] (which must exist from
+ // a few lines ago). If the wal_dir matches both of these values, then clear
+ // the wal_dir value, which will make wal_dir == dbname. Most likely this
+ // condition was the result of reading an old options file where we forced
+ // wal_dir to be set (to dbname).
+ auto npath = NormalizePath(dbname + "/");
+ if (npath == NormalizePath(result.wal_dir + "/") &&
+ npath == NormalizePath(result.db_paths[0].path + "/")) {
+ result.wal_dir.clear();
+ }
}
- if (result.db_paths.size() == 0) {
- result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
+ if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
+ result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
}
if (result.use_direct_reads && result.compaction_readahead_size == 0) {
result.compaction_readahead_size = 1024 * 1024 * 2;
}
- if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
- result.new_table_reader_for_compaction_inputs = true;
- }
-
// Force flush on DB open if 2PC is enabled, since with 2PC we have no
// guarantee that consecutive log files have consecutive sequence id, which
// make recovery complicated.
#ifndef ROCKSDB_LITE
ImmutableDBOptions immutable_db_options(result);
- if (!IsWalDirSameAsDBPath(&immutable_db_options)) {
+ if (!immutable_db_options.IsWalDirSameAsDBPath()) {
// Either the WAL dir and db_paths[0]/db_name are not the same, or we
// cannot tell for sure. In either case, assume they're different and
// explicitly cleanup the trash log files (bypass DeleteScheduler)
// DeleteScheduler::CleanupDirectory on the same dir later, it will be
// safe
std::vector<std::string> filenames;
- result.env->GetChildren(result.wal_dir, &filenames).PermitUncheckedError();
+ IOOptions io_opts;
+ io_opts.do_not_recurse = true;
+ auto wal_dir = immutable_db_options.GetWalDir();
+ Status s = immutable_db_options.fs->GetChildren(
+ wal_dir, io_opts, &filenames, /*IODebugContext*=*/nullptr);
+  s.PermitUncheckedError();  // TODO: What to do on error?
for (std::string& filename : filenames) {
if (filename.find(".log.trash", filename.length() -
std::string(".log.trash").length()) !=
std::string::npos) {
- std::string trash_file = result.wal_dir + "/" + filename;
+ std::string trash_file = wal_dir + "/" + filename;
result.env->DeleteFile(trash_file).PermitUncheckedError();
}
}
// was not used)
auto sfm = static_cast<SstFileManagerImpl*>(result.sst_file_manager.get());
for (size_t i = 0; i < result.db_paths.size(); i++) {
- DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path);
+ DeleteScheduler::CleanupDirectory(result.env, sfm, result.db_paths[i].path)
+ .PermitUncheckedError();
}
// Create a default SstFileManager for purposes of tracking compaction size
NewSstFileManager(result.env, result.info_log));
result.sst_file_manager = sst_file_manager;
}
-#endif
+#endif // !ROCKSDB_LITE
+
+ // Supported wal compression types
+ if (!StreamingCompressionTypeSupported(result.wal_compression)) {
+ result.wal_compression = kNoCompression;
+ ROCKS_LOG_WARN(result.info_log,
+ "wal_compression is disabled since only zstd is supported");
+ }
if (!result.paranoid_checks) {
result.skip_checking_sst_file_sizes_on_db_open = true;
if (db_options.unordered_write &&
!db_options.allow_concurrent_memtable_write) {
return Status::InvalidArgument(
- "unordered_write is incompatible with !allow_concurrent_memtable_write");
+ "unordered_write is incompatible with "
+ "!allow_concurrent_memtable_write");
}
if (db_options.unordered_write && db_options.enable_pipelined_write) {
"atomic_flush is currently incompatible with best-efforts recovery");
}
+ if (db_options.use_direct_io_for_flush_and_compaction &&
+ 0 == db_options.writable_file_max_buffer_size) {
+ return Status::InvalidArgument(
+ "writes in direct IO require writable_file_max_buffer_size > 0");
+ }
+
return Status::OK();
}
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
const std::string manifest = DescriptorFileName(dbname_, 1);
{
+ if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
+ fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
+ }
std::unique_ptr<FSWritableFile> file;
FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
s = NewWritableFile(fs_.get(), manifest, &file, file_options);
if (!s.ok()) {
return s;
}
+ FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- std::move(file), manifest, file_options, env_, io_tracer_,
- nullptr /* stats */, immutable_db_options_.listeners));
+ std::move(file), manifest, file_options, immutable_db_options_.clock,
+ io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
+ nullptr, tmp_set.Contains(FileType::kDescriptorFile),
+ tmp_set.Contains(FileType::kDescriptorFile)));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) {
- s = SyncManifest(env_, &immutable_db_options_, log.file());
+ s = SyncManifest(&immutable_db_options_, log.file());
}
}
if (s.ok()) {
manifest.substr(manifest.find_last_of("/\\") + 1));
}
} else {
- fs_->DeleteFile(manifest, IOOptions(), nullptr);
+ fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
}
return s;
}
Status DBImpl::Recover(
const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
- uint64_t* recovered_seq) {
+ uint64_t* recovered_seq, RecoveryContext* recovery_ctx) {
mutex_.AssertHeld();
bool is_new_db = false;
s = env_->FileExists(current_fname);
} else {
s = Status::NotFound();
- Status io_s = env_->GetChildren(dbname_, &files_in_dbname);
+ IOOptions io_opts;
+ io_opts.do_not_recurse = true;
+ Status io_s = immutable_db_options_.fs->GetChildren(
+ dbname_, io_opts, &files_in_dbname, /*IODebugContext*=*/nullptr);
if (!io_s.ok()) {
s = io_s;
files_in_dbname.clear();
uint64_t number = 0;
FileType type = kWalFile; // initialize
if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
- // Found MANIFEST (descriptor log), thus best-efforts recovery does
- // not have to treat the db as empty.
- s = Status::OK();
- manifest_path = dbname_ + "/" + file;
- break;
+ uint64_t bytes;
+ s = env_->GetFileSize(DescriptorFileName(dbname_, number), &bytes);
+ if (s.ok() && bytes != 0) {
+ // Found non-empty MANIFEST (descriptor log), thus best-efforts
+ // recovery does not have to treat the db as empty.
+ manifest_path = dbname_ + "/" + file;
+ break;
+ }
}
}
}
}
} else if (immutable_db_options_.best_efforts_recovery) {
assert(files_in_dbname.empty());
- Status s = env_->GetChildren(dbname_, &files_in_dbname);
+ IOOptions io_opts;
+ io_opts.do_not_recurse = true;
+ Status s = immutable_db_options_.fs->GetChildren(
+ dbname_, io_opts, &files_in_dbname, /*IODebugContext*=*/nullptr);
if (s.IsNotFound()) {
return Status::InvalidArgument(dbname_,
"does not exist (open for read only)");
if (!s.ok()) {
return s;
}
- s = SetDBId();
+ s = SetupDBId(read_only, recovery_ctx);
+ ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str());
if (s.ok() && !read_only) {
- s = DeleteUnreferencedSstFiles();
+ s = DeleteUnreferencedSstFiles(recovery_ctx);
}
if (immutable_db_options_.paranoid_checks && s.ok()) {
s = CheckConsistency();
}
if (s.ok() && !read_only) {
+ // TODO: share file descriptors (FSDirectory) with SetDirectories above
std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
for (auto cfd : *versions_->GetColumnFamilySet()) {
s = cfd->AddDirectories(&created_dirs);
}
}
}
- // DB mutex is already held
- if (s.ok() && immutable_db_options_.persist_stats_to_disk) {
- s = InitPersistStatsColumnFamily();
- }
std::vector<std::string> files_in_wal_dir;
if (s.ok()) {
default_cf_handle_ = new ColumnFamilyHandleImpl(
versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
- // TODO(Zhongyi): handle single_column_family_mode_ when
- // persistent_stats is enabled
- single_column_family_mode_ =
- versions_->GetColumnFamilySet()->NumberOfColumnFamilies() == 1;
// Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous
// Note that prev_log_number() is no longer used, but we pay
// attention to it in case we are recovering a database
// produced by an older version of rocksdb.
+ auto wal_dir = immutable_db_options_.GetWalDir();
if (!immutable_db_options_.best_efforts_recovery) {
- s = env_->GetChildren(immutable_db_options_.wal_dir, &files_in_wal_dir);
+ IOOptions io_opts;
+ io_opts.do_not_recurse = true;
+ s = immutable_db_options_.fs->GetChildren(
+ wal_dir, io_opts, &files_in_wal_dir, /*IODebugContext*=*/nullptr);
}
if (s.IsNotFound()) {
- return Status::InvalidArgument("wal_dir not found",
- immutable_db_options_.wal_dir);
+ return Status::InvalidArgument("wal_dir not found", wal_dir);
} else if (!s.ok()) {
return s;
}
"existing log file: ",
file);
} else {
- wal_files[number] =
- LogFileName(immutable_db_options_.wal_dir, number);
+ wal_files[number] = LogFileName(wal_dir, number);
}
}
}
WalNumber max_wal_number =
versions_->GetWalSet().GetWals().rbegin()->first;
edit.DeleteWalsBefore(max_wal_number + 1);
- s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
+ assert(recovery_ctx != nullptr);
+ assert(versions_->GetColumnFamilySet() != nullptr);
+ recovery_ctx->UpdateVersionEdits(
+ versions_->GetColumnFamilySet()->GetDefault(), edit);
}
if (!s.ok()) {
return s;
std::sort(wals.begin(), wals.end());
bool corrupted_wal_found = false;
- s = RecoverLogFiles(wals, &next_sequence, read_only,
- &corrupted_wal_found);
+ s = RecoverLogFiles(wals, &next_sequence, read_only, &corrupted_wal_found,
+ recovery_ctx);
if (corrupted_wal_found && recovered_seq != nullptr) {
*recovered_seq = next_sequence;
}
if (s.ok()) {
const std::string normalized_dbname = NormalizePath(dbname_);
const std::string normalized_wal_dir =
- NormalizePath(immutable_db_options_.wal_dir);
+ NormalizePath(immutable_db_options_.GetWalDir());
if (immutable_db_options_.best_efforts_recovery) {
filenames = std::move(files_in_dbname);
} else if (normalized_dbname == normalized_wal_dir) {
filenames = std::move(files_in_wal_dir);
} else {
- s = env_->GetChildren(GetName(), &filenames);
+ IOOptions io_opts;
+ io_opts.do_not_recurse = true;
+ s = immutable_db_options_.fs->GetChildren(
+ GetName(), io_opts, &filenames, /*IODebugContext*=*/nullptr);
}
}
if (s.ok()) {
}
}
versions_->options_file_number_ = options_file_number;
+ uint64_t options_file_size = 0;
+ if (options_file_number > 0) {
+ s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
+ &options_file_size);
+ }
+ versions_->options_file_size_ = options_file_size;
}
}
return s;
WriteBatch batch;
if (s.ok()) {
s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
- ToString(kStatsCFCurrentFormatVersion));
+ std::to_string(kStatsCFCurrentFormatVersion));
}
if (s.ok()) {
s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
- ToString(kStatsCFCompatibleFormatVersion));
+ std::to_string(kStatsCFCompatibleFormatVersion));
}
if (s.ok()) {
WriteOptions wo;
return s;
}
+// Writes all version edits accumulated in `recovery_ctx` to the MANIFEST in
+// a single LogAndApply call, then deletes the files that recovery marked
+// obsolete. Returns the first error encountered, if any.
+// REQUIRES: DB mutex held; the descriptor log must not have been created yet.
+Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
+  mutex_.AssertHeld();
+  assert(versions_->descriptor_log_ == nullptr);
+  Status s = versions_->LogAndApply(
+      recovery_ctx.cfds_, recovery_ctx.mutable_cf_opts_,
+      recovery_ctx.edit_lists_, &mutex_, directories_.GetDbDir());
+  if (s.ok() && !(recovery_ctx.files_to_delete_.empty())) {
+    // Release the mutex around file-deletion I/O; stop at the first failure
+    // and surface that error to the caller.
+    mutex_.Unlock();
+    for (const auto& fname : recovery_ctx.files_to_delete_) {
+      s = env_->DeleteFile(fname);
+      if (!s.ok()) {
+        break;
+      }
+    }
+    mutex_.Lock();
+  }
+  return s;
+}
+
+// If a WalFilter is configured, hand it snapshots of the column-family
+// name -> ID map and ID -> WAL-number map before WAL replay begins, so the
+// filter can make per-CF decisions. No-op when no filter is configured;
+// compiled out entirely in LITE builds.
+void DBImpl::InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap() {
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.wal_filter == nullptr) {
+    return;
+  }
+  assert(immutable_db_options_.wal_filter != nullptr);
+  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
+
+  std::map<std::string, uint32_t> cf_name_id_map;
+  std::map<uint32_t, uint64_t> cf_lognumber_map;
+  assert(versions_);
+  assert(versions_->GetColumnFamilySet());
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    assert(cfd);
+    cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
+    cf_lognumber_map.insert(std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
+  }
+
+  wal_filter.ColumnFamilyLogNumberMap(cf_lognumber_map, cf_name_id_map);
+#endif  // !ROCKSDB_LITE
+}
+
+// Runs one WAL record's WriteBatch through the configured WalFilter, if any.
+// Returns true when the (possibly rewritten) `batch` should be applied and
+// false when the record must be skipped. Depending on the filter's verdict,
+// it may set `status`, request end of replay via `stop_replay`, report
+// corruption through `reporter`, or replace `batch` with the filter's
+// rewritten batch (same sequence number; never more records than the
+// original). Always returns true when no filter is configured, and is a pure
+// pass-through in LITE builds.
+bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
+                                                const std::string& wal_fname,
+                                                log::Reader::Reporter& reporter,
+                                                Status& status,
+                                                bool& stop_replay,
+                                                WriteBatch& batch) {
+#ifndef ROCKSDB_LITE
+  if (immutable_db_options_.wal_filter == nullptr) {
+    return true;
+  }
+  assert(immutable_db_options_.wal_filter != nullptr);
+  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
+
+  WriteBatch new_batch;
+  bool batch_changed = false;
+
+  bool process_current_record = true;
+
+  WalFilter::WalProcessingOption wal_processing_option =
+      wal_filter.LogRecordFound(wal_number, wal_fname, batch, &new_batch,
+                                &batch_changed);
+
+  switch (wal_processing_option) {
+    case WalFilter::WalProcessingOption::kContinueProcessing:
+      // do nothing, proceed normally
+      break;
+    case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
+      // skip current record
+      process_current_record = false;
+      break;
+    case WalFilter::WalProcessingOption::kStopReplay:
+      // skip current record and stop replay
+      process_current_record = false;
+      stop_replay = true;
+      break;
+    case WalFilter::WalProcessingOption::kCorruptedRecord: {
+      status = Status::Corruption("Corruption reported by Wal Filter ",
+                                  wal_filter.Name());
+      // MaybeIgnoreError may clear the status depending on the recovery mode.
+      MaybeIgnoreError(&status);
+      if (!status.ok()) {
+        process_current_record = false;
+        reporter.Corruption(batch.GetDataSize(), status);
+      }
+      break;
+    }
+    default: {
+      // logical error which should not happen. If RocksDB throws, we would
+      // just do `throw std::logic_error`.
+      assert(false);
+      status = Status::NotSupported(
+          "Unknown WalProcessingOption returned by Wal Filter ",
+          wal_filter.Name());
+      MaybeIgnoreError(&status);
+      if (!status.ok()) {
+        // Ignore the error with current record processing.
+        stop_replay = true;
+      }
+      break;
+    }
+  }
+
+  if (!process_current_record) {
+    return false;
+  }
+
+  if (batch_changed) {
+    // Make sure that the count in the new batch is
+    // within the original count.
+    int new_count = WriteBatchInternal::Count(&new_batch);
+    int original_count = WriteBatchInternal::Count(&batch);
+    if (new_count > original_count) {
+      ROCKS_LOG_FATAL(
+          immutable_db_options_.info_log,
+          "Recovering log #%" PRIu64
+          " mode %d log filter %s returned "
+          "more records (%d) than original (%d) which is not allowed. "
+          "Aborting recovery.",
+          wal_number, static_cast<int>(immutable_db_options_.wal_recovery_mode),
+          wal_filter.Name(), new_count, original_count);
+      status = Status::NotSupported(
+          "More than original # of records "
+          "returned by Wal Filter ",
+          wal_filter.Name());
+      return false;
+    }
+    // Set the same sequence number in the new_batch
+    // as the original batch.
+    WriteBatchInternal::SetSequence(&new_batch,
+                                    WriteBatchInternal::Sequence(&batch));
+    batch = new_batch;
+  }
+  return true;
+#else   // !ROCKSDB_LITE
+  (void)wal_number;
+  (void)wal_fname;
+  (void)reporter;
+  (void)status;
+  (void)stop_replay;
+  (void)batch;
+  return true;
+#endif  // ROCKSDB_LITE
+}
+
// REQUIRES: wal_numbers are sorted in ascending order
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
SequenceNumber* next_sequence, bool read_only,
- bool* corrupted_wal_found) {
+ bool* corrupted_wal_found,
+ RecoveryContext* recovery_ctx) {
struct LogReporter : public log::Reader::Reporter {
Env* env;
Logger* info_log;
stream.EndArray();
}
-#ifndef ROCKSDB_LITE
- if (immutable_db_options_.wal_filter != nullptr) {
- std::map<std::string, uint32_t> cf_name_id_map;
- std::map<uint32_t, uint64_t> cf_lognumber_map;
- for (auto cfd : *versions_->GetColumnFamilySet()) {
- cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
- cf_lognumber_map.insert(
- std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
- }
-
- immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
- cf_name_id_map);
- }
-#endif
+ // No-op for immutable_db_options_.wal_filter == nullptr.
+ InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
bool stop_replay_by_wal_filter = false;
bool stop_replay_for_corruption = false;
bool flushed = false;
uint64_t corrupted_wal_number = kMaxSequenceNumber;
uint64_t min_wal_number = MinLogNumberToKeep();
+ if (!allow_2pc()) {
+ // In non-2pc mode, we skip WALs that do not back unflushed data.
+ min_wal_number =
+ std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
+ }
for (auto wal_number : wal_numbers) {
if (wal_number < min_wal_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(wal_number);
// Open the log file
- std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
+ std::string fname =
+ LogFileName(immutable_db_options_.GetWalDir(), wal_number);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", wal_number,
std::unique_ptr<SequentialFileReader> file_reader;
{
std::unique_ptr<FSSequentialFile> file;
- status = fs_->NewSequentialFile(fname,
- fs_->OptimizeForLogRead(file_options_),
- &file, nullptr);
+ status = fs_->NewSequentialFile(
+ fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
if (!status.ok()) {
MaybeIgnoreError(&status);
if (!status.ok()) {
// Read all the records and add to a memtable
std::string scratch;
Slice record;
- WriteBatch batch;
TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
/*arg=*/nullptr);
+ uint64_t record_checksum;
while (!stop_replay_by_wal_filter &&
reader.ReadRecord(&record, &scratch,
- immutable_db_options_.wal_recovery_mode) &&
+ immutable_db_options_.wal_recovery_mode,
+ &record_checksum) &&
status.ok()) {
if (record.size() < WriteBatchInternal::kHeader) {
reporter.Corruption(record.size(),
continue;
}
+ // We create a new batch and initialize with a valid prot_info_ to store
+ // the data checksums
+ WriteBatch batch;
+
status = WriteBatchInternal::SetContents(&batch, record);
if (!status.ok()) {
return status;
}
+ TEST_SYNC_POINT_CALLBACK(
+ "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", &batch);
+ TEST_SYNC_POINT_CALLBACK(
+ "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
+ &record_checksum);
+ status = WriteBatchInternal::UpdateProtectionInfo(
+ &batch, 8 /* bytes_per_key */, &record_checksum);
+ if (!status.ok()) {
+ return status;
+ }
+
SequenceNumber sequence = WriteBatchInternal::Sequence(&batch);
if (immutable_db_options_.wal_recovery_mode ==
}
}
-#ifndef ROCKSDB_LITE
- if (immutable_db_options_.wal_filter != nullptr) {
- WriteBatch new_batch;
- bool batch_changed = false;
-
- WalFilter::WalProcessingOption wal_processing_option =
- immutable_db_options_.wal_filter->LogRecordFound(
- wal_number, fname, batch, &new_batch, &batch_changed);
-
- switch (wal_processing_option) {
- case WalFilter::WalProcessingOption::kContinueProcessing:
- // do nothing, proceeed normally
- break;
- case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
- // skip current record
- continue;
- case WalFilter::WalProcessingOption::kStopReplay:
- // skip current record and stop replay
- stop_replay_by_wal_filter = true;
- continue;
- case WalFilter::WalProcessingOption::kCorruptedRecord: {
- status =
- Status::Corruption("Corruption reported by Wal Filter ",
- immutable_db_options_.wal_filter->Name());
- MaybeIgnoreError(&status);
- if (!status.ok()) {
- reporter.Corruption(record.size(), status);
- continue;
- }
- break;
- }
- default: {
- assert(false); // unhandled case
- status = Status::NotSupported(
- "Unknown WalProcessingOption returned"
- " by Wal Filter ",
- immutable_db_options_.wal_filter->Name());
- MaybeIgnoreError(&status);
- if (!status.ok()) {
- return status;
- } else {
- // Ignore the error with current record processing.
- continue;
- }
- }
- }
-
- if (batch_changed) {
- // Make sure that the count in the new batch is
- // within the orignal count.
- int new_count = WriteBatchInternal::Count(&new_batch);
- int original_count = WriteBatchInternal::Count(&batch);
- if (new_count > original_count) {
- ROCKS_LOG_FATAL(
- immutable_db_options_.info_log,
- "Recovering log #%" PRIu64
- " mode %d log filter %s returned "
- "more records (%d) than original (%d) which is not allowed. "
- "Aborting recovery.",
- wal_number,
- static_cast<int>(immutable_db_options_.wal_recovery_mode),
- immutable_db_options_.wal_filter->Name(), new_count,
- original_count);
- status = Status::NotSupported(
- "More than original # of records "
- "returned by Wal Filter ",
- immutable_db_options_.wal_filter->Name());
- return status;
- }
- // Set the same sequence number in the new_batch
- // as the original batch.
- WriteBatchInternal::SetSequence(&new_batch,
- WriteBatchInternal::Sequence(&batch));
- batch = new_batch;
- }
+ // For the default case of wal_filter == nullptr, always performs no-op
+ // and returns true.
+ if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter,
+ status, stop_replay_by_wal_filter,
+ batch)) {
+ continue;
}
-#endif // ROCKSDB_LITE
// If column family was not found, it might mean that the WAL write
// batch references to the column family that was dropped after the
immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords)) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
- if (cfd->GetLogNumber() > corrupted_wal_number) {
+      // One special case causes cfd->GetLogNumber() > corrupted_wal_number
+      // while the CF is still consistent: if a new column family is created
+      // during the flush and the WAL sync fails at the same time, the new CF
+      // points to the new WAL but the old WAL is corrupted. Since the new CF
+      // is empty, it is still consistent. We add the check of CF SST file
+      // size to avoid this false-positive alert.
+
+      // Note that the check of (cfd->GetLiveSstFilesSize() > 0) may lead to
+      // ignoring a very rare inconsistency case caused by data cancellation:
+      // one CF is empty due to KV deletion, but those operations are in the
+      // WAL. If the WAL is corrupted, the status of this CF might not be
+      // consistent with the others. However, the consistency check will be
+      // bypassed because of the empty CF.
+      // TODO: a better and complete implementation is needed to ensure a
+      // strict consistency check in WAL recovery, including handling the
+      // trailing issues.
+ if (cfd->GetLogNumber() > corrupted_wal_number &&
+ cfd->GetLiveSstFilesSize() > 0) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Column family inconsistency: SST file contains data"
" beyond the point of corruption.");
- return Status::Corruption("SST file is ahead of WALs");
+ return Status::Corruption("SST file is ahead of WALs in CF " +
+ cfd->GetName());
}
}
}
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(max_wal_number + 1);
+ assert(recovery_ctx != nullptr);
- autovector<ColumnFamilyData*> cfds;
- autovector<const MutableCFOptions*> cf_opts;
- autovector<autovector<VersionEdit*>> edit_lists;
for (auto* cfd : *versions_->GetColumnFamilySet()) {
- cfds.push_back(cfd);
- cf_opts.push_back(cfd->GetLatestMutableCFOptions());
auto iter = version_edits.find(cfd->GetID());
assert(iter != version_edits.end());
- edit_lists.push_back({&iter->second});
+ recovery_ctx->UpdateVersionEdits(cfd, iter->second);
}
- std::unique_ptr<VersionEdit> wal_deletion;
- if (immutable_db_options_.track_and_verify_wals_in_manifest) {
- wal_deletion.reset(new VersionEdit);
- wal_deletion->DeleteWalsBefore(max_wal_number + 1);
- edit_lists.back().push_back(wal_deletion.get());
+ if (flushed) {
+ VersionEdit wal_deletion;
+ if (immutable_db_options_.track_and_verify_wals_in_manifest) {
+ wal_deletion.DeleteWalsBefore(max_wal_number + 1);
+ }
+ if (!allow_2pc()) {
+ // In non-2pc mode, flushing the memtables of the column families
+ // means we can advance min_log_number_to_keep.
+ wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
+ }
+ assert(versions_->GetColumnFamilySet() != nullptr);
+ recovery_ctx->UpdateVersionEdits(
+ versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
}
-
- // write MANIFEST with update
- status = versions_->LogAndApply(cfds, cf_opts, edit_lists, &mutex_,
- directories_.GetDbDir(),
- /*new_descriptor_log=*/true);
}
}
- if (status.ok() && data_seen && !flushed) {
- status = RestoreAliveLogFiles(wal_numbers);
+ if (status.ok()) {
+ if (data_seen && !flushed) {
+ status = RestoreAliveLogFiles(wal_numbers);
+ } else if (!wal_numbers.empty()) { // If there's no data in the WAL, or we
+ // flushed all the data, still
+ // truncate the log file. If the process goes into a crash loop before
+ // the file is deleted, the preallocated space will never get freed.
+ const bool truncate = !read_only;
+ GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
+ .PermitUncheckedError();
+ }
}
event_logger_.Log() << "job" << job_id << "event"
return status;
}
+// Looks up the on-disk size of WAL `wal_number` and, if `truncate` is true,
+// truncates the file to that size to release preallocated-but-unused space.
+// Truncation failure is non-fatal: a warning is logged unless the filesystem
+// reports NotSupported. On success, *log_ptr (if non-null) receives the
+// WAL's number and size. Returns the status of the size lookup.
+Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
+                                          LogFileNumberSize* log_ptr) {
+  LogFileNumberSize log(wal_number);
+  std::string fname =
+      LogFileName(immutable_db_options_.GetWalDir(), wal_number);
+  Status s;
+  // This gets the apparent size of the WALs, not including preallocated
+  // space.
+  s = env_->GetFileSize(fname, &log.size);
+  TEST_SYNC_POINT_CALLBACK("DBImpl::GetLogSizeAndMaybeTruncate:0", /*arg=*/&s);
+  if (s.ok() && truncate) {
+    std::unique_ptr<FSWritableFile> last_log;
+    Status truncate_status = fs_->ReopenWritableFile(
+        fname,
+        fs_->OptimizeForLogWrite(
+            file_options_,
+            BuildDBOptions(immutable_db_options_, mutable_db_options_)),
+        &last_log, nullptr);
+    if (truncate_status.ok()) {
+      truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
+    }
+    if (truncate_status.ok()) {
+      truncate_status = last_log->Close(IOOptions(), nullptr);
+    }
+    // Not a critical error if we fail to truncate.
+    if (!truncate_status.ok() && !truncate_status.IsNotSupported()) {
+      ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                     "Failed to truncate log #%" PRIu64 ": %s", wal_number,
+                     truncate_status.ToString().c_str());
+    }
+  }
+  if (log_ptr) {
+    *log_ptr = log;
+  }
+  return s;
+}
+
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
if (wal_numbers.empty()) {
return Status::OK();
Status s;
mutex_.AssertHeld();
assert(immutable_db_options_.avoid_flush_during_recovery);
- if (two_write_queues_) {
- log_write_mutex_.Lock();
- }
// Mark these as alive so they'll be considered for deletion later by
// FindObsoleteFiles()
total_log_size_ = 0;
log_empty_ = false;
+ uint64_t min_wal_with_unflushed_data =
+ versions_->MinLogNumberWithUnflushedData();
for (auto wal_number : wal_numbers) {
- LogFileNumberSize log(wal_number);
- std::string fname = LogFileName(immutable_db_options_.wal_dir, wal_number);
- // This gets the appear size of the wals, not including preallocated space.
- s = env_->GetFileSize(fname, &log.size);
- if (!s.ok()) {
- break;
+ if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
+ // In non-2pc mode, the WAL files not backing unflushed data are not
+ // alive, thus should not be added to the alive_log_files_.
+ continue;
}
- total_log_size_ += log.size;
- alive_log_files_.push_back(log);
// We preallocate space for wals, but then after a crash and restart, those
// preallocated space are not needed anymore. It is likely only the last
// log has such preallocated space, so we only truncate for the last log.
- if (wal_number == wal_numbers.back()) {
- std::unique_ptr<FSWritableFile> last_log;
- Status truncate_status = fs_->ReopenWritableFile(
- fname,
- fs_->OptimizeForLogWrite(
- file_options_,
- BuildDBOptions(immutable_db_options_, mutable_db_options_)),
- &last_log, nullptr);
- if (truncate_status.ok()) {
- truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
- }
- if (truncate_status.ok()) {
- truncate_status = last_log->Close(IOOptions(), nullptr);
- }
- // Not a critical error if fail to truncate.
- if (!truncate_status.ok()) {
- ROCKS_LOG_WARN(immutable_db_options_.info_log,
- "Failed to truncate log #%" PRIu64 ": %s", wal_number,
- truncate_status.ToString().c_str());
- }
+ LogFileNumberSize log;
+ s = GetLogSizeAndMaybeTruncate(
+ wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
+ if (!s.ok()) {
+ break;
}
- }
- if (two_write_queues_) {
- log_write_mutex_.Unlock();
+ total_log_size_ += log.size;
+ alive_log_files_.push_back(log);
}
return s;
}
+ // Builds a new level-0 table file (and any blob files) from `mem` during
+ // recovery, recording the result in `edit` and in the column family's
+ // flush statistics. mutex_ must be held on entry; it is reacquired around
+ // the actual table build. Returns the build status, folding in any IO
+ // error that BuildTable reported out-of-band.
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
- const uint64_t start_micros = env_->NowMicros();
+ assert(cfd);
+ assert(cfd->imm());
+ // The immutable memtable list must be empty.
+ assert(std::numeric_limits<uint64_t>::max() ==
+ cfd->imm()->GetEarliestMemTableID());
+
+ // Use the DB's SystemClock (not env_) so injected/mock clocks are honored.
+ const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
FileMetaData meta;
std::vector<BlobFileAddition> blob_file_additions;
cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
int64_t _current_time = 0;
- env_->GetCurrentTime(&_current_time)
+ immutable_db_options_.clock->GetCurrentTime(&_current_time)
.PermitUncheckedError(); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);
meta.oldest_ancester_time = current_time;
std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
range_del_iters;
auto range_del_iter =
- mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+ // This is called during recovery, where a live memtable is flushed
+ // directly. In this case, no fragmented tombstone list is cached in
+ // this memtable yet.
+ mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
+ false /* immutable_memtable */);
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
IOStatus io_s;
- s = BuildTable(
- dbname_, versions_.get(), immutable_db_options_, *cfd->ioptions(),
- mutable_cf_options, file_options_for_compaction_, cfd->table_cache(),
- iter.get(), std::move(range_del_iters), &meta, &blob_file_additions,
- cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
- cfd->GetID(), cfd->GetName(), snapshot_seqs,
- earliest_write_conflict_snapshot, snapshot_checker,
+ // Collect the per-table creation parameters into TableBuilderOptions
+ // instead of passing them individually to BuildTable.
+ TableBuilderOptions tboptions(
+ *cfd->ioptions(), mutable_cf_options, cfd->internal_comparator(),
+ cfd->int_tbl_prop_collector_factories(),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
- mutable_cf_options.sample_for_compression,
- mutable_cf_options.compression_opts, paranoid_file_checks,
- cfd->internal_stats(), TableFileCreationReason::kRecovery, &io_s,
- io_tracer_, &event_logger_, job_id, Env::IO_HIGH,
- nullptr /* table_properties */, -1 /* level */, current_time,
- 0 /* oldest_key_time */, write_hint, 0 /* file_creation_time */,
- db_id_, db_session_id_);
+ mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
+ 0 /* level */, false /* is_bottommost */,
+ TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
+ 0 /* file_creation_time */, db_id_, db_session_id_,
+ 0 /* target_file_size */, meta.fd.GetNumber());
+ // Pass an empty seqno-to-time mapping for this recovery-time flush.
+ SeqnoToTimeMapping empty_seqno_time_mapping;
+ s = BuildTable(
+ dbname_, versions_.get(), immutable_db_options_, tboptions,
+ file_options_for_compaction_, cfd->table_cache(), iter.get(),
+ std::move(range_del_iters), &meta, &blob_file_additions,
+ snapshot_seqs, earliest_write_conflict_snapshot, kMaxSequenceNumber,
+ snapshot_checker, paranoid_file_checks, cfd->internal_stats(), &io_s,
+ io_tracer_, BlobFileCreationReason::kRecovery,
+ empty_seqno_time_mapping, &event_logger_, job_id, Env::IO_HIGH,
+ nullptr /* table_properties */, write_hint,
+ nullptr /*full_history_ts_low*/, &blob_callback_);
LogFlush(immutable_db_options_.info_log);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
cfd->GetName().c_str(), meta.fd.GetNumber(),
meta.fd.GetFileSize(), s.ToString().c_str());
mutex_.Lock();
+
+ // TODO(AR) is this ok? Promotes BuildTable's out-of-band IO error to the
+ // overall status when the build status is otherwise OK.
+ if (!io_s.ok() && s.ok()) {
+ s = io_s;
+ }
}
}
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
- meta.marked_for_compaction, meta.oldest_blob_file_number,
- meta.oldest_ancester_time, meta.file_creation_time,
- meta.file_checksum, meta.file_checksum_func_name);
+ // AddFile now also records temperature and unique_id.
+ meta.marked_for_compaction, meta.temperature,
+ meta.oldest_blob_file_number, meta.oldest_ancester_time,
+ meta.file_creation_time, meta.file_checksum,
+ meta.file_checksum_func_name, meta.unique_id);
- edit->SetBlobFileAdditions(std::move(blob_file_additions));
+ // Record blob additions one by one (replaces SetBlobFileAdditions).
+ for (const auto& blob : blob_file_additions) {
+ edit->AddBlobFile(blob);
+ }
}
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
- stats.micros = env_->NowMicros() - start_micros;
+ stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
if (has_output) {
stats.bytes_written = meta.fd.GetFileSize();
const auto& blobs = edit->GetBlobFileAdditions();
for (const auto& blob : blobs) {
- stats.bytes_written += blob.GetTotalBlobBytes();
+ // Blob output is accounted in dedicated blob counters, separate from
+ // SST output.
+ stats.bytes_written_blob += blob.GetTotalBlobBytes();
}
- stats.num_output_files += static_cast<int>(blobs.size());
+ stats.num_output_files_blob = static_cast<int>(blobs.size());
cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
- cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
- stats.bytes_written);
+ // BYTES_FLUSHED covers both SST and blob bytes.
+ cfd->internal_stats()->AddCFStats(
+ InternalStats::BYTES_FLUSHED,
+ stats.bytes_written + stats.bytes_written_blob);
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
return s;
}
!kSeqPerBatch, kBatchPerTxn);
}
+// TODO: Implement the trimming in flush code path.
+// TODO: Perform trimming before inserting into memtable during recovery.
+// TODO: Pick files with max_timestamp > trim_ts by each file's timestamp meta
+// info, and handle only these files to reduce io.
+//
+// Opens the DB like DB::Open, then runs a full-range compaction with
+// `trim_ts` on every column family whose comparator has a user-defined
+// timestamp, so that history with timestamps above trim_ts is trimmed.
+// On any failure the created handles and the DB are destroyed before
+// returning; on success *dbptr receives the opened DB.
+Status DB::OpenAndTrimHistory(
+ const DBOptions& db_options, const std::string& dbname,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
+ std::string trim_ts) {
+ assert(dbptr != nullptr);
+ assert(handles != nullptr);
+ // avoid_flush_during_recovery is rejected up front: trimming here relies
+ // on recovery flushing memtable data so the compaction below can see it.
+ auto validate_options = [&db_options] {
+ if (db_options.avoid_flush_during_recovery) {
+ return Status::InvalidArgument(
+ "avoid_flush_during_recovery incompatible with "
+ "OpenAndTrimHistory");
+ }
+ return Status::OK();
+ };
+ auto s = validate_options();
+ if (!s.ok()) {
+ return s;
+ }
+
+ DB* db = nullptr;
+ s = DB::Open(db_options, dbname, column_families, handles, &db);
+ if (!s.ok()) {
+ return s;
+ }
+ assert(db);
+ CompactRangeOptions options;
+ // NOTE(review): kForceOptimized presumably forces bottommost files to be
+ // rewritten so the trim reaches them as well -- confirm.
+ options.bottommost_level_compaction =
+ BottommostLevelCompaction::kForceOptimized;
+ auto db_impl = static_cast_with_check<DBImpl>(db);
+ for (auto handle : *handles) {
+ assert(handle != nullptr);
+ auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(handle);
+ auto cfd = cfh->cfd();
+ assert(cfd != nullptr);
+ // Only compact column families with timestamp enabled
+ if (cfd->user_comparator() != nullptr &&
+ cfd->user_comparator()->timestamp_size() > 0) {
+ s = db_impl->CompactRangeInternal(options, handle, nullptr, nullptr,
+ trim_ts);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ }
+ // On failure, destroy every handle handed out above and the DB itself so
+ // the caller sees either a fully usable DB or nothing.
+ auto clean_op = [&handles, &db] {
+ for (auto handle : *handles) {
+ auto temp_s = db->DestroyColumnFamilyHandle(handle);
+ assert(temp_s.ok());
+ }
+ handles->clear();
+ delete db;
+ };
+ if (!s.ok()) {
+ clean_op();
+ return s;
+ }
+
+ *dbptr = db;
+ return s;
+}
+
+// Creates a new WAL file numbered `log_file_num` (reusing a recycled file
+// when recycle_log_number != 0) and returns a log::Writer for it via
+// *new_log. Returns the IO status of file creation/setup.
IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
size_t preallocate_block_size,
log::Writer** new_log) {
BuildDBOptions(immutable_db_options_, mutable_db_options_);
FileOptions opt_file_options =
fs_->OptimizeForLogWrite(file_options_, db_options);
- std::string log_fname =
- LogFileName(immutable_db_options_.wal_dir, log_file_num);
+ // GetWalDir() resolves the effective WAL directory (defaults to dbname
+ // when wal_dir is unset -- see SanitizeOptions); resolve once, reuse below.
+ std::string wal_dir = immutable_db_options_.GetWalDir();
+ std::string log_fname = LogFileName(wal_dir, log_file_num);
if (recycle_log_number) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"reusing log %" PRIu64 " from recycle list\n",
recycle_log_number);
- std::string old_log_fname =
- LogFileName(immutable_db_options_.wal_dir, recycle_log_number);
+ std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
lfile->SetPreallocationBlockSize(preallocate_block_size);
const auto& listeners = immutable_db_options_.listeners;
+ // Checksum handoff is enabled for this writer iff kWalFile is in the
+ // configured checksum_handoff_file_types set (passed twice below).
+ FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- std::move(lfile), log_fname, opt_file_options, env_, io_tracer_,
- nullptr /* stats */, listeners));
+ std::move(lfile), log_fname, opt_file_options,
+ immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
+ nullptr, tmp_set.Contains(FileType::kWalFile),
+ tmp_set.Contains(FileType::kWalFile)));
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
- immutable_db_options_.manual_wal_flush);
+ immutable_db_options_.manual_wal_flush,
+ immutable_db_options_.wal_compression);
+ // Write the compression-type record first so WAL readers know how to
+ // decode the entries that follow.
+ io_s = (*new_log)->AddCompressionTypeRecord();
}
return io_s;
}
}
*dbptr = nullptr;
+ assert(handles);
handles->clear();
size_t max_write_buffer_size = 0;
}
DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
- s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
+ if (!impl->immutable_db_options_.info_log) {
+ s = impl->init_logger_creation_s_;
+ delete impl;
+ return s;
+ } else {
+ assert(impl->init_logger_creation_s_.ok());
+ }
+ s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
if (s.ok()) {
std::vector<std::string> paths;
for (auto& db_path : impl->immutable_db_options_.db_paths) {
return s;
}
- impl->wal_in_db_path_ = IsWalDirSameAsDBPath(&impl->immutable_db_options_);
-
+ impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
+ RecoveryContext recovery_ctx;
impl->mutex_.Lock();
+
// Handles create_if_missing, error_if_exists
uint64_t recovered_seq(kMaxSequenceNumber);
- s = impl->Recover(column_families, false, false, false, &recovered_seq);
+ s = impl->Recover(column_families, false, false, false, &recovered_seq,
+ &recovery_ctx);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
log::Writer* new_log = nullptr;
InstrumentedMutexLock wl(&impl->log_write_mutex_);
impl->logfile_number_ = new_log_number;
assert(new_log != nullptr);
+ assert(impl->logs_.empty());
impl->logs_.emplace_back(new_log_number, new_log);
}
if (s.ok()) {
- // set column family handles
- for (auto cf : column_families) {
- auto cfd =
- impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
- if (cfd != nullptr) {
- handles->push_back(
- new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
- impl->NewThreadStatusCfInfo(cfd);
- } else {
- if (db_options.create_missing_column_families) {
- // missing column family, create it
- ColumnFamilyHandle* handle;
- impl->mutex_.Unlock();
- s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
- impl->mutex_.Lock();
- if (s.ok()) {
- handles->push_back(handle);
- } else {
- break;
- }
- } else {
- s = Status::InvalidArgument("Column family not found", cf.name);
- break;
- }
- }
- }
- }
- if (s.ok()) {
- SuperVersionContext sv_context(/* create_superversion */ true);
- for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
- impl->InstallSuperVersionAndScheduleWork(
- cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
- }
- sv_context.Clean();
- if (impl->two_write_queues_) {
- impl->log_write_mutex_.Lock();
- }
impl->alive_log_files_.push_back(
DBImpl::LogFileNumberSize(impl->logfile_number_));
- if (impl->two_write_queues_) {
- impl->log_write_mutex_.Unlock();
- }
-
- impl->DeleteObsoleteFiles();
- s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr);
- }
- if (s.ok()) {
// In WritePrepared there could be gap in sequence numbers. This breaks
// the trick we use in kPointInTimeRecovery which assumes the first seq in
// the log right after the corrupted log is one larger than the last seq
WriteOptions write_options;
uint64_t log_used, log_size;
log::Writer* log_writer = impl->logs_.back().writer;
- s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size);
+ LogFileNumberSize& log_file_number_size = impl->alive_log_files_.back();
+
+ assert(log_writer->get_log_number() == log_file_number_size.number);
+ impl->mutex_.AssertHeld();
+ s = impl->WriteToWAL(empty_batch, log_writer, &log_used, &log_size,
+ Env::IO_TOTAL, log_file_number_size);
if (s.ok()) {
// Need to fsync, otherwise it might get lost after a power reset.
s = impl->FlushWAL(false);
+ TEST_SYNC_POINT_CALLBACK("DBImpl::Open::BeforeSyncWAL", /*arg=*/&s);
if (s.ok()) {
s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
}
}
}
}
+ if (s.ok()) {
+ s = impl->LogAndApplyForRecovery(recovery_ctx);
+ }
+
if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
- // try to read format version
- s = impl->PersistentStatsProcessFormatVersion();
+ impl->mutex_.AssertHeld();
+ s = impl->InitPersistStatsColumnFamily();
}
if (s.ok()) {
- for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
- if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
- auto* vstorage = cfd->current()->storage_info();
- for (int i = 1; i < vstorage->num_levels(); ++i) {
- int num_files = vstorage->NumLevelFiles(i);
- if (num_files > 0) {
- s = Status::InvalidArgument(
- "Not all files are at level 0. Cannot "
- "open with FIFO compaction style.");
+ // set column family handles
+ for (auto cf : column_families) {
+ auto cfd =
+ impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
+ if (cfd != nullptr) {
+ handles->push_back(
+ new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
+ impl->NewThreadStatusCfInfo(cfd);
+ } else {
+ if (db_options.create_missing_column_families) {
+ // missing column family, create it
+ ColumnFamilyHandle* handle = nullptr;
+ impl->mutex_.Unlock();
+ s = impl->CreateColumnFamily(cf.options, cf.name, &handle);
+ impl->mutex_.Lock();
+ if (s.ok()) {
+ handles->push_back(handle);
+ } else {
break;
}
+ } else {
+ s = Status::InvalidArgument("Column family not found", cf.name);
+ break;
}
}
+ }
+ }
+
+ if (s.ok()) {
+ SuperVersionContext sv_context(/* create_superversion */ true);
+ for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
+ impl->InstallSuperVersionAndScheduleWork(
+ cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
+ }
+ sv_context.Clean();
+ }
+
+ if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
+ // try to read format version
+ s = impl->PersistentStatsProcessFormatVersion();
+ }
+
+ if (s.ok()) {
+ for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
if (!cfd->mem()->IsSnapshotSupported()) {
impl->is_snapshot_supported_ = false;
}
*dbptr = impl;
impl->opened_successfully_ = true;
+ impl->DeleteObsoleteFiles();
+ TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
impl->MaybeScheduleFlushOrCompaction();
} else {
persist_options_status.PermitUncheckedError();
// vast majority of all files), we'll pass the size to SstFileManager.
// For all other files SstFileManager will query the size from filesystem.
- std::vector<LiveFileMetaData> metadata;
-
- impl->mutex_.Lock();
- impl->versions_->GetLiveFilesMetaData(&metadata);
- impl->mutex_.Unlock();
+ std::vector<ColumnFamilyMetaData> metadata;
+ impl->GetAllColumnFamilyMetaData(&metadata);
std::unordered_map<std::string, uint64_t> known_file_sizes;
for (const auto& md : metadata) {
- std::string name = md.name;
- if (!name.empty() && name[0] == '/') {
- name = name.substr(1);
+ for (const auto& lmd : md.levels) {
+ for (const auto& fmd : lmd.files) {
+ known_file_sizes[fmd.relative_filename] = fmd.size;
+ }
+ }
+ for (const auto& bmd : md.blob_files) {
+ std::string name = bmd.blob_file_name;
+ // The BlobMetaData.blob_file_name may start with "/".
+ if (!name.empty() && name[0] == '/') {
+ name = name.substr(1);
+ }
+ known_file_sizes[name] = bmd.blob_file_size;
}
- known_file_sizes[name] = md.size;
}
std::vector<std::string> paths;
// Remove duplicate paths.
std::sort(paths.begin(), paths.end());
paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
+ IOOptions io_opts;
+ io_opts.do_not_recurse = true;
for (auto& path : paths) {
std::vector<std::string> existing_files;
- // TODO: Check for errors here?
- impl->immutable_db_options_.env->GetChildren(path, &existing_files)
- .PermitUncheckedError();
+ impl->immutable_db_options_.fs
+ ->GetChildren(path, io_opts, &existing_files,
+ /*IODebugContext*=*/nullptr)
+ .PermitUncheckedError(); //**TODO: What to do on error?
for (auto& file_name : existing_files) {
uint64_t file_number;
FileType file_type;
std::string file_path = path + "/" + file_name;
if (ParseFileName(file_name, &file_number, &file_type) &&
- file_type == kTableFile) {
+ (file_type == kTableFile || file_type == kBlobFile)) {
// TODO: Check for errors from OnAddFile?
if (known_file_sizes.count(file_name)) {
// We're assuming that each sst file name exists in at most one of
// the paths.
- sfm->OnAddFile(file_path, known_file_sizes.at(file_name),
- /* compaction */ false)
+ sfm->OnAddFile(file_path, known_file_sizes.at(file_name))
.PermitUncheckedError();
} else {
sfm->OnAddFile(file_path).PermitUncheckedError();
ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
impl);
LogFlush(impl->immutable_db_options_.info_log);
- assert(impl->TEST_WALBufferIsEmpty());
- // If the assert above fails then we need to FlushWAL before returning
- // control back to the user.
- if (!persist_options_status.ok()) {
+ if (!impl->WALBufferIsEmpty()) {
+ s = impl->FlushWAL(false);
+ if (s.ok()) {
+ // Sync is needed otherwise WAL buffered data might get lost after a
+ // power reset.
+ log::Writer* log_writer = impl->logs_.back().writer;
+ s = log_writer->file()->Sync(impl->immutable_db_options_.use_fsync);
+ }
+ }
+ if (s.ok() && !persist_options_status.ok()) {
s = Status::IOError(
"DB::Open() failed --- Unable to persist Options file",
persist_options_status.ToString());
}
- } else {
+ }
+ if (!s.ok()) {
ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
- "Persisting Option File error: %s",
- persist_options_status.ToString().c_str());
+ "DB::Open() failed: %s", s.ToString().c_str());
}
if (s.ok()) {
- impl->StartPeriodicWorkScheduler();
- } else {
+ s = impl->StartPeriodicTaskScheduler();
+ }
+
+ if (s.ok()) {
+ s = impl->RegisterRecordSeqnoTimeWorker();
+ }
+ if (!s.ok()) {
for (auto* h : *handles) {
delete h;
}