]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/db_impl_open.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / db_impl_open.cc
index 4852d41deaee3c578219196f00ff466d56f32a9d..f5008857bc9f39ed5797c9969d365072ac728d31 100644 (file)
@@ -23,8 +23,7 @@
 #include "util/sync_point.h"
 
 namespace rocksdb {
-Options SanitizeOptions(const std::string& dbname,
-                        const Options& src) {
+Options SanitizeOptions(const std::string& dbname, const Options& src) {
   auto db_options = SanitizeOptions(dbname, DBOptions(src));
   ImmutableDBOptions immutable_db_options(db_options);
   auto cf_options =
@@ -42,6 +41,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
       max_max_open_files = 0x400000;
     }
     ClipToRange(&result.max_open_files, 20, max_max_open_files);
+    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
+                             &result.max_open_files);
   }
 
   if (result.info_log == nullptr) {
@@ -56,10 +57,9 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
     result.write_buffer_manager.reset(
         new WriteBufferManager(result.db_write_buffer_size));
   }
-  auto bg_job_limits = DBImpl::GetBGJobLimits(result.max_background_flushes,
-                                              result.max_background_compactions,
-                                              result.max_background_jobs,
-                                              true /* parallelize_compactions */);
+  auto bg_job_limits = DBImpl::GetBGJobLimits(
+      result.max_background_flushes, result.max_background_compactions,
+      result.max_background_jobs, true /* parallelize_compactions */);
   result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
                                            Env::Priority::LOW);
   result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
@@ -107,14 +107,12 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
     result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
   }
 
-  if (result.use_direct_reads &&
-      result.compaction_readahead_size == 0) {
+  if (result.use_direct_reads && result.compaction_readahead_size == 0) {
     TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr);
     result.compaction_readahead_size = 1024 * 1024 * 2;
   }
 
-  if (result.compaction_readahead_size > 0 ||
-      result.use_direct_reads) {
+  if (result.compaction_readahead_size > 0 || result.use_direct_reads) {
     result.new_table_reader_for_compaction_inputs = true;
   }
 
@@ -178,7 +176,7 @@ static Status ValidateOptions(
       return s;
     }
 
-    if (cfd.options.ttl > 0 || cfd.options.compaction_options_fifo.ttl > 0) {
+    if (cfd.options.ttl > 0) {
       if (db_options.max_open_files != -1) {
         return Status::NotSupported(
             "TTL is only supported when files are always "
@@ -218,7 +216,7 @@ static Status ValidateOptions(
 
   return Status::OK();
 }
-} // namespace
+}  // namespace
 Status DBImpl::NewDB() {
   VersionEdit new_db;
   new_db.SetLogNumber(0);
@@ -230,7 +228,7 @@ Status DBImpl::NewDB() {
   ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
   const std::string manifest = DescriptorFileName(dbname_, 1);
   {
-    unique_ptr<WritableFile> file;
+    std::unique_ptr<WritableFile> file;
     EnvOptions env_options = env_->OptimizeForManifestWrite(env_options_);
     s = NewWritableFile(env_, manifest, &file, env_options);
     if (!s.ok()) {
@@ -238,8 +236,9 @@ Status DBImpl::NewDB() {
     }
     file->SetPreallocationBlockSize(
         immutable_db_options_.manifest_preallocation_size);
-    unique_ptr<WritableFileWriter> file_writer(
-        new WritableFileWriter(std::move(file), manifest, env_options));
+    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
+        std::move(file), manifest, env_options, env_, nullptr /* stats */,
+        immutable_db_options_.listeners));
     log::Writer log(std::move(file_writer), 0, false);
     std::string record;
     new_db.EncodeTo(&record);
@@ -257,9 +256,8 @@ Status DBImpl::NewDB() {
   return s;
 }
 
-Status DBImpl::CreateAndNewDirectory(
-    Env* env, const std::string& dirname,
-    std::unique_ptr<Directory>* directory) {
+Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname,
+                                     std::unique_ptr<Directory>* directory) {
   // We call CreateDirIfMissing() as the directory may already exist (if we
   // are reopening a DB), when this happens we don't want creating the
   // directory to cause an error. However, we need to check if creating the
@@ -340,8 +338,8 @@ Status DBImpl::Recover(
       }
     } else if (s.ok()) {
       if (immutable_db_options_.error_if_exists) {
-        return Status::InvalidArgument(
-            dbname_, "exists (error_if_exists is true)");
+        return Status::InvalidArgument(dbname_,
+                                       "exists (error_if_exists is true)");
       }
     } else {
       // Unexpected error reading file
@@ -361,7 +359,7 @@ Status DBImpl::Recover(
     }
     // Verify compatibility of env_options_ and filesystem
     {
-      unique_ptr<RandomAccessFile> idfile;
+      std::unique_ptr<RandomAccessFile> idfile;
       EnvOptions customized_env(env_options_);
       customized_env.use_direct_reads |=
           immutable_db_options_.use_direct_io_for_flush_and_compaction;
@@ -423,7 +421,10 @@ Status DBImpl::Recover(
     // produced by an older version of rocksdb.
     std::vector<std::string> filenames;
     s = env_->GetChildren(immutable_db_options_.wal_dir, &filenames);
-    if (!s.ok()) {
+    if (s.IsNotFound()) {
+      return Status::InvalidArgument("wal_dir not found",
+                                     immutable_db_options_.wal_dir);
+    } else if (!s.ok()) {
       return s;
     }
 
@@ -478,6 +479,28 @@ Status DBImpl::Recover(
     }
   }
 
+  if (read_only) {
+    // If we are opening as read-only, we need to update options_file_number_
+    // to reflect the most recent OPTIONS file. It does not matter for regular
+    // read-write db instance because options_file_number_ will later be
+    // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
+    std::vector<std::string> file_names;
+    if (s.ok()) {
+      s = env_->GetChildren(GetName(), &file_names);
+    }
+    if (s.ok()) {
+      uint64_t number = 0;
+      uint64_t options_file_number = 0;
+      FileType type;
+      for (const auto& fname : file_names) {
+        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
+          options_file_number = std::max(number, options_file_number);
+        }
+      }
+      versions_->options_file_number_ = options_file_number;
+    }
+  }
+
   return s;
 }
 
@@ -489,7 +512,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     Logger* info_log;
     const char* fname;
     Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
-    virtual void Corruption(size_t bytes, const Status& s) override {
+    void Corruption(size_t bytes, const Status& s) override {
       ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                      (this->status == nullptr ? "(ignoring error) " : ""),
                      fname, static_cast<int>(bytes), s.ToString().c_str());
@@ -526,10 +549,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
     std::map<std::string, uint32_t> cf_name_id_map;
     std::map<uint32_t, uint64_t> cf_lognumber_map;
     for (auto cfd : *versions_->GetColumnFamilySet()) {
-      cf_name_id_map.insert(
-        std::make_pair(cfd->GetName(), cfd->GetID()));
+      cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
       cf_lognumber_map.insert(
-        std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
+          std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
     }
 
     immutable_db_options_.wal_filter->ColumnFamilyLogNumberMap(cf_lognumber_map,
@@ -558,7 +580,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
 
     ROCKS_LOG_INFO(immutable_db_options_.info_log,
                    "Recovering log #%" PRIu64 " mode %d", log_number,
-                   immutable_db_options_.wal_recovery_mode);
+                   static_cast<int>(immutable_db_options_.wal_recovery_mode));
     auto logFileDropped = [this, &fname]() {
       uint64_t bytes;
       if (env_->GetFileSize(fname, &bytes).ok()) {
@@ -572,9 +594,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
       continue;
     }
 
-    unique_ptr<SequentialFileReader> file_reader;
+    std::unique_ptr<SequentialFileReader> file_reader;
     {
-      unique_ptr<SequentialFile> file;
+      std::unique_ptr<SequentialFile> file;
       status = env_->NewSequentialFile(fname, &file,
                                        env_->OptimizeForLogRead(env_options_));
       if (!status.ok()) {
@@ -701,7 +723,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
                 " mode %d log filter %s returned "
                 "more records (%d) than original (%d) which is not allowed. "
                 "Aborting recovery.",
-                log_number, immutable_db_options_.wal_recovery_mode,
+                log_number,
+                static_cast<int>(immutable_db_options_.wal_recovery_mode),
                 immutable_db_options_.wal_filter->Name(), new_count,
                 original_count);
             status = Status::NotSupported(
@@ -878,8 +901,8 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
       // VersionSet::next_file_number_ always to be strictly greater than any
       // log number
       versions_->MarkFileNumberUsed(max_log_number + 1);
-      status = versions_->LogAndApply(
-          cfd, *cfd->GetLatestMutableCFOptions(), edit, &mutex_);
+      status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
+                                      edit, &mutex_);
       if (!status.ok()) {
         // Recovery failed
         break;
@@ -992,14 +1015,21 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
       if (use_custom_gc_ && snapshot_checker == nullptr) {
         snapshot_checker = DisableGCSnapshotChecker::Instance();
       }
+      std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
+          range_del_iters;
+      auto range_del_iter =
+          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
+      if (range_del_iter != nullptr) {
+        range_del_iters.emplace_back(range_del_iter);
+      }
       s = BuildTable(
           dbname_, env_, *cfd->ioptions(), mutable_cf_options,
           env_options_for_compaction_, cfd->table_cache(), iter.get(),
-          std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
-          &meta, cfd->internal_comparator(),
+          std::move(range_del_iters), &meta, cfd->internal_comparator(),
           cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
           snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
           GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
+          mutable_cf_options.sample_for_compression,
           cfd->ioptions()->compression_opts, paranoid_file_checks,
           cfd->internal_stats(), TableFileCreationReason::kRecovery,
           &event_logger_, job_id, Env::IO_HIGH, nullptr /* table_properties */,
@@ -1029,9 +1059,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
   stats.micros = env_->NowMicros() - start_micros;
   stats.bytes_written = meta.fd.GetFileSize();
   stats.num_output_files = 1;
-  cfd->internal_stats()->AddCompactionStats(level, stats);
-  cfd->internal_stats()->AddCFStats(
-      InternalStats::BYTES_FLUSHED, meta.fd.GetFileSize());
+  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
+  cfd->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
+                                    meta.fd.GetFileSize());
   RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
   return s;
 }
@@ -1127,7 +1157,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
   s = impl->Recover(column_families);
   if (s.ok()) {
     uint64_t new_log_number = impl->versions_->NewFileNumber();
-    unique_ptr<WritableFile> lfile;
+    std::unique_ptr<WritableFile> lfile;
     EnvOptions soptions(db_options);
     EnvOptions opt_env_options =
         impl->immutable_db_options_.env->OptimizeForLogWrite(
@@ -1144,8 +1174,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
       {
         InstrumentedMutexLock wl(&impl->log_write_mutex_);
         impl->logfile_number_ = new_log_number;
-        unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
-            std::move(lfile), log_fname, opt_env_options));
+        const auto& listeners = impl->immutable_db_options_.listeners;
+        std::unique_ptr<WritableFileWriter> file_writer(
+            new WritableFileWriter(std::move(lfile), log_fname, opt_env_options,
+                                   impl->env_, nullptr /* stats */, listeners));
         impl->logs_.emplace_back(
             new_log_number,
             new log::Writer(
@@ -1222,7 +1254,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
           !cfd->mem()->IsMergeOperatorSupported()) {
         s = Status::InvalidArgument(
             "The memtable of column family %s does not support merge operator "
-            "its options.merge_operator is non-null", cfd->GetName().c_str());
+            "its options.merge_operator is non-null",
+            cfd->GetName().c_str());
       }
       if (!s.ok()) {
         break;
@@ -1284,7 +1317,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
 #endif  // !ROCKSDB_LITE
 
   if (s.ok()) {
-    ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl);
+    ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
+                     impl);
     LogFlush(impl->immutable_db_options_.info_log);
     assert(impl->TEST_WALBufferIsEmpty());
     // If the assert above fails then we need to FlushWAL before returning
@@ -1295,6 +1329,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
           persist_options_status.ToString());
     }
   }
+  if (s.ok()) {
+    impl->StartTimedTasks();
+  }
   if (!s.ok()) {
     for (auto* h : *handles) {
       delete h;