// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
env_(db_options.env),
env_options_(),
db_options_(SanitizeOptions(dbname_, db_options)),
- immutable_db_options_(db_options_),
+ immutable_db_options_(ImmutableDBOptions(db_options_)),
icmp_(default_cf_opts.comparator),
- default_cf_opts_(default_cf_opts),
+ default_cf_opts_(
+ SanitizeOptions(immutable_db_options_, default_cf_opts)),
default_cf_iopts_(
- ImmutableCFOptions(immutable_db_options_, default_cf_opts)),
- unknown_cf_opts_(unknown_cf_opts),
+ ImmutableCFOptions(immutable_db_options_, default_cf_opts_)),
+ unknown_cf_opts_(
+ SanitizeOptions(immutable_db_options_, unknown_cf_opts)),
create_unknown_cfs_(create_unknown_cfs),
raw_table_cache_(
// TableCache can be small since we expect each table to be opened
status = db_impl->NewDB();
delete db_impl;
}
+
if (status.ok()) {
// Recover using the fresh manifest created by NewDB()
status =
ROCKS_LOG_WARN(db_options_.info_log,
"**** Repaired rocksdb %s; "
"recovered %" ROCKSDB_PRIszt " files; %" PRIu64
- "bytes. "
+ " bytes. "
"Some data may have been lost. "
"****",
dbname_.c_str(), tables_.size(), bytes);
Status FindFiles() {
std::vector<std::string> filenames;
bool found_file = false;
+ std::vector<std::string> to_search_paths;
+
for (size_t path_id = 0; path_id < db_options_.db_paths.size(); path_id++) {
- Status status =
- env_->GetChildren(db_options_.db_paths[path_id].path, &filenames);
+ to_search_paths.push_back(db_options_.db_paths[path_id].path);
+ }
+
+ // Also search wal_dir when the user has set a custom wal_dir that differs from dbname_.
+ bool same = false;
+ Status status = env_->AreFilesSame(db_options_.wal_dir, dbname_, &same);
+ if (status.IsNotSupported()) {
+ same = db_options_.wal_dir == dbname_;
+ status = Status::OK();
+ } else if (!status.ok()) {
+ return status;
+ }
+
+ if (!same) {
+ to_search_paths.push_back(db_options_.wal_dir);
+ }
+
+ for (size_t path_id = 0; path_id < to_search_paths.size(); path_id++) {
+ status = env_->GetChildren(to_search_paths[path_id], &filenames);
if (!status.ok()) {
return status;
}
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
if (type == kDescriptorFile) {
- assert(path_id == 0);
manifests_.push_back(filenames[i]);
} else {
if (number + 1 > next_file_number_) {
next_file_number_ = number + 1;
}
if (type == kLogFile) {
- assert(path_id == 0);
logs_.push_back(number);
} else if (type == kTableFile) {
table_fds_.emplace_back(number, static_cast<uint32_t>(path_id),
void ConvertLogFilesToTables() {
for (size_t i = 0; i < logs_.size(); i++) {
- std::string logname = LogFileName(dbname_, logs_[i]);
+ // Build the log name from wal_dir rather than dbname_, since the user might have set the wal_dir option.
+ std::string logname = LogFileName(db_options_.wal_dir, logs_[i]);
Status status = ConvertLogToTable(logs_[i]);
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
};
// Open the log file
- std::string logname = LogFileName(dbname_, log);
+ std::string logname = LogFileName(db_options_.wal_dir, log);
unique_ptr<SequentialFile> lfile;
- Status status = env_->NewSequentialFile(logname, &lfile, env_options_);
+ Status status = env_->NewSequentialFile(
+ logname, &lfile, env_->OptimizeForLogRead(env_options_));
if (!status.ok()) {
return status;
}
unique_ptr<SequentialFileReader> lfile_reader(
- new SequentialFileReader(std::move(lfile)));
+ new SequentialFileReader(std::move(lfile), logname));
// Create the log reader.
LogReporter reporter;
// propagating bad information (like overly large sequence
// numbers).
log::Reader reader(db_options_.info_log, std::move(lfile_reader), &reporter,
- true /*enable checksum*/, 0 /*initial_offset*/, log);
+ true /*enable checksum*/, log);
// Initialize per-column family memtables
for (auto* cfd : *vset_.GetColumnFamilySet()) {
ro.total_order_seek = true;
Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
- EnvOptions optimized_env_options =
- env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
+ int64_t _current_time = 0;
+ status = env_->GetCurrentTime(&_current_time); // ignore error
+ const uint64_t current_time = static_cast<uint64_t>(_current_time);
+ SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
+
+ auto write_hint = cfd->CalculateSSTWriteHint(0);
status = BuildTable(
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
- optimized_env_options, table_cache_, iter.get(),
+ env_options_, table_cache_, iter.get(),
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
- {}, kMaxSequenceNumber, kNoCompression, CompressionOptions(), false,
- nullptr /* internal_stats */, TableFileCreationReason::kRecovery);
+ {}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
+ CompressionOptions(), false, nullptr /* internal_stats */,
+ TableFileCreationReason::kRecovery, nullptr /* event_logger */,
+ 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
+ -1 /* level */, current_time, write_hint);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),
}
if (status.ok()) {
InternalIterator* iter = table_cache_->NewIterator(
- ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd,
- nullptr /* range_del_agg */);
+ ReadOptions(), env_options_, cfd->internal_comparator(), t->meta,
+ nullptr /* range_del_agg */,
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
bool empty = true;
ParsedInternalKey parsed;
t->min_sequence = 0;
max_sequence = tables_[i].max_sequence;
}
}
+ vset_.SetLastAllocatedSequence(max_sequence);
+ vset_.SetLastPublishedSequence(max_sequence);
vset_.SetLastSequence(max_sequence);
for (const auto& cf_id_and_tables : cf_id_to_tables) {
table->meta.largest, table->min_sequence,
table->max_sequence, table->meta.marked_for_compaction);
}
+ assert(next_file_number_ > 0);
+ vset_.MarkFileNumberUsed(next_file_number_ - 1);
mutex_.Lock();
Status status = vset_.LogAndApply(
cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
} // anonymous namespace
Status RepairDB(const std::string& dbname, const DBOptions& db_options,
- const std::vector<ColumnFamilyDescriptor>& column_families) {
+ const std::vector<ColumnFamilyDescriptor>& column_families
+ ) {
ColumnFamilyOptions default_cf_opts;
Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
if (status.ok()) {
- Repairer repairer(dbname, db_options, column_families, default_cf_opts,
+ Repairer repairer(dbname, db_options, column_families,
+ default_cf_opts,
ColumnFamilyOptions() /* unknown_cf_opts */,
false /* create_unknown_cfs */);
status = repairer.Run();
ColumnFamilyOptions default_cf_opts;
Status status = GetDefaultCFOptions(column_families, &default_cf_opts);
if (status.ok()) {
- Repairer repairer(dbname, db_options, column_families, default_cf_opts,
+ Repairer repairer(dbname, db_options,
+ column_families, default_cf_opts,
unknown_cf_opts, true /* create_unknown_cfs */);
status = repairer.Run();
}
Status RepairDB(const std::string& dbname, const Options& options) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
- Repairer repairer(dbname, db_options, {}, cf_options /* default_cf_opts */,
+ Repairer repairer(dbname, db_options,
+ {}, cf_options /* default_cf_opts */,
cf_options /* unknown_cf_opts */,
true /* create_unknown_cfs */);
return repairer.Run();