Status ExternalSstFileIngestionJob::Prepare(
const std::vector<std::string>& external_files_paths,
+ const std::vector<std::string>& files_checksums,
+ const std::vector<std::string>& files_checksum_func_names,
uint64_t next_file_number, SuperVersion* sv) {
Status status;
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
f.cf_id != cfd_->GetID()) {
return Status::InvalidArgument(
- "External file column family id dont match");
+ "External file column family id don't match");
}
}
if (num_files == 0) {
return Status::InvalidArgument("The list of files is empty");
} else if (num_files > 1) {
- // Verify that passed files dont have overlapping ranges
+ // Verify that passed files don't have overlapping ranges
autovector<const IngestedFileInfo*> sorted_files;
for (size_t i = 0; i < num_files; i++) {
sorted_files.push_back(&files_to_ingest_[i]);
info2->smallest_internal_key) < 0;
});
- for (size_t i = 0; i < num_files - 1; i++) {
+ for (size_t i = 0; i + 1 < num_files; i++) {
if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
sorted_files[i + 1]->smallest_internal_key) >= 0) {
files_overlap_ = true;
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
nullptr);
// CopyFile also sync the new file.
- status = CopyFile(fs_, path_outside_db, path_inside_db, 0,
- db_options_.use_fsync);
+ status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
+ db_options_.use_fsync, io_tracer_);
}
TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
if (!status.ok()) {
break;
}
f.internal_file_path = path_inside_db;
+ // Initialize the checksum information of ingested files.
+ f.file_checksum = kUnknownFileChecksum;
+ f.file_checksum_func_name = kUnknownFileChecksumFuncName;
ingestion_path_ids.insert(f.fd.GetPathId());
}
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
if (status.ok()) {
for (auto path_id : ingestion_path_ids) {
- status = directories_->GetDataDir(path_id)->Fsync();
+ status = directories_->GetDataDir(path_id)->Fsync(IOOptions(), nullptr);
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
"Failed to sync directory %" ROCKSDB_PRIszt
}
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
+ // Generate and check the sst file checksum. Note that, if
+ // IngestExternalFileOptions::write_global_seqno is true, we will not update
+ // the checksum information in the files_to_ingests_ here, since the file is
+ // updated with the new global_seqno. After global_seqno is updated, DB will
+ // generate the new checksum and store it in the Manifest. In all other cases
+ // if ingestion_options_.write_global_seqno == true and
+ // verify_file_checksum is false, we only check the checksum function name.
+ if (status.ok() && db_options_.file_checksum_gen_factory != nullptr) {
+ if (ingestion_options_.verify_file_checksum == false &&
+ files_checksums.size() == files_to_ingest_.size() &&
+ files_checksum_func_names.size() == files_to_ingest_.size()) {
+ // Only when verify_file_checksum == false and the checksums for the
+ // ingested files are provided does DB use the provided checksums and
+ // skip generating checksums for the ingested files.
+ need_generate_file_checksum_ = false;
+ } else {
+ need_generate_file_checksum_ = true;
+ }
+ FileChecksumGenContext gen_context;
+ std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
+ db_options_.file_checksum_gen_factory->CreateFileChecksumGenerator(
+ gen_context);
+ std::vector<std::string> generated_checksums;
+ std::vector<std::string> generated_checksum_func_names;
+ // Step 1: generate the checksum for ingested sst file.
+ if (need_generate_file_checksum_) {
+ for (size_t i = 0; i < files_to_ingest_.size(); i++) {
+ std::string generated_checksum;
+ std::string generated_checksum_func_name;
+ std::string requested_checksum_func_name;
+ IOStatus io_s = GenerateOneFileChecksum(
+ fs_.get(), files_to_ingest_[i].internal_file_path,
+ db_options_.file_checksum_gen_factory.get(),
+ requested_checksum_func_name, &generated_checksum,
+ &generated_checksum_func_name,
+ ingestion_options_.verify_checksums_readahead_size,
+ db_options_.allow_mmap_reads, io_tracer_);
+ if (!io_s.ok()) {
+ status = io_s;
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Sst file checksum generation of file: %s failed: %s",
+ files_to_ingest_[i].internal_file_path.c_str(),
+ status.ToString().c_str());
+ break;
+ }
+ if (ingestion_options_.write_global_seqno == false) {
+ files_to_ingest_[i].file_checksum = generated_checksum;
+ files_to_ingest_[i].file_checksum_func_name =
+ generated_checksum_func_name;
+ }
+ generated_checksums.push_back(generated_checksum);
+ generated_checksum_func_names.push_back(generated_checksum_func_name);
+ }
+ }
+
+ // Step 2: based on the verify_file_checksum and ingested checksum
+ // information, do the verification.
+ if (status.ok()) {
+ if (files_checksums.size() == files_to_ingest_.size() &&
+ files_checksum_func_names.size() == files_to_ingest_.size()) {
+ // Verify the checksum and checksum function name.
+ if (ingestion_options_.verify_file_checksum) {
+ for (size_t i = 0; i < files_to_ingest_.size(); i++) {
+ if (files_checksum_func_names[i] !=
+ generated_checksum_func_names[i]) {
+ status = Status::InvalidArgument(
+ "Checksum function name does not match with the checksum "
+ "function name of this DB");
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Sst file checksum verification of file: %s failed: %s",
+ external_files_paths[i].c_str(), status.ToString().c_str());
+ break;
+ }
+ if (files_checksums[i] != generated_checksums[i]) {
+ status = Status::Corruption(
+ "Ingested checksum does not match with the generated "
+ "checksum");
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Sst file checksum verification of file: %s failed: %s",
+ files_to_ingest_[i].internal_file_path.c_str(),
+ status.ToString().c_str());
+ break;
+ }
+ }
+ } else {
+ // If verify_file_checksum is not enabled, we only verify the
+ // checksum function name. If it does not match, fail the ingestion.
+ // If matches, we trust the ingested checksum information and store
+ // in the Manifest.
+ for (size_t i = 0; i < files_to_ingest_.size(); i++) {
+ if (files_checksum_func_names[i] != file_checksum_gen->Name()) {
+ status = Status::InvalidArgument(
+ "Checksum function name does not match with the checksum "
+ "function name of this DB");
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Sst file checksum verification of file: %s failed: %s",
+ external_files_paths[i].c_str(), status.ToString().c_str());
+ break;
+ }
+ files_to_ingest_[i].file_checksum = files_checksums[i];
+ files_to_ingest_[i].file_checksum_func_name =
+ files_checksum_func_names[i];
+ }
+ }
+ } else if (files_checksums.size() != files_checksum_func_names.size() ||
+ (files_checksums.size() == files_checksum_func_names.size() &&
+ files_checksums.size() != 0)) {
+ // The checksum and checksum function name vectors are not both empty,
+ // but their sizes do not match the number of files to be ingested.
+ status = Status::InvalidArgument(
+ "The checksum information of ingested sst files are nonempty and "
+ "the size of checksums or the size of the checksum function "
+ "names "
+ "does not match with the number of ingested sst files");
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "The ingested sst files checksum information is incomplete: %s",
+ status.ToString().c_str());
+ }
+ }
+ }
+
// TODO: The following is duplicated with Cleanup().
if (!status.ok()) {
// We failed, remove all files that we copied into the db
ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
file_to_ingest.largest_internal_key.user_key());
}
- Status status =
- cfd_->RangesOverlapWithMemtables(ranges, super_version, flush_needed);
+ Status status = cfd_->RangesOverlapWithMemtables(
+ ranges, super_version, db_options_.allow_data_in_errors, flush_needed);
if (status.ok() && *flush_needed &&
!ingestion_options_.allow_blocking_flush) {
status = Status::InvalidArgument("External file requires flush");
if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
// We need to assign a global sequence number to all the files even
- // if the dont overlap with any ranges since we have snapshots
+ // if the don't overlap with any ranges since we have snapshots
force_global_seqno = true;
}
// It is safe to use this instead of LastAllocatedSequence since we are
return status;
}
+ status = GenerateChecksumForIngestedFile(&f);
+ if (!status.ok()) {
+ return status;
+ }
+
// We use the import time as the ancester time. This is the time the data
// is written to the database.
int64_t temp_current_time = 0;
static_cast<uint64_t>(temp_current_time);
}
- edit_.AddFile(
- f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(),
- f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno,
- f.assigned_seqno, false, kInvalidBlobFileNumber, oldest_ancester_time,
- current_time, kUnknownFileChecksum, kUnknownFileChecksumFuncName);
+ edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
+ f.fd.GetFileSize(), f.smallest_internal_key,
+ f.largest_internal_key, f.assigned_seqno, f.assigned_seqno,
+ false, kInvalidBlobFileNumber, oldest_ancester_time,
+ current_time, f.file_checksum, f.file_checksum_func_name);
}
return status;
}
if (!status.ok()) {
return status;
}
- sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file),
- external_file));
+ sst_file_reader.reset(new RandomAccessFileReader(
+ std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));
status = cfd_->ioptions()->table_factory->NewTableReader(
TableReaderOptions(*cfd_->ioptions(),
file_to_ingest->largest_internal_key =
InternalKey("", 0, ValueType::kTypeValue);
bool bounds_set = false;
+ bool allow_data_in_errors = db_options_.allow_data_in_errors;
iter->SeekToFirst();
if (iter->Valid()) {
- if (!ParseInternalKey(iter->key(), &key)) {
- return Status::Corruption("external file have corrupted keys");
+ Status pik_status =
+ ParseInternalKey(iter->key(), &key, allow_data_in_errors);
+ if (!pik_status.ok()) {
+ return Status::Corruption("Corrupted key in external file. ",
+ pik_status.getState());
}
if (key.sequence != 0) {
- return Status::Corruption("external file have non zero sequence number");
+ return Status::Corruption("External file has non zero sequence number");
}
file_to_ingest->smallest_internal_key.SetFrom(key);
iter->SeekToLast();
- if (!ParseInternalKey(iter->key(), &key)) {
- return Status::Corruption("external file have corrupted keys");
+ pik_status = ParseInternalKey(iter->key(), &key, allow_data_in_errors);
+ if (!pik_status.ok()) {
+ return Status::Corruption("Corrupted key in external file. ",
+ pik_status.getState());
}
if (key.sequence != 0) {
- return Status::Corruption("external file have non zero sequence number");
+ return Status::Corruption("External file has non zero sequence number");
}
file_to_ingest->largest_internal_key.SetFrom(key);
if (range_del_iter != nullptr) {
for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
range_del_iter->Next()) {
- if (!ParseInternalKey(range_del_iter->key(), &key)) {
- return Status::Corruption("external file have corrupted keys");
+ Status pik_status =
+ ParseInternalKey(range_del_iter->key(), &key, allow_data_in_errors);
+ if (!pik_status.ok()) {
+ return Status::Corruption("Corrupted key in external file. ",
+ pik_status.getState());
}
RangeTombstone tombstone(key, range_del_iter->value());
continue;
}
- // We dont overlap with any keys in this level, but we still need to check
+ // We don't overlap with any keys in this level, but we still need to check
// if our file can fit in it
if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
target_level = lvl;
return Status::InvalidArgument("Global seqno is required, but disabled");
} else if (file_to_ingest->global_seqno_offset == 0) {
return Status::InvalidArgument(
- "Trying to set global seqno for a file that dont have a global seqno "
+ "Trying to set global seqno for a file that don't have a global seqno "
"field");
}
fs_->NewRandomRWFile(file_to_ingest->internal_file_path, env_options_,
&rwfile, nullptr);
if (status.ok()) {
+ FSRandomRWFilePtr fsptr(std::move(rwfile), io_tracer_);
std::string seqno_val;
PutFixed64(&seqno_val, seqno);
- status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val,
- IOOptions(), nullptr);
+ status = fsptr->Write(file_to_ingest->global_seqno_offset, seqno_val,
+ IOOptions(), nullptr);
if (status.ok()) {
TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
- status = SyncIngestedFile(rwfile.get());
+ status = SyncIngestedFile(fsptr.get());
TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
if (!status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
return Status::OK();
}
+// Re-generates the checksum of an ingested file after its global sequence
+// number field has been overwritten on disk, and records the result in
+// file_to_ingest->file_checksum / file_checksum_func_name (which AddFile()
+// later persists to the Manifest).
+// @param file_to_ingest in/out: internal_file_path is read; on success
+//        file_checksum and file_checksum_func_name are overwritten.
+// @return IOStatus::OK() when checksum generation is disabled or succeeds;
+//         otherwise the IOStatus propagated from GenerateOneFileChecksum.
+IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
+ IngestedFileInfo* file_to_ingest) {
+ if (db_options_.file_checksum_gen_factory == nullptr ||
+ need_generate_file_checksum_ == false ||
+ ingestion_options_.write_global_seqno == false) {
+ // If file_checksum_gen_factory is not set, we are not able to generate
+ // the checksum. If write_global_seqno is false, the file content was not
+ // rewritten after Prepare(), so the checksum generated (or trusted as
+ // user-provided, when need_generate_file_checksum_ is false) during
+ // Prepare() is still valid and this step is skipped.
+ return IOStatus::OK();
+ }
+ std::string file_checksum;
+ std::string file_checksum_func_name;
+ // NOTE(review): requested_checksum_func_name is left empty, so
+ // GenerateOneFileChecksum presumably falls back to the factory's default
+ // generator -- confirm against its documentation.
+ std::string requested_checksum_func_name;
+ IOStatus io_s = GenerateOneFileChecksum(
+ fs_.get(), file_to_ingest->internal_file_path,
+ db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name,
+ &file_checksum, &file_checksum_func_name,
+ ingestion_options_.verify_checksums_readahead_size,
+ db_options_.allow_mmap_reads, io_tracer_);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ // Success: store the fresh checksum info on the file so the caller records
+ // it with the file's metadata.
+ file_to_ingest->file_checksum = file_checksum;
+ file_to_ingest->file_checksum_func_name = file_checksum_func_name;
+ return IOStatus::OK();
+}
+
bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
const IngestedFileInfo* file_to_ingest, int level) {
if (level == 0) {