]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/external_sst_file_ingestion_job.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / db / external_sst_file_ingestion_job.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
8#include "db/external_sst_file_ingestion_job.h"
9
10#ifndef __STDC_FORMAT_MACROS
11#define __STDC_FORMAT_MACROS
12#endif
13
14#include <inttypes.h>
15#include <algorithm>
16#include <string>
17#include <vector>
18
19#include "db/version_edit.h"
20#include "table/merging_iterator.h"
21#include "table/scoped_arena_iterator.h"
22#include "table/sst_file_writer_collectors.h"
23#include "table/table_builder.h"
24#include "util/file_reader_writer.h"
25#include "util/file_util.h"
26#include "util/stop_watch.h"
27#include "util/sync_point.h"
28
29namespace rocksdb {
30
31Status ExternalSstFileIngestionJob::Prepare(
11fdf7f2
TL
32 const std::vector<std::string>& external_files_paths,
33 uint64_t next_file_number, SuperVersion* sv) {
7c673cae
FG
34 Status status;
35
36 // Read the information of files we are ingesting
37 for (const std::string& file_path : external_files_paths) {
38 IngestedFileInfo file_to_ingest;
11fdf7f2 39 status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);
7c673cae
FG
40 if (!status.ok()) {
41 return status;
42 }
43 files_to_ingest_.push_back(file_to_ingest);
44 }
45
46 for (const IngestedFileInfo& f : files_to_ingest_) {
47 if (f.cf_id !=
48 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
49 f.cf_id != cfd_->GetID()) {
50 return Status::InvalidArgument(
51 "External file column family id dont match");
52 }
53 }
54
55 const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
56 auto num_files = files_to_ingest_.size();
57 if (num_files == 0) {
58 return Status::InvalidArgument("The list of files is empty");
59 } else if (num_files > 1) {
60 // Verify that passed files dont have overlapping ranges
61 autovector<const IngestedFileInfo*> sorted_files;
62 for (size_t i = 0; i < num_files; i++) {
63 sorted_files.push_back(&files_to_ingest_[i]);
64 }
65
66 std::sort(
67 sorted_files.begin(), sorted_files.end(),
68 [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
69 return ucmp->Compare(info1->smallest_user_key,
70 info2->smallest_user_key) < 0;
71 });
72
73 for (size_t i = 0; i < num_files - 1; i++) {
74 if (ucmp->Compare(sorted_files[i]->largest_user_key,
75 sorted_files[i + 1]->smallest_user_key) >= 0) {
76 return Status::NotSupported("Files have overlapping ranges");
77 }
78 }
79 }
80
81 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 82 if (f.num_entries == 0 && f.num_range_deletions == 0) {
7c673cae
FG
83 return Status::InvalidArgument("File contain no entries");
84 }
85
86 if (!f.smallest_internal_key().Valid() ||
87 !f.largest_internal_key().Valid()) {
88 return Status::Corruption("Generated table have corrupted keys");
89 }
90 }
91
92 // Copy/Move external files into DB
93 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 94 f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
7c673cae
FG
95
96 const std::string path_outside_db = f.external_file_path;
97 const std::string path_inside_db =
11fdf7f2
TL
98 TableFileName(cfd_->ioptions()->cf_paths, f.fd.GetNumber(),
99 f.fd.GetPathId());
7c673cae
FG
100
101 if (ingestion_options_.move_files) {
102 status = env_->LinkFile(path_outside_db, path_inside_db);
103 if (status.IsNotSupported()) {
104 // Original file is on a different FS, use copy instead of hard linking
105 status = CopyFile(env_, path_outside_db, path_inside_db, 0,
106 db_options_.use_fsync);
11fdf7f2
TL
107 f.copy_file = true;
108 } else {
109 f.copy_file = false;
7c673cae
FG
110 }
111 } else {
112 status = CopyFile(env_, path_outside_db, path_inside_db, 0,
113 db_options_.use_fsync);
11fdf7f2 114 f.copy_file = true;
7c673cae 115 }
11fdf7f2 116 TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
7c673cae
FG
117 if (!status.ok()) {
118 break;
119 }
120 f.internal_file_path = path_inside_db;
121 }
122
123 if (!status.ok()) {
124 // We failed, remove all files that we copied into the db
125 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 126 if (f.internal_file_path.empty()) {
7c673cae
FG
127 break;
128 }
129 Status s = env_->DeleteFile(f.internal_file_path);
130 if (!s.ok()) {
131 ROCKS_LOG_WARN(db_options_.info_log,
132 "AddFile() clean up for file %s failed : %s",
133 f.internal_file_path.c_str(), s.ToString().c_str());
134 }
135 }
136 }
137
138 return status;
139}
140
11fdf7f2
TL
141Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
142 SuperVersion* super_version) {
143 autovector<Range> ranges;
144 for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
145 ranges.emplace_back(file_to_ingest.smallest_user_key,
146 file_to_ingest.largest_user_key);
147 }
7c673cae 148 Status status =
11fdf7f2 149 cfd_->RangesOverlapWithMemtables(ranges, super_version, flush_needed);
7c673cae
FG
150 if (status.ok() && *flush_needed &&
151 !ingestion_options_.allow_blocking_flush) {
152 status = Status::InvalidArgument("External file requires flush");
153 }
154 return status;
155}
156
11fdf7f2
TL
157// REQUIRES: we have become the only writer by entering both write_thread_ and
158// nonmem_write_thread_
7c673cae
FG
159Status ExternalSstFileIngestionJob::Run() {
160 Status status;
11fdf7f2 161 SuperVersion* super_version = cfd_->GetSuperVersion();
7c673cae
FG
162#ifndef NDEBUG
163 // We should never run the job with a memtable that is overlapping
164 // with the files we are ingesting
165 bool need_flush = false;
11fdf7f2 166 status = NeedsFlush(&need_flush, super_version);
7c673cae
FG
167 assert(status.ok() && need_flush == false);
168#endif
169
170 bool consumed_seqno = false;
171 bool force_global_seqno = false;
172
173 if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
174 // We need to assign a global sequence number to all the files even
175 // if the dont overlap with any ranges since we have snapshots
176 force_global_seqno = true;
177 }
11fdf7f2
TL
178 // It is safe to use this instead of LastAllocatedSequence since we are
179 // the only active writer, and hence they are equal
7c673cae 180 const SequenceNumber last_seqno = versions_->LastSequence();
7c673cae
FG
181 edit_.SetColumnFamily(cfd_->GetID());
182 // The levels that the files will be ingested into
183
184 for (IngestedFileInfo& f : files_to_ingest_) {
185 SequenceNumber assigned_seqno = 0;
11fdf7f2
TL
186 if (ingestion_options_.ingest_behind) {
187 status = CheckLevelForIngestedBehindFile(&f);
188 } else {
189 status = AssignLevelAndSeqnoForIngestedFile(
190 super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
191 &f, &assigned_seqno);
192 }
7c673cae
FG
193 if (!status.ok()) {
194 return status;
195 }
196 status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
197 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
198 &assigned_seqno);
199 if (assigned_seqno == last_seqno + 1) {
200 consumed_seqno = true;
201 }
202 if (!status.ok()) {
203 return status;
204 }
205 edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
206 f.fd.GetFileSize(), f.smallest_internal_key(),
207 f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
208 false);
209 }
210
211 if (consumed_seqno) {
11fdf7f2
TL
212 versions_->SetLastAllocatedSequence(last_seqno + 1);
213 versions_->SetLastPublishedSequence(last_seqno + 1);
7c673cae
FG
214 versions_->SetLastSequence(last_seqno + 1);
215 }
216
217 return status;
218}
219
220void ExternalSstFileIngestionJob::UpdateStats() {
221 // Update internal stats for new ingested files
222 uint64_t total_keys = 0;
223 uint64_t total_l0_files = 0;
224 uint64_t total_time = env_->NowMicros() - job_start_time_;
225 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 226 InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1);
7c673cae 227 stats.micros = total_time;
11fdf7f2
TL
228 // If actual copy occured for this file, then we need to count the file
229 // size as the actual bytes written. If the file was linked, then we ignore
230 // the bytes written for file metadata.
231 // TODO (yanqin) maybe account for file metadata bytes for exact accuracy?
232 if (f.copy_file) {
233 stats.bytes_written = f.fd.GetFileSize();
234 } else {
235 stats.bytes_moved = f.fd.GetFileSize();
236 }
7c673cae
FG
237 stats.num_output_files = 1;
238 cfd_->internal_stats()->AddCompactionStats(f.picked_level, stats);
239 cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
240 f.fd.GetFileSize());
241 total_keys += f.num_entries;
242 if (f.picked_level == 0) {
243 total_l0_files += 1;
244 }
245 ROCKS_LOG_INFO(
246 db_options_.info_log,
247 "[AddFile] External SST file %s was ingested in L%d with path %s "
248 "(global_seqno=%" PRIu64 ")\n",
249 f.external_file_path.c_str(), f.picked_level,
250 f.internal_file_path.c_str(), f.assigned_seqno);
251 }
252 cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
253 total_keys);
254 cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
255 files_to_ingest_.size());
256 cfd_->internal_stats()->AddCFStats(
257 InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files);
258}
259
260void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
261 if (!status.ok()) {
262 // We failed to add the files to the database
263 // remove all the files we copied
264 for (IngestedFileInfo& f : files_to_ingest_) {
265 Status s = env_->DeleteFile(f.internal_file_path);
266 if (!s.ok()) {
267 ROCKS_LOG_WARN(db_options_.info_log,
268 "AddFile() clean up for file %s failed : %s",
269 f.internal_file_path.c_str(), s.ToString().c_str());
270 }
271 }
272 } else if (status.ok() && ingestion_options_.move_files) {
273 // The files were moved and added successfully, remove original file links
274 for (IngestedFileInfo& f : files_to_ingest_) {
275 Status s = env_->DeleteFile(f.external_file_path);
276 if (!s.ok()) {
277 ROCKS_LOG_WARN(
278 db_options_.info_log,
279 "%s was added to DB successfully but failed to remove original "
280 "file link : %s",
281 f.external_file_path.c_str(), s.ToString().c_str());
282 }
283 }
284 }
285}
286
287Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
11fdf7f2
TL
288 const std::string& external_file, IngestedFileInfo* file_to_ingest,
289 SuperVersion* sv) {
7c673cae
FG
290 file_to_ingest->external_file_path = external_file;
291
292 // Get external file size
293 Status status = env_->GetFileSize(external_file, &file_to_ingest->file_size);
294 if (!status.ok()) {
295 return status;
296 }
297
298 // Create TableReader for external file
299 std::unique_ptr<TableReader> table_reader;
300 std::unique_ptr<RandomAccessFile> sst_file;
301 std::unique_ptr<RandomAccessFileReader> sst_file_reader;
302
303 status = env_->NewRandomAccessFile(external_file, &sst_file, env_options_);
304 if (!status.ok()) {
305 return status;
306 }
11fdf7f2
TL
307 sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file),
308 external_file));
7c673cae
FG
309
310 status = cfd_->ioptions()->table_factory->NewTableReader(
11fdf7f2
TL
311 TableReaderOptions(*cfd_->ioptions(),
312 sv->mutable_cf_options.prefix_extractor.get(),
313 env_options_, cfd_->internal_comparator()),
7c673cae
FG
314 std::move(sst_file_reader), file_to_ingest->file_size, &table_reader);
315 if (!status.ok()) {
316 return status;
317 }
318
319 // Get the external file properties
320 auto props = table_reader->GetTableProperties();
321 const auto& uprops = props->user_collected_properties;
322
323 // Get table version
324 auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
325 if (version_iter == uprops.end()) {
326 return Status::Corruption("External file version not found");
327 }
328 file_to_ingest->version = DecodeFixed32(version_iter->second.c_str());
329
330 auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
331 if (file_to_ingest->version == 2) {
332 // version 2 imply that we have global sequence number
333 if (seqno_iter == uprops.end()) {
334 return Status::Corruption(
335 "External file global sequence number not found");
336 }
337
338 // Set the global sequence number
339 file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str());
11fdf7f2 340 auto offsets_iter = props->properties_offsets.find(
7c673cae 341 ExternalSstFilePropertyNames::kGlobalSeqno);
11fdf7f2
TL
342 if (offsets_iter == props->properties_offsets.end() ||
343 offsets_iter->second == 0) {
344 file_to_ingest->global_seqno_offset = 0;
7c673cae
FG
345 return Status::Corruption("Was not able to find file global seqno field");
346 }
11fdf7f2 347 file_to_ingest->global_seqno_offset = static_cast<size_t>(offsets_iter->second);
7c673cae
FG
348 } else if (file_to_ingest->version == 1) {
349 // SST file V1 should not have global seqno field
350 assert(seqno_iter == uprops.end());
351 file_to_ingest->original_seqno = 0;
352 if (ingestion_options_.allow_blocking_flush ||
353 ingestion_options_.allow_global_seqno) {
354 return Status::InvalidArgument(
355 "External SST file V1 does not support global seqno");
356 }
357 } else {
358 return Status::InvalidArgument("External file version is not supported");
359 }
360 // Get number of entries in table
361 file_to_ingest->num_entries = props->num_entries;
11fdf7f2 362 file_to_ingest->num_range_deletions = props->num_range_deletions;
7c673cae
FG
363
364 ParsedInternalKey key;
365 ReadOptions ro;
366 // During reading the external file we can cache blocks that we read into
367 // the block cache, if we later change the global seqno of this file, we will
368 // have block in cache that will include keys with wrong seqno.
369 // We need to disable fill_cache so that we read from the file without
370 // updating the block cache.
371 ro.fill_cache = false;
11fdf7f2
TL
372 std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
373 ro, sv->mutable_cf_options.prefix_extractor.get()));
374 std::unique_ptr<InternalIterator> range_del_iter(
375 table_reader->NewRangeTombstoneIterator(ro));
7c673cae 376
11fdf7f2
TL
377 // Get first (smallest) and last (largest) key from file.
378 bool bounds_set = false;
7c673cae 379 iter->SeekToFirst();
11fdf7f2
TL
380 if (iter->Valid()) {
381 if (!ParseInternalKey(iter->key(), &key)) {
382 return Status::Corruption("external file have corrupted keys");
383 }
384 if (key.sequence != 0) {
385 return Status::Corruption("external file have non zero sequence number");
386 }
387 file_to_ingest->smallest_user_key = key.user_key.ToString();
388
389 iter->SeekToLast();
390 if (!ParseInternalKey(iter->key(), &key)) {
391 return Status::Corruption("external file have corrupted keys");
392 }
393 if (key.sequence != 0) {
394 return Status::Corruption("external file have non zero sequence number");
395 }
396 file_to_ingest->largest_user_key = key.user_key.ToString();
7c673cae 397
11fdf7f2 398 bounds_set = true;
7c673cae 399 }
11fdf7f2
TL
400
401 // We may need to adjust these key bounds, depending on whether any range
402 // deletion tombstones extend past them.
403 const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
404 if (range_del_iter != nullptr) {
405 for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
406 range_del_iter->Next()) {
407 if (!ParseInternalKey(range_del_iter->key(), &key)) {
408 return Status::Corruption("external file have corrupted keys");
409 }
410 RangeTombstone tombstone(key, range_del_iter->value());
411
412 if (!bounds_set || ucmp->Compare(tombstone.start_key_,
413 file_to_ingest->smallest_user_key) < 0) {
414 file_to_ingest->smallest_user_key = tombstone.start_key_.ToString();
415 }
416 if (!bounds_set || ucmp->Compare(tombstone.end_key_,
417 file_to_ingest->largest_user_key) > 0) {
418 file_to_ingest->largest_user_key = tombstone.end_key_.ToString();
419 }
420 bounds_set = true;
421 }
7c673cae 422 }
7c673cae
FG
423
424 file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
425
426 file_to_ingest->table_properties = *props;
427
428 return status;
429}
430
7c673cae
FG
431Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
432 SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
433 IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
434 Status status;
435 *assigned_seqno = 0;
436 const SequenceNumber last_seqno = versions_->LastSequence();
437 if (force_global_seqno) {
438 *assigned_seqno = last_seqno + 1;
439 if (compaction_style == kCompactionStyleUniversal) {
440 file_to_ingest->picked_level = 0;
441 return status;
442 }
443 }
444
445 bool overlap_with_db = false;
446 Arena arena;
447 ReadOptions ro;
448 ro.total_order_seek = true;
7c673cae
FG
449 int target_level = 0;
450 auto* vstorage = cfd_->current()->storage_info();
11fdf7f2 451
7c673cae
FG
452 for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
453 if (lvl > 0 && lvl < vstorage->base_level()) {
454 continue;
455 }
456
457 if (vstorage->NumLevelFiles(lvl) > 0) {
458 bool overlap_with_level = false;
11fdf7f2
TL
459 status = sv->current->OverlapWithLevelIterator(ro, env_options_,
460 file_to_ingest->smallest_user_key, file_to_ingest->largest_user_key,
461 lvl, &overlap_with_level);
7c673cae
FG
462 if (!status.ok()) {
463 return status;
464 }
465 if (overlap_with_level) {
466 // We must use L0 or any level higher than `lvl` to be able to overwrite
467 // the keys that we overlap with in this level, We also need to assign
468 // this file a seqno to overwrite the existing keys in level `lvl`
469 overlap_with_db = true;
470 break;
471 }
472
473 if (compaction_style == kCompactionStyleUniversal && lvl != 0) {
474 const std::vector<FileMetaData*>& level_files =
475 vstorage->LevelFiles(lvl);
476 const SequenceNumber level_largest_seqno =
477 (*max_element(level_files.begin(), level_files.end(),
478 [](FileMetaData* f1, FileMetaData* f2) {
11fdf7f2 479 return f1->fd.largest_seqno < f2->fd.largest_seqno;
7c673cae 480 }))
11fdf7f2
TL
481 ->fd.largest_seqno;
482 // should only assign seqno to current level's largest seqno when
483 // the file fits
484 if (level_largest_seqno != 0 &&
485 IngestedFileFitInLevel(file_to_ingest, lvl)) {
7c673cae
FG
486 *assigned_seqno = level_largest_seqno;
487 } else {
488 continue;
489 }
490 }
491 } else if (compaction_style == kCompactionStyleUniversal) {
492 continue;
493 }
494
495 // We dont overlap with any keys in this level, but we still need to check
496 // if our file can fit in it
497 if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
498 target_level = lvl;
499 }
500 }
501 TEST_SYNC_POINT_CALLBACK(
502 "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
503 &overlap_with_db);
504 file_to_ingest->picked_level = target_level;
505 if (overlap_with_db && *assigned_seqno == 0) {
506 *assigned_seqno = last_seqno + 1;
507 }
508 return status;
509}
510
11fdf7f2
TL
511Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
512 IngestedFileInfo* file_to_ingest) {
513 auto* vstorage = cfd_->current()->storage_info();
514 // first check if new files fit in the bottommost level
515 int bottom_lvl = cfd_->NumberLevels() - 1;
516 if(!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) {
517 return Status::InvalidArgument(
518 "Can't ingest_behind file as it doesn't fit "
519 "at the bottommost level!");
520 }
521
522 // second check if despite allow_ingest_behind=true we still have 0 seqnums
523 // at some upper level
524 for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
525 for (auto file : vstorage->LevelFiles(lvl)) {
526 if (file->fd.smallest_seqno == 0) {
527 return Status::InvalidArgument(
528 "Can't ingest_behind file as despite allow_ingest_behind=true "
529 "there are files with 0 seqno in database at upper levels!");
530 }
531 }
532 }
533
534 file_to_ingest->picked_level = bottom_lvl;
535 return Status::OK();
536}
537
7c673cae
FG
538Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
539 IngestedFileInfo* file_to_ingest, SequenceNumber seqno) {
540 if (file_to_ingest->original_seqno == seqno) {
541 // This file already have the correct global seqno
542 return Status::OK();
543 } else if (!ingestion_options_.allow_global_seqno) {
544 return Status::InvalidArgument("Global seqno is required, but disabled");
545 } else if (file_to_ingest->global_seqno_offset == 0) {
546 return Status::InvalidArgument(
547 "Trying to set global seqno for a file that dont have a global seqno "
548 "field");
549 }
550
11fdf7f2
TL
551 if (ingestion_options_.write_global_seqno) {
552 // Determine if we can write global_seqno to a given offset of file.
553 // If the file system does not support random write, then we should not.
554 // Otherwise we should.
555 std::unique_ptr<RandomRWFile> rwfile;
556 Status status = env_->NewRandomRWFile(file_to_ingest->internal_file_path,
557 &rwfile, env_options_);
558 if (status.ok()) {
559 std::string seqno_val;
560 PutFixed64(&seqno_val, seqno);
561 status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val);
562 if (!status.ok()) {
563 return status;
564 }
565 } else if (!status.IsNotSupported()) {
566 return status;
7c673cae
FG
567 }
568 }
569
11fdf7f2
TL
570 file_to_ingest->assigned_seqno = seqno;
571 return Status::OK();
7c673cae
FG
572}
573
574bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
575 const IngestedFileInfo* file_to_ingest, int level) {
576 if (level == 0) {
577 // Files can always fit in L0
578 return true;
579 }
580
581 auto* vstorage = cfd_->current()->storage_info();
582 Slice file_smallest_user_key(file_to_ingest->smallest_user_key);
583 Slice file_largest_user_key(file_to_ingest->largest_user_key);
584
585 if (vstorage->OverlapInLevel(level, &file_smallest_user_key,
586 &file_largest_user_key)) {
587 // File overlap with another files in this level, we cannot
588 // add it to this level
589 return false;
590 }
591 if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key,
592 file_largest_user_key, level)) {
593 // File overlap with a running compaction output that will be stored
594 // in this level, we cannot add this file to this level
595 return false;
596 }
597
598 // File did not overlap with level files, our compaction output
599 return true;
600}
601
602} // namespace rocksdb
603
604#endif // !ROCKSDB_LITE