#include "util/sync_point.h"
namespace rocksdb {
-Options SanitizeOptions(const std::string& dbname,
- const Options& src) {
+Options SanitizeOptions(const std::string& dbname, const Options& src) {
auto db_options = SanitizeOptions(dbname, DBOptions(src));
ImmutableDBOptions immutable_db_options(db_options);
auto cf_options =
max_max_open_files = 0x400000;
}
ClipToRange(&result.max_open_files, 20, max_max_open_files);
+ TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
+ &result.max_open_files);
}
if (result.info_log == nullptr) {
result.write_buffer_manager.reset(
new WriteBufferManager(result.db_write_buffer_size));
}
- auto bg_job_limits = DBImpl::GetBGJobLimits(result.max_background_flushes,
- result.max_background_compactions,
- result.max_background_jobs,
- true /* parallelize_compactions */);
+ auto bg_job_limits = DBImpl::GetBGJobLimits(
+ result.max_background_flushes, result.max_background_compactions,
+ result.max_background_jobs, true /* parallelize_compactions */);
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
Env::Priority::LOW);
result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
}
- if (result.use_direct_reads &&
- result.compaction_readahead_size == 0) {
+ if (result.use_direct_reads && result.compaction_readahead_size == 0) {
TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
result.compaction_readahead_size = 1024 * 1024 * 2;
}
- if (result.compaction_readahead_size > 0 ||
- result.use_direct_reads) {
+ if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
result.new_table_reader_for_compaction_inputs = true;
}
return s;
}
- if (cfd.options.ttl > 0 || cfd.options.compaction_options_fifo.ttl > 0) {
+ if (cfd.options.ttl > 0) {
if (db_options.max_open_files != -1) {
return Status::NotSupported(
"TTL is only supported when files are always "
return Status::OK();
}
-} // namespace
+} // namespace
Status DBImpl::NewDB() {
VersionEdit new_db;
new_db.SetLogNumber(0);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
const std::string manifest = DescriptorFileName(dbname_, 1);
{
- unique_ptr<WritableFile> file;
+ std::unique_ptr<WritableFile> file;
EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
s = NewWritableFile(env_, manifest, &file, env_options);
if (!s.ok()) {
}
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
- unique_ptr<WritableFileWriter> file_writer(
- new WritableFileWriter(std::move(file), manifest, env_options));
+ std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+ std::move(file), manifest, env_options, env_, nullptr /* stats */,
+ immutable_db_options_.listeners));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
return s;
}
-Status DBImpl::CreateAndNewDirectory(
- Env* env, const std::string& dirname,
- std::unique_ptr<Directory>* directory) {
+Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
+ std::unique_ptr<Directory>* directory) {
// We call CreateDirIfMissing() as the directory may already exist (if we
// are reopening a DB), when this happens we don't want creating the
// directory to cause an error. However, we need to check if creating the
}
} else if (s.ok()) {
if (immutable_db_options_.error_if_exists) {
- return Status::InvalidArgument(
- dbname_, "exists (error_if_exists is true)");
+ return Status::InvalidArgument(dbname_,
+ "exists (error_if_exists is true)");
}
} else {
// Unexpected error reading file
}
// Verify compatibility of env_options_ and filesystem
{
- unique_ptr<RandomAccessFile> idfile;
+ std::unique_ptr<RandomAccessFile> idfile;
EnvOptions customized_env(env_options_);
customized_env.use_direct_reads |=
immutable_db_options_.use_direct_io_for_flush_and_compaction;
// produced by an older version of rocksdb.
std::vector<std::string> filenames;
s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
- if (!s.ok()) {
+ if (s.IsNotFound()) {
+ return Status::InvalidArgument("wal_dir not found",
+ immutable_db_options_.wal_dir);
+ } else if (!s.ok()) {
return s;
}
}
}
+ if (read_only) {
+ // If we are opening as read-only, we need to update options_file_number_
+ // to reflect the most recent OPTIONS file. It does not matter for regular
+ // read-write db instance because options_file_number_ will later be
+ // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
+ std::vector<std::string> file_names;
+ if (s.ok()) {
+ s = env_->GetChildren(GetName(), &file_names);
+ }
+ if (s.ok()) {
+ uint64_t number = 0;
+ uint64_t options_file_number = 0;
+ FileType type;
+ for (const auto& fname : file_names) {
+ if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
+ options_file_number = std::max(number, options_file_number);
+ }
+ }
+ versions_->options_file_number_ = options_file_number;
+ }
+ }
+
return s;
}
Logger* info_log;
const char* fname;
Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
- virtual void Corruption(size_t bytes, const Status& s) override {
+ void Corruption(size_t bytes, const Status& s) override {
ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
(this->status == nullptr ? "(ignoring error) " : ""),
fname, static_cast<int>(bytes), s.ToString().c_str());
std::map<std::string, uint32_t> cf_name_id_map;
std::map<uint32_t, uint64_t> cf_lognumber_map;
for (auto cfd : *versions_->GetColumnFamilySet()) {
- cf_name_id_map.insert(
- std::make_pair(cfd->GetName(), cfd->GetID()));
+ cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
cf_lognumber_map.insert(
- std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
+ std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
}
immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovering log #%" PRIu64 " mode %d", log_number,
- immutable_db_options_.wal_recovery_mode);
+ static_cast<int>(immutable_db_options_.wal_recovery_mode));
auto logFileDropped = [this, &fname]() {
uint64_t bytes;
if (env_->GetFileSize(fname, &bytes).ok()) {
continue;
}
- unique_ptr<SequentialFileReader> file_reader;
+ std::unique_ptr<SequentialFileReader> file_reader;
{
- unique_ptr<SequentialFile> file;
+ std::unique_ptr<SequentialFile> file;
status = env_->NewSequentialFile(fname, &file,
env_->OptimizeForLogRead(env_options_));
if (!status.ok()) {
" mode %d log filter %s returned "
"more records (%d) than original (%d) which is not allowed. "
"Aborting recovery.",
- log_number, immutable_db_options_.wal_recovery_mode,
+ log_number,
+ static_cast<int>(immutable_db_options_.wal_recovery_mode),
immutable_db_options_.wal_filter->Name(), new_count,
original_count);
status = Status::NotSupported(
// VersionSet::next_file_number_ always to be strictly greater than any
// log number
versions_->MarkFileNumberUsed(max_log_number + 1);
- status = versions_->LogAndApply(
- cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
+ status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
+ edit, &mutex_);
if (!status.ok()) {
// Recovery failed
break;
if (use_custom_gc_ && snapshot_checker == nullptr) {
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
+ std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
+ range_del_iters;
+ auto range_del_iter =
+ mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+ if (range_del_iter != nullptr) {
+ range_del_iters.emplace_back(range_del_iter);
+ }
s = BuildTable(
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
env_options_for_compaction_, cfd->table_cache(), iter.get(),
- std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
- &meta, cfd->internal_comparator(),
+ std::move(range_del_iters), &meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
+ mutable_cf_options.sample_for_compression,
cfd->ioptions()->compression_opts, paranoid_file_checks,
cfd->internal_stats(), TableFileCreationReason::kRecovery,
&event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
stats.micros = env_->NowMicros() - start_micros;
stats.bytes_written = meta.fd.GetFileSize();
stats.num_output_files = 1;
- cfd->internal_stats()->AddCompactionStats(level, stats);
- cfd->internal_stats()->AddCFStats(
- InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
+ cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
+ cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
+ meta.fd.GetFileSize());
RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
return s;
}
s = impl->Recover(column_families);
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();
- unique_ptr<WritableFile> lfile;
+ std::unique_ptr<WritableFile> lfile;
EnvOptions soptions(db_options);
EnvOptions opt_env_options =
impl->immutable_db_options_.env->OptimizeForLogWrite(
{
InstrumentedMutexLock wl(&impl->log_write_mutex_);
impl->logfile_number_ = new_log_number;
- unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- std::move(lfile), log_fname, opt_env_options));
+ const auto& listeners = impl->immutable_db_options_.listeners;
+ std::unique_ptr<WritableFileWriter> file_writer(
+ new WritableFileWriter(std::move(lfile), log_fname, opt_env_options,
+ impl->env_, nullptr /* stats */, listeners));
impl->logs_.emplace_back(
new_log_number,
new log::Writer(
!cfd->mem()->IsMergeOperatorSupported()) {
s = Status::InvalidArgument(
"The memtable of column family %s does not support merge operator "
- "its options.merge_operator is non-null", cfd->GetName().c_str());
+ "its options.merge_operator is non-null",
+ cfd->GetName().c_str());
}
if (!s.ok()) {
break;
#endif // !ROCKSDB_LITE
if (s.ok()) {
- ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
+ ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
+ impl);
LogFlush(impl->immutable_db_options_.info_log);
assert(impl->TEST_WALBufferIsEmpty());
// If the assert above fails then we need to FlushWAL before returning
persist_options_status.ToString());
}
}
+ if (s.ok()) {
+ impl->StartTimedTasks();
+ }
if (!s.ok()) {
for (auto* h : *handles) {
delete h;