]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/external_sst_file_ingestion_job.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / external_sst_file_ingestion_job.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7
8#include "db/external_sst_file_ingestion_job.h"
9
7c673cae 10#include <algorithm>
f67539c2 11#include <cinttypes>
7c673cae 12#include <string>
f67539c2 13#include <unordered_set>
7c673cae
FG
14#include <vector>
15
f67539c2 16#include "db/db_impl/db_impl.h"
7c673cae 17#include "db/version_edit.h"
f67539c2
TL
18#include "file/file_util.h"
19#include "file/random_access_file_reader.h"
7c673cae
FG
20#include "table/merging_iterator.h"
21#include "table/scoped_arena_iterator.h"
22#include "table/sst_file_writer_collectors.h"
23#include "table/table_builder.h"
f67539c2 24#include "test_util/sync_point.h"
7c673cae 25#include "util/stop_watch.h"
7c673cae 26
f67539c2 27namespace ROCKSDB_NAMESPACE {
7c673cae
FG
28
29Status ExternalSstFileIngestionJob::Prepare(
11fdf7f2
TL
30 const std::vector<std::string>& external_files_paths,
31 uint64_t next_file_number, SuperVersion* sv) {
7c673cae
FG
32 Status status;
33
34 // Read the information of files we are ingesting
35 for (const std::string& file_path : external_files_paths) {
36 IngestedFileInfo file_to_ingest;
11fdf7f2 37 status = GetIngestedFileInfo(file_path, &file_to_ingest, sv);
7c673cae
FG
38 if (!status.ok()) {
39 return status;
40 }
41 files_to_ingest_.push_back(file_to_ingest);
42 }
43
44 for (const IngestedFileInfo& f : files_to_ingest_) {
45 if (f.cf_id !=
46 TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
47 f.cf_id != cfd_->GetID()) {
48 return Status::InvalidArgument(
49 "External file column family id dont match");
50 }
51 }
52
53 const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
54 auto num_files = files_to_ingest_.size();
55 if (num_files == 0) {
56 return Status::InvalidArgument("The list of files is empty");
57 } else if (num_files > 1) {
58 // Verify that passed files dont have overlapping ranges
59 autovector<const IngestedFileInfo*> sorted_files;
60 for (size_t i = 0; i < num_files; i++) {
61 sorted_files.push_back(&files_to_ingest_[i]);
62 }
63
64 std::sort(
65 sorted_files.begin(), sorted_files.end(),
66 [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) {
f67539c2
TL
67 return sstableKeyCompare(ucmp, info1->smallest_internal_key,
68 info2->smallest_internal_key) < 0;
7c673cae
FG
69 });
70
71 for (size_t i = 0; i < num_files - 1; i++) {
f67539c2
TL
72 if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key,
73 sorted_files[i + 1]->smallest_internal_key) >= 0) {
74 files_overlap_ = true;
75 break;
7c673cae
FG
76 }
77 }
78 }
79
f67539c2
TL
80 if (ingestion_options_.ingest_behind && files_overlap_) {
81 return Status::NotSupported("Files have overlapping ranges");
82 }
83
7c673cae 84 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 85 if (f.num_entries == 0 && f.num_range_deletions == 0) {
7c673cae
FG
86 return Status::InvalidArgument("File contain no entries");
87 }
88
f67539c2 89 if (!f.smallest_internal_key.Valid() || !f.largest_internal_key.Valid()) {
7c673cae
FG
90 return Status::Corruption("Generated table have corrupted keys");
91 }
92 }
93
94 // Copy/Move external files into DB
f67539c2 95 std::unordered_set<size_t> ingestion_path_ids;
7c673cae 96 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 97 f.fd = FileDescriptor(next_file_number++, 0, f.file_size);
f67539c2 98 f.copy_file = false;
7c673cae
FG
99 const std::string path_outside_db = f.external_file_path;
100 const std::string path_inside_db =
11fdf7f2
TL
101 TableFileName(cfd_->ioptions()->cf_paths, f.fd.GetNumber(),
102 f.fd.GetPathId());
7c673cae 103 if (ingestion_options_.move_files) {
f67539c2
TL
104 status =
105 fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
106 if (status.ok()) {
107 // It is unsafe to assume application had sync the file and file
108 // directory before ingest the file. For integrity of RocksDB we need
109 // to sync the file.
110 std::unique_ptr<FSWritableFile> file_to_sync;
111 status = fs_->ReopenWritableFile(path_inside_db, env_options_,
112 &file_to_sync, nullptr);
113 if (status.ok()) {
114 TEST_SYNC_POINT(
115 "ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
116 status = SyncIngestedFile(file_to_sync.get());
117 TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile");
118 if (!status.ok()) {
119 ROCKS_LOG_WARN(db_options_.info_log,
120 "Failed to sync ingested file %s: %s",
121 path_inside_db.c_str(), status.ToString().c_str());
122 }
123 }
124 } else if (status.IsNotSupported() &&
125 ingestion_options_.failed_move_fall_back_to_copy) {
126 // Original file is on a different FS, use copy instead of hard linking.
11fdf7f2 127 f.copy_file = true;
7c673cae
FG
128 }
129 } else {
11fdf7f2 130 f.copy_file = true;
7c673cae 131 }
f67539c2
TL
132
133 if (f.copy_file) {
134 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
135 nullptr);
136 // CopyFile also sync the new file.
137 status = CopyFile(fs_, path_outside_db, path_inside_db, 0,
138 db_options_.use_fsync);
139 }
11fdf7f2 140 TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
7c673cae
FG
141 if (!status.ok()) {
142 break;
143 }
144 f.internal_file_path = path_inside_db;
f67539c2
TL
145 ingestion_path_ids.insert(f.fd.GetPathId());
146 }
147
148 TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
149 if (status.ok()) {
150 for (auto path_id : ingestion_path_ids) {
151 status = directories_->GetDataDir(path_id)->Fsync();
152 if (!status.ok()) {
153 ROCKS_LOG_WARN(db_options_.info_log,
154 "Failed to sync directory %" ROCKSDB_PRIszt
155 " while ingest file: %s",
156 path_id, status.ToString().c_str());
157 break;
158 }
159 }
7c673cae 160 }
f67539c2 161 TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
7c673cae 162
f67539c2 163 // TODO: The following is duplicated with Cleanup().
7c673cae
FG
164 if (!status.ok()) {
165 // We failed, remove all files that we copied into the db
166 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 167 if (f.internal_file_path.empty()) {
f67539c2 168 continue;
7c673cae
FG
169 }
170 Status s = env_->DeleteFile(f.internal_file_path);
171 if (!s.ok()) {
172 ROCKS_LOG_WARN(db_options_.info_log,
173 "AddFile() clean up for file %s failed : %s",
174 f.internal_file_path.c_str(), s.ToString().c_str());
175 }
176 }
177 }
178
179 return status;
180}
181
11fdf7f2
TL
182Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
183 SuperVersion* super_version) {
184 autovector<Range> ranges;
185 for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
f67539c2
TL
186 ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
187 file_to_ingest.largest_internal_key.user_key());
11fdf7f2 188 }
7c673cae 189 Status status =
11fdf7f2 190 cfd_->RangesOverlapWithMemtables(ranges, super_version, flush_needed);
7c673cae
FG
191 if (status.ok() && *flush_needed &&
192 !ingestion_options_.allow_blocking_flush) {
193 status = Status::InvalidArgument("External file requires flush");
194 }
195 return status;
196}
197
11fdf7f2
TL
198// REQUIRES: we have become the only writer by entering both write_thread_ and
199// nonmem_write_thread_
7c673cae
FG
200Status ExternalSstFileIngestionJob::Run() {
201 Status status;
11fdf7f2 202 SuperVersion* super_version = cfd_->GetSuperVersion();
7c673cae
FG
203#ifndef NDEBUG
204 // We should never run the job with a memtable that is overlapping
205 // with the files we are ingesting
206 bool need_flush = false;
11fdf7f2 207 status = NeedsFlush(&need_flush, super_version);
7c673cae
FG
208 assert(status.ok() && need_flush == false);
209#endif
210
7c673cae
FG
211 bool force_global_seqno = false;
212
213 if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
214 // We need to assign a global sequence number to all the files even
215 // if the dont overlap with any ranges since we have snapshots
216 force_global_seqno = true;
217 }
11fdf7f2
TL
218 // It is safe to use this instead of LastAllocatedSequence since we are
219 // the only active writer, and hence they are equal
f67539c2 220 SequenceNumber last_seqno = versions_->LastSequence();
7c673cae
FG
221 edit_.SetColumnFamily(cfd_->GetID());
222 // The levels that the files will be ingested into
223
224 for (IngestedFileInfo& f : files_to_ingest_) {
225 SequenceNumber assigned_seqno = 0;
11fdf7f2
TL
226 if (ingestion_options_.ingest_behind) {
227 status = CheckLevelForIngestedBehindFile(&f);
228 } else {
229 status = AssignLevelAndSeqnoForIngestedFile(
f67539c2
TL
230 super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
231 last_seqno, &f, &assigned_seqno);
11fdf7f2 232 }
7c673cae
FG
233 if (!status.ok()) {
234 return status;
235 }
236 status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
237 TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
238 &assigned_seqno);
f67539c2
TL
239 if (assigned_seqno > last_seqno) {
240 assert(assigned_seqno == last_seqno + 1);
241 last_seqno = assigned_seqno;
242 ++consumed_seqno_count_;
7c673cae
FG
243 }
244 if (!status.ok()) {
245 return status;
246 }
f67539c2
TL
247
248 // We use the import time as the ancester time. This is the time the data
249 // is written to the database.
250 int64_t temp_current_time = 0;
251 uint64_t current_time = kUnknownFileCreationTime;
252 uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
253 if (env_->GetCurrentTime(&temp_current_time).ok()) {
254 current_time = oldest_ancester_time =
255 static_cast<uint64_t>(temp_current_time);
256 }
257
258 edit_.AddFile(
259 f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(),
260 f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno,
261 f.assigned_seqno, false, kInvalidBlobFileNumber, oldest_ancester_time,
262 current_time, kUnknownFileChecksum, kUnknownFileChecksumFuncName);
7c673cae 263 }
7c673cae
FG
264 return status;
265}
266
267void ExternalSstFileIngestionJob::UpdateStats() {
268 // Update internal stats for new ingested files
269 uint64_t total_keys = 0;
270 uint64_t total_l0_files = 0;
271 uint64_t total_time = env_->NowMicros() - job_start_time_;
f67539c2
TL
272
273 EventLoggerStream stream = event_logger_->Log();
274 stream << "event"
275 << "ingest_finished";
276 stream << "files_ingested";
277 stream.StartArray();
278
7c673cae 279 for (IngestedFileInfo& f : files_to_ingest_) {
11fdf7f2 280 InternalStats::CompactionStats stats(CompactionReason::kExternalSstIngestion, 1);
7c673cae 281 stats.micros = total_time;
494da23a 282 // If actual copy occurred for this file, then we need to count the file
11fdf7f2
TL
283 // size as the actual bytes written. If the file was linked, then we ignore
284 // the bytes written for file metadata.
285 // TODO (yanqin) maybe account for file metadata bytes for exact accuracy?
286 if (f.copy_file) {
287 stats.bytes_written = f.fd.GetFileSize();
288 } else {
289 stats.bytes_moved = f.fd.GetFileSize();
290 }
7c673cae 291 stats.num_output_files = 1;
494da23a
TL
292 cfd_->internal_stats()->AddCompactionStats(f.picked_level,
293 Env::Priority::USER, stats);
7c673cae
FG
294 cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
295 f.fd.GetFileSize());
296 total_keys += f.num_entries;
297 if (f.picked_level == 0) {
298 total_l0_files += 1;
299 }
300 ROCKS_LOG_INFO(
301 db_options_.info_log,
302 "[AddFile] External SST file %s was ingested in L%d with path %s "
303 "(global_seqno=%" PRIu64 ")\n",
304 f.external_file_path.c_str(), f.picked_level,
305 f.internal_file_path.c_str(), f.assigned_seqno);
f67539c2
TL
306 stream << "file" << f.internal_file_path << "level" << f.picked_level;
307 }
308 stream.EndArray();
309
310 stream << "lsm_state";
311 stream.StartArray();
312 auto vstorage = cfd_->current()->storage_info();
313 for (int level = 0; level < vstorage->num_levels(); ++level) {
314 stream << vstorage->NumLevelFiles(level);
7c673cae 315 }
f67539c2
TL
316 stream.EndArray();
317
7c673cae
FG
318 cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
319 total_keys);
320 cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
321 files_to_ingest_.size());
322 cfd_->internal_stats()->AddCFStats(
323 InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files);
324}
325
326void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
327 if (!status.ok()) {
328 // We failed to add the files to the database
329 // remove all the files we copied
330 for (IngestedFileInfo& f : files_to_ingest_) {
f67539c2
TL
331 if (f.internal_file_path.empty()) {
332 continue;
333 }
7c673cae
FG
334 Status s = env_->DeleteFile(f.internal_file_path);
335 if (!s.ok()) {
336 ROCKS_LOG_WARN(db_options_.info_log,
337 "AddFile() clean up for file %s failed : %s",
338 f.internal_file_path.c_str(), s.ToString().c_str());
339 }
340 }
f67539c2
TL
341 consumed_seqno_count_ = 0;
342 files_overlap_ = false;
7c673cae
FG
343 } else if (status.ok() && ingestion_options_.move_files) {
344 // The files were moved and added successfully, remove original file links
345 for (IngestedFileInfo& f : files_to_ingest_) {
346 Status s = env_->DeleteFile(f.external_file_path);
347 if (!s.ok()) {
348 ROCKS_LOG_WARN(
349 db_options_.info_log,
350 "%s was added to DB successfully but failed to remove original "
351 "file link : %s",
352 f.external_file_path.c_str(), s.ToString().c_str());
353 }
354 }
355 }
356}
357
358Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
11fdf7f2
TL
359 const std::string& external_file, IngestedFileInfo* file_to_ingest,
360 SuperVersion* sv) {
7c673cae
FG
361 file_to_ingest->external_file_path = external_file;
362
363 // Get external file size
f67539c2
TL
364 Status status = fs_->GetFileSize(external_file, IOOptions(),
365 &file_to_ingest->file_size, nullptr);
7c673cae
FG
366 if (!status.ok()) {
367 return status;
368 }
369
370 // Create TableReader for external file
371 std::unique_ptr<TableReader> table_reader;
f67539c2 372 std::unique_ptr<FSRandomAccessFile> sst_file;
7c673cae
FG
373 std::unique_ptr<RandomAccessFileReader> sst_file_reader;
374
f67539c2
TL
375 status = fs_->NewRandomAccessFile(external_file, env_options_,
376 &sst_file, nullptr);
7c673cae
FG
377 if (!status.ok()) {
378 return status;
379 }
11fdf7f2
TL
380 sst_file_reader.reset(new RandomAccessFileReader(std::move(sst_file),
381 external_file));
7c673cae
FG
382
383 status = cfd_->ioptions()->table_factory->NewTableReader(
11fdf7f2
TL
384 TableReaderOptions(*cfd_->ioptions(),
385 sv->mutable_cf_options.prefix_extractor.get(),
386 env_options_, cfd_->internal_comparator()),
7c673cae
FG
387 std::move(sst_file_reader), file_to_ingest->file_size, &table_reader);
388 if (!status.ok()) {
389 return status;
390 }
391
494da23a 392 if (ingestion_options_.verify_checksums_before_ingest) {
f67539c2
TL
393 // If customized readahead size is needed, we can pass a user option
394 // all the way to here. Right now we just rely on the default readahead
395 // to keep things simple.
396 ReadOptions ro;
397 ro.readahead_size = ingestion_options_.verify_checksums_readahead_size;
398 status = table_reader->VerifyChecksum(
399 ro, TableReaderCaller::kExternalSSTIngestion);
494da23a
TL
400 }
401 if (!status.ok()) {
402 return status;
403 }
404
7c673cae
FG
405 // Get the external file properties
406 auto props = table_reader->GetTableProperties();
407 const auto& uprops = props->user_collected_properties;
408
409 // Get table version
410 auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
411 if (version_iter == uprops.end()) {
412 return Status::Corruption("External file version not found");
413 }
414 file_to_ingest->version = DecodeFixed32(version_iter->second.c_str());
415
416 auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
417 if (file_to_ingest->version == 2) {
418 // version 2 imply that we have global sequence number
419 if (seqno_iter == uprops.end()) {
420 return Status::Corruption(
421 "External file global sequence number not found");
422 }
423
424 // Set the global sequence number
425 file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str());
11fdf7f2 426 auto offsets_iter = props->properties_offsets.find(
7c673cae 427 ExternalSstFilePropertyNames::kGlobalSeqno);
11fdf7f2
TL
428 if (offsets_iter == props->properties_offsets.end() ||
429 offsets_iter->second == 0) {
430 file_to_ingest->global_seqno_offset = 0;
7c673cae
FG
431 return Status::Corruption("Was not able to find file global seqno field");
432 }
11fdf7f2 433 file_to_ingest->global_seqno_offset = static_cast<size_t>(offsets_iter->second);
7c673cae
FG
434 } else if (file_to_ingest->version == 1) {
435 // SST file V1 should not have global seqno field
436 assert(seqno_iter == uprops.end());
437 file_to_ingest->original_seqno = 0;
438 if (ingestion_options_.allow_blocking_flush ||
439 ingestion_options_.allow_global_seqno) {
440 return Status::InvalidArgument(
441 "External SST file V1 does not support global seqno");
442 }
443 } else {
444 return Status::InvalidArgument("External file version is not supported");
445 }
446 // Get number of entries in table
447 file_to_ingest->num_entries = props->num_entries;
11fdf7f2 448 file_to_ingest->num_range_deletions = props->num_range_deletions;
7c673cae
FG
449
450 ParsedInternalKey key;
451 ReadOptions ro;
452 // During reading the external file we can cache blocks that we read into
453 // the block cache, if we later change the global seqno of this file, we will
454 // have block in cache that will include keys with wrong seqno.
455 // We need to disable fill_cache so that we read from the file without
456 // updating the block cache.
457 ro.fill_cache = false;
11fdf7f2 458 std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
f67539c2
TL
459 ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
460 /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
11fdf7f2
TL
461 std::unique_ptr<InternalIterator> range_del_iter(
462 table_reader->NewRangeTombstoneIterator(ro));
7c673cae 463
11fdf7f2 464 // Get first (smallest) and last (largest) key from file.
f67539c2
TL
465 file_to_ingest->smallest_internal_key =
466 InternalKey("", 0, ValueType::kTypeValue);
467 file_to_ingest->largest_internal_key =
468 InternalKey("", 0, ValueType::kTypeValue);
11fdf7f2 469 bool bounds_set = false;
7c673cae 470 iter->SeekToFirst();
11fdf7f2
TL
471 if (iter->Valid()) {
472 if (!ParseInternalKey(iter->key(), &key)) {
473 return Status::Corruption("external file have corrupted keys");
474 }
475 if (key.sequence != 0) {
476 return Status::Corruption("external file have non zero sequence number");
477 }
f67539c2 478 file_to_ingest->smallest_internal_key.SetFrom(key);
11fdf7f2
TL
479
480 iter->SeekToLast();
481 if (!ParseInternalKey(iter->key(), &key)) {
482 return Status::Corruption("external file have corrupted keys");
483 }
484 if (key.sequence != 0) {
485 return Status::Corruption("external file have non zero sequence number");
486 }
f67539c2 487 file_to_ingest->largest_internal_key.SetFrom(key);
7c673cae 488
11fdf7f2 489 bounds_set = true;
7c673cae 490 }
11fdf7f2
TL
491
492 // We may need to adjust these key bounds, depending on whether any range
493 // deletion tombstones extend past them.
494 const Comparator* ucmp = cfd_->internal_comparator().user_comparator();
495 if (range_del_iter != nullptr) {
496 for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
497 range_del_iter->Next()) {
498 if (!ParseInternalKey(range_del_iter->key(), &key)) {
499 return Status::Corruption("external file have corrupted keys");
500 }
501 RangeTombstone tombstone(key, range_del_iter->value());
502
f67539c2
TL
503 InternalKey start_key = tombstone.SerializeKey();
504 if (!bounds_set ||
505 sstableKeyCompare(ucmp, start_key,
506 file_to_ingest->smallest_internal_key) < 0) {
507 file_to_ingest->smallest_internal_key = start_key;
11fdf7f2 508 }
f67539c2
TL
509 InternalKey end_key = tombstone.SerializeEndKey();
510 if (!bounds_set ||
511 sstableKeyCompare(ucmp, end_key,
512 file_to_ingest->largest_internal_key) > 0) {
513 file_to_ingest->largest_internal_key = end_key;
11fdf7f2
TL
514 }
515 bounds_set = true;
516 }
7c673cae 517 }
7c673cae
FG
518
519 file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
520
521 file_to_ingest->table_properties = *props;
522
523 return status;
524}
525
7c673cae
FG
526Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
527 SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
f67539c2
TL
528 SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest,
529 SequenceNumber* assigned_seqno) {
7c673cae
FG
530 Status status;
531 *assigned_seqno = 0;
7c673cae
FG
532 if (force_global_seqno) {
533 *assigned_seqno = last_seqno + 1;
f67539c2 534 if (compaction_style == kCompactionStyleUniversal || files_overlap_) {
7c673cae
FG
535 file_to_ingest->picked_level = 0;
536 return status;
537 }
538 }
539
540 bool overlap_with_db = false;
541 Arena arena;
542 ReadOptions ro;
543 ro.total_order_seek = true;
7c673cae
FG
544 int target_level = 0;
545 auto* vstorage = cfd_->current()->storage_info();
11fdf7f2 546
7c673cae
FG
547 for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
548 if (lvl > 0 && lvl < vstorage->base_level()) {
549 continue;
550 }
551
552 if (vstorage->NumLevelFiles(lvl) > 0) {
553 bool overlap_with_level = false;
f67539c2
TL
554 status = sv->current->OverlapWithLevelIterator(
555 ro, env_options_, file_to_ingest->smallest_internal_key.user_key(),
556 file_to_ingest->largest_internal_key.user_key(), lvl,
557 &overlap_with_level);
7c673cae
FG
558 if (!status.ok()) {
559 return status;
560 }
561 if (overlap_with_level) {
562 // We must use L0 or any level higher than `lvl` to be able to overwrite
563 // the keys that we overlap with in this level, We also need to assign
564 // this file a seqno to overwrite the existing keys in level `lvl`
565 overlap_with_db = true;
566 break;
567 }
568
569 if (compaction_style == kCompactionStyleUniversal && lvl != 0) {
570 const std::vector<FileMetaData*>& level_files =
571 vstorage->LevelFiles(lvl);
572 const SequenceNumber level_largest_seqno =
573 (*max_element(level_files.begin(), level_files.end(),
574 [](FileMetaData* f1, FileMetaData* f2) {
11fdf7f2 575 return f1->fd.largest_seqno < f2->fd.largest_seqno;
7c673cae 576 }))
11fdf7f2
TL
577 ->fd.largest_seqno;
578 // should only assign seqno to current level's largest seqno when
579 // the file fits
580 if (level_largest_seqno != 0 &&
581 IngestedFileFitInLevel(file_to_ingest, lvl)) {
7c673cae
FG
582 *assigned_seqno = level_largest_seqno;
583 } else {
584 continue;
585 }
586 }
587 } else if (compaction_style == kCompactionStyleUniversal) {
588 continue;
589 }
590
591 // We dont overlap with any keys in this level, but we still need to check
592 // if our file can fit in it
593 if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
594 target_level = lvl;
595 }
596 }
f67539c2
TL
597 // If files overlap, we have to ingest them at level 0 and assign the newest
598 // sequence number
599 if (files_overlap_) {
600 target_level = 0;
601 *assigned_seqno = last_seqno + 1;
602 }
7c673cae
FG
603 TEST_SYNC_POINT_CALLBACK(
604 "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
605 &overlap_with_db);
606 file_to_ingest->picked_level = target_level;
607 if (overlap_with_db && *assigned_seqno == 0) {
608 *assigned_seqno = last_seqno + 1;
609 }
610 return status;
611}
612
11fdf7f2
TL
613Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
614 IngestedFileInfo* file_to_ingest) {
615 auto* vstorage = cfd_->current()->storage_info();
616 // first check if new files fit in the bottommost level
617 int bottom_lvl = cfd_->NumberLevels() - 1;
618 if(!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) {
619 return Status::InvalidArgument(
620 "Can't ingest_behind file as it doesn't fit "
621 "at the bottommost level!");
622 }
623
624 // second check if despite allow_ingest_behind=true we still have 0 seqnums
625 // at some upper level
626 for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) {
627 for (auto file : vstorage->LevelFiles(lvl)) {
628 if (file->fd.smallest_seqno == 0) {
629 return Status::InvalidArgument(
630 "Can't ingest_behind file as despite allow_ingest_behind=true "
631 "there are files with 0 seqno in database at upper levels!");
632 }
633 }
634 }
635
636 file_to_ingest->picked_level = bottom_lvl;
637 return Status::OK();
638}
639
7c673cae
FG
640Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
641 IngestedFileInfo* file_to_ingest, SequenceNumber seqno) {
642 if (file_to_ingest->original_seqno == seqno) {
643 // This file already have the correct global seqno
644 return Status::OK();
645 } else if (!ingestion_options_.allow_global_seqno) {
646 return Status::InvalidArgument("Global seqno is required, but disabled");
647 } else if (file_to_ingest->global_seqno_offset == 0) {
648 return Status::InvalidArgument(
649 "Trying to set global seqno for a file that dont have a global seqno "
650 "field");
651 }
652
11fdf7f2
TL
653 if (ingestion_options_.write_global_seqno) {
654 // Determine if we can write global_seqno to a given offset of file.
655 // If the file system does not support random write, then we should not.
656 // Otherwise we should.
f67539c2
TL
657 std::unique_ptr<FSRandomRWFile> rwfile;
658 Status status =
659 fs_->NewRandomRWFile(file_to_ingest->internal_file_path, env_options_,
660 &rwfile, nullptr);
11fdf7f2
TL
661 if (status.ok()) {
662 std::string seqno_val;
663 PutFixed64(&seqno_val, seqno);
f67539c2
TL
664 status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val,
665 IOOptions(), nullptr);
666 if (status.ok()) {
667 TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
668 status = SyncIngestedFile(rwfile.get());
669 TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
670 if (!status.ok()) {
671 ROCKS_LOG_WARN(db_options_.info_log,
672 "Failed to sync ingested file %s after writing global "
673 "sequence number: %s",
674 file_to_ingest->internal_file_path.c_str(),
675 status.ToString().c_str());
676 }
677 }
11fdf7f2
TL
678 if (!status.ok()) {
679 return status;
680 }
681 } else if (!status.IsNotSupported()) {
682 return status;
7c673cae
FG
683 }
684 }
685
11fdf7f2
TL
686 file_to_ingest->assigned_seqno = seqno;
687 return Status::OK();
7c673cae
FG
688}
689
690bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
691 const IngestedFileInfo* file_to_ingest, int level) {
692 if (level == 0) {
693 // Files can always fit in L0
694 return true;
695 }
696
697 auto* vstorage = cfd_->current()->storage_info();
f67539c2
TL
698 Slice file_smallest_user_key(
699 file_to_ingest->smallest_internal_key.user_key());
700 Slice file_largest_user_key(file_to_ingest->largest_internal_key.user_key());
7c673cae
FG
701
702 if (vstorage->OverlapInLevel(level, &file_smallest_user_key,
703 &file_largest_user_key)) {
704 // File overlap with another files in this level, we cannot
705 // add it to this level
706 return false;
707 }
708 if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key,
709 file_largest_user_key, level)) {
710 // File overlap with a running compaction output that will be stored
711 // in this level, we cannot add this file to this level
712 return false;
713 }
714
715 // File did not overlap with level files, our compaction output
716 return true;
717}
718
f67539c2
TL
719template <typename TWritableFile>
720Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) {
721 assert(file != nullptr);
722 if (db_options_.use_fsync) {
723 return file->Fsync(IOOptions(), nullptr);
724 } else {
725 return file->Sync(IOOptions(), nullptr);
726 }
727}
728
729} // namespace ROCKSDB_NAMESPACE
7c673cae
FG
730
731#endif // !ROCKSDB_LITE