git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/flush_job.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / flush_job.cc
index f03188141a506cb17880dcdb5df8cbf3d6c30c38..997bd80807c54d54da5325f1aebdf2747f850994 100644 (file)
@@ -9,11 +9,7 @@
 
 #include "db/flush_job.h"
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
+#include <cinttypes>
 
 #include <algorithm>
 #include <vector>
 #include "db/merge_context.h"
 #include "db/range_tombstone_fragmenter.h"
 #include "db/version_set.h"
+#include "file/file_util.h"
+#include "file/filename.h"
+#include "logging/event_logger.h"
+#include "logging/log_buffer.h"
+#include "logging/logging.h"
 #include "monitoring/iostats_context_imp.h"
 #include "monitoring/perf_context_imp.h"
 #include "monitoring/thread_status_util.h"
 #include "rocksdb/statistics.h"
 #include "rocksdb/status.h"
 #include "rocksdb/table.h"
-#include "table/block.h"
-#include "table/block_based_table_factory.h"
+#include "table/block_based/block.h"
+#include "table/block_based/block_based_table_factory.h"
 #include "table/merging_iterator.h"
 #include "table/table_builder.h"
 #include "table/two_level_iterator.h"
+#include "test_util/sync_point.h"
 #include "util/coding.h"
-#include "util/event_logger.h"
-#include "util/file_util.h"
-#include "util/filename.h"
-#include "util/log_buffer.h"
-#include "util/logging.h"
 #include "util/mutexlock.h"
 #include "util/stop_watch.h"
-#include "util/sync_point.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 const char* GetFlushReasonString (FlushReason flush_reason) {
   switch (flush_reason) {
@@ -90,7 +86,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                    const ImmutableDBOptions& db_options,
                    const MutableCFOptions& mutable_cf_options,
                    const uint64_t* max_memtable_id,
-                   const EnvOptions& env_options, VersionSet* versions,
+                   const FileOptions& file_options, VersionSet* versions,
                    InstrumentedMutex* db_mutex,
                    std::atomic<bool>* shutting_down,
                    std::vector<SequenceNumber> existing_snapshots,
@@ -107,7 +103,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       db_options_(db_options),
       mutable_cf_options_(mutable_cf_options),
       max_memtable_id_(max_memtable_id),
-      env_options_(env_options),
+      file_options_(file_options),
       versions_(versions),
       db_mutex_(db_mutex),
       shutting_down_(shutting_down),
@@ -229,10 +225,12 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
   // This will release and re-acquire the mutex.
   Status s = WriteLevel0Table();
 
-  if (s.ok() &&
-      (shutting_down_->load(std::memory_order_acquire) || cfd_->IsDropped())) {
-    s = Status::ShutdownInProgress(
-        "Database shutdown or Column family drop during flush");
+  if (s.ok() && cfd_->IsDropped()) {
+    s = Status::ColumnFamilyDropped("Column family dropped during compaction");
+  }
+  if ((s.ok() || s.IsColumnFamilyDropped()) &&
+      shutting_down_->load(std::memory_order_acquire)) {
+    s = Status::ShutdownInProgress("Database shutdown");
   }
 
   if (!s.ok()) {
@@ -243,7 +241,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
     s = cfd_->imm()->TryInstallMemtableFlushResults(
         cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
         meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
-        log_buffer_);
+        log_buffer_, &committed_flush_jobs_info_);
   }
 
   if (s.ok() && file_meta != nullptr) {
@@ -251,7 +249,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
   }
   RecordFlushIOStats();
 
-  auto stream = event_logger_->LogToBuffer(log_buffer_);
+  // When measure_io_stats_ is true, the default 512 bytes is not enough.
+  auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
   stream << "job" << job_context_->job_id << "event"
          << "flush_finished";
   stream << "output_compression"
@@ -367,9 +366,19 @@ Status FlushJob::WriteLevel0Table() {
       uint64_t oldest_key_time =
           mems_.front()->ApproximateOldestKeyTime();
 
+      // It's not clear whether oldest_key_time is always available. In case
+      // it is not available, use current_time.
+      meta_.oldest_ancester_time = std::min(current_time, oldest_key_time);
+      meta_.file_creation_time = current_time;
+
+      uint64_t creation_time = (cfd_->ioptions()->compaction_style ==
+                                CompactionStyle::kCompactionStyleFIFO)
+                                   ? current_time
+                                   : meta_.oldest_ancester_time;
+
       s = BuildTable(
-          dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
-          env_options_, cfd_->table_cache(), iter.get(),
+          dbname_, db_options_.env, db_options_.fs.get(), *cfd_->ioptions(),
+          mutable_cf_options_, file_options_, cfd_->table_cache(), iter.get(),
           std::move(range_del_iters), &meta_, cfd_->internal_comparator(),
           cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
           cfd_->GetName(), existing_snapshots_,
@@ -378,8 +387,8 @@ Status FlushJob::WriteLevel0Table() {
           cfd_->ioptions()->compression_opts,
           mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
           TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
-          Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
-          oldest_key_time, write_hint);
+          Env::IO_HIGH, &table_properties_, 0 /* level */,
+          creation_time, oldest_key_time, write_hint, current_time);
       LogFlush(db_options_.info_log);
     }
     ROCKS_LOG_INFO(db_options_.info_log,
@@ -394,7 +403,7 @@ Status FlushJob::WriteLevel0Table() {
     if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
       s = output_file_directory_->Fsync();
     }
-    TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
+    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
     db_mutex_->Lock();
   }
   base_->Unref();
@@ -410,8 +419,14 @@ Status FlushJob::WriteLevel0Table() {
     edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
                    meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
                    meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
-                   meta_.marked_for_compaction);
+                   meta_.marked_for_compaction, meta_.oldest_blob_file_number,
+                   meta_.oldest_ancester_time, meta_.file_creation_time,
+                   meta_.file_checksum, meta_.file_checksum_func_name);
   }
+#ifndef ROCKSDB_LITE
+  // Piggyback FlushJobInfo on the first flushed memtable.
+  mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
+#endif  // !ROCKSDB_LITE
 
   // Note that here we treat flush as level 0 compaction in internal stats
   InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
@@ -426,4 +441,26 @@ Status FlushJob::WriteLevel0Table() {
   return s;
 }
 
-}  // namespace rocksdb
+#ifndef ROCKSDB_LITE
+std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
+  db_mutex_->AssertHeld();
+  std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
+  info->cf_id = cfd_->GetID();
+  info->cf_name = cfd_->GetName();
+
+  const uint64_t file_number = meta_.fd.GetNumber();
+  info->file_path =
+      MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
+  info->file_number = file_number;
+  info->oldest_blob_file_number = meta_.oldest_blob_file_number;
+  info->thread_id = db_options_.env->GetThreadID();
+  info->job_id = job_context_->job_id;
+  info->smallest_seqno = meta_.fd.smallest_seqno;
+  info->largest_seqno = meta_.fd.largest_seqno;
+  info->table_properties = table_properties_;
+  info->flush_reason = cfd_->GetFlushReason();
+  return info;
+}
+#endif  // !ROCKSDB_LITE
+
+}  // namespace ROCKSDB_NAMESPACE