]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/version_builder.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / db / version_builder.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/version_builder.h"
11
12 #ifndef __STDC_FORMAT_MACROS
13 #define __STDC_FORMAT_MACROS
14 #endif
15
16 #include <inttypes.h>
17 #include <algorithm>
18 #include <atomic>
19 #include <functional>
20 #include <set>
21 #include <thread>
22 #include <unordered_map>
23 #include <unordered_set>
24 #include <utility>
25 #include <vector>
26
27 #include "db/dbformat.h"
28 #include "db/internal_stats.h"
29 #include "db/table_cache.h"
30 #include "db/version_set.h"
31 #include "port/port.h"
32 #include "table/table_reader.h"
33
34 namespace rocksdb {
35
36 bool NewestFirstBySeqNo(FileMetaData* a, FileMetaData* b) {
37 if (a->largest_seqno != b->largest_seqno) {
38 return a->largest_seqno > b->largest_seqno;
39 }
40 if (a->smallest_seqno != b->smallest_seqno) {
41 return a->smallest_seqno > b->smallest_seqno;
42 }
43 // Break ties by file number
44 return a->fd.GetNumber() > b->fd.GetNumber();
45 }
46
47 namespace {
48 bool BySmallestKey(FileMetaData* a, FileMetaData* b,
49 const InternalKeyComparator* cmp) {
50 int r = cmp->Compare(a->smallest, b->smallest);
51 if (r != 0) {
52 return (r < 0);
53 }
54 // Break ties by file number
55 return (a->fd.GetNumber() < b->fd.GetNumber());
56 }
57 } // namespace
58
59 class VersionBuilder::Rep {
60 private:
61 // Helper to sort files_ in v
62 // kLevel0 -- NewestFirstBySeqNo
63 // kLevelNon0 -- BySmallestKey
64 struct FileComparator {
65 enum SortMethod { kLevel0 = 0, kLevelNon0 = 1, } sort_method;
66 const InternalKeyComparator* internal_comparator;
67
68 bool operator()(FileMetaData* f1, FileMetaData* f2) const {
69 switch (sort_method) {
70 case kLevel0:
71 return NewestFirstBySeqNo(f1, f2);
72 case kLevelNon0:
73 return BySmallestKey(f1, f2, internal_comparator);
74 }
75 assert(false);
76 return false;
77 }
78 };
79
80 struct LevelState {
81 std::unordered_set<uint64_t> deleted_files;
82 // Map from file number to file meta data.
83 std::unordered_map<uint64_t, FileMetaData*> added_files;
84 };
85
86 const EnvOptions& env_options_;
87 Logger* info_log_;
88 TableCache* table_cache_;
89 VersionStorageInfo* base_vstorage_;
90 LevelState* levels_;
91 FileComparator level_zero_cmp_;
92 FileComparator level_nonzero_cmp_;
93
94 public:
95 Rep(const EnvOptions& env_options, Logger* info_log, TableCache* table_cache,
96 VersionStorageInfo* base_vstorage)
97 : env_options_(env_options),
98 info_log_(info_log),
99 table_cache_(table_cache),
100 base_vstorage_(base_vstorage) {
101 levels_ = new LevelState[base_vstorage_->num_levels()];
102 level_zero_cmp_.sort_method = FileComparator::kLevel0;
103 level_nonzero_cmp_.sort_method = FileComparator::kLevelNon0;
104 level_nonzero_cmp_.internal_comparator =
105 base_vstorage_->InternalComparator();
106 }
107
108 ~Rep() {
109 for (int level = 0; level < base_vstorage_->num_levels(); level++) {
110 const auto& added = levels_[level].added_files;
111 for (auto& pair : added) {
112 UnrefFile(pair.second);
113 }
114 }
115
116 delete[] levels_;
117 }
118
119 void UnrefFile(FileMetaData* f) {
120 f->refs--;
121 if (f->refs <= 0) {
122 if (f->table_reader_handle) {
123 assert(table_cache_ != nullptr);
124 table_cache_->ReleaseHandle(f->table_reader_handle);
125 f->table_reader_handle = nullptr;
126 }
127 delete f;
128 }
129 }
130
131 void CheckConsistency(VersionStorageInfo* vstorage) {
132 #ifdef NDEBUG
133 if (!vstorage->force_consistency_checks()) {
134 // Dont run consistency checks in release mode except if
135 // explicitly asked to
136 return;
137 }
138 #endif
139 // make sure the files are sorted correctly
140 for (int level = 0; level < vstorage->num_levels(); level++) {
141 auto& level_files = vstorage->LevelFiles(level);
142 for (size_t i = 1; i < level_files.size(); i++) {
143 auto f1 = level_files[i - 1];
144 auto f2 = level_files[i];
145 if (level == 0) {
146 if (!level_zero_cmp_(f1, f2)) {
147 fprintf(stderr, "L0 files are not sorted properly");
148 abort();
149 }
150
151 if (f2->smallest_seqno == f2->largest_seqno) {
152 // This is an external file that we ingested
153 SequenceNumber external_file_seqno = f2->smallest_seqno;
154 if (!(external_file_seqno < f1->largest_seqno ||
155 external_file_seqno == 0)) {
156 fprintf(stderr, "L0 file with seqno %" PRIu64 " %" PRIu64
157 " vs. file with global_seqno %" PRIu64 "\n",
158 f1->smallest_seqno, f1->largest_seqno,
159 external_file_seqno);
160 abort();
161 }
162 } else if (f1->smallest_seqno <= f2->smallest_seqno) {
163 fprintf(stderr, "L0 files seqno %" PRIu64 " %" PRIu64
164 " vs. %" PRIu64 " %" PRIu64 "\n",
165 f1->smallest_seqno, f1->largest_seqno, f2->smallest_seqno,
166 f2->largest_seqno);
167 abort();
168 }
169 } else {
170 if (!level_nonzero_cmp_(f1, f2)) {
171 fprintf(stderr, "L%d files are not sorted properly", level);
172 abort();
173 }
174
175 // Make sure there is no overlap in levels > 0
176 if (vstorage->InternalComparator()->Compare(f1->largest,
177 f2->smallest) >= 0) {
178 fprintf(stderr, "L%d have overlapping ranges %s vs. %s\n", level,
179 (f1->largest).DebugString(true).c_str(),
180 (f2->smallest).DebugString(true).c_str());
181 abort();
182 }
183 }
184 }
185 }
186 }
187
188 void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
189 int level) {
190 #ifdef NDEBUG
191 if (!base_vstorage_->force_consistency_checks()) {
192 // Dont run consistency checks in release mode except if
193 // explicitly asked to
194 return;
195 }
196 #endif
197 // a file to be deleted better exist in the previous version
198 bool found = false;
199 for (int l = 0; !found && l < base_vstorage_->num_levels(); l++) {
200 const std::vector<FileMetaData*>& base_files =
201 base_vstorage_->LevelFiles(l);
202 for (size_t i = 0; i < base_files.size(); i++) {
203 FileMetaData* f = base_files[i];
204 if (f->fd.GetNumber() == number) {
205 found = true;
206 break;
207 }
208 }
209 }
210 // if the file did not exist in the previous version, then it
211 // is possibly moved from lower level to higher level in current
212 // version
213 for (int l = level + 1; !found && l < base_vstorage_->num_levels(); l++) {
214 auto& level_added = levels_[l].added_files;
215 auto got = level_added.find(number);
216 if (got != level_added.end()) {
217 found = true;
218 break;
219 }
220 }
221
222 // maybe this file was added in a previous edit that was Applied
223 if (!found) {
224 auto& level_added = levels_[level].added_files;
225 auto got = level_added.find(number);
226 if (got != level_added.end()) {
227 found = true;
228 }
229 }
230 if (!found) {
231 fprintf(stderr, "not found %" PRIu64 "\n", number);
232 abort();
233 }
234 }
235
236 // Apply all of the edits in *edit to the current state.
237 void Apply(VersionEdit* edit) {
238 CheckConsistency(base_vstorage_);
239
240 // Delete files
241 const VersionEdit::DeletedFileSet& del = edit->GetDeletedFiles();
242 for (const auto& del_file : del) {
243 const auto level = del_file.first;
244 const auto number = del_file.second;
245 levels_[level].deleted_files.insert(number);
246 CheckConsistencyForDeletes(edit, number, level);
247
248 auto exising = levels_[level].added_files.find(number);
249 if (exising != levels_[level].added_files.end()) {
250 UnrefFile(exising->second);
251 levels_[level].added_files.erase(number);
252 }
253 }
254
255 // Add new files
256 for (const auto& new_file : edit->GetNewFiles()) {
257 const int level = new_file.first;
258 FileMetaData* f = new FileMetaData(new_file.second);
259 f->refs = 1;
260
261 assert(levels_[level].added_files.find(f->fd.GetNumber()) ==
262 levels_[level].added_files.end());
263 levels_[level].deleted_files.erase(f->fd.GetNumber());
264 levels_[level].added_files[f->fd.GetNumber()] = f;
265 }
266 }
267
268 // Save the current state in *v.
269 void SaveTo(VersionStorageInfo* vstorage) {
270 CheckConsistency(base_vstorage_);
271 CheckConsistency(vstorage);
272
273 for (int level = 0; level < base_vstorage_->num_levels(); level++) {
274 const auto& cmp = (level == 0) ? level_zero_cmp_ : level_nonzero_cmp_;
275 // Merge the set of added files with the set of pre-existing files.
276 // Drop any deleted files. Store the result in *v.
277 const auto& base_files = base_vstorage_->LevelFiles(level);
278 auto base_iter = base_files.begin();
279 auto base_end = base_files.end();
280 const auto& unordered_added_files = levels_[level].added_files;
281 vstorage->Reserve(level,
282 base_files.size() + unordered_added_files.size());
283
284 // Sort added files for the level.
285 std::vector<FileMetaData*> added_files;
286 added_files.reserve(unordered_added_files.size());
287 for (const auto& pair : unordered_added_files) {
288 added_files.push_back(pair.second);
289 }
290 std::sort(added_files.begin(), added_files.end(), cmp);
291
292 #ifndef NDEBUG
293 FileMetaData* prev_file = nullptr;
294 #endif
295
296 for (const auto& added : added_files) {
297 #ifndef NDEBUG
298 if (level > 0 && prev_file != nullptr) {
299 assert(base_vstorage_->InternalComparator()->Compare(
300 prev_file->smallest, added->smallest) <= 0);
301 }
302 prev_file = added;
303 #endif
304
305 // Add all smaller files listed in base_
306 for (auto bpos = std::upper_bound(base_iter, base_end, added, cmp);
307 base_iter != bpos; ++base_iter) {
308 MaybeAddFile(vstorage, level, *base_iter);
309 }
310
311 MaybeAddFile(vstorage, level, added);
312 }
313
314 // Add remaining base files
315 for (; base_iter != base_end; ++base_iter) {
316 MaybeAddFile(vstorage, level, *base_iter);
317 }
318 }
319
320 CheckConsistency(vstorage);
321 }
322
323 void LoadTableHandlers(InternalStats* internal_stats, int max_threads,
324 bool prefetch_index_and_filter_in_cache) {
325 assert(table_cache_ != nullptr);
326 // <file metadata, level>
327 std::vector<std::pair<FileMetaData*, int>> files_meta;
328 for (int level = 0; level < base_vstorage_->num_levels(); level++) {
329 for (auto& file_meta_pair : levels_[level].added_files) {
330 auto* file_meta = file_meta_pair.second;
331 assert(!file_meta->table_reader_handle);
332 files_meta.emplace_back(file_meta, level);
333 }
334 }
335
336 std::atomic<size_t> next_file_meta_idx(0);
337 std::function<void()> load_handlers_func = [&]() {
338 while (true) {
339 size_t file_idx = next_file_meta_idx.fetch_add(1);
340 if (file_idx >= files_meta.size()) {
341 break;
342 }
343
344 auto* file_meta = files_meta[file_idx].first;
345 int level = files_meta[file_idx].second;
346 table_cache_->FindTable(env_options_,
347 *(base_vstorage_->InternalComparator()),
348 file_meta->fd, &file_meta->table_reader_handle,
349 false /*no_io */, true /* record_read_stats */,
350 internal_stats->GetFileReadHist(level), false,
351 level, prefetch_index_and_filter_in_cache);
352 if (file_meta->table_reader_handle != nullptr) {
353 // Load table_reader
354 file_meta->fd.table_reader = table_cache_->GetTableReaderFromHandle(
355 file_meta->table_reader_handle);
356 }
357 }
358 };
359
360 if (max_threads <= 1) {
361 load_handlers_func();
362 } else {
363 std::vector<port::Thread> threads;
364 for (int i = 0; i < max_threads; i++) {
365 threads.emplace_back(load_handlers_func);
366 }
367
368 for (auto& t : threads) {
369 t.join();
370 }
371 }
372 }
373
374 void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
375 if (levels_[level].deleted_files.count(f->fd.GetNumber()) > 0) {
376 // f is to-be-delected table file
377 vstorage->RemoveCurrentStats(f);
378 } else {
379 vstorage->AddFile(level, f, info_log_);
380 }
381 }
382 };
383
384 VersionBuilder::VersionBuilder(const EnvOptions& env_options,
385 TableCache* table_cache,
386 VersionStorageInfo* base_vstorage,
387 Logger* info_log)
388 : rep_(new Rep(env_options, info_log, table_cache, base_vstorage)) {}
389 VersionBuilder::~VersionBuilder() { delete rep_; }
390 void VersionBuilder::CheckConsistency(VersionStorageInfo* vstorage) {
391 rep_->CheckConsistency(vstorage);
392 }
393 void VersionBuilder::CheckConsistencyForDeletes(VersionEdit* edit,
394 uint64_t number, int level) {
395 rep_->CheckConsistencyForDeletes(edit, number, level);
396 }
397 void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
398 void VersionBuilder::SaveTo(VersionStorageInfo* vstorage) {
399 rep_->SaveTo(vstorage);
400 }
401 void VersionBuilder::LoadTableHandlers(
402 InternalStats* internal_stats, int max_threads,
403 bool prefetch_index_and_filter_in_cache) {
404 rep_->LoadTableHandlers(internal_stats, max_threads,
405 prefetch_index_and_filter_in_cache);
406 }
407 void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
408 FileMetaData* f) {
409 rep_->MaybeAddFile(vstorage, level, f);
410 }
411
412 } // namespace rocksdb