1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/version_builder.h"
12 #ifndef __STDC_FORMAT_MACROS
13 #define __STDC_FORMAT_MACROS
23 #include <unordered_map>
24 #include <unordered_set>
28 #include "db/dbformat.h"
29 #include "db/internal_stats.h"
30 #include "db/table_cache.h"
31 #include "db/version_set.h"
32 #include "port/port.h"
33 #include "table/table_reader.h"
37 bool NewestFirstBySeqNo(FileMetaData
* a
, FileMetaData
* b
) {
38 if (a
->fd
.largest_seqno
!= b
->fd
.largest_seqno
) {
39 return a
->fd
.largest_seqno
> b
->fd
.largest_seqno
;
41 if (a
->fd
.smallest_seqno
!= b
->fd
.smallest_seqno
) {
42 return a
->fd
.smallest_seqno
> b
->fd
.smallest_seqno
;
44 // Break ties by file number
45 return a
->fd
.GetNumber() > b
->fd
.GetNumber();
49 bool BySmallestKey(FileMetaData
* a
, FileMetaData
* b
,
50 const InternalKeyComparator
* cmp
) {
51 int r
= cmp
->Compare(a
->smallest
, b
->smallest
);
55 // Break ties by file number
56 return (a
->fd
.GetNumber() < b
->fd
.GetNumber());
// Private implementation of VersionBuilder. It records, per level, the file
// additions and deletions of every VersionEdit passed to Apply() on top of
// base_vstorage_. SaveTo() then merges them with the base files into a
// VersionStorageInfo.
60 class VersionBuilder::Rep
{
62 // Helper to sort files_ in v
63 // kLevel0 -- NewestFirstBySeqNo
64 // kLevelNon0 -- BySmallestKey
65 struct FileComparator
{
// Selects which ordering operator() applies.
// NOTE(review): not initialized by the default constructor below; both
// comparator instances are assigned in Rep's constructor.
66 enum SortMethod
{ kLevel0
= 0, kLevelNon0
= 1, } sort_method
;
// Key comparator used by the kLevelNon0 ordering; null until assigned.
67 const InternalKeyComparator
* internal_comparator
;
69 FileComparator() : internal_comparator(nullptr) {}
// Ordering predicate handed to std::sort and used for merging in SaveTo().
// NOTE(review): the `case` labels and the return path after the switch are
// not visible here; confirm every SortMethod is handled.
71 bool operator()(FileMetaData
* f1
, FileMetaData
* f2
) const {
72 switch (sort_method
) {
74 return NewestFirstBySeqNo(f1
, f2
);
76 return BySmallestKey(f1
, f2
, internal_comparator
);
// File numbers deleted on this level by the edits applied so far. A number is
// removed again if a later edit re-adds that file on this level.
84 std::unordered_set
<uint64_t> deleted_files
;
85 // Map from file number to file meta data.
// Holds files added on this level by applied edits. The builder unrefs these
// in its destructor and when a later edit deletes them.
86 std::unordered_map
<uint64_t, FileMetaData
*> added_files
;
// Options forwarded to TableCache::FindTable when loading table readers.
89 const EnvOptions
& env_options_
;
// Used to open and release table readers. Asserted non-null before use.
91 TableCache
* table_cache_
;
// The version that applied edits are layered on top of.
92 VersionStorageInfo
* base_vstorage_
;
95 // Store states of levels at or beyond num_levels_. We do this instead of
96 // storing them in levels_ to avoid regression in case there are no files
97 // on invalid levels. The version is not consistent if in the end the files
98 // on invalid levels don't cancel out.
99 std::map
<int, std::unordered_set
<uint64_t>> invalid_levels_
;
100 // Whether there are invalid new files or invalid deletions on levels at or beyond num_levels_.
102 bool has_invalid_levels_
;
// Orderings used for level 0 and for levels > 0, respectively.
103 FileComparator level_zero_cmp_
;
104 FileComparator level_nonzero_cmp_
;
// Builds an empty change set over `base_vstorage`. It sizes the per-level
// state from base_vstorage->num_levels() and configures both file
// comparators.
107 Rep(const EnvOptions
& env_options
, Logger
* info_log
, TableCache
* table_cache
,
108 VersionStorageInfo
* base_vstorage
)
109 : env_options_(env_options
),
111 table_cache_(table_cache
),
112 base_vstorage_(base_vstorage
),
113 num_levels_(base_vstorage
->num_levels()),
114 has_invalid_levels_(false) {
// One LevelState per valid level. Allocated with new[], so it must be
// released with delete[].
115 levels_
= new LevelState
[num_levels_
];
// Level 0 is ordered newest-first; the other levels by smallest key, using
// the base version's internal key comparator.
116 level_zero_cmp_
.sort_method
= FileComparator::kLevel0
;
117 level_nonzero_cmp_
.sort_method
= FileComparator::kLevelNon0
;
118 level_nonzero_cmp_
.internal_comparator
=
119 base_vstorage_
->InternalComparator();
// Destructor body: drop this builder's hold on every file it added (see
// UnrefFile).
// NOTE(review): the enclosing destructor header and the release of levels_
// (allocated with new[] in the constructor) are not visible here; confirm it
// is delete[]'d.
123 for (int level
= 0; level
< num_levels_
; level
++) {
124 const auto& added
= levels_
[level
].added_files
;
125 for (auto& pair
: added
) {
126 UnrefFile(pair
.second
);
// Releases the table-cache handle pinned on `f`, if any, and clears it so it
// is not released twice.
// NOTE(review): any reference-count decrement or deletion of `f` implied by
// the name is not visible here; confirm against the full function.
133 void UnrefFile(FileMetaData
* f
) {
136 if (f
->table_reader_handle
) {
137 assert(table_cache_
!= nullptr);
138 table_cache_
->ReleaseHandle(f
->table_reader_handle
);
139 f
->table_reader_handle
= nullptr;
// Validates the file ordering of every level in `vstorage`:
// - Level 0 must satisfy level_zero_cmp_ (newest first), with seqno sanity
//   checks for ingested external files.
// - Levels > 0 must satisfy level_nonzero_cmp_ and have non-overlapping
//   [smallest, largest] key ranges.
// Violations are reported on stderr.
145 void CheckConsistency(VersionStorageInfo
* vstorage
) {
147 if (!vstorage
->force_consistency_checks()) {
148 // Don't run consistency checks in release mode except if
149 // explicitly asked to
153 // make sure the files are sorted correctly
154 for (int level
= 0; level
< num_levels_
; level
++) {
155 auto& level_files
= vstorage
->LevelFiles(level
);
// Compare each adjacent pair (f1 precedes f2).
156 for (size_t i
= 1; i
< level_files
.size(); i
++) {
157 auto f1
= level_files
[i
- 1];
158 auto f2
= level_files
[i
];
// NOTE(review): the message below has no trailing "\n", unlike the other
// diagnostics in this function.
160 if (!level_zero_cmp_(f1
, f2
)) {
161 fprintf(stderr
, "L0 files are not sorted properly");
// A file whose smallest and largest seqno are equal is treated as an
// ingested external file; its global seqno must be below f1's largest seqno
// (or be zero).
165 if (f2
->fd
.smallest_seqno
== f2
->fd
.largest_seqno
) {
166 // This is an external file that we ingested
167 SequenceNumber external_file_seqno
= f2
->fd
.smallest_seqno
;
168 if (!(external_file_seqno
< f1
->fd
.largest_seqno
||
169 external_file_seqno
== 0)) {
171 "L0 file with seqno %" PRIu64
" %" PRIu64
172 " vs. file with global_seqno %" PRIu64
"\n",
173 f1
->fd
.smallest_seqno
, f1
->fd
.largest_seqno
,
174 external_file_seqno
);
177 } else if (f1
->fd
.smallest_seqno
<= f2
->fd
.smallest_seqno
) {
179 "L0 files seqno %" PRIu64
" %" PRIu64
" vs. %" PRIu64
181 f1
->fd
.smallest_seqno
, f1
->fd
.largest_seqno
,
182 f2
->fd
.smallest_seqno
, f2
->fd
.largest_seqno
);
// Levels > 0: files must be ordered by smallest key.
// NOTE(review): the message below has no trailing "\n".
186 if (!level_nonzero_cmp_(f1
, f2
)) {
187 fprintf(stderr
, "L%d files are not sorted properly", level
);
191 // Make sure there is no overlap in levels > 0
192 if (vstorage
->InternalComparator()->Compare(f1
->largest
,
193 f2
->smallest
) >= 0) {
194 fprintf(stderr
, "L%d have overlapping ranges %s vs. %s\n", level
,
195 (f1
->largest
).DebugString(true).c_str(),
196 (f2
->smallest
).DebugString(true).c_str());
// Verifies that file `number`, being deleted from `level`, actually exists.
// It is searched for in this order:
// - any level of the base version,
// - files added on a higher level by previously applied edits,
// - files added on `level` itself by previously applied edits.
// If it is not found anywhere, "not found" is printed to stderr.
204 void CheckConsistencyForDeletes(VersionEdit
* /*edit*/, uint64_t number
,
207 if (!base_vstorage_
->force_consistency_checks()) {
208 // Don't run consistency checks in release mode except if
209 // explicitly asked to
213 // a file being deleted should exist in the previous version
215 for (int l
= 0; !found
&& l
< num_levels_
; l
++) {
216 const std::vector
<FileMetaData
*>& base_files
=
217 base_vstorage_
->LevelFiles(l
);
218 for (size_t i
= 0; i
< base_files
.size(); i
++) {
219 FileMetaData
* f
= base_files
[i
];
220 if (f
->fd
.GetNumber() == number
) {
226 // if the file did not exist in the previous version, then it
227 // is possibly moved from lower level to higher level in the current version
229 for (int l
= level
+ 1; !found
&& l
< num_levels_
; l
++) {
230 auto& level_added
= levels_
[l
].added_files
;
231 auto got
= level_added
.find(number
);
232 if (got
!= level_added
.end()) {
238 // maybe this file was added in a previous edit that was Applied
240 auto& level_added
= levels_
[level
].added_files
;
241 auto got
= level_added
.find(number
);
242 if (got
!= level_added
.end()) {
247 fprintf(stderr
, "not found %" PRIu64
"\n", number
);
// Reports whether the edits applied so far leave no files on levels at or
// beyond num_levels_. An invalid deletion or duplicate creation on such a
// level sets has_invalid_levels_; otherwise every tracked set in
// invalid_levels_ must end up empty.
252 bool CheckConsistencyForNumLevels() {
253 // Make sure there are no files on or beyond num_levels().
254 if (has_invalid_levels_
) {
// Any file still recorded on an invalid level means the additions and
// deletions did not cancel out.
257 for (auto& level
: invalid_levels_
) {
258 if (level
.second
.size() > 0) {
265 // Apply all of the edits in *edit to the current state.
// Deletions are processed before additions. Levels at or beyond num_levels_
// are tracked only by file number in invalid_levels_.
266 void Apply(VersionEdit
* edit
) {
267 CheckConsistency(base_vstorage_
);
// Deleted files.
270 const VersionEdit::DeletedFileSet
& del
= edit
->GetDeletedFiles();
271 for (const auto& del_file
: del
) {
272 const auto level
= del_file
.first
;
273 const auto number
= del_file
.second
;
274 if (level
< num_levels_
) {
275 levels_
[level
].deleted_files
.insert(number
);
276 CheckConsistencyForDeletes(edit
, number
, level
);
// If an earlier applied edit added this file on the same level, drop it
// from added_files and release the builder's hold on it.
278 auto exising
= levels_
[level
].added_files
.find(number
);
279 if (exising
!= levels_
[level
].added_files
.end()) {
280 UnrefFile(exising
->second
);
281 levels_
[level
].added_files
.erase(exising
);
// Invalid level: cancel out a previously recorded addition, if any.
284 auto exising
= invalid_levels_
[level
].find(number
);
285 if (exising
!= invalid_levels_
[level
].end()) {
286 invalid_levels_
[level
].erase(exising
);
288 // Deleting a non-existing file on invalid level.
289 has_invalid_levels_
= true;
// New files.
295 for (const auto& new_file
: edit
->GetNewFiles()) {
296 const int level
= new_file
.first
;
297 if (level
< num_levels_
) {
// The builder takes a heap copy of the edit's metadata and owns it via
// added_files. A re-added file is no longer considered deleted.
298 FileMetaData
* f
= new FileMetaData(new_file
.second
);
301 assert(levels_
[level
].added_files
.find(f
->fd
.GetNumber()) ==
302 levels_
[level
].added_files
.end());
303 levels_
[level
].deleted_files
.erase(f
->fd
.GetNumber());
304 levels_
[level
].added_files
[f
->fd
.GetNumber()] = f
;
// Invalid level: record the number so a later deletion can cancel it.
306 uint64_t number
= new_file
.second
.fd
.GetNumber();
307 if (invalid_levels_
[level
].count(number
) == 0) {
308 invalid_levels_
[level
].insert(number
);
310 // Creating an already existing file on invalid level.
311 has_invalid_levels_
= true;
317 // Save the current state in *vstorage.
// For each level, the base files and this builder's added files are both
// sorted by that level's comparator. They are merged in order, and
// MaybeAddFile() filters out deleted files. Both versions are
// consistency-checked before the merge, and `vstorage` is checked again
// afterwards.
318 void SaveTo(VersionStorageInfo
* vstorage
) {
319 CheckConsistency(base_vstorage_
);
320 CheckConsistency(vstorage
);
322 for (int level
= 0; level
< num_levels_
; level
++) {
323 const auto& cmp
= (level
== 0) ? level_zero_cmp_
: level_nonzero_cmp_
;
324 // Merge the set of added files with the set of pre-existing files.
325 // Drop any deleted files. Store the result in *vstorage.
326 const auto& base_files
= base_vstorage_
->LevelFiles(level
);
327 const auto& unordered_added_files
= levels_
[level
].added_files
;
// Upper bound: some of these files may be dropped as deleted.
328 vstorage
->Reserve(level
,
329 base_files
.size() + unordered_added_files
.size());
331 // Sort added files for the level.
332 std::vector
<FileMetaData
*> added_files
;
333 added_files
.reserve(unordered_added_files
.size());
334 for (const auto& pair
: unordered_added_files
) {
335 added_files
.push_back(pair
.second
);
337 std::sort(added_files
.begin(), added_files
.end(), cmp
);
// Sanity check: on levels > 0 the sorted added files must have
// non-decreasing smallest keys.
340 FileMetaData
* prev_added_file
= nullptr;
341 for (const auto& added
: added_files
) {
342 if (level
> 0 && prev_added_file
!= nullptr) {
343 assert(base_vstorage_
->InternalComparator()->Compare(
344 prev_added_file
->smallest
, added
->smallest
) <= 0);
346 prev_added_file
= added
;
// Two-way merge of the two sorted sequences. An added file is taken
// first when cmp orders it before the current base file.
350 auto base_iter
= base_files
.begin();
351 auto base_end
= base_files
.end();
352 auto added_iter
= added_files
.begin();
353 auto added_end
= added_files
.end();
354 while (added_iter
!= added_end
|| base_iter
!= base_end
) {
355 if (base_iter
== base_end
||
356 (added_iter
!= added_end
&& cmp(*added_iter
, *base_iter
))) {
357 MaybeAddFile(vstorage
, level
, *added_iter
++);
359 MaybeAddFile(vstorage
, level
, *base_iter
++);
364 CheckConsistency(vstorage
);
// Opens table readers, via table_cache_->FindTable, for files added by applied
// edits that do not yet hold a table_reader_handle. On success the reader
// pointer is cached in fd.table_reader.
// Work runs on the calling thread plus (max_threads - 1) spawned threads.
// The number of files collected is capped by max_load, which starts at
// port::kMaxSizet and, when a limit applies, becomes
// load_limit - table cache usage. load_limit is a quarter of the cache
// capacity, further capped at kInitialLoadLimit on initial load.
// NOTE(review): the condition gating the limit (presumably !always_load), the
// early return when usage >= load_limit, and the final status handling are not
// visible here.
367 Status
LoadTableHandlers(InternalStats
* internal_stats
, int max_threads
,
368 bool prefetch_index_and_filter_in_cache
,
369 bool is_initial_load
,
370 const SliceTransform
* prefix_extractor
) {
371 assert(table_cache_
!= nullptr);
373 size_t table_cache_capacity
= table_cache_
->get_cache()->GetCapacity();
374 bool always_load
= (table_cache_capacity
== TableCache::kInfiniteCapacity
);
375 size_t max_load
= port::kMaxSizet
;
378 // If it is initial loading and not set to always loading all the
379 // files, we only load up to kInitialLoadLimit files, to limit the
380 // time reopening the DB.
381 const size_t kInitialLoadLimit
= 16;
383 // If the table cache is not 1/4 full, we pin the table handle to
384 // file metadata to avoid the cache read costs when reading the file.
385 // The downside of pinning those files is that LRU won't be followed
386 // for those files. This doesn't matter much because if the number of files
387 // of the DB exceeds table cache capacity, eventually no table reader
388 // will be pinned and LRU will be followed.
389 if (is_initial_load
) {
390 load_limit
= std::min(kInitialLoadLimit
, table_cache_capacity
/ 4);
392 load_limit
= table_cache_capacity
/ 4;
395 size_t table_cache_usage
= table_cache_
->get_cache()->GetUsage();
396 if (table_cache_usage
>= load_limit
) {
397 // TODO (yanqin) find a suitable status code.
400 max_load
= load_limit
- table_cache_usage
;
404 // <file metadata, level>
// statuses[i] holds the FindTable result for files_meta[i].
405 std::vector
<std::pair
<FileMetaData
*, int>> files_meta
;
406 std::vector
<Status
> statuses
;
407 for (int level
= 0; level
< num_levels_
; level
++) {
408 for (auto& file_meta_pair
: levels_
[level
].added_files
) {
409 auto* file_meta
= file_meta_pair
.second
;
410 // If the file has been opened before, just skip it.
411 if (!file_meta
->table_reader_handle
) {
412 files_meta
.emplace_back(file_meta
, level
);
413 statuses
.emplace_back(Status::OK());
415 if (files_meta
.size() >= max_load
) {
419 if (files_meta
.size() >= max_load
) {
// Each worker claims the next unprocessed index with fetch_add, so every
// collected file is opened by exactly one thread. A thread writes only its
// own statuses[file_idx] slot.
424 std::atomic
<size_t> next_file_meta_idx(0);
425 std::function
<void()> load_handlers_func([&]() {
427 size_t file_idx
= next_file_meta_idx
.fetch_add(1);
428 if (file_idx
>= files_meta
.size()) {
432 auto* file_meta
= files_meta
[file_idx
].first
;
433 int level
= files_meta
[file_idx
].second
;
434 statuses
[file_idx
] = table_cache_
->FindTable(
435 env_options_
, *(base_vstorage_
->InternalComparator()),
436 file_meta
->fd
, &file_meta
->table_reader_handle
, prefix_extractor
,
437 false /*no_io */, true /* record_read_stats */,
438 internal_stats
->GetFileReadHist(level
), false, level
,
439 prefetch_index_and_filter_in_cache
);
440 if (file_meta
->table_reader_handle
!= nullptr) {
442 file_meta
->fd
.table_reader
= table_cache_
->GetTableReaderFromHandle(
443 file_meta
->table_reader_handle
);
// The calling thread also runs load_handlers_func, so only
// max_threads - 1 extra threads are spawned.
448 std::vector
<port::Thread
> threads
;
449 for (int i
= 1; i
< max_threads
; i
++) {
450 threads
.emplace_back(load_handlers_func
);
452 load_handlers_func();
453 for (auto& t
: threads
) {
// Inspect the per-file results once all workers are done.
456 for (const auto& s
: statuses
) {
// Emits `f` into `level` of `vstorage` during SaveTo's merge. A file deleted
// by an applied edit is not added; its stats are removed from `vstorage`
// instead.
464 void MaybeAddFile(VersionStorageInfo
* vstorage
, int level
, FileMetaData
* f
) {
465 if (levels_
[level
].deleted_files
.count(f
->fd
.GetNumber()) > 0) {
466 // f is to-be-deleted table file
467 vstorage
->RemoveCurrentStats(f
);
// Otherwise (the branch separator is not visible here) the file is kept.
469 vstorage
->AddFile(level
, f
, info_log_
);
474 VersionBuilder::VersionBuilder(const EnvOptions
& env_options
,
475 TableCache
* table_cache
,
476 VersionStorageInfo
* base_vstorage
,
478 : rep_(new Rep(env_options
, info_log
, table_cache
, base_vstorage
)) {}
480 VersionBuilder::~VersionBuilder() { delete rep_
; }
482 void VersionBuilder::CheckConsistency(VersionStorageInfo
* vstorage
) {
483 rep_
->CheckConsistency(vstorage
);
486 void VersionBuilder::CheckConsistencyForDeletes(VersionEdit
* edit
,
487 uint64_t number
, int level
) {
488 rep_
->CheckConsistencyForDeletes(edit
, number
, level
);
491 bool VersionBuilder::CheckConsistencyForNumLevels() {
492 return rep_
->CheckConsistencyForNumLevels();
495 void VersionBuilder::Apply(VersionEdit
* edit
) { rep_
->Apply(edit
); }
497 void VersionBuilder::SaveTo(VersionStorageInfo
* vstorage
) {
498 rep_
->SaveTo(vstorage
);
501 Status
VersionBuilder::LoadTableHandlers(
502 InternalStats
* internal_stats
, int max_threads
,
503 bool prefetch_index_and_filter_in_cache
, bool is_initial_load
,
504 const SliceTransform
* prefix_extractor
) {
505 return rep_
->LoadTableHandlers(internal_stats
, max_threads
,
506 prefetch_index_and_filter_in_cache
,
507 is_initial_load
, prefix_extractor
);
510 void VersionBuilder::MaybeAddFile(VersionStorageInfo
* vstorage
, int level
,
512 rep_
->MaybeAddFile(vstorage
, level
, f
);
515 } // namespace rocksdb