X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frocksdb%2Fdb%2Fexternal_sst_file_ingestion_job.cc;fp=ceph%2Fsrc%2Frocksdb%2Fdb%2Fexternal_sst_file_ingestion_job.cc;h=ba1277eab3345fcc3e5f1e95479353f81797f42d;hb=1e59de90020f1d8d374046ef9cca56ccd4e806e2;hp=1913a211ac9e6b98b4486f86ee32a9b9834470f2;hpb=bd41e436e25044e8e83156060a37c23cb661c364;p=ceph.git diff --git a/ceph/src/rocksdb/db/external_sst_file_ingestion_job.cc b/ceph/src/rocksdb/db/external_sst_file_ingestion_job.cc index 1913a211a..ba1277eab 100644 --- a/ceph/src/rocksdb/db/external_sst_file_ingestion_job.cc +++ b/ceph/src/rocksdb/db/external_sst_file_ingestion_job.cc @@ -17,10 +17,12 @@ #include "db/version_edit.h" #include "file/file_util.h" #include "file/random_access_file_reader.h" +#include "logging/logging.h" #include "table/merging_iterator.h" #include "table/scoped_arena_iterator.h" #include "table/sst_file_writer_collectors.h" #include "table/table_builder.h" +#include "table/unique_id_impl.h" #include "test_util/sync_point.h" #include "util/stop_watch.h" @@ -30,26 +32,37 @@ Status ExternalSstFileIngestionJob::Prepare( const std::vector& external_files_paths, const std::vector& files_checksums, const std::vector& files_checksum_func_names, - uint64_t next_file_number, SuperVersion* sv) { + const Temperature& file_temperature, uint64_t next_file_number, + SuperVersion* sv) { Status status; // Read the information of files we are ingesting for (const std::string& file_path : external_files_paths) { IngestedFileInfo file_to_ingest; - status = GetIngestedFileInfo(file_path, &file_to_ingest, sv); + status = + GetIngestedFileInfo(file_path, next_file_number++, &file_to_ingest, sv); if (!status.ok()) { return status; } - files_to_ingest_.push_back(file_to_ingest); - } - for (const IngestedFileInfo& f : files_to_ingest_) { - if (f.cf_id != + if (file_to_ingest.cf_id != TablePropertiesCollectorFactory::Context::kUnknownColumnFamily && - f.cf_id != cfd_->GetID()) { + file_to_ingest.cf_id != cfd_->GetID()) { return Status::InvalidArgument( "External file column family id don't match"); } + + if (file_to_ingest.num_entries == 0 && + file_to_ingest.num_range_deletions == 0) { + return Status::InvalidArgument("File contain no entries"); + } + + if (!file_to_ingest.smallest_internal_key.Valid() || + !file_to_ingest.largest_internal_key.Valid()) { + return Status::Corruption("Generated table have corrupted keys"); + } + + files_to_ingest_.emplace_back(std::move(file_to_ingest)); } const Comparator* ucmp = cfd_->internal_comparator().user_comparator(); @@ -79,29 +92,22 @@ Status ExternalSstFileIngestionJob::Prepare( } } - if (ingestion_options_.ingest_behind && files_overlap_) { - return Status::NotSupported("Files have overlapping ranges"); + // Hanlde the file temperature + for (size_t i = 0; i < num_files; i++) { + files_to_ingest_[i].file_temperature = file_temperature; } - for (IngestedFileInfo& f : files_to_ingest_) { - if (f.num_entries == 0 && f.num_range_deletions == 0) { - return Status::InvalidArgument("File contain no entries"); - } - - if (!f.smallest_internal_key.Valid() || !f.largest_internal_key.Valid()) { - return Status::Corruption("Generated table have corrupted keys"); - } + if (ingestion_options_.ingest_behind && files_overlap_) { + return Status::NotSupported("Files have overlapping ranges"); } // Copy/Move external files into DB std::unordered_set ingestion_path_ids; for (IngestedFileInfo& f : files_to_ingest_) { - f.fd = FileDescriptor(next_file_number++, 0, f.file_size); f.copy_file = false; const std::string path_outside_db = f.external_file_path; - const std::string path_inside_db = - TableFileName(cfd_->ioptions()->cf_paths, f.fd.GetNumber(), - f.fd.GetPathId()); + const std::string path_inside_db = TableFileName( + cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId()); if (ingestion_options_.move_files) { status = fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr); @@ -110,23 +116,35 @@ Status ExternalSstFileIngestionJob::Prepare( // directory before ingest the file. For integrity of RocksDB we need // to sync the file. std::unique_ptr file_to_sync; - status = fs_->ReopenWritableFile(path_inside_db, env_options_, - &file_to_sync, nullptr); - if (status.ok()) { - TEST_SYNC_POINT( - "ExternalSstFileIngestionJob::BeforeSyncIngestedFile"); - status = SyncIngestedFile(file_to_sync.get()); - TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile"); - if (!status.ok()) { - ROCKS_LOG_WARN(db_options_.info_log, - "Failed to sync ingested file %s: %s", - path_inside_db.c_str(), status.ToString().c_str()); + Status s = fs_->ReopenWritableFile(path_inside_db, env_options_, + &file_to_sync, nullptr); + TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:Reopen", + &s); + // Some file systems (especially remote/distributed) don't support + // reopening a file for writing and don't require reopening and + // syncing the file. Ignore the NotSupported error in that case. + if (!s.IsNotSupported()) { + status = s; + if (status.ok()) { + TEST_SYNC_POINT( + "ExternalSstFileIngestionJob::BeforeSyncIngestedFile"); + status = SyncIngestedFile(file_to_sync.get()); + TEST_SYNC_POINT( + "ExternalSstFileIngestionJob::AfterSyncIngestedFile"); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to sync ingested file %s: %s", + path_inside_db.c_str(), status.ToString().c_str()); + } } } } else if (status.IsNotSupported() && ingestion_options_.failed_move_fall_back_to_copy) { // Original file is on a different FS, use copy instead of hard linking. f.copy_file = true; + ROCKS_LOG_INFO(db_options_.info_log, + "Triy to link file %s but it's not supported : %s", + path_outside_db.c_str(), status.ToString().c_str()); } } else { f.copy_file = true; @@ -136,8 +154,9 @@ Status ExternalSstFileIngestionJob::Prepare( TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile", nullptr); // CopyFile also sync the new file. - status = CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, - db_options_.use_fsync, io_tracer_); + status = + CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, + db_options_.use_fsync, io_tracer_, Temperature::kUnknown); } TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded"); if (!status.ok()) { @@ -153,7 +172,9 @@ Status ExternalSstFileIngestionJob::Prepare( TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir"); if (status.ok()) { for (auto path_id : ingestion_path_ids) { - status = directories_->GetDataDir(path_id)->Fsync(IOOptions(), nullptr); + status = directories_->GetDataDir(path_id)->FsyncWithDirOptions( + IOOptions(), nullptr, + DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); if (!status.ok()) { ROCKS_LOG_WARN(db_options_.info_log, "Failed to sync directory %" ROCKSDB_PRIszt @@ -195,13 +216,17 @@ Status ExternalSstFileIngestionJob::Prepare( std::string generated_checksum; std::string generated_checksum_func_name; std::string requested_checksum_func_name; + // TODO: rate limit file reads for checksum calculation during file + // ingestion. IOStatus io_s = GenerateOneFileChecksum( fs_.get(), files_to_ingest_[i].internal_file_path, db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name, &generated_checksum, &generated_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, - db_options_.allow_mmap_reads, io_tracer_); + db_options_.allow_mmap_reads, io_tracer_, + db_options_.rate_limiter.get(), + Env::IO_TOTAL /* rate_limiter_priority */); if (!io_s.ok()) { status = io_s; ROCKS_LOG_WARN(db_options_.info_log, @@ -292,12 +317,13 @@ Status ExternalSstFileIngestionJob::Prepare( // TODO: The following is duplicated with Cleanup(). if (!status.ok()) { + IOOptions io_opts; // We failed, remove all files that we copied into the db for (IngestedFileInfo& f : files_to_ingest_) { if (f.internal_file_path.empty()) { continue; } - Status s = env_->DeleteFile(f.internal_file_path); + Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr); if (!s.ok()) { ROCKS_LOG_WARN(db_options_.info_log, "AddFile() clean up for file %s failed : %s", @@ -312,9 +338,30 @@ Status ExternalSstFileIngestionJob::Prepare( Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed, SuperVersion* super_version) { autovector ranges; - for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) { - ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(), - file_to_ingest.largest_internal_key.user_key()); + autovector keys; + size_t ts_sz = cfd_->user_comparator()->timestamp_size(); + if (ts_sz) { + // Check all ranges [begin, end] inclusively. Add maximum + // timestamp to include all `begin` keys, and add minimal timestamp to + // include all `end` keys. + for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) { + std::string begin_str; + std::string end_str; + AppendUserKeyWithMaxTimestamp( + &begin_str, file_to_ingest.smallest_internal_key.user_key(), ts_sz); + AppendKeyWithMinTimestamp( + &end_str, file_to_ingest.largest_internal_key.user_key(), ts_sz); + keys.emplace_back(std::move(begin_str)); + keys.emplace_back(std::move(end_str)); + } + for (size_t i = 0; i < files_to_ingest_.size(); ++i) { + ranges.emplace_back(keys[2 * i], keys[2 * i + 1]); + } + } else { + for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) { + ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(), + file_to_ingest.largest_internal_key.user_key()); + } } Status status = cfd_->RangesOverlapWithMemtables( ranges, super_version, db_options_.allow_data_in_errors, flush_needed); @@ -335,6 +382,12 @@ Status ExternalSstFileIngestionJob::Run() { // with the files we are ingesting bool need_flush = false; status = NeedsFlush(&need_flush, super_version); + if (!status.ok()) { + return status; + } + if (need_flush) { + return Status::TryAgain(); + } assert(status.ok() && need_flush == false); #endif @@ -360,9 +413,32 @@ Status ExternalSstFileIngestionJob::Run() { super_version, force_global_seqno, cfd_->ioptions()->compaction_style, last_seqno, &f, &assigned_seqno); } + + // Modify the smallest/largest internal key to include the sequence number + // that we just learned. Only overwrite sequence number zero. There could + // be a nonzero sequence number already to indicate a range tombstone's + // exclusive endpoint. + ParsedInternalKey smallest_parsed, largest_parsed; + if (status.ok()) { + status = ParseInternalKey(*f.smallest_internal_key.rep(), + &smallest_parsed, false /* log_err_key */); + } + if (status.ok()) { + status = ParseInternalKey(*f.largest_internal_key.rep(), &largest_parsed, + false /* log_err_key */); + } if (!status.ok()) { return status; } + if (smallest_parsed.sequence == 0) { + UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno, + smallest_parsed.type); + } + if (largest_parsed.sequence == 0) { + UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno, + largest_parsed.type); + } + status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno); TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run", &assigned_seqno); @@ -385,16 +461,18 @@ Status ExternalSstFileIngestionJob::Run() { int64_t temp_current_time = 0; uint64_t current_time = kUnknownFileCreationTime; uint64_t oldest_ancester_time = kUnknownOldestAncesterTime; - if (env_->GetCurrentTime(&temp_current_time).ok()) { + if (clock_->GetCurrentTime(&temp_current_time).ok()) { current_time = oldest_ancester_time = static_cast(temp_current_time); } - - edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), - f.fd.GetFileSize(), f.smallest_internal_key, - f.largest_internal_key, f.assigned_seqno, f.assigned_seqno, - false, kInvalidBlobFileNumber, oldest_ancester_time, - current_time, f.file_checksum, f.file_checksum_func_name); + FileMetaData f_metadata( + f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(), + f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno, + f.assigned_seqno, false, f.file_temperature, kInvalidBlobFileNumber, + oldest_ancester_time, current_time, f.file_checksum, + f.file_checksum_func_name, f.unique_id); + f_metadata.temperature = f.file_temperature; + edit_.AddFile(f.picked_level, f_metadata); } return status; } @@ -403,7 +481,7 @@ void ExternalSstFileIngestionJob::UpdateStats() { // Update internal stats for new ingested files uint64_t total_keys = 0; uint64_t total_l0_files = 0; - uint64_t total_time = env_->NowMicros() - job_start_time_; + uint64_t total_time = clock_->NowMicros() - job_start_time_; EventLoggerStream stream = event_logger_->Log(); stream << "event" @@ -412,7 +490,8 @@ void ExternalSstFileIngestionJob::UpdateStats() { stream.StartArray(); for (IngestedFileInfo& f : files_to_ingest_) { - InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1); + InternalStats::CompactionStats stats( + CompactionReason::kExternalSstIngestion, 1); stats.micros = total_time; // If actual copy occurred for this file, then we need to count the file // size as the actual bytes written. If the file was linked, then we ignore @@ -459,6 +538,7 @@ void ExternalSstFileIngestionJob::UpdateStats() { } void ExternalSstFileIngestionJob::Cleanup(const Status& status) { + IOOptions io_opts; if (!status.ok()) { // We failed to add the files to the database // remove all the files we copied @@ -466,7 +546,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { if (f.internal_file_path.empty()) { continue; } - Status s = env_->DeleteFile(f.internal_file_path); + Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr); if (!s.ok()) { ROCKS_LOG_WARN(db_options_.info_log, "AddFile() clean up for file %s failed : %s", @@ -478,7 +558,7 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { } else if (status.ok() && ingestion_options_.move_files) { // The files were moved and added successfully, remove original file links for (IngestedFileInfo& f : files_to_ingest_) { - Status s = env_->DeleteFile(f.external_file_path); + Status s = fs_->DeleteFile(f.external_file_path, io_opts, nullptr); if (!s.ok()) { ROCKS_LOG_WARN( db_options_.info_log, @@ -491,8 +571,8 @@ void ExternalSstFileIngestionJob::Cleanup(const Status& status) { } Status ExternalSstFileIngestionJob::GetIngestedFileInfo( - const std::string& external_file, IngestedFileInfo* file_to_ingest, - SuperVersion* sv) { + const std::string& external_file, uint64_t new_file_number, + IngestedFileInfo* file_to_ingest, SuperVersion* sv) { file_to_ingest->external_file_path = external_file; // Get external file size @@ -502,13 +582,17 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( return status; } + // Assign FD with number + file_to_ingest->fd = + FileDescriptor(new_file_number, 0, file_to_ingest->file_size); + // Create TableReader for external file std::unique_ptr table_reader; std::unique_ptr sst_file; std::unique_ptr sst_file_reader; - status = fs_->NewRandomAccessFile(external_file, env_options_, - &sst_file, nullptr); + status = + fs_->NewRandomAccessFile(external_file, env_options_, &sst_file, nullptr); if (!status.ok()) { return status; } @@ -516,9 +600,14 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_)); status = cfd_->ioptions()->table_factory->NewTableReader( - TableReaderOptions(*cfd_->ioptions(), - sv->mutable_cf_options.prefix_extractor.get(), - env_options_, cfd_->internal_comparator()), + TableReaderOptions( + *cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor, + env_options_, cfd_->internal_comparator(), + /*skip_filters*/ false, /*immortal*/ false, + /*force_direct_prefetch*/ false, /*level*/ -1, + /*block_cache_tracer*/ nullptr, + /*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(), + /*cur_file_num*/ new_file_number), std::move(sst_file_reader), file_to_ingest->file_size, &table_reader); if (!status.ok()) { return status; @@ -558,22 +647,20 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( // Set the global sequence number file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str()); - auto offsets_iter = props->properties_offsets.find( - ExternalSstFilePropertyNames::kGlobalSeqno); - if (offsets_iter == props->properties_offsets.end() || - offsets_iter->second == 0) { + if (props->external_sst_file_global_seqno_offset == 0) { file_to_ingest->global_seqno_offset = 0; return Status::Corruption("Was not able to find file global seqno field"); } - file_to_ingest->global_seqno_offset = static_cast(offsets_iter->second); + file_to_ingest->global_seqno_offset = + static_cast(props->external_sst_file_global_seqno_offset); } else if (file_to_ingest->version == 1) { // SST file V1 should not have global seqno field assert(seqno_iter == uprops.end()); file_to_ingest->original_seqno = 0; if (ingestion_options_.allow_blocking_flush || - ingestion_options_.allow_global_seqno) { + ingestion_options_.allow_global_seqno) { return Status::InvalidArgument( - "External SST file V1 does not support global seqno"); + "External SST file V1 does not support global seqno"); } } else { return Status::InvalidArgument("External file version is not supported"); @@ -664,6 +751,16 @@ Status ExternalSstFileIngestionJob::GetIngestedFileInfo( file_to_ingest->table_properties = *props; + auto s = GetSstInternalUniqueId(props->db_id, props->db_session_id, + props->orig_file_number, + &(file_to_ingest->unique_id)); + if (!s.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to get SST unique id for file %s", + file_to_ingest->internal_file_path.c_str()); + file_to_ingest->unique_id = kNullUniqueId64x2; + } + return status; } @@ -676,6 +773,12 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( if (force_global_seqno) { *assigned_seqno = last_seqno + 1; if (compaction_style == kCompactionStyleUniversal || files_overlap_) { + if (ingestion_options_.fail_if_not_bottommost_level) { + status = Status::TryAgain( + "Files cannot be ingested to Lmax. Please make sure key range of " + "Lmax does not overlap with files to ingest."); + return status; + } file_to_ingest->picked_level = 0; return status; } @@ -714,10 +817,11 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( const std::vector& level_files = vstorage->LevelFiles(lvl); const SequenceNumber level_largest_seqno = - (*max_element(level_files.begin(), level_files.end(), - [](FileMetaData* f1, FileMetaData* f2) { - return f1->fd.largest_seqno < f2->fd.largest_seqno; - })) + (*std::max_element(level_files.begin(), level_files.end(), + [](FileMetaData* f1, FileMetaData* f2) { + return f1->fd.largest_seqno < + f2->fd.largest_seqno; + })) ->fd.largest_seqno; // should only assign seqno to current level's largest seqno when // the file fits @@ -744,7 +848,16 @@ Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile( target_level = 0; *assigned_seqno = last_seqno + 1; } - TEST_SYNC_POINT_CALLBACK( + + if (ingestion_options_.fail_if_not_bottommost_level && + target_level < cfd_->NumberLevels() - 1) { + status = Status::TryAgain( + "Files cannot be ingested to Lmax. Please make sure key range of Lmax " + "does not overlap with files to ingest."); + return status; + } + + TEST_SYNC_POINT_CALLBACK( "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile", &overlap_with_db); file_to_ingest->picked_level = target_level; @@ -759,10 +872,10 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile( auto* vstorage = cfd_->current()->storage_info(); // first check if new files fit in the bottommost level int bottom_lvl = cfd_->NumberLevels() - 1; - if(!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) { + if (!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) { return Status::InvalidArgument( - "Can't ingest_behind file as it doesn't fit " - "at the bottommost level!"); + "Can't ingest_behind file as it doesn't fit " + "at the bottommost level!"); } // second check if despite allow_ingest_behind=true we still have 0 seqnums @@ -771,8 +884,8 @@ Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile( for (auto file : vstorage->LevelFiles(lvl)) { if (file->fd.smallest_seqno == 0) { return Status::InvalidArgument( - "Can't ingest_behind file as despite allow_ingest_behind=true " - "there are files with 0 seqno in database at upper levels!"); + "Can't ingest_behind file as despite allow_ingest_behind=true " + "there are files with 0 seqno in database at upper levels!"); } } } @@ -799,11 +912,13 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( // If the file system does not support random write, then we should not. // Otherwise we should. std::unique_ptr rwfile; - Status status = - fs_->NewRandomRWFile(file_to_ingest->internal_file_path, env_options_, - &rwfile, nullptr); + Status status = fs_->NewRandomRWFile(file_to_ingest->internal_file_path, + env_options_, &rwfile, nullptr); + TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::NewRandomRWFile", + &status); if (status.ok()) { - FSRandomRWFilePtr fsptr(std::move(rwfile), io_tracer_); + FSRandomRWFilePtr fsptr(std::move(rwfile), io_tracer_, + file_to_ingest->internal_file_path); std::string seqno_val; PutFixed64(&seqno_val, seqno); status = fsptr->Write(file_to_ingest->global_seqno_offset, seqno_val, @@ -845,12 +960,14 @@ IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile( std::string file_checksum; std::string file_checksum_func_name; std::string requested_checksum_func_name; + // TODO: rate limit file reads for checksum calculation during file ingestion. IOStatus io_s = GenerateOneFileChecksum( fs_.get(), file_to_ingest->internal_file_path, db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name, &file_checksum, &file_checksum_func_name, ingestion_options_.verify_checksums_readahead_size, - db_options_.allow_mmap_reads, io_tracer_); + db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get(), + Env::IO_TOTAL /* rate_limiter_priority */); if (!io_s.ok()) { return io_s; }