]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | |
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "db/external_sst_file_ingestion_job.h" | |
9 | ||
7c673cae | 10 | #include <algorithm> |
f67539c2 | 11 | #include <cinttypes> |
7c673cae | 12 | #include <string> |
f67539c2 | 13 | #include <unordered_set> |
7c673cae FG |
14 | #include <vector> |
15 | ||
f67539c2 | 16 | #include "db/db_impl/db_impl.h" |
7c673cae | 17 | #include "db/version_edit.h" |
f67539c2 TL |
18 | #include "file/file_util.h" |
19 | #include "file/random_access_file_reader.h" | |
1e59de90 | 20 | #include "logging/logging.h" |
7c673cae FG |
21 | #include "table/merging_iterator.h" |
22 | #include "table/scoped_arena_iterator.h" | |
23 | #include "table/sst_file_writer_collectors.h" | |
24 | #include "table/table_builder.h" | |
1e59de90 | 25 | #include "table/unique_id_impl.h" |
f67539c2 | 26 | #include "test_util/sync_point.h" |
7c673cae | 27 | #include "util/stop_watch.h" |
7c673cae | 28 | |
f67539c2 | 29 | namespace ROCKSDB_NAMESPACE { |
7c673cae FG |
30 | |
31 | Status ExternalSstFileIngestionJob::Prepare( | |
11fdf7f2 | 32 | const std::vector<std::string>& external_files_paths, |
20effc67 TL |
33 | const std::vector<std::string>& files_checksums, |
34 | const std::vector<std::string>& files_checksum_func_names, | |
1e59de90 TL |
35 | const Temperature& file_temperature, uint64_t next_file_number, |
36 | SuperVersion* sv) { | |
7c673cae FG |
37 | Status status; |
38 | ||
39 | // Read the information of files we are ingesting | |
40 | for (const std::string& file_path : external_files_paths) { | |
41 | IngestedFileInfo file_to_ingest; | |
1e59de90 TL |
42 | status = |
43 | GetIngestedFileInfo(file_path, next_file_number++, &file_to_ingest, sv); | |
7c673cae FG |
44 | if (!status.ok()) { |
45 | return status; | |
46 | } | |
7c673cae | 47 | |
1e59de90 | 48 | if (file_to_ingest.cf_id != |
7c673cae | 49 | TablePropertiesCollectorFactory::Context::kUnknownColumnFamily && |
1e59de90 | 50 | file_to_ingest.cf_id != cfd_->GetID()) { |
7c673cae | 51 | return Status::InvalidArgument( |
20effc67 | 52 | "External file column family id don't match"); |
7c673cae | 53 | } |
1e59de90 TL |
54 | |
55 | if (file_to_ingest.num_entries == 0 && | |
56 | file_to_ingest.num_range_deletions == 0) { | |
57 | return Status::InvalidArgument("File contain no entries"); | |
58 | } | |
59 | ||
60 | if (!file_to_ingest.smallest_internal_key.Valid() || | |
61 | !file_to_ingest.largest_internal_key.Valid()) { | |
62 | return Status::Corruption("Generated table have corrupted keys"); | |
63 | } | |
64 | ||
65 | files_to_ingest_.emplace_back(std::move(file_to_ingest)); | |
7c673cae FG |
66 | } |
67 | ||
68 | const Comparator* ucmp = cfd_->internal_comparator().user_comparator(); | |
69 | auto num_files = files_to_ingest_.size(); | |
70 | if (num_files == 0) { | |
71 | return Status::InvalidArgument("The list of files is empty"); | |
72 | } else if (num_files > 1) { | |
20effc67 | 73 | // Verify that passed files don't have overlapping ranges |
7c673cae FG |
74 | autovector<const IngestedFileInfo*> sorted_files; |
75 | for (size_t i = 0; i < num_files; i++) { | |
76 | sorted_files.push_back(&files_to_ingest_[i]); | |
77 | } | |
78 | ||
79 | std::sort( | |
80 | sorted_files.begin(), sorted_files.end(), | |
81 | [&ucmp](const IngestedFileInfo* info1, const IngestedFileInfo* info2) { | |
f67539c2 TL |
82 | return sstableKeyCompare(ucmp, info1->smallest_internal_key, |
83 | info2->smallest_internal_key) < 0; | |
7c673cae FG |
84 | }); |
85 | ||
20effc67 | 86 | for (size_t i = 0; i + 1 < num_files; i++) { |
f67539c2 TL |
87 | if (sstableKeyCompare(ucmp, sorted_files[i]->largest_internal_key, |
88 | sorted_files[i + 1]->smallest_internal_key) >= 0) { | |
89 | files_overlap_ = true; | |
90 | break; | |
7c673cae FG |
91 | } |
92 | } | |
93 | } | |
94 | ||
1e59de90 TL |
95 | // Hanlde the file temperature |
96 | for (size_t i = 0; i < num_files; i++) { | |
97 | files_to_ingest_[i].file_temperature = file_temperature; | |
f67539c2 TL |
98 | } |
99 | ||
1e59de90 TL |
100 | if (ingestion_options_.ingest_behind && files_overlap_) { |
101 | return Status::NotSupported("Files have overlapping ranges"); | |
7c673cae FG |
102 | } |
103 | ||
104 | // Copy/Move external files into DB | |
f67539c2 | 105 | std::unordered_set<size_t> ingestion_path_ids; |
7c673cae | 106 | for (IngestedFileInfo& f : files_to_ingest_) { |
f67539c2 | 107 | f.copy_file = false; |
7c673cae | 108 | const std::string path_outside_db = f.external_file_path; |
1e59de90 TL |
109 | const std::string path_inside_db = TableFileName( |
110 | cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId()); | |
7c673cae | 111 | if (ingestion_options_.move_files) { |
f67539c2 TL |
112 | status = |
113 | fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr); | |
114 | if (status.ok()) { | |
115 | // It is unsafe to assume application had sync the file and file | |
116 | // directory before ingest the file. For integrity of RocksDB we need | |
117 | // to sync the file. | |
118 | std::unique_ptr<FSWritableFile> file_to_sync; | |
1e59de90 TL |
119 | Status s = fs_->ReopenWritableFile(path_inside_db, env_options_, |
120 | &file_to_sync, nullptr); | |
121 | TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:Reopen", | |
122 | &s); | |
123 | // Some file systems (especially remote/distributed) don't support | |
124 | // reopening a file for writing and don't require reopening and | |
125 | // syncing the file. Ignore the NotSupported error in that case. | |
126 | if (!s.IsNotSupported()) { | |
127 | status = s; | |
128 | if (status.ok()) { | |
129 | TEST_SYNC_POINT( | |
130 | "ExternalSstFileIngestionJob::BeforeSyncIngestedFile"); | |
131 | status = SyncIngestedFile(file_to_sync.get()); | |
132 | TEST_SYNC_POINT( | |
133 | "ExternalSstFileIngestionJob::AfterSyncIngestedFile"); | |
134 | if (!status.ok()) { | |
135 | ROCKS_LOG_WARN(db_options_.info_log, | |
136 | "Failed to sync ingested file %s: %s", | |
137 | path_inside_db.c_str(), status.ToString().c_str()); | |
138 | } | |
f67539c2 TL |
139 | } |
140 | } | |
141 | } else if (status.IsNotSupported() && | |
142 | ingestion_options_.failed_move_fall_back_to_copy) { | |
143 | // Original file is on a different FS, use copy instead of hard linking. | |
11fdf7f2 | 144 | f.copy_file = true; |
1e59de90 TL |
145 | ROCKS_LOG_INFO(db_options_.info_log, |
146 | "Triy to link file %s but it's not supported : %s", | |
147 | path_outside_db.c_str(), status.ToString().c_str()); | |
7c673cae FG |
148 | } |
149 | } else { | |
11fdf7f2 | 150 | f.copy_file = true; |
7c673cae | 151 | } |
f67539c2 TL |
152 | |
153 | if (f.copy_file) { | |
154 | TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile", | |
155 | nullptr); | |
156 | // CopyFile also sync the new file. | |
1e59de90 TL |
157 | status = |
158 | CopyFile(fs_.get(), path_outside_db, path_inside_db, 0, | |
159 | db_options_.use_fsync, io_tracer_, Temperature::kUnknown); | |
f67539c2 | 160 | } |
11fdf7f2 | 161 | TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded"); |
7c673cae FG |
162 | if (!status.ok()) { |
163 | break; | |
164 | } | |
165 | f.internal_file_path = path_inside_db; | |
20effc67 TL |
166 | // Initialize the checksum information of ingested files. |
167 | f.file_checksum = kUnknownFileChecksum; | |
168 | f.file_checksum_func_name = kUnknownFileChecksumFuncName; | |
f67539c2 TL |
169 | ingestion_path_ids.insert(f.fd.GetPathId()); |
170 | } | |
171 | ||
172 | TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir"); | |
173 | if (status.ok()) { | |
174 | for (auto path_id : ingestion_path_ids) { | |
1e59de90 TL |
175 | status = directories_->GetDataDir(path_id)->FsyncWithDirOptions( |
176 | IOOptions(), nullptr, | |
177 | DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); | |
f67539c2 TL |
178 | if (!status.ok()) { |
179 | ROCKS_LOG_WARN(db_options_.info_log, | |
180 | "Failed to sync directory %" ROCKSDB_PRIszt | |
181 | " while ingest file: %s", | |
182 | path_id, status.ToString().c_str()); | |
183 | break; | |
184 | } | |
185 | } | |
7c673cae | 186 | } |
f67539c2 | 187 | TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir"); |
7c673cae | 188 | |
20effc67 TL |
189 | // Generate and check the sst file checksum. Note that, if |
190 | // IngestExternalFileOptions::write_global_seqno is true, we will not update | |
191 | // the checksum information in the files_to_ingests_ here, since the file is | |
192 | // upadted with the new global_seqno. After global_seqno is updated, DB will | |
193 | // generate the new checksum and store it in the Manifest. In all other cases | |
194 | // if ingestion_options_.write_global_seqno == true and | |
195 | // verify_file_checksum is false, we only check the checksum function name. | |
196 | if (status.ok() && db_options_.file_checksum_gen_factory != nullptr) { | |
197 | if (ingestion_options_.verify_file_checksum == false && | |
198 | files_checksums.size() == files_to_ingest_.size() && | |
199 | files_checksum_func_names.size() == files_to_ingest_.size()) { | |
200 | // Only when verify_file_checksum == false and the checksum for ingested | |
201 | // files are provided, DB will use the provided checksum and does not | |
202 | // generate the checksum for ingested files. | |
203 | need_generate_file_checksum_ = false; | |
204 | } else { | |
205 | need_generate_file_checksum_ = true; | |
206 | } | |
207 | FileChecksumGenContext gen_context; | |
208 | std::unique_ptr<FileChecksumGenerator> file_checksum_gen = | |
209 | db_options_.file_checksum_gen_factory->CreateFileChecksumGenerator( | |
210 | gen_context); | |
211 | std::vector<std::string> generated_checksums; | |
212 | std::vector<std::string> generated_checksum_func_names; | |
213 | // Step 1: generate the checksum for ingested sst file. | |
214 | if (need_generate_file_checksum_) { | |
215 | for (size_t i = 0; i < files_to_ingest_.size(); i++) { | |
216 | std::string generated_checksum; | |
217 | std::string generated_checksum_func_name; | |
218 | std::string requested_checksum_func_name; | |
1e59de90 TL |
219 | // TODO: rate limit file reads for checksum calculation during file |
220 | // ingestion. | |
20effc67 TL |
221 | IOStatus io_s = GenerateOneFileChecksum( |
222 | fs_.get(), files_to_ingest_[i].internal_file_path, | |
223 | db_options_.file_checksum_gen_factory.get(), | |
224 | requested_checksum_func_name, &generated_checksum, | |
225 | &generated_checksum_func_name, | |
226 | ingestion_options_.verify_checksums_readahead_size, | |
1e59de90 TL |
227 | db_options_.allow_mmap_reads, io_tracer_, |
228 | db_options_.rate_limiter.get(), | |
229 | Env::IO_TOTAL /* rate_limiter_priority */); | |
20effc67 TL |
230 | if (!io_s.ok()) { |
231 | status = io_s; | |
232 | ROCKS_LOG_WARN(db_options_.info_log, | |
233 | "Sst file checksum generation of file: %s failed: %s", | |
234 | files_to_ingest_[i].internal_file_path.c_str(), | |
235 | status.ToString().c_str()); | |
236 | break; | |
237 | } | |
238 | if (ingestion_options_.write_global_seqno == false) { | |
239 | files_to_ingest_[i].file_checksum = generated_checksum; | |
240 | files_to_ingest_[i].file_checksum_func_name = | |
241 | generated_checksum_func_name; | |
242 | } | |
243 | generated_checksums.push_back(generated_checksum); | |
244 | generated_checksum_func_names.push_back(generated_checksum_func_name); | |
245 | } | |
246 | } | |
247 | ||
248 | // Step 2: based on the verify_file_checksum and ingested checksum | |
249 | // information, do the verification. | |
250 | if (status.ok()) { | |
251 | if (files_checksums.size() == files_to_ingest_.size() && | |
252 | files_checksum_func_names.size() == files_to_ingest_.size()) { | |
253 | // Verify the checksum and checksum function name. | |
254 | if (ingestion_options_.verify_file_checksum) { | |
255 | for (size_t i = 0; i < files_to_ingest_.size(); i++) { | |
256 | if (files_checksum_func_names[i] != | |
257 | generated_checksum_func_names[i]) { | |
258 | status = Status::InvalidArgument( | |
259 | "Checksum function name does not match with the checksum " | |
260 | "function name of this DB"); | |
261 | ROCKS_LOG_WARN( | |
262 | db_options_.info_log, | |
263 | "Sst file checksum verification of file: %s failed: %s", | |
264 | external_files_paths[i].c_str(), status.ToString().c_str()); | |
265 | break; | |
266 | } | |
267 | if (files_checksums[i] != generated_checksums[i]) { | |
268 | status = Status::Corruption( | |
269 | "Ingested checksum does not match with the generated " | |
270 | "checksum"); | |
271 | ROCKS_LOG_WARN( | |
272 | db_options_.info_log, | |
273 | "Sst file checksum verification of file: %s failed: %s", | |
274 | files_to_ingest_[i].internal_file_path.c_str(), | |
275 | status.ToString().c_str()); | |
276 | break; | |
277 | } | |
278 | } | |
279 | } else { | |
280 | // If verify_file_checksum is not enabled, we only verify the | |
281 | // checksum function name. If it does not match, fail the ingestion. | |
282 | // If matches, we trust the ingested checksum information and store | |
283 | // in the Manifest. | |
284 | for (size_t i = 0; i < files_to_ingest_.size(); i++) { | |
285 | if (files_checksum_func_names[i] != file_checksum_gen->Name()) { | |
286 | status = Status::InvalidArgument( | |
287 | "Checksum function name does not match with the checksum " | |
288 | "function name of this DB"); | |
289 | ROCKS_LOG_WARN( | |
290 | db_options_.info_log, | |
291 | "Sst file checksum verification of file: %s failed: %s", | |
292 | external_files_paths[i].c_str(), status.ToString().c_str()); | |
293 | break; | |
294 | } | |
295 | files_to_ingest_[i].file_checksum = files_checksums[i]; | |
296 | files_to_ingest_[i].file_checksum_func_name = | |
297 | files_checksum_func_names[i]; | |
298 | } | |
299 | } | |
300 | } else if (files_checksums.size() != files_checksum_func_names.size() || | |
301 | (files_checksums.size() == files_checksum_func_names.size() && | |
302 | files_checksums.size() != 0)) { | |
303 | // The checksum or checksum function name vector are not both empty | |
304 | // and they are incomplete. | |
305 | status = Status::InvalidArgument( | |
306 | "The checksum information of ingested sst files are nonempty and " | |
307 | "the size of checksums or the size of the checksum function " | |
308 | "names " | |
309 | "does not match with the number of ingested sst files"); | |
310 | ROCKS_LOG_WARN( | |
311 | db_options_.info_log, | |
312 | "The ingested sst files checksum information is incomplete: %s", | |
313 | status.ToString().c_str()); | |
314 | } | |
315 | } | |
316 | } | |
317 | ||
f67539c2 | 318 | // TODO: The following is duplicated with Cleanup(). |
7c673cae | 319 | if (!status.ok()) { |
1e59de90 | 320 | IOOptions io_opts; |
7c673cae FG |
321 | // We failed, remove all files that we copied into the db |
322 | for (IngestedFileInfo& f : files_to_ingest_) { | |
11fdf7f2 | 323 | if (f.internal_file_path.empty()) { |
f67539c2 | 324 | continue; |
7c673cae | 325 | } |
1e59de90 | 326 | Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr); |
7c673cae FG |
327 | if (!s.ok()) { |
328 | ROCKS_LOG_WARN(db_options_.info_log, | |
329 | "AddFile() clean up for file %s failed : %s", | |
330 | f.internal_file_path.c_str(), s.ToString().c_str()); | |
331 | } | |
332 | } | |
333 | } | |
334 | ||
335 | return status; | |
336 | } | |
337 | ||
// Determines whether any memtable overlaps the key ranges of the files being
// ingested. Sets *flush_needed accordingly; returns InvalidArgument when a
// flush would be required but allow_blocking_flush is disabled.
Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
                                               SuperVersion* super_version) {
  autovector<Range> ranges;
  // Backing storage for the timestamp-augmented user keys; the Ranges below
  // hold views into these strings, so `keys` must stop growing before any
  // Range is created from it.
  autovector<std::string> keys;
  size_t ts_sz = cfd_->user_comparator()->timestamp_size();
  if (ts_sz) {
    // Check all ranges [begin, end] inclusively. Add maximum
    // timestamp to include all `begin` keys, and add minimal timestamp to
    // include all `end` keys.
    for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
      std::string begin_str;
      std::string end_str;
      AppendUserKeyWithMaxTimestamp(
          &begin_str, file_to_ingest.smallest_internal_key.user_key(), ts_sz);
      AppendKeyWithMinTimestamp(
          &end_str, file_to_ingest.largest_internal_key.user_key(), ts_sz);
      keys.emplace_back(std::move(begin_str));
      keys.emplace_back(std::move(end_str));
    }
    // Second pass: `keys` is fully populated, so it is now safe to build
    // Ranges referencing its elements (pairs: [2*i] = begin, [2*i+1] = end).
    for (size_t i = 0; i < files_to_ingest_.size(); ++i) {
      ranges.emplace_back(keys[2 * i], keys[2 * i + 1]);
    }
  } else {
    // No user timestamps: the file key bounds can be used directly.
    for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
      ranges.emplace_back(file_to_ingest.smallest_internal_key.user_key(),
                          file_to_ingest.largest_internal_key.user_key());
    }
  }
  Status status = cfd_->RangesOverlapWithMemtables(
      ranges, super_version, db_options_.allow_data_in_errors, flush_needed);
  // Overlap requires a flush; fail instead if the caller disallowed blocking.
  if (status.ok() && *flush_needed &&
      !ingestion_options_.allow_blocking_flush) {
    status = Status::InvalidArgument("External file requires flush");
  }
  return status;
}
374 | ||
// REQUIRES: we have become the only writer by entering both write_thread_ and
// nonmem_write_thread_
//
// Assigns each prepared file a target level and global sequence number,
// rewrites the file's key bounds with that seqno, generates per-file
// checksums, and records the file in the VersionEdit (edit_).
Status ExternalSstFileIngestionJob::Run() {
  Status status;
  SuperVersion* super_version = cfd_->GetSuperVersion();
#ifndef NDEBUG
  // We should never run the job with a memtable that is overlapping
  // with the files we are ingesting
  bool need_flush = false;
  status = NeedsFlush(&need_flush, super_version);
  if (!status.ok()) {
    return status;
  }
  if (need_flush) {
    return Status::TryAgain();
  }
  assert(status.ok() && need_flush == false);
#endif

  bool force_global_seqno = false;

  if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
    // We need to assign a global sequence number to all the files even
    // if the don't overlap with any ranges since we have snapshots
    force_global_seqno = true;
  }
  // It is safe to use this instead of LastAllocatedSequence since we are
  // the only active writer, and hence they are equal
  SequenceNumber last_seqno = versions_->LastSequence();
  edit_.SetColumnFamily(cfd_->GetID());
  // The levels that the files will be ingested into

  for (IngestedFileInfo& f : files_to_ingest_) {
    SequenceNumber assigned_seqno = 0;
    if (ingestion_options_.ingest_behind) {
      // ingest_behind places the file at the bottommost level with seqno 0.
      status = CheckLevelForIngestedBehindFile(&f);
    } else {
      status = AssignLevelAndSeqnoForIngestedFile(
          super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
          last_seqno, &f, &assigned_seqno);
    }

    // Modify the smallest/largest internal key to include the sequence number
    // that we just learned. Only overwrite sequence number zero. There could
    // be a nonzero sequence number already to indicate a range tombstone's
    // exclusive endpoint.
    ParsedInternalKey smallest_parsed, largest_parsed;
    if (status.ok()) {
      status = ParseInternalKey(*f.smallest_internal_key.rep(),
                                &smallest_parsed, false /* log_err_key */);
    }
    if (status.ok()) {
      status = ParseInternalKey(*f.largest_internal_key.rep(), &largest_parsed,
                                false /* log_err_key */);
    }
    if (!status.ok()) {
      return status;
    }
    if (smallest_parsed.sequence == 0) {
      UpdateInternalKey(f.smallest_internal_key.rep(), assigned_seqno,
                        smallest_parsed.type);
    }
    if (largest_parsed.sequence == 0) {
      UpdateInternalKey(f.largest_internal_key.rep(), assigned_seqno,
                        largest_parsed.type);
    }

    // Write the global seqno into the file itself (or record it for the
    // table reader, depending on options).
    status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
    TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
                             &assigned_seqno);
    if (assigned_seqno > last_seqno) {
      // A fresh sequence number was consumed; track it so the caller can
      // bump the DB's last sequence accordingly.
      assert(assigned_seqno == last_seqno + 1);
      last_seqno = assigned_seqno;
      ++consumed_seqno_count_;
    }
    if (!status.ok()) {
      return status;
    }

    status = GenerateChecksumForIngestedFile(&f);
    if (!status.ok()) {
      return status;
    }

    // We use the import time as the ancestor time. This is the time the data
    // is written to the database.
    int64_t temp_current_time = 0;
    uint64_t current_time = kUnknownFileCreationTime;
    uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
    if (clock_->GetCurrentTime(&temp_current_time).ok()) {
      current_time = oldest_ancester_time =
          static_cast<uint64_t>(temp_current_time);
    }
    FileMetaData f_metadata(
        f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(),
        f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno,
        f.assigned_seqno, false, f.file_temperature, kInvalidBlobFileNumber,
        oldest_ancester_time, current_time, f.file_checksum,
        f.file_checksum_func_name, f.unique_id);
    f_metadata.temperature = f.file_temperature;
    edit_.AddFile(f.picked_level, f_metadata);
  }
  return status;
}
479 | ||
480 | void ExternalSstFileIngestionJob::UpdateStats() { | |
481 | // Update internal stats for new ingested files | |
482 | uint64_t total_keys = 0; | |
483 | uint64_t total_l0_files = 0; | |
1e59de90 | 484 | uint64_t total_time = clock_->NowMicros() - job_start_time_; |
f67539c2 TL |
485 | |
486 | EventLoggerStream stream = event_logger_->Log(); | |
487 | stream << "event" | |
488 | << "ingest_finished"; | |
489 | stream << "files_ingested"; | |
490 | stream.StartArray(); | |
491 | ||
7c673cae | 492 | for (IngestedFileInfo& f : files_to_ingest_) { |
1e59de90 TL |
493 | InternalStats::CompactionStats stats( |
494 | CompactionReason::kExternalSstIngestion, 1); | |
7c673cae | 495 | stats.micros = total_time; |
494da23a | 496 | // If actual copy occurred for this file, then we need to count the file |
11fdf7f2 TL |
497 | // size as the actual bytes written. If the file was linked, then we ignore |
498 | // the bytes written for file metadata. | |
499 | // TODO (yanqin) maybe account for file metadata bytes for exact accuracy? | |
500 | if (f.copy_file) { | |
501 | stats.bytes_written = f.fd.GetFileSize(); | |
502 | } else { | |
503 | stats.bytes_moved = f.fd.GetFileSize(); | |
504 | } | |
7c673cae | 505 | stats.num_output_files = 1; |
494da23a TL |
506 | cfd_->internal_stats()->AddCompactionStats(f.picked_level, |
507 | Env::Priority::USER, stats); | |
7c673cae FG |
508 | cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE, |
509 | f.fd.GetFileSize()); | |
510 | total_keys += f.num_entries; | |
511 | if (f.picked_level == 0) { | |
512 | total_l0_files += 1; | |
513 | } | |
514 | ROCKS_LOG_INFO( | |
515 | db_options_.info_log, | |
516 | "[AddFile] External SST file %s was ingested in L%d with path %s " | |
517 | "(global_seqno=%" PRIu64 ")\n", | |
518 | f.external_file_path.c_str(), f.picked_level, | |
519 | f.internal_file_path.c_str(), f.assigned_seqno); | |
f67539c2 TL |
520 | stream << "file" << f.internal_file_path << "level" << f.picked_level; |
521 | } | |
522 | stream.EndArray(); | |
523 | ||
524 | stream << "lsm_state"; | |
525 | stream.StartArray(); | |
526 | auto vstorage = cfd_->current()->storage_info(); | |
527 | for (int level = 0; level < vstorage->num_levels(); ++level) { | |
528 | stream << vstorage->NumLevelFiles(level); | |
7c673cae | 529 | } |
f67539c2 TL |
530 | stream.EndArray(); |
531 | ||
7c673cae FG |
532 | cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL, |
533 | total_keys); | |
534 | cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL, | |
535 | files_to_ingest_.size()); | |
536 | cfd_->internal_stats()->AddCFStats( | |
537 | InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files); | |
538 | } | |
539 | ||
540 | void ExternalSstFileIngestionJob::Cleanup(const Status& status) { | |
1e59de90 | 541 | IOOptions io_opts; |
7c673cae FG |
542 | if (!status.ok()) { |
543 | // We failed to add the files to the database | |
544 | // remove all the files we copied | |
545 | for (IngestedFileInfo& f : files_to_ingest_) { | |
f67539c2 TL |
546 | if (f.internal_file_path.empty()) { |
547 | continue; | |
548 | } | |
1e59de90 | 549 | Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr); |
7c673cae FG |
550 | if (!s.ok()) { |
551 | ROCKS_LOG_WARN(db_options_.info_log, | |
552 | "AddFile() clean up for file %s failed : %s", | |
553 | f.internal_file_path.c_str(), s.ToString().c_str()); | |
554 | } | |
555 | } | |
f67539c2 TL |
556 | consumed_seqno_count_ = 0; |
557 | files_overlap_ = false; | |
7c673cae FG |
558 | } else if (status.ok() && ingestion_options_.move_files) { |
559 | // The files were moved and added successfully, remove original file links | |
560 | for (IngestedFileInfo& f : files_to_ingest_) { | |
1e59de90 | 561 | Status s = fs_->DeleteFile(f.external_file_path, io_opts, nullptr); |
7c673cae FG |
562 | if (!s.ok()) { |
563 | ROCKS_LOG_WARN( | |
564 | db_options_.info_log, | |
565 | "%s was added to DB successfully but failed to remove original " | |
566 | "file link : %s", | |
567 | f.external_file_path.c_str(), s.ToString().c_str()); | |
568 | } | |
569 | } | |
570 | } | |
571 | } | |
572 | ||
573 | Status ExternalSstFileIngestionJob::GetIngestedFileInfo( | |
1e59de90 TL |
574 | const std::string& external_file, uint64_t new_file_number, |
575 | IngestedFileInfo* file_to_ingest, SuperVersion* sv) { | |
7c673cae FG |
576 | file_to_ingest->external_file_path = external_file; |
577 | ||
578 | // Get external file size | |
f67539c2 TL |
579 | Status status = fs_->GetFileSize(external_file, IOOptions(), |
580 | &file_to_ingest->file_size, nullptr); | |
7c673cae FG |
581 | if (!status.ok()) { |
582 | return status; | |
583 | } | |
584 | ||
1e59de90 TL |
585 | // Assign FD with number |
586 | file_to_ingest->fd = | |
587 | FileDescriptor(new_file_number, 0, file_to_ingest->file_size); | |
588 | ||
7c673cae FG |
589 | // Create TableReader for external file |
590 | std::unique_ptr<TableReader> table_reader; | |
f67539c2 | 591 | std::unique_ptr<FSRandomAccessFile> sst_file; |
7c673cae FG |
592 | std::unique_ptr<RandomAccessFileReader> sst_file_reader; |
593 | ||
1e59de90 TL |
594 | status = |
595 | fs_->NewRandomAccessFile(external_file, env_options_, &sst_file, nullptr); | |
7c673cae FG |
596 | if (!status.ok()) { |
597 | return status; | |
598 | } | |
20effc67 TL |
599 | sst_file_reader.reset(new RandomAccessFileReader( |
600 | std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_)); | |
7c673cae FG |
601 | |
602 | status = cfd_->ioptions()->table_factory->NewTableReader( | |
1e59de90 TL |
603 | TableReaderOptions( |
604 | *cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor, | |
605 | env_options_, cfd_->internal_comparator(), | |
606 | /*skip_filters*/ false, /*immortal*/ false, | |
607 | /*force_direct_prefetch*/ false, /*level*/ -1, | |
608 | /*block_cache_tracer*/ nullptr, | |
609 | /*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(), | |
610 | /*cur_file_num*/ new_file_number), | |
7c673cae FG |
611 | std::move(sst_file_reader), file_to_ingest->file_size, &table_reader); |
612 | if (!status.ok()) { | |
613 | return status; | |
614 | } | |
615 | ||
494da23a | 616 | if (ingestion_options_.verify_checksums_before_ingest) { |
f67539c2 TL |
617 | // If customized readahead size is needed, we can pass a user option |
618 | // all the way to here. Right now we just rely on the default readahead | |
619 | // to keep things simple. | |
620 | ReadOptions ro; | |
621 | ro.readahead_size = ingestion_options_.verify_checksums_readahead_size; | |
622 | status = table_reader->VerifyChecksum( | |
623 | ro, TableReaderCaller::kExternalSSTIngestion); | |
494da23a TL |
624 | } |
625 | if (!status.ok()) { | |
626 | return status; | |
627 | } | |
628 | ||
7c673cae FG |
629 | // Get the external file properties |
630 | auto props = table_reader->GetTableProperties(); | |
631 | const auto& uprops = props->user_collected_properties; | |
632 | ||
633 | // Get table version | |
634 | auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion); | |
635 | if (version_iter == uprops.end()) { | |
636 | return Status::Corruption("External file version not found"); | |
637 | } | |
638 | file_to_ingest->version = DecodeFixed32(version_iter->second.c_str()); | |
639 | ||
640 | auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno); | |
641 | if (file_to_ingest->version == 2) { | |
642 | // version 2 imply that we have global sequence number | |
643 | if (seqno_iter == uprops.end()) { | |
644 | return Status::Corruption( | |
645 | "External file global sequence number not found"); | |
646 | } | |
647 | ||
648 | // Set the global sequence number | |
649 | file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str()); | |
1e59de90 | 650 | if (props->external_sst_file_global_seqno_offset == 0) { |
11fdf7f2 | 651 | file_to_ingest->global_seqno_offset = 0; |
7c673cae FG |
652 | return Status::Corruption("Was not able to find file global seqno field"); |
653 | } | |
1e59de90 TL |
654 | file_to_ingest->global_seqno_offset = |
655 | static_cast<size_t>(props->external_sst_file_global_seqno_offset); | |
7c673cae FG |
656 | } else if (file_to_ingest->version == 1) { |
657 | // SST file V1 should not have global seqno field | |
658 | assert(seqno_iter == uprops.end()); | |
659 | file_to_ingest->original_seqno = 0; | |
660 | if (ingestion_options_.allow_blocking_flush || | |
1e59de90 | 661 | ingestion_options_.allow_global_seqno) { |
7c673cae | 662 | return Status::InvalidArgument( |
1e59de90 | 663 | "External SST file V1 does not support global seqno"); |
7c673cae FG |
664 | } |
665 | } else { | |
666 | return Status::InvalidArgument("External file version is not supported"); | |
667 | } | |
668 | // Get number of entries in table | |
669 | file_to_ingest->num_entries = props->num_entries; | |
11fdf7f2 | 670 | file_to_ingest->num_range_deletions = props->num_range_deletions; |
7c673cae FG |
671 | |
672 | ParsedInternalKey key; | |
673 | ReadOptions ro; | |
674 | // During reading the external file we can cache blocks that we read into | |
675 | // the block cache, if we later change the global seqno of this file, we will | |
676 | // have block in cache that will include keys with wrong seqno. | |
677 | // We need to disable fill_cache so that we read from the file without | |
678 | // updating the block cache. | |
679 | ro.fill_cache = false; | |
11fdf7f2 | 680 | std::unique_ptr<InternalIterator> iter(table_reader->NewIterator( |
f67539c2 TL |
681 | ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr, |
682 | /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion)); | |
11fdf7f2 TL |
683 | std::unique_ptr<InternalIterator> range_del_iter( |
684 | table_reader->NewRangeTombstoneIterator(ro)); | |
7c673cae | 685 | |
11fdf7f2 | 686 | // Get first (smallest) and last (largest) key from file. |
f67539c2 TL |
687 | file_to_ingest->smallest_internal_key = |
688 | InternalKey("", 0, ValueType::kTypeValue); | |
689 | file_to_ingest->largest_internal_key = | |
690 | InternalKey("", 0, ValueType::kTypeValue); | |
11fdf7f2 | 691 | bool bounds_set = false; |
20effc67 | 692 | bool allow_data_in_errors = db_options_.allow_data_in_errors; |
7c673cae | 693 | iter->SeekToFirst(); |
11fdf7f2 | 694 | if (iter->Valid()) { |
20effc67 TL |
695 | Status pik_status = |
696 | ParseInternalKey(iter->key(), &key, allow_data_in_errors); | |
697 | if (!pik_status.ok()) { | |
698 | return Status::Corruption("Corrupted key in external file. ", | |
699 | pik_status.getState()); | |
11fdf7f2 TL |
700 | } |
701 | if (key.sequence != 0) { | |
20effc67 | 702 | return Status::Corruption("External file has non zero sequence number"); |
11fdf7f2 | 703 | } |
f67539c2 | 704 | file_to_ingest->smallest_internal_key.SetFrom(key); |
11fdf7f2 TL |
705 | |
706 | iter->SeekToLast(); | |
20effc67 TL |
707 | pik_status = ParseInternalKey(iter->key(), &key, allow_data_in_errors); |
708 | if (!pik_status.ok()) { | |
709 | return Status::Corruption("Corrupted key in external file. ", | |
710 | pik_status.getState()); | |
11fdf7f2 TL |
711 | } |
712 | if (key.sequence != 0) { | |
20effc67 | 713 | return Status::Corruption("External file has non zero sequence number"); |
11fdf7f2 | 714 | } |
f67539c2 | 715 | file_to_ingest->largest_internal_key.SetFrom(key); |
7c673cae | 716 | |
11fdf7f2 | 717 | bounds_set = true; |
7c673cae | 718 | } |
11fdf7f2 TL |
719 | |
720 | // We may need to adjust these key bounds, depending on whether any range | |
721 | // deletion tombstones extend past them. | |
722 | const Comparator* ucmp = cfd_->internal_comparator().user_comparator(); | |
723 | if (range_del_iter != nullptr) { | |
724 | for (range_del_iter->SeekToFirst(); range_del_iter->Valid(); | |
725 | range_del_iter->Next()) { | |
20effc67 TL |
726 | Status pik_status = |
727 | ParseInternalKey(range_del_iter->key(), &key, allow_data_in_errors); | |
728 | if (!pik_status.ok()) { | |
729 | return Status::Corruption("Corrupted key in external file. ", | |
730 | pik_status.getState()); | |
11fdf7f2 TL |
731 | } |
732 | RangeTombstone tombstone(key, range_del_iter->value()); | |
733 | ||
f67539c2 TL |
734 | InternalKey start_key = tombstone.SerializeKey(); |
735 | if (!bounds_set || | |
736 | sstableKeyCompare(ucmp, start_key, | |
737 | file_to_ingest->smallest_internal_key) < 0) { | |
738 | file_to_ingest->smallest_internal_key = start_key; | |
11fdf7f2 | 739 | } |
f67539c2 TL |
740 | InternalKey end_key = tombstone.SerializeEndKey(); |
741 | if (!bounds_set || | |
742 | sstableKeyCompare(ucmp, end_key, | |
743 | file_to_ingest->largest_internal_key) > 0) { | |
744 | file_to_ingest->largest_internal_key = end_key; | |
11fdf7f2 TL |
745 | } |
746 | bounds_set = true; | |
747 | } | |
7c673cae | 748 | } |
7c673cae FG |
749 | |
750 | file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id); | |
751 | ||
752 | file_to_ingest->table_properties = *props; | |
753 | ||
1e59de90 TL |
754 | auto s = GetSstInternalUniqueId(props->db_id, props->db_session_id, |
755 | props->orig_file_number, | |
756 | &(file_to_ingest->unique_id)); | |
757 | if (!s.ok()) { | |
758 | ROCKS_LOG_WARN(db_options_.info_log, | |
759 | "Failed to get SST unique id for file %s", | |
760 | file_to_ingest->internal_file_path.c_str()); | |
761 | file_to_ingest->unique_id = kNullUniqueId64x2; | |
762 | } | |
763 | ||
7c673cae FG |
764 | return status; |
765 | } | |
766 | ||
7c673cae FG |
// Picks the target level and the global sequence number for an ingested file.
//
// When `force_global_seqno` is set the file is always assigned
// `last_seqno + 1`; under universal compaction, or when the ingested files
// overlap each other (`files_overlap_`), it is then pinned to L0 (or the
// ingestion fails with TryAgain if fail_if_not_bottommost_level is set).
// Otherwise levels are scanned from top to bottom: the file is placed into
// the lowest level whose existing files it does not overlap and where it
// fits, and only receives a non-zero seqno when it must shadow existing keys.
Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
    SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
    SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest,
    SequenceNumber* assigned_seqno) {
  Status status;
  *assigned_seqno = 0;
  if (force_global_seqno) {
    *assigned_seqno = last_seqno + 1;
    if (compaction_style == kCompactionStyleUniversal || files_overlap_) {
      if (ingestion_options_.fail_if_not_bottommost_level) {
        status = Status::TryAgain(
            "Files cannot be ingested to Lmax. Please make sure key range of "
            "Lmax does not overlap with files to ingest.");
        return status;
      }
      // Forced seqno + universal compaction (or mutually overlapping files)
      // means the file must go to L0; no level scan needed.
      file_to_ingest->picked_level = 0;
      return status;
    }
  }

  bool overlap_with_db = false;
  Arena arena;
  ReadOptions ro;
  // Total-order seek so the overlap check sees every key regardless of any
  // prefix extractor configured on the column family.
  ro.total_order_seek = true;
  int target_level = 0;
  auto* vstorage = cfd_->current()->storage_info();

  for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
    // Levels between L0 and the base level are empty by construction in
    // level compaction; skip them.
    if (lvl > 0 && lvl < vstorage->base_level()) {
      continue;
    }

    if (vstorage->NumLevelFiles(lvl) > 0) {
      bool overlap_with_level = false;
      status = sv->current->OverlapWithLevelIterator(
          ro, env_options_, file_to_ingest->smallest_internal_key.user_key(),
          file_to_ingest->largest_internal_key.user_key(), lvl,
          &overlap_with_level);
      if (!status.ok()) {
        return status;
      }
      if (overlap_with_level) {
        // We must use L0 or any level higher than `lvl` to be able to overwrite
        // the keys that we overlap with in this level, We also need to assign
        // this file a seqno to overwrite the existing keys in level `lvl`
        overlap_with_db = true;
        break;
      }

      if (compaction_style == kCompactionStyleUniversal && lvl != 0) {
        const std::vector<FileMetaData*>& level_files =
            vstorage->LevelFiles(lvl);
        const SequenceNumber level_largest_seqno =
            (*std::max_element(level_files.begin(), level_files.end(),
                               [](FileMetaData* f1, FileMetaData* f2) {
                                 return f1->fd.largest_seqno <
                                        f2->fd.largest_seqno;
                               }))
                ->fd.largest_seqno;
        // should only assign seqno to current level's largest seqno when
        // the file fits
        if (level_largest_seqno != 0 &&
            IngestedFileFitInLevel(file_to_ingest, lvl)) {
          *assigned_seqno = level_largest_seqno;
        } else {
          continue;
        }
      }
    } else if (compaction_style == kCompactionStyleUniversal) {
      // Universal compaction only considers non-empty levels (besides the
      // seqno bookkeeping above); keep scanning.
      continue;
    }

    // We don't overlap with any keys in this level, but we still need to check
    // if our file can fit in it
    if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
      target_level = lvl;
    }
  }
  // If files overlap, we have to ingest them at level 0 and assign the newest
  // sequence number
  if (files_overlap_) {
    target_level = 0;
    *assigned_seqno = last_seqno + 1;
  }

  if (ingestion_options_.fail_if_not_bottommost_level &&
      target_level < cfd_->NumberLevels() - 1) {
    status = Status::TryAgain(
        "Files cannot be ingested to Lmax. Please make sure key range of Lmax "
        "does not overlap with files to ingest.");
    return status;
  }

  TEST_SYNC_POINT_CALLBACK(
      "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
      &overlap_with_db);
  file_to_ingest->picked_level = target_level;
  // A file that overlaps existing data needs a fresh seqno so its keys win;
  // if the scan did not already pick one, use last_seqno + 1.
  if (overlap_with_db && *assigned_seqno == 0) {
    *assigned_seqno = last_seqno + 1;
  }
  return status;
}
869 | ||
11fdf7f2 TL |
870 | Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile( |
871 | IngestedFileInfo* file_to_ingest) { | |
872 | auto* vstorage = cfd_->current()->storage_info(); | |
873 | // first check if new files fit in the bottommost level | |
874 | int bottom_lvl = cfd_->NumberLevels() - 1; | |
1e59de90 | 875 | if (!IngestedFileFitInLevel(file_to_ingest, bottom_lvl)) { |
11fdf7f2 | 876 | return Status::InvalidArgument( |
1e59de90 TL |
877 | "Can't ingest_behind file as it doesn't fit " |
878 | "at the bottommost level!"); | |
11fdf7f2 TL |
879 | } |
880 | ||
881 | // second check if despite allow_ingest_behind=true we still have 0 seqnums | |
882 | // at some upper level | |
883 | for (int lvl = 0; lvl < cfd_->NumberLevels() - 1; lvl++) { | |
884 | for (auto file : vstorage->LevelFiles(lvl)) { | |
885 | if (file->fd.smallest_seqno == 0) { | |
886 | return Status::InvalidArgument( | |
1e59de90 TL |
887 | "Can't ingest_behind file as despite allow_ingest_behind=true " |
888 | "there are files with 0 seqno in database at upper levels!"); | |
11fdf7f2 TL |
889 | } |
890 | } | |
891 | } | |
892 | ||
893 | file_to_ingest->picked_level = bottom_lvl; | |
894 | return Status::OK(); | |
895 | } | |
896 | ||
7c673cae FG |
// Assigns the global sequence number `seqno` to an ingested file.
//
// Returns OK immediately when the file already carries the requested seqno.
// Fails with InvalidArgument when a (different) global seqno is needed but
// allow_global_seqno is disabled, or when the file has no global-seqno
// property field (global_seqno_offset == 0). When write_global_seqno is set,
// the seqno is additionally written in place into the file at
// global_seqno_offset and the file is synced; a file system that does not
// support random writes (NotSupported) falls back to in-memory assignment.
Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
    IngestedFileInfo* file_to_ingest, SequenceNumber seqno) {
  if (file_to_ingest->original_seqno == seqno) {
    // This file already have the correct global seqno
    return Status::OK();
  } else if (!ingestion_options_.allow_global_seqno) {
    return Status::InvalidArgument("Global seqno is required, but disabled");
  } else if (file_to_ingest->global_seqno_offset == 0) {
    return Status::InvalidArgument(
        "Trying to set global seqno for a file that don't have a global seqno "
        "field");
  }

  if (ingestion_options_.write_global_seqno) {
    // Determine if we can write global_seqno to a given offset of file.
    // If the file system does not support random write, then we should not.
    // Otherwise we should.
    std::unique_ptr<FSRandomRWFile> rwfile;
    Status status = fs_->NewRandomRWFile(file_to_ingest->internal_file_path,
                                         env_options_, &rwfile, nullptr);
    TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::NewRandomRWFile",
                             &status);
    if (status.ok()) {
      // Wrap the raw file so the write is visible to the IO tracer.
      FSRandomRWFilePtr fsptr(std::move(rwfile), io_tracer_,
                              file_to_ingest->internal_file_path);
      std::string seqno_val;
      PutFixed64(&seqno_val, seqno);
      // Overwrite the 8-byte global-seqno slot in place.
      status = fsptr->Write(file_to_ingest->global_seqno_offset, seqno_val,
                            IOOptions(), nullptr);
      if (status.ok()) {
        TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
        status = SyncIngestedFile(fsptr.get());
        TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
        if (!status.ok()) {
          ROCKS_LOG_WARN(db_options_.info_log,
                         "Failed to sync ingested file %s after writing global "
                         "sequence number: %s",
                         file_to_ingest->internal_file_path.c_str(),
                         status.ToString().c_str());
        }
      }
      // Propagate any write/sync failure.
      if (!status.ok()) {
        return status;
      }
    } else if (!status.IsNotSupported()) {
      // NotSupported means random writes are unavailable; fall through and
      // only record the seqno in memory. Any other error is fatal.
      return status;
    }
  }

  file_to_ingest->assigned_seqno = seqno;
  return Status::OK();
}
949 | ||
20effc67 TL |
950 | IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile( |
951 | IngestedFileInfo* file_to_ingest) { | |
952 | if (db_options_.file_checksum_gen_factory == nullptr || | |
953 | need_generate_file_checksum_ == false || | |
954 | ingestion_options_.write_global_seqno == false) { | |
955 | // If file_checksum_gen_factory is not set, we are not able to generate | |
956 | // the checksum. if write_global_seqno is false, it means we will use | |
957 | // file checksum generated during Prepare(). This step will be skipped. | |
958 | return IOStatus::OK(); | |
959 | } | |
960 | std::string file_checksum; | |
961 | std::string file_checksum_func_name; | |
962 | std::string requested_checksum_func_name; | |
1e59de90 | 963 | // TODO: rate limit file reads for checksum calculation during file ingestion. |
20effc67 TL |
964 | IOStatus io_s = GenerateOneFileChecksum( |
965 | fs_.get(), file_to_ingest->internal_file_path, | |
966 | db_options_.file_checksum_gen_factory.get(), requested_checksum_func_name, | |
967 | &file_checksum, &file_checksum_func_name, | |
968 | ingestion_options_.verify_checksums_readahead_size, | |
1e59de90 TL |
969 | db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get(), |
970 | Env::IO_TOTAL /* rate_limiter_priority */); | |
20effc67 TL |
971 | if (!io_s.ok()) { |
972 | return io_s; | |
973 | } | |
974 | file_to_ingest->file_checksum = file_checksum; | |
975 | file_to_ingest->file_checksum_func_name = file_checksum_func_name; | |
976 | return IOStatus::OK(); | |
977 | } | |
978 | ||
7c673cae FG |
979 | bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( |
980 | const IngestedFileInfo* file_to_ingest, int level) { | |
981 | if (level == 0) { | |
982 | // Files can always fit in L0 | |
983 | return true; | |
984 | } | |
985 | ||
986 | auto* vstorage = cfd_->current()->storage_info(); | |
f67539c2 TL |
987 | Slice file_smallest_user_key( |
988 | file_to_ingest->smallest_internal_key.user_key()); | |
989 | Slice file_largest_user_key(file_to_ingest->largest_internal_key.user_key()); | |
7c673cae FG |
990 | |
991 | if (vstorage->OverlapInLevel(level, &file_smallest_user_key, | |
992 | &file_largest_user_key)) { | |
993 | // File overlap with another files in this level, we cannot | |
994 | // add it to this level | |
995 | return false; | |
996 | } | |
997 | if (cfd_->RangeOverlapWithCompaction(file_smallest_user_key, | |
998 | file_largest_user_key, level)) { | |
999 | // File overlap with a running compaction output that will be stored | |
1000 | // in this level, we cannot add this file to this level | |
1001 | return false; | |
1002 | } | |
1003 | ||
1004 | // File did not overlap with level files, our compaction output | |
1005 | return true; | |
1006 | } | |
1007 | ||
f67539c2 TL |
1008 | template <typename TWritableFile> |
1009 | Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) { | |
1010 | assert(file != nullptr); | |
1011 | if (db_options_.use_fsync) { | |
1012 | return file->Fsync(IOOptions(), nullptr); | |
1013 | } else { | |
1014 | return file->Sync(IOOptions(), nullptr); | |
1015 | } | |
1016 | } | |
1017 | ||
1018 | } // namespace ROCKSDB_NAMESPACE | |
7c673cae FG |
1019 | |
1020 | #endif // !ROCKSDB_LITE |