git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/flush_job.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / flush_job.cc
index 5e4e210a25ff2223b47959c8e7cd77c2d31f995e..b64712fd1f06ac7ab417b695fbe9d01debde121b 100644 (file)
@@ -1,7 +1,7 @@
 //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
-//  This source code is licensed under the BSD-style license found in the
-//  LICENSE file in the root directory of this source tree. An additional grant
-//  of patent rights can be found in the PATENTS file in the same directory.
+//  This source code is licensed under both the GPLv2 (found in the
+//  COPYING file in the root directory) and Apache 2.0 License
+//  (found in the LICENSE.Apache file in the root directory).
 //
 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style license that can be
@@ -30,7 +30,6 @@
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/thread_status_util.h"
-#include "port/likely.h"
 #include "port/port.h"
 #include "db/memtable.h"
 #include "rocksdb/db.h"
 
 namespace rocksdb {
 
+const char* GetFlushReasonString (FlushReason flush_reason) {  // Maps a FlushReason to a human-readable string literal.
+  switch (flush_reason) {  // Every arm returns a string literal, so the caller never owns or frees the result.
+    case FlushReason::kOthers:
+      return "Other Reasons";
+    case FlushReason::kGetLiveFiles:
+      return "Get Live Files";
+    case FlushReason::kShutDown:
+      return "Shut down";
+    case FlushReason::kExternalFileIngestion:
+      return "External File Ingestion";
+    case FlushReason::kManualCompaction:
+      return "Manual Compaction";
+    case FlushReason::kWriteBufferManager:
+      return "Write Buffer Manager";
+    case FlushReason::kWriteBufferFull:
+      return "Write Buffer Full";
+    case FlushReason::kTest:
+      return "Test";
+    case FlushReason::kDeleteFiles:
+      return "Delete Files";
+    case FlushReason::kAutoCompaction:
+      return "Auto Compaction";
+    case FlushReason::kManualFlush:
+      return "Manual Flush";
+    case FlushReason::kErrorRecovery:
+      return "Error Recovery";
+    default:  // Any value without a dedicated case above is reported as "Invalid".
+      return "Invalid";
+  }
+}
+
+
 FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                    const ImmutableDBOptions& db_options,
                    const MutableCFOptions& mutable_cf_options,
-                   const EnvOptions& env_options, VersionSet* versions,
+                   const EnvOptions env_options, VersionSet* versions,
                    InstrumentedMutex* db_mutex,
                    std::atomic<bool>* shutting_down,
                    std::vector<SequenceNumber> existing_snapshots,
                    SequenceNumber earliest_write_conflict_snapshot,
-                   JobContext* job_context, LogBuffer* log_buffer,
-                   Directory* db_directory, Directory* output_file_directory,
+                   SnapshotChecker* snapshot_checker, JobContext* job_context,
+                   LogBuffer* log_buffer, Directory* db_directory,
+                   Directory* output_file_directory,
                    CompressionType output_compression, Statistics* stats,
                    EventLogger* event_logger, bool measure_io_stats)
     : dbname_(dbname),
@@ -77,6 +109,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       shutting_down_(shutting_down),
       existing_snapshots_(std::move(existing_snapshots)),
       earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+      snapshot_checker_(snapshot_checker),
       job_context_(job_context),
       log_buffer_(log_buffer),
       db_directory_(db_directory),
@@ -85,6 +118,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       stats_(stats),
       event_logger_(event_logger),
       measure_io_stats_(measure_io_stats),
+      edit_(nullptr),
+      base_(nullptr),
       pick_memtable_called(false) {
   // Update the thread status to indicate flush.
   ReportStartedFlush();
@@ -152,7 +187,9 @@ void FlushJob::PickMemTable() {
   base_->Ref();  // it is likely that we do not need this reference
 }
 
-Status FlushJob::Run(FileMetaData* file_meta) {
+Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
+                     FileMetaData* file_meta) {
+  TEST_SYNC_POINT("FlushJob::Start");
   db_mutex_->AssertHeld();
   assert(pick_memtable_called);
   AutoThreadOperationStageUpdater stage_run(
@@ -193,7 +230,7 @@ Status FlushJob::Run(FileMetaData* file_meta) {
     TEST_SYNC_POINT("FlushJob::InstallResults");
     // Replace immutable memtable with the generated Table
     s = cfd_->imm()->InstallMemtableFlushResults(
-        cfd_, mutable_cf_options_, mems_, versions_, db_mutex_,
+        cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
         meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
         log_buffer_);
   }
@@ -206,6 +243,8 @@ Status FlushJob::Run(FileMetaData* file_meta) {
   auto stream = event_logger_->LogToBuffer(log_buffer_);
   stream << "job" << job_context_->job_id << "event"
          << "flush_finished";
+  stream << "output_compression"
+         << CompressionTypeToString(output_compression_);
   stream << "lsm_state";
   stream.StartArray();
   auto vstorage = cfd_->current()->storage_info();
@@ -243,6 +282,7 @@ Status FlushJob::WriteLevel0Table() {
   const uint64_t start_micros = db_options_.env->NowMicros();
   Status s;
   {
+    auto write_hint = cfd_->CalculateSSTWriteHint(0);
     db_mutex_->Unlock();
     if (log_buffer_) {
       log_buffer_->FlushBufferToLog();
@@ -272,12 +312,13 @@ Status FlushJob::WriteLevel0Table() {
       total_memory_usage += m->ApproximateMemoryUsage();
     }
 
-    event_logger_->Log() << "job" << job_context_->job_id << "event"
-                         << "flush_started"
-                         << "num_memtables" << mems_.size() << "num_entries"
-                         << total_num_entries << "num_deletes"
-                         << total_num_deletes << "memory_usage"
-                         << total_memory_usage;
+    event_logger_->Log()
+        << "job" << job_context_->job_id << "event"
+        << "flush_started"
+        << "num_memtables" << mems_.size() << "num_entries" << total_num_entries
+        << "num_deletes" << total_num_deletes << "memory_usage"
+        << total_memory_usage << "flush_reason"
+        << GetFlushReasonString(cfd_->GetFlushReason());
 
     {
       ScopedArenaIterator iter(
@@ -294,19 +335,33 @@ Status FlushJob::WriteLevel0Table() {
 
       TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
                                &output_compression_);
-      EnvOptions optimized_env_options =
-          db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_);
+      int64_t _current_time = 0;
+      auto status = db_options_.env->GetCurrentTime(&_current_time);
+      // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
+      if (!status.ok()) {
+        ROCKS_LOG_WARN(
+            db_options_.info_log,
+            "Failed to get current time to populate creation_time property. "
+            "Status: %s",
+            status.ToString().c_str());
+      }
+      const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+      uint64_t oldest_key_time =
+          mems_.front()->ApproximateOldestKeyTime();
+
       s = BuildTable(
           dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
-          optimized_env_options, cfd_->table_cache(), iter.get(),
+          env_options_, cfd_->table_cache(), iter.get(),
           std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
           cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
           cfd_->GetName(), existing_snapshots_,
-          earliest_write_conflict_snapshot_, output_compression_,
-          cfd_->ioptions()->compression_opts,
+          earliest_write_conflict_snapshot_, snapshot_checker_,
+          output_compression_, cfd_->ioptions()->compression_opts,
           mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
           TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
-          Env::IO_HIGH, &table_properties_, 0 /* level */);
+          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
+          oldest_key_time, write_hint);
       LogFlush(db_options_.info_log);
     }
     ROCKS_LOG_INFO(db_options_.info_log,
@@ -318,8 +373,8 @@ Status FlushJob::WriteLevel0Table() {
                    s.ToString().c_str(),
                    meta_.marked_for_compaction ? " (needs compaction)" : "");
 
-    if (output_file_directory_ != nullptr) {
-      output_file_directory_->Fsync();
+    if (s.ok() && output_file_directory_ != nullptr) {
+      s = output_file_directory_->Fsync();
     }
     TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
     db_mutex_->Lock();
@@ -336,14 +391,15 @@ Status FlushJob::WriteLevel0Table() {
     // Add file to L0
     edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                    meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
-                   meta_.smallest_seqno, meta_.largest_seqno,
+                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
                    meta_.marked_for_compaction);
   }
 
   // Note that here we treat flush as level 0 compaction in internal stats
-  InternalStats::CompactionStats stats(1);
+  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
   stats.micros = db_options_.env->NowMicros() - start_micros;
   stats.bytes_written = meta_.fd.GetFileSize();
+  MeasureTime(stats_, FLUSH_TIME, stats.micros);
   cfd_->internal_stats()->AddCompactionStats(0 /* level */, stats);
   cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_FLUSHED,
                                      meta_.fd.GetFileSize());