[ceph.git] / ceph / src / rocksdb / db / external_sst_file_ingestion_job.cc
update ceph source to reef 18.1.2
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#ifndef ROCKSDB_LITE
7
8#include "db/external_sst_file_ingestion_job.h"
9
7c673cae 10#include <algorithm>
f67539c2 11#include <cinttypes>
7c673cae 12#include <string>
f67539c2 13#include <unordered_set>
14#include <vector>
15
f67539c2 16#include "db/db_impl/db_impl.h"
7c673cae 17#include "db/version_edit.h"
18#include "file/file_util.h"
19#include "file/random_access_file_reader.h"
1e59de90 20#include "logging/logging.h"
21#include "table/merging_iterator.h"
22#include "table/scoped_arena_iterator.h"
23#include "table/sst_file_writer_collectors.h"
24#include "table/table_builder.h"
1e59de90 25#include "table/unique_id_impl.h"
f67539c2 26#include "test_util/sync_point.h"
7c673cae 27#include "util/stop_watch.h"
7c673cae 28
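// Illustrative sketch (assumption, not part of this file): this ingestion job
// is normally reached through the public API, by building an SST with
// SstFileWriter and handing it to DB::IngestExternalFile(). Paths and option
// values below are placeholders.
//
//   rocksdb::Options options;
//   rocksdb::SstFileWriter sst_writer(rocksdb::EnvOptions(), options);
//   sst_writer.Open("/tmp/example.sst");      // hypothetical path
//   sst_writer.Put("key1", "value1");
//   sst_writer.Finish();
//
//   rocksdb::IngestExternalFileOptions ifo;
//   ifo.move_files = true;  // prefer hard-linking over copying (see Prepare())
//   db->IngestExternalFile({"/tmp/example.sst"}, ifo);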
f67539c2 29namespace ROCKSDB_NAMESPACE {
30
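// Prepare() validates each external file, hard-links or copies it into the
// DB directory, syncs the new files and their directories, and, when a file
// checksum factory is configured, generates and/or verifies file checksums
// before ingestion proceeds.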
31Status ExternalSstFileIngestionJob::Prepare(
11fdf7f2 32 const std::vector<std::string>& external_files_paths,
33 const std::vector<std::string>& files_checksums,
34 const std::vector<std::string>& files_checksum_func_names,
35 const Temperature& file_temperature, uint64_t next_file_number,
36 SuperVersion* sv) {
37 Status status;
38
39 // Read the information of files we are ingesting
40 for (const std::string& file_path : external_files_paths) {
41 IngestedFileInfo file_to_ingest;
42 status =
43 GetIngestedFileInfo(file_path, next_file_number++, &file_to_ingest, sv);
44 if (!status.ok()) {
45 return status;
46 }
7c673cae 47
1e59de90 48 if (file_to_ingest.cf_id !=
7c673cae 49 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
1e59de90 50 file_to_ingest.cf_id != cfd_->GetID()) {
7c673cae 51 return Status::InvalidArgument(
20effc67 52 "External file column family id does not match");
7c673cae 53 }
54
55 if (file_to_ingest.num_entries == 0 &&
56 file_to_ingest.num_range_deletions == 0) {
57 return Status::InvalidArgument("File contains no entries");
58 }
59
60 if (!file_to_ingest.smallest_internal_key.Valid() ||
61 !file_to_ingest.largest_internal_key.Valid()) {
62 return Status::Corruption("Generated table has corrupted keys");
63 }
64
65 files_to_ingest_.emplace_back(std::move(file_to_ingest));
66 }
67
68 const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
69 auto num_files = files_to_ingest_.size();
70 if (num_files == 0) {
71 return Status::InvalidArgument("The list of files is empty");
72 } else if (num_files > 1) {
20effc67 73 // Verify that passed files don't have overlapping ranges
74 autovector<const IngestedFileInfo*> sorted_files;
75 for (size_t i = 0; i < num_files; i++) {
76 sorted_files.push_back(&files_to_ingest_[i]);
77 }
78
79 std::sort(
80 sorted_files.begin(), sorted_files.end(),
81 [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
82 return sstableKeyCompare(ucmp, info1->smallest_internal_key,
83 info2->smallest_internal_key) < 0;
84 });
85
20effc67 86 for (size_t i = 0; i + 1 < num_files; i++) {
87 if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
88 sorted_files[i + 1]->smallest_internal_key) >= 0) {
89 files_overlap_ = true;
90 break;
91 }
92 }
93 }
94
95 // Handle the file temperature
96 for (size_t i = 0; i < num_files; i++) {
97 files_to_ingest_[i].file_temperature = file_temperature;
98 }
99
100 if (ingestion_options_.ingest_behind && files_overlap_) {
101 return Status::NotSupported("Files have overlapping ranges");
102 }
103
104 // Copy/Move external files into DB
f67539c2 105 std::unordered_set<size_t> ingestion_path_ids;
7c673cae 106 for (IngestedFileInfo& f : files_to_ingest_) {
f67539c2 107 f.copy_file = false;
7c673cae 108 const std::string path_outside_db = f.external_file_path;
109 const std::string path_inside_db = TableFileName(
110 cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
7c673cae 111 if (ingestion_options_.move_files) {
112 status =
113 fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
114 if (status.ok()) {
115 // It is unsafe to assume the application has synced the file and the file
116 // directory before ingesting it. For the integrity of RocksDB we need to
117 // sync the file.
118 std::unique_ptr<FSWritableFile> file_to_sync;
119 Status s = fs_->ReopenWritableFile(path_inside_db, env_options_,
120 &file_to_sync, nullptr);
121 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:Reopen",
122 &s);
123 // Some file systems (especially remote/distributed) don't support
124 // reopening a file for writing and don't require reopening and
125 // syncing the file. Ignore the NotSupported error in that case.
126 if (!s.IsNotSupported()) {
127 status = s;
128 if (status.ok()) {
129 TEST_SYNC_POINT(
130 "ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
131 status = SyncIngestedFile(file_to_sync.get());
132 TEST_SYNC_POINT(
133 "ExternalSstFileIngestionJob::AfterSyncIngestedFile");
134 if (!status.ok()) {
135 ROCKS_LOG_WARN(db_options_.info_log,
136 "Failed to sync ingested file %s: %s",
137 path_inside_db.c_str(), status.ToString().c_str());
138 }
139 }
140 }
141 } else if (status.IsNotSupported() &&
142 ingestion_options_.failed_move_fall_back_to_copy) {
143 // Original file is on a different FS, use copy instead of hard linking.
11fdf7f2 144 f.copy_file = true;
145 ROCKS_LOG_INFO(db_options_.info_log,
146 "Triy to link file %s but it's not supported : %s",
147 path_outside_db.c_str(), status.ToString().c_str());
148 }
149 } else {
11fdf7f2 150 f.copy_file = true;
7c673cae 151 }
152
153 if (f.copy_file) {
154 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
155 nullptr);
156 // CopyFile also syncs the new file.
157 status =
158 CopyFile(fs_.get(), path_outside_db, path_inside_db, 0,
159 db_options_.use_fsync, io_tracer_, Temperature::kUnknown);
f67539c2 160 }
11fdf7f2 161 TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
162 if (!status.ok()) {
163 break;
164 }
165 f.internal_file_path = path_inside_db;
166 // Initialize the checksum information of ingested files.
167 f.file_checksum = kUnknownFileChecksum;
168 f.file_checksum_func_name = kUnknownFileChecksumFuncName;
169 ingestion_path_ids.insert(f.fd.GetPathId());
170 }
171
172 TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
173 if (status.ok()) {
174 for (auto path_id : ingestion_path_ids) {
175 status = directories_->GetDataDir(path_id)->FsyncWithDirOptions(
176 IOOptions(), nullptr,
177 DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
178 if (!status.ok()) {
179 ROCKS_LOG_WARN(db_options_.info_log,
180 "Failed to sync directory %" ROCKSDB_PRIszt
181 " while ingest file: %s",
182 path_id, status.ToString().c_str());
183 break;
184 }
185 }
7c673cae 186 }
f67539c2 187 TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
7c673cae 188
189 // Generate and check the sst file checksum. Note that, if
190 // IngestExternalFileOptions::write_global_seqno is true, we will not update
191 // the checksum information in files_to_ingest_ here, since the file is
192 // updated with the new global_seqno. After global_seqno is updated, DB will
193 // generate the new checksum and store it in the Manifest. In all other cases,
194 // if ingestion_options_.write_global_seqno == true and
195 // verify_file_checksum is false, we only check the checksum function name.
196 if (status.ok() && db_options_.file_checksum_gen_factory != nullptr) {
197 if (ingestion_options_.verify_file_checksum == false &&
198 files_checksums.size() == files_to_ingest_.size() &&
199 files_checksum_func_names.size() == files_to_ingest_.size()) {
200 // Only when verify_file_checksum == false and the checksums for ingested
201 // files are provided will DB use the provided checksums rather than
202 // generating checksums for the ingested files.
203 need_generate_file_checksum_ = false;
204 } else {
205 need_generate_file_checksum_ = true;
206 }
207 FileChecksumGenContext gen_context;
208 std::unique_ptr<FileChecksumGenerator> file_checksum_gen =
209 db_options_.file_checksum_gen_factory->CreateFileChecksumGenerator(
210 gen_context);
211 std::vector<std::string> generated_checksums;
212 std::vector<std::string> generated_checksum_func_names;
213 // Step 1: generate the checksum for ingested sst file.
214 if (need_generate_file_checksum_) {
215 for (size_t i = 0; i < files_to_ingest_.size(); i++) {
216 std::string generated_checksum;
217 std::string generated_checksum_func_name;
218 std::string requested_checksum_func_name;
219 // TODO: rate limit file reads for checksum calculation during file
220 // ingestion.
221 IOStatus io_s = GenerateOneFileChecksum(
222 fs_.get(), files_to_ingest_[i].internal_file_path,
223 db_options_.file_checksum_gen_factory.get(),
224 requested_checksum_func_name, &generated_checksum,
225 &generated_checksum_func_name,
226 ingestion_options_.verify_checksums_readahead_size,
227 db_options_.allow_mmap_reads, io_tracer_,
228 db_options_.rate_limiter.get(),
229 Env::IO_TOTAL /* rate_limiter_priority */);
230 if (!io_s.ok()) {
231 status = io_s;
232 ROCKS_LOG_WARN(db_options_.info_log,
233 "Sst file checksum generation of file: %s failed: %s",
234 files_to_ingest_[i].internal_file_path.c_str(),
235 status.ToString().c_str());
236 break;
237 }
238 if (ingestion_options_.write_global_seqno == false) {
239 files_to_ingest_[i].file_checksum = generated_checksum;
240 files_to_ingest_[i].file_checksum_func_name =
241 generated_checksum_func_name;
242 }
243 generated_checksums.push_back(generated_checksum);
244 generated_checksum_func_names.push_back(generated_checksum_func_name);
245 }
246 }
247
248 // Step 2: based on the verify_file_checksum and ingested checksum
249 // information, do the verification.
250 if (status.ok()) {
251 if (files_checksums.size() == files_to_ingest_.size() &&
252 files_checksum_func_names.size() == files_to_ingest_.size()) {
253 // Verify the checksum and checksum function name.
254 if (ingestion_options_.verify_file_checksum) {
255 for (size_t i = 0; i < files_to_ingest_.size(); i++) {
256 if (files_checksum_func_names[i] !=
257 generated_checksum_func_names[i]) {
258 status = Status::InvalidArgument(
259 "Checksum function name does not match with the checksum "
260 "function name of this DB");
261 ROCKS_LOG_WARN(
262 db_options_.info_log,
263 "Sst file checksum verification of file: %s failed: %s",
264 external_files_paths[i].c_str(), status.ToString().c_str());
265 break;
266 }
267 if (files_checksums[i] != generated_checksums[i]) {
268 status = Status::Corruption(
269 "Ingested checksum does not match with the generated "
270 "checksum");
271 ROCKS_LOG_WARN(
272 db_options_.info_log,
273 "Sst file checksum verification of file: %s failed: %s",
274 files_to_ingest_[i].internal_file_path.c_str(),
275 status.ToString().c_str());
276 break;
277 }
278 }
279 } else {
280 // If verify_file_checksum is not enabled, we only verify the
281 // checksum function name. If it does not match, fail the ingestion.
282 // If it matches, we trust the ingested checksum information and store
283 // it in the Manifest.
284 for (size_t i = 0; i < files_to_ingest_.size(); i++) {
285 if (files_checksum_func_names[i] != file_checksum_gen->Name()) {
286 status = Status::InvalidArgument(
287 "Checksum function name does not match with the checksum "
288 "function name of this DB");
289 ROCKS_LOG_WARN(
290 db_options_.info_log,
291 "Sst file checksum verification of file: %s failed: %s",
292 external_files_paths[i].c_str(), status.ToString().c_str());
293 break;
294 }
295 files_to_ingest_[i].file_checksum = files_checksums[i];
296 files_to_ingest_[i].file_checksum_func_name =
297 files_checksum_func_names[i];
298 }
299 }
300 } else if (files_checksums.size() != files_checksum_func_names.size() ||
301 (files_checksums.size() == files_checksum_func_names.size() &&
302 files_checksums.size() != 0)) {
303 // The checksum and checksum function name vectors are not both empty,
304 // and they are incomplete.
305 status = Status::InvalidArgument(
306 "The checksum information of ingested sst files are nonempty and "
307 "the size of checksums or the size of the checksum function "
308 "names "
309 "does not match with the number of ingested sst files");
310 ROCKS_LOG_WARN(
311 db_options_.info_log,
312 "The ingested sst files checksum information is incomplete: %s",
313 status.ToString().c_str());
314 }
315 }
316 }
317
f67539c2 318 // TODO: The following is duplicated with Cleanup().
7c673cae 319 if (!status.ok()) {
1e59de90 320 IOOptions io_opts;
321 // We failed, remove all files that we copied into the db
322 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 323 if (f.internal_file_path.empty()) {
f67539c2 324 continue;
7c673cae 325 }
1e59de90 326 Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
327 if (!s.ok()) {
328 ROCKS_LOG_WARN(db_options_.info_log,
329 "AddFile() clean up for file %s failed : %s",
330 f.internal_file_path.c_str(), s.ToString().c_str());
331 }
332 }
333 }
334
335 return status;
336}
337
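// NeedsFlush() reports whether any key range of the files being ingested
// overlaps the column family's memtables; if it does and blocking flushes are
// not allowed, the ingestion fails with InvalidArgument.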
338Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
339 SuperVersion* super_version) {
340 autovector<Range> ranges;
341 autovector<std::string> keys;
342 size_t ts_sz = cfd_->user_comparator()->timestamp_size();
343 if (ts_sz) {
344 // Check all ranges [begin, end] inclusively. Add maximum
345 // timestamp to include all `begin` keys, and add minimal timestamp to
346 // include all `end` keys.
347 for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
348 std::string begin_str;
349 std::string end_str;
350 AppendUserKeyWithMaxTimestamp(
351 &begin_str, file_to_ingest.smallest_internal_key.user_key(), ts_sz);
352 AppendKeyWithMinTimestamp(
353 &end_str, file_to_ingest.largest_internal_key.user_key(), ts_sz);
354 keys.emplace_back(std::move(begin_str));
355 keys.emplace_back(std::move(end_str));
356 }
357 for (size_t i = 0; i < files_to_ingest_.size(); ++i) {
358 ranges.emplace_back(keys[2 * i], keys[2 * i + 1]);
359 }
360 } else {
361 for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
362 ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
363 file_to_ingest.largest_internal_key.user_key());
364 }
11fdf7f2 365 }
366 Status status = cfd_->RangesOverlapWithMemtables(
367 ranges, super_version, db_options_.allow_data_in_errors, flush_needed);
368 if (status.ok() && *flush_needed &&
369 !ingestion_options_.allow_blocking_flush) {
370 status = Status::InvalidArgument("External file requires flush");
371 }
372 return status;
373}
374
375// REQUIRES: we have become the only writer by entering both write_thread_ and
376// nonmem_write_thread_
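// Run() picks a level (and, when required, a global sequence number) for each
// file, rewrites that seqno into the file and refreshes its checksum when
// needed, and records the file in the VersionEdit.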
377Status ExternalSstFileIngestionJob::Run() {
378 Status status;
11fdf7f2 379 SuperVersion* super_version = cfd_->GetSuperVersion();
380#ifndef NDEBUG
381 // We should never run the job with a memtable that is overlapping
382 // with the files we are ingesting
383 bool need_flush = false;
11fdf7f2 384 status = NeedsFlush(&need_flush, super_version);
385 if (!status.ok()) {
386 return status;
387 }
388 if (need_flush) {
389 return Status::TryAgain();
390 }
391 assert(status.ok() && need_flush == false);
392#endif
393
394 bool force_global_seqno = false;
395
396 if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
397 // We need to assign a global sequence number to all the files even
20effc67 398 // if they don't overlap with any ranges, since we have snapshots
399 force_global_seqno = true;
400 }
401 // It is safe to use this instead of LastAllocatedSequence since we are
402 // the only active writer, and hence they are equal
f67539c2 403 SequenceNumber last_seqno = versions_->LastSequence();
404 edit_.SetColumnFamily(cfd_->GetID());
405 // The levels that the files will be ingested into
406
407 for (IngestedFileInfo& f : files_to_ingest_) {
408 SequenceNumber assigned_seqno = 0;
409 if (ingestion_options_.ingest_behind) {
410 status = CheckLevelForIngestedBehindFile(&f);
411 } else {
412 status = AssignLevelAndSeqnoForIngestedFile(
413 super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
414 last_seqno, &f, &assigned_seqno);
11fdf7f2 415 }
416
417 // Modify the smallest/largest internal key to include the sequence number
418 // that we just learned. Only overwrite sequence number zero. There could
419 // be a nonzero sequence number already to indicate a range tombstone's
420 // exclusive endpoint.
421 ParsedInternalKey smallest_parsed, largest_parsed;
422 if (status.ok()) {
423 status = ParseInternalKey(*f.smallest_internal_key.rep(),
424 &smallest_parsed, false /* log_err_key */);
425 }
426 if (status.ok()) {
427 status = ParseInternalKey(*f.largest_internal_key.rep(), &largest_parsed,
428 false /* log_err_key */);
429 }
430 if (!status.ok()) {
431 return status;
432 }
433 if (smallest_parsed.sequence == 0) {
434 UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno,
435 smallest_parsed.type);
436 }
437 if (largest_parsed.sequence == 0) {
438 UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno,
439 largest_parsed.type);
440 }
441
442 status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
443 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
444 &assigned_seqno);
445 if (assigned_seqno > last_seqno) {
446 assert(assigned_seqno == last_seqno + 1);
447 last_seqno = assigned_seqno;
448 ++consumed_seqno_count_;
449 }
450 if (!status.ok()) {
451 return status;
452 }
f67539c2 453
454 status = GenerateChecksumForIngestedFile(&f);
455 if (!status.ok()) {
456 return status;
457 }
458
459 // We use the import time as the oldest ancestor time. This is the time the data
460 // is written to the database.
461 int64_t temp_current_time = 0;
462 uint64_t current_time = kUnknownFileCreationTime;
463 uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
1e59de90 464 if (clock_->GetCurrentTime(&temp_current_time).ok()) {
465 current_time = oldest_ancester_time =
466 static_cast<uint64_t>(temp_current_time);
467 }
468 FileMetaData f_metadata(
469 f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(),
470 f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno,
471 f.assigned_seqno, false, f.file_temperature, kInvalidBlobFileNumber,
472 oldest_ancester_time, current_time, f.file_checksum,
473 f.file_checksum_func_name, f.unique_id);
474 f_metadata.temperature = f.file_temperature;
475 edit_.AddFile(f.picked_level, f_metadata);
7c673cae 476 }
477 return status;
478}
479
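// UpdateStats() records per-file compaction-style stats (bytes written or
// moved, output level) and an "ingest_finished" event-log entry, plus
// column-family ingestion counters.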
480void ExternalSstFileIngestionJob::UpdateStats() {
481 // Update internal stats for new ingested files
482 uint64_t total_keys = 0;
483 uint64_t total_l0_files = 0;
1e59de90 484 uint64_t total_time = clock_->NowMicros() - job_start_time_;
485
486 EventLoggerStream stream = event_logger_->Log();
487 stream << "event"
488 << "ingest_finished";
489 stream << "files_ingested";
490 stream.StartArray();
491
7c673cae 492 for (IngestedFileInfo& f : files_to_ingest_) {
493 InternalStats::CompactionStats stats(
494 CompactionReason::kExternalSstIngestion, 1);
7c673cae 495 stats.micros = total_time;
494da23a 496 // If actual copy occurred for this file, then we need to count the file
497 // size as the actual bytes written. If the file was linked, then we ignore
498 // the bytes written for file metadata.
499 // TODO (yanqin) maybe account for file metadata bytes for exact accuracy?
500 if (f.copy_file) {
501 stats.bytes_written = f.fd.GetFileSize();
502 } else {
503 stats.bytes_moved = f.fd.GetFileSize();
504 }
7c673cae 505 stats.num_output_files = 1;
506 cfd_->internal_stats()->AddCompactionStats(f.picked_level,
507 Env::Priority::USER, stats);
508 cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
509 f.fd.GetFileSize());
510 total_keys += f.num_entries;
511 if (f.picked_level == 0) {
512 total_l0_files += 1;
513 }
514 ROCKS_LOG_INFO(
515 db_options_.info_log,
516 "[AddFile] External SST file %s was ingested in L%d with path %s "
517 "(global_seqno=%" PRIu64 ")\n",
518 f.external_file_path.c_str(), f.picked_level,
519 f.internal_file_path.c_str(), f.assigned_seqno);
520 stream << "file" << f.internal_file_path << "level" << f.picked_level;
521 }
522 stream.EndArray();
523
524 stream << "lsm_state";
525 stream.StartArray();
526 auto vstorage = cfd_->current()->storage_info();
527 for (int level = 0; level < vstorage->num_levels(); ++level) {
528 stream << vstorage->NumLevelFiles(level);
7c673cae 529 }
530 stream.EndArray();
531
532 cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
533 total_keys);
534 cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
535 files_to_ingest_.size());
536 cfd_->internal_stats()->AddCFStats(
537 InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files);
538}
539
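// Cleanup() deletes the files that were copied or linked into the DB when
// ingestion failed, or removes the original external files after a
// successful move.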
540void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
1e59de90 541 IOOptions io_opts;
542 if (!status.ok()) {
543 // We failed to add the files to the database
544 // remove all the files we copied
545 for (IngestedFileInfo& f : files_to_ingest_) {
546 if (f.internal_file_path.empty()) {
547 continue;
548 }
1e59de90 549 Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
550 if (!s.ok()) {
551 ROCKS_LOG_WARN(db_options_.info_log,
552 "AddFile() clean up for file %s failed : %s",
553 f.internal_file_path.c_str(), s.ToString().c_str());
554 }
555 }
556 consumed_seqno_count_ = 0;
557 files_overlap_ = false;
558 } else if (status.ok() && ingestion_options_.move_files) {
559 // The files were moved and added successfully, remove original file links
560 for (IngestedFileInfo& f : files_to_ingest_) {
1e59de90 561 Status s = fs_->DeleteFile(f.external_file_path, io_opts, nullptr);
562 if (!s.ok()) {
563 ROCKS_LOG_WARN(
564 db_options_.info_log,
565 "%s was added to DB successfully but failed to remove original "
566 "file link : %s",
567 f.external_file_path.c_str(), s.ToString().c_str());
568 }
569 }
570 }
571}
572
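// GetIngestedFileInfo() opens the external file with a TableReader,
// optionally verifies its block checksums, and extracts the table version,
// global seqno offset, table properties, and smallest/largest internal keys
// (widened to cover any range tombstones).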
573Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
574 const std::string& external_file, uint64_t new_file_number,
575 IngestedFileInfo* file_to_ingest, SuperVersion* sv) {
576 file_to_ingest->external_file_path = external_file;
577
578 // Get external file size
579 Status status = fs_->GetFileSize(external_file, IOOptions(),
580 &file_to_ingest->file_size, nullptr);
581 if (!status.ok()) {
582 return status;
583 }
584
585 // Assign FD with number
586 file_to_ingest->fd =
587 FileDescriptor(new_file_number, 0, file_to_ingest->file_size);
588
589 // Create TableReader for external file
590 std::unique_ptr<TableReader> table_reader;
f67539c2 591 std::unique_ptr<FSRandomAccessFile> sst_file;
592 std::unique_ptr<RandomAccessFileReader> sst_file_reader;
593
594 status =
595 fs_->NewRandomAccessFile(external_file, env_options_, &sst_file, nullptr);
596 if (!status.ok()) {
597 return status;
598 }
599 sst_file_reader.reset(new RandomAccessFileReader(
600 std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));
601
602 status = cfd_->ioptions()->table_factory->NewTableReader(
603 TableReaderOptions(
604 *cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
605 env_options_, cfd_->internal_comparator(),
606 /*skip_filters*/ false, /*immortal*/ false,
607 /*force_direct_prefetch*/ false, /*level*/ -1,
608 /*block_cache_tracer*/ nullptr,
609 /*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
610 /*cur_file_num*/ new_file_number),
611 std::move(sst_file_reader), file_to_ingest->file_size, &table_reader);
612 if (!status.ok()) {
613 return status;
614 }
615
494da23a 616 if (ingestion_options_.verify_checksums_before_ingest) {
617 // If customized readahead size is needed, we can pass a user option
618 // all the way to here. Right now we just rely on the default readahead
619 // to keep things simple.
620 ReadOptions ro;
621 ro.readahead_size = ingestion_options_.verify_checksums_readahead_size;
622 status = table_reader->VerifyChecksum(
623 ro, TableReaderCaller::kExternalSSTIngestion);
624 }
625 if (!status.ok()) {
626 return status;
627 }
628
629 // Get the external file properties
630 auto props = table_reader->GetTableProperties();
631 const auto& uprops = props->user_collected_properties;
632
633 // Get table version
634 auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
635 if (version_iter == uprops.end()) {
636 return Status::Corruption("External file version not found");
637 }
638 file_to_ingest->version = DecodeFixed32(version_iter->second.c_str());
639
640 auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
641 if (file_to_ingest->version == 2) {
642 // version 2 implies that we have a global sequence number
643 if (seqno_iter == uprops.end()) {
644 return Status::Corruption(
645 "External file global sequence number not found");
646 }
647
648 // Set the global sequence number
649 file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str());
1e59de90 650 if (props->external_sst_file_global_seqno_offset == 0) {
11fdf7f2 651 file_to_ingest->global_seqno_offset = 0;
652 return Status::Corruption("Was not able to find file global seqno field");
653 }
654 file_to_ingest->global_seqno_offset =
655 static_cast<size_t>(props->external_sst_file_global_seqno_offset);
656 } else if (file_to_ingest->version == 1) {
657 // SST file V1 should not have global seqno field
658 assert(seqno_iter == uprops.end());
659 file_to_ingest->original_seqno = 0;
660 if (ingestion_options_.allow_blocking_flush ||
1e59de90 661 ingestion_options_.allow_global_seqno) {
7c673cae 662 return Status::InvalidArgument(
1e59de90 663 "External SST file V1 does not support global seqno");
664 }
665 } else {
666 return Status::InvalidArgument("External file version is not supported");
667 }
668 // Get number of entries in table
669 file_to_ingest->num_entries = props->num_entries;
11fdf7f2 670 file_to_ingest->num_range_deletions = props->num_range_deletions;
671
672 ParsedInternalKey key;
673 ReadOptions ro;
674 // While reading the external file we can cache the blocks we read into
675 // the block cache. If we later change the global seqno of this file, we would
676 // have blocks in the cache that include keys with the wrong seqno.
677 // We need to disable fill_cache so that we read from the file without
678 // updating the block cache.
679 ro.fill_cache = false;
11fdf7f2 680 std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
681 ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
682 /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
683 std::unique_ptr<InternalIterator> range_del_iter(
684 table_reader->NewRangeTombstoneIterator(ro));
7c673cae 685
11fdf7f2 686 // Get first (smallest) and last (largest) key from file.
687 file_to_ingest->smallest_internal_key =
688 InternalKey("", 0, ValueType::kTypeValue);
689 file_to_ingest->largest_internal_key =
690 InternalKey("", 0, ValueType::kTypeValue);
11fdf7f2 691 bool bounds_set = false;
20effc67 692 bool allow_data_in_errors = db_options_.allow_data_in_errors;
7c673cae 693 iter->SeekToFirst();
11fdf7f2 694 if (iter->Valid()) {
695 Status pik_status =
696 ParseInternalKey(iter->key(), &key, allow_data_in_errors);
697 if (!pik_status.ok()) {
698 return Status::Corruption("Corrupted key in external file. ",
699 pik_status.getState());
700 }
701 if (key.sequence != 0) {
20effc67 702 return Status::Corruption("External file has non zero sequence number");
11fdf7f2 703 }
f67539c2 704 file_to_ingest->smallest_internal_key.SetFrom(key);
705
706 iter->SeekToLast();
707 pik_status = ParseInternalKey(iter->key(), &key, allow_data_in_errors);
708 if (!pik_status.ok()) {
709 return Status::Corruption("Corrupted key in external file. ",
710 pik_status.getState());
711 }
712 if (key.sequence != 0) {
20effc67 713 return Status::Corruption("External file has non zero sequence number");
11fdf7f2 714 }
f67539c2 715 file_to_ingest->largest_internal_key.SetFrom(key);
7c673cae 716
11fdf7f2 717 bounds_set = true;
7c673cae 718 }
719
720 // We may need to adjust these key bounds, depending on whether any range
721 // deletion tombstones extend past them.
722 const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
723 if (range_del_iter != nullptr) {
724 for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
725 range_del_iter->Next()) {
726 Status pik_status =
727 ParseInternalKey(range_del_iter->key(), &key, allow_data_in_errors);
728 if (!pik_status.ok()) {
729 return Status::Corruption("Corrupted key in external file. ",
730 pik_status.getState());
731 }
732 RangeTombstone tombstone(key, range_del_iter->value());
733
734 InternalKey start_key = tombstone.SerializeKey();
735 if (!bounds_set ||
736 sstableKeyCompare(ucmp, start_key,
737 file_to_ingest->smallest_internal_key) < 0) {
738 file_to_ingest->smallest_internal_key = start_key;
11fdf7f2 739 }
740 InternalKey end_key = tombstone.SerializeEndKey();
741 if (!bounds_set ||
742 sstableKeyCompare(ucmp, end_key,
743 file_to_ingest->largest_internal_key) > 0) {
744 file_to_ingest->largest_internal_key = end_key;
745 }
746 bounds_set = true;
747 }
7c673cae 748 }
749
750 file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
751
752 file_to_ingest->table_properties = *props;
753
754 auto s = GetSstInternalUniqueId(props->db_id, props->db_session_id,
755 props->orig_file_number,
756 &(file_to_ingest->unique_id));
757 if (!s.ok()) {
758 ROCKS_LOG_WARN(db_options_.info_log,
759 "Failed to get SST unique id for file %s",
760 file_to_ingest->internal_file_path.c_str());
761 file_to_ingest->unique_id = kNullUniqueId64x2;
762 }
763
764 return status;
765}
766
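// AssignLevelAndSeqnoForIngestedFile() walks the LSM levels from top to
// bottom and picks the lowest level the file fits into without overlapping
// existing keys or a running compaction's output; a fresh sequence number is
// assigned when the file overlaps data already in the DB or when a global
// seqno is forced.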
767Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
768 SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
769 SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest,
770 SequenceNumber* assigned_seqno) {
771 Status status;
772 *assigned_seqno = 0;
773 if (force_global_seqno) {
774 *assigned_seqno = last_seqno + 1;
f67539c2 775 if (compaction_style == kCompactionStyleUniversal || files_overlap_) {
776 if (ingestion_options_.fail_if_not_bottommost_level) {
777 status = Status::TryAgain(
778 "Files cannot be ingested to Lmax. Please make sure key range of "
779 "Lmax does not overlap with files to ingest.");
780 return status;
781 }
782 file_to_ingest->picked_level = 0;
783 return status;
784 }
785 }
786
787 bool overlap_with_db = false;
788 Arena arena;
789 ReadOptions ro;
790 ro.total_order_seek = true;
791 int target_level = 0;
792 auto* vstorage = cfd_->current()->storage_info();
11fdf7f2 793
794 for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
795 if (lvl > 0 && lvl < vstorage->base_level()) {
796 continue;
797 }
798
799 if (vstorage->NumLevelFiles(lvl) > 0) {
800 bool overlap_with_level = false;
801 status = sv->current->OverlapWithLevelIterator(
802 ro, env_options_, file_to_ingest->smallest_internal_key.user_key(),
803 file_to_ingest->largest_internal_key.user_key(), lvl,
804 &overlap_with_level);
805 if (!status.ok()) {
806 return status;
807 }
808 if (overlap_with_level) {
809 // We must use L0 or any level higher than `lvl` to be able to overwrite
810 // the keys that we overlap with in this level. We also need to assign
811 // this file a seqno to overwrite the existing keys in level `lvl`
812 overlap_with_db = true;
813 break;
814 }
815
816 if (compaction_style == kCompactionStyleUniversal && lvl != 0) {
817 const std::vector<FileMetaData*>& level_files =
818 vstorage->LevelFiles(lvl);
819 const SequenceNumber level_largest_seqno =
820 (*std::max_element(level_files.begin(), level_files.end(),
821 [](FileMetaData* f1, FileMetaData* f2) {
822 return f1->fd.largest_seqno <
823 f2->fd.largest_seqno;
824 }))
825 ->fd.largest_seqno;
826 // Only assign the current level's largest seqno to this file when
827 // the file fits in that level
828 if (level_largest_seqno != 0 &&
829 IngestedFileFitInLevel(file_to_ingest, lvl)) {
830 *assigned_seqno = level_largest_seqno;
831 } else {
832 continue;
833 }
834 }
835 } else if (compaction_style == kCompactionStyleUniversal) {
836 continue;
837 }
838
20effc67 839 // We don't overlap with any keys in this level, but we still need to check
840 // if our file can fit in it
841 if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
842 target_level = lvl;
843 }
844 }
845 // If files overlap, we have to ingest them at level 0 and assign the newest
846 // sequence number
847 if (files_overlap_) {
848 target_level = 0;
849 *assigned_seqno = last_seqno + 1;
850 }
851
852 if (ingestion_options_.fail_if_not_bottommost_level &&
853 target_level < cfd_->NumberLevels() - 1) {
854 status = Status::TryAgain(
855 "Files cannot be ingested to Lmax. Please make sure key range of Lmax "
856 "does not overlap with files to ingest.");
857 return status;
858 }
859
860 TEST_SYNC_POINT_CALLBACK(
861 "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
862 &overlap_with_db);
863 file_to_ingest->picked_level = target_level;
864 if (overlap_with_db && *assigned_seqno == 0) {
865 *assigned_seqno = last_seqno + 1;
866 }
867 return status;
868}
869
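// CheckLevelForIngestedBehindFile() verifies that an ingest-behind file fits
// in the bottommost level and that no file at an upper level already carries
// sequence number 0.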
870Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
871 IngestedFileInfo* file_to_ingest) {
872 auto* vstorage = cfd_->current()->storage_info();
873 // first check if new files fit in the bottommost level
874 int bottom_lvl = cfd_->NumberLevels() - 1;
1e59de90 875 if (!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) {
11fdf7f2 876 return Status::InvalidArgument(
877 "Can't ingest_behind file as it doesn't fit "
878 "at the bottommost level!");
879 }
880
881 // second check if despite allow_ingest_behind=true we still have 0 seqnums
882 // at some upper level
883 for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
884 for (auto file : vstorage->LevelFiles(lvl)) {
885 if (file->fd.smallest_seqno == 0) {
886 return Status::InvalidArgument(
887 "Can't ingest_behind file as despite allow_ingest_behind=true "
888 "there are files with 0 seqno in database at upper levels!");
889 }
890 }
891 }
892
893 file_to_ingest->picked_level = bottom_lvl;
894 return Status::OK();
895}
896
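// AssignGlobalSeqnoForIngestedFile() records the chosen global seqno and,
// when write_global_seqno is enabled and the filesystem supports random
// writes, patches it into the file's global_seqno field and syncs the file.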
897Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
898 IngestedFileInfo* file_to_ingest, SequenceNumber seqno) {
899 if (file_to_ingest->original_seqno == seqno) {
900 // This file already has the correct global seqno
901 return Status::OK();
902 } else if (!ingestion_options_.allow_global_seqno) {
903 return Status::InvalidArgument("Global seqno is required, but disabled");
904 } else if (file_to_ingest->global_seqno_offset == 0) {
905 return Status::InvalidArgument(
20effc67 906 "Trying to set global seqno for a file that doesn't have a global seqno "
907 "field");
908 }
909
910 if (ingestion_options_.write_global_seqno) {
911 // Determine if we can write global_seqno to a given offset of file.
912 // If the file system does not support random write, then we should not.
913 // Otherwise we should.
f67539c2 914 std::unique_ptr<FSRandomRWFile> rwfile;
915 Status status = fs_->NewRandomRWFile(file_to_ingest->internal_file_path,
916 env_options_, &rwfile, nullptr);
917 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::NewRandomRWFile",
918 &status);
11fdf7f2 919 if (status.ok()) {
920 FSRandomRWFilePtr fsptr(std::move(rwfile), io_tracer_,
921 file_to_ingest->internal_file_path);
922 std::string seqno_val;
923 PutFixed64(&seqno_val, seqno);
924 status = fsptr->Write(file_to_ingest->global_seqno_offset, seqno_val,
925 IOOptions(), nullptr);
926 if (status.ok()) {
927 TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
20effc67 928 status = SyncIngestedFile(fsptr.get());
929 TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
930 if (!status.ok()) {
931 ROCKS_LOG_WARN(db_options_.info_log,
932 "Failed to sync ingested file %s after writing global "
933 "sequence number: %s",
934 file_to_ingest->internal_file_path.c_str(),
935 status.ToString().c_str());
936 }
937 }
938 if (!status.ok()) {
939 return status;
940 }
941 } else if (!status.IsNotSupported()) {
942 return status;
943 }
944 }
945
946 file_to_ingest->assigned_seqno = seqno;
947 return Status::OK();
948}
949
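// GenerateChecksumForIngestedFile() recomputes the file checksum after the
// global seqno has been written, so that the Manifest records a checksum
// matching the file's final contents.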
950IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
951 IngestedFileInfo* file_to_ingest) {
952 if (db_options_.file_checksum_gen_factory == nullptr ||
953 need_generate_file_checksum_ == false ||
954 ingestion_options_.write_global_seqno == false) {
955 // If file_checksum_gen_factory is not set, we are not able to generate
956 // the checksum. If write_global_seqno is false, it means we will use the
957 // file checksum generated during Prepare(), so this step will be skipped.
958 return IOStatus::OK();
959 }
960 std::string file_checksum;
961 std::string file_checksum_func_name;
962 std::string requested_checksum_func_name;
1e59de90 963 // TODO: rate limit file reads for checksum calculation during file ingestion.
964 IOStatus io_s = GenerateOneFileChecksum(
965 fs_.get(), file_to_ingest->internal_file_path,
966 db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name,
967 &file_checksum, &file_checksum_func_name,
968 ingestion_options_.verify_checksums_readahead_size,
969 db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get(),
970 Env::IO_TOTAL /* rate_limiter_priority */);
971 if (!io_s.ok()) {
972 return io_s;
973 }
974 file_to_ingest->file_checksum = file_checksum;
975 file_to_ingest->file_checksum_func_name = file_checksum_func_name;
976 return IOStatus::OK();
977}
978
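// IngestedFileFitInLevel() returns true when the file's key range overlaps
// neither the existing files in `level` nor the output of a compaction
// running into that level (files always fit in L0).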
979bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
980 const IngestedFileInfo* file_to_ingest, int level) {
981 if (level == 0) {
982 // Files can always fit in L0
983 return true;
984 }
985
986 auto* vstorage = cfd_->current()->storage_info();
987 Slice file_smallest_user_key(
988 file_to_ingest->smallest_internal_key.user_key());
989 Slice file_largest_user_key(file_to_ingest->largest_internal_key.user_key());
990
991 if (vstorage->OverlapInLevel(level, &file_smallest_user_key,
992 &file_largest_user_key)) {
993 // File overlaps with other files in this level, so we cannot
994 // add it to this level
995 return false;
996 }
997 if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key,
998 file_largest_user_key, level)) {
999 // File overlaps with a running compaction output that will be stored
1000 // in this level, so we cannot add this file to this level
1001 return false;
1002 }
1003
1004 // File did not overlap with level files or with a running compaction output
1005 return true;
1006}
1007
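// SyncIngestedFile() issues Fsync() or Sync() on the given file, depending on
// DBOptions::use_fsync.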
1008template <typename TWritableFile>
1009Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) {
1010 assert(file != nullptr);
1011 if (db_options_.use_fsync) {
1012 return file->Fsync(IOOptions(), nullptr);
1013 } else {
1014 return file->Sync(IOOptions(), nullptr);
1015 }
1016}
1017
1018} // namespace ROCKSDB_NAMESPACE
1019
1020#endif // !ROCKSDB_LITE