1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/version_builder.h"
12 #ifndef __STDC_FORMAT_MACROS
13 #define __STDC_FORMAT_MACROS
22 #include <unordered_map>
23 #include <unordered_set>
27 #include "db/dbformat.h"
28 #include "db/internal_stats.h"
29 #include "db/table_cache.h"
30 #include "db/version_set.h"
31 #include "port/port.h"
32 #include "table/table_reader.h"
36 bool NewestFirstBySeqNo(FileMetaData
* a
, FileMetaData
* b
) {
37 if (a
->largest_seqno
!= b
->largest_seqno
) {
38 return a
->largest_seqno
> b
->largest_seqno
;
40 if (a
->smallest_seqno
!= b
->smallest_seqno
) {
41 return a
->smallest_seqno
> b
->smallest_seqno
;
43 // Break ties by file number
44 return a
->fd
.GetNumber() > b
->fd
.GetNumber();
// Ordering predicate based on the smallest internal key of two files.
// `cmp` compares a->smallest with b->smallest; the file number is used as the
// tie-breaker (smaller file number first).
// NOTE(review): the statements that act on the comparison result `r` are not
// visible in this extract; presumably a non-zero `r` decides the order before
// the tie-breaker is reached -- confirm against the full file.
48 bool BySmallestKey(FileMetaData
* a
, FileMetaData
* b
,
49 const InternalKeyComparator
* cmp
) {
50 int r
= cmp
->Compare(a
->smallest
, b
->smallest
);
54 // Break ties by file number
55 return (a
->fd
.GetNumber() < b
->fd
.GetNumber());
59 class VersionBuilder::Rep
{
61 // Helper to sort files_ in v
62 // kLevel0 -- NewestFirstBySeqNo
63 // kLevelNon0 -- BySmallestKey
// A comparator object whose behavior is selected at run time by sort_method.
64 struct FileComparator
{
// Selects which ordering operator() applies.
65 enum SortMethod
{ kLevel0
= 0, kLevelNon0
= 1, } sort_method
;
// Key comparator handed to BySmallestKey; not passed to NewestFirstBySeqNo.
66 const InternalKeyComparator
* internal_comparator
;
// Returns true when f1 must be ordered before f2 under sort_method.
// NOTE(review): the case labels of the switch are not visible in this
// extract; per the header comment the first return presumably belongs to
// kLevel0 and the second to kLevelNon0 -- confirm.
68 bool operator()(FileMetaData
* f1
, FileMetaData
* f2
) const {
69 switch (sort_method
) {
71 return NewestFirstBySeqNo(f1
, f2
);
73 return BySmallestKey(f1
, f2
, internal_comparator
);
// Per-level bookkeeping accessed as levels_[level].deleted_files and
// levels_[level].added_files by the methods of this class.
// NOTE(review): the enclosing struct header (LevelState, judging by the
// constructor's `new LevelState[...]`) is not visible in this extract.
// File numbers recorded as deleted at a level (filled in by Apply).
81 std::unordered_set
<uint64_t> deleted_files
;
82 // Map from file number to file meta data.
// Entries are heap-allocated copies created in Apply and released through
// UnrefFile.
83 std::unordered_map
<uint64_t, FileMetaData
*> added_files
;
// Environment options forwarded to TableCache::FindTable in
// LoadTableHandlers.
86 const EnvOptions
& env_options_
;
// Table cache used to open table readers and release their handles; asserted
// non-null wherever it is dereferenced.
88 TableCache
* table_cache_
;
// The version storage the accumulated edits are applied on top of.
89 VersionStorageInfo
* base_vstorage_
;
// Comparators configured in the constructor: kLevel0 ordering for level 0 and
// kLevelNon0 ordering for every other level.
91 FileComparator level_zero_cmp_
;
92 FileComparator level_nonzero_cmp_
;
// Builds the implementation state on top of `base_vstorage`.
// Stores the given options, table cache and base storage, allocates one
// LevelState per level of the base storage, and configures the two file
// comparators.
// NOTE(review): how `info_log` is stored is not visible in this extract
// (MaybeAddFile later reads an `info_log_` member) -- confirm.
95 Rep(const EnvOptions
& env_options
, Logger
* info_log
, TableCache
* table_cache
,
96 VersionStorageInfo
* base_vstorage
)
97 : env_options_(env_options
),
99 table_cache_(table_cache
),
100 base_vstorage_(base_vstorage
) {
// One bookkeeping slot per level of the base version.
101 levels_
= new LevelState
[base_vstorage_
->num_levels()];
102 level_zero_cmp_
.sort_method
= FileComparator::kLevel0
;
103 level_nonzero_cmp_
.sort_method
= FileComparator::kLevelNon0
;
// Only the non-zero-level comparator receives a key comparator here.
// NOTE(review): level_zero_cmp_.internal_comparator is not assigned in the
// visible code; the kLevel0 path does not pass it on, but the member is
// copied whenever the comparator object is copied -- consider initializing.
104 level_nonzero_cmp_
.internal_comparator
=
105 base_vstorage_
->InternalComparator();
// Walks every level and calls UnrefFile on each file recorded in that level's
// added_files map.
// NOTE(review): the header of the enclosing function is not visible in this
// extract; presumably this is the body of Rep's destructor -- confirm.
109 for (int level
= 0; level
< base_vstorage_
->num_levels(); level
++) {
110 const auto& added
= levels_
[level
].added_files
;
111 for (auto& pair
: added
) {
// pair.second is the FileMetaData* stored for this file number.
112 UnrefFile(pair
.second
);
// Releases builder-held resources of `f`.
// If `f` has a table reader handle, the handle is released through
// table_cache_ and the field is reset to nullptr.
// NOTE(review): the statements between the signature and the handle check are
// not visible in this extract; presumably they manage f's reference count and
// decide when this cleanup runs -- confirm.
119 void UnrefFile(FileMetaData
* f
) {
122 if (f
->table_reader_handle
) {
// A handle can only be released through the cache.
123 assert(table_cache_
!= nullptr);
124 table_cache_
->ReleaseHandle(f
->table_reader_handle
);
125 f
->table_reader_handle
= nullptr;
// Verifies that the files of every level in `vstorage` are ordered as the
// builder expects and reports violations on stderr.
// For each adjacent pair (f1, f2) of a level the visible code checks the
// ordering under level_zero_cmp_ plus the sequence-number relations described
// below, and the ordering under level_nonzero_cmp_ plus non-overlap of key
// ranges.
// NOTE(review): the lines that split the level-0 checks from the other-level
// checks, and whatever follows each fprintf, are not visible in this extract.
131 void CheckConsistency(VersionStorageInfo
* vstorage
) {
133 if (!vstorage
->force_consistency_checks()) {
134 // Don't run consistency checks in release mode except if
135 // explicitly asked to
139 // make sure the files are sorted correctly
140 for (int level
= 0; level
< vstorage
->num_levels(); level
++) {
141 auto& level_files
= vstorage
->LevelFiles(level
);
// Start at 1 so that every iteration has a predecessor to compare against.
142 for (size_t i
= 1; i
< level_files
.size(); i
++) {
143 auto f1
= level_files
[i
- 1];
144 auto f2
= level_files
[i
];
// f1 must sort before f2 under the level-0 comparator.
// NOTE(review): this message has no trailing newline, unlike the
// sequence-number and overlap messages below.
146 if (!level_zero_cmp_(f1
, f2
)) {
147 fprintf(stderr
, "L0 files are not sorted properly");
// A file whose smallest and largest sequence numbers coincide is treated as
// an ingested external file with a single sequence number.
151 if (f2
->smallest_seqno
== f2
->largest_seqno
) {
152 // This is an external file that we ingested
153 SequenceNumber external_file_seqno
= f2
->smallest_seqno
;
// Accepted only if that sequence number is below f1's largest one or is 0.
154 if (!(external_file_seqno
< f1
->largest_seqno
||
155 external_file_seqno
== 0)) {
156 fprintf(stderr
, "L0 file with seqno %" PRIu64
" %" PRIu64
157 " vs. file with global_seqno %" PRIu64
"\n",
158 f1
->smallest_seqno
, f1
->largest_seqno
,
159 external_file_seqno
);
// Otherwise f1's smallest sequence number must be strictly greater than
// f2's.
162 } else if (f1
->smallest_seqno
<= f2
->smallest_seqno
) {
163 fprintf(stderr
, "L0 files seqno %" PRIu64
" %" PRIu64
164 " vs. %" PRIu64
" %" PRIu64
"\n",
165 f1
->smallest_seqno
, f1
->largest_seqno
, f2
->smallest_seqno
,
// f1 must sort before f2 under the non-zero-level comparator.
// NOTE(review): this message also lacks a trailing newline.
170 if (!level_nonzero_cmp_(f1
, f2
)) {
171 fprintf(stderr
, "L%d files are not sorted properly", level
);
175 // Make sure there is no overlap in levels > 0
// f1's largest key must compare strictly below f2's smallest key.
176 if (vstorage
->InternalComparator()->Compare(f1
->largest
,
177 f2
->smallest
) >= 0) {
178 fprintf(stderr
, "L%d have overlapping ranges %s vs. %s\n", level
,
179 (f1
->largest
).DebugString(true).c_str(),
180 (f2
->smallest
).DebugString(true).c_str());
// Checks that the file `number`, which is being deleted, is known to the
// builder. It is searched for, in order, in:
//   1. every level of the base version,
//   2. the added_files of the levels after `level`,
//   3. the added_files of `level` itself.
// A message is printed to stderr when the file is reported as not found.
// NOTE(review): the declaration of `found`, the `level` parameter line and
// the statements executed on a match are not visible in this extract; `edit`
// is not referenced by any visible line.
188 void CheckConsistencyForDeletes(VersionEdit
* edit
, uint64_t number
,
191 if (!base_vstorage_
->force_consistency_checks()) {
192 // Don't run consistency checks in release mode except if
193 // explicitly asked to
197 // a file to be deleted better exist in the previous version
// The scan over the levels stops as soon as `found` becomes true.
199 for (int l
= 0; !found
&& l
< base_vstorage_
->num_levels(); l
++) {
200 const std::vector
<FileMetaData
*>& base_files
=
201 base_vstorage_
->LevelFiles(l
);
202 for (size_t i
= 0; i
< base_files
.size(); i
++) {
203 FileMetaData
* f
= base_files
[i
];
204 if (f
->fd
.GetNumber() == number
) {
210 // if the file did not exist in the previous version, then it
211 // is possibly moved from lower level to higher level in current
// Look through the files added to the levels after `level`.
213 for (int l
= level
+ 1; !found
&& l
< base_vstorage_
->num_levels(); l
++) {
214 auto& level_added
= levels_
[l
].added_files
;
215 auto got
= level_added
.find(number
);
216 if (got
!= level_added
.end()) {
222 // maybe this file was added in a previous edit that was Applied
224 auto& level_added
= levels_
[level
].added_files
;
225 auto got
= level_added
.find(number
);
226 if (got
!= level_added
.end()) {
231 fprintf(stderr
, "not found %" PRIu64
"\n", number
);
236 // Apply all of the edits in *edit to the current state.
// Deleted files are recorded per level, and a file that an earlier edit had
// added at the same level is released and dropped from added_files. New
// files are copied onto the heap and recorded in added_files, which also
// cancels any pending deletion of the same file number at that level.
237 void Apply(VersionEdit
* edit
) {
// The base version is checked before anything is changed.
238 CheckConsistency(base_vstorage_
);
241 const VersionEdit::DeletedFileSet
& del
= edit
->GetDeletedFiles();
// Each entry is a (level, file number) pair.
242 for (const auto& del_file
: del
) {
243 const auto level
= del_file
.first
;
244 const auto number
= del_file
.second
;
245 levels_
[level
].deleted_files
.insert(number
);
246 CheckConsistencyForDeletes(edit
, number
, level
);
// If the file was added by an earlier edit at this level, release it and
// forget it.
248 auto exising
= levels_
[level
].added_files
.find(number
);
249 if (exising
!= levels_
[level
].added_files
.end()) {
250 UnrefFile(exising
->second
);
251 levels_
[level
].added_files
.erase(number
);
// Each entry is a (level, file metadata) pair.
256 for (const auto& new_file
: edit
->GetNewFiles()) {
257 const int level
= new_file
.first
;
// The builder keeps its own heap copy of the metadata.
258 FileMetaData
* f
= new FileMetaData(new_file
.second
);
// The same file number must not already be recorded as added at this level.
261 assert(levels_
[level
].added_files
.find(f
->fd
.GetNumber()) ==
262 levels_
[level
].added_files
.end());
// Adding a file overrides an earlier deletion of the same number.
263 levels_
[level
].deleted_files
.erase(f
->fd
.GetNumber());
264 levels_
[level
].added_files
[f
->fd
.GetNumber()] = f
;
268 // Save the current state in *v.
// For every level, the files of the base version are merged with the files
// added by applied edits, in the order defined by that level's comparator,
// and each one is handed to MaybeAddFile. Consistency of the base version
// and of `vstorage` is checked before the merge and `vstorage` is checked
// again afterwards.
269 void SaveTo(VersionStorageInfo
* vstorage
) {
270 CheckConsistency(base_vstorage_
);
271 CheckConsistency(vstorage
);
273 for (int level
= 0; level
< base_vstorage_
->num_levels(); level
++) {
// Level 0 uses the kLevel0 comparator, every other level the kLevelNon0 one.
274 const auto& cmp
= (level
== 0) ? level_zero_cmp_
: level_nonzero_cmp_
;
275 // Merge the set of added files with the set of pre-existing files.
276 // Drop any deleted files. Store the result in *v.
277 const auto& base_files
= base_vstorage_
->LevelFiles(level
);
278 auto base_iter
= base_files
.begin();
279 auto base_end
= base_files
.end();
280 const auto& unordered_added_files
= levels_
[level
].added_files
;
// Reserve for the upper bound: all base files plus all added files.
281 vstorage
->Reserve(level
,
282 base_files
.size() + unordered_added_files
.size());
284 // Sort added files for the level.
// The map is unordered, so its values are copied into a vector and sorted.
285 std::vector
<FileMetaData
*> added_files
;
286 added_files
.reserve(unordered_added_files
.size());
287 for (const auto& pair
: unordered_added_files
) {
288 added_files
.push_back(pair
.second
);
290 std::sort(added_files
.begin(), added_files
.end(), cmp
);
// NOTE(review): where prev_file is updated is not visible in this extract;
// the assert below expects the smallest keys of added files on levels > 0
// to be non-decreasing.
293 FileMetaData
* prev_file
= nullptr;
296 for (const auto& added
: added_files
) {
298 if (level
> 0 && prev_file
!= nullptr) {
299 assert(base_vstorage_
->InternalComparator()->Compare(
300 prev_file
->smallest
, added
->smallest
) <= 0);
305 // Add all smaller files listed in base_
// upper_bound finds the first remaining base file ordered after `added`;
// every base file before that position is emitted first.
306 for (auto bpos
= std::upper_bound(base_iter
, base_end
, added
, cmp
);
307 base_iter
!= bpos
; ++base_iter
) {
308 MaybeAddFile(vstorage
, level
, *base_iter
);
311 MaybeAddFile(vstorage
, level
, added
);
314 // Add remaining base files
315 for (; base_iter
!= base_end
; ++base_iter
) {
316 MaybeAddFile(vstorage
, level
, *base_iter
);
// Re-validate the ordering of the storage that was just populated.
320 CheckConsistency(vstorage
);
// Opens a table reader for every file added through applied edits and stores
// the resulting handle (and, on success, the reader) in the file's metadata.
//   internal_stats: supplies the per-level file read histogram passed to
//                   FindTable.
//   max_threads:    when <= 1 the work runs on the calling thread, otherwise
//                   max_threads worker threads are started.
//   prefetch_index_and_filter_in_cache: forwarded to FindTable.
// Requires table_cache_ to be non-null.
// NOTE(review): the loop around the lambda body, its exit statement and the
// joining of the worker threads are not visible in this extract.
323 void LoadTableHandlers(InternalStats
* internal_stats
, int max_threads
,
324 bool prefetch_index_and_filter_in_cache
) {
325 assert(table_cache_
!= nullptr);
326 // <file metadata, level>
// Flatten all added files into one list so the workers can index into it.
327 std::vector
<std::pair
<FileMetaData
*, int>> files_meta
;
328 for (int level
= 0; level
< base_vstorage_
->num_levels(); level
++) {
329 for (auto& file_meta_pair
: levels_
[level
].added_files
) {
330 auto* file_meta
= file_meta_pair
.second
;
// A handle must not have been loaded for this file yet.
331 assert(!file_meta
->table_reader_handle
);
332 files_meta
.emplace_back(file_meta
, level
);
// Shared counter from which each worker claims the next index in files_meta.
336 std::atomic
<size_t> next_file_meta_idx(0);
// The lambda captures the locals above by reference.
337 std::function
<void()> load_handlers_func
= [&]() {
// fetch_add hands out every index exactly once across all workers.
339 size_t file_idx
= next_file_meta_idx
.fetch_add(1);
340 if (file_idx
>= files_meta
.size()) {
344 auto* file_meta
= files_meta
[file_idx
].first
;
345 int level
= files_meta
[file_idx
].second
;
// The value returned by FindTable is not used by any visible line; success
// is detected through the handle it writes.
346 table_cache_
->FindTable(env_options_
,
347 *(base_vstorage_
->InternalComparator()),
348 file_meta
->fd
, &file_meta
->table_reader_handle
,
349 false /*no_io */, true /* record_read_stats */,
350 internal_stats
->GetFileReadHist(level
), false,
351 level
, prefetch_index_and_filter_in_cache
);
352 if (file_meta
->table_reader_handle
!= nullptr) {
// Cache the reader pointer next to the handle in the file descriptor.
354 file_meta
->fd
.table_reader
= table_cache_
->GetTableReaderFromHandle(
355 file_meta
->table_reader_handle
);
// Single-threaded path: run the loader on the calling thread.
360 if (max_threads
<= 1) {
361 load_handlers_func();
// Multi-threaded path: start max_threads workers running the same loader.
363 std::vector
<port::Thread
> threads
;
364 for (int i
= 0; i
< max_threads
; i
++) {
365 threads
.emplace_back(load_handlers_func
);
368 for (auto& t
: threads
) {
// Hands `f` to `vstorage` at `level`, taking the recorded deletions into
// account: a file whose number is in this level's deleted_files has its
// stats removed from `vstorage`; AddFile is the call that adds a file.
// NOTE(review): the branch structure between the two calls is not visible in
// this extract; presumably AddFile runs only for files that are not deleted
// -- confirm.
374 void MaybeAddFile(VersionStorageInfo
* vstorage
, int level
, FileMetaData
* f
) {
375 if (levels_
[level
].deleted_files
.count(f
->fd
.GetNumber()) > 0) {
376 // f is to-be-deleted table file
377 vstorage
->RemoveCurrentStats(f
);
379 vstorage
->AddFile(level
, f
, info_log_
);
// Constructs a builder on top of `base_vstorage`. All state lives in a
// heap-allocated Rep, which receives the arguments unchanged and is freed in
// the destructor.
// NOTE(review): the parameter line declaring `info_log` is not visible in
// this extract.
384 VersionBuilder::VersionBuilder(const EnvOptions
& env_options
,
385 TableCache
* table_cache
,
386 VersionStorageInfo
* base_vstorage
,
388 : rep_(new Rep(env_options
, info_log
, table_cache
, base_vstorage
)) {}
389 VersionBuilder::~VersionBuilder() { delete rep_
; }
390 void VersionBuilder::CheckConsistency(VersionStorageInfo
* vstorage
) {
391 rep_
->CheckConsistency(vstorage
);
393 void VersionBuilder::CheckConsistencyForDeletes(VersionEdit
* edit
,
394 uint64_t number
, int level
) {
395 rep_
->CheckConsistencyForDeletes(edit
, number
, level
);
397 void VersionBuilder::Apply(VersionEdit
* edit
) { rep_
->Apply(edit
); }
398 void VersionBuilder::SaveTo(VersionStorageInfo
* vstorage
) {
399 rep_
->SaveTo(vstorage
);
401 void VersionBuilder::LoadTableHandlers(
402 InternalStats
* internal_stats
, int max_threads
,
403 bool prefetch_index_and_filter_in_cache
) {
404 rep_
->LoadTableHandlers(internal_stats
, max_threads
,
405 prefetch_index_and_filter_in_cache
);
// Forwards to Rep::MaybeAddFile with the same arguments.
// NOTE(review): the parameter line declaring `f` is not visible in this
// extract; Rep::MaybeAddFile takes a FileMetaData* in that position.
407 void VersionBuilder::MaybeAddFile(VersionStorageInfo
* vstorage
, int level
,
409 rep_
->MaybeAddFile(vstorage
, level
, f
);
412 } // namespace rocksdb