]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/db/external_sst_file_ingestion_job.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / db / external_sst_file_ingestion_job.cc
index 4cec5d3767c70b8ba07a162fbe5bd63329f90617..1913a211ac9e6b98b4486f86ee32a9b9834470f2 100644 (file)
@@ -28,6 +28,8 @@ namespace ROCKSDB_NAMESPACE {
 
 Status ExternalSstFileIngestionJob::Prepare(
     const std::vector<std::string>& external_files_paths,
+    const std::vector<std::string>& files_checksums,
+    const std::vector<std::string>& files_checksum_func_names,
     uint64_t next_file_number, SuperVersion* sv) {
   Status status;
 
@@ -46,7 +48,7 @@ Status ExternalSstFileIngestionJob::Prepare(
             TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
         f.cf_id != cfd_->GetID()) {
       return Status::InvalidArgument(
-          "External file column family id dont match");
+          "External file column family id don't match");
     }
   }
 
@@ -55,7 +57,7 @@ Status ExternalSstFileIngestionJob::Prepare(
   if (num_files == 0) {
     return Status::InvalidArgument("The list of files is empty");
   } else if (num_files > 1) {
-    // Verify that passed files dont have overlapping ranges
+    // Verify that passed files don't have overlapping ranges
     autovector<const IngestedFileInfo*> sorted_files;
     for (size_t i = 0; i < num_files; i++) {
       sorted_files.push_back(&files_to_ingest_[i]);
@@ -68,7 +70,7 @@ Status ExternalSstFileIngestionJob::Prepare(
                                    info2->smallest_internal_key) < 0;
         });
 
-    for (size_t i = 0; i < num_files - 1; i++) {
+    for (size_t i = 0; i + 1 < num_files; i++) {
       if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
                             sorted_files[i + 1]->smallest_internal_key) >= 0) {
         files_overlap_ = true;
@@ -134,21 +136,24 @@ Status ExternalSstFileIngestionJob::Prepare(
       TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
                                nullptr);
       // CopyFile also sync the new file.
-      status = CopyFile(fs_, path_outside_db, path_inside_db, 0,
-                        db_options_.use_fsync);
+      status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
+                        db_options_.use_fsync, io_tracer_);
     }
     TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
     if (!status.ok()) {
       break;
     }
     f.internal_file_path = path_inside_db;
+    // Initialize the checksum information of ingested files.
+    f.file_checksum = kUnknownFileChecksum;
+    f.file_checksum_func_name = kUnknownFileChecksumFuncName;
     ingestion_path_ids.insert(f.fd.GetPathId());
   }
 
   TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
   if (status.ok()) {
     for (auto path_id : ingestion_path_ids) {
-      status = directories_->GetDataDir(path_id)->Fsync();
+      status = directories_->GetDataDir(path_id)->Fsync(IOOptions(), nullptr);
       if (!status.ok()) {
         ROCKS_LOG_WARN(db_options_.info_log,
                        "Failed to sync directory %" ROCKSDB_PRIszt
@@ -160,6 +165,131 @@ Status ExternalSstFileIngestionJob::Prepare(
   }
   TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
 
+  // Generate and check the sst file checksum. Note that, if
+  // IngestExternalFileOptions::write_global_seqno is true, we will not update
+  // the checksum information in the files_to_ingests_ here, since the file is
+  // updated with the new global_seqno. After global_seqno is updated, DB will
+  // generate the new checksum and store it in the Manifest. In all other cases
+  // if ingestion_options_.write_global_seqno == true and
+  // verify_file_checksum is false, we only check the checksum function name.
+  if (status.ok() && db_options_.file_checksum_gen_factory != nullptr) {
+    if (ingestion_options_.verify_file_checksum == false &&
+        files_checksums.size() == files_to_ingest_.size() &&
+        files_checksum_func_names.size() == files_to_ingest_.size()) {
+      // Only when verify_file_checksum == false and the checksum for ingested
+      // files are provided, DB will use the provided checksum and does not
+      // generate the checksum for ingested files.
+      need_generate_file_checksum_ = false;
+    } else {
+      need_generate_file_checksum_ = true;
+    }
+    FileChecksumGenContext gen_context;
+    std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
+        db_options_.file_checksum_gen_factory->CreateFileChecksumGenerator(
+            gen_context);
+    std::vector<std::string> generated_checksums;
+    std::vector<std::string> generated_checksum_func_names;
+    // Step 1: generate the checksum for ingested sst file.
+    if (need_generate_file_checksum_) {
+      for (size_t i = 0; i < files_to_ingest_.size(); i++) {
+        std::string generated_checksum;
+        std::string generated_checksum_func_name;
+        std::string requested_checksum_func_name;
+        IOStatus io_s = GenerateOneFileChecksum(
+            fs_.get(), files_to_ingest_[i].internal_file_path,
+            db_options_.file_checksum_gen_factory.get(),
+            requested_checksum_func_name, &generated_checksum,
+            &generated_checksum_func_name,
+            ingestion_options_.verify_checksums_readahead_size,
+            db_options_.allow_mmap_reads, io_tracer_);
+        if (!io_s.ok()) {
+          status = io_s;
+          ROCKS_LOG_WARN(db_options_.info_log,
+                         "Sst file checksum generation of file: %s failed: %s",
+                         files_to_ingest_[i].internal_file_path.c_str(),
+                         status.ToString().c_str());
+          break;
+        }
+        if (ingestion_options_.write_global_seqno == false) {
+          files_to_ingest_[i].file_checksum = generated_checksum;
+          files_to_ingest_[i].file_checksum_func_name =
+              generated_checksum_func_name;
+        }
+        generated_checksums.push_back(generated_checksum);
+        generated_checksum_func_names.push_back(generated_checksum_func_name);
+      }
+    }
+
+    // Step 2: based on the verify_file_checksum and ingested checksum
+    // information, do the verification.
+    if (status.ok()) {
+      if (files_checksums.size() == files_to_ingest_.size() &&
+          files_checksum_func_names.size() == files_to_ingest_.size()) {
+        // Verify the checksum and checksum function name.
+        if (ingestion_options_.verify_file_checksum) {
+          for (size_t i = 0; i < files_to_ingest_.size(); i++) {
+            if (files_checksum_func_names[i] !=
+                generated_checksum_func_names[i]) {
+              status = Status::InvalidArgument(
+                  "Checksum function name does not match with the checksum "
+                  "function name of this DB");
+              ROCKS_LOG_WARN(
+                  db_options_.info_log,
+                  "Sst file checksum verification of file: %s failed: %s",
+                  external_files_paths[i].c_str(), status.ToString().c_str());
+              break;
+            }
+            if (files_checksums[i] != generated_checksums[i]) {
+              status = Status::Corruption(
+                  "Ingested checksum does not match with the generated "
+                  "checksum");
+              ROCKS_LOG_WARN(
+                  db_options_.info_log,
+                  "Sst file checksum verification of file: %s failed: %s",
+                  files_to_ingest_[i].internal_file_path.c_str(),
+                  status.ToString().c_str());
+              break;
+            }
+          }
+        } else {
+          // If verify_file_checksum is not enabled, we only verify the
+          // checksum function name. If it does not match, fail the ingestion.
+          // If matches, we trust the ingested checksum information and store
+          // in the Manifest.
+          for (size_t i = 0; i < files_to_ingest_.size(); i++) {
+            if (files_checksum_func_names[i] != file_checksum_gen->Name()) {
+              status = Status::InvalidArgument(
+                  "Checksum function name does not match with the checksum "
+                  "function name of this DB");
+              ROCKS_LOG_WARN(
+                  db_options_.info_log,
+                  "Sst file checksum verification of file: %s failed: %s",
+                  external_files_paths[i].c_str(), status.ToString().c_str());
+              break;
+            }
+            files_to_ingest_[i].file_checksum = files_checksums[i];
+            files_to_ingest_[i].file_checksum_func_name =
+                files_checksum_func_names[i];
+          }
+        }
+      } else if (files_checksums.size() != files_checksum_func_names.size() ||
+                 (files_checksums.size() == files_checksum_func_names.size() &&
+                  files_checksums.size() != 0)) {
+        // The checksum and checksum function name vectors are not both empty,
+        // but their sizes do not match the number of ingested files.
+        status = Status::InvalidArgument(
+            "The checksum information of ingested sst files are nonempty and "
+            "the size of checksums or the size of the checksum function "
+            "names "
+            "does not match with the number of ingested sst files");
+        ROCKS_LOG_WARN(
+            db_options_.info_log,
+            "The ingested sst files checksum information is incomplete: %s",
+            status.ToString().c_str());
+      }
+    }
+  }
+
   // TODO: The following is duplicated with Cleanup().
   if (!status.ok()) {
     // We failed, remove all files that we copied into the db
@@ -186,8 +316,8 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
     ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
                         file_to_ingest.largest_internal_key.user_key());
   }
-  Status status =
-      cfd_->RangesOverlapWithMemtables(ranges, super_version, flush_needed);
+  Status status = cfd_->RangesOverlapWithMemtables(
+      ranges, super_version, db_options_.allow_data_in_errors, flush_needed);
   if (status.ok() && *flush_needed &&
       !ingestion_options_.allow_blocking_flush) {
     status = Status::InvalidArgument("External file requires flush");
@@ -212,7 +342,7 @@ Status ExternalSstFileIngestionJob::Run() {
 
   if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
     // We need to assign a global sequence number to all the files even
-    // if the dont overlap with any ranges since we have snapshots
+    // if the don't overlap with any ranges since we have snapshots
     force_global_seqno = true;
   }
   // It is safe to use this instead of LastAllocatedSequence since we are
@@ -245,6 +375,11 @@ Status ExternalSstFileIngestionJob::Run() {
       return status;
     }
 
+    status = GenerateChecksumForIngestedFile(&f);
+    if (!status.ok()) {
+      return status;
+    }
+
     // We use the import time as the ancester time. This is the time the data
     // is written to the database.
     int64_t temp_current_time = 0;
@@ -255,11 +390,11 @@ Status ExternalSstFileIngestionJob::Run() {
           static_cast<uint64_t>(temp_current_time);
     }
 
-    edit_.AddFile(
-        f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(),
-        f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno,
-        f.assigned_seqno, false, kInvalidBlobFileNumber, oldest_ancester_time,
-        current_time, kUnknownFileChecksum, kUnknownFileChecksumFuncName);
+    edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
+                  f.fd.GetFileSize(), f.smallest_internal_key,
+                  f.largest_internal_key, f.assigned_seqno, f.assigned_seqno,
+                  false, kInvalidBlobFileNumber, oldest_ancester_time,
+                  current_time, f.file_checksum, f.file_checksum_func_name);
   }
   return status;
 }
@@ -377,8 +512,8 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   if (!status.ok()) {
     return status;
   }
-  sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file),
-                                                   external_file));
+  sst_file_reader.reset(new RandomAccessFileReader(
+      std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));
 
   status = cfd_->ioptions()->table_factory->NewTableReader(
       TableReaderOptions(*cfd_->ioptions(),
@@ -467,22 +602,28 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   file_to_ingest->largest_internal_key =
       InternalKey("", 0, ValueType::kTypeValue);
   bool bounds_set = false;
+  bool allow_data_in_errors = db_options_.allow_data_in_errors;
   iter->SeekToFirst();
   if (iter->Valid()) {
-    if (!ParseInternalKey(iter->key(), &key)) {
-      return Status::Corruption("external file have corrupted keys");
+    Status pik_status =
+        ParseInternalKey(iter->key(), &key, allow_data_in_errors);
+    if (!pik_status.ok()) {
+      return Status::Corruption("Corrupted key in external file. ",
+                                pik_status.getState());
     }
     if (key.sequence != 0) {
-      return Status::Corruption("external file have non zero sequence number");
+      return Status::Corruption("External file has non zero sequence number");
     }
     file_to_ingest->smallest_internal_key.SetFrom(key);
 
     iter->SeekToLast();
-    if (!ParseInternalKey(iter->key(), &key)) {
-      return Status::Corruption("external file have corrupted keys");
+    pik_status = ParseInternalKey(iter->key(), &key, allow_data_in_errors);
+    if (!pik_status.ok()) {
+      return Status::Corruption("Corrupted key in external file. ",
+                                pik_status.getState());
     }
     if (key.sequence != 0) {
-      return Status::Corruption("external file have non zero sequence number");
+      return Status::Corruption("External file has non zero sequence number");
     }
     file_to_ingest->largest_internal_key.SetFrom(key);
 
@@ -495,8 +636,11 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
   if (range_del_iter != nullptr) {
     for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
          range_del_iter->Next()) {
-      if (!ParseInternalKey(range_del_iter->key(), &key)) {
-        return Status::Corruption("external file have corrupted keys");
+      Status pik_status =
+          ParseInternalKey(range_del_iter->key(), &key, allow_data_in_errors);
+      if (!pik_status.ok()) {
+        return Status::Corruption("Corrupted key in external file. ",
+                                  pik_status.getState());
       }
       RangeTombstone tombstone(key, range_del_iter->value());
 
@@ -588,7 +732,7 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
       continue;
     }
 
-    // We dont overlap with any keys in this level, but we still need to check
+    // We don't overlap with any keys in this level, but we still need to check
     // if our file can fit in it
     if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
       target_level = lvl;
@@ -646,7 +790,7 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
     return Status::InvalidArgument("Global seqno is required, but disabled");
   } else if (file_to_ingest->global_seqno_offset == 0) {
     return Status::InvalidArgument(
-        "Trying to set global seqno for a file that dont have a global seqno "
+        "Trying to set global seqno for a file that don't have a global seqno "
         "field");
   }
 
@@ -659,13 +803,14 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
         fs_->NewRandomRWFile(file_to_ingest->internal_file_path, env_options_,
                              &rwfile, nullptr);
     if (status.ok()) {
+      FSRandomRWFilePtr fsptr(std::move(rwfile), io_tracer_);
       std::string seqno_val;
       PutFixed64(&seqno_val, seqno);
-      status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val,
-                             IOOptions(), nullptr);
+      status = fsptr->Write(file_to_ingest->global_seqno_offset, seqno_val,
+                            IOOptions(), nullptr);
       if (status.ok()) {
         TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
-        status = SyncIngestedFile(rwfile.get());
+        status = SyncIngestedFile(fsptr.get());
         TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
         if (!status.ok()) {
           ROCKS_LOG_WARN(db_options_.info_log,
@@ -687,6 +832,33 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
   return Status::OK();
 }
 
+IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
+    IngestedFileInfo* file_to_ingest) {
+  if (db_options_.file_checksum_gen_factory == nullptr ||
+      need_generate_file_checksum_ == false ||
+      ingestion_options_.write_global_seqno == false) {
+    // If file_checksum_gen_factory is not set, we are not able to generate
+    // the checksum. If write_global_seqno is false, it means we will use
+    // file checksum generated during Prepare(). This step will be skipped.
+    return IOStatus::OK();
+  }
+  std::string file_checksum;
+  std::string file_checksum_func_name;
+  std::string requested_checksum_func_name;
+  IOStatus io_s = GenerateOneFileChecksum(
+      fs_.get(), file_to_ingest->internal_file_path,
+      db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name,
+      &file_checksum, &file_checksum_func_name,
+      ingestion_options_.verify_checksums_readahead_size,
+      db_options_.allow_mmap_reads, io_tracer_);
+  if (!io_s.ok()) {
+    return io_s;
+  }
+  file_to_ingest->file_checksum = file_checksum;
+  file_to_ingest->file_checksum_func_name = file_checksum_func_name;
+  return IOStatus::OK();
+}
+
 bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
     const IngestedFileInfo* file_to_ingest, int level) {
   if (level == 0) {