1 // Copyright (c) Meta Platforms, Inc. and affiliates.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
6 #ifndef ROCKSDB_LITE
7
9 #include "db/import_column_family_job.h"
10
11 #include <algorithm>
12 #include <cinttypes>
13 #include <string>
14 #include <vector>
15
16 #include "db/version_edit.h"
17 #include "file/file_util.h"
18 #include "file/random_access_file_reader.h"
19 #include "logging/logging.h"
20 #include "table/merging_iterator.h"
21 #include "table/scoped_arena_iterator.h"
22 #include "table/sst_file_writer_collectors.h"
23 #include "table/table_builder.h"
24 #include "table/unique_id_impl.h"
25 #include "util/stop_watch.h"
27 namespace ROCKSDB_NAMESPACE
{
29 Status
ImportColumnFamilyJob::Prepare(uint64_t next_file_number
,
33 // Read the information of files we are importing
34 for (const auto& file_metadata
: metadata_
) {
35 const auto file_path
= file_metadata
.db_path
+ "/" + file_metadata
.name
;
36 IngestedFileInfo file_to_import
;
38 GetIngestedFileInfo(file_path
, next_file_number
++, &file_to_import
, sv
);
42 files_to_import_
.push_back(file_to_import
);
45 auto num_files
= files_to_import_
.size();
47 return Status::InvalidArgument("The list of files is empty");
48 } else if (num_files
> 1) {
49 // Verify that passed files don't have overlapping ranges in any particular
51 int min_level
= 1; // Check for overlaps in Level 1 and above.
53 for (const auto& file_metadata
: metadata_
) {
54 if (file_metadata
.level
> max_level
) {
55 max_level
= file_metadata
.level
;
58 for (int level
= min_level
; level
<= max_level
; ++level
) {
59 autovector
<const IngestedFileInfo
*> sorted_files
;
60 for (size_t i
= 0; i
< num_files
; i
++) {
61 if (metadata_
[i
].level
== level
) {
62 sorted_files
.push_back(&files_to_import_
[i
]);
67 sorted_files
.begin(), sorted_files
.end(),
68 [this](const IngestedFileInfo
* info1
, const IngestedFileInfo
* info2
) {
69 return cfd_
->internal_comparator().Compare(
70 info1
->smallest_internal_key
,
71 info2
->smallest_internal_key
) < 0;
74 for (size_t i
= 0; i
+ 1 < sorted_files
.size(); i
++) {
75 if (cfd_
->internal_comparator().Compare(
76 sorted_files
[i
]->largest_internal_key
,
77 sorted_files
[i
+ 1]->smallest_internal_key
) >= 0) {
78 return Status::InvalidArgument("Files have overlapping ranges");
84 for (const auto& f
: files_to_import_
) {
85 if (f
.num_entries
== 0) {
86 return Status::InvalidArgument("File contain no entries");
89 if (!f
.smallest_internal_key
.Valid() || !f
.largest_internal_key
.Valid()) {
90 return Status::Corruption("File has corrupted keys");
94 // Copy/Move external files into DB
95 auto hardlink_files
= import_options_
.move_files
;
96 for (auto& f
: files_to_import_
) {
97 const auto path_outside_db
= f
.external_file_path
;
98 const auto path_inside_db
= TableFileName(
99 cfd_
->ioptions()->cf_paths
, f
.fd
.GetNumber(), f
.fd
.GetPathId());
101 if (hardlink_files
) {
103 fs_
->LinkFile(path_outside_db
, path_inside_db
, IOOptions(), nullptr);
104 if (status
.IsNotSupported()) {
105 // Original file is on a different FS, use copy instead of hard linking
106 hardlink_files
= false;
107 ROCKS_LOG_INFO(db_options_
.info_log
,
108 "Try to link file %s but it's not supported : %s",
109 f
.internal_file_path
.c_str(), status
.ToString().c_str());
112 if (!hardlink_files
) {
114 CopyFile(fs_
.get(), path_outside_db
, path_inside_db
, 0,
115 db_options_
.use_fsync
, io_tracer_
, Temperature::kUnknown
);
120 f
.copy_file
= !hardlink_files
;
121 f
.internal_file_path
= path_inside_db
;
125 // We failed, remove all files that we copied into the db
126 for (const auto& f
: files_to_import_
) {
127 if (f
.internal_file_path
.empty()) {
131 fs_
->DeleteFile(f
.internal_file_path
, IOOptions(), nullptr);
133 ROCKS_LOG_WARN(db_options_
.info_log
,
134 "AddFile() clean up for file %s failed : %s",
135 f
.internal_file_path
.c_str(), s
.ToString().c_str());
143 // REQUIRES: we have become the only writer by entering both write_thread_ and
144 // nonmem_write_thread_
145 Status
ImportColumnFamilyJob::Run() {
147 edit_
.SetColumnFamily(cfd_
->GetID());
149 // We use the import time as the ancester time. This is the time the data
150 // is written to the database.
151 int64_t temp_current_time
= 0;
152 uint64_t oldest_ancester_time
= kUnknownOldestAncesterTime
;
153 uint64_t current_time
= kUnknownOldestAncesterTime
;
154 if (clock_
->GetCurrentTime(&temp_current_time
).ok()) {
155 current_time
= oldest_ancester_time
=
156 static_cast<uint64_t>(temp_current_time
);
159 for (size_t i
= 0; i
< files_to_import_
.size(); ++i
) {
160 const auto& f
= files_to_import_
[i
];
161 const auto& file_metadata
= metadata_
[i
];
163 edit_
.AddFile(file_metadata
.level
, f
.fd
.GetNumber(), f
.fd
.GetPathId(),
164 f
.fd
.GetFileSize(), f
.smallest_internal_key
,
165 f
.largest_internal_key
, file_metadata
.smallest_seqno
,
166 file_metadata
.largest_seqno
, false, file_metadata
.temperature
,
167 kInvalidBlobFileNumber
, oldest_ancester_time
, current_time
,
168 kUnknownFileChecksum
, kUnknownFileChecksumFuncName
,
171 // If incoming sequence number is higher, update local sequence number.
172 if (file_metadata
.largest_seqno
> versions_
->LastSequence()) {
173 versions_
->SetLastAllocatedSequence(file_metadata
.largest_seqno
);
174 versions_
->SetLastPublishedSequence(file_metadata
.largest_seqno
);
175 versions_
->SetLastSequence(file_metadata
.largest_seqno
);
182 void ImportColumnFamilyJob::Cleanup(const Status
& status
) {
184 // We failed to add files to the database remove all the files we copied.
185 for (const auto& f
: files_to_import_
) {
187 fs_
->DeleteFile(f
.internal_file_path
, IOOptions(), nullptr);
189 ROCKS_LOG_WARN(db_options_
.info_log
,
190 "AddFile() clean up for file %s failed : %s",
191 f
.internal_file_path
.c_str(), s
.ToString().c_str());
194 } else if (status
.ok() && import_options_
.move_files
) {
195 // The files were moved and added successfully, remove original file links
196 for (IngestedFileInfo
& f
: files_to_import_
) {
198 fs_
->DeleteFile(f
.external_file_path
, IOOptions(), nullptr);
201 db_options_
.info_log
,
202 "%s was added to DB successfully but failed to remove original "
204 f
.external_file_path
.c_str(), s
.ToString().c_str());
210 Status
ImportColumnFamilyJob::GetIngestedFileInfo(
211 const std::string
& external_file
, uint64_t new_file_number
,
212 IngestedFileInfo
* file_to_import
, SuperVersion
* sv
) {
213 file_to_import
->external_file_path
= external_file
;
215 // Get external file size
216 Status status
= fs_
->GetFileSize(external_file
, IOOptions(),
217 &file_to_import
->file_size
, nullptr);
222 // Assign FD with number
224 FileDescriptor(new_file_number
, 0, file_to_import
->file_size
);
226 // Create TableReader for external file
227 std::unique_ptr
<TableReader
> table_reader
;
228 std::unique_ptr
<FSRandomAccessFile
> sst_file
;
229 std::unique_ptr
<RandomAccessFileReader
> sst_file_reader
;
232 fs_
->NewRandomAccessFile(external_file
, env_options_
, &sst_file
, nullptr);
236 sst_file_reader
.reset(new RandomAccessFileReader(
237 std::move(sst_file
), external_file
, nullptr /*Env*/, io_tracer_
));
239 status
= cfd_
->ioptions()->table_factory
->NewTableReader(
241 *cfd_
->ioptions(), sv
->mutable_cf_options
.prefix_extractor
,
242 env_options_
, cfd_
->internal_comparator(),
243 /*skip_filters*/ false, /*immortal*/ false,
244 /*force_direct_prefetch*/ false, /*level*/ -1,
245 /*block_cache_tracer*/ nullptr,
246 /*max_file_size_for_l0_meta_pin*/ 0, versions_
->DbSessionId(),
247 /*cur_file_num*/ new_file_number
),
248 std::move(sst_file_reader
), file_to_import
->file_size
, &table_reader
);
253 // Get the external file properties
254 auto props
= table_reader
->GetTableProperties();
256 // Set original_seqno to 0.
257 file_to_import
->original_seqno
= 0;
259 // Get number of entries in table
260 file_to_import
->num_entries
= props
->num_entries
;
262 ParsedInternalKey key
;
264 // During reading the external file we can cache blocks that we read into
265 // the block cache, if we later change the global seqno of this file, we will
266 // have block in cache that will include keys with wrong seqno.
267 // We need to disable fill_cache so that we read from the file without
268 // updating the block cache.
269 ro
.fill_cache
= false;
270 std::unique_ptr
<InternalIterator
> iter(table_reader
->NewIterator(
271 ro
, sv
->mutable_cf_options
.prefix_extractor
.get(), /*arena=*/nullptr,
272 /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion
));
274 // Get first (smallest) key from file
277 ParseInternalKey(iter
->key(), &key
, db_options_
.allow_data_in_errors
);
278 if (!pik_status
.ok()) {
279 return Status::Corruption("Corrupted Key in external file. ",
280 pik_status
.getState());
282 file_to_import
->smallest_internal_key
.SetFrom(key
);
284 // Get last (largest) key from file
287 ParseInternalKey(iter
->key(), &key
, db_options_
.allow_data_in_errors
);
288 if (!pik_status
.ok()) {
289 return Status::Corruption("Corrupted Key in external file. ",
290 pik_status
.getState());
292 file_to_import
->largest_internal_key
.SetFrom(key
);
294 file_to_import
->cf_id
= static_cast<uint32_t>(props
->column_family_id
);
296 file_to_import
->table_properties
= *props
;
298 auto s
= GetSstInternalUniqueId(props
->db_id
, props
->db_session_id
,
299 props
->orig_file_number
,
300 &(file_to_import
->unique_id
));
302 ROCKS_LOG_WARN(db_options_
.info_log
,
303 "Failed to get SST unique id for file %s",
304 file_to_import
->internal_file_path
.c_str());
310 } // namespace ROCKSDB_NAMESPACE
312 #endif // !ROCKSDB_LITE