1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
8 #include "db/external_sst_file_ingestion_job.h"
13 #include <unordered_set>
16 #include "db/db_impl/db_impl.h"
17 #include "db/version_edit.h"
18 #include "file/file_util.h"
19 #include "file/random_access_file_reader.h"
20 #include "table/merging_iterator.h"
21 #include "table/scoped_arena_iterator.h"
22 #include "table/sst_file_writer_collectors.h"
23 #include "table/table_builder.h"
24 #include "test_util/sync_point.h"
25 #include "util/stop_watch.h"
27 namespace ROCKSDB_NAMESPACE
{
// Prepare() runs before ingestion proper. It: (1) reads per-file metadata
// for every external SST via GetIngestedFileInfo(); (2) validates the batch
// (column-family id must match this CF, key ranges must not overlap when
// ingest_behind is set, files must be non-empty with valid key bounds);
// (3) hard-links (or, on cross-FS failure, copies) each file into the DB
// directory under a freshly allocated file number, then syncs the files and
// their data directories; (4) generates and/or verifies file checksums
// against the caller-supplied files_checksums / files_checksum_func_names.
// On failure it deletes any files already brought into the DB directory.
// NOTE(review): this listing appears truncated/mangled — statements are
// split across lines and some lines (closing braces, status checks) seem to
// be missing; reconcile against the complete source before editing.
29 Status
ExternalSstFileIngestionJob::Prepare(
30 const std::vector
<std::string
>& external_files_paths
,
31 const std::vector
<std::string
>& files_checksums
,
32 const std::vector
<std::string
>& files_checksum_func_names
,
33 uint64_t next_file_number
, SuperVersion
* sv
) {
36 // Read the information of files we are ingesting
37 for (const std::string
& file_path
: external_files_paths
) {
38 IngestedFileInfo file_to_ingest
;
39 status
= GetIngestedFileInfo(file_path
, &file_to_ingest
, sv
);
43 files_to_ingest_
.push_back(file_to_ingest
);
// Reject any file that was built for a different column family.
46 for (const IngestedFileInfo
& f
: files_to_ingest_
) {
48 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily
&&
49 f
.cf_id
!= cfd_
->GetID()) {
50 return Status::InvalidArgument(
51 "External file column family id don't match");
55 const Comparator
* ucmp
= cfd_
->internal_comparator().user_comparator();
56 auto num_files
= files_to_ingest_
.size();
58 return Status::InvalidArgument("The list of files is empty");
59 } else if (num_files
> 1) {
60 // Verify that passed files don't have overlapping ranges
// Sort by smallest key, then compare each neighbor pair: if one file's
// largest key reaches the next file's smallest key, the set overlaps.
61 autovector
<const IngestedFileInfo
*> sorted_files
;
62 for (size_t i
= 0; i
< num_files
; i
++) {
63 sorted_files
.push_back(&files_to_ingest_
[i
]);
67 sorted_files
.begin(), sorted_files
.end(),
68 [&ucmp
](const IngestedFileInfo
* info1
, const IngestedFileInfo
* info2
) {
69 return sstableKeyCompare(ucmp
, info1
->smallest_internal_key
,
70 info2
->smallest_internal_key
) < 0;
73 for (size_t i
= 0; i
+ 1 < num_files
; i
++) {
74 if (sstableKeyCompare(ucmp
, sorted_files
[i
]->largest_internal_key
,
75 sorted_files
[i
+ 1]->smallest_internal_key
) >= 0) {
76 files_overlap_
= true;
// ingest_behind requires a mutually non-overlapping file set.
82 if (ingestion_options_
.ingest_behind
&& files_overlap_
) {
83 return Status::NotSupported("Files have overlapping ranges");
86 for (IngestedFileInfo
& f
: files_to_ingest_
) {
87 if (f
.num_entries
== 0 && f
.num_range_deletions
== 0) {
88 return Status::InvalidArgument("File contain no entries")
;
91 if (!f
.smallest_internal_key
.Valid() || !f
.largest_internal_key
.Valid()) {
92 return Status::Corruption("Generated table have corrupted keys");
96 // Copy/Move external files into DB
97 std::unordered_set
<size_t> ingestion_path_ids
;
98 for (IngestedFileInfo
& f
: files_to_ingest_
) {
99 f
.fd
= FileDescriptor(next_file_number
++, 0, f
.file_size
);
101 const std::string path_outside_db
= f
.external_file_path
;
102 const std::string path_inside_db
=
103 TableFileName(cfd_
->ioptions()->cf_paths
, f
.fd
.GetNumber(),
105 if (ingestion_options_
.move_files
) {
107 fs_
->LinkFile(path_outside_db
, path_inside_db
, IOOptions(), nullptr);
109 // It is unsafe to assume application had synced the file and file
110 // directory before ingesting the file. For integrity of RocksDB we need
112 std::unique_ptr
<FSWritableFile
> file_to_sync
;
113 status
= fs_
->ReopenWritableFile(path_inside_db
, env_options_
,
114 &file_to_sync
, nullptr);
117 "ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
118 status
= SyncIngestedFile(file_to_sync
.get());
119 TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile");
121 ROCKS_LOG_WARN(db_options_
.info_log
,
122 "Failed to sync ingested file %s: %s",
123 path_inside_db
.c_str(), status
.ToString().c_str());
126 } else if (status
.IsNotSupported() &&
127 ingestion_options_
.failed_move_fall_back_to_copy
) {
128 // Original file is on a different FS, use copy instead of hard linking.
136 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
138 // CopyFile also syncs the new file.
139 status
= CopyFile(fs_
.get(), path_outside_db
, path_inside_db
, 0,
140 db_options_
.use_fsync
, io_tracer_
);
142 TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
146 f
.internal_file_path
= path_inside_db
;
147 // Initialize the checksum information of ingested files.
148 f
.file_checksum
= kUnknownFileChecksum
;
149 f
.file_checksum_func_name
= kUnknownFileChecksumFuncName
;
150 ingestion_path_ids
.insert(f
.fd
.GetPathId());
153 TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
// Fsync every data directory that received a file so the new links/copies
// are durable.
155 for (auto path_id
: ingestion_path_ids
) {
156 status
= directories_
->GetDataDir(path_id
)->Fsync(IOOptions(), nullptr);
158 ROCKS_LOG_WARN(db_options_
.info_log
,
159 "Failed to sync directory %" ROCKSDB_PRIszt
160 " while ingest file: %s",
161 path_id
, status
.ToString().c_str());
166 TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
168 // Generate and check the sst file checksum. Note that, if
169 // IngestExternalFileOptions::write_global_seqno is true, we will not update
170 // the checksum information in the files_to_ingest_ here, since the file is
171 // updated with the new global_seqno. After global_seqno is updated, DB will
172 // generate the new checksum and store it in the Manifest. In all other cases
173 // if ingestion_options_.write_global_seqno == true and
174 // verify_file_checksum is false, we only check the checksum function name.
175 if (status
.ok() && db_options_
.file_checksum_gen_factory
!= nullptr) {
176 if (ingestion_options_
.verify_file_checksum
== false &&
177 files_checksums
.size() == files_to_ingest_
.size() &&
178 files_checksum_func_names
.size() == files_to_ingest_
.size()) {
179 // Only when verify_file_checksum == false and the checksum for ingested
180 // files are provided, DB will use the provided checksum and does not
181 // generate the checksum for ingested files.
182 need_generate_file_checksum_
= false;
184 need_generate_file_checksum_
= true;
186 FileChecksumGenContext gen_context
;
187 std::unique_ptr
<FileChecksumGenerator
> file_checksum_gen
=
188 db_options_
.file_checksum_gen_factory
->CreateFileChecksumGenerator(
190 std::vector
<std::string
> generated_checksums
;
191 std::vector
<std::string
> generated_checksum_func_names
;
192 // Step 1: generate the checksum for ingested sst file.
193 if (need_generate_file_checksum_
) {
194 for (size_t i
= 0; i
< files_to_ingest_
.size(); i
++) {
195 std::string generated_checksum
;
196 std::string generated_checksum_func_name
;
197 std::string requested_checksum_func_name
;
198 IOStatus io_s
= GenerateOneFileChecksum(
199 fs_
.get(), files_to_ingest_
[i
].internal_file_path
,
200 db_options_
.file_checksum_gen_factory
.get(),
201 requested_checksum_func_name
, &generated_checksum
,
202 &generated_checksum_func_name
,
203 ingestion_options_
.verify_checksums_readahead_size
,
204 db_options_
.allow_mmap_reads
, io_tracer_
);
207 ROCKS_LOG_WARN(db_options_
.info_log
,
208 "Sst file checksum generation of file: %s failed: %s",
209 files_to_ingest_
[i
].internal_file_path
.c_str(),
210 status
.ToString().c_str());
// Only persist the generated checksum now if the file will not be
// rewritten with a global seqno (which would invalidate it).
213 if (ingestion_options_
.write_global_seqno
== false) {
214 files_to_ingest_
[i
].file_checksum
= generated_checksum
;
215 files_to_ingest_
[i
].file_checksum_func_name
=
216 generated_checksum_func_name
;
218 generated_checksums
.push_back(generated_checksum
);
219 generated_checksum_func_names
.push_back(generated_checksum_func_name
);
223 // Step 2: based on the verify_file_checksum and ingested checksum
224 // information, do the verification.
226 if (files_checksums
.size() == files_to_ingest_
.size() &&
227 files_checksum_func_names
.size() == files_to_ingest_
.size()) {
228 // Verify the checksum and checksum function name.
229 if (ingestion_options_
.verify_file_checksum
) {
230 for (size_t i
= 0; i
< files_to_ingest_
.size(); i
++) {
231 if (files_checksum_func_names
[i
] !=
232 generated_checksum_func_names
[i
]) {
233 status
= Status::InvalidArgument(
234 "Checksum function name does not match with the checksum "
235 "function name of this DB");
237 db_options_
.info_log
,
238 "Sst file checksum verification of file: %s failed: %s",
239 external_files_paths
[i
].c_str(), status
.ToString().c_str());
242 if (files_checksums
[i
] != generated_checksums
[i
]) {
243 status
= Status::Corruption(
244 "Ingested checksum does not match with the generated "
247 db_options_
.info_log
,
248 "Sst file checksum verification of file: %s failed: %s",
249 files_to_ingest_
[i
].internal_file_path
.c_str(),
250 status
.ToString().c_str());
255 // If verify_file_checksum is not enabled, we only verify the
256 // checksum function name. If it does not match, fail the ingestion.
257 // If matches, we trust the ingested checksum information and store
259 for (size_t i
= 0; i
< files_to_ingest_
.size(); i
++) {
260 if (files_checksum_func_names
[i
] != file_checksum_gen
->Name()) {
261 status
= Status::InvalidArgument(
262 "Checksum function name does not match with the checksum "
263 "function name of this DB");
265 db_options_
.info_log
,
266 "Sst file checksum verification of file: %s failed: %s",
267 external_files_paths
[i
].c_str(), status
.ToString().c_str());
270 files_to_ingest_
[i
].file_checksum
= files_checksums
[i
];
271 files_to_ingest_
[i
].file_checksum_func_name
=
272 files_checksum_func_names
[i
];
275 } else if (files_checksums
.size() != files_checksum_func_names
.size() ||
276 (files_checksums
.size() == files_checksum_func_names
.size() &&
277 files_checksums
.size() != 0)) {
278 // The checksum or checksum function name vector are not both empty
279 // and they are incomplete.
280 status
= Status::InvalidArgument(
281 "The checksum information of ingested sst files are nonempty and "
282 "the size of checksums or the size of the checksum function "
284 "does not match with the number of ingested sst files");
286 db_options_
.info_log
,
287 "The ingested sst files checksum information is incomplete: %s",
288 status
.ToString().c_str());
293 // TODO: The following is duplicated with Cleanup().
295 // We failed, remove all files that we copied into the db
296 for (IngestedFileInfo
& f
: files_to_ingest_
) {
297 if (f
.internal_file_path
.empty()) {
300 Status s
= env_
->DeleteFile(f
.internal_file_path
);
302 ROCKS_LOG_WARN(db_options_
.info_log
,
303 "AddFile() clean up for file %s failed : %s",
304 f
.internal_file_path
.c_str(), s
.ToString().c_str());
// NeedsFlush() sets *flush_needed to true iff any ingested file's user-key
// range overlaps the memtables of `super_version` (checked via
// cfd_->RangesOverlapWithMemtables). If a flush would be required but
// ingestion_options_.allow_blocking_flush is false, the returned status is
// InvalidArgument so the caller can fail the ingestion instead of blocking.
// NOTE(review): listing appears truncated (return/closing lines missing);
// reconcile against the complete source before editing.
312 Status
ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed
,
313 SuperVersion
* super_version
) {
// Build one user-key range per file to ingest.
314 autovector
<Range
> ranges
;
315 for (const IngestedFileInfo
& file_to_ingest
: files_to_ingest_
) {
316 ranges
.emplace_back(file_to_ingest
.smallest_internal_key
.user_key(),
317 file_to_ingest
.largest_internal_key
.user_key())
;
319 Status status
= cfd_
->RangesOverlapWithMemtables(
320 ranges
, super_version
, db_options_
.allow_data_in_errors
, flush_needed
);
321 if (status
.ok() && *flush_needed
&&
322 !ingestion_options_
.allow_blocking_flush
) {
323 status
= Status::InvalidArgument("External file requires flush");
// Run() performs the actual ingestion under exclusive write access: for each
// prepared file it picks a target level (CheckLevelForIngestedBehindFile when
// ingest_behind, otherwise AssignLevelAndSeqnoForIngestedFile), writes the
// assigned global seqno into the file, optionally regenerates its checksum,
// and records the file in edit_ (the VersionEdit applied by the caller).
// NOTE(review): listing appears truncated (some braces/status checks
// missing); reconcile against the complete source before editing.
328 // REQUIRES: we have become the only writer by entering both write_thread_ and
329 // nonmem_write_thread_
330 Status
ExternalSstFileIngestionJob::Run() {
332 SuperVersion
* super_version
= cfd_
->GetSuperVersion();
334 // We should never run the job with a memtable that is overlapping
335 // with the files we are ingesting
336 bool need_flush
= false;
337 status
= NeedsFlush(&need_flush
, super_version
);
338 assert(status
.ok() && need_flush
== false);
341 bool force_global_seqno
= false;
343 if (ingestion_options_
.snapshot_consistency
&& !db_snapshots_
->empty()) {
344 // We need to assign a global sequence number to all the files even
345 // if they don't overlap with any ranges since we have snapshots
346 force_global_seqno
= true;
348 // It is safe to use this instead of LastAllocatedSequence since we are
349 // the only active writer, and hence they are equal
350 SequenceNumber last_seqno
= versions_
->LastSequence();
351 edit_
.SetColumnFamily(cfd_
->GetID());
352 // The levels that the files will be ingested into
354 for (IngestedFileInfo
& f
: files_to_ingest_
) {
355 SequenceNumber assigned_seqno
= 0;
356 if (ingestion_options_
.ingest_behind
) {
357 status
= CheckLevelForIngestedBehindFile(&f
);
359 status
= AssignLevelAndSeqnoForIngestedFile(
360 super_version
, force_global_seqno
, cfd_
->ioptions()->compaction_style
,
361 last_seqno
, &f
, &assigned_seqno
);
366 status
= AssignGlobalSeqnoForIngestedFile(&f
, assigned_seqno
);
367 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
// Track how many sequence numbers this job consumed so they can be
// accounted for (and reclaimed on Cleanup).
369 if (assigned_seqno
> last_seqno
) {
370 assert(assigned_seqno
== last_seqno
+ 1);
371 last_seqno
= assigned_seqno
;
372 ++consumed_seqno_count_
;
378 status
= GenerateChecksumForIngestedFile(&f
);
383 // We use the import time as the ancestor time. This is the time the data
384 // is written to the database.
385 int64_t temp_current_time
= 0;
386 uint64_t current_time
= kUnknownFileCreationTime
;
387 uint64_t oldest_ancester_time
= kUnknownOldestAncesterTime
;
388 if (env_
->GetCurrentTime(&temp_current_time
).ok()) {
389 current_time
= oldest_ancester_time
=
390 static_cast<uint64_t>(temp_current_time
);
// Register the ingested file in the VersionEdit; smallest/largest seqno
// are both the assigned global seqno.
393 edit_
.AddFile(f
.picked_level
, f
.fd
.GetNumber(), f
.fd
.GetPathId(),
394 f
.fd
.GetFileSize(), f
.smallest_internal_key
,
395 f
.largest_internal_key
, f
.assigned_seqno
, f
.assigned_seqno
,
396 false, kInvalidBlobFileNumber
, oldest_ancester_time
,
397 current_time
, f
.file_checksum
, f
.file_checksum_func_name
);
// UpdateStats() records per-file compaction stats (bytes written for copied
// files vs bytes moved for linked files), column-family ingestion counters,
// an info-log line per file, and an event-log entry describing the ingestion
// and the resulting LSM shape.
// NOTE(review): listing appears truncated (e.g. the copy-vs-move branch and
// some closing braces are missing); reconcile against the complete source
// before editing.
402 void ExternalSstFileIngestionJob::UpdateStats() {
403 // Update internal stats for new ingested files
404 uint64_t total_keys
= 0;
405 uint64_t total_l0_files
= 0;
406 uint64_t total_time
= env_
->NowMicros() - job_start_time_
;
408 EventLoggerStream stream
= event_logger_
->Log();
410 << "ingest_finished";
411 stream
<< "files_ingested";
414 for (IngestedFileInfo
& f
: files_to_ingest_
) {
415 InternalStats::CompactionStats
stats(CompactionReason::kExternalSstIngestion
, 1);
416 stats
.micros
= total_time
;
417 // If actual copy occurred for this file, then we need to count the file
418 // size as the actual bytes written. If the file was linked, then we ignore
419 // the bytes written for file metadata.
420 // TODO (yanqin) maybe account for file metadata bytes for exact accuracy?
422 stats
.bytes_written
= f
.fd
.GetFileSize();
424 stats
.bytes_moved
= f
.fd
.GetFileSize();
426 stats
.num_output_files
= 1;
427 cfd_
->internal_stats()->AddCompactionStats(f
.picked_level
,
428 Env::Priority::USER
, stats
);
429 cfd_
->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE
,
431 total_keys
+= f
.num_entries
;
432 if (f
.picked_level
== 0) {
436 db_options_
.info_log
,
437 "[AddFile] External SST file %s was ingested in L%d with path %s "
438 "(global_seqno=%" PRIu64
")\n",
439 f
.external_file_path
.c_str(), f
.picked_level
,
440 f
.internal_file_path
.c_str(), f
.assigned_seqno
);
441 stream
<< "file" << f
.internal_file_path
<< "level" << f
.picked_level
;
// Dump the per-level file counts so the event log captures LSM state.
445 stream
<< "lsm_state";
447 auto vstorage
= cfd_
->current()->storage_info();
448 for (int level
= 0; level
< vstorage
->num_levels(); ++level
) {
449 stream
<< vstorage
->NumLevelFiles(level
);
453 cfd_
->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL
,
455 cfd_
->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL
,
456 files_to_ingest_
.size());
457 cfd_
->internal_stats()->AddCFStats(
458 InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL
, total_l0_files
);
// Cleanup() runs after the job finishes. On failure: delete every file that
// was already linked/copied into the DB directory and reset
// consumed_seqno_count_ / files_overlap_. On success with move_files: delete
// the original external files, which were hard-linked into the DB.
// Deletion failures are logged as warnings, not propagated.
// NOTE(review): listing appears truncated (the leading status check and some
// closing braces are missing); reconcile against the complete source before
// editing.
461 void ExternalSstFileIngestionJob::Cleanup(const Status
& status
) {
463 // We failed to add the files to the database
464 // remove all the files we copied
465 for (IngestedFileInfo
& f
: files_to_ingest_
) {
// Skip files that never made it into the DB directory.
466 if (f
.internal_file_path
.empty()) {
469 Status s
= env_
->DeleteFile(f
.internal_file_path
);
471 ROCKS_LOG_WARN(db_options_
.info_log
,
472 "AddFile() clean up for file %s failed : %s",
473 f
.internal_file_path
.c_str(), s
.ToString().c_str());
476 consumed_seqno_count_
= 0;
477 files_overlap_
= false;
478 } else if (status
.ok() && ingestion_options_
.move_files
) {
479 // The files were moved and added successfully, remove original file links
480 for (IngestedFileInfo
& f
: files_to_ingest_
) {
481 Status s
= env_
->DeleteFile(f
.external_file_path
);
484 db_options_
.info_log
,
485 "%s was added to DB successfully but failed to remove original "
487 f
.external_file_path
.c_str(), s
.ToString().c_str());
// GetIngestedFileInfo() fills *file_to_ingest from one external SST: file
// size, a TableReader over the file, optional full checksum verification,
// the external-file version and (for v2) the global-seqno property offset,
// entry counts, and the smallest/largest internal keys — computed from both
// point keys (which must all carry seqno 0) and range-deletion tombstones
// that may extend past the point-key bounds.
// NOTE(review): listing appears truncated (e.g. ReadOptions declaration,
// pik_status initialization, iterator Seek calls and several braces are
// missing); reconcile against the complete source before editing.
493 Status
ExternalSstFileIngestionJob::GetIngestedFileInfo(
494 const std::string
& external_file
, IngestedFileInfo
* file_to_ingest
,
496 file_to_ingest
->external_file_path
= external_file
;
498 // Get external file size
499 Status status
= fs_
->GetFileSize(external_file
, IOOptions(),
500 &file_to_ingest
->file_size
, nullptr);
505 // Create TableReader for external file
506 std::unique_ptr
<TableReader
> table_reader
;
507 std::unique_ptr
<FSRandomAccessFile
> sst_file
;
508 std::unique_ptr
<RandomAccessFileReader
> sst_file_reader
;
510 status
= fs_
->NewRandomAccessFile(external_file
, env_options_
,
515 sst_file_reader
.reset(new RandomAccessFileReader(
516 std::move(sst_file
), external_file
, nullptr /*Env*/, io_tracer_
));
518 status
= cfd_
->ioptions()->table_factory
->NewTableReader(
519 TableReaderOptions(*cfd_
->ioptions(),
520 sv
->mutable_cf_options
.prefix_extractor
.get(),
521 env_options_
, cfd_
->internal_comparator()),
522 std::move(sst_file_reader
), file_to_ingest
->file_size
, &table_reader
);
527 if (ingestion_options_
.verify_checksums_before_ingest
) {
528 // If customized readahead size is needed, we can pass a user option
529 // all the way to here. Right now we just rely on the default readahead
530 // to keep things simple.
532 ro
.readahead_size
= ingestion_options_
.verify_checksums_readahead_size
;
533 status
= table_reader
->VerifyChecksum(
534 ro
, TableReaderCaller::kExternalSSTIngestion
);
540 // Get the external file properties
541 auto props
= table_reader
->GetTableProperties();
542 const auto& uprops
= props
->user_collected_properties
;
545 auto version_iter
= uprops
.find(ExternalSstFilePropertyNames::kVersion
);
546 if (version_iter
== uprops
.end()) {
547 return Status::Corruption("External file version not found");
549 file_to_ingest
->version
= DecodeFixed32(version_iter
->second
.c_str());
551 auto seqno_iter
= uprops
.find(ExternalSstFilePropertyNames::kGlobalSeqno
);
552 if (file_to_ingest
->version
== 2) {
553 // version 2 implies that we have global sequence number
554 if (seqno_iter
== uprops
.end()) {
555 return Status::Corruption(
556 "External file global sequence number not found");
559 // Set the global sequence number
560 file_to_ingest
->original_seqno
= DecodeFixed64(seqno_iter
->second
.c_str());
// Locate where the global-seqno property lives in the file so it can be
// overwritten in place later (AssignGlobalSeqnoForIngestedFile).
561 auto offsets_iter
= props
->properties_offsets
.find(
562 ExternalSstFilePropertyNames::kGlobalSeqno
);
563 if (offsets_iter
== props
->properties_offsets
.end() ||
564 offsets_iter
->second
== 0) {
565 file_to_ingest
->global_seqno_offset
= 0;
566 return Status::Corruption("Was not able to find file global seqno field");
568 file_to_ingest
->global_seqno_offset
= static_cast<size_t>(offsets_iter
->second
);
569 } else if (file_to_ingest
->version
== 1) {
570 // SST file V1 should not have global seqno field
571 assert(seqno_iter
== uprops
.end());
572 file_to_ingest
->original_seqno
= 0;
573 if (ingestion_options_
.allow_blocking_flush
||
574 ingestion_options_
.allow_global_seqno
) {
575 return Status::InvalidArgument(
576 "External SST file V1 does not support global seqno");
579 return Status::InvalidArgument("External file version is not supported");
581 // Get number of entries in table
582 file_to_ingest
->num_entries
= props
->num_entries
;
583 file_to_ingest
->num_range_deletions
= props
->num_range_deletions
;
585 ParsedInternalKey key
;
587 // During reading the external file we can cache blocks that we read into
588 // the block cache, if we later change the global seqno of this file, we will
589 // have block in cache that will include keys with wrong seqno.
590 // We need to disable fill_cache so that we read from the file without
591 // updating the block cache.
592 ro
.fill_cache
= false;
593 std::unique_ptr
<InternalIterator
> iter(table_reader
->NewIterator(
594 ro
, sv
->mutable_cf_options
.prefix_extractor
.get(), /*arena=*/nullptr,
595 /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion
));
596 std::unique_ptr
<InternalIterator
> range_del_iter(
597 table_reader
->NewRangeTombstoneIterator(ro
));
599 // Get first (smallest) and last (largest) key from file.
600 file_to_ingest
->smallest_internal_key
=
601 InternalKey("", 0, ValueType::kTypeValue
);
602 file_to_ingest
->largest_internal_key
=
603 InternalKey("", 0, ValueType::kTypeValue
);
604 bool bounds_set
= false;
605 bool allow_data_in_errors
= db_options_
.allow_data_in_errors
;
609 ParseInternalKey(iter
->key(), &key
, allow_data_in_errors
);
610 if (!pik_status
.ok()) {
611 return Status::Corruption("Corrupted key in external file. ",
612 pik_status
.getState());
// External files must carry seqno 0 in every key; the real seqno is the
// file-level global seqno assigned at ingestion time.
614 if (key
.sequence
!= 0) {
615 return Status::Corruption("External file has non zero sequence number");
617 file_to_ingest
->smallest_internal_key
.SetFrom(key
);
620 pik_status
= ParseInternalKey(iter
->key(), &key
, allow_data_in_errors
);
621 if (!pik_status
.ok()) {
622 return Status::Corruption("Corrupted key in external file. ",
623 pik_status
.getState());
625 if (key
.sequence
!= 0) {
626 return Status::Corruption("External file has non zero sequence number");
628 file_to_ingest
->largest_internal_key
.SetFrom(key
);
633 // We may need to adjust these key bounds, depending on whether any range
634 // deletion tombstones extend past them.
635 const Comparator
* ucmp
= cfd_
->internal_comparator().user_comparator();
636 if (range_del_iter
!= nullptr) {
637 for (range_del_iter
->SeekToFirst(); range_del_iter
->Valid();
638 range_del_iter
->Next()) {
640 ParseInternalKey(range_del_iter
->key(), &key
, allow_data_in_errors
);
641 if (!pik_status
.ok()) {
642 return Status::Corruption("Corrupted key in external file. ",
643 pik_status
.getState());
645 RangeTombstone
tombstone(key
, range_del_iter
->value());
647 InternalKey start_key
= tombstone
.SerializeKey();
649 sstableKeyCompare(ucmp
, start_key
,
650 file_to_ingest
->smallest_internal_key
) < 0) {
651 file_to_ingest
->smallest_internal_key
= start_key
;
653 InternalKey end_key
= tombstone
.SerializeEndKey();
655 sstableKeyCompare(ucmp
, end_key
,
656 file_to_ingest
->largest_internal_key
) > 0) {
657 file_to_ingest
->largest_internal_key
= end_key
;
663 file_to_ingest
->cf_id
= static_cast<uint32_t>(props
->column_family_id
);
665 file_to_ingest
->table_properties
= *props
;
// Picks the target level and global sequence number for one ingested file.
// If force_global_seqno, the seqno is last_seqno + 1 up front. Otherwise the
// function scans levels top-down: once the file's key range overlaps
// existing data in some level, the file must be placed above that level and
// needs a fresh seqno; under universal compaction a non-L0 level's largest
// existing seqno may be reused when the file fits in that level. Files in an
// overlapping batch are forced to L0 with a new seqno.
// NOTE(review): listing appears truncated (continue/break statements, the
// target_level update and several braces are missing); reconcile against
// the complete source before editing.
670 Status
ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
671 SuperVersion
* sv
, bool force_global_seqno
, CompactionStyle compaction_style
,
672 SequenceNumber last_seqno
, IngestedFileInfo
* file_to_ingest
,
673 SequenceNumber
* assigned_seqno
) {
676 if (force_global_seqno
) {
677 *assigned_seqno
= last_seqno
+ 1;
678 if (compaction_style
== kCompactionStyleUniversal
|| files_overlap_
) {
679 file_to_ingest
->picked_level
= 0;
684 bool overlap_with_db
= false;
687 ro
.total_order_seek
= true;
688 int target_level
= 0;
689 auto* vstorage
= cfd_
->current()->storage_info();
691 for (int lvl
= 0; lvl
< cfd_
->NumberLevels(); lvl
++) {
// Levels between L0 and the base level hold no files in level compaction.
692 if (lvl
> 0 && lvl
< vstorage
->base_level()) {
696 if (vstorage
->NumLevelFiles(lvl
) > 0) {
697 bool overlap_with_level
= false;
698 status
= sv
->current
->OverlapWithLevelIterator(
699 ro
, env_options_
, file_to_ingest
->smallest_internal_key
.user_key(),
700 file_to_ingest
->largest_internal_key
.user_key(), lvl
,
701 &overlap_with_level
);
705 if (overlap_with_level
) {
706 // We must use L0 or any level higher than `lvl` to be able to overwrite
707 // the keys that we overlap with in this level, We also need to assign
708 // this file a seqno to overwrite the existing keys in level `lvl`
709 overlap_with_db
= true;
713 if (compaction_style
== kCompactionStyleUniversal
&& lvl
!= 0) {
714 const std::vector
<FileMetaData
*>& level_files
=
715 vstorage
->LevelFiles(lvl
);
716 const SequenceNumber level_largest_seqno
=
717 (*max_element(level_files
.begin(), level_files
.end(),
718 [](FileMetaData
* f1
, FileMetaData
* f2
) {
719 return f1
->fd
.largest_seqno
< f2
->fd
.largest_seqno
;
722 // should only assign seqno to current level's largest seqno when
724 if (level_largest_seqno
!= 0 &&
725 IngestedFileFitInLevel(file_to_ingest
, lvl
)) {
726 *assigned_seqno
= level_largest_seqno
;
731 } else if (compaction_style
== kCompactionStyleUniversal
) {
735 // We don't overlap with any keys in this level, but we still need to check
736 // if our file can fit in it
737 if (IngestedFileFitInLevel(file_to_ingest
, lvl
)) {
741 // If files overlap, we have to ingest them at level 0 and assign the newest
743 if (files_overlap_
) {
745 *assigned_seqno
= last_seqno
+ 1;
747 TEST_SYNC_POINT_CALLBACK(
748 "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
750 file_to_ingest
->picked_level
= target_level
;
// Overlapping data must be shadowed: a zero seqno would sort below it.
751 if (overlap_with_db
&& *assigned_seqno
== 0) {
752 *assigned_seqno
= last_seqno
+ 1;
// For ingest_behind: verifies the file can be placed at the bottommost level
// (no key overlap there) and that no file in any upper level has
// smallest_seqno == 0 — zero-seqno data above the ingested file would be
// indistinguishable from it. On success sets picked_level to the bottom
// level.
// NOTE(review): listing appears truncated (closing braces / final return
// missing); reconcile against the complete source before editing.
757 Status
ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
758 IngestedFileInfo
* file_to_ingest
) {
759 auto* vstorage
= cfd_
->current()->storage_info();
760 // first check if new files fit in the bottommost level
761 int bottom_lvl
= cfd_
->NumberLevels() - 1;
762 if(!IngestedFileFitInLevel(file_to_ingest
, bottom_lvl
)) {
763 return Status::InvalidArgument(
764 "Can't ingest_behind file as it doesn't fit "
765 "at the bottommost level!");
768 // second check if despite allow_ingest_behind=true we still have 0 seqnums
769 // at some upper level
770 for (int lvl
= 0; lvl
< cfd_
->NumberLevels() - 1; lvl
++) {
771 for (auto file
: vstorage
->LevelFiles(lvl
)) {
772 if (file
->fd
.smallest_seqno
== 0) {
773 return Status::InvalidArgument(
774 "Can't ingest_behind file as despite allow_ingest_behind=true "
775 "there are files with 0 seqno in database at upper levels!");
780 file_to_ingest
->picked_level
= bottom_lvl
;
// Records `seqno` as the file's global sequence number. Fails if a non-zero
// seqno is needed but allow_global_seqno is off, or if the file has no
// global-seqno property slot. When write_global_seqno is set, the seqno is
// also written in place into the file at global_seqno_offset via a random-RW
// file handle and the file is synced; otherwise the seqno lives only in the
// Manifest metadata.
// NOTE(review): listing appears truncated (status checks around NewRandomRWFile
// / Write and some braces are missing); reconcile against the complete
// source before editing.
784 Status
ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
785 IngestedFileInfo
* file_to_ingest
, SequenceNumber seqno
) {
786 if (file_to_ingest
->original_seqno
== seqno
) {
787 // This file already has the correct global seqno
789 } else if (!ingestion_options_
.allow_global_seqno
) {
790 return Status::InvalidArgument("Global seqno is required, but disabled");
791 } else if (file_to_ingest
->global_seqno_offset
== 0) {
792 return Status::InvalidArgument(
793 "Trying to set global seqno for a file that don't have a global seqno "
797 if (ingestion_options_
.write_global_seqno
) {
798 // Determine if we can write global_seqno to a given offset of file.
799 // If the file system does not support random write, then we should not.
800 // Otherwise we should.
801 std::unique_ptr
<FSRandomRWFile
> rwfile
;
803 fs_
->NewRandomRWFile(file_to_ingest
->internal_file_path
, env_options_
,
806 FSRandomRWFilePtr
fsptr(std::move(rwfile
), io_tracer_
);
807 std::string seqno_val
;
808 PutFixed64(&seqno_val
, seqno
);
// Overwrite the fixed-size global-seqno property in place.
809 status
= fsptr
->Write(file_to_ingest
->global_seqno_offset
, seqno_val
,
810 IOOptions(), nullptr);
812 TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
813 status
= SyncIngestedFile(fsptr
.get());
814 TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
816 ROCKS_LOG_WARN(db_options_
.info_log
,
817 "Failed to sync ingested file %s after writing global "
818 "sequence number: %s",
819 file_to_ingest
->internal_file_path
.c_str(),
820 status
.ToString().c_str());
826 } else if (!status
.IsNotSupported()) {
831 file_to_ingest
->assigned_seqno
= seqno
;
// Regenerates the file checksum after the global seqno has been written into
// the file (which invalidates any checksum computed during Prepare()).
// No-op returning OK when no checksum factory is configured, when checksum
// generation is not needed, or when write_global_seqno is false (the file
// bytes were not modified, so the Prepare()-time checksum remains valid).
// NOTE(review): listing appears truncated (the io_s error check around line
// 854-856 is missing); reconcile against the complete source before editing.
835 IOStatus
ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
836 IngestedFileInfo
* file_to_ingest
) {
837 if (db_options_
.file_checksum_gen_factory
== nullptr ||
838 need_generate_file_checksum_
== false ||
839 ingestion_options_
.write_global_seqno
== false) {
840 // If file_checksum_gen_factory is not set, we are not able to generate
841 // the checksum. if write_global_seqno is false, it means we will use
842 // file checksum generated during Prepare(). This step will be skipped.
843 return IOStatus::OK();
845 std::string file_checksum
;
846 std::string file_checksum_func_name
;
847 std::string requested_checksum_func_name
;
848 IOStatus io_s
= GenerateOneFileChecksum(
849 fs_
.get(), file_to_ingest
->internal_file_path
,
850 db_options_
.file_checksum_gen_factory
.get(), requested_checksum_func_name
,
851 &file_checksum
, &file_checksum_func_name
,
852 ingestion_options_
.verify_checksums_readahead_size
,
853 db_options_
.allow_mmap_reads
, io_tracer_
);
857 file_to_ingest
->file_checksum
= file_checksum
;
858 file_to_ingest
->file_checksum_func_name
= file_checksum_func_name
;
859 return IOStatus::OK();
// Returns whether the ingested file can be placed in `level` without key
// overlap: false when the file's user-key range overlaps existing files in
// that level or the output range of a running compaction targeting it.
// NOTE(review): listing appears truncated (the explicit return statements
// and the L0 early-out are missing); reconcile against the complete source
// before editing.
862 bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
863 const IngestedFileInfo
* file_to_ingest
, int level
) {
865 // Files can always fit in L0
869 auto* vstorage
= cfd_
->current()->storage_info();
870 Slice
file_smallest_user_key(
871 file_to_ingest
->smallest_internal_key
.user_key());
872 Slice
file_largest_user_key(file_to_ingest
->largest_internal_key
.user_key());
874 if (vstorage
->OverlapInLevel(level
, &file_smallest_user_key
,
875 &file_largest_user_key
)) {
876 // File overlaps with another file in this level, we cannot
877 // add it to this level
880 if (cfd_
->RangeOverlapWithCompaction(file_smallest_user_key
,
881 file_largest_user_key
, level
)) {
882 // File overlaps with a running compaction output that will be stored
883 // in this level, we cannot add this file to this level
887 // File did not overlap with level files, our compaction output
// Syncs `file` using Fsync when db_options_.use_fsync is set, Sync
// otherwise. Templated so it works for both FSWritableFile and
// FSRandomRWFile handles used elsewhere in this job.
// NOTE(review): listing appears truncated (closing brace of the if/function
// missing); reconcile against the complete source before editing.
891 template <typename TWritableFile
>
892 Status
ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile
* file
) {
893 assert(file
!= nullptr);
894 if (db_options_
.use_fsync
) {
895 return file
->Fsync(IOOptions(), nullptr);
897 return file
->Sync(IOOptions(), nullptr);
901 } // namespace ROCKSDB_NAMESPACE
903 #endif // !ROCKSDB_LITE