]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/flush_job.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / flush_job.cc
index b64712fd1f06ac7ab417b695fbe9d01debde121b..f03188141a506cb17880dcdb5df8cbf3d6c30c38 100644 (file)
 #include "db/event_helpers.h"
 #include "db/log_reader.h"
 #include "db/log_writer.h"
+#include "db/memtable.h"
 #include "db/memtable_list.h"
 #include "db/merge_context.h"
+#include "db/range_tombstone_fragmenter.h"
 #include "db/version_set.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/thread_status_util.h"
 #include "port/port.h"
-#include "db/memtable.h"
 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
 #include "rocksdb/statistics.h"
@@ -85,11 +86,11 @@ const char* GetFlushReasonString (FlushReason flush_reason) {
   }
 }
 
-
 FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                    const ImmutableDBOptions& db_options,
                    const MutableCFOptions& mutable_cf_options,
-                   const EnvOptions env_options, VersionSet* versions,
+                   const uint64_t* max_memtable_id,
+                   const EnvOptions& env_options, VersionSet* versions,
                    InstrumentedMutex* db_mutex,
                    std::atomic<bool>* shutting_down,
                    std::vector<SequenceNumber> existing_snapshots,
@@ -98,11 +99,14 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                    LogBuffer* log_buffer, Directory* db_directory,
                    Directory* output_file_directory,
                    CompressionType output_compression, Statistics* stats,
-                   EventLogger* event_logger, bool measure_io_stats)
+                   EventLogger* event_logger, bool measure_io_stats,
+                   const bool sync_output_directory, const bool write_manifest,
+                   Env::Priority thread_pri)
     : dbname_(dbname),
       cfd_(cfd),
       db_options_(db_options),
       mutable_cf_options_(mutable_cf_options),
+      max_memtable_id_(max_memtable_id),
       env_options_(env_options),
       versions_(versions),
       db_mutex_(db_mutex),
@@ -118,9 +122,12 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       stats_(stats),
       event_logger_(event_logger),
       measure_io_stats_(measure_io_stats),
+      sync_output_directory_(sync_output_directory),
+      write_manifest_(write_manifest),
       edit_(nullptr),
       base_(nullptr),
-      pick_memtable_called(false) {
+      pick_memtable_called(false),
+      thread_pri_(thread_pri) {
   // Update the thread status to indicate flush.
   ReportStartedFlush();
   TEST_SYNC_POINT("FlushJob::FlushJob()");
@@ -162,7 +169,7 @@ void FlushJob::PickMemTable() {
   assert(!pick_memtable_called);
   pick_memtable_called = true;
   // Save the contents of the earliest memtable as a new Table
-  cfd_->imm()->PickMemtablesToFlush(&mems_);
+  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_);
   if (mems_.empty()) {
     return;
   }
@@ -206,6 +213,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
   uint64_t prev_fsync_nanos = 0;
   uint64_t prev_range_sync_nanos = 0;
   uint64_t prev_prepare_write_nanos = 0;
+  uint64_t prev_cpu_write_nanos = 0;
+  uint64_t prev_cpu_read_nanos = 0;
   if (measure_io_stats_) {
     prev_perf_level = GetPerfLevel();
     SetPerfLevel(PerfLevel::kEnableTime);
@@ -213,6 +222,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
     prev_fsync_nanos = IOSTATS(fsync_nanos);
     prev_range_sync_nanos = IOSTATS(range_sync_nanos);
     prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
+    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
+    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
   }
 
   // This will release and re-acquire the mutex.
@@ -226,10 +237,10 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
 
   if (!s.ok()) {
     cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
-  } else {
+  } else if (write_manifest_) {
     TEST_SYNC_POINT("FlushJob::InstallResults");
     // Replace immutable memtable with the generated Table
-    s = cfd_->imm()->InstallMemtableFlushResults(
+    s = cfd_->imm()->TryInstallMemtableFlushResults(
         cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
         meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
         log_buffer_);
@@ -264,6 +275,10 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
     stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
     stream << "file_prepare_write_nanos"
            << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
+    stream << "file_cpu_write_nanos"
+           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
+    stream << "file_cpu_read_nanos"
+           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
   }
 
   return s;
@@ -280,6 +295,7 @@ Status FlushJob::WriteLevel0Table() {
       ThreadStatus::STAGE_FLUSH_WRITE_L0);
   db_mutex_->AssertHeld();
   const uint64_t start_micros = db_options_.env->NowMicros();
+  const uint64_t start_cpu_micros = db_options_.env->NowCPUNanos() / 1000;
   Status s;
   {
     auto write_hint = cfd_->CalculateSSTWriteHint(0);
@@ -291,11 +307,13 @@ Status FlushJob::WriteLevel0Table() {
     // memtable and its associated range deletion memtable, respectively, at
     // corresponding indexes.
     std::vector<InternalIterator*> memtables;
-    std::vector<InternalIterator*> range_del_iters;
+    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
+        range_del_iters;
     ReadOptions ro;
     ro.total_order_seek = true;
     Arena arena;
     uint64_t total_num_entries = 0, total_num_deletes = 0;
+    uint64_t total_data_size = 0;
     size_t total_memory_usage = 0;
     for (MemTable* m : mems_) {
       ROCKS_LOG_INFO(
@@ -303,31 +321,30 @@ Status FlushJob::WriteLevel0Table() {
           "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
           cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
       memtables.push_back(m->NewIterator(ro, &arena));
-      auto* range_del_iter = m->NewRangeTombstoneIterator(ro);
+      auto* range_del_iter =
+          m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber);
       if (range_del_iter != nullptr) {
-        range_del_iters.push_back(range_del_iter);
+        range_del_iters.emplace_back(range_del_iter);
       }
       total_num_entries += m->num_entries();
       total_num_deletes += m->num_deletes();
+      total_data_size += m->get_data_size();
       total_memory_usage += m->ApproximateMemoryUsage();
     }
 
-    event_logger_->Log()
-        << "job" << job_context_->job_id << "event"
-        << "flush_started"
-        << "num_memtables" << mems_.size() << "num_entries" << total_num_entries
-        << "num_deletes" << total_num_deletes << "memory_usage"
-        << total_memory_usage << "flush_reason"
-        << GetFlushReasonString(cfd_->GetFlushReason());
+    event_logger_->Log() << "job" << job_context_->job_id << "event"
+                         << "flush_started"
+                         << "num_memtables" << mems_.size() << "num_entries"
+                         << total_num_entries << "num_deletes"
+                         << total_num_deletes << "total_data_size"
+                         << total_data_size << "memory_usage"
+                         << total_memory_usage << "flush_reason"
+                         << GetFlushReasonString(cfd_->GetFlushReason());
 
     {
       ScopedArenaIterator iter(
           NewMergingIterator(&cfd_->internal_comparator(), &memtables[0],
                              static_cast<int>(memtables.size()), &arena));
-      std::unique_ptr<InternalIterator> range_del_iter(NewMergingIterator(
-          &cfd_->internal_comparator(),
-          range_del_iters.empty() ? nullptr : &range_del_iters[0],
-          static_cast<int>(range_del_iters.size())));
       ROCKS_LOG_INFO(db_options_.info_log,
                      "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
                      cfd_->GetName().c_str(), job_context_->job_id,
@@ -353,11 +370,12 @@ Status FlushJob::WriteLevel0Table() {
       s = BuildTable(
           dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
           env_options_, cfd_->table_cache(), iter.get(),
-          std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
+          std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
           cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
           cfd_->GetName(), existing_snapshots_,
           earliest_write_conflict_snapshot_, snapshot_checker_,
-          output_compression_, cfd_->ioptions()->compression_opts,
+          output_compression_, mutable_cf_options_.sample_for_compression,
+          cfd_->ioptions()->compression_opts,
           mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
           TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
           Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
@@ -373,7 +391,7 @@ Status FlushJob::WriteLevel0Table() {
                    s.ToString().c_str(),
                    meta_.marked_for_compaction ? " (needs compaction)" : "");
 
-    if (s.ok() && output_file_directory_ != nullptr) {
+    if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
       s = output_file_directory_->Fsync();
     }
     TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
@@ -398,9 +416,10 @@ Status FlushJob::WriteLevel0Table() {
   // Note that here we treat flush as level 0 compaction in internal stats
   InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
   stats.micros = db_options_.env->NowMicros() - start_micros;
+  stats.cpu_micros = db_options_.env->NowCPUNanos() / 1000 - start_cpu_micros;
   stats.bytes_written = meta_.fd.GetFileSize();
-  MeasureTime(stats_, FLUSH_TIME, stats.micros);
-  cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
+  RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
+  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
   cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                      meta_.fd.GetFileSize());
   RecordFlushIOStats();