]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/external_sst_file_ingestion_job.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / external_sst_file_ingestion_job.cc
index 28b481678ab4a7337534de9e2bf85d634ea7e8d1..4cec5d3767c70b8ba07a162fbe5bd63329f90617 100644 (file)
@@ -7,26 +7,24 @@
 
 #include "db/external_sst_file_ingestion_job.h"
 
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
 #include <algorithm>
+#include <cinttypes>
 #include <string>
+#include <unordered_set>
 #include <vector>
 
+#include "db/db_impl/db_impl.h"
 #include "db/version_edit.h"
+#include "file/file_util.h"
+#include "file/random_access_file_reader.h"
 #include "table/merging_iterator.h"
 #include "table/scoped_arena_iterator.h"
 #include "table/sst_file_writer_collectors.h"
 #include "table/table_builder.h"
-#include "util/file_reader_writer.h"
-#include "util/file_util.h"
+#include "test_util/sync_point.h"
 #include "util/stop_watch.h"
-#include "util/sync_point.h"
 
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
 
 Status ExternalSstFileIngestionJob::Prepare(
     const std::vector<std::string>& external_files_paths,
@@ -66,65 +64,108 @@ Status ExternalSstFileIngestionJob::Prepare(
     std::sort(
         sorted_files.begin(), sorted_files.end(),
         [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
-          return ucmp->Compare(info1->smallest_user_key,
-                               info2->smallest_user_key) < 0;
+          return sstableKeyCompare(ucmp, info1->smallest_internal_key,
+                                   info2->smallest_internal_key) < 0;
         });
 
     for (size_t i = 0; i < num_files - 1; i++) {
-      if (ucmp->Compare(sorted_files[i]->largest_user_key,
-                        sorted_files[i + 1]->smallest_user_key) >= 0) {
-        return Status::NotSupported("Files have overlapping ranges");
+      if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
+                            sorted_files[i + 1]->smallest_internal_key) >= 0) {
+        files_overlap_ = true;
+        break;
       }
     }
   }
 
+  if (ingestion_options_.ingest_behind && files_overlap_) {
+    return Status::NotSupported("Files have overlapping ranges");
+  }
+
   for (IngestedFileInfo& f : files_to_ingest_) {
     if (f.num_entries == 0 && f.num_range_deletions == 0) {
       return Status::InvalidArgument("File contain no entries");
     }
 
-    if (!f.smallest_internal_key().Valid() ||
-        !f.largest_internal_key().Valid()) {
+    if (!f.smallest_internal_key.Valid() || !f.largest_internal_key.Valid()) {
       return Status::Corruption("Generated table have corrupted keys");
     }
   }
 
   // Copy/Move external files into DB
+  std::unordered_set<size_t> ingestion_path_ids;
   for (IngestedFileInfo& f : files_to_ingest_) {
     f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
-
+    f.copy_file = false;
     const std::string path_outside_db = f.external_file_path;
     const std::string path_inside_db =
         TableFileName(cfd_->ioptions()->cf_paths, f.fd.GetNumber(),
                       f.fd.GetPathId());
-
     if (ingestion_options_.move_files) {
-      status = env_->LinkFile(path_outside_db, path_inside_db);
-      if (status.IsNotSupported()) {
-        // Original file is on a different FS, use copy instead of hard linking
-        status = CopyFile(env_, path_outside_db, path_inside_db, 0,
-                          db_options_.use_fsync);
+      status =
+          fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
+      if (status.ok()) {
+        // It is unsafe to assume the application has synced the file and its
+        // directory before ingesting the file. For the integrity of RocksDB we
+        // need to sync the file.
+        std::unique_ptr<FSWritableFile> file_to_sync;
+        status = fs_->ReopenWritableFile(path_inside_db, env_options_,
+                                         &file_to_sync, nullptr);
+        if (status.ok()) {
+          TEST_SYNC_POINT(
+              "ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
+          status = SyncIngestedFile(file_to_sync.get());
+          TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile");
+          if (!status.ok()) {
+            ROCKS_LOG_WARN(db_options_.info_log,
+                           "Failed to sync ingested file %s: %s",
+                           path_inside_db.c_str(), status.ToString().c_str());
+          }
+        }
+      } else if (status.IsNotSupported() &&
+                 ingestion_options_.failed_move_fall_back_to_copy) {
+        // Original file is on a different FS, use copy instead of hard linking.
         f.copy_file = true;
-      } else {
-        f.copy_file = false;
       }
     } else {
-      status = CopyFile(env_, path_outside_db, path_inside_db, 0,
-                        db_options_.use_fsync);
       f.copy_file = true;
     }
+
+    if (f.copy_file) {
+      TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
+                               nullptr);
+      // CopyFile also syncs the new file.
+      status = CopyFile(fs_, path_outside_db, path_inside_db, 0,
+                        db_options_.use_fsync);
+    }
     TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
     if (!status.ok()) {
       break;
     }
     f.internal_file_path = path_inside_db;
+    ingestion_path_ids.insert(f.fd.GetPathId());
+  }
+
+  TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
+  if (status.ok()) {
+    for (auto path_id : ingestion_path_ids) {
+      status = directories_->GetDataDir(path_id)->Fsync();
+      if (!status.ok()) {
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "Failed to sync directory %" ROCKSDB_PRIszt
+                       " while ingest file: %s",
+                       path_id, status.ToString().c_str());
+        break;
+      }
+    }
   }
+  TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
 
+  // TODO: The following is duplicated with Cleanup().
   if (!status.ok()) {
     // We failed, remove all files that we copied into the db
     for (IngestedFileInfo& f : files_to_ingest_) {
       if (f.internal_file_path.empty()) {
-        break;
+        continue;
       }
       Status s = env_->DeleteFile(f.internal_file_path);
       if (!s.ok()) {
@@ -142,8 +183,8 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
                                                SuperVersion* super_version) {
   autovector<Range> ranges;
   for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
-    ranges.emplace_back(file_to_ingest.smallest_user_key,
-                        file_to_ingest.largest_user_key);
+    ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
+                        file_to_ingest.largest_internal_key.user_key());
   }
   Status status =
       cfd_->RangesOverlapWithMemtables(ranges, super_version, flush_needed);
@@ -176,7 +217,7 @@ Status ExternalSstFileIngestionJob::Run() {
   }
   // It is safe to use this instead of LastAllocatedSequence since we are
   // the only active writer, and hence they are equal
-  const SequenceNumber last_seqno = versions_->LastSequence();
+  SequenceNumber last_seqno = versions_->LastSequence();
   edit_.SetColumnFamily(cfd_->GetID());
   // The levels that the files will be ingested into
 
@@ -186,8 +227,8 @@ Status ExternalSstFileIngestionJob::Run() {
       status = CheckLevelForIngestedBehindFile(&f);
     } else {
       status = AssignLevelAndSeqnoForIngestedFile(
-         super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
-         &f, &assigned_seqno);
+          super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
+          last_seqno, &f, &assigned_seqno);
     }
     if (!status.ok()) {
       return status;
@@ -195,16 +236,30 @@ Status ExternalSstFileIngestionJob::Run() {
     status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
     TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
                              &assigned_seqno);
-    if (assigned_seqno == last_seqno + 1) {
-      consumed_seqno_ = true;
+    if (assigned_seqno > last_seqno) {
+      assert(assigned_seqno == last_seqno + 1);
+      last_seqno = assigned_seqno;
+      ++consumed_seqno_count_;
     }
     if (!status.ok()) {
       return status;
     }
-    edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
-                  f.fd.GetFileSize(), f.smallest_internal_key(),
-                  f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
-                  false);
+
+    // We use the import time as the ancestor time. This is the time the data
+    // is written to the database.
+    int64_t temp_current_time = 0;
+    uint64_t current_time = kUnknownFileCreationTime;
+    uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
+    if (env_->GetCurrentTime(&temp_current_time).ok()) {
+      current_time = oldest_ancester_time =
+          static_cast<uint64_t>(temp_current_time);
+    }
+
+    edit_.AddFile(
+        f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(),
+        f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno,
+        f.assigned_seqno, false, kInvalidBlobFileNumber, oldest_ancester_time,
+        current_time, kUnknownFileChecksum, kUnknownFileChecksumFuncName);
   }
   return status;
 }
@@ -214,6 +269,13 @@ void ExternalSstFileIngestionJob::UpdateStats() {
   uint64_t total_keys = 0;
   uint64_t total_l0_files = 0;
   uint64_t total_time = env_->NowMicros() - job_start_time_;
+
+  EventLoggerStream stream = event_logger_->Log();
+  stream << "event"
+         << "ingest_finished";
+  stream << "files_ingested";
+  stream.StartArray();
+
   for (IngestedFileInfo& f : files_to_ingest_) {
     InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1);
     stats.micros = total_time;
@@ -241,7 +303,18 @@ void ExternalSstFileIngestionJob::UpdateStats() {
         "(global_seqno=%" PRIu64 ")\n",
         f.external_file_path.c_str(), f.picked_level,
         f.internal_file_path.c_str(), f.assigned_seqno);
+    stream << "file" << f.internal_file_path << "level" << f.picked_level;
+  }
+  stream.EndArray();
+
+  stream << "lsm_state";
+  stream.StartArray();
+  auto vstorage = cfd_->current()->storage_info();
+  for (int level = 0; level < vstorage->num_levels(); ++level) {
+    stream << vstorage->NumLevelFiles(level);
   }
+  stream.EndArray();
+
   cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
                                      total_keys);
   cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
@@ -255,6 +328,9 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
     // We failed to add the files to the database
     // remove all the files we copied
     for (IngestedFileInfo& f : files_to_ingest_) {
+      if (f.internal_file_path.empty()) {
+        continue;
+      }
       Status s = env_->DeleteFile(f.internal_file_path);
       if (!s.ok()) {
         ROCKS_LOG_WARN(db_options_.info_log,
@@ -262,7 +338,8 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
                        f.internal_file_path.c_str(), s.ToString().c_str());
       }
     }
-    consumed_seqno_ = false;
+    consumed_seqno_count_ = 0;
+    files_overlap_ = false;
   } else if (status.ok() && ingestion_options_.move_files) {
     // The files were moved and added successfully, remove original file links
     for (IngestedFileInfo& f : files_to_ingest_) {
@@ -284,17 +361,19 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   file_to_ingest->external_file_path = external_file;
 
   // Get external file size
-  Status status = env_->GetFileSize(external_file, &file_to_ingest->file_size);
+  Status status = fs_->GetFileSize(external_file, IOOptions(),
+                                   &file_to_ingest->file_size, nullptr);
   if (!status.ok()) {
     return status;
   }
 
   // Create TableReader for external file
   std::unique_ptr<TableReader> table_reader;
-  std::unique_ptr<RandomAccessFile> sst_file;
+  std::unique_ptr<FSRandomAccessFile> sst_file;
   std::unique_ptr<RandomAccessFileReader> sst_file_reader;
 
-  status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_);
+  status = fs_->NewRandomAccessFile(external_file, env_options_,
+                                    &sst_file, nullptr);
   if (!status.ok()) {
     return status;
   }
@@ -311,7 +390,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   }
 
   if (ingestion_options_.verify_checksums_before_ingest) {
-    status = table_reader->VerifyChecksum();
+    // The readahead size used while verifying checksums is taken from the
+    // ingestion options (verify_checksums_readahead_size); no other read
+    // options are customized here.
+    ReadOptions ro;
+    ro.readahead_size = ingestion_options_.verify_checksums_readahead_size;
+    status = table_reader->VerifyChecksum(
+        ro, TableReaderCaller::kExternalSSTIngestion);
   }
   if (!status.ok()) {
     return status;
@@ -371,11 +456,16 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   // updating the block cache.
   ro.fill_cache = false;
   std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
-      ro, sv->mutable_cf_options.prefix_extractor.get()));
+      ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
+      /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
   std::unique_ptr<InternalIterator> range_del_iter(
       table_reader->NewRangeTombstoneIterator(ro));
 
   // Get first (smallest) and last (largest) key from file.
+  file_to_ingest->smallest_internal_key =
+      InternalKey("", 0, ValueType::kTypeValue);
+  file_to_ingest->largest_internal_key =
+      InternalKey("", 0, ValueType::kTypeValue);
   bool bounds_set = false;
   iter->SeekToFirst();
   if (iter->Valid()) {
@@ -385,7 +475,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
     if (key.sequence != 0) {
       return Status::Corruption("external file have non zero sequence number");
     }
-    file_to_ingest->smallest_user_key = key.user_key.ToString();
+    file_to_ingest->smallest_internal_key.SetFrom(key);
 
     iter->SeekToLast();
     if (!ParseInternalKey(iter->key(), &key)) {
@@ -394,7 +484,7 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
     if (key.sequence != 0) {
       return Status::Corruption("external file have non zero sequence number");
     }
-    file_to_ingest->largest_user_key = key.user_key.ToString();
+    file_to_ingest->largest_internal_key.SetFrom(key);
 
     bounds_set = true;
   }
@@ -410,13 +500,17 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
       }
       RangeTombstone tombstone(key, range_del_iter->value());
 
-      if (!bounds_set || ucmp->Compare(tombstone.start_key_,
-                                       file_to_ingest->smallest_user_key) < 0) {
-        file_to_ingest->smallest_user_key = tombstone.start_key_.ToString();
+      InternalKey start_key = tombstone.SerializeKey();
+      if (!bounds_set ||
+          sstableKeyCompare(ucmp, start_key,
+                            file_to_ingest->smallest_internal_key) < 0) {
+        file_to_ingest->smallest_internal_key = start_key;
       }
-      if (!bounds_set || ucmp->Compare(tombstone.end_key_,
-                                       file_to_ingest->largest_user_key) > 0) {
-        file_to_ingest->largest_user_key = tombstone.end_key_.ToString();
+      InternalKey end_key = tombstone.SerializeEndKey();
+      if (!bounds_set ||
+          sstableKeyCompare(ucmp, end_key,
+                            file_to_ingest->largest_internal_key) > 0) {
+        file_to_ingest->largest_internal_key = end_key;
       }
       bounds_set = true;
     }
@@ -431,13 +525,13 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
 
 Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
     SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
-    IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
+    SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest,
+    SequenceNumber* assigned_seqno) {
   Status status;
   *assigned_seqno = 0;
-  const SequenceNumber last_seqno = versions_->LastSequence();
   if (force_global_seqno) {
     *assigned_seqno = last_seqno + 1;
-    if (compaction_style == kCompactionStyleUniversal) {
+    if (compaction_style == kCompactionStyleUniversal || files_overlap_) {
       file_to_ingest->picked_level = 0;
       return status;
     }
@@ -457,9 +551,10 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
 
     if (vstorage->NumLevelFiles(lvl) > 0) {
       bool overlap_with_level = false;
-      status = sv->current->OverlapWithLevelIterator(ro, env_options_,
-          file_to_ingest->smallest_user_key, file_to_ingest->largest_user_key,
-          lvl, &overlap_with_level);
+      status = sv->current->OverlapWithLevelIterator(
+          ro, env_options_, file_to_ingest->smallest_internal_key.user_key(),
+          file_to_ingest->largest_internal_key.user_key(), lvl,
+          &overlap_with_level);
       if (!status.ok()) {
         return status;
       }
@@ -499,6 +594,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
       target_level = lvl;
     }
   }
+  // If files overlap, we have to ingest them at level 0 and assign the newest
+  // sequence number
+  if (files_overlap_) {
+    target_level = 0;
+    *assigned_seqno = last_seqno + 1;
+  }
  TEST_SYNC_POINT_CALLBACK(
       "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
       &overlap_with_db);
@@ -553,13 +654,27 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
     // Determine if we can write global_seqno to a given offset of file.
     // If the file system does not support random write, then we should not.
     // Otherwise we should.
-    std::unique_ptr<RandomRWFile> rwfile;
-    Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path,
-                                          &rwfile, env_options_);
+    std::unique_ptr<FSRandomRWFile> rwfile;
+    Status status =
+        fs_->NewRandomRWFile(file_to_ingest->internal_file_path, env_options_,
+                             &rwfile, nullptr);
     if (status.ok()) {
       std::string seqno_val;
       PutFixed64(&seqno_val, seqno);
-      status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
+      status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val,
+                             IOOptions(), nullptr);
+      if (status.ok()) {
+        TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
+        status = SyncIngestedFile(rwfile.get());
+        TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
+        if (!status.ok()) {
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "Failed to sync ingested file %s after writing global "
+                         "sequence number: %s",
+                         file_to_ingest->internal_file_path.c_str(),
+                         status.ToString().c_str());
+        }
+      }
       if (!status.ok()) {
         return status;
       }
@@ -580,8 +695,9 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
   }
 
   auto* vstorage = cfd_->current()->storage_info();
-  Slice file_smallest_user_key(file_to_ingest->smallest_user_key);
-  Slice file_largest_user_key(file_to_ingest->largest_user_key);
+  Slice file_smallest_user_key(
+      file_to_ingest->smallest_internal_key.user_key());
+  Slice file_largest_user_key(file_to_ingest->largest_internal_key.user_key());
 
   if (vstorage->OverlapInLevel(level, &file_smallest_user_key,
                                &file_largest_user_key)) {
@@ -600,6 +716,16 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
   return true;
 }
 
-}  // namespace rocksdb
+template <typename TWritableFile>
+Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) {
+  assert(file != nullptr);
+  if (db_options_.use_fsync) {
+    return file->Fsync(IOOptions(), nullptr);
+  } else {
+    return file->Sync(IOOptions(), nullptr);
+  }
+}
+
+}  // namespace ROCKSDB_NAMESPACE
 
 #endif  // !ROCKSDB_LITE