// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "db/version_set.h"

#include <stdio.h>

#include <algorithm>
#include <array>
#include <cinttypes>
#include <list>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "compaction/compaction.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_index.h"
#include "db/internal_stats.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/table_cache.h"
#include "db/version_builder.h"
#include "db/version_edit_handler.h"
#include "file/filename.h"
#include "file/random_access_file_reader.h"
#include "file/read_write_util.h"
#include "file/writable_file_writer.h"
#include "monitoring/file_read_sample.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/persistent_stats_history.h"
#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/format.h"
#include "table/get_context.h"
#include "table/internal_iterator.h"
#include "table/merging_iterator.h"
#include "table/meta_blocks.h"
#include "table/multiget_context.h"
#include "table/plain/plain_table_factory.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
#include "test_util/sync_point.h"
#include "util/cast_util.h"
#include "util/coding.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
#include "util/user_comparator_wrapper.h"
namespace ROCKSDB_NAMESPACE {

namespace {

// Find the first file in the LevelFilesBrief data structure whose largest
// key is >= `key`, searching within the index range [left, right).
int FindFileInRange(const InternalKeyComparator& icmp,
                    const LevelFilesBrief& file_level, const Slice& key,
                    uint32_t left, uint32_t right) {
  auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
    return icmp.InternalKeyComparator::Compare(f.largest_key, k) < 0;
  };
  const auto& b = file_level.files;
  return static_cast<int>(std::lower_bound(b + left, b + right, key, cmp) - b);
}
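
// Example (illustrative, not from the upstream sources; user keys stand in
// for full internal keys): for a level with files f0 = [a .. c] and
// f1 = [e .. g], FindFileInRange(icmp, level, "d", 0, 2) returns 1, since
// f0's largest key "c" sorts before "d" while f1's largest key "g" does not.
// A key past the last file, e.g. "h", yields the out-of-range index 2.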

Status OverlapWithIterator(const Comparator* ucmp,
                           const Slice& smallest_user_key,
                           const Slice& largest_user_key,
                           InternalIterator* iter, bool* overlap) {
  InternalKey range_start(smallest_user_key, kMaxSequenceNumber,
                          kValueTypeForSeek);
  iter->Seek(range_start.Encode());
  if (!iter->status().ok()) {
    return iter->status();
  }

  *overlap = false;
  if (iter->Valid()) {
    ParsedInternalKey seek_result;
    Status s = ParseInternalKey(iter->key(), &seek_result,
                                false /* log_err_key */);  // TODO
    if (!s.ok()) return s;

    if (ucmp->CompareWithoutTimestamp(seek_result.user_key,
                                      largest_user_key) <= 0) {
      *overlap = true;
    }
  }

  return iter->status();
}
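
// The check above relies on two facts (noted here for clarity; this comment
// is not from the upstream sources): seeking to (smallest_user_key,
// kMaxSequenceNumber) positions the iterator at the first entry at or after
// the range start, so the range [smallest_user_key, largest_user_key]
// overlaps the iterator's contents iff that entry exists and its user key is
// <= largest_user_key.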

// Class to help choose the next file to search for the particular key.
// Searches and returns files level by level.
// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant (unless we
// are MergeInProgress).
class FilePicker {
 public:
  FilePicker(std::vector<FileMetaData*>* files, const Slice& user_key,
             const Slice& ikey, autovector<LevelFilesBrief>* file_levels,
             unsigned int num_levels, FileIndexer* file_indexer,
             const Comparator* user_comparator,
             const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        search_left_bound_(0),
        search_right_bound_(FileIndexer::kLevelMaxIndex),
#ifndef NDEBUG
        files_(files),
#endif
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        user_key_(user_key),
        ikey_(ikey),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
#ifdef NDEBUG
    (void)files;
#endif
    // Set up member variables to search the first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // Prefetch Level 0 table data to avoid cache misses if possible.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          r->Prepare(ikey);
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {  // Loops over different levels.
      while (curr_index_in_curr_level_ < curr_file_level_->num_files) {
        // Loops over all files in current level.
        FdWithKeyRange* f =
            &curr_file_level_->files[curr_index_in_curr_level_];
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_index_in_curr_level_ == curr_file_level_->num_files - 1;
        int cmp_largest = -1;

        // Do key range filtering of files and/or fractional cascading if:
        // (1) not all the files are in level 0, or
        // (2) there are more than 3 current level files.
        // If there are only 3 or fewer current level files in the system, we
        // skip the key range filtering. In this case, more likely, the
        // system is highly tuned to minimize the number of tables queried by
        // each query, so it is unlikely that key range filtering is more
        // efficient than querying the files.
        if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
          // Check if the key is within a file's range. If the search left
          // bound and right bound point to the same file, we are sure the
          // key falls in range.
          assert(curr_level_ == 0 ||
                 curr_index_in_curr_level_ == start_index_in_curr_level_ ||
                 user_comparator_->CompareWithoutTimestamp(
                     user_key_, ExtractUserKey(f->smallest_key)) <= 0);

          int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
              user_key_, ExtractUserKey(f->smallest_key));
          if (cmp_smallest >= 0) {
            cmp_largest = user_comparator_->CompareWithoutTimestamp(
                user_key_, ExtractUserKey(f->largest_key));
          }

          // Set up the file search bound for the next level based on the
          // comparison results.
          if (curr_level_ > 0) {
            file_indexer_->GetNextLevelIndex(curr_level_,
                                             curr_index_in_curr_level_,
                                             cmp_smallest, cmp_largest,
                                             &search_left_bound_,
                                             &search_right_bound_);
          }
          // Key falls out of current file's range.
          if (cmp_smallest < 0 || cmp_largest > 0) {
            if (curr_level_ == 0) {
              ++curr_index_in_curr_level_;
              continue;
            } else {
              // Search next level.
              break;
            }
          }
        }
#ifndef NDEBUG
        // Sanity check to make sure that the files are correctly sorted.
        if (prev_file_) {
          if (curr_level_ != 0) {
            int comp_sign = internal_comparator_->Compare(
                prev_file_->largest_key, f->smallest_key);
            assert(comp_sign < 0);
          } else {
            // On level 0, the current file cannot be newer than the previous
            // one. The compressed data structure (LevelFilesBrief) has no
            // seqNo attribute, so consult the FileMetaData in files_.
            assert(curr_index_in_curr_level_ > 0);
            assert(!NewestFirstBySeqNo(files_[0][curr_index_in_curr_level_],
                                       files_[0][curr_index_in_curr_level_ - 1]));
          }
        }
        prev_file_ = f;
#endif
        returned_file_level_ = curr_level_;
        if (curr_level_ > 0 && cmp_largest < 0) {
          // No more files to search in this level.
          search_ended_ = !PrepareNextLevel();
        } else {
          ++curr_index_in_curr_level_;
        }
        return f;
      }
      // Start searching next level.
      search_ended_ = !PrepareNextLevel();
    }
    // Search ended.
    return nullptr;
  }

  // Getter for the current file level,
  // used for the GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counters.
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;
  int32_t search_left_bound_;
  int32_t search_right_bound_;
#ifndef NDEBUG
  std::vector<FileMetaData*>* files_;
#endif
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  unsigned int curr_index_in_curr_level_;
  unsigned int start_index_in_curr_level_;
  Slice user_key_;
  Slice ikey_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;
#ifndef NDEBUG
  FdWithKeyRange* prev_file_;
#endif

  // Set up local variables to search the next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    curr_level_++;
    while (curr_level_ < num_levels_) {
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When the current level is empty, the search bound generated from
        // the upper level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex]
        // if it is also empty.
        assert(search_left_bound_ == 0);
        assert(search_right_bound_ == -1 ||
               search_right_bound_ == FileIndexer::kLevelMaxIndex);
        // Since the current level is empty, we will need to search all files
        // in the next level.
        search_left_bound_ = 0;
        search_right_bound_ = FileIndexer::kLevelMaxIndex;
        curr_level_++;
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index;
      if (curr_level_ == 0) {
        // On Level-0, we read through all files to check for overlap.
        start_index = 0;
      } else {
        // On Level-n (n>=1), files are sorted. Binary search to find the
        // earliest file whose largest key >= ikey. Search left bound and
        // right bound are used to narrow the range.
        if (search_left_bound_ <= search_right_bound_) {
          if (search_right_bound_ == FileIndexer::kLevelMaxIndex) {
            search_right_bound_ =
                static_cast<int32_t>(curr_file_level_->num_files) - 1;
          }
          // `search_right_bound_` is an inclusive upper-bound, but since it
          // was determined based on user key, it is still possible the
          // lookup key falls to the right of `search_right_bound_`'s
          // corresponding file. So, pass a limit one higher, which allows us
          // to detect this case.
          start_index =
              FindFileInRange(*internal_comparator_, *curr_file_level_, ikey_,
                              static_cast<uint32_t>(search_left_bound_),
                              static_cast<uint32_t>(search_right_bound_) + 1);
          if (start_index == search_right_bound_ + 1) {
            // `ikey_` comes after `search_right_bound_`. The lookup key does
            // not exist on this level, so let's skip this level and do a full
            // binary search on the next level.
            search_left_bound_ = 0;
            search_right_bound_ = FileIndexer::kLevelMaxIndex;
            curr_level_++;
            continue;
          }
        } else {
          // search_left_bound > search_right_bound, key does not exist in
          // this level. Since no comparison is done in this level, it will
          // need to search all files in the next level.
          search_left_bound_ = 0;
          search_right_bound_ = FileIndexer::kLevelMaxIndex;
          curr_level_++;
          continue;
        }
      }
      start_index_in_curr_level_ = start_index;
      curr_index_in_curr_level_ = start_index;
#ifndef NDEBUG
      prev_file_ = nullptr;
#endif
      return true;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
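
// Typical use (a minimal sketch, not code from this file): the Get path
// constructs a FilePicker and then drains it, probing each candidate file in
// newest-to-oldest order until the key is resolved:
//
//   FilePicker fp(files, user_key, ikey, &file_levels, num_levels,
//                 &file_indexer, ucmp, icmp);
//   while (FdWithKeyRange* f = fp.GetNextFile()) {
//     // ... look up the key in *f; stop once a final value is found ...
//   }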

class FilePickerMultiGet {
 private:
  struct FilePickerContext;

 public:
  FilePickerMultiGet(MultiGetRange* range,
                     autovector<LevelFilesBrief>* file_levels,
                     unsigned int num_levels, FileIndexer* file_indexer,
                     const Comparator* user_comparator,
                     const InternalKeyComparator* internal_comparator)
      : num_levels_(num_levels),
        curr_level_(static_cast<unsigned int>(-1)),
        returned_file_level_(static_cast<unsigned int>(-1)),
        hit_file_level_(static_cast<unsigned int>(-1)),
        range_(range),
        batch_iter_(range->begin()),
        batch_iter_prev_(range->begin()),
        upper_key_(range->begin()),
        maybe_repeat_key_(false),
        current_level_range_(*range, range->begin(), range->end()),
        current_file_range_(*range, range->begin(), range->end()),
        level_files_brief_(file_levels),
        is_hit_file_last_in_level_(false),
        curr_file_level_(nullptr),
        file_indexer_(file_indexer),
        user_comparator_(user_comparator),
        internal_comparator_(internal_comparator) {
    for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
      fp_ctx_array_[iter.index()] =
          FilePickerContext(0, FileIndexer::kLevelMaxIndex);
    }

    // Set up member variables to search the first level.
    search_ended_ = !PrepareNextLevel();
    if (!search_ended_) {
      // REVISIT
      // Prefetch Level 0 table data to avoid cache misses if possible.
      // As of now, only PlainTableReader and CuckooTableReader do any
      // prefetching. This may not be necessary anymore once we implement
      // batching in those table readers.
      for (unsigned int i = 0; i < (*level_files_brief_)[0].num_files; ++i) {
        auto* r = (*level_files_brief_)[0].files[i].fd.table_reader;
        if (r) {
          for (auto iter = range_->begin(); iter != range_->end(); ++iter) {
            r->Prepare(iter->ikey);
          }
        }
      }
    }
  }

  int GetCurrentLevel() const { return curr_level_; }

  // Iterates through files in the current level until it finds a file that
  // contains at least one key from the MultiGet batch.
  bool GetNextFileInLevelWithKeys(MultiGetRange* next_file_range,
                                  size_t* file_index, FdWithKeyRange** fd,
                                  bool* is_last_key_in_file) {
    size_t curr_file_index = *file_index;
    FdWithKeyRange* f = nullptr;
    bool file_hit = false;
    int cmp_largest = -1;
    if (curr_file_index >= curr_file_level_->num_files) {
      // In the unlikely case that the next key is a duplicate of the current
      // key, the current key is the last in the level, and the internal key
      // was not found, we need to skip the lookup for the remaining keys and
      // reset the search bounds.
      if (batch_iter_ != current_level_range_.end()) {
        ++batch_iter_;
        for (; batch_iter_ != current_level_range_.end(); ++batch_iter_) {
          struct FilePickerContext& fp_ctx =
              fp_ctx_array_[batch_iter_.index()];
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
      }
      return false;
    }
    // Loops over keys in the MultiGet batch until it finds a file with
    // at least one of the keys. Then it keeps moving forward until the
    // last key in the batch that falls in that file.
    while (batch_iter_ != current_level_range_.end() &&
           (fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level ==
                curr_file_index ||
            !file_hit)) {
      struct FilePickerContext& fp_ctx = fp_ctx_array_[batch_iter_.index()];
      f = &curr_file_level_->files[fp_ctx.curr_index_in_curr_level];
      Slice& user_key = batch_iter_->ukey_without_ts;

      // Do key range filtering of files and/or fractional cascading if:
      // (1) not all the files are in level 0, or
      // (2) there are more than 3 current level files.
      // If there are only 3 or fewer current level files in the system, we
      // skip the key range filtering. In this case, more likely, the system
      // is highly tuned to minimize the number of tables queried by each
      // query, so it is unlikely that key range filtering is more efficient
      // than querying the files.
      if (num_levels_ > 1 || curr_file_level_->num_files > 3) {
        // Check if the key is within a file's range. If the search left
        // bound and right bound point to the same file, we are sure the key
        // falls in range.
        int cmp_smallest = user_comparator_->CompareWithoutTimestamp(
            user_key, false, ExtractUserKey(f->smallest_key), true);

        assert(curr_level_ == 0 ||
               fp_ctx.curr_index_in_curr_level ==
                   fp_ctx.start_index_in_curr_level ||
               cmp_smallest <= 0);

        if (cmp_smallest >= 0) {
          cmp_largest = user_comparator_->CompareWithoutTimestamp(
              user_key, false, ExtractUserKey(f->largest_key), true);
        } else {
          cmp_largest = -1;
        }

        // Set up the file search bound for the next level based on the
        // comparison results.
        if (curr_level_ > 0) {
          file_indexer_->GetNextLevelIndex(
              curr_level_, fp_ctx.curr_index_in_curr_level, cmp_smallest,
              cmp_largest, &fp_ctx.search_left_bound,
              &fp_ctx.search_right_bound);
        }
        // Key falls out of current file's range.
        if (cmp_smallest < 0 || cmp_largest > 0) {
          next_file_range->SkipKey(batch_iter_);
        } else {
          file_hit = true;
        }
      } else {
        file_hit = true;
      }
      if (cmp_largest == 0) {
        // cmp_largest is 0, which means the next key will not be in this
        // file, so stop looking further. However, it's possible there are
        // duplicates in the batch, so find the upper bound for the batch
        // in this file (upper_key_) by skipping past the duplicates. We
        // leave batch_iter_ as is since we may have to pick up from there
        // for the next file, if this file has a merge value rather than
        // a final value.
        upper_key_ = batch_iter_;
        ++upper_key_;
        while (upper_key_ != current_level_range_.end() &&
               user_comparator_->CompareWithoutTimestamp(
                   batch_iter_->ukey_without_ts, false,
                   upper_key_->ukey_without_ts, false) == 0) {
          ++upper_key_;
        }
        break;
      } else {
        if (curr_level_ == 0) {
          // We need to look through all files in level 0.
          ++fp_ctx.curr_index_in_curr_level;
        }
        ++batch_iter_;
      }
      if (!file_hit) {
        curr_file_index =
            (batch_iter_ != current_level_range_.end())
                ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
                : curr_file_level_->num_files;
      }
    }

    *fd = f;
    *file_index = curr_file_index;
    *is_last_key_in_file = cmp_largest == 0;
    if (!*is_last_key_in_file) {
      // If the largest key in the batch overlapping the file is not the
      // largest key in the file, upper_key_ would not have been updated, so
      // update it here.
      upper_key_ = batch_iter_;
    }
    return file_hit;
  }
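
  // Illustration (not from the upstream sources): with a sorted batch
  // {k1, k2, k3} and a level whose file f covers [k1 .. k2], the loop above
  // returns f with the file range spanning k1 and k2, skips k3 for this
  // file, and leaves batch_iter_ positioned so the next call resumes at the
  // first key not yet resolved.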

  FdWithKeyRange* GetNextFile() {
    while (!search_ended_) {
      // Start searching next level.
      if (batch_iter_ == current_level_range_.end()) {
        search_ended_ = !PrepareNextLevel();
        continue;
      } else {
        if (maybe_repeat_key_) {
          maybe_repeat_key_ = false;
          // Check if we found the final value for the last key in the
          // previous lookup range. If we did, then there's no need to look
          // any further for that key, so advance batch_iter_. Else, keep
          // batch_iter_ positioned on that key so we look it up again in
          // the next file.
          // For L0, always advance the key because we will look in the next
          // file regardless for all keys not found yet.
          if (current_level_range_.CheckKeyDone(batch_iter_) ||
              curr_level_ == 0) {
            batch_iter_ = upper_key_;
          }
        }
        // batch_iter_prev_ will become the start key for the next file
        // lookup.
        batch_iter_prev_ = batch_iter_;
      }

      MultiGetRange next_file_range(current_level_range_, batch_iter_prev_,
                                    current_level_range_.end());
      size_t curr_file_index =
          (batch_iter_ != current_level_range_.end())
              ? fp_ctx_array_[batch_iter_.index()].curr_index_in_curr_level
              : curr_file_level_->num_files;
      FdWithKeyRange* f;
      bool is_last_key_in_file;
      if (!GetNextFileInLevelWithKeys(&next_file_range, &curr_file_index, &f,
                                      &is_last_key_in_file)) {
        search_ended_ = !PrepareNextLevel();
      } else {
        if (is_last_key_in_file) {
          // Since cmp_largest is 0, batch_iter_ still points to the last key
          // that falls in this file, instead of the next one. Increment
          // the file index for all keys between batch_iter_ and upper_key_.
          auto tmp_iter = batch_iter_;
          while (tmp_iter != upper_key_) {
            ++(fp_ctx_array_[tmp_iter.index()].curr_index_in_curr_level);
            ++tmp_iter;
          }
          maybe_repeat_key_ = true;
        }
        // Set the range for this file.
        current_file_range_ =
            MultiGetRange(next_file_range, batch_iter_prev_, upper_key_);
        returned_file_level_ = curr_level_;
        hit_file_level_ = curr_level_;
        is_hit_file_last_in_level_ =
            curr_file_index == curr_file_level_->num_files - 1;
        return f;
      }
    }

    // Search ended.
    return nullptr;
  }

  // Getter for the current file level,
  // used for the GET_HIT_L0, GET_HIT_L1 & GET_HIT_L2_AND_UP counters.
  unsigned int GetHitFileLevel() { return hit_file_level_; }

  // Returns true if the most recent "hit file" (i.e., one returned by
  // GetNextFile()) is at the last index in its level.
  bool IsHitFileLastInLevel() { return is_hit_file_last_in_level_; }

  const MultiGetRange& CurrentFileRange() { return current_file_range_; }

 private:
  unsigned int num_levels_;
  unsigned int curr_level_;
  unsigned int returned_file_level_;
  unsigned int hit_file_level_;

  struct FilePickerContext {
    int32_t search_left_bound;
    int32_t search_right_bound;
    unsigned int curr_index_in_curr_level;
    unsigned int start_index_in_curr_level;

    FilePickerContext(int32_t left, int32_t right)
        : search_left_bound(left),
          search_right_bound(right),
          curr_index_in_curr_level(0),
          start_index_in_curr_level(0) {}

    FilePickerContext() = default;
  };
  std::array<FilePickerContext, MultiGetContext::MAX_BATCH_SIZE> fp_ctx_array_;
  MultiGetRange* range_;
  // Iterator to iterate through the keys in a MultiGet batch, that gets reset
  // at the beginning of each level. Each call to GetNextFile() will position
  // batch_iter_ at or right after the last key that was found in the returned
  // SST file.
  MultiGetRange::Iterator batch_iter_;
  // An iterator that records the previous position of batch_iter_, i.e. the
  // last key found in the previous SST file, in order to serve as the start
  // of the batch key range for the next SST file.
  MultiGetRange::Iterator batch_iter_prev_;
  MultiGetRange::Iterator upper_key_;
  bool maybe_repeat_key_;
  MultiGetRange current_level_range_;
  MultiGetRange current_file_range_;
  autovector<LevelFilesBrief>* level_files_brief_;
  bool search_ended_;
  bool is_hit_file_last_in_level_;
  LevelFilesBrief* curr_file_level_;
  FileIndexer* file_indexer_;
  const Comparator* user_comparator_;
  const InternalKeyComparator* internal_comparator_;

  // Set up local variables to search the next level.
  // Returns false if there are no more levels to search.
  bool PrepareNextLevel() {
    if (curr_level_ == 0) {
      MultiGetRange::Iterator mget_iter = current_level_range_.begin();
      if (fp_ctx_array_[mget_iter.index()].curr_index_in_curr_level <
          curr_file_level_->num_files) {
        batch_iter_prev_ = current_level_range_.begin();
        upper_key_ = batch_iter_ = current_level_range_.begin();
        return true;
      }
    }

    curr_level_++;
    // Reset key range to saved value.
    while (curr_level_ < num_levels_) {
      bool level_contains_keys = false;
      curr_file_level_ = &(*level_files_brief_)[curr_level_];
      if (curr_file_level_->num_files == 0) {
        // When the current level is empty, the search bound generated from
        // the upper level must be [0, -1] or [0, FileIndexer::kLevelMaxIndex]
        // if it is also empty.

        for (auto mget_iter = current_level_range_.begin();
             mget_iter != current_level_range_.end(); ++mget_iter) {
          struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];

          assert(fp_ctx.search_left_bound == 0);
          assert(fp_ctx.search_right_bound == -1 ||
                 fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex);
          // Since the current level is empty, we will need to search all
          // files in the next level.
          fp_ctx.search_left_bound = 0;
          fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
        }
        // Skip all subsequent empty levels.
        do {
          ++curr_level_;
        } while ((curr_level_ < num_levels_) &&
                 (*level_files_brief_)[curr_level_].num_files == 0);
        continue;
      }

      // Some files may overlap each other. We find
      // all files that overlap user_key and process them in order from
      // newest to oldest. In the context of merge-operator, this can occur at
      // any level. Otherwise, it only occurs at Level-0 (since Put/Deletes
      // are always compacted into a single entry).
      int32_t start_index = -1;
      current_level_range_ =
          MultiGetRange(*range_, range_->begin(), range_->end());
      for (auto mget_iter = current_level_range_.begin();
           mget_iter != current_level_range_.end(); ++mget_iter) {
        struct FilePickerContext& fp_ctx = fp_ctx_array_[mget_iter.index()];
        if (curr_level_ == 0) {
          // On Level-0, we read through all files to check for overlap.
          start_index = 0;
          level_contains_keys = true;
        } else {
          // On Level-n (n>=1), files are sorted. Binary search to find the
          // earliest file whose largest key >= ikey. Search left bound and
          // right bound are used to narrow the range.
          if (fp_ctx.search_left_bound <= fp_ctx.search_right_bound) {
            if (fp_ctx.search_right_bound == FileIndexer::kLevelMaxIndex) {
              fp_ctx.search_right_bound =
                  static_cast<int32_t>(curr_file_level_->num_files) - 1;
            }
            // `search_right_bound_` is an inclusive upper-bound, but since it
            // was determined based on user key, it is still possible the
            // lookup key falls to the right of `search_right_bound_`'s
            // corresponding file. So, pass a limit one higher, which allows
            // us to detect this case.
            Slice& ikey = mget_iter->ikey;
            start_index = FindFileInRange(
                *internal_comparator_, *curr_file_level_, ikey,
                static_cast<uint32_t>(fp_ctx.search_left_bound),
                static_cast<uint32_t>(fp_ctx.search_right_bound) + 1);
            if (start_index == fp_ctx.search_right_bound + 1) {
              // `ikey_` comes after `search_right_bound_`. The lookup key
              // does not exist on this level, so let's skip this level and
              // do a full binary search on the next level.
              fp_ctx.search_left_bound = 0;
              fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
              current_level_range_.SkipKey(mget_iter);
              continue;
            } else {
              level_contains_keys = true;
            }
          } else {
            // search_left_bound > search_right_bound, key does not exist in
            // this level. Since no comparison is done in this level, it will
            // need to search all files in the next level.
            fp_ctx.search_left_bound = 0;
            fp_ctx.search_right_bound = FileIndexer::kLevelMaxIndex;
            current_level_range_.SkipKey(mget_iter);
            continue;
          }
        }
        fp_ctx.start_index_in_curr_level = start_index;
        fp_ctx.curr_index_in_curr_level = start_index;
      }
      if (level_contains_keys) {
        batch_iter_prev_ = current_level_range_.begin();
        upper_key_ = batch_iter_ = current_level_range_.begin();
        return true;
      }
      curr_level_++;
    }
    // curr_level_ = num_levels_. So, no more levels to search.
    return false;
  }
};
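
// Typical use (a minimal sketch, not code from this file): the MultiGet path
// drains the picker the same way Get drains FilePicker, probing each
// returned file against the sub-batch exposed by CurrentFileRange():
//
//   FilePickerMultiGet fp(&range, &file_levels, num_levels, &file_indexer,
//                         ucmp, icmp);
//   while (FdWithKeyRange* f = fp.GetNextFile()) {
//     const MultiGetRange& file_range = fp.CurrentFileRange();
//     // ... look up every key in file_range within *f ...
//   }
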
}  // anonymous namespace

VersionStorageInfo::~VersionStorageInfo() { delete[] files_; }

Version::~Version() {
  assert(refs_ == 0);

  // Remove from linked list
  prev_->next_ = next_;
  next_->prev_ = prev_;

  // Drop references to files
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (size_t i = 0; i < storage_info_.files_[level].size(); i++) {
      FileMetaData* f = storage_info_.files_[level][i];
      assert(f->refs > 0);
      f->refs--;
      if (f->refs <= 0) {
        assert(cfd_ != nullptr);
        uint32_t path_id = f->fd.GetPathId();
        assert(path_id < cfd_->ioptions()->cf_paths.size());
        vset_->obsolete_files_.push_back(
            ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
      }
    }
  }
}

int FindFile(const InternalKeyComparator& icmp,
             const LevelFilesBrief& file_level, const Slice& key) {
  return FindFileInRange(icmp, file_level, key, 0,
                         static_cast<uint32_t>(file_level.num_files));
}
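
// Note (illustrative, not an upstream comment): FindFile returns
// file_level.num_files when `key` sorts after every file's largest key, so
// callers must treat that index as "no candidate file" rather than
// dereference it.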

void DoGenerateLevelFilesBrief(LevelFilesBrief* file_level,
                               const std::vector<FileMetaData*>& files,
                               Arena* arena) {
  assert(file_level);
  assert(arena);

  size_t num = files.size();
  file_level->num_files = num;
  char* mem = arena->AllocateAligned(num * sizeof(FdWithKeyRange));
  file_level->files = new (mem) FdWithKeyRange[num];

  for (size_t i = 0; i < num; i++) {
    Slice smallest_key = files[i]->smallest.Encode();
    Slice largest_key = files[i]->largest.Encode();

    // Copy key slice to sequential memory
    size_t smallest_size = smallest_key.size();
    size_t largest_size = largest_key.size();
    mem = arena->AllocateAligned(smallest_size + largest_size);
    memcpy(mem, smallest_key.data(), smallest_size);
    memcpy(mem + smallest_size, largest_key.data(), largest_size);

    FdWithKeyRange& f = file_level->files[i];
    f.fd = files[i]->fd;
    f.file_metadata = files[i];
    f.smallest_key = Slice(mem, smallest_size);
    f.largest_key = Slice(mem + smallest_size, largest_size);
  }
}
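
// Design note (added for clarity, not an upstream comment): both the
// FdWithKeyRange array and the copied key bytes live in the arena, so the
// Slices created above remain valid for as long as the owning arena does,
// and lookups scan this compact array instead of chasing FileMetaData
// pointers.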

static bool AfterFile(const Comparator* ucmp, const Slice* user_key,
                      const FdWithKeyRange* f) {
  // nullptr user_key occurs before all keys and is therefore never after *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->largest_key)) > 0);
}

static bool BeforeFile(const Comparator* ucmp, const Slice* user_key,
                       const FdWithKeyRange* f) {
  // nullptr user_key occurs after all keys and is therefore never before *f
  return (user_key != nullptr &&
          ucmp->CompareWithoutTimestamp(*user_key,
                                        ExtractUserKey(f->smallest_key)) < 0);
}

bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
                           bool disjoint_sorted_files,
                           const LevelFilesBrief& file_level,
                           const Slice* smallest_user_key,
                           const Slice* largest_user_key) {
  const Comparator* ucmp = icmp.user_comparator();
  if (!disjoint_sorted_files) {
    // Need to check against all files
    for (size_t i = 0; i < file_level.num_files; i++) {
      const FdWithKeyRange* f = &(file_level.files[i]);
      if (AfterFile(ucmp, smallest_user_key, f) ||
          BeforeFile(ucmp, largest_user_key, f)) {
        // No overlap
      } else {
        return true;  // Overlap
      }
    }
    return false;
  }

  // Binary search over file list
  uint32_t index = 0;
  if (smallest_user_key != nullptr) {
    // Find the leftmost possible internal key for smallest_user_key
    InternalKey small;
    small.SetMinPossibleForUserKey(*smallest_user_key);
    index = FindFile(icmp, file_level, small.Encode());
  }

  if (index >= file_level.num_files) {
    // beginning of range is after all files, so no overlap.
    return false;
  }

  return !BeforeFile(ucmp, largest_user_key, &file_level.files[index]);
}
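
// Example (illustrative, not from the upstream sources): on a disjoint,
// sorted level with files f0 = [a .. c] and f1 = [f .. h], a query range
// [d, e] binary-searches to f1 (the first file whose largest key is >= "d")
// and then returns false because "e" sorts before f1's smallest key "f";
// a range [d, g] returns true.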

namespace {

class LevelIterator final : public InternalIterator {
 public:
  // @param read_options Must outlive this iterator.
  LevelIterator(TableCache* table_cache, const ReadOptions& read_options,
                const FileOptions& file_options,
                const InternalKeyComparator& icomparator,
                const LevelFilesBrief* flevel,
                const SliceTransform* prefix_extractor, bool should_sample,
                HistogramImpl* file_read_hist, TableReaderCaller caller,
                bool skip_filters, int level, RangeDelAggregator* range_del_agg,
                const std::vector<AtomicCompactionUnitBoundary>*
                    compaction_boundaries = nullptr,
                bool allow_unprepared_value = false)
      : table_cache_(table_cache),
        read_options_(read_options),
        file_options_(file_options),
        icomparator_(icomparator),
        user_comparator_(icomparator.user_comparator()),
        flevel_(flevel),
        prefix_extractor_(prefix_extractor),
        file_read_hist_(file_read_hist),
        should_sample_(should_sample),
        caller_(caller),
        skip_filters_(skip_filters),
        allow_unprepared_value_(allow_unprepared_value),
        file_index_(flevel_->num_files),
        level_(level),
        range_del_agg_(range_del_agg),
        pinned_iters_mgr_(nullptr),
        compaction_boundaries_(compaction_boundaries) {
    // Empty level is not supported.
    assert(flevel_ != nullptr && flevel_->num_files > 0);
  }

  ~LevelIterator() override { delete file_iter_.Set(nullptr); }

  void Seek(const Slice& target) override;
  void SeekForPrev(const Slice& target) override;
  void SeekToFirst() override;
  void SeekToLast() override;
  void Next() final override;
  bool NextAndGetResult(IterateResult* result) override;
  void Prev() override;

  bool Valid() const override { return file_iter_.Valid(); }
  Slice key() const override {
    assert(Valid());
    return file_iter_.key();
  }

  Slice value() const override {
    assert(Valid());
    return file_iter_.value();
  }

  Status status() const override {
    return file_iter_.iter() ? file_iter_.status() : Status::OK();
  }

  bool PrepareValue() override { return file_iter_.PrepareValue(); }

  inline bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return may_be_out_of_lower_bound_ && file_iter_.MayBeOutOfLowerBound();
  }

  inline IterBoundCheck UpperBoundCheckResult() override {
    if (Valid()) {
      return file_iter_.UpperBoundCheckResult();
    } else {
      return IterBoundCheck::kUnknown;
    }
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    if (file_iter_.iter()) {
      file_iter_.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  bool IsKeyPinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsKeyPinned();
  }

  bool IsValuePinned() const override {
    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
           file_iter_.iter() && file_iter_.IsValuePinned();
  }

 private:
  // Return true if at least one invalid file is seen and skipped.
  bool SkipEmptyFileForward();
  void SkipEmptyFileBackward();
  void SetFileIterator(InternalIterator* iter);
  void InitFileIterator(size_t new_file_index);

  const Slice& file_smallest_key(size_t file_index) {
    assert(file_index < flevel_->num_files);
    return flevel_->files[file_index].smallest_key;
  }

  bool KeyReachedUpperBound(const Slice& internal_key) {
    return read_options_.iterate_upper_bound != nullptr &&
           user_comparator_.CompareWithoutTimestamp(
               ExtractUserKey(internal_key), /*a_has_ts=*/true,
               *read_options_.iterate_upper_bound, /*b_has_ts=*/false) >= 0;
  }

  InternalIterator* NewFileIterator() {
    assert(file_index_ < flevel_->num_files);
    auto file_meta = flevel_->files[file_index_];
    if (should_sample_) {
      sample_file_read_inc(file_meta.file_metadata);
    }

    const InternalKey* smallest_compaction_key = nullptr;
    const InternalKey* largest_compaction_key = nullptr;
    if (compaction_boundaries_ != nullptr) {
      smallest_compaction_key = (*compaction_boundaries_)[file_index_].smallest;
      largest_compaction_key = (*compaction_boundaries_)[file_index_].largest;
    }
    CheckMayBeOutOfLowerBound();
    return table_cache_->NewIterator(
        read_options_, file_options_, icomparator_, *file_meta.file_metadata,
        range_del_agg_, prefix_extractor_,
        nullptr /* don't need reference to table */, file_read_hist_, caller_,
        /*arena=*/nullptr, skip_filters_, level_,
        /*max_file_size_for_l0_meta_pin=*/0, smallest_compaction_key,
        largest_compaction_key, allow_unprepared_value_);
  }

  // Check whether the current file is fully within iterate_lower_bound.
  //
  // Note MyRocks may update iterate bounds between seeks. To work around
  // this, we need to check and update may_be_out_of_lower_bound_ accordingly.
  void CheckMayBeOutOfLowerBound() {
    if (read_options_.iterate_lower_bound != nullptr &&
        file_index_ < flevel_->num_files) {
      may_be_out_of_lower_bound_ =
          user_comparator_.CompareWithoutTimestamp(
              ExtractUserKey(file_smallest_key(file_index_)), /*a_has_ts=*/true,
              *read_options_.iterate_lower_bound, /*b_has_ts=*/false) < 0;
    }
  }

  TableCache* table_cache_;
  const ReadOptions& read_options_;
  const FileOptions& file_options_;
  const InternalKeyComparator& icomparator_;
  const UserComparatorWrapper user_comparator_;
  const LevelFilesBrief* flevel_;
  mutable FileDescriptor current_value_;
  // `prefix_extractor_` may be non-null even for total order seek. Checking
  // this variable is not the right way to identify whether prefix iterator
  // is used.
  const SliceTransform* prefix_extractor_;

  HistogramImpl* file_read_hist_;
  bool should_sample_;
  TableReaderCaller caller_;
  bool skip_filters_;
  bool allow_unprepared_value_;
  bool may_be_out_of_lower_bound_ = true;
  size_t file_index_;
  int level_;
  RangeDelAggregator* range_del_agg_;
  IteratorWrapper file_iter_;  // May be nullptr
  PinnedIteratorsManager* pinned_iters_mgr_;

  // To be propagated to RangeDelAggregator in order to safely truncate range
  // tombstones.
  const std::vector<AtomicCompactionUnitBoundary>* compaction_boundaries_;
};

void LevelIterator::Seek(const Slice& target) {
  // Check whether the seek key falls within the current file.
  bool need_to_reseek = true;
  if (file_iter_.iter() != nullptr && file_index_ < flevel_->num_files) {
    const FdWithKeyRange& cur_file = flevel_->files[file_index_];
    if (icomparator_.InternalKeyComparator::Compare(
            target, cur_file.largest_key) <= 0 &&
        icomparator_.InternalKeyComparator::Compare(
            target, cur_file.smallest_key) >= 0) {
      need_to_reseek = false;
      assert(static_cast<size_t>(FindFile(icomparator_, *flevel_, target)) ==
             file_index_);
    }
  }
  if (need_to_reseek) {
    TEST_SYNC_POINT("LevelIterator::Seek:BeforeFindFile");
    size_t new_file_index = FindFile(icomparator_, *flevel_, target);
    InitFileIterator(new_file_index);
  }

  if (file_iter_.iter() != nullptr) {
    file_iter_.Seek(target);
  }
  if (SkipEmptyFileForward() && prefix_extractor_ != nullptr &&
      !read_options_.total_order_seek && !read_options_.auto_prefix_mode &&
      file_iter_.iter() != nullptr && file_iter_.Valid()) {
    // We've skipped the file we initially positioned to. In the prefix
    // seek case, it is likely that the file is skipped because of
    // prefix bloom or hash, where more keys are skipped. We then check
    // the current key and invalidate the iterator if the prefix is
    // already passed.
    // When doing a prefix iterator seek, once the keys for one prefix have
    // been exhausted, the iterator may jump to any larger key. Here we
    // enforce a stricter contract than that, in order to make it easier for
    // higher layers (merging and DB iterator) to reason about correctness:
    // 1. Within the prefix, the result should be accurate.
    // 2. If the keys for the prefix are exhausted, the iterator is either
    //    positioned to the next key after the prefix, or made invalid.
    // A side benefit is that this invalidates the iterator earlier, so that
    // the upper-level merging iterator can merge fewer child iterators.
    size_t ts_sz = user_comparator_.timestamp_size();
    Slice target_user_key_without_ts =
        ExtractUserKeyAndStripTimestamp(target, ts_sz);
    Slice file_user_key_without_ts =
        ExtractUserKeyAndStripTimestamp(file_iter_.key(), ts_sz);
    if (prefix_extractor_->InDomain(target_user_key_without_ts) &&
        (!prefix_extractor_->InDomain(file_user_key_without_ts) ||
         user_comparator_.CompareWithoutTimestamp(
             prefix_extractor_->Transform(target_user_key_without_ts), false,
             prefix_extractor_->Transform(file_user_key_without_ts),
             false) != 0)) {
      SetFileIterator(nullptr);
    }
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekForPrev(const Slice& target) {
  size_t new_file_index = FindFile(icomparator_, *flevel_, target);
  if (new_file_index >= flevel_->num_files) {
    new_file_index = flevel_->num_files - 1;
  }

  InitFileIterator(new_file_index);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekForPrev(target);
    SkipEmptyFileBackward();
  }
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToFirst() {
  InitFileIterator(0);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToFirst();
  }
  SkipEmptyFileForward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::SeekToLast() {
  InitFileIterator(flevel_->num_files - 1);
  if (file_iter_.iter() != nullptr) {
    file_iter_.SeekToLast();
  }
  SkipEmptyFileBackward();
  CheckMayBeOutOfLowerBound();
}

void LevelIterator::Next() {
  assert(Valid());
  file_iter_.Next();
  SkipEmptyFileForward();
}

bool LevelIterator::NextAndGetResult(IterateResult* result) {
  assert(Valid());
  bool is_valid = file_iter_.NextAndGetResult(result);
  if (!is_valid) {
    SkipEmptyFileForward();
    is_valid = Valid();
    if (is_valid) {
      result->key = key();
      result->bound_check_result = file_iter_.UpperBoundCheckResult();
      // Ideally, we should return the real file_iter_.value_prepared, but
      // that information is not available here. It would cause an extra
      // PrepareValue() for the first key of a file.
      result->value_prepared = !allow_unprepared_value_;
    }
  }
  return is_valid;
}

void LevelIterator::Prev() {
  assert(Valid());
  file_iter_.Prev();
  SkipEmptyFileBackward();
}

bool LevelIterator::SkipEmptyFileForward() {
  bool seen_empty_file = false;
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok() &&
          file_iter_.iter()->UpperBoundCheckResult() !=
              IterBoundCheck::kOutOfBound)) {
    seen_empty_file = true;
    // Move to next file
    if (file_index_ >= flevel_->num_files - 1) {
      // Already at the last file
      SetFileIterator(nullptr);
      break;
    }
    if (KeyReachedUpperBound(file_smallest_key(file_index_ + 1))) {
      SetFileIterator(nullptr);
      break;
    }
    InitFileIterator(file_index_ + 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToFirst();
    }
  }
  return seen_empty_file;
}

void LevelIterator::SkipEmptyFileBackward() {
  while (file_iter_.iter() == nullptr ||
         (!file_iter_.Valid() && file_iter_.status().ok())) {
    // Move to previous file
    if (file_index_ == 0) {
      // Already the first file
      SetFileIterator(nullptr);
      return;
    }
    InitFileIterator(file_index_ - 1);
    if (file_iter_.iter() != nullptr) {
      file_iter_.SeekToLast();
    }
  }
}

void LevelIterator::SetFileIterator(InternalIterator* iter) {
  if (pinned_iters_mgr_ && iter) {
    iter->SetPinnedItersMgr(pinned_iters_mgr_);
  }

  InternalIterator* old_iter = file_iter_.Set(iter);
  if (pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled()) {
    pinned_iters_mgr_->PinIterator(old_iter);
  } else {
    delete old_iter;
  }
}
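
// Design note (added for clarity, not an upstream comment): when pinning is
// enabled, the outgoing file iterator is handed to the
// PinnedIteratorsManager instead of being deleted, so any key/value Slices
// still referencing its blocks remain valid until the manager releases them.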

void LevelIterator::InitFileIterator(size_t new_file_index) {
  if (new_file_index >= flevel_->num_files) {
    file_index_ = new_file_index;
    SetFileIterator(nullptr);
    return;
  } else {
    // If the file iterator shows an incomplete status, we try it again if
    // users seek to the same file, as this time we may go to a different
    // data block which is cached in the block cache.
    if (file_iter_.iter() != nullptr && !file_iter_.status().IsIncomplete() &&
        new_file_index == file_index_) {
      // file_iter_ is already constructed with this iterator, so
      // no need to change anything
    } else {
      file_index_ = new_file_index;
      InternalIterator* iter = NewFileIterator();
      SetFileIterator(iter);
    }
  }
}
}  // anonymous namespace

Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                   const FileMetaData* file_meta,
                                   const std::string* fname) const {
  auto table_cache = cfd_->table_cache();
  auto ioptions = cfd_->ioptions();
  Status s = table_cache->GetTableProperties(
      file_options_, cfd_->internal_comparator(), file_meta->fd, tp,
      mutable_cf_options_.prefix_extractor.get(), true /* no io */);
  if (s.ok()) {
    return s;
  }

  // We only ignore the error type `Incomplete` since it is by design that we
  // disallow reading the table when it is not in the table cache.
  if (!s.IsIncomplete()) {
    return s;
  }

  // 2. Table is not present in table cache, we'll read the table properties
  // directly from the properties block in the file.
  std::unique_ptr<FSRandomAccessFile> file;
  std::string file_name;
  if (fname != nullptr) {
    file_name = *fname;
  } else {
    file_name = TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
                              file_meta->fd.GetPathId());
  }
  s = ioptions->fs->NewRandomAccessFile(file_name, file_options_, &file,
                                        nullptr);
  if (!s.ok()) {
    return s;
  }

  TableProperties* raw_table_properties;
  // By setting the magic number to kInvalidTableMagicNumber, we can bypass
  // the magic number check in the footer.
  std::unique_ptr<RandomAccessFileReader> file_reader(
      new RandomAccessFileReader(
          std::move(file), file_name, nullptr /* env */, io_tracer_,
          nullptr /* stats */, 0 /* hist_type */, nullptr /* file_read_hist */,
          nullptr /* rate_limiter */, ioptions->listeners));
  s = ReadTableProperties(
      file_reader.get(), file_meta->fd.GetFileSize(),
      Footer::kInvalidTableMagicNumber /* table's magic number */, *ioptions,
      &raw_table_properties, false /* compression_type_missing */);
  if (!s.ok()) {
    return s;
  }
  RecordTick(ioptions->statistics, NUMBER_DIRECT_LOAD_TABLE_PROPERTIES);

  *tp = std::shared_ptr<const TableProperties>(raw_table_properties);
  return s;
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) {
  Status s;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    s = GetPropertiesOfAllTables(props, level);
    if (!s.ok()) {
      return s;
    }
  }

  return Status::OK();
}

Status Version::TablesRangeTombstoneSummary(int max_entries_to_print,
                                            std::string* out_str) {
  if (max_entries_to_print <= 0) {
    return Status::OK();
  }
  int num_entries_left = max_entries_to_print;

  std::stringstream ss;

  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.files_[level]) {
      auto fname =
          TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                        file_meta->fd.GetPathId());

      ss << "=== file : " << fname << " ===\n";

      TableCache* table_cache = cfd_->table_cache();
      std::unique_ptr<FragmentedRangeTombstoneIterator> tombstone_iter;

      Status s = table_cache->GetRangeTombstoneIterator(
          ReadOptions(), cfd_->internal_comparator(), *file_meta,
          &tombstone_iter);
      if (!s.ok()) {
        return s;
      }
      if (tombstone_iter) {
        tombstone_iter->SeekToFirst();

        while (tombstone_iter->Valid() && num_entries_left > 0) {
          ss << "start: " << tombstone_iter->start_key().ToString(true)
             << " end: " << tombstone_iter->end_key().ToString(true)
             << " seq: " << tombstone_iter->seq() << '\n';
          tombstone_iter->Next();
          num_entries_left--;
        }
        if (num_entries_left <= 0) {
          break;
        }
      }
    }
    if (num_entries_left <= 0) {
      break;
    }
  }
  assert(num_entries_left >= 0);
  if (num_entries_left <= 0) {
    ss << "(results may not be complete)\n";
  }

  *out_str = ss.str();
  return Status::OK();
}

Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
                                         int level) {
  for (const auto& file_meta : storage_info_.files_[level]) {
    auto fname =
        TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
                      file_meta->fd.GetPathId());
    // 1. If the table is already present in table cache, load table
    // properties from there.
    std::shared_ptr<const TableProperties> table_properties;
    Status s = GetTableProperties(&table_properties, file_meta, &fname);
    if (s.ok()) {
      props->insert({fname, table_properties});
    } else {
      return s;
    }
  }

  return Status::OK();
}

Status Version::GetPropertiesOfTablesInRange(
    const Range* range, std::size_t n, TablePropertiesCollection* props) const {
  for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
    for (decltype(n) i = 0; i < n; i++) {
      // Convert user_key into a corresponding internal key.
      InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
      InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
      std::vector<FileMetaData*> files;
      storage_info_.GetOverlappingInputs(level, &k1, &k2, &files, -1, nullptr,
                                         false);
      for (const auto& file_meta : files) {
        auto fname =
            TableFileName(cfd_->ioptions()->cf_paths,
                          file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
        if (props->count(fname) == 0) {
          // 1. If the table is already present in table cache, load table
          // properties from there.
          std::shared_ptr<const TableProperties> table_properties;
          Status s = GetTableProperties(&table_properties, file_meta, &fname);
          if (s.ok()) {
            props->insert({fname, table_properties});
          } else {
            return s;
          }
        }
      }
    }
  }

  return Status::OK();
}

Status Version::GetAggregatedTableProperties(
    std::shared_ptr<const TableProperties>* tp, int level) {
  TablePropertiesCollection props;
  Status s;
  if (level < 0) {
    s = GetPropertiesOfAllTables(&props);
  } else {
    s = GetPropertiesOfAllTables(&props, level);
  }
  if (!s.ok()) {
    return s;
  }

  auto* new_tp = new TableProperties();
  for (const auto& item : props) {
    new_tp->Add(*item.second);
  }
  tp->reset(new_tp);
  return Status::OK();
}

size_t Version::GetMemoryUsageByTableReaders() {
  size_t total_usage = 0;
  for (auto& file_level : storage_info_.level_files_brief_) {
    for (size_t i = 0; i < file_level.num_files; i++) {
      total_usage += cfd_->table_cache()->GetMemoryUsageByTableReader(
          file_options_, cfd_->internal_comparator(), file_level.files[i].fd,
          mutable_cf_options_.prefix_extractor.get());
    }
  }
  return total_usage;
}

void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
  assert(cf_meta);
  assert(cfd_);

  cf_meta->name = cfd_->GetName();
  cf_meta->size = 0;
  cf_meta->file_count = 0;
  cf_meta->levels.clear();

  auto* ioptions = cfd_->ioptions();
  auto* vstorage = storage_info();

  for (int level = 0; level < cfd_->NumberLevels(); level++) {
    uint64_t level_size = 0;
    cf_meta->file_count += vstorage->LevelFiles(level).size();
    std::vector<SstFileMetaData> files;
    for (const auto& file : vstorage->LevelFiles(level)) {
      uint32_t path_id = file->fd.GetPathId();
      std::string file_path;
      if (path_id < ioptions->cf_paths.size()) {
        file_path = ioptions->cf_paths[path_id].path;
      } else {
        assert(!ioptions->cf_paths.empty());
        file_path = ioptions->cf_paths.back().path;
      }
      const uint64_t file_number = file->fd.GetNumber();
      files.emplace_back(SstFileMetaData{
          MakeTableFileName("", file_number), file_number, file_path,
          static_cast<size_t>(file->fd.GetFileSize()), file->fd.smallest_seqno,
          file->fd.largest_seqno, file->smallest.user_key().ToString(),
          file->largest.user_key().ToString(),
          file->stats.num_reads_sampled.load(std::memory_order_relaxed),
          file->being_compacted, file->oldest_blob_file_number,
          file->TryGetOldestAncesterTime(), file->TryGetFileCreationTime(),
          file->file_checksum, file->file_checksum_func_name});
      files.back().num_entries = file->num_entries;
      files.back().num_deletions = file->num_deletions;
      level_size += file->fd.GetFileSize();
    }
    cf_meta->levels.emplace_back(level, level_size, std::move(files));
    cf_meta->size += level_size;
  }
}

uint64_t Version::GetSstFilesSize() {
  uint64_t sst_files_size = 0;
  for (int level = 0; level < storage_info_.num_levels_; level++) {
    for (const auto& file_meta : storage_info_.LevelFiles(level)) {
      sst_files_size += file_meta->fd.GetFileSize();
    }
  }
  return sst_files_size;
}

void Version::GetCreationTimeOfOldestFile(uint64_t* creation_time) {
  uint64_t oldest_time = port::kMaxUint64;
  for (int level = 0; level < storage_info_.num_non_empty_levels_; level++) {
    for (FileMetaData* meta : storage_info_.LevelFiles(level)) {
      assert(meta->fd.table_reader != nullptr);
      uint64_t file_creation_time = meta->TryGetFileCreationTime();
      if (file_creation_time == kUnknownFileCreationTime) {
        *creation_time = 0;
        return;
      }
      if (file_creation_time < oldest_time) {
        oldest_time = file_creation_time;
      }
    }
  }
  *creation_time = oldest_time;
}

uint64_t VersionStorageInfo::GetEstimatedActiveKeys() const {
  // Estimation will be inaccurate when:
  // (1) there exist merge keys
  // (2) keys are directly overwritten
  // (3) deletion on non-existing keys
  // (4) low number of samples
  if (current_num_samples_ == 0) {
    return 0;
  }

  if (current_num_non_deletions_ <= current_num_deletions_) {
    return 0;
  }

  uint64_t est = current_num_non_deletions_ - current_num_deletions_;

  uint64_t file_count = 0;
  for (int level = 0; level < num_levels_; ++level) {
    file_count += files_[level].size();
  }

  if (current_num_samples_ < file_count) {
    // casting to avoid overflowing
    return static_cast<uint64_t>(
        (est * static_cast<double>(file_count) / current_num_samples_));
  } else {
    return est;
  }
}
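
// Worked example (illustrative, not from the upstream comments): with 1,000
// non-deletion entries and 200 deletions observed across 4 sampled files out
// of 10 total, est = 800 and the scaled estimate is 800 * 10 / 4 = 2,000
// active keys.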

double VersionStorageInfo::GetEstimatedCompressionRatioAtLevel(
    int level) const {
  assert(level < num_levels_);
  uint64_t sum_file_size_bytes = 0;
  uint64_t sum_data_size_bytes = 0;
  for (auto* file_meta : files_[level]) {
    sum_file_size_bytes += file_meta->fd.GetFileSize();
    sum_data_size_bytes += file_meta->raw_key_size + file_meta->raw_value_size;
  }
  if (sum_file_size_bytes == 0) {
    return -1.0;
  }
  return static_cast<double>(sum_data_size_bytes) / sum_file_size_bytes;
}
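
// Example (illustrative): a level holding 100 MB of raw keys and values in
// 25 MB of SST files yields a ratio of 4.0; an empty level returns the
// sentinel -1.0 rather than dividing by zero.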
1581
1582 void Version::AddIterators(const ReadOptions& read_options,
1583 const FileOptions& soptions,
1584 MergeIteratorBuilder* merge_iter_builder,
1585 RangeDelAggregator* range_del_agg,
1586 bool allow_unprepared_value) {
1587 assert(storage_info_.finalized_);
1588
1589 for (int level = 0; level < storage_info_.num_non_empty_levels(); level++) {
1590 AddIteratorsForLevel(read_options, soptions, merge_iter_builder, level,
1591 range_del_agg, allow_unprepared_value);
1592 }
1593 }
1594
1595 void Version::AddIteratorsForLevel(const ReadOptions& read_options,
1596 const FileOptions& soptions,
1597 MergeIteratorBuilder* merge_iter_builder,
1598 int level,
1599 RangeDelAggregator* range_del_agg,
1600 bool allow_unprepared_value) {
1601 assert(storage_info_.finalized_);
1602 if (level >= storage_info_.num_non_empty_levels()) {
1603 // This is an empty level
1604 return;
1605 } else if (storage_info_.LevelFilesBrief(level).num_files == 0) {
1606 // No files in this level
1607 return;
1608 }
1609
1610 bool should_sample = should_sample_file_read();
1611
1612 auto* arena = merge_iter_builder->GetArena();
1613 if (level == 0) {
1614 // Merge all level zero files together since they may overlap
1615 for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1616 const auto& file = storage_info_.LevelFilesBrief(0).files[i];
1617 merge_iter_builder->AddIterator(cfd_->table_cache()->NewIterator(
1618 read_options, soptions, cfd_->internal_comparator(),
1619 *file.file_metadata, range_del_agg,
1620 mutable_cf_options_.prefix_extractor.get(), nullptr,
1621 cfd_->internal_stats()->GetFileReadHist(0),
1622 TableReaderCaller::kUserIterator, arena,
1623 /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
1624 /*smallest_compaction_key=*/nullptr,
1625 /*largest_compaction_key=*/nullptr, allow_unprepared_value));
1626 }
1627 if (should_sample) {
1628 // Count one read for every L0 file. This is done per iterator creation
1629 // rather than per Seek(), while files in other levels are recorded per seek.
1630 // If users execute one range query per iterator, there may be some
1631 // discrepancy here.
1632 for (FileMetaData* meta : storage_info_.LevelFiles(0)) {
1633 sample_file_read_inc(meta);
1634 }
1635 }
1636 } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1637 // For levels > 0, we can use a concatenating iterator that sequentially
1638 // walks through the non-overlapping files in the level, opening them
1639 // lazily.
1640 auto* mem = arena->AllocateAligned(sizeof(LevelIterator));
1641 merge_iter_builder->AddIterator(new (mem) LevelIterator(
1642 cfd_->table_cache(), read_options, soptions,
1643 cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1644 mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1645 cfd_->internal_stats()->GetFileReadHist(level),
1646 TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1647 range_del_agg,
1648 /*compaction_boundaries=*/nullptr, allow_unprepared_value));
1649 }
1650 }
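// Conceptually, the LevelIterator above treats a level's sorted,
// non-overlapping files as one big sorted run: Seek() locates the file
// whose largest key bounds the target, then positions within that file
// only. A simplified sketch of that file-location step over plain string
// ranges (hypothetical types and names; the real iterator works on
// internal keys and opens files lazily through the table cache):
namespace {
struct SketchFileRange {
  std::string smallest;
  std::string largest;
};
inline int SeekToFileSketch(const std::vector<SketchFileRange>& files,
                            const std::string& target) {
  // Files are sorted and disjoint, so the first file with largest >= target
  // is the only one that can contain it.
  int lo = 0, hi = static_cast<int>(files.size());
  while (lo < hi) {
    int mid = lo + (hi - lo) / 2;
    if (files[mid].largest < target) {
      lo = mid + 1;
    } else {
      hi = mid;
    }
  }
  return lo;  // == files.size() means the target is past the level's end
}
}  // anonymous namespace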
1651
1652 Status Version::OverlapWithLevelIterator(const ReadOptions& read_options,
1653 const FileOptions& file_options,
1654 const Slice& smallest_user_key,
1655 const Slice& largest_user_key,
1656 int level, bool* overlap) {
1657 assert(storage_info_.finalized_);
1658
1659 auto icmp = cfd_->internal_comparator();
1660 auto ucmp = icmp.user_comparator();
1661
1662 Arena arena;
1663 Status status;
1664 ReadRangeDelAggregator range_del_agg(&icmp,
1665 kMaxSequenceNumber /* upper_bound */);
1666
1667 *overlap = false;
1668
1669 if (level == 0) {
1670 for (size_t i = 0; i < storage_info_.LevelFilesBrief(0).num_files; i++) {
1671 const auto file = &storage_info_.LevelFilesBrief(0).files[i];
1672 if (AfterFile(ucmp, &smallest_user_key, file) ||
1673 BeforeFile(ucmp, &largest_user_key, file)) {
1674 continue;
1675 }
1676 ScopedArenaIterator iter(cfd_->table_cache()->NewIterator(
1677 read_options, file_options, cfd_->internal_comparator(),
1678 *file->file_metadata, &range_del_agg,
1679 mutable_cf_options_.prefix_extractor.get(), nullptr,
1680 cfd_->internal_stats()->GetFileReadHist(0),
1681 TableReaderCaller::kUserIterator, &arena,
1682 /*skip_filters=*/false, /*level=*/0, max_file_size_for_l0_meta_pin_,
1683 /*smallest_compaction_key=*/nullptr,
1684 /*largest_compaction_key=*/nullptr,
1685 /*allow_unprepared_value=*/false));
1686 status = OverlapWithIterator(
1687 ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1688 if (!status.ok() || *overlap) {
1689 break;
1690 }
1691 }
1692 } else if (storage_info_.LevelFilesBrief(level).num_files > 0) {
1693 auto mem = arena.AllocateAligned(sizeof(LevelIterator));
1694 ScopedArenaIterator iter(new (mem) LevelIterator(
1695 cfd_->table_cache(), read_options, file_options,
1696 cfd_->internal_comparator(), &storage_info_.LevelFilesBrief(level),
1697 mutable_cf_options_.prefix_extractor.get(), should_sample_file_read(),
1698 cfd_->internal_stats()->GetFileReadHist(level),
1699 TableReaderCaller::kUserIterator, IsFilterSkipped(level), level,
1700 &range_del_agg));
1701 status = OverlapWithIterator(
1702 ucmp, smallest_user_key, largest_user_key, iter.get(), overlap);
1703 }
1704
1705 if (status.ok() && *overlap == false &&
1706 range_del_agg.IsRangeOverlapped(smallest_user_key, largest_user_key)) {
1707 *overlap = true;
1708 }
1709 return status;
1710 }
1711
1712 VersionStorageInfo::VersionStorageInfo(
1713 const InternalKeyComparator* internal_comparator,
1714 const Comparator* user_comparator, int levels,
1715 CompactionStyle compaction_style, VersionStorageInfo* ref_vstorage,
1716 bool _force_consistency_checks)
1717 : internal_comparator_(internal_comparator),
1718 user_comparator_(user_comparator),
1719 // cfd is nullptr if Version is dummy
1720 num_levels_(levels),
1721 num_non_empty_levels_(0),
1722 file_indexer_(user_comparator),
1723 compaction_style_(compaction_style),
1724 files_(new std::vector<FileMetaData*>[num_levels_]),
1725 base_level_(num_levels_ == 1 ? -1 : 1),
1726 level_multiplier_(0.0),
1727 files_by_compaction_pri_(num_levels_),
1728 level0_non_overlapping_(false),
1729 next_file_to_compact_by_size_(num_levels_),
1730 compaction_score_(num_levels_),
1731 compaction_level_(num_levels_),
1732 l0_delay_trigger_count_(0),
1733 accumulated_file_size_(0),
1734 accumulated_raw_key_size_(0),
1735 accumulated_raw_value_size_(0),
1736 accumulated_num_non_deletions_(0),
1737 accumulated_num_deletions_(0),
1738 current_num_non_deletions_(0),
1739 current_num_deletions_(0),
1740 current_num_samples_(0),
1741 estimated_compaction_needed_bytes_(0),
1742 finalized_(false),
1743 force_consistency_checks_(_force_consistency_checks) {
1744 if (ref_vstorage != nullptr) {
1745 accumulated_file_size_ = ref_vstorage->accumulated_file_size_;
1746 accumulated_raw_key_size_ = ref_vstorage->accumulated_raw_key_size_;
1747 accumulated_raw_value_size_ = ref_vstorage->accumulated_raw_value_size_;
1748 accumulated_num_non_deletions_ =
1749 ref_vstorage->accumulated_num_non_deletions_;
1750 accumulated_num_deletions_ = ref_vstorage->accumulated_num_deletions_;
1751 current_num_non_deletions_ = ref_vstorage->current_num_non_deletions_;
1752 current_num_deletions_ = ref_vstorage->current_num_deletions_;
1753 current_num_samples_ = ref_vstorage->current_num_samples_;
1754 oldest_snapshot_seqnum_ = ref_vstorage->oldest_snapshot_seqnum_;
1755 }
1756 }
1757
1758 Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
1759 const FileOptions& file_opt,
1760 const MutableCFOptions mutable_cf_options,
1761 const std::shared_ptr<IOTracer>& io_tracer,
1762 uint64_t version_number)
1763 : env_(vset->env_),
1764 cfd_(column_family_data),
1765 info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
1766 db_statistics_((cfd_ == nullptr) ? nullptr
1767 : cfd_->ioptions()->statistics),
1768 table_cache_((cfd_ == nullptr) ? nullptr : cfd_->table_cache()),
1769 blob_file_cache_(cfd_ ? cfd_->blob_file_cache() : nullptr),
1770 merge_operator_((cfd_ == nullptr) ? nullptr
1771 : cfd_->ioptions()->merge_operator),
1772 storage_info_(
1773 (cfd_ == nullptr) ? nullptr : &cfd_->internal_comparator(),
1774 (cfd_ == nullptr) ? nullptr : cfd_->user_comparator(),
1775 cfd_ == nullptr ? 0 : cfd_->NumberLevels(),
1776 cfd_ == nullptr ? kCompactionStyleLevel
1777 : cfd_->ioptions()->compaction_style,
1778 (cfd_ == nullptr || cfd_->current() == nullptr)
1779 ? nullptr
1780 : cfd_->current()->storage_info(),
1781 cfd_ == nullptr ? false : cfd_->ioptions()->force_consistency_checks),
1782 vset_(vset),
1783 next_(this),
1784 prev_(this),
1785 refs_(0),
1786 file_options_(file_opt),
1787 mutable_cf_options_(mutable_cf_options),
1788 max_file_size_for_l0_meta_pin_(
1789 MaxFileSizeForL0MetaPin(mutable_cf_options_)),
1790 version_number_(version_number),
1791 io_tracer_(io_tracer) {}
1792
1793 Status Version::GetBlob(const ReadOptions& read_options, const Slice& user_key,
1794 PinnableSlice* value) const {
1795 assert(value);
1796
1797 if (read_options.read_tier == kBlockCacheTier) {
1798 return Status::Incomplete("Cannot read blob: no disk I/O allowed");
1799 }
1800
1801 BlobIndex blob_index;
1802
1803 {
1804 Status s = blob_index.DecodeFrom(*value);
1805 if (!s.ok()) {
1806 return s;
1807 }
1808 }
1809
1810 if (blob_index.HasTTL() || blob_index.IsInlined()) {
1811 return Status::Corruption("Unexpected TTL/inlined blob index");
1812 }
1813
1814 const auto& blob_files = storage_info_.GetBlobFiles();
1815
1816 const uint64_t blob_file_number = blob_index.file_number();
1817
1818 const auto it = blob_files.find(blob_file_number);
1819 if (it == blob_files.end()) {
1820 return Status::Corruption("Invalid blob file number");
1821 }
1822
1823 CacheHandleGuard<BlobFileReader> blob_file_reader;
1824
1825 {
1826 assert(blob_file_cache_);
1827 const Status s = blob_file_cache_->GetBlobFileReader(blob_file_number,
1828 &blob_file_reader);
1829 if (!s.ok()) {
1830 return s;
1831 }
1832 }
1833
1834 assert(blob_file_reader.GetValue());
1835 const Status s = blob_file_reader.GetValue()->GetBlob(
1836 read_options, user_key, blob_index.offset(), blob_index.size(),
1837 blob_index.compression(), value);
1838
1839 return s;
1840 }
1841
1842 void Version::Get(const ReadOptions& read_options, const LookupKey& k,
1843 PinnableSlice* value, std::string* timestamp, Status* status,
1844 MergeContext* merge_context,
1845 SequenceNumber* max_covering_tombstone_seq, bool* value_found,
1846 bool* key_exists, SequenceNumber* seq, ReadCallback* callback,
1847 bool* is_blob, bool do_merge) {
1848 Slice ikey = k.internal_key();
1849 Slice user_key = k.user_key();
1850
1851 assert(status->ok() || status->IsMergeInProgress());
1852
1853 if (key_exists != nullptr) {
1854 // will falsify below if not found
1855 *key_exists = true;
1856 }
1857
1858 PinnedIteratorsManager pinned_iters_mgr;
1859 uint64_t tracing_get_id = BlockCacheTraceHelper::kReservedGetId;
1860 if (vset_ && vset_->block_cache_tracer_ &&
1861 vset_->block_cache_tracer_->is_tracing_enabled()) {
1862 tracing_get_id = vset_->block_cache_tracer_->NextGetId();
1863 }
1864
1865 // Note: the old StackableDB-based BlobDB passes in
1866 // GetImplOptions::is_blob_index; for the integrated BlobDB implementation, we
1867 // need to provide it here.
1868 bool is_blob_index = false;
1869 bool* const is_blob_to_use = is_blob ? is_blob : &is_blob_index;
1870
1871 GetContext get_context(
1872 user_comparator(), merge_operator_, info_log_, db_statistics_,
1873 status->ok() ? GetContext::kNotFound : GetContext::kMerge, user_key,
1874 do_merge ? value : nullptr, do_merge ? timestamp : nullptr, value_found,
1875 merge_context, do_merge, max_covering_tombstone_seq, this->env_, seq,
1876 merge_operator_ ? &pinned_iters_mgr : nullptr, callback, is_blob_to_use,
1877 tracing_get_id);
1878
1879 // Pin blocks that we read to hold merge operands
1880 if (merge_operator_) {
1881 pinned_iters_mgr.StartPinning();
1882 }
1883
1884 FilePicker fp(
1885 storage_info_.files_, user_key, ikey, &storage_info_.level_files_brief_,
1886 storage_info_.num_non_empty_levels_, &storage_info_.file_indexer_,
1887 user_comparator(), internal_comparator());
1888 FdWithKeyRange* f = fp.GetNextFile();
1889
1890 while (f != nullptr) {
1891 if (*max_covering_tombstone_seq > 0) {
1892 // The remaining files we look at will only contain covered keys, so we
1893 // stop here.
1894 break;
1895 }
1896 if (get_context.sample()) {
1897 sample_file_read_inc(f->file_metadata);
1898 }
1899
1900 bool timer_enabled =
1901 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
1902 get_perf_context()->per_level_perf_context_enabled;
1903 StopWatchNano timer(env_, timer_enabled /* auto_start */);
1904 *status = table_cache_->Get(
1905 read_options, *internal_comparator(), *f->file_metadata, ikey,
1906 &get_context, mutable_cf_options_.prefix_extractor.get(),
1907 cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
1908 IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
1909 fp.IsHitFileLastInLevel()),
1910 fp.GetHitFileLevel(), max_file_size_for_l0_meta_pin_);
1911 // TODO: examine the behavior for corrupted key
1912 if (timer_enabled) {
1913 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
1914 fp.GetHitFileLevel());
1915 }
1916 if (!status->ok()) {
1917 return;
1918 }
1919
1920 // report the counters before returning
1921 if (get_context.State() != GetContext::kNotFound &&
1922 get_context.State() != GetContext::kMerge &&
1923 db_statistics_ != nullptr) {
1924 get_context.ReportCounters();
1925 }
1926 switch (get_context.State()) {
1927 case GetContext::kNotFound:
1928 // Keep searching in other files
1929 break;
1930 case GetContext::kMerge:
1931 // TODO: update per-level perfcontext user_key_return_count for kMerge
1932 break;
1933 case GetContext::kFound:
1934 if (is_blob_index) {
1935 if (do_merge && value) {
1936 *status = GetBlob(read_options, user_key, value);
1937 if (!status->ok()) {
1938 if (status->IsIncomplete()) {
1939 get_context.MarkKeyMayExist();
1940 }
1941 return;
1942 }
1943 }
1944 }
1945
1946 if (fp.GetHitFileLevel() == 0) {
1947 RecordTick(db_statistics_, GET_HIT_L0);
1948 } else if (fp.GetHitFileLevel() == 1) {
1949 RecordTick(db_statistics_, GET_HIT_L1);
1950 } else if (fp.GetHitFileLevel() >= 2) {
1951 RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
1952 }
1953 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
1954 fp.GetHitFileLevel());
1955 return;
1956 case GetContext::kDeleted:
1957 // Use empty error message for speed
1958 *status = Status::NotFound();
1959 return;
1960 case GetContext::kCorrupt:
1961 *status = Status::Corruption("corrupted key for ", user_key);
1962 return;
1963 case GetContext::kUnexpectedBlobIndex:
1964 ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
1965 *status = Status::NotSupported(
1966 "Encounter unexpected blob index. Please open DB with "
1967 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
1968 return;
1969 }
1970 f = fp.GetNextFile();
1971 }
1972 if (db_statistics_ != nullptr) {
1973 get_context.ReportCounters();
1974 }
1975 if (GetContext::kMerge == get_context.State()) {
1976 if (!do_merge) {
1977 *status = Status::OK();
1978 return;
1979 }
1980 if (!merge_operator_) {
1981 *status = Status::InvalidArgument(
1982 "merge_operator is not properly initialized.");
1983 return;
1984 }
1985 // The merge operands are in the saver and we hit the beginning of the key
1986 // history: do a final merge of nullptr and the operands.
1987 std::string* str_value = value != nullptr ? value->GetSelf() : nullptr;
1988 *status = MergeHelper::TimedFullMerge(
1989 merge_operator_, user_key, nullptr, merge_context->GetOperands(),
1990 str_value, info_log_, db_statistics_, env_,
1991 nullptr /* result_operand */, true);
1992 if (LIKELY(value != nullptr)) {
1993 value->PinSelf();
1994 }
1995 } else {
1996 if (key_exists != nullptr) {
1997 *key_exists = false;
1998 }
1999 *status = Status::NotFound(); // Use an empty error message for speed
2000 }
2001 }
2002
2003 void Version::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
2004 ReadCallback* callback, bool* is_blob) {
2005 PinnedIteratorsManager pinned_iters_mgr;
2006
2007 // Pin blocks that we read to hold merge operands
2008 if (merge_operator_) {
2009 pinned_iters_mgr.StartPinning();
2010 }
2011 uint64_t tracing_mget_id = BlockCacheTraceHelper::kReservedGetId;
2012
2013 if (vset_ && vset_->block_cache_tracer_ &&
2014 vset_->block_cache_tracer_->is_tracing_enabled()) {
2015 tracing_mget_id = vset_->block_cache_tracer_->NextGetId();
2016 }
2017 // Even though we know the batch size won't be > MAX_BATCH_SIZE,
2018 // use autovector in order to avoid unnecessary construction of GetContext
2019 // objects, which is expensive
2020 autovector<GetContext, 16> get_ctx;
2021 for (auto iter = range->begin(); iter != range->end(); ++iter) {
2022 assert(iter->s->ok() || iter->s->IsMergeInProgress());
2023 get_ctx.emplace_back(
2024 user_comparator(), merge_operator_, info_log_, db_statistics_,
2025 iter->s->ok() ? GetContext::kNotFound : GetContext::kMerge,
2026 iter->ukey_with_ts, iter->value, iter->timestamp, nullptr,
2027 &(iter->merge_context), true, &iter->max_covering_tombstone_seq,
2028 this->env_, nullptr, merge_operator_ ? &pinned_iters_mgr : nullptr,
2029 callback, is_blob, tracing_mget_id);
2030 // MergeInProgress status, if set, has been transferred to the get_context
2031 // state, so we set status to ok here. From now on, the iter status will
2032 // be used for IO errors, and get_context state will be used for any
2033 // key-level errors.
2034 *(iter->s) = Status::OK();
2035 }
2036 int get_ctx_index = 0;
2037 for (auto iter = range->begin(); iter != range->end();
2038 ++iter, get_ctx_index++) {
2039 iter->get_context = &(get_ctx[get_ctx_index]);
2040 }
2041
2042 MultiGetRange file_picker_range(*range, range->begin(), range->end());
2043 FilePickerMultiGet fp(
2044 &file_picker_range,
2045 &storage_info_.level_files_brief_, storage_info_.num_non_empty_levels_,
2046 &storage_info_.file_indexer_, user_comparator(), internal_comparator());
2047 FdWithKeyRange* f = fp.GetNextFile();
2048 Status s;
2049 uint64_t num_index_read = 0;
2050 uint64_t num_filter_read = 0;
2051 uint64_t num_data_read = 0;
2052 uint64_t num_sst_read = 0;
2053
2054 while (f != nullptr) {
2055 MultiGetRange file_range = fp.CurrentFileRange();
2056 bool timer_enabled =
2057 GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
2058 get_perf_context()->per_level_perf_context_enabled;
2059 StopWatchNano timer(env_, timer_enabled /* auto_start */);
2060 s = table_cache_->MultiGet(
2061 read_options, *internal_comparator(), *f->file_metadata, &file_range,
2062 mutable_cf_options_.prefix_extractor.get(),
2063 cfd_->internal_stats()->GetFileReadHist(fp.GetHitFileLevel()),
2064 IsFilterSkipped(static_cast<int>(fp.GetHitFileLevel()),
2065 fp.IsHitFileLastInLevel()),
2066 fp.GetHitFileLevel());
2067 // TODO: examine the behavior for corrupted key
2068 if (timer_enabled) {
2069 PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
2070 fp.GetHitFileLevel());
2071 }
2072 if (!s.ok()) {
2073 // TODO: Set status for individual keys appropriately
2074 for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
2075 *iter->s = s;
2076 file_range.MarkKeyDone(iter);
2077 }
2078 return;
2079 }
2080 uint64_t batch_size = 0;
2081 for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
2082 ++iter) {
2083 GetContext& get_context = *iter->get_context;
2084 Status* status = iter->s;
2085 // The Status in the KeyContext takes precedence over GetContext state.
2086 // Status may be an error if there were any IO errors in the table
2087 // reader. We never expect Status to be NotFound(), as that is
2088 // determined by get_context.
2089 assert(!status->IsNotFound());
2090 if (!status->ok()) {
2091 file_range.MarkKeyDone(iter);
2092 continue;
2093 }
2094
2095 if (get_context.sample()) {
2096 sample_file_read_inc(f->file_metadata);
2097 }
2098 batch_size++;
2099 num_index_read += get_context.get_context_stats_.num_index_read;
2100 num_filter_read += get_context.get_context_stats_.num_filter_read;
2101 num_data_read += get_context.get_context_stats_.num_data_read;
2102 num_sst_read += get_context.get_context_stats_.num_sst_read;
2103
2104 // report the counters before returning
2105 if (get_context.State() != GetContext::kNotFound &&
2106 get_context.State() != GetContext::kMerge &&
2107 db_statistics_ != nullptr) {
2108 get_context.ReportCounters();
2109 } else {
2110 if (iter->max_covering_tombstone_seq > 0) {
2111 // The remaining files we look at will only contain covered keys, so
2112 // we stop here for this key
2113 file_picker_range.SkipKey(iter);
2114 }
2115 }
2116 switch (get_context.State()) {
2117 case GetContext::kNotFound:
2118 // Keep searching in other files
2119 break;
2120 case GetContext::kMerge:
2121 // TODO: update per-level perfcontext user_key_return_count for kMerge
2122 break;
2123 case GetContext::kFound:
2124 if (fp.GetHitFileLevel() == 0) {
2125 RecordTick(db_statistics_, GET_HIT_L0);
2126 } else if (fp.GetHitFileLevel() == 1) {
2127 RecordTick(db_statistics_, GET_HIT_L1);
2128 } else if (fp.GetHitFileLevel() >= 2) {
2129 RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
2130 }
2131 PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1,
2132 fp.GetHitFileLevel());
2133 file_range.AddValueSize(iter->value->size());
2134 file_range.MarkKeyDone(iter);
2135 if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
2136 s = Status::Aborted();
2137 break;
2138 }
2139 continue;
2140 case GetContext::kDeleted:
2141 // Use empty error message for speed
2142 *status = Status::NotFound();
2143 file_range.MarkKeyDone(iter);
2144 continue;
2145 case GetContext::kCorrupt:
2146 *status =
2147 Status::Corruption("corrupted key for ", iter->lkey->user_key());
2148 file_range.MarkKeyDone(iter);
2149 continue;
2150 case GetContext::kUnexpectedBlobIndex:
2151 ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
2152 *status = Status::NotSupported(
2153 "Encounter unexpected blob index. Please open DB with "
2154 "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
2155 file_range.MarkKeyDone(iter);
2156 continue;
2157 }
2158 }
2159
2160 // Report MultiGet stats per level.
2161 if (fp.IsHitFileLastInLevel()) {
2162 // Dump the stats if this is the last file of this level and reset for
2163 // next level.
2164 RecordInHistogram(db_statistics_,
2165 NUM_INDEX_AND_FILTER_BLOCKS_READ_PER_LEVEL,
2166 num_index_read + num_filter_read);
2167 RecordInHistogram(db_statistics_, NUM_DATA_BLOCKS_READ_PER_LEVEL,
2168 num_data_read);
2169 RecordInHistogram(db_statistics_, NUM_SST_READ_PER_LEVEL, num_sst_read);
2170 num_filter_read = 0;
2171 num_index_read = 0;
2172 num_data_read = 0;
2173 num_sst_read = 0;
2174 }
2175
2176 RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
2177 if (!s.ok() || file_picker_range.empty()) {
2178 break;
2179 }
2180 f = fp.GetNextFile();
2181 }
2182
2183 // Process any left over keys
2184 for (auto iter = range->begin(); s.ok() && iter != range->end(); ++iter) {
2185 GetContext& get_context = *iter->get_context;
2186 Status* status = iter->s;
2187 Slice user_key = iter->lkey->user_key();
2188
2189 if (db_statistics_ != nullptr) {
2190 get_context.ReportCounters();
2191 }
2192 if (GetContext::kMerge == get_context.State()) {
2193 if (!merge_operator_) {
2194 *status = Status::InvalidArgument(
2195 "merge_operator is not properly initialized.");
2196 range->MarkKeyDone(iter);
2197 continue;
2198 }
2199 // The merge operands are in the saver and we hit the beginning of the key
2200 // history: do a final merge of nullptr and the operands.
2201 std::string* str_value =
2202 iter->value != nullptr ? iter->value->GetSelf() : nullptr;
2203 *status = MergeHelper::TimedFullMerge(
2204 merge_operator_, user_key, nullptr, iter->merge_context.GetOperands(),
2205 str_value, info_log_, db_statistics_, env_,
2206 nullptr /* result_operand */, true);
2207 if (LIKELY(iter->value != nullptr)) {
2208 iter->value->PinSelf();
2209 range->AddValueSize(iter->value->size());
2210 range->MarkKeyDone(iter);
2211 if (range->GetValueSize() > read_options.value_size_soft_limit) {
2212 s = Status::Aborted();
2213 break;
2214 }
2215 }
2216 } else {
2217 range->MarkKeyDone(iter);
2218 *status = Status::NotFound(); // Use an empty error message for speed
2219 }
2220 }
2221
2222 for (auto iter = range->begin(); iter != range->end(); ++iter) {
2223 range->MarkKeyDone(iter);
2224 *(iter->s) = s;
2225 }
2226 }
2227
2228 bool Version::IsFilterSkipped(int level, bool is_file_last_in_level) {
2229 // Reaching the bottom level implies misses at all upper levels, so we'll
2230 // skip checking the filters when we predict a hit.
2231 return cfd_->ioptions()->optimize_filters_for_hits &&
2232 (level > 0 || is_file_last_in_level) &&
2233 level == storage_info_.num_non_empty_levels() - 1;
2234 }
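// Worked example of the condition above, assuming optimize_filters_for_hits
// is set and levels L0 through L4 are non-empty: lookups consult bloom
// filters on L0 through L3 but go straight to the index on L4, since a key
// reaching the bottommost level already missed everywhere above, so a
// filter probe there would rarely help. When L0 is itself the bottommost
// non-empty level, only the last file in L0 skips its filter, because L0
// files overlap.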
2235
2236 void VersionStorageInfo::GenerateLevelFilesBrief() {
2237 level_files_brief_.resize(num_non_empty_levels_);
2238 for (int level = 0; level < num_non_empty_levels_; level++) {
2239 DoGenerateLevelFilesBrief(
2240 &level_files_brief_[level], files_[level], &arena_);
2241 }
2242 }
2243
2244 void Version::PrepareApply(
2245 const MutableCFOptions& mutable_cf_options,
2246 bool update_stats) {
2247 TEST_SYNC_POINT_CALLBACK(
2248 "Version::PrepareApply:forced_check",
2249 reinterpret_cast<void*>(&storage_info_.force_consistency_checks_));
2250 UpdateAccumulatedStats(update_stats);
2251 storage_info_.UpdateNumNonEmptyLevels();
2252 storage_info_.CalculateBaseBytes(*cfd_->ioptions(), mutable_cf_options);
2253 storage_info_.UpdateFilesByCompactionPri(cfd_->ioptions()->compaction_pri);
2254 storage_info_.GenerateFileIndexer();
2255 storage_info_.GenerateLevelFilesBrief();
2256 storage_info_.GenerateLevel0NonOverlapping();
2257 storage_info_.GenerateBottommostFiles();
2258 }
2259
2260 bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) {
2261 if (file_meta->init_stats_from_file ||
2262 file_meta->compensated_file_size > 0) {
2263 return false;
2264 }
2265 std::shared_ptr<const TableProperties> tp;
2266 Status s = GetTableProperties(&tp, file_meta);
2267 file_meta->init_stats_from_file = true;
2268 if (!s.ok()) {
2269 ROCKS_LOG_ERROR(vset_->db_options_->info_log,
2270 "Unable to load table properties for file %" PRIu64
2271 " --- %s\n",
2272 file_meta->fd.GetNumber(), s.ToString().c_str());
2273 return false;
2274 }
2275 if (tp.get() == nullptr) return false;
2276 file_meta->num_entries = tp->num_entries;
2277 file_meta->num_deletions = tp->num_deletions;
2278 file_meta->raw_value_size = tp->raw_value_size;
2279 file_meta->raw_key_size = tp->raw_key_size;
2280
2281 return true;
2282 }
2283
2284 void VersionStorageInfo::UpdateAccumulatedStats(FileMetaData* file_meta) {
2285 TEST_SYNC_POINT_CALLBACK("VersionStorageInfo::UpdateAccumulatedStats",
2286 nullptr);
2287
2288 assert(file_meta->init_stats_from_file);
2289 accumulated_file_size_ += file_meta->fd.GetFileSize();
2290 accumulated_raw_key_size_ += file_meta->raw_key_size;
2291 accumulated_raw_value_size_ += file_meta->raw_value_size;
2292 accumulated_num_non_deletions_ +=
2293 file_meta->num_entries - file_meta->num_deletions;
2294 accumulated_num_deletions_ += file_meta->num_deletions;
2295
2296 current_num_non_deletions_ +=
2297 file_meta->num_entries - file_meta->num_deletions;
2298 current_num_deletions_ += file_meta->num_deletions;
2299 current_num_samples_++;
2300 }
2301
2302 void VersionStorageInfo::RemoveCurrentStats(FileMetaData* file_meta) {
2303 if (file_meta->init_stats_from_file) {
2304 current_num_non_deletions_ -=
2305 file_meta->num_entries - file_meta->num_deletions;
2306 current_num_deletions_ -= file_meta->num_deletions;
2307 current_num_samples_--;
2308 }
2309 }
2310
2311 void Version::UpdateAccumulatedStats(bool update_stats) {
2312 if (update_stats) {
2313 // maximum number of table properties loaded from files.
2314 const int kMaxInitCount = 20;
2315 int init_count = 0;
2316 // Here only the first kMaxInitCount files which haven't been
2317 // initialized from file will be updated with num_deletions.
2318 // The motivation here is to cap the maximum I/O per Version creation.
2319 // The reason for choosing files from lower levels instead of higher
2320 // levels is that such a design propagates the initialization from
2321 // lower levels to higher levels: when the num_deletions of lower-level
2322 // files are updated, those files get an accurate compensated_file_size,
2323 // which triggers lower-level-to-higher-level compactions; these in
2324 // turn create higher-level files whose num_deletions will be updated
2325 // here.
2326 for (int level = 0;
2327 level < storage_info_.num_levels_ && init_count < kMaxInitCount;
2328 ++level) {
2329 for (auto* file_meta : storage_info_.files_[level]) {
2330 if (MaybeInitializeFileMetaData(file_meta)) {
2331 // each FileMeta will be initialized only once.
2332 storage_info_.UpdateAccumulatedStats(file_meta);
2333 // when option "max_open_files" is -1, all the file metadata has
2334 // already been read, so MaybeInitializeFileMetaData() won't incur
2335 // any I/O cost. "max_open_files=-1" means that the table cache passed
2336 // to the VersionSet and then to the ColumnFamilySet has a size of
2337 // TableCache::kInfiniteCapacity
2338 if (vset_->GetColumnFamilySet()->get_table_cache()->GetCapacity() ==
2339 TableCache::kInfiniteCapacity) {
2340 continue;
2341 }
2342 if (++init_count >= kMaxInitCount) {
2343 break;
2344 }
2345 }
2346 }
2347 }
2348 // In case all sampled-files contain only deletion entries, then we
2349 // load the table-property of a file in higher-level to initialize
2350 // that value.
2351 for (int level = storage_info_.num_levels_ - 1;
2352 storage_info_.accumulated_raw_value_size_ == 0 && level >= 0;
2353 --level) {
2354 for (int i = static_cast<int>(storage_info_.files_[level].size()) - 1;
2355 storage_info_.accumulated_raw_value_size_ == 0 && i >= 0; --i) {
2356 if (MaybeInitializeFileMetaData(storage_info_.files_[level][i])) {
2357 storage_info_.UpdateAccumulatedStats(storage_info_.files_[level][i]);
2358 }
2359 }
2360 }
2361 }
2362
2363 storage_info_.ComputeCompensatedSizes();
2364 }
2365
2366 void VersionStorageInfo::ComputeCompensatedSizes() {
2367 static const int kDeletionWeightOnCompaction = 2;
2368 uint64_t average_value_size = GetAverageValueSize();
2369
2370 // compute the compensated size
2371 for (int level = 0; level < num_levels_; level++) {
2372 for (auto* file_meta : files_[level]) {
2373 // Here we only compute compensated_file_size for those file_meta
2374 // whose compensated_file_size is uninitialized (== 0). This is true only
2375 // for files that have just been created and that no other thread has
2376 // access to. That's why we can safely mutate compensated_file_size.
2377 if (file_meta->compensated_file_size == 0) {
2378 file_meta->compensated_file_size = file_meta->fd.GetFileSize();
2379 // Here we boost the size of deletion entries of a file only
2380 // when the number of deletion entries is greater than the number of
2381 // non-deletion entries in the file. The motivation here is that in
2382 // a stable workload, the number of deletion entries should be roughly
2383 // equal to the number of non-deletion entries. If we compensated the
2384 // size of deletion entries in a stable workload, the deletion
2385 // compensation logic might introduce unwanted effects that change the
2386 // shape of the LSM tree.
2387 if (file_meta->num_deletions * 2 >= file_meta->num_entries) {
2388 file_meta->compensated_file_size +=
2389 (file_meta->num_deletions * 2 - file_meta->num_entries) *
2390 average_value_size * kDeletionWeightOnCompaction;
2391 }
2392 }
2393 }
2394 }
2395 }
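// Worked example of the compensation above, assuming a hypothetical
// average value size of 100 bytes: a 1 MB file with 300 entries of which
// 200 are deletions has num_deletions * 2 - num_entries = 100 excess
// deletions, so its compensated size is 1 MB + 100 * 100 * 2 bytes, i.e.
// about 1.02 MB. A file where deletions are no more than half of the
// entries keeps its raw file size.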
2396
2397 int VersionStorageInfo::MaxInputLevel() const {
2398 if (compaction_style_ == kCompactionStyleLevel) {
2399 return num_levels() - 2;
2400 }
2401 return 0;
2402 }
2403
2404 int VersionStorageInfo::MaxOutputLevel(bool allow_ingest_behind) const {
2405 if (allow_ingest_behind) {
2406 assert(num_levels() > 1);
2407 return num_levels() - 2;
2408 }
2409 return num_levels() - 1;
2410 }
2411
2412 void VersionStorageInfo::EstimateCompactionBytesNeeded(
2413 const MutableCFOptions& mutable_cf_options) {
2414 // Only implemented for level-based compaction
2415 if (compaction_style_ != kCompactionStyleLevel) {
2416 estimated_compaction_needed_bytes_ = 0;
2417 return;
2418 }
2419
2420 // Start from level 0: if level 0 qualifies for compaction to level 1,
2421 // we estimate the size of that compaction.
2422 // Then we move on to the next level and see whether it qualifies for
2423 // compaction to the level after it. The size of a level is estimated as
2424 // the actual size of the level plus the input bytes from the previous
2425 // level, if any. If that exceeds the level's target, take the excess
2426 // bytes as compaction input and add the estimated compaction size to the
2427 // total. We keep doing this for levels 2, 3, etc., until the last level,
2428 // and return the accumulated bytes.
2429
2430 uint64_t bytes_compact_to_next_level = 0;
2431 uint64_t level_size = 0;
2432 for (auto* f : files_[0]) {
2433 level_size += f->fd.GetFileSize();
2434 }
2435 // Level 0
2436 bool level0_compact_triggered = false;
2437 if (static_cast<int>(files_[0].size()) >=
2438 mutable_cf_options.level0_file_num_compaction_trigger ||
2439 level_size >= mutable_cf_options.max_bytes_for_level_base) {
2440 level0_compact_triggered = true;
2441 estimated_compaction_needed_bytes_ = level_size;
2442 bytes_compact_to_next_level = level_size;
2443 } else {
2444 estimated_compaction_needed_bytes_ = 0;
2445 }
2446
2447 // Level 1 and up.
2448 uint64_t bytes_next_level = 0;
2449 for (int level = base_level(); level <= MaxInputLevel(); level++) {
2450 level_size = 0;
2451 if (bytes_next_level > 0) {
2452 #ifndef NDEBUG
2453 uint64_t level_size2 = 0;
2454 for (auto* f : files_[level]) {
2455 level_size2 += f->fd.GetFileSize();
2456 }
2457 assert(level_size2 == bytes_next_level);
2458 #endif
2459 level_size = bytes_next_level;
2460 bytes_next_level = 0;
2461 } else {
2462 for (auto* f : files_[level]) {
2463 level_size += f->fd.GetFileSize();
2464 }
2465 }
2466 if (level == base_level() && level0_compact_triggered) {
2467 // Add base level size to compaction if level0 compaction triggered.
2468 estimated_compaction_needed_bytes_ += level_size;
2469 }
2470 // Add size added by previous compaction
2471 level_size += bytes_compact_to_next_level;
2472 bytes_compact_to_next_level = 0;
2473 uint64_t level_target = MaxBytesForLevel(level);
2474 if (level_size > level_target) {
2475 bytes_compact_to_next_level = level_size - level_target;
2476 // Estimate the actual compaction fan-out ratio as size ratio between
2477 // the two levels.
2478
2479 assert(bytes_next_level == 0);
2480 if (level + 1 < num_levels_) {
2481 for (auto* f : files_[level + 1]) {
2482 bytes_next_level += f->fd.GetFileSize();
2483 }
2484 }
2485 if (bytes_next_level > 0) {
2486 assert(level_size > 0);
2487 estimated_compaction_needed_bytes_ += static_cast<uint64_t>(
2488 static_cast<double>(bytes_compact_to_next_level) *
2489 (static_cast<double>(bytes_next_level) /
2490 static_cast<double>(level_size) +
2491 1));
2492 }
2493 }
2494 }
2495 }
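// A minimal standalone sketch of the cascade above, assuming plain
// per-level sizes and targets (hypothetical helper; the real code also
// special-cases the L0 trigger and measures the actual fan-out between
// adjacent levels instead of the fixed factor used here):
namespace {
inline uint64_t EstimateCascadeBytesSketch(
    const std::vector<uint64_t>& level_sizes,
    const std::vector<uint64_t>& level_targets) {
  assert(level_sizes.size() == level_targets.size());
  uint64_t total = 0;
  uint64_t carried = 0;  // bytes pushed down from the previous level
  for (size_t i = 0; i < level_sizes.size(); ++i) {
    const uint64_t size = level_sizes[i] + carried;
    carried = 0;
    if (size > level_targets[i]) {
      // The excess must be compacted down, rewriting roughly one level's
      // worth of overlapping data on the way (fan-out assumed ~1x here).
      carried = size - level_targets[i];
      total += 2 * carried;
    }
  }
  return total;
}
}  // anonymous namespace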
2496
2497 namespace {
2498 uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
2499 const MutableCFOptions& mutable_cf_options,
2500 const std::vector<FileMetaData*>& files) {
2501 uint32_t ttl_expired_files_count = 0;
2502
2503 int64_t _current_time;
2504 auto status = ioptions.env->GetCurrentTime(&_current_time);
2505 if (status.ok()) {
2506 const uint64_t current_time = static_cast<uint64_t>(_current_time);
2507 for (FileMetaData* f : files) {
2508 if (!f->being_compacted) {
2509 uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2510 if (oldest_ancester_time != 0 &&
2511 oldest_ancester_time < (current_time - mutable_cf_options.ttl)) {
2512 ttl_expired_files_count++;
2513 }
2514 }
2515 }
2516 }
2517 return ttl_expired_files_count;
2518 }
2519 } // anonymous namespace
2520
2521 void VersionStorageInfo::ComputeCompactionScore(
2522 const ImmutableCFOptions& immutable_cf_options,
2523 const MutableCFOptions& mutable_cf_options) {
2524 for (int level = 0; level <= MaxInputLevel(); level++) {
2525 double score;
2526 if (level == 0) {
2527 // We treat level-0 specially by bounding the number of files
2528 // instead of number of bytes for two reasons:
2529 //
2530 // (1) With larger write-buffer sizes, it is nice not to do too
2531 // many level-0 compactions.
2532 //
2533 // (2) The files in level-0 are merged on every read and
2534 // therefore we wish to avoid too many files when the individual
2535 // file size is small (perhaps because of a small write-buffer
2536 // setting, or very high compression ratios, or lots of
2537 // overwrites/deletions).
2538 int num_sorted_runs = 0;
2539 uint64_t total_size = 0;
2540 for (auto* f : files_[level]) {
2541 if (!f->being_compacted) {
2542 total_size += f->compensated_file_size;
2543 num_sorted_runs++;
2544 }
2545 }
2546 if (compaction_style_ == kCompactionStyleUniversal) {
2547 // For universal compaction, we use level0 score to indicate
2548 // compaction score for the whole DB. Adding other levels as if
2549 // they are L0 files.
2550 for (int i = 1; i < num_levels(); i++) {
2551 // It's possible that a subset of the files in a level may be in a
2552 // compaction, due to delete triggered compaction or trivial move.
2553 // In that case, the below check may not catch a level being
2554 // compacted as it only checks the first file. The worst that can
2555 // happen is a scheduled compaction thread will find nothing to do.
2556 if (!files_[i].empty() && !files_[i][0]->being_compacted) {
2557 num_sorted_runs++;
2558 }
2559 }
2560 }
2561
2562 if (compaction_style_ == kCompactionStyleFIFO) {
2563 score = static_cast<double>(total_size) /
2564 mutable_cf_options.compaction_options_fifo.max_table_files_size;
2565 if (mutable_cf_options.compaction_options_fifo.allow_compaction) {
2566 score = std::max(
2567 static_cast<double>(num_sorted_runs) /
2568 mutable_cf_options.level0_file_num_compaction_trigger,
2569 score);
2570 }
2571 if (mutable_cf_options.ttl > 0) {
2572 score = std::max(
2573 static_cast<double>(GetExpiredTtlFilesCount(
2574 immutable_cf_options, mutable_cf_options, files_[level])),
2575 score);
2576 }
2577
2578 } else {
2579 score = static_cast<double>(num_sorted_runs) /
2580 mutable_cf_options.level0_file_num_compaction_trigger;
2581 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
2582 // Level-based involves L0->L0 compactions that can lead to oversized
2583 // L0 files. Take into account size as well to avoid later giant
2584 // compactions to the base level.
2585 uint64_t l0_target_size = mutable_cf_options.max_bytes_for_level_base;
2586 if (immutable_cf_options.level_compaction_dynamic_level_bytes &&
2587 level_multiplier_ != 0.0) {
2588 // Prevent L0 to Lbase fanout from growing larger than
2589 // `level_multiplier_`. This prevents us from getting stuck picking
2590 // L0 forever even when it is hurting write-amp. That could happen
2591 // in dynamic level compaction's write-burst mode where the base
2592 // level's target size can grow to be enormous.
2593 l0_target_size =
2594 std::max(l0_target_size,
2595 static_cast<uint64_t>(level_max_bytes_[base_level_] /
2596 level_multiplier_));
2597 }
2598 score =
2599 std::max(score, static_cast<double>(total_size) / l0_target_size);
2600 }
2601 }
2602 } else {
2603 // Compute the ratio of current size to size limit.
2604 uint64_t level_bytes_no_compacting = 0;
2605 for (auto f : files_[level]) {
2606 if (!f->being_compacted) {
2607 level_bytes_no_compacting += f->compensated_file_size;
2608 }
2609 }
2610 score = static_cast<double>(level_bytes_no_compacting) /
2611 MaxBytesForLevel(level);
2612 }
2613 compaction_level_[level] = level;
2614 compaction_score_[level] = score;
2615 }
2616
2617 // sort all the levels based on their score. Higher scores get listed
2618 // first. Use bubble sort because the number of entries is small.
2619 for (int i = 0; i < num_levels() - 2; i++) {
2620 for (int j = i + 1; j < num_levels() - 1; j++) {
2621 if (compaction_score_[i] < compaction_score_[j]) {
2622 double score = compaction_score_[i];
2623 int level = compaction_level_[i];
2624 compaction_score_[i] = compaction_score_[j];
2625 compaction_level_[i] = compaction_level_[j];
2626 compaction_score_[j] = score;
2627 compaction_level_[j] = level;
2628 }
2629 }
2630 }
2631 ComputeFilesMarkedForCompaction();
2632 ComputeBottommostFilesMarkedForCompaction();
2633 if (mutable_cf_options.ttl > 0) {
2634 ComputeExpiredTtlFiles(immutable_cf_options, mutable_cf_options.ttl);
2635 }
2636 if (mutable_cf_options.periodic_compaction_seconds > 0) {
2637 ComputeFilesMarkedForPeriodicCompaction(
2638 immutable_cf_options, mutable_cf_options.periodic_compaction_seconds);
2639 }
2640 EstimateCompactionBytesNeeded(mutable_cf_options);
2641 }
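// A minimal sketch of the two scoring rules above (hypothetical helper;
// the real scores also account for FIFO/universal styles, TTL, and the
// dynamic L0 target-size clamp):
namespace {
inline double LevelScoreSketch(int level, uint64_t num_l0_files,
                               int l0_file_trigger, uint64_t level_bytes,
                               uint64_t level_target_bytes) {
  if (level == 0) {
    // L0 is scored by file count: 8 files against a trigger of 4 scores 2.0.
    return static_cast<double>(num_l0_files) / l0_file_trigger;
  }
  // Other levels are scored by size: 20 GB against a 10 GB target scores
  // 2.0. A score >= 1.0 means the level qualifies for compaction.
  return static_cast<double>(level_bytes) / level_target_bytes;
}
}  // anonymous namespace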
2642
2643 void VersionStorageInfo::ComputeFilesMarkedForCompaction() {
2644 files_marked_for_compaction_.clear();
2645 int last_qualify_level = 0;
2646
2647 // Do not include files from the last level that contains data:
2648 // if a table properties collector suggests a file on the last level,
2649 // we should not move it to a new level.
2650 for (int level = num_levels() - 1; level >= 1; level--) {
2651 if (!files_[level].empty()) {
2652 last_qualify_level = level - 1;
2653 break;
2654 }
2655 }
2656
2657 for (int level = 0; level <= last_qualify_level; level++) {
2658 for (auto* f : files_[level]) {
2659 if (!f->being_compacted && f->marked_for_compaction) {
2660 files_marked_for_compaction_.emplace_back(level, f);
2661 }
2662 }
2663 }
2664 }
2665
2666 void VersionStorageInfo::ComputeExpiredTtlFiles(
2667 const ImmutableCFOptions& ioptions, const uint64_t ttl) {
2668 assert(ttl > 0);
2669
2670 expired_ttl_files_.clear();
2671
2672 int64_t _current_time;
2673 auto status = ioptions.env->GetCurrentTime(&_current_time);
2674 if (!status.ok()) {
2675 return;
2676 }
2677 const uint64_t current_time = static_cast<uint64_t>(_current_time);
2678
2679 for (int level = 0; level < num_levels() - 1; level++) {
2680 for (FileMetaData* f : files_[level]) {
2681 if (!f->being_compacted) {
2682 uint64_t oldest_ancester_time = f->TryGetOldestAncesterTime();
2683 if (oldest_ancester_time > 0 &&
2684 oldest_ancester_time < (current_time - ttl)) {
2685 expired_ttl_files_.emplace_back(level, f);
2686 }
2687 }
2688 }
2689 }
2690 }
2691
2692 void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
2693 const ImmutableCFOptions& ioptions,
2694 const uint64_t periodic_compaction_seconds) {
2695 assert(periodic_compaction_seconds > 0);
2696
2697 files_marked_for_periodic_compaction_.clear();
2698
2699 int64_t temp_current_time;
2700 auto status = ioptions.env->GetCurrentTime(&temp_current_time);
2701 if (!status.ok()) {
2702 return;
2703 }
2704 const uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2705
2706 // If periodic_compaction_seconds is larger than current time, periodic
2707 // compaction can't possibly be triggered.
2708 if (periodic_compaction_seconds > current_time) {
2709 return;
2710 }
2711
2712 const uint64_t allowed_time_limit =
2713 current_time - periodic_compaction_seconds;
2714
2715 for (int level = 0; level < num_levels(); level++) {
2716 for (auto f : files_[level]) {
2717 if (!f->being_compacted) {
2718 // Compute a file's modification time in the following order:
2719 // 1. Use file_creation_time table property if it is > 0.
2720 // 2. Use creation_time table property if it is > 0.
2721 // 3. Use file's mtime metadata if the above two table properties are 0.
2722 // Don't consider the file at all if the modification time cannot be
2723 // correctly determined based on the above conditions.
2724 uint64_t file_modification_time = f->TryGetFileCreationTime();
2725 if (file_modification_time == kUnknownFileCreationTime) {
2726 file_modification_time = f->TryGetOldestAncesterTime();
2727 }
2728 if (file_modification_time == kUnknownOldestAncesterTime) {
2729 auto file_path = TableFileName(ioptions.cf_paths, f->fd.GetNumber(),
2730 f->fd.GetPathId());
2731 status = ioptions.env->GetFileModificationTime(
2732 file_path, &file_modification_time);
2733 if (!status.ok()) {
2734 ROCKS_LOG_WARN(ioptions.info_log,
2735 "Can't get file modification time: %s: %s",
2736 file_path.c_str(), status.ToString().c_str());
2737 continue;
2738 }
2739 }
2740 if (file_modification_time > 0 &&
2741 file_modification_time < allowed_time_limit) {
2742 files_marked_for_periodic_compaction_.emplace_back(level, f);
2743 }
2744 }
2745 }
2746 }
2747 }
2748
2749 namespace {
2750
2751 // used to sort files by size
2752 struct Fsize {
2753 size_t index;
2754 FileMetaData* file;
2755 };
2756
2757 // Comparator that is used to sort files based on their size
2758 // In normal mode: descending size
2759 bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) {
2760 return (first.file->compensated_file_size >
2761 second.file->compensated_file_size);
2762 }
2763 } // anonymous namespace
2764
2765 void VersionStorageInfo::AddFile(int level, FileMetaData* f) {
2766 auto& level_files = files_[level];
2767 level_files.push_back(f);
2768
2769 f->refs++;
2770
2771 const uint64_t file_number = f->fd.GetNumber();
2772
2773 assert(file_locations_.find(file_number) == file_locations_.end());
2774 file_locations_.emplace(file_number,
2775 FileLocation(level, level_files.size() - 1));
2776 }
2777
2778 void VersionStorageInfo::AddBlobFile(
2779 std::shared_ptr<BlobFileMetaData> blob_file_meta) {
2780 assert(blob_file_meta);
2781
2782 const uint64_t blob_file_number = blob_file_meta->GetBlobFileNumber();
2783
2784 auto it = blob_files_.lower_bound(blob_file_number);
2785 assert(it == blob_files_.end() || it->first != blob_file_number);
2786
2787 blob_files_.insert(
2788 it, BlobFiles::value_type(blob_file_number, std::move(blob_file_meta)));
2789 }
2790
2791 // Version::PrepareApply() needs to be called before calling this function,
2792 // or the following functions must have been called:
2793 // 1. UpdateNumNonEmptyLevels();
2794 // 2. CalculateBaseBytes();
2795 // 3. UpdateFilesByCompactionPri();
2796 // 4. GenerateFileIndexer();
2797 // 5. GenerateLevelFilesBrief();
2798 // 6. GenerateLevel0NonOverlapping();
2799 // 7. GenerateBottommostFiles();
2800 void VersionStorageInfo::SetFinalized() {
2801 finalized_ = true;
2802 #ifndef NDEBUG
2803 if (compaction_style_ != kCompactionStyleLevel) {
2804 // Not level based compaction.
2805 return;
2806 }
2807 assert(base_level_ < 0 || num_levels() == 1 ||
2808 (base_level_ >= 1 && base_level_ < num_levels()));
2809 // Verify that all levels between L0 and base_level are empty
2810 for (int level = 1; level < base_level(); level++) {
2811 assert(NumLevelBytes(level) == 0);
2812 }
2813 uint64_t max_bytes_prev_level = 0;
2814 for (int level = base_level(); level < num_levels() - 1; level++) {
2815 if (LevelFiles(level).size() == 0) {
2816 continue;
2817 }
2818 assert(MaxBytesForLevel(level) >= max_bytes_prev_level);
2819 max_bytes_prev_level = MaxBytesForLevel(level);
2820 }
2821 int num_empty_non_l0_level = 0;
2822 for (int level = 0; level < num_levels(); level++) {
2823 assert(LevelFiles(level).size() == 0 ||
2824 LevelFiles(level).size() == LevelFilesBrief(level).num_files);
2825 if (level > 0 && NumLevelBytes(level) > 0) {
2826 num_empty_non_l0_level++;
2827 }
2828 if (LevelFiles(level).size() > 0) {
2829 assert(level < num_non_empty_levels());
2830 }
2831 }
2832 assert(compaction_level_.size() > 0);
2833 assert(compaction_level_.size() == compaction_score_.size());
2834 #endif
2835 }
2836
2837 void VersionStorageInfo::UpdateNumNonEmptyLevels() {
2838 num_non_empty_levels_ = num_levels_;
2839 for (int i = num_levels_ - 1; i >= 0; i--) {
2840 if (files_[i].size() != 0) {
2841 return;
2842 } else {
2843 num_non_empty_levels_ = i;
2844 }
2845 }
2846 }
2847
2848 namespace {
2849 // Sort `temp` based on the ratio of overlapping size to file size
2850 void SortFileByOverlappingRatio(
2851 const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files,
2852 const std::vector<FileMetaData*>& next_level_files,
2853 std::vector<Fsize>* temp) {
2854 std::unordered_map<uint64_t, uint64_t> file_to_order;
2855 auto next_level_it = next_level_files.begin();
2856
2857 for (auto& file : files) {
2858 uint64_t overlapping_bytes = 0;
2859 // Skip files in the next level that are entirely before the current file
2860 while (next_level_it != next_level_files.end() &&
2861 icmp.Compare((*next_level_it)->largest, file->smallest) < 0) {
2862 next_level_it++;
2863 }
2864
2865 while (next_level_it != next_level_files.end() &&
2866 icmp.Compare((*next_level_it)->smallest, file->largest) < 0) {
2867 overlapping_bytes += (*next_level_it)->fd.file_size;
2868
2869 if (icmp.Compare((*next_level_it)->largest, file->largest) > 0) {
2870 // the next-level file crosses the largest boundary of the current file.
2871 break;
2872 }
2873 next_level_it++;
2874 }
2875
2876 assert(file->compensated_file_size != 0);
2877 file_to_order[file->fd.GetNumber()] =
2878 overlapping_bytes * 1024u / file->compensated_file_size;
2879 }
2880
2881 std::sort(temp->begin(), temp->end(),
2882 [&](const Fsize& f1, const Fsize& f2) -> bool {
2883 return file_to_order[f1.file->fd.GetNumber()] <
2884 file_to_order[f2.file->fd.GetNumber()];
2885 });
2886 }
2887 } // namespace
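// Worked example of the order key computed above: a file with 64 MB
// compensated size that overlaps 128 MB of next-level data gets
// 128 MB * 1024 / 64 MB = 2048, while a same-sized file overlapping only
// 16 MB gets 256 and sorts first, so kMinOverlappingRatio picks the file
// that rewrites the least downstream data per byte compacted. The 1024
// factor just preserves precision in integer arithmetic.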
2888
2889 void VersionStorageInfo::UpdateFilesByCompactionPri(
2890 CompactionPri compaction_pri) {
2891 if (compaction_style_ == kCompactionStyleNone ||
2892 compaction_style_ == kCompactionStyleFIFO ||
2893 compaction_style_ == kCompactionStyleUniversal) {
2894 // don't need this
2895 return;
2896 }
2897 // No need to sort the highest level because it is never compacted.
2898 for (int level = 0; level < num_levels() - 1; level++) {
2899 const std::vector<FileMetaData*>& files = files_[level];
2900 auto& files_by_compaction_pri = files_by_compaction_pri_[level];
2901 assert(files_by_compaction_pri.size() == 0);
2902
2903 // populate a temp vector for sorting based on size
2904 std::vector<Fsize> temp(files.size());
2905 for (size_t i = 0; i < files.size(); i++) {
2906 temp[i].index = i;
2907 temp[i].file = files[i];
2908 }
2909
2910 // sort the top number_of_files_to_sort_ based on file size
2911 size_t num = VersionStorageInfo::kNumberFilesToSort;
2912 if (num > temp.size()) {
2913 num = temp.size();
2914 }
2915 switch (compaction_pri) {
2916 case kByCompensatedSize:
2917 std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
2918 CompareCompensatedSizeDescending);
2919 break;
2920 case kOldestLargestSeqFirst:
2921 std::sort(temp.begin(), temp.end(),
2922 [](const Fsize& f1, const Fsize& f2) -> bool {
2923 return f1.file->fd.largest_seqno <
2924 f2.file->fd.largest_seqno;
2925 });
2926 break;
2927 case kOldestSmallestSeqFirst:
2928 std::sort(temp.begin(), temp.end(),
2929 [](const Fsize& f1, const Fsize& f2) -> bool {
2930 return f1.file->fd.smallest_seqno <
2931 f2.file->fd.smallest_seqno;
2932 });
2933 break;
2934 case kMinOverlappingRatio:
2935 SortFileByOverlappingRatio(*internal_comparator_, files_[level],
2936 files_[level + 1], &temp);
2937 break;
2938 default:
2939 assert(false);
2940 }
2941 assert(temp.size() == files.size());
2942
2943 // initialize files_by_compaction_pri_
2944 for (size_t i = 0; i < temp.size(); i++) {
2945 files_by_compaction_pri.push_back(static_cast<int>(temp[i].index));
2946 }
2947 next_file_to_compact_by_size_[level] = 0;
2948 assert(files_[level].size() == files_by_compaction_pri_[level].size());
2949 }
2950 }
2951
2952 void VersionStorageInfo::GenerateLevel0NonOverlapping() {
2953 assert(!finalized_);
2954 level0_non_overlapping_ = true;
2955 if (level_files_brief_.size() == 0) {
2956 return;
2957 }
2958
2959 // A copy of L0 files sorted by smallest key
2960 std::vector<FdWithKeyRange> level0_sorted_file(
2961 level_files_brief_[0].files,
2962 level_files_brief_[0].files + level_files_brief_[0].num_files);
2963 std::sort(level0_sorted_file.begin(), level0_sorted_file.end(),
2964 [this](const FdWithKeyRange& f1, const FdWithKeyRange& f2) -> bool {
2965 return (internal_comparator_->Compare(f1.smallest_key,
2966 f2.smallest_key) < 0);
2967 });
2968
2969 for (size_t i = 1; i < level0_sorted_file.size(); ++i) {
2970 FdWithKeyRange& f = level0_sorted_file[i];
2971 FdWithKeyRange& prev = level0_sorted_file[i - 1];
2972 if (internal_comparator_->Compare(prev.largest_key, f.smallest_key) >= 0) {
2973 level0_non_overlapping_ = false;
2974 break;
2975 }
2976 }
2977 }
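// The adjacent-pair check above is sufficient because once ranges are
// sorted by smallest key, any overlap must involve a neighbor. A standalone
// sketch over plain string keys (hypothetical helper; the real code uses
// the internal key comparator):
namespace {
inline bool RangesNonOverlappingSketch(
    std::vector<std::pair<std::string, std::string>> ranges) {
  // Sort by smallest key; afterwards it is enough to compare each range's
  // smallest key against its predecessor's largest key.
  std::sort(ranges.begin(), ranges.end());
  for (size_t i = 1; i < ranges.size(); ++i) {
    if (ranges[i - 1].second >= ranges[i].first) {
      return false;  // predecessor reaches into this range
    }
  }
  return true;
}
}  // anonymous namespace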
2978
2979 void VersionStorageInfo::GenerateBottommostFiles() {
2980 assert(!finalized_);
2981 assert(bottommost_files_.empty());
2982 for (size_t level = 0; level < level_files_brief_.size(); ++level) {
2983 for (size_t file_idx = 0; file_idx < level_files_brief_[level].num_files;
2984 ++file_idx) {
2985 const FdWithKeyRange& f = level_files_brief_[level].files[file_idx];
2986 int l0_file_idx;
2987 if (level == 0) {
2988 l0_file_idx = static_cast<int>(file_idx);
2989 } else {
2990 l0_file_idx = -1;
2991 }
2992 Slice smallest_user_key = ExtractUserKey(f.smallest_key);
2993 Slice largest_user_key = ExtractUserKey(f.largest_key);
2994 if (!RangeMightExistAfterSortedRun(smallest_user_key, largest_user_key,
2995 static_cast<int>(level),
2996 l0_file_idx)) {
2997 bottommost_files_.emplace_back(static_cast<int>(level),
2998 f.file_metadata);
2999 }
3000 }
3001 }
3002 }
3003
3004 void VersionStorageInfo::UpdateOldestSnapshot(SequenceNumber seqnum) {
3005 assert(seqnum >= oldest_snapshot_seqnum_);
3006 oldest_snapshot_seqnum_ = seqnum;
3007 if (oldest_snapshot_seqnum_ > bottommost_files_mark_threshold_) {
3008 ComputeBottommostFilesMarkedForCompaction();
3009 }
3010 }
3011
3012 void VersionStorageInfo::ComputeBottommostFilesMarkedForCompaction() {
3013 bottommost_files_marked_for_compaction_.clear();
3014 bottommost_files_mark_threshold_ = kMaxSequenceNumber;
3015 for (auto& level_and_file : bottommost_files_) {
3016 if (!level_and_file.second->being_compacted &&
3017 level_and_file.second->fd.largest_seqno != 0 &&
3018 level_and_file.second->num_deletions > 1) {
3019 // largest_seqno might be nonzero due to containing the final key in an
3020 // earlier compaction, whose seqnum we didn't zero out. Multiple deletions
3021 // ensures the file really contains deleted or overwritten keys.
3022 if (level_and_file.second->fd.largest_seqno < oldest_snapshot_seqnum_) {
3023 bottommost_files_marked_for_compaction_.push_back(level_and_file);
3024 } else {
3025 bottommost_files_mark_threshold_ =
3026 std::min(bottommost_files_mark_threshold_,
3027 level_and_file.second->fd.largest_seqno);
3028 }
3029 }
3030 }
3031 }
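// For example, with bottommost files (each holding multiple deletions)
// whose largest_seqno values are 50 and 90 and an oldest snapshot at
// seqnum 70: the file at 50 is marked for compaction now, since its
// deleted keys are invisible to every snapshot, while the file at 90 sets
// bottommost_files_mark_threshold_ to 90 so that a later
// UpdateOldestSnapshot() call past 90 re-runs this marking.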
3032
3033 void Version::Ref() {
3034 ++refs_;
3035 }
3036
3037 bool Version::Unref() {
3038 assert(refs_ >= 1);
3039 --refs_;
3040 if (refs_ == 0) {
3041 delete this;
3042 return true;
3043 }
3044 return false;
3045 }
3046
3047 bool VersionStorageInfo::OverlapInLevel(int level,
3048 const Slice* smallest_user_key,
3049 const Slice* largest_user_key) {
3050 if (level >= num_non_empty_levels_) {
3051 // empty level, no overlap
3052 return false;
3053 }
3054 return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
3055 level_files_brief_[level], smallest_user_key,
3056 largest_user_key);
3057 }
3058
3059 // Store in "*inputs" all files in "level" that overlap [begin,end]
3060 // If hint_index is specified, then it points to a file in the
3061 // overlapping range.
3062 // The file_index returns the index of some file in the overlapping range.
3063 void VersionStorageInfo::GetOverlappingInputs(
3064 int level, const InternalKey* begin, const InternalKey* end,
3065 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3066 bool expand_range, InternalKey** next_smallest) const {
3067 if (level >= num_non_empty_levels_) {
3068 // this level is empty, no overlapping inputs
3069 return;
3070 }
3071
3072 inputs->clear();
3073 if (file_index) {
3074 *file_index = -1;
3075 }
3076 const Comparator* user_cmp = user_comparator_;
3077 if (level > 0) {
3078 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs, hint_index,
3079 file_index, false, next_smallest);
3080 return;
3081 }
3082
3083 if (next_smallest) {
3084 // next_smallest key only makes sense for non-level 0, where files are
3085 // non-overlapping
3086 *next_smallest = nullptr;
3087 }
3088
3089 Slice user_begin, user_end;
3090 if (begin != nullptr) {
3091 user_begin = begin->user_key();
3092 }
3093 if (end != nullptr) {
3094 user_end = end->user_key();
3095 }
3096
3097 // "index" stores the indices of the files that still need to be checked.
3098 std::list<size_t> index;
3099 for (size_t i = 0; i < level_files_brief_[level].num_files; i++) {
3100 index.emplace_back(i);
3101 }
3102
3103 while (!index.empty()) {
3104 bool found_overlapping_file = false;
3105 auto iter = index.begin();
3106 while (iter != index.end()) {
3107 FdWithKeyRange* f = &(level_files_brief_[level].files[*iter]);
3108 const Slice file_start = ExtractUserKey(f->smallest_key);
3109 const Slice file_limit = ExtractUserKey(f->largest_key);
3110 if (begin != nullptr &&
3111 user_cmp->CompareWithoutTimestamp(file_limit, user_begin) < 0) {
3112 // "f" is completely before specified range; skip it
3113 iter++;
3114 } else if (end != nullptr &&
3115 user_cmp->CompareWithoutTimestamp(file_start, user_end) > 0) {
3116 // "f" is completely after specified range; skip it
3117 iter++;
3118 } else {
3119 // "f" overlaps the specified range
3120 inputs->emplace_back(files_[level][*iter]);
3121 found_overlapping_file = true;
3122 // record the first file index.
3123 if (file_index && *file_index == -1) {
3124 *file_index = static_cast<int>(*iter);
3125 }
3126 // this file overlaps the range; erase it to avoid checking it again.
3127 iter = index.erase(iter);
3128 if (expand_range) {
3129 if (begin != nullptr &&
3130 user_cmp->CompareWithoutTimestamp(file_start, user_begin) < 0) {
3131 user_begin = file_start;
3132 }
3133 if (end != nullptr &&
3134 user_cmp->CompareWithoutTimestamp(file_limit, user_end) > 0) {
3135 user_end = file_limit;
3136 }
3137 }
3138 }
3139 }
3140 // if none of the remaining files overlap, we are done
3141 if (!found_overlapping_file) {
3142 break;
3143 }
3144 }
3145 }
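
// Example of the re-scan above (hypothetical keys): L0 holds, in list
// order, F1["e","g"] and F2["a","e"], and the query range is ["d","d"]
// with expand_range == true. Pass 1 skips F1 (its start "e" is past "d")
// but picks F2 and widens the range to ["a","e"]. Pass 2 now finds that F1
// overlaps the widened range and extends it to ["a","g"]. After F1 is
// erased the list is empty and the loop exits with inputs = {F2, F1}.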
3146
3147 // Store in "*inputs" the maximum number of files in "level" that lie
3148 // within the range [begin,end], guaranteeing a "clean cut" boundary
3149 // between the files in inputs and the surrounding files.
3150 // This ensures that no parts of a key are lost during compaction.
3151 // If hint_index is specified, then it points to a file in the range.
3152 // If file_index is non-null, it is set to the index of one file in the range.
3153 void VersionStorageInfo::GetCleanInputsWithinInterval(
3154 int level, const InternalKey* begin, const InternalKey* end,
3155 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) const {
3156 inputs->clear();
3157 if (file_index) {
3158 *file_index = -1;
3159 }
3160 if (level >= num_non_empty_levels_ || level == 0 ||
3161 level_files_brief_[level].num_files == 0) {
3162 // this level is empty, no inputs within range
3163 // also don't support clean input interval within L0
3164 return;
3165 }
3166
3167 GetOverlappingInputsRangeBinarySearch(level, begin, end, inputs,
3168 hint_index, file_index,
3169 true /* within_interval */);
3170 }
3171
3172 // Store in "*inputs" all files in "level" that overlap [begin,end]
3173 // Employ binary search to find at least one file that overlaps the
3174 // specified range. From that file, iterate backwards and
3175 // forwards to find all overlapping files.
3176 // If within_interval is set, then only store the maximum set of clean
3177 // inputs within the range [begin, end]. "clean" means there is a boundary
3178 // between the files in "*inputs" and the surrounding files.
3179 void VersionStorageInfo::GetOverlappingInputsRangeBinarySearch(
3180 int level, const InternalKey* begin, const InternalKey* end,
3181 std::vector<FileMetaData*>* inputs, int hint_index, int* file_index,
3182 bool within_interval, InternalKey** next_smallest) const {
3183 assert(level > 0);
3184
3185 auto user_cmp = user_comparator_;
3186 const FdWithKeyRange* files = level_files_brief_[level].files;
3187 const int num_files = static_cast<int>(level_files_brief_[level].num_files);
3188
3189 // Use binary search to find the lower bound and upper bound of the
3190 // range of overlapping files.
3191 int start_index = 0;
3192 int end_index = num_files;
3193
3194 if (begin != nullptr) {
3195 // If within_interval is true, compare the file's smallest key against k,
3196 // so std::lower_bound skips files that merely straddle "begin".
3197 auto cmp = [&user_cmp, &within_interval](const FdWithKeyRange& f,
3198 const InternalKey* k) {
3199 auto& file_key = within_interval ? f.file_metadata->smallest
3200 : f.file_metadata->largest;
3201 return sstableKeyCompare(user_cmp, file_key, *k) < 0;
3202 };
3203
3204 start_index = static_cast<int>(
3205 std::lower_bound(files,
3206 files + (hint_index == -1 ? num_files : hint_index),
3207 begin, cmp) -
3208 files);
3209
3210 if (start_index > 0 && within_interval) {
3211 bool is_overlapping = true;
3212 while (is_overlapping && start_index < num_files) {
3213 auto& pre_limit = files[start_index - 1].file_metadata->largest;
3214 auto& cur_start = files[start_index].file_metadata->smallest;
3215 is_overlapping = sstableKeyCompare(user_cmp, pre_limit, cur_start) == 0;
3216 start_index += is_overlapping;
3217 }
3218 }
3219 }
3220
3221 if (end != nullptr) {
3222 // If within_interval is true, compare k against the file's largest key,
3223 // so std::upper_bound excludes files that merely straddle "end".
3224 auto cmp = [&user_cmp, &within_interval](const InternalKey* k,
3225 const FdWithKeyRange& f) {
3226 auto& file_key = within_interval ? f.file_metadata->largest
3227 : f.file_metadata->smallest;
3228 return sstableKeyCompare(user_cmp, *k, file_key) < 0;
3229 };
3230
3231 end_index = static_cast<int>(
3232 std::upper_bound(files + start_index, files + num_files, end, cmp) -
3233 files);
3234
3235 if (end_index < num_files && within_interval) {
3236 bool is_overlapping = true;
3237 while (is_overlapping && end_index > start_index) {
3238 auto& next_start = files[end_index].file_metadata->smallest;
3239 auto& cur_limit = files[end_index - 1].file_metadata->largest;
3240 is_overlapping =
3241 sstableKeyCompare(user_cmp, cur_limit, next_start) == 0;
3242 end_index -= is_overlapping;
3243 }
3244 }
3245 }
3246
3247 assert(start_index <= end_index);
3248
3249 // If there were no overlapping files, return immediately.
3250 if (start_index == end_index) {
3251 if (next_smallest) {
3252 *next_smallest = nullptr;
3253 }
3254 return;
3255 }
3256
3257 assert(start_index < end_index);
3258
3259 // report via file_index the index where an overlap is found
3260 if (file_index) {
3261 *file_index = start_index;
3262 }
3263
3264 // insert overlapping files into vector
3265 for (int i = start_index; i < end_index; i++) {
3266 inputs->push_back(files_[level][i]);
3267 }
3268
3269 if (next_smallest != nullptr) {
3270 // Provide the next key outside the range covered by inputs
3271 if (end_index < static_cast<int>(files_[level].size())) {
3272 **next_smallest = files_[level][end_index]->smallest;
3273 } else {
3274 *next_smallest = nullptr;
3275 }
3276 }
3277 }
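
// Example (hypothetical keys) of the "clean cut" adjustment above: suppose
// a level holds F1["a","c"], F2["c","f"], F3["g","k"], where F1 and F2
// share the boundary user key "c". For [begin,end] = ["c","f"] with
// within_interval == true, std::lower_bound initially lands on F2, but
// sstableKeyCompare() reports the F1/F2 boundary as overlapping, so
// start_index advances to F3 and the result is empty: no file can be
// taken without splitting key "c" across the cut. With within_interval ==
// false, the overlapping inputs would be F1 and F2.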
3278
3279 uint64_t VersionStorageInfo::NumLevelBytes(int level) const {
3280 assert(level >= 0);
3281 assert(level < num_levels());
3282 return TotalFileSize(files_[level]);
3283 }
3284
3285 const char* VersionStorageInfo::LevelSummary(
3286 LevelSummaryStorage* scratch) const {
3287 int len = 0;
3288 if (compaction_style_ == kCompactionStyleLevel && num_levels() > 1) {
3289 assert(base_level_ < static_cast<int>(level_max_bytes_.size()));
3290 if (level_multiplier_ != 0.0) {
3291 len = snprintf(
3292 scratch->buffer, sizeof(scratch->buffer),
3293 "base level %d level multiplier %.2f max bytes base %" PRIu64 " ",
3294 base_level_, level_multiplier_, level_max_bytes_[base_level_]);
3295 }
3296 }
3297 len +=
3298 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "files[");
3299 for (int i = 0; i < num_levels(); i++) {
3300 int sz = sizeof(scratch->buffer) - len;
3301 int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
3302 if (ret < 0 || ret >= sz) break;
3303 len += ret;
3304 }
3305 if (len > 0) {
3306 // overwrite the last space
3307 --len;
3308 }
3309 len += snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3310 "] max score %.2f", compaction_score_[0]);
3311
3312 if (!files_marked_for_compaction_.empty()) {
3313 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len,
3314 " (%" ROCKSDB_PRIszt " files need compaction)",
3315 files_marked_for_compaction_.size());
3316 }
3317
3318 return scratch->buffer;
3319 }
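
// For reference, LevelSummary() produces a single line of the following
// shape (values here are illustrative only):
//   base level 1 level multiplier 10.00 max bytes base 268435456 files[2 4 0 0 0 0 0] max score 0.75 (2 files need compaction)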
3320
3321 const char* VersionStorageInfo::LevelFileSummary(FileSummaryStorage* scratch,
3322 int level) const {
3323 int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
3324 for (const auto& f : files_[level]) {
3325 int sz = sizeof(scratch->buffer) - len;
3326 char sztxt[16];
3327 AppendHumanBytes(f->fd.GetFileSize(), sztxt, sizeof(sztxt));
3328 int ret = snprintf(scratch->buffer + len, sz,
3329 "#%" PRIu64 "(seq=%" PRIu64 ",sz=%s,%d) ",
3330 f->fd.GetNumber(), f->fd.smallest_seqno, sztxt,
3331 static_cast<int>(f->being_compacted));
3332 if (ret < 0 || ret >= sz)
3333 break;
3334 len += ret;
3335 }
3336 // overwrite the last space (only if files_[level].size() is non-zero)
3337 if (files_[level].size() && len > 0) {
3338 --len;
3339 }
3340 snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
3341 return scratch->buffer;
3342 }
3343
3344 int64_t VersionStorageInfo::MaxNextLevelOverlappingBytes() {
3345 uint64_t result = 0;
3346 std::vector<FileMetaData*> overlaps;
3347 for (int level = 1; level < num_levels() - 1; level++) {
3348 for (const auto& f : files_[level]) {
3349 GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
3350 const uint64_t sum = TotalFileSize(overlaps);
3351 if (sum > result) {
3352 result = sum;
3353 }
3354 }
3355 }
3356 return result;
3357 }
3358
3359 uint64_t VersionStorageInfo::MaxBytesForLevel(int level) const {
3360 // Note: the result for level zero is not really used since we set
3361 // the level-0 compaction threshold based on number of files.
3362 assert(level >= 0);
3363 assert(level < static_cast<int>(level_max_bytes_.size()));
3364 return level_max_bytes_[level];
3365 }
3366
3367 void VersionStorageInfo::CalculateBaseBytes(const ImmutableCFOptions& ioptions,
3368 const MutableCFOptions& options) {
3369 // Special logic to set the number of sorted runs.
3370 // This matches the previous behavior when all files were in L0.
3371 int num_l0_count = static_cast<int>(files_[0].size());
3372 if (compaction_style_ == kCompactionStyleUniversal) {
3373 // For universal compaction, we use level0 score to indicate
3374 // compaction score for the whole DB. We count other non-empty levels
3375 // as if they were additional L0 files.
3376 for (int i = 1; i < num_levels(); i++) {
3377 if (!files_[i].empty()) {
3378 num_l0_count++;
3379 }
3380 }
3381 }
3382 set_l0_delay_trigger_count(num_l0_count);
3383
3384 level_max_bytes_.resize(ioptions.num_levels);
3385 if (!ioptions.level_compaction_dynamic_level_bytes) {
3386 base_level_ = (ioptions.compaction_style == kCompactionStyleLevel) ? 1 : -1;
3387
3388 // Calculate for static bytes base case
3389 for (int i = 0; i < ioptions.num_levels; ++i) {
3390 if (i == 0 && ioptions.compaction_style == kCompactionStyleUniversal) {
3391 level_max_bytes_[i] = options.max_bytes_for_level_base;
3392 } else if (i > 1) {
3393 level_max_bytes_[i] = MultiplyCheckOverflow(
3394 MultiplyCheckOverflow(level_max_bytes_[i - 1],
3395 options.max_bytes_for_level_multiplier),
3396 options.MaxBytesMultiplerAdditional(i - 1));
3397 } else {
3398 level_max_bytes_[i] = options.max_bytes_for_level_base;
3399 }
3400 }
3401 } else {
3402 uint64_t max_level_size = 0;
3403
3404 int first_non_empty_level = -1;
3405 // Find the size of the non-L0 level with the most data.
3406 // We cannot use the size of the last level because it can be empty or
3407 // smaller than previous levels after compaction.
3408 for (int i = 1; i < num_levels_; i++) {
3409 uint64_t total_size = 0;
3410 for (const auto& f : files_[i]) {
3411 total_size += f->fd.GetFileSize();
3412 }
3413 if (total_size > 0 && first_non_empty_level == -1) {
3414 first_non_empty_level = i;
3415 }
3416 if (total_size > max_level_size) {
3417 max_level_size = total_size;
3418 }
3419 }
3420
3421 // Prefill every level's max bytes to disallow compaction from there.
3422 for (int i = 0; i < num_levels_; i++) {
3423 level_max_bytes_[i] = std::numeric_limits<uint64_t>::max();
3424 }
3425
3426 if (max_level_size == 0) {
3427 // No data for L1 and up. L0 compacts to last level directly.
3428 // No compaction from L1+ needs to be scheduled.
3429 base_level_ = num_levels_ - 1;
3430 } else {
3431 uint64_t l0_size = 0;
3432 for (const auto& f : files_[0]) {
3433 l0_size += f->fd.GetFileSize();
3434 }
3435
3436 uint64_t base_bytes_max =
3437 std::max(options.max_bytes_for_level_base, l0_size);
3438 uint64_t base_bytes_min = static_cast<uint64_t>(
3439 base_bytes_max / options.max_bytes_for_level_multiplier);
3440
3441 // Try to make the last level's target size be max_level_size
3442 uint64_t cur_level_size = max_level_size;
3443 for (int i = num_levels_ - 2; i >= first_non_empty_level; i--) {
3444 // Divide by the multiplier once per level (the result is truncated)
3445 cur_level_size = static_cast<uint64_t>(
3446 cur_level_size / options.max_bytes_for_level_multiplier);
3447 }
3448
3449 // Calculate base level and its size.
3450 uint64_t base_level_size;
3451 if (cur_level_size <= base_bytes_min) {
3452 // Case 1. If we make the target size of the last level be max_level_size,
3453 // the target size of the first non-empty level would be smaller than
3454 // base_bytes_min. We set it to base_bytes_min + 1 instead.
3455 base_level_size = base_bytes_min + 1U;
3456 base_level_ = first_non_empty_level;
3457 ROCKS_LOG_INFO(ioptions.info_log,
3458 "More existing levels in DB than needed. "
3459 "max_bytes_for_level_multiplier may not be guaranteed.");
3460 } else {
3461 // Find base level (where L0 data is compacted to).
3462 base_level_ = first_non_empty_level;
3463 while (base_level_ > 1 && cur_level_size > base_bytes_max) {
3464 --base_level_;
3465 cur_level_size = static_cast<uint64_t>(
3466 cur_level_size / options.max_bytes_for_level_multiplier);
3467 }
3468 if (cur_level_size > base_bytes_max) {
3469 // Even L1 will be too large
3470 assert(base_level_ == 1);
3471 base_level_size = base_bytes_max;
3472 } else {
3473 base_level_size = cur_level_size;
3474 }
3475 }
3476
3477 level_multiplier_ = options.max_bytes_for_level_multiplier;
3478 assert(base_level_size > 0);
3479 if (l0_size > base_level_size &&
3480 (l0_size > options.max_bytes_for_level_base ||
3481 static_cast<int>(files_[0].size() / 2) >=
3482 options.level0_file_num_compaction_trigger)) {
3483 // We adjust the base level according to actual L0 size, and adjust
3484 // the level multiplier accordingly, when:
3485 // 1. the L0 size is larger than level size base, or
3486 // 2. number of L0 files reaches twice the L0->L1 compaction trigger
3487 // We don't make this adjustment otherwise, so that the LSM-tree
3488 // structure stays stable unless the L0 compaction is backlogged.
3489 base_level_size = l0_size;
3490 if (base_level_ == num_levels_ - 1) {
3491 level_multiplier_ = 1.0;
3492 } else {
3493 level_multiplier_ = std::pow(
3494 static_cast<double>(max_level_size) /
3495 static_cast<double>(base_level_size),
3496 1.0 / static_cast<double>(num_levels_ - base_level_ - 1));
3497 }
3498 }
3499
3500 uint64_t level_size = base_level_size;
3501 for (int i = base_level_; i < num_levels_; i++) {
3502 if (i > base_level_) {
3503 level_size = MultiplyCheckOverflow(level_size, level_multiplier_);
3504 }
3505 // Don't set any level below base_bytes_max. Otherwise, the LSM can
3506 // assume an hourglass shape where L1+ sizes are smaller than L0. This
3507 // causes compaction scoring, which depends on level sizes, to favor L1+
3508 // at the expense of L0, which may fill up and stall.
3509 level_max_bytes_[i] = std::max(level_size, base_bytes_max);
3510 }
3511 }
3512 }
3513 }
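
// Worked example for the dynamic-level-bytes branch above (hypothetical
// numbers, assuming L0 is small): num_levels = 7, max_bytes_for_level_base
// = 256MB, max_bytes_for_level_multiplier = 10, and all data (200GB) sits
// in L6. The while loop walks the base level down from L6, dividing by 10
// each step: 200GB -> 20GB -> 2GB -> 200MB. 200MB <= 256MB, so base_level_
// = 3 with base_level_size = 200MB. The final loop then sets the targets
// L3 = 256MB (clamped up to base_bytes_max), L4 = 2GB, L5 = 20GB, and
// L6 = 200GB, while L1 and L2 keep uint64_t max to disallow compactions
// into them.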
3514
3515 uint64_t VersionStorageInfo::EstimateLiveDataSize() const {
3516 // Estimate the live data size by adding up the size of a maximal set of
3517 // sst files with no range overlap in same or higher level. The less
3518 // compacted, the more optimistic (smaller) this estimate is. Also,
3519 // for multiple sorted runs within a level, file order will matter.
3520 uint64_t size = 0;
3521
3522 auto ikey_lt = [this](InternalKey* x, InternalKey* y) {
3523 return internal_comparator_->Compare(*x, *y) < 0;
3524 };
3525 // (Ordered) map of largest keys in files being included in size estimate
3526 std::map<InternalKey*, FileMetaData*, decltype(ikey_lt)> ranges(ikey_lt);
3527
3528 for (int l = num_levels_ - 1; l >= 0; l--) {
3529 bool found_end = false;
3530 for (auto file : files_[l]) {
3531 // Find the first file already included whose largest key is larger than
3532 // the smallest key of `file`. If that file does not overlap with the
3533 // current file, none of the files in the map does. If there is
3534 // no potential overlap, we can safely insert the rest of this level
3535 // (if the level is not 0) into the map without checking again because
3536 // the elements in the level are sorted and non-overlapping.
3537 auto lb = (found_end && l != 0) ?
3538 ranges.end() : ranges.lower_bound(&file->smallest);
3539 found_end = (lb == ranges.end());
3540 if (found_end || internal_comparator_->Compare(
3541 file->largest, (*lb).second->smallest) < 0) {
3542 ranges.emplace_hint(lb, &file->largest, file);
3543 size += file->fd.file_size;
3544 }
3545 }
3546 }
3547 return size;
3548 }
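
// Example (hypothetical files): L2 holds F1["a","c"] at 10MB; L1 holds
// F2["b","d"] at 4MB and F3["x","z"] at 2MB. Iterating bottom-up, F1 is
// inserted and counted first. F2's range overlaps F1's (its largest "d" is
// not below F1's smallest "a"), so F2 is skipped; F3 overlaps nothing in
// the map and contributes its 2MB. The estimate is 12MB, optimistic
// because F2's non-overlapping portion is ignored.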
3549
3550 bool VersionStorageInfo::RangeMightExistAfterSortedRun(
3551 const Slice& smallest_user_key, const Slice& largest_user_key,
3552 int last_level, int last_l0_idx) {
3553 assert((last_l0_idx != -1) == (last_level == 0));
3554 // TODO(ajkr): this preserves earlier behavior where we considered an L0 file
3555 // bottommost only if it's the oldest L0 file and there are no files on older
3556 // levels. It'd be better to consider it bottommost if there's no overlap in
3557 // older levels/files.
3558 if (last_level == 0 &&
3559 last_l0_idx != static_cast<int>(LevelFiles(0).size() - 1)) {
3560 return true;
3561 }
3562
3563 // Checks whether there are files living beyond the `last_level`. If lower
3564 // levels have files, it checks for overlap between [`smallest_key`,
3565 // `largest_key`] and those files. Bottom-level optimizations can be made if
3566 // there are no files in lower levels or if there is no overlap with the files
3567 // in the lower levels.
3568 for (int level = last_level + 1; level < num_levels(); level++) {
3569 // The range is not in the bottommost level if there are files in lower
3570 // levels when the `last_level` is 0 or if there are files in lower levels
3571 // which overlap with [`smallest_key`, `largest_key`].
3572 if (files_[level].size() > 0 &&
3573 (last_level == 0 ||
3574 OverlapInLevel(level, &smallest_user_key, &largest_user_key))) {
3575 return true;
3576 }
3577 }
3578 return false;
3579 }
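
// Example (hypothetical layout): with two L0 files {f0, f1} and last_l0_idx
// == 0 (not the oldest L0 file), the function returns true immediately.
// For a last_level of 3 with key range ["m","p"], the loop checks L4 and
// onward; if only L5 holds files and none of them overlap ["m","p"], the
// function returns false and the compaction output may be treated as
// bottommost.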
3580
3581 void Version::AddLiveFiles(std::vector<uint64_t>* live_table_files,
3582 std::vector<uint64_t>* live_blob_files) const {
3583 assert(live_table_files);
3584 assert(live_blob_files);
3585
3586 for (int level = 0; level < storage_info_.num_levels(); ++level) {
3587 const auto& level_files = storage_info_.LevelFiles(level);
3588 for (const auto& meta : level_files) {
3589 assert(meta);
3590
3591 live_table_files->emplace_back(meta->fd.GetNumber());
3592 }
3593 }
3594
3595 const auto& blob_files = storage_info_.GetBlobFiles();
3596 for (const auto& pair : blob_files) {
3597 const auto& meta = pair.second;
3598 assert(meta);
3599
3600 live_blob_files->emplace_back(meta->GetBlobFileNumber());
3601 }
3602 }
3603
3604 std::string Version::DebugString(bool hex, bool print_stats) const {
3605 std::string r;
3606 for (int level = 0; level < storage_info_.num_levels_; level++) {
3607 // E.g.,
3608 // --- level 1 ---
3609 // 17:123[1 .. 124]['a' .. 'd']
3610 // 20:43[124 .. 128]['e' .. 'g']
3611 //
3612 // if print_stats=true:
3613 // 17:123[1 .. 124]['a' .. 'd'](4096)
3614 r.append("--- level ");
3615 AppendNumberTo(&r, level);
3616 r.append(" --- version# ");
3617 AppendNumberTo(&r, version_number_);
3618 r.append(" ---\n");
3619 const std::vector<FileMetaData*>& files = storage_info_.files_[level];
3620 for (size_t i = 0; i < files.size(); i++) {
3621 r.push_back(' ');
3622 AppendNumberTo(&r, files[i]->fd.GetNumber());
3623 r.push_back(':');
3624 AppendNumberTo(&r, files[i]->fd.GetFileSize());
3625 r.append("[");
3626 AppendNumberTo(&r, files[i]->fd.smallest_seqno);
3627 r.append(" .. ");
3628 AppendNumberTo(&r, files[i]->fd.largest_seqno);
3629 r.append("]");
3630 r.append("[");
3631 r.append(files[i]->smallest.DebugString(hex));
3632 r.append(" .. ");
3633 r.append(files[i]->largest.DebugString(hex));
3634 r.append("]");
3635 if (files[i]->oldest_blob_file_number != kInvalidBlobFileNumber) {
3636 r.append(" blob_file:");
3637 AppendNumberTo(&r, files[i]->oldest_blob_file_number);
3638 }
3639 if (print_stats) {
3640 r.append("(");
3641 r.append(ToString(
3642 files[i]->stats.num_reads_sampled.load(std::memory_order_relaxed)));
3643 r.append(")");
3644 }
3645 r.append("\n");
3646 }
3647 }
3648
3649 const auto& blob_files = storage_info_.GetBlobFiles();
3650 if (!blob_files.empty()) {
3651 r.append("--- blob files --- version# ");
3652 AppendNumberTo(&r, version_number_);
3653 r.append(" ---\n");
3654 for (const auto& pair : blob_files) {
3655 const auto& blob_file_meta = pair.second;
3656 assert(blob_file_meta);
3657
3658 r.append(blob_file_meta->DebugString());
3659 r.push_back('\n');
3660 }
3661 }
3662
3663 return r;
3664 }
3665
3666 // this is used to batch writes to the manifest file
3667 struct VersionSet::ManifestWriter {
3668 Status status;
3669 bool done;
3670 InstrumentedCondVar cv;
3671 ColumnFamilyData* cfd;
3672 const MutableCFOptions mutable_cf_options;
3673 const autovector<VersionEdit*>& edit_list;
3674 const std::function<void(const Status&)> manifest_write_callback;
3675
3676 explicit ManifestWriter(
3677 InstrumentedMutex* mu, ColumnFamilyData* _cfd,
3678 const MutableCFOptions& cf_options, const autovector<VersionEdit*>& e,
3679 const std::function<void(const Status&)>& manifest_wcb)
3680 : done(false),
3681 cv(mu),
3682 cfd(_cfd),
3683 mutable_cf_options(cf_options),
3684 edit_list(e),
3685 manifest_write_callback(manifest_wcb) {}
3686 ~ManifestWriter() { status.PermitUncheckedError(); }
3687
3688 bool IsAllWalEdits() const {
3689 bool all_wal_edits = true;
3690 for (const auto& e : edit_list) {
3691 if (!e->IsWalManipulation()) {
3692 all_wal_edits = false;
3693 break;
3694 }
3695 }
3696 return all_wal_edits;
3697 }
3698 };
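
// Sketch of the batching protocol (see LogAndApply() and
// ProcessManifestWrites() below): each caller enqueues a ManifestWriter
// onto manifest_writers_ and blocks on its condition variable; the writer
// at the head of the queue drains compatible followers, appends all of
// their edits to the MANIFEST in one pass, then sets done/status on every
// batched writer and signals them. This amortizes the MANIFEST sync across
// concurrent version edits.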
3699
3700 Status AtomicGroupReadBuffer::AddEdit(VersionEdit* edit) {
3701 assert(edit);
3702 if (edit->is_in_atomic_group_) {
3703 TEST_SYNC_POINT("AtomicGroupReadBuffer::AddEdit:AtomicGroup");
3704 if (replay_buffer_.empty()) {
3705 replay_buffer_.resize(edit->remaining_entries_ + 1);
3706 TEST_SYNC_POINT_CALLBACK(
3707 "AtomicGroupReadBuffer::AddEdit:FirstInAtomicGroup", edit);
3708 }
3709 read_edits_in_atomic_group_++;
3710 if (read_edits_in_atomic_group_ + edit->remaining_entries_ !=
3711 static_cast<uint32_t>(replay_buffer_.size())) {
3712 TEST_SYNC_POINT_CALLBACK(
3713 "AtomicGroupReadBuffer::AddEdit:IncorrectAtomicGroupSize", edit);
3714 return Status::Corruption("corrupted atomic group");
3715 }
3716 replay_buffer_[read_edits_in_atomic_group_ - 1] = *edit;
3717 if (read_edits_in_atomic_group_ == replay_buffer_.size()) {
3718 TEST_SYNC_POINT_CALLBACK(
3719 "AtomicGroupReadBuffer::AddEdit:LastInAtomicGroup", edit);
3720 return Status::OK();
3721 }
3722 return Status::OK();
3723 }
3724
3725 // A normal edit.
3726 if (!replay_buffer().empty()) {
3727 TEST_SYNC_POINT_CALLBACK(
3728 "AtomicGroupReadBuffer::AddEdit:AtomicGroupMixedWithNormalEdits", edit);
3729 return Status::Corruption("corrupted atomic group");
3730 }
3731 return Status::OK();
3732 }
3733
3734 bool AtomicGroupReadBuffer::IsFull() const {
3735 return read_edits_in_atomic_group_ == replay_buffer_.size();
3736 }
3737
3738 bool AtomicGroupReadBuffer::IsEmpty() const { return replay_buffer_.empty(); }
3739
3740 void AtomicGroupReadBuffer::Clear() {
3741 read_edits_in_atomic_group_ = 0;
3742 replay_buffer_.clear();
3743 }
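
// Example of atomic group framing (hypothetical): a group of three edits
// is encoded with remaining_entries_ = 2, 1, 0 in that order. AddEdit()
// sizes replay_buffer_ to 3 on the first edit, and for each subsequent
// edit verifies read_edits_in_atomic_group_ + remaining_entries_ == 3;
// any mismatch, or a normal edit arriving mid-group, is reported as
// "corrupted atomic group". Recovery replays the buffered edits only once
// IsFull() becomes true.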
3744
3745 VersionSet::VersionSet(const std::string& dbname,
3746 const ImmutableDBOptions* _db_options,
3747 const FileOptions& storage_options, Cache* table_cache,
3748 WriteBufferManager* write_buffer_manager,
3749 WriteController* write_controller,
3750 BlockCacheTracer* const block_cache_tracer,
3751 const std::shared_ptr<IOTracer>& io_tracer)
3752 : column_family_set_(
3753 new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
3754 write_buffer_manager, write_controller,
3755 block_cache_tracer, io_tracer)),
3756 table_cache_(table_cache),
3757 env_(_db_options->env),
3758 fs_(_db_options->fs, io_tracer),
3759 dbname_(dbname),
3760 db_options_(_db_options),
3761 next_file_number_(2),
3762 manifest_file_number_(0), // Filled by Recover()
3763 options_file_number_(0),
3764 pending_manifest_file_number_(0),
3765 last_sequence_(0),
3766 last_allocated_sequence_(0),
3767 last_published_sequence_(0),
3768 prev_log_number_(0),
3769 current_version_number_(0),
3770 manifest_file_size_(0),
3771 file_options_(storage_options),
3772 block_cache_tracer_(block_cache_tracer),
3773 io_tracer_(io_tracer) {}
3774
3775 VersionSet::~VersionSet() {
3776 // we need to delete column_family_set_ because its destructor depends on
3777 // VersionSet
3778 column_family_set_.reset();
3779 for (auto& file : obsolete_files_) {
3780 if (file.metadata->table_reader_handle) {
3781 table_cache_->Release(file.metadata->table_reader_handle);
3782 TableCache::Evict(table_cache_, file.metadata->fd.GetNumber());
3783 }
3784 file.DeleteMetadata();
3785 }
3786 obsolete_files_.clear();
3787 io_status_.PermitUncheckedError();
3788 }
3789
3790 void VersionSet::Reset() {
3791 if (column_family_set_) {
3792 WriteBufferManager* wbm = column_family_set_->write_buffer_manager();
3793 WriteController* wc = column_family_set_->write_controller();
3794 column_family_set_.reset(
3795 new ColumnFamilySet(dbname_, db_options_, file_options_, table_cache_,
3796 wbm, wc, block_cache_tracer_, io_tracer_));
3797 }
3798 db_id_.clear();
3799 next_file_number_.store(2);
3800 min_log_number_to_keep_2pc_.store(0);
3801 manifest_file_number_ = 0;
3802 options_file_number_ = 0;
3803 pending_manifest_file_number_ = 0;
3804 last_sequence_.store(0);
3805 last_allocated_sequence_.store(0);
3806 last_published_sequence_.store(0);
3807 prev_log_number_ = 0;
3808 descriptor_log_.reset();
3809 current_version_number_ = 0;
3810 manifest_writers_.clear();
3811 manifest_file_size_ = 0;
3812 obsolete_files_.clear();
3813 obsolete_manifests_.clear();
3814 wals_.Reset();
3815 }
3816
3817 void VersionSet::AppendVersion(ColumnFamilyData* column_family_data,
3818 Version* v) {
3819 // compute new compaction score
3820 v->storage_info()->ComputeCompactionScore(
3821 *column_family_data->ioptions(),
3822 *column_family_data->GetLatestMutableCFOptions());
3823
3824 // Mark v finalized
3825 v->storage_info_.SetFinalized();
3826
3827 // Make "v" current
3828 assert(v->refs_ == 0);
3829 Version* current = column_family_data->current();
3830 assert(v != current);
3831 if (current != nullptr) {
3832 assert(current->refs_ > 0);
3833 current->Unref();
3834 }
3835 column_family_data->SetCurrent(v);
3836 v->Ref();
3837
3838 // Append to linked list
3839 v->prev_ = column_family_data->dummy_versions()->prev_;
3840 v->next_ = column_family_data->dummy_versions();
3841 v->prev_->next_ = v;
3842 v->next_->prev_ = v;
3843 }
3844
3845 Status VersionSet::ProcessManifestWrites(
3846 std::deque<ManifestWriter>& writers, InstrumentedMutex* mu,
3847 FSDirectory* db_directory, bool new_descriptor_log,
3848 const ColumnFamilyOptions* new_cf_options) {
3849 mu->AssertHeld();
3850 assert(!writers.empty());
3851 ManifestWriter& first_writer = writers.front();
3852 ManifestWriter* last_writer = &first_writer;
3853
3854 assert(!manifest_writers_.empty());
3855 assert(manifest_writers_.front() == &first_writer);
3856
3857 autovector<VersionEdit*> batch_edits;
3858 autovector<Version*> versions;
3859 autovector<const MutableCFOptions*> mutable_cf_options_ptrs;
3860 std::vector<std::unique_ptr<BaseReferencedVersionBuilder>> builder_guards;
3861
3862 if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
3863 // No group commits for column family add or drop
3864 LogAndApplyCFHelper(first_writer.edit_list.front());
3865 batch_edits.push_back(first_writer.edit_list.front());
3866 } else {
3867 auto it = manifest_writers_.cbegin();
3868 size_t group_start = std::numeric_limits<size_t>::max();
3869 while (it != manifest_writers_.cend()) {
3870 if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) {
3871 // no group commits for column family add or drop
3872 break;
3873 }
3874 last_writer = *(it++);
3875 assert(last_writer != nullptr);
3876 assert(last_writer->cfd != nullptr);
3877 if (last_writer->cfd->IsDropped()) {
3878 // If we detect a dropped CF at this point, and the corresponding
3879 // version edits belong to an atomic group, then we need to find out
3880 // the preceding version edits in the same atomic group, and update
3881 // their `remaining_entries_` member variable because we are NOT going
3882 // to write the version edits' of dropped CF to the MANIFEST. If we
3883 // don't update, then Recover can report corrupted atomic group because
3884 // the `remaining_entries_` do not match.
3885 if (!batch_edits.empty()) {
3886 if (batch_edits.back()->is_in_atomic_group_ &&
3887 batch_edits.back()->remaining_entries_ > 0) {
3888 assert(group_start < batch_edits.size());
3889 const auto& edit_list = last_writer->edit_list;
3890 size_t k = 0;
3891 while (k < edit_list.size()) {
3892 if (!edit_list[k]->is_in_atomic_group_) {
3893 break;
3894 } else if (edit_list[k]->remaining_entries_ == 0) {
3895 ++k;
3896 break;
3897 }
3898 ++k;
3899 }
3900 for (auto i = group_start; i < batch_edits.size(); ++i) {
3901 assert(static_cast<uint32_t>(k) <=
3902 batch_edits.back()->remaining_entries_);
3903 batch_edits[i]->remaining_entries_ -= static_cast<uint32_t>(k);
3904 }
3905 }
3906 }
3907 continue;
3908 }
3909 // We do a linear search on versions because versions is small.
3910 // TODO(yanqin) maybe consider unordered_map
3911 Version* version = nullptr;
3912 VersionBuilder* builder = nullptr;
3913 for (int i = 0; i != static_cast<int>(versions.size()); ++i) {
3914 uint32_t cf_id = last_writer->cfd->GetID();
3915 if (versions[i]->cfd()->GetID() == cf_id) {
3916 version = versions[i];
3917 assert(!builder_guards.empty() &&
3918 builder_guards.size() == versions.size());
3919 builder = builder_guards[i]->version_builder();
3920 TEST_SYNC_POINT_CALLBACK(
3921 "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id);
3922 break;
3923 }
3924 }
3925 if (version == nullptr) {
3926 // WAL manipulations do not need to be applied to versions.
3927 if (!last_writer->IsAllWalEdits()) {
3928 version = new Version(last_writer->cfd, this, file_options_,
3929 last_writer->mutable_cf_options, io_tracer_,
3930 current_version_number_++);
3931 versions.push_back(version);
3932 mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options);
3933 builder_guards.emplace_back(
3934 new BaseReferencedVersionBuilder(last_writer->cfd));
3935 builder = builder_guards.back()->version_builder();
3936 }
3937 assert(last_writer->IsAllWalEdits() || builder);
3938 assert(last_writer->IsAllWalEdits() || version);
3939 TEST_SYNC_POINT_CALLBACK("VersionSet::ProcessManifestWrites:NewVersion",
3940 version);
3941 }
3942 for (const auto& e : last_writer->edit_list) {
3943 if (e->is_in_atomic_group_) {
3944 if (batch_edits.empty() || !batch_edits.back()->is_in_atomic_group_ ||
3945 (batch_edits.back()->is_in_atomic_group_ &&
3946 batch_edits.back()->remaining_entries_ == 0)) {
3947 group_start = batch_edits.size();
3948 }
3949 } else if (group_start != std::numeric_limits<size_t>::max()) {
3950 group_start = std::numeric_limits<size_t>::max();
3951 }
3952 Status s = LogAndApplyHelper(last_writer->cfd, builder, e, mu);
3953 if (!s.ok()) {
3954 // free up the allocated memory
3955 for (auto v : versions) {
3956 delete v;
3957 }
3958 return s;
3959 }
3960 batch_edits.push_back(e);
3961 }
3962 }
3963 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
3964 assert(!builder_guards.empty() &&
3965 builder_guards.size() == versions.size());
3966 auto* builder = builder_guards[i]->version_builder();
3967 Status s = builder->SaveTo(versions[i]->storage_info());
3968 if (!s.ok()) {
3969 // free up the allocated memory
3970 for (auto v : versions) {
3971 delete v;
3972 }
3973 return s;
3974 }
3975 }
3976 }
3977
3978 #ifndef NDEBUG
3979 // Verify that version edits of atomic groups have correct
3980 // remaining_entries_.
3981 size_t k = 0;
3982 while (k < batch_edits.size()) {
3983 while (k < batch_edits.size() && !batch_edits[k]->is_in_atomic_group_) {
3984 ++k;
3985 }
3986 if (k == batch_edits.size()) {
3987 break;
3988 }
3989 size_t i = k;
3990 while (i < batch_edits.size()) {
3991 if (!batch_edits[i]->is_in_atomic_group_) {
3992 break;
3993 }
3994 assert(i - k + batch_edits[i]->remaining_entries_ ==
3995 batch_edits[k]->remaining_entries_);
3996 if (batch_edits[i]->remaining_entries_ == 0) {
3997 ++i;
3998 break;
3999 }
4000 ++i;
4001 }
4002 assert(batch_edits[i - 1]->is_in_atomic_group_);
4003 assert(0 == batch_edits[i - 1]->remaining_entries_);
4004 std::vector<VersionEdit*> tmp;
4005 for (size_t j = k; j != i; ++j) {
4006 tmp.emplace_back(batch_edits[j]);
4007 }
4008 TEST_SYNC_POINT_CALLBACK(
4009 "VersionSet::ProcessManifestWrites:CheckOneAtomicGroup", &tmp);
4010 k = i;
4011 }
4012 #endif // NDEBUG
4013
4014 assert(pending_manifest_file_number_ == 0);
4015 if (!descriptor_log_ ||
4016 manifest_file_size_ > db_options_->max_manifest_file_size) {
4017 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:BeforeNewManifest");
4018 new_descriptor_log = true;
4019 } else {
4020 pending_manifest_file_number_ = manifest_file_number_;
4021 }
4022
4023 // Local cached copy of state variable(s). WriteCurrentStateToManifest()
4024 // reads its content after releasing db mutex to avoid race with
4025 // SwitchMemtable().
4026 std::unordered_map<uint32_t, MutableCFState> curr_state;
4027 VersionEdit wal_additions;
4028 if (new_descriptor_log) {
4029 pending_manifest_file_number_ = NewFileNumber();
4030 batch_edits.back()->SetNextFile(next_file_number_.load());
4031
4032 // if we are writing out new snapshot make sure to persist max column
4033 // family.
4034 if (column_family_set_->GetMaxColumnFamily() > 0) {
4035 first_writer.edit_list.front()->SetMaxColumnFamily(
4036 column_family_set_->GetMaxColumnFamily());
4037 }
4038 for (const auto* cfd : *column_family_set_) {
4039 assert(curr_state.find(cfd->GetID()) == curr_state.end());
4040 curr_state[cfd->GetID()] = {cfd->GetLogNumber()};
4041 }
4042
4043 for (const auto& wal : wals_.GetWals()) {
4044 wal_additions.AddWal(wal.first, wal.second);
4045 }
4046 }
4047
4048 uint64_t new_manifest_file_size = 0;
4049 Status s;
4050 IOStatus io_s;
4051 {
4052 FileOptions opt_file_opts = fs_->OptimizeForManifestWrite(file_options_);
4053 mu->Unlock();
4054
4055 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WriteManifest", nullptr);
4056 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4057 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4058 assert(!builder_guards.empty() &&
4059 builder_guards.size() == versions.size());
4060 assert(!mutable_cf_options_ptrs.empty() &&
4061 builder_guards.size() == versions.size());
4062 ColumnFamilyData* cfd = versions[i]->cfd_;
4063 s = builder_guards[i]->version_builder()->LoadTableHandlers(
4064 cfd->internal_stats(), 1 /* max_threads */,
4065 true /* prefetch_index_and_filter_in_cache */,
4066 false /* is_initial_load */,
4067 mutable_cf_options_ptrs[i]->prefix_extractor.get(),
4068 MaxFileSizeForL0MetaPin(*mutable_cf_options_ptrs[i]));
4069 if (!s.ok()) {
4070 if (db_options_->paranoid_checks) {
4071 break;
4072 }
4073 s = Status::OK();
4074 }
4075 }
4076 }
4077
4078 if (s.ok() && new_descriptor_log) {
4079 // This is fine because everything inside of this block is serialized --
4080 // only one thread can be here at the same time
4081 // create new manifest file
4082 ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n",
4083 pending_manifest_file_number_);
4084 std::string descriptor_fname =
4085 DescriptorFileName(dbname_, pending_manifest_file_number_);
4086 std::unique_ptr<FSWritableFile> descriptor_file;
4087 io_s = NewWritableFile(fs_.get(), descriptor_fname, &descriptor_file,
4088 opt_file_opts);
4089 if (io_s.ok()) {
4090 descriptor_file->SetPreallocationBlockSize(
4091 db_options_->manifest_preallocation_size);
4092
4093 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
4094 std::move(descriptor_file), descriptor_fname, opt_file_opts, env_,
4095 io_tracer_, nullptr, db_options_->listeners));
4096 descriptor_log_.reset(
4097 new log::Writer(std::move(file_writer), 0, false));
4098 s = WriteCurrentStateToManifest(curr_state, wal_additions,
4099 descriptor_log_.get(), io_s);
4100 } else {
4101 s = io_s;
4102 }
4103 }
4104
4105 if (s.ok()) {
4106 if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) {
4107 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4108 versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true);
4109 }
4110 }
4111
4112 // Write new records to MANIFEST log
4113 #ifndef NDEBUG
4114 size_t idx = 0;
4115 #endif
4116 for (auto& e : batch_edits) {
4117 std::string record;
4118 if (!e->EncodeTo(&record)) {
4119 s = Status::Corruption("Unable to encode VersionEdit:" +
4120 e->DebugString(true));
4121 break;
4122 }
4123 TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord",
4124 rocksdb_kill_odds * REDUCE_ODDS2);
4125 #ifndef NDEBUG
4126 if (batch_edits.size() > 1 && batch_edits.size() - 1 == idx) {
4127 TEST_SYNC_POINT_CALLBACK(
4128 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:0",
4129 nullptr);
4130 TEST_SYNC_POINT(
4131 "VersionSet::ProcessManifestWrites:BeforeWriteLastVersionEdit:1");
4132 }
4133 ++idx;
4134 #endif /* !NDEBUG */
4135 io_s = descriptor_log_->AddRecord(record);
4136 if (!io_s.ok()) {
4137 s = io_s;
4138 break;
4139 }
4140 }
4141 if (s.ok()) {
4142 io_s = SyncManifest(env_, db_options_, descriptor_log_->file());
4143 TEST_SYNC_POINT_CALLBACK(
4144 "VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
4145 }
4146 if (!io_s.ok()) {
4147 s = io_s;
4148 ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n",
4149 s.ToString().c_str());
4150 }
4151 }
4152
4153 // If we just created a new descriptor file, install it by writing a
4154 // new CURRENT file that points to it.
4155 if (s.ok() && new_descriptor_log) {
4156 io_s = SetCurrentFile(fs_.get(), dbname_, pending_manifest_file_number_,
4157 db_directory);
4158 if (!io_s.ok()) {
4159 s = io_s;
4160 }
4161 TEST_SYNC_POINT("VersionSet::ProcessManifestWrites:AfterNewManifest");
4162 }
4163
4164 if (s.ok()) {
4165 // find offset in manifest file where this version is stored.
4166 new_manifest_file_size = descriptor_log_->file()->GetFileSize();
4167 }
4168
4169 if (first_writer.edit_list.front()->is_column_family_drop_) {
4170 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0");
4171 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
4172 TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
4173 }
4174
4175 LogFlush(db_options_->info_log);
4176 TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifestDone");
4177 mu->Lock();
4178 }
4179
4180 if (s.ok()) {
4181 // Apply WAL edits, DB mutex must be held.
4182 for (auto& e : batch_edits) {
4183 if (e->IsWalAddition()) {
4184 s = wals_.AddWals(e->GetWalAdditions());
4185 } else if (e->IsWalDeletion()) {
4186 s = wals_.DeleteWalsBefore(e->GetWalDeletion().GetLogNumber());
4187 }
4188 if (!s.ok()) {
4189 break;
4190 }
4191 }
4192 }
4193
4194 if (!io_s.ok()) {
4195 if (io_status_.ok()) {
4196 io_status_ = io_s;
4197 }
4198 } else if (!io_status_.ok()) {
4199 io_status_ = io_s;
4200 }
4201
4202 // Append the old manifest file to the obsolete_manifest_ list to be deleted
4203 // by PurgeObsoleteFiles later.
4204 if (s.ok() && new_descriptor_log) {
4205 obsolete_manifests_.emplace_back(
4206 DescriptorFileName("", manifest_file_number_));
4207 }
4208
4209 // Install the new versions
4210 if (s.ok()) {
4211 if (first_writer.edit_list.front()->is_column_family_add_) {
4212 assert(batch_edits.size() == 1);
4213 assert(new_cf_options != nullptr);
4214 CreateColumnFamily(*new_cf_options, first_writer.edit_list.front());
4215 } else if (first_writer.edit_list.front()->is_column_family_drop_) {
4216 assert(batch_edits.size() == 1);
4217 first_writer.cfd->SetDropped();
4218 first_writer.cfd->UnrefAndTryDelete();
4219 } else {
4220 // Each version in versions corresponds to a column family.
4221 // For each column family, update its log number indicating that logs
4222 // with number smaller than this should be ignored.
4223 for (const auto version : versions) {
4224 uint64_t max_log_number_in_batch = 0;
4225 uint32_t cf_id = version->cfd_->GetID();
4226 for (const auto& e : batch_edits) {
4227 if (e->has_log_number_ && e->column_family_ == cf_id) {
4228 max_log_number_in_batch =
4229 std::max(max_log_number_in_batch, e->log_number_);
4230 }
4231 }
4232 if (max_log_number_in_batch != 0) {
4233 assert(version->cfd_->GetLogNumber() <= max_log_number_in_batch);
4234 version->cfd_->SetLogNumber(max_log_number_in_batch);
4235 }
4236 }
4237
4238 uint64_t last_min_log_number_to_keep = 0;
4239 for (auto& e : batch_edits) {
4240 if (e->has_min_log_number_to_keep_) {
4241 last_min_log_number_to_keep =
4242 std::max(last_min_log_number_to_keep, e->min_log_number_to_keep_);
4243 }
4244 }
4245
4246 if (last_min_log_number_to_keep != 0) {
4247 // Should only be set in 2PC mode.
4248 MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep);
4249 }
4250
4251 for (int i = 0; i < static_cast<int>(versions.size()); ++i) {
4252 ColumnFamilyData* cfd = versions[i]->cfd_;
4253 AppendVersion(cfd, versions[i]);
4254 }
4255 }
4256 manifest_file_number_ = pending_manifest_file_number_;
4257 manifest_file_size_ = new_manifest_file_size;
4258 prev_log_number_ = first_writer.edit_list.front()->prev_log_number_;
4259 } else {
4260 std::string version_edits;
4261 for (auto& e : batch_edits) {
4262 version_edits += ("\n" + e->DebugString(true));
4263 }
4264 ROCKS_LOG_ERROR(db_options_->info_log,
4265 "Error in committing version edit to MANIFEST: %s",
4266 version_edits.c_str());
4267 for (auto v : versions) {
4268 delete v;
4269 }
4270 // If manifest append failed for whatever reason, the file could be
4271 // corrupted. So we need to force the next version update to start a
4272 // new manifest file.
4273 descriptor_log_.reset();
4274 if (new_descriptor_log) {
4275 ROCKS_LOG_INFO(db_options_->info_log,
4276 "Deleting manifest %" PRIu64 " current manifest %" PRIu64
4277 "\n",
4278 pending_manifest_file_number_, manifest_file_number_);
4279 Status manifest_del_status = env_->DeleteFile(
4280 DescriptorFileName(dbname_, pending_manifest_file_number_));
4281 if (!manifest_del_status.ok()) {
4282 ROCKS_LOG_WARN(db_options_->info_log,
4283 "Failed to delete manifest %" PRIu64 ": %s",
4284 pending_manifest_file_number_,
4285 manifest_del_status.ToString().c_str());
4286 }
4287 }
4288 }
4289
4290 pending_manifest_file_number_ = 0;
4291
4292 // wake up all the waiting writers
4293 while (true) {
4294 ManifestWriter* ready = manifest_writers_.front();
4295 manifest_writers_.pop_front();
4296 bool need_signal = true;
4297 for (const auto& w : writers) {
4298 if (&w == ready) {
4299 need_signal = false;
4300 break;
4301 }
4302 }
4303 ready->status = s;
4304 ready->done = true;
4305 if (ready->manifest_write_callback) {
4306 (ready->manifest_write_callback)(s);
4307 }
4308 if (need_signal) {
4309 ready->cv.Signal();
4310 }
4311 if (ready == last_writer) {
4312 break;
4313 }
4314 }
4315 if (!manifest_writers_.empty()) {
4316 manifest_writers_.front()->cv.Signal();
4317 }
4318 return s;
4319 }
4320
4321 // 'datas' is grammatically incorrect. We still use this notation to indicate
4322 // that this variable represents a collection of column_family_data.
4323 Status VersionSet::LogAndApply(
4324 const autovector<ColumnFamilyData*>& column_family_datas,
4325 const autovector<const MutableCFOptions*>& mutable_cf_options_list,
4326 const autovector<autovector<VersionEdit*>>& edit_lists,
4327 InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log,
4328 const ColumnFamilyOptions* new_cf_options,
4329 const std::vector<std::function<void(const Status&)>>& manifest_wcbs) {
4330 mu->AssertHeld();
4331 int num_edits = 0;
4332 for (const auto& elist : edit_lists) {
4333 num_edits += static_cast<int>(elist.size());
4334 }
4335 if (num_edits == 0) {
4336 return Status::OK();
4337 } else if (num_edits > 1) {
4338 #ifndef NDEBUG
4339 for (const auto& edit_list : edit_lists) {
4340 for (const auto& edit : edit_list) {
4341 assert(!edit->IsColumnFamilyManipulation());
4342 }
4343 }
4344 #endif /* ! NDEBUG */
4345 }
4346
4347 int num_cfds = static_cast<int>(column_family_datas.size());
4348 if (num_cfds == 1 && column_family_datas[0] == nullptr) {
4349 assert(edit_lists.size() == 1 && edit_lists[0].size() == 1);
4350 assert(edit_lists[0][0]->is_column_family_add_);
4351 assert(new_cf_options != nullptr);
4352 }
4353 std::deque<ManifestWriter> writers;
4354 if (num_cfds > 0) {
4355 assert(static_cast<size_t>(num_cfds) == mutable_cf_options_list.size());
4356 assert(static_cast<size_t>(num_cfds) == edit_lists.size());
4357 }
4358 for (int i = 0; i < num_cfds; ++i) {
4359 const auto wcb =
4360 manifest_wcbs.empty() ? [](const Status&) {} : manifest_wcbs[i];
4361 writers.emplace_back(mu, column_family_datas[i],
4362 *mutable_cf_options_list[i], edit_lists[i], wcb);
4363 manifest_writers_.push_back(&writers[i]);
4364 }
4365 assert(!writers.empty());
4366 ManifestWriter& first_writer = writers.front();
4367 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:BeforeWriterWaiting",
4368 nullptr);
4369 while (!first_writer.done && &first_writer != manifest_writers_.front()) {
4370 first_writer.cv.Wait();
4371 }
4372 if (first_writer.done) {
4373 // All non-CF-manipulation operations can be grouped together and committed
4374 // to MANIFEST. They should all have finished. The status code is stored in
4375 // the first manifest writer.
4376 #ifndef NDEBUG
4377 for (const auto& writer : writers) {
4378 assert(writer.done);
4379 }
4380 TEST_SYNC_POINT_CALLBACK("VersionSet::LogAndApply:WakeUpAndDone", mu);
4381 #endif /* !NDEBUG */
4382 return first_writer.status;
4383 }
4384
4385 int num_undropped_cfds = 0;
4386 for (auto cfd : column_family_datas) {
4387 // if cfd == nullptr, it is a column family add.
4388 if (cfd == nullptr || !cfd->IsDropped()) {
4389 ++num_undropped_cfds;
4390 }
4391 }
4392 if (0 == num_undropped_cfds) {
4393 for (int i = 0; i != num_cfds; ++i) {
4394 manifest_writers_.pop_front();
4395 }
4396 // Notify new head of manifest write queue.
4397 if (!manifest_writers_.empty()) {
4398 manifest_writers_.front()->cv.Signal();
4399 }
4400 return Status::ColumnFamilyDropped();
4401 }
4402
4403 return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log,
4404 new_cf_options);
4405 }
4406
4407 void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) {
4408 assert(edit->IsColumnFamilyManipulation());
4409 edit->SetNextFile(next_file_number_.load());
4410 // The log might have data that is not visible to the memtable and hence
4411 // has not updated the last_sequence_ yet. It is also possible that the log is
4412 // expecting some new data that is not written yet. Since LastSequence is an
4413 // upper bound on the sequence, it is ok to record
4414 // last_allocated_sequence_ as the last sequence.
4415 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4416 : last_sequence_);
4417 if (edit->is_column_family_drop_) {
4418 // if we drop a column family, we have to make sure to save the max column
4419 // family, so that we don't reuse an existing ID
4420 edit->SetMaxColumnFamily(column_family_set_->GetMaxColumnFamily());
4421 }
4422 }
4423
4424 Status VersionSet::LogAndApplyHelper(ColumnFamilyData* cfd,
4425 VersionBuilder* builder, VersionEdit* edit,
4426 InstrumentedMutex* mu) {
4427 #ifdef NDEBUG
4428 (void)cfd;
4429 #endif
4430 mu->AssertHeld();
4431 assert(!edit->IsColumnFamilyManipulation());
4432
4433 if (edit->has_log_number_) {
4434 assert(edit->log_number_ >= cfd->GetLogNumber());
4435 assert(edit->log_number_ < next_file_number_.load());
4436 }
4437
4438 if (!edit->has_prev_log_number_) {
4439 edit->SetPrevLogNumber(prev_log_number_);
4440 }
4441 edit->SetNextFile(next_file_number_.load());
4442 // The log might have data that is not visible to the memtable and hence
4443 // has not updated the last_sequence_ yet. It is also possible that the log is
4444 // expecting some new data that is not written yet. Since LastSequence is an
4445 // upper bound on the sequence, it is ok to record
4446 // last_allocated_sequence_ as the last sequence.
4447 edit->SetLastSequence(db_options_->two_write_queues ? last_allocated_sequence_
4448 : last_sequence_);
4449
4450 // The builder can be nullptr only if edit is WAL manipulation,
4451 // because WAL edits do not need to be applied to versions,
4452 // we return Status::OK() in this case.
4453 assert(builder || edit->IsWalManipulation());
4454 return builder ? builder->Apply(edit) : Status::OK();
4455 }
4456
4457 Status VersionSet::ApplyOneVersionEditToBuilder(
4458 VersionEdit& edit,
4459 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4460 std::unordered_map<int, std::string>& column_families_not_found,
4461 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4462 builders,
4463 VersionEditParams* version_edit_params) {
4464 // Not found means that the user didn't supply that column
4465 // family option AND we encountered a column family add
4466 // record. Once we encounter a column family drop record,
4467 // we will delete the column family from
4468 // column_families_not_found.
4469 bool cf_in_not_found = (column_families_not_found.find(edit.column_family_) !=
4470 column_families_not_found.end());
4471 // in builders means that the user supplied that column family
4472 // option AND that we encountered a column family add record
4473 bool cf_in_builders = builders.find(edit.column_family_) != builders.end();
4474
4475 // they can't both be true
4476 assert(!(cf_in_not_found && cf_in_builders));
4477
4478 ColumnFamilyData* cfd = nullptr;
4479
4480 if (edit.is_column_family_add_) {
4481 if (cf_in_builders || cf_in_not_found) {
4482 return Status::Corruption(
4483 "Manifest adding the same column family twice: " +
4484 edit.column_family_name_);
4485 }
4486 auto cf_options = name_to_options.find(edit.column_family_name_);
4487 // implicitly add persistent_stats column family without requiring user
4488 // to specify
4489 bool is_persistent_stats_column_family =
4490 edit.column_family_name_.compare(kPersistentStatsColumnFamilyName) == 0;
4491 if (cf_options == name_to_options.end() &&
4492 !is_persistent_stats_column_family) {
4493 column_families_not_found.insert(
4494 {edit.column_family_, edit.column_family_name_});
4495 } else {
4496 // recover persistent_stats CF from a DB that already contains it
4497 if (is_persistent_stats_column_family) {
4498 ColumnFamilyOptions cfo;
4499 OptimizeForPersistentStats(&cfo);
4500 cfd = CreateColumnFamily(cfo, &edit);
4501 } else {
4502 cfd = CreateColumnFamily(cf_options->second, &edit);
4503 }
4504 cfd->set_initialized();
4505 builders.insert(std::make_pair(
4506 edit.column_family_, std::unique_ptr<BaseReferencedVersionBuilder>(
4507 new BaseReferencedVersionBuilder(cfd))));
4508 }
4509 } else if (edit.is_column_family_drop_) {
4510 if (cf_in_builders) {
4511 auto builder = builders.find(edit.column_family_);
4512 assert(builder != builders.end());
4513 builders.erase(builder);
4514 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4515 assert(cfd != nullptr);
4516 if (cfd->UnrefAndTryDelete()) {
4517 cfd = nullptr;
4518 } else {
4519 // who else can have reference to cfd!?
4520 assert(false);
4521 }
4522 } else if (cf_in_not_found) {
4523 column_families_not_found.erase(edit.column_family_);
4524 } else {
4525 return Status::Corruption(
4526 "Manifest - dropping non-existing column family");
4527 }
4528 } else if (edit.IsWalAddition()) {
4529 Status s = wals_.AddWals(edit.GetWalAdditions());
4530 if (!s.ok()) {
4531 return s;
4532 }
4533 } else if (edit.IsWalDeletion()) {
4534 Status s = wals_.DeleteWalsBefore(edit.GetWalDeletion().GetLogNumber());
4535 if (!s.ok()) {
4536 return s;
4537 }
4538 } else if (!cf_in_not_found) {
4539 if (!cf_in_builders) {
4540 return Status::Corruption(
4541 "Manifest record referencing unknown column family");
4542 }
4543
4544 cfd = column_family_set_->GetColumnFamily(edit.column_family_);
4545 // this should never happen since cf_in_builders is true
4546 assert(cfd != nullptr);
4547
4548 // if it is not column family add or column family drop,
4549 // then it's a file add/delete, which should be forwarded
4550 // to builder
4551 auto builder = builders.find(edit.column_family_);
4552 assert(builder != builders.end());
4553 Status s = builder->second->version_builder()->Apply(&edit);
4554 if (!s.ok()) {
4555 return s;
4556 }
4557 }
4558 return ExtractInfoFromVersionEdit(cfd, edit, version_edit_params);
4559 }
4560
4561 Status VersionSet::ExtractInfoFromVersionEdit(
4562 ColumnFamilyData* cfd, const VersionEdit& from_edit,
4563 VersionEditParams* version_edit_params) {
4564 if (cfd != nullptr) {
4565 if (from_edit.has_db_id_) {
4566 version_edit_params->SetDBId(from_edit.db_id_);
4567 }
4568 if (from_edit.has_log_number_) {
4569 if (cfd->GetLogNumber() > from_edit.log_number_) {
4570 ROCKS_LOG_WARN(
4571 db_options_->info_log,
4572 "MANIFEST corruption detected, but ignored - Log numbers in "
4573 "records NOT monotonically increasing");
4574 } else {
4575 cfd->SetLogNumber(from_edit.log_number_);
4576 version_edit_params->SetLogNumber(from_edit.log_number_);
4577 }
4578 }
4579 if (from_edit.has_comparator_ &&
4580 from_edit.comparator_ != cfd->user_comparator()->Name()) {
4581 return Status::InvalidArgument(
4582 cfd->user_comparator()->Name(),
4583 "does not match existing comparator " + from_edit.comparator_);
4584 }
4585 }
4586
4587 if (from_edit.has_prev_log_number_) {
4588 version_edit_params->SetPrevLogNumber(from_edit.prev_log_number_);
4589 }
4590
4591 if (from_edit.has_next_file_number_) {
4592 version_edit_params->SetNextFile(from_edit.next_file_number_);
4593 }
4594
4595 if (from_edit.has_max_column_family_) {
4596 version_edit_params->SetMaxColumnFamily(from_edit.max_column_family_);
4597 }
4598
4599 if (from_edit.has_min_log_number_to_keep_) {
4600 version_edit_params->min_log_number_to_keep_ =
4601 std::max(version_edit_params->min_log_number_to_keep_,
4602 from_edit.min_log_number_to_keep_);
4603 }
4604
4605 if (from_edit.has_last_sequence_) {
4606 version_edit_params->SetLastSequence(from_edit.last_sequence_);
4607 }
4608 return Status::OK();
4609 }
4610
4611 Status VersionSet::GetCurrentManifestPath(const std::string& dbname,
4612 FileSystem* fs,
4613 std::string* manifest_path,
4614 uint64_t* manifest_file_number) {
4615 assert(fs != nullptr);
4616 assert(manifest_path != nullptr);
4617 assert(manifest_file_number != nullptr);
4618
4619 std::string fname;
4620 Status s = ReadFileToString(fs, CurrentFileName(dbname), &fname);
4621 if (!s.ok()) {
4622 return s;
4623 }
4624 if (fname.empty() || fname.back() != '\n') {
4625 return Status::Corruption("CURRENT file does not end with newline");
4626 }
4627 // remove the trailing '\n'
4628 fname.resize(fname.size() - 1);
4629 FileType type;
4630 bool parse_ok = ParseFileName(fname, manifest_file_number, &type);
4631 if (!parse_ok || type != kDescriptorFile) {
4632 return Status::Corruption("CURRENT file corrupted");
4633 }
4634 *manifest_path = dbname;
4635 if (dbname.back() != '/') {
4636 manifest_path->push_back('/');
4637 }
4638 manifest_path->append(fname);
4639 return Status::OK();
4640 }
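
// Example (hypothetical values): if dbname is "/data/mydb" and the CURRENT
// file contains "MANIFEST-000042\n", then *manifest_path becomes
// "/data/mydb/MANIFEST-000042" and *manifest_file_number is set to 42.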
4641
4642 Status VersionSet::ReadAndRecover(
4643 log::Reader& reader, AtomicGroupReadBuffer* read_buffer,
4644 const std::unordered_map<std::string, ColumnFamilyOptions>& name_to_options,
4645 std::unordered_map<int, std::string>& column_families_not_found,
4646 std::unordered_map<uint32_t, std::unique_ptr<BaseReferencedVersionBuilder>>&
4647 builders,
4648 Status* log_read_status, VersionEditParams* version_edit_params,
4649 std::string* db_id) {
4650 assert(read_buffer != nullptr);
4651 assert(log_read_status != nullptr);
4652 Status s;
4653 Slice record;
4654 std::string scratch;
4655 size_t recovered_edits = 0;
4656 while (s.ok() && reader.ReadRecord(&record, &scratch) &&
4657 log_read_status->ok()) {
4658 VersionEdit edit;
4659 s = edit.DecodeFrom(record);
4660 if (!s.ok()) {
4661 break;
4662 }
4663 if (edit.has_db_id_) {
4664 db_id_ = edit.GetDbId();
4665 if (db_id != nullptr) {
4666 db_id->assign(edit.GetDbId());
4667 }
4668 }
4669 s = read_buffer->AddEdit(&edit);
4670 if (!s.ok()) {
4671 break;
4672 }
4673 if (edit.is_in_atomic_group_) {
4674 if (read_buffer->IsFull()) {
4675 // Apply edits in an atomic group when we have read all edits in the
4676 // group.
4677 for (auto& e : read_buffer->replay_buffer()) {
4678 s = ApplyOneVersionEditToBuilder(e, name_to_options,
4679 column_families_not_found, builders,
4680 version_edit_params);
4681 if (!s.ok()) {
4682 break;
4683 }
4684 recovered_edits++;
4685 }
4686 if (!s.ok()) {
4687 break;
4688 }
4689 read_buffer->Clear();
4690 }
4691 } else {
4692 // Apply a normal edit immediately.
4693 s = ApplyOneVersionEditToBuilder(edit, name_to_options,
4694 column_families_not_found, builders,
4695 version_edit_params);
4696 if (s.ok()) {
4697 recovered_edits++;
4698 }
4699 }
4700 }
4701 if (!log_read_status->ok()) {
4702 s = *log_read_status;
4703 }
4704 if (!s.ok()) {
4705 // Clear the buffer if we fail to decode/apply an edit.
4706 read_buffer->Clear();
4707 }
4708 TEST_SYNC_POINT_CALLBACK("VersionSet::ReadAndRecover:RecoveredEdits",
4709 &recovered_edits);
4710 return s;
4711 }
4712
4713 Status VersionSet::Recover(
4714 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4715 std::string* db_id) {
4716 // Read "CURRENT" file, which contains a pointer to the current manifest file
4717 std::string manifest_path;
4718 Status s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
4719 &manifest_file_number_);
4720 if (!s.ok()) {
4721 return s;
4722 }
4723
4724 ROCKS_LOG_INFO(db_options_->info_log, "Recovering from manifest file: %s\n",
4725 manifest_path.c_str());
4726
4727 std::unique_ptr<SequentialFileReader> manifest_file_reader;
4728 {
4729 std::unique_ptr<FSSequentialFile> manifest_file;
4730 s = fs_->NewSequentialFile(manifest_path,
4731 fs_->OptimizeForManifestRead(file_options_),
4732 &manifest_file, nullptr);
4733 if (!s.ok()) {
4734 return s;
4735 }
4736 manifest_file_reader.reset(
4737 new SequentialFileReader(std::move(manifest_file), manifest_path,
4738 db_options_->log_readahead_size, io_tracer_));
4739 }
4740 uint64_t current_manifest_file_size = 0;
4741 uint64_t log_number = 0;
4742 {
4743 VersionSet::LogReporter reporter;
4744 Status log_read_status;
4745 reporter.status = &log_read_status;
4746 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4747 true /* checksum */, 0 /* log_number */);
4748 VersionEditHandler handler(
4749 read_only, column_families, const_cast<VersionSet*>(this),
4750 /*track_missing_files=*/false,
4751 /*no_error_if_table_files_missing=*/false, io_tracer_);
4752 handler.Iterate(reader, &log_read_status);
4753 s = handler.status();
4754 if (s.ok()) {
4755 log_number = handler.GetVersionEditParams().log_number_;
4756 current_manifest_file_size = reader.GetReadOffset();
4757 assert(current_manifest_file_size != 0);
4758 handler.GetDbId(db_id);
4759 }
4760 }
4761
4762 if (s.ok()) {
4763 manifest_file_size_ = current_manifest_file_size;
4764 ROCKS_LOG_INFO(
4765 db_options_->info_log,
4766 "Recovered from manifest file:%s succeeded,"
4767 "manifest_file_number is %" PRIu64 ", next_file_number is %" PRIu64
4768 ", last_sequence is %" PRIu64 ", log_number is %" PRIu64
4769 ",prev_log_number is %" PRIu64 ",max_column_family is %" PRIu32
4770 ",min_log_number_to_keep is %" PRIu64 "\n",
4771 manifest_path.c_str(), manifest_file_number_, next_file_number_.load(),
4772 last_sequence_.load(), log_number, prev_log_number_,
4773 column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
4774
4775 for (auto cfd : *column_family_set_) {
4776 if (cfd->IsDropped()) {
4777 continue;
4778 }
4779 ROCKS_LOG_INFO(db_options_->info_log,
4780 "Column family [%s] (ID %" PRIu32
4781 "), log number is %" PRIu64 "\n",
4782 cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
4783 }
4784 }
4785
4786 return s;
4787 }
4788
4789 namespace {
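// ManifestPicker enumerates all MANIFEST (descriptor) files found in the db
// directory, ordered from the highest file number (most recent) to the
// lowest, so that best-effort recovery can start from the newest manifest and
// fall back to progressively older ones.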
4790 class ManifestPicker {
4791 public:
4792 explicit ManifestPicker(const std::string& dbname,
4793 const std::vector<std::string>& files_in_dbname);
4794 // REQUIRES Valid() == true
4795 std::string GetNextManifest(uint64_t* file_number, std::string* file_name);
4796 bool Valid() const { return manifest_file_iter_ != manifest_files_.end(); }
4797
4798 private:
4799 const std::string& dbname_;
4800 // MANIFEST file name(s)
4801 std::vector<std::string> manifest_files_;
4802 std::vector<std::string>::const_iterator manifest_file_iter_;
4803 };
4804
4805 ManifestPicker::ManifestPicker(const std::string& dbname,
4806 const std::vector<std::string>& files_in_dbname)
4807 : dbname_(dbname) {
4808 // populate manifest files
4809 assert(!files_in_dbname.empty());
4810 for (const auto& fname : files_in_dbname) {
4811 uint64_t file_num = 0;
4812 FileType file_type;
4813 bool parse_ok = ParseFileName(fname, &file_num, &file_type);
4814 if (parse_ok && file_type == kDescriptorFile) {
4815 manifest_files_.push_back(fname);
4816 }
4817 }
4818 // Sort by file number in descending order so iteration starts from the most recent manifest.
4819 std::sort(manifest_files_.begin(), manifest_files_.end(),
4820 [](const std::string& lhs, const std::string& rhs) {
4821 uint64_t num1 = 0;
4822 uint64_t num2 = 0;
4823 FileType type1;
4824 FileType type2;
4825 bool parse_ok1 = ParseFileName(lhs, &num1, &type1);
4826 bool parse_ok2 = ParseFileName(rhs, &num2, &type2);
4827 #ifndef NDEBUG
4828 assert(parse_ok1);
4829 assert(parse_ok2);
4830 #else
4831 (void)parse_ok1;
4832 (void)parse_ok2;
4833 #endif
4834 return num1 > num2;
4835 });
4836 manifest_file_iter_ = manifest_files_.begin();
4837 }
4838
4839 std::string ManifestPicker::GetNextManifest(uint64_t* number,
4840 std::string* file_name) {
4841 assert(Valid());
4842 std::string ret;
4843 if (manifest_file_iter_ != manifest_files_.end()) {
4844 ret.assign(dbname_);
4845 if (ret.back() != kFilePathSeparator) {
4846 ret.push_back(kFilePathSeparator);
4847 }
4848 ret.append(*manifest_file_iter_);
4849 if (number) {
4850 FileType type;
4851 bool parse = ParseFileName(*manifest_file_iter_, number, &type);
4852 assert(type == kDescriptorFile);
4853 #ifndef NDEBUG
4854 assert(parse);
4855 #else
4856 (void)parse;
4857 #endif
4858 }
4859 if (file_name) {
4860 *file_name = *manifest_file_iter_;
4861 }
4862 ++manifest_file_iter_;
4863 }
4864 return ret;
4865 }
4866 } // namespace
4867
4868 Status VersionSet::TryRecover(
4869 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4870 const std::vector<std::string>& files_in_dbname, std::string* db_id,
4871 bool* has_missing_table_file) {
4872 ManifestPicker manifest_picker(dbname_, files_in_dbname);
4873 if (!manifest_picker.Valid()) {
4874 return Status::Corruption("Cannot locate MANIFEST file in " + dbname_);
4875 }
4876 Status s;
4877 std::string manifest_path =
4878 manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
4879 while (!manifest_path.empty()) {
4880 s = TryRecoverFromOneManifest(manifest_path, column_families, read_only,
4881 db_id, has_missing_table_file);
4882 if (s.ok() || !manifest_picker.Valid()) {
4883 break;
4884 }
4885 Reset();
4886 manifest_path =
4887 manifest_picker.GetNextManifest(&manifest_file_number_, nullptr);
4888 }
4889 return s;
4890 }
4891
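// Replays a single MANIFEST using VersionEditHandlerPointInTime, which keeps
// the most recent point-in-time state whose table files are all present on
// disk; edits referencing missing files do not fail recovery but are surfaced
// through *has_missing_table_file instead.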
4892 Status VersionSet::TryRecoverFromOneManifest(
4893 const std::string& manifest_path,
4894 const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
4895 std::string* db_id, bool* has_missing_table_file) {
4896 ROCKS_LOG_INFO(db_options_->info_log, "Trying to recover from manifest: %s\n",
4897 manifest_path.c_str());
4898 std::unique_ptr<SequentialFileReader> manifest_file_reader;
4899 Status s;
4900 {
4901 std::unique_ptr<FSSequentialFile> manifest_file;
4902 s = fs_->NewSequentialFile(manifest_path,
4903 fs_->OptimizeForManifestRead(file_options_),
4904 &manifest_file, nullptr);
4905 if (!s.ok()) {
4906 return s;
4907 }
4908 manifest_file_reader.reset(
4909 new SequentialFileReader(std::move(manifest_file), manifest_path,
4910 db_options_->log_readahead_size, io_tracer_));
4911 }
4912
4913 assert(s.ok());
4914 VersionSet::LogReporter reporter;
4915 reporter.status = &s;
4916 log::Reader reader(nullptr, std::move(manifest_file_reader), &reporter,
4917 /*checksum=*/true, /*log_num=*/0);
4918 VersionEditHandlerPointInTime handler_pit(
4919 read_only, column_families, const_cast<VersionSet*>(this), io_tracer_);
4920
4921 handler_pit.Iterate(reader, &s);
4922
4923 handler_pit.GetDbId(db_id);
4924
4925 assert(nullptr != has_missing_table_file);
4926 *has_missing_table_file = handler_pit.HasMissingFiles();
4927
4928 return handler_pit.status();
4929 }
4930
4931 Status VersionSet::ListColumnFamilies(std::vector<std::string>* column_families,
4932 const std::string& dbname,
4933 FileSystem* fs) {
4934 // These options are just for performance reasons, not correctness,
4935 // so we're fine using the defaults.
4936 FileOptions soptions;
4937 // Read "CURRENT" file, which contains a pointer to the current manifest file
4938 std::string manifest_path;
4939 uint64_t manifest_file_number;
4940 Status s =
4941 GetCurrentManifestPath(dbname, fs, &manifest_path, &manifest_file_number);
4942 if (!s.ok()) {
4943 return s;
4944 }
4945
4946 std::unique_ptr<SequentialFileReader> file_reader;
4947 {
4948 std::unique_ptr<FSSequentialFile> file;
4949 s = fs->NewSequentialFile(manifest_path, soptions, &file, nullptr);
4950 if (!s.ok()) {
4951 return s;
4952 }
4953 file_reader.reset(new SequentialFileReader(std::move(file), manifest_path,
4954 nullptr /*IOTracer*/));
4955 }
4956
4957 VersionSet::LogReporter reporter;
4958 reporter.status = &s;
4959 log::Reader reader(nullptr, std::move(file_reader), &reporter,
4960 true /* checksum */, 0 /* log_number */);
4961
4962 ListColumnFamiliesHandler handler;
4963 handler.Iterate(reader, &s);
4964
4965 assert(column_families);
4966 column_families->clear();
4967 if (handler.status().ok()) {
4968 for (const auto& iter : handler.GetColumnFamilyNames()) {
4969 column_families->push_back(iter.second);
4970 }
4971 }
4972
4973 return handler.status();
4974 }
4975
4976 #ifndef ROCKSDB_LITE
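// Rewrites the manifest so the DB uses only new_levels levels, moving the
// single non-empty level at or above (new_levels - 1) into the new last
// level. This is an offline utility (invoked, e.g., by the ldb
// "reduce_levels" command); it must not run against an open DB.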
4977 Status VersionSet::ReduceNumberOfLevels(const std::string& dbname,
4978 const Options* options,
4979 const FileOptions& file_options,
4980 int new_levels) {
4981 if (new_levels <= 1) {
4982 return Status::InvalidArgument(
4983 "Number of levels needs to be bigger than 1");
4984 }
4985
4986 ImmutableDBOptions db_options(*options);
4987 ColumnFamilyOptions cf_options(*options);
4988 std::shared_ptr<Cache> tc(NewLRUCache(options->max_open_files - 10,
4989 options->table_cache_numshardbits));
4990 WriteController wc(options->delayed_write_rate);
4991 WriteBufferManager wb(options->db_write_buffer_size);
4992 VersionSet versions(dbname, &db_options, file_options, tc.get(), &wb, &wc,
4993 nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/);
4994 Status status;
4995
4996 std::vector<ColumnFamilyDescriptor> dummy;
4997 ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName,
4998 ColumnFamilyOptions(*options));
4999 dummy.push_back(dummy_descriptor);
5000 status = versions.Recover(dummy);
5001 if (!status.ok()) {
5002 return status;
5003 }
5004
5005 Version* current_version =
5006 versions.GetColumnFamilySet()->GetDefault()->current();
5007 auto* vstorage = current_version->storage_info();
5008 int current_levels = vstorage->num_levels();
5009
5010 if (current_levels <= new_levels) {
5011 return Status::OK();
5012 }
5013
5014 // Make sure there are files on only one level, from
5015 // (new_levels-1) to (current_levels-1)
5016 int first_nonempty_level = -1;
5017 int first_nonempty_level_filenum = 0;
5018 for (int i = new_levels - 1; i < current_levels; i++) {
5019 int file_num = vstorage->NumLevelFiles(i);
5020 if (file_num != 0) {
5021 if (first_nonempty_level < 0) {
5022 first_nonempty_level = i;
5023 first_nonempty_level_filenum = file_num;
5024 } else {
5025 char msg[255];
5026 snprintf(msg, sizeof(msg),
5027 "Found at least two levels containing files: "
5028 "[%d:%d],[%d:%d].\n",
5029 first_nonempty_level, first_nonempty_level_filenum, i,
5030 file_num);
5031 return Status::InvalidArgument(msg);
5032 }
5033 }
5034 }
5035
5036 // We need to allocate an array sized to the old number of levels to avoid a
5037 // SIGSEGV in WriteCurrentStateToManifest();
5038 // however, all levels greater than or equal to new_levels will be empty.
5039 std::vector<FileMetaData*>* new_files_list =
5040 new std::vector<FileMetaData*>[current_levels];
5041 for (int i = 0; i < new_levels - 1; i++) {
5042 new_files_list[i] = vstorage->LevelFiles(i);
5043 }
5044
5045 if (first_nonempty_level > 0) {
5046 auto& new_last_level = new_files_list[new_levels - 1];
5047
5048 new_last_level = vstorage->LevelFiles(first_nonempty_level);
5049
5050 for (size_t i = 0; i < new_last_level.size(); ++i) {
5051 const FileMetaData* const meta = new_last_level[i];
5052 assert(meta);
5053
5054 const uint64_t file_number = meta->fd.GetNumber();
5055
5056 vstorage->file_locations_[file_number] =
5057 VersionStorageInfo::FileLocation(new_levels - 1, i);
5058 }
5059 }
5060
5061 delete[] vstorage->files_;
5062 vstorage->files_ = new_files_list;
5063 vstorage->num_levels_ = new_levels;
5064
5065 MutableCFOptions mutable_cf_options(*options);
5066 VersionEdit ve;
5067 InstrumentedMutex dummy_mutex;
5068 InstrumentedMutexLock l(&dummy_mutex);
5069 return versions.LogAndApply(
5070 versions.GetColumnFamilySet()->GetDefault(),
5071 mutable_cf_options, &ve, &dummy_mutex, nullptr, true);
5072 }
5073
5074 // Get the checksum information including the checksum and checksum function
5075 // name of all SST files in VersionSet. Store the information in
5076 // FileChecksumList which contains a map from file number to its checksum info.
5077 // If the DB is not running, make sure to call VersionSet::Recover() to load
5078 // the file metadata from the MANIFEST into the VersionSet before calling this function.
5079 Status VersionSet::GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) {
5080 // Clean the previously stored checksum information if any.
5081 Status s;
5082 if (checksum_list == nullptr) {
5083 s = Status::InvalidArgument("checksum_list is nullptr");
5084 return s;
5085 }
5086 checksum_list->reset();
5087
5088 for (auto cfd : *column_family_set_) {
5089 if (cfd->IsDropped() || !cfd->initialized()) {
5090 continue;
5091 }
5092 for (int level = 0; level < cfd->NumberLevels(); level++) {
5093 for (const auto& file :
5094 cfd->current()->storage_info()->LevelFiles(level)) {
5095 s = checksum_list->InsertOneFileChecksum(file->fd.GetNumber(),
5096 file->file_checksum,
5097 file->file_checksum_func_name);
5098 if (!s.ok()) {
5099 break;
5100 }
5101 }
5102 if (!s.ok()) {
5103 break;
5104 }
5105 }
5106 if (!s.ok()) {
5107 break;
5108 }
5109 }
5110 return s;
5111 }
5112
5113 Status VersionSet::DumpManifest(Options& options, std::string& dscname,
5114 bool verbose, bool hex, bool json) {
5115 // Open the specified manifest file.
5116 std::unique_ptr<SequentialFileReader> file_reader;
5117 Status s;
5118 {
5119 std::unique_ptr<FSSequentialFile> file;
5120 const std::shared_ptr<FileSystem>& fs = options.env->GetFileSystem();
5121 s = fs->NewSequentialFile(
5122 dscname,
5123 fs->OptimizeForManifestRead(file_options_), &file,
5124 nullptr);
5125 if (!s.ok()) {
5126 return s;
5127 }
5128 file_reader.reset(new SequentialFileReader(
5129 std::move(file), dscname, db_options_->log_readahead_size, io_tracer_));
5130 }
5131
5132 std::vector<ColumnFamilyDescriptor> column_families(
5133 1, ColumnFamilyDescriptor(kDefaultColumnFamilyName, options));
5134 DumpManifestHandler handler(column_families, this, io_tracer_, verbose, hex,
5135 json);
5136 {
5137 VersionSet::LogReporter reporter;
5138 reporter.status = &s;
5139 log::Reader reader(nullptr, std::move(file_reader), &reporter,
5140 true /* checksum */, 0 /* log_number */);
5141 handler.Iterate(reader, &s);
5142 }
5143
5144 return handler.status();
5145 }
5146 #endif // ROCKSDB_LITE
5147
5148 void VersionSet::MarkFileNumberUsed(uint64_t number) {
5149 // Only called during recovery and repair, which are single-threaded, so this
5150 // works because there can't be concurrent calls.
5151 if (next_file_number_.load(std::memory_order_relaxed) <= number) {
5152 next_file_number_.store(number + 1, std::memory_order_relaxed);
5153 }
5154 }
5155 // Called only from ::LogAndApply, which is protected by the mutex, or during
5156 // recovery, which is single-threaded.
5157 void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
5158 if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
5159 min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
5160 }
5161 }
5162
5163 Status VersionSet::WriteCurrentStateToManifest(
5164 const std::unordered_map<uint32_t, MutableCFState>& curr_state,
5165 const VersionEdit& wal_additions, log::Writer* log, IOStatus& io_s) {
5166 // TODO: Break up into multiple records to reduce memory usage on recovery?
5167
5168 // WARNING: This method doesn't hold a mutex!!
5169
5170 // This is done without DB mutex lock held, but only within single-threaded
5171 // LogAndApply. Column family manipulations can only happen within LogAndApply
5172 // (the same single thread), so we're safe to iterate.
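// Record layout written below: an optional db_id edit, an optional WAL
// additions edit, and then, for every live column family, one edit describing
// the family itself (name/ID/comparator) followed by one edit listing its
// table files, blob files, and log number. Every fresh MANIFEST therefore
// starts with this snapshot, which ReactiveVersionSet::ReadAndApply counts on
// when it skips the leading records of a newly switched MANIFEST.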
5173
5174 assert(io_s.ok());
5175 if (db_options_->write_dbid_to_manifest) {
5176 VersionEdit edit_for_db_id;
5177 assert(!db_id_.empty());
5178 edit_for_db_id.SetDBId(db_id_);
5179 std::string db_id_record;
5180 if (!edit_for_db_id.EncodeTo(&db_id_record)) {
5181 return Status::Corruption("Unable to Encode VersionEdit:" +
5182 edit_for_db_id.DebugString(true));
5183 }
5184 io_s = log->AddRecord(db_id_record);
5185 if (!io_s.ok()) {
5186 return io_s;
5187 }
5188 }
5189
5190 // Save WALs.
5191 if (!wal_additions.GetWalAdditions().empty()) {
5192 TEST_SYNC_POINT_CALLBACK("VersionSet::WriteCurrentStateToManifest:SaveWal",
5193 const_cast<VersionEdit*>(&wal_additions));
5194 std::string record;
5195 if (!wal_additions.EncodeTo(&record)) {
5196 return Status::Corruption("Unable to Encode VersionEdit: " +
5197 wal_additions.DebugString(true));
5198 }
5199 io_s = log->AddRecord(record);
5200 if (!io_s.ok()) {
5201 return io_s;
5202 }
5203 }
5204
5205 for (auto cfd : *column_family_set_) {
5206 assert(cfd);
5207
5208 if (cfd->IsDropped()) {
5209 continue;
5210 }
5211 assert(cfd->initialized());
5212 {
5213 // Store column family info
5214 VersionEdit edit;
5215 if (cfd->GetID() != 0) {
5216 // default column family is always there,
5217 // no need to explicitly write it
5218 edit.AddColumnFamily(cfd->GetName());
5219 edit.SetColumnFamily(cfd->GetID());
5220 }
5221 edit.SetComparatorName(
5222 cfd->internal_comparator().user_comparator()->Name());
5223 std::string record;
5224 if (!edit.EncodeTo(&record)) {
5225 return Status::Corruption(
5226 "Unable to Encode VersionEdit:" + edit.DebugString(true));
5227 }
5228 io_s = log->AddRecord(record);
5229 if (!io_s.ok()) {
5230 return io_s;
5231 }
5232 }
5233
5234 {
5235 // Save files
5236 VersionEdit edit;
5237 edit.SetColumnFamily(cfd->GetID());
5238
5239 assert(cfd->current());
5240 assert(cfd->current()->storage_info());
5241
5242 for (int level = 0; level < cfd->NumberLevels(); level++) {
5243 for (const auto& f :
5244 cfd->current()->storage_info()->LevelFiles(level)) {
5245 edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
5246 f->fd.GetFileSize(), f->smallest, f->largest,
5247 f->fd.smallest_seqno, f->fd.largest_seqno,
5248 f->marked_for_compaction, f->oldest_blob_file_number,
5249 f->oldest_ancester_time, f->file_creation_time,
5250 f->file_checksum, f->file_checksum_func_name);
5251 }
5252 }
5253
5254 const auto& blob_files = cfd->current()->storage_info()->GetBlobFiles();
5255 for (const auto& pair : blob_files) {
5256 const uint64_t blob_file_number = pair.first;
5257 const auto& meta = pair.second;
5258
5259 assert(meta);
5260 assert(blob_file_number == meta->GetBlobFileNumber());
5261
5262 edit.AddBlobFile(blob_file_number, meta->GetTotalBlobCount(),
5263 meta->GetTotalBlobBytes(), meta->GetChecksumMethod(),
5264 meta->GetChecksumValue());
5265 if (meta->GetGarbageBlobCount() > 0) {
5266 edit.AddBlobFileGarbage(blob_file_number, meta->GetGarbageBlobCount(),
5267 meta->GetGarbageBlobBytes());
5268 }
5269 }
5270
5271 const auto iter = curr_state.find(cfd->GetID());
5272 assert(iter != curr_state.end());
5273 uint64_t log_number = iter->second.log_number;
5274 edit.SetLogNumber(log_number);
5275 std::string record;
5276 if (!edit.EncodeTo(&record)) {
5277 return Status::Corruption(
5278 "Unable to Encode VersionEdit:" + edit.DebugString(true));
5279 }
5280 io_s = log->AddRecord(record);
5281 if (!io_s.ok()) {
5282 return io_s;
5283 }
5284 }
5285 }
5286 return Status::OK();
5287 }
5288
5289 // TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this
5290 // function is called repeatedly with consecutive pairs of slices. For example
5291 // if the slice list is [a, b, c, d] this function is called with arguments
5292 // (a,b) then (b,c) then (c,d). Knowing this, an optimization is possible where
5293 // we avoid doing binary search for the keys b and c twice and instead somehow
5294 // maintain state of where they first appear in the files.
5295 uint64_t VersionSet::ApproximateSize(const SizeApproximationOptions& options,
5296 Version* v, const Slice& start,
5297 const Slice& end, int start_level,
5298 int end_level, TableReaderCaller caller) {
5299 const auto& icmp = v->cfd_->internal_comparator();
5300
5301 // pre-condition
5302 assert(icmp.Compare(start, end) <= 0);
5303
5304 uint64_t total_full_size = 0;
5305 const auto* vstorage = v->storage_info();
5306 const int num_non_empty_levels = vstorage->num_non_empty_levels();
5307 end_level = (end_level == -1) ? num_non_empty_levels
5308 : std::min(end_level, num_non_empty_levels);
5309
5310 assert(start_level <= end_level);
5311
5312 // Outline of the optimization that uses options.files_size_error_margin.
5313 // When approximating the total size of the files that store a key range,
5314 // we first sum up the sizes of the files that fall fully into the range.
5315 // Then we sum up the sizes of all the files that may intersect with the range
5316 // (this includes all files in L0 as well). Then, if total_intersecting_size
5317 // is smaller than total_full_size * options.files_size_error_margin, we can
5318 // infer that the intersecting files have a sufficiently negligible
5319 // contribution to the total size, and we can approximate their contribution
5320 // to the keys in range as just half of total_intersecting_size.
5321 // E.g., if the value of files_size_error_margin is 0.1, then the error of the
5322 // approximation is limited to ~10% of the total size of the files that fully
5323 // fall into the key range. In such a case, this avoids the costly process
5324 // of binary-searching the intersecting files, which is required only
5325 // for a more precise calculation of the total size.
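// Illustration with made-up numbers: suppose the files fully inside the range
// sum to total_full_size = 10 GB, files_size_error_margin = 0.1, and the
// boundary (possibly intersecting) files sum to total_intersecting_size =
// 800 MB. Since 800 MB < 10 GB * 0.1, we simply add 800 MB / 2 = 400 MB
// instead of binary-searching inside the boundary files; the true
// contribution lies somewhere in [0, 800 MB], so the error is at most 400 MB,
// i.e. ~4% of the fully contained files' total.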
5326
5327 autovector<FdWithKeyRange*, 32> first_files;
5328 autovector<FdWithKeyRange*, 16> last_files;
5329
5330 // scan all the levels
5331 for (int level = start_level; level < end_level; ++level) {
5332 const LevelFilesBrief& files_brief = vstorage->LevelFilesBrief(level);
5333 if (files_brief.num_files == 0) {
5334 // empty level, skip exploration
5335 continue;
5336 }
5337
5338 if (level == 0) {
5339 // Level-0 files are not in sorted order; we need to iterate through
5340 // the whole list to compute the total bytes that require scanning,
5341 // so handle this case explicitly (similarly to the first_files case below).
5342 for (size_t i = 0; i < files_brief.num_files; i++) {
5343 first_files.push_back(&files_brief.files[i]);
5344 }
5345 continue;
5346 }
5347
5348 assert(level > 0);
5349 assert(files_brief.num_files > 0);
5350
5351 // identify the file position for start key
5352 const int idx_start =
5353 FindFileInRange(icmp, files_brief, start, 0,
5354 static_cast<uint32_t>(files_brief.num_files - 1));
5355 assert(static_cast<size_t>(idx_start) < files_brief.num_files);
5356
5357 // identify the file position for end key
5358 int idx_end = idx_start;
5359 if (icmp.Compare(files_brief.files[idx_end].largest_key, end) < 0) {
5360 idx_end =
5361 FindFileInRange(icmp, files_brief, end, idx_start,
5362 static_cast<uint32_t>(files_brief.num_files - 1));
5363 }
5364 assert(idx_end >= idx_start &&
5365 static_cast<size_t>(idx_end) < files_brief.num_files);
5366
5367 // scan all files from the starting index to the ending index
5368 // (inferred from the sorted order)
5369
5370 // first scan all the intermediate full files (excluding first and last)
5371 for (int i = idx_start + 1; i < idx_end; ++i) {
5372 uint64_t file_size = files_brief.files[i].fd.GetFileSize();
5373 // The entire file falls into the range, so we can just take its size.
5374 assert(file_size ==
5375 ApproximateSize(v, files_brief.files[i], start, end, caller));
5376 total_full_size += file_size;
5377 }
5378
5379 // save the first and the last files (which may be the same file), so we
5380 // can scan them later.
5381 first_files.push_back(&files_brief.files[idx_start]);
5382 if (idx_start != idx_end) {
5383 // we need to estimate size for both files, only if they are different
5384 last_files.push_back(&files_brief.files[idx_end]);
5385 }
5386 }
5387
5388 // The sum of all file sizes that intersect the [start, end] keys range.
5389 uint64_t total_intersecting_size = 0;
5390 for (const auto* file_ptr : first_files) {
5391 total_intersecting_size += file_ptr->fd.GetFileSize();
5392 }
5393 for (const auto* file_ptr : last_files) {
5394 total_intersecting_size += file_ptr->fd.GetFileSize();
5395 }
5396
5397 // Now scan all the first & last files at each level, and estimate their size.
5398 // If total_intersecting_size is less than the files_size_error_margin
5399 // fraction of total_full_size, we approximate the result to avoid the costly
5400 // binary search inside ApproximateSize, using half of each file size below.
5401
5402 const double margin = options.files_size_error_margin;
5403 if (margin > 0 && total_intersecting_size <
5404 static_cast<uint64_t>(total_full_size * margin)) {
5405 total_full_size += total_intersecting_size / 2;
5406 } else {
5407 // Estimate for all the first files (might also be last files), at each
5408 // level
5409 for (const auto file_ptr : first_files) {
5410 total_full_size += ApproximateSize(v, *file_ptr, start, end, caller);
5411 }
5412
5413 // Estimate for all the last files, at each level
5414 for (const auto file_ptr : last_files) {
5415 // We could use ApproximateSize here, but calling ApproximateOffsetOf
5416 // directly is just more efficient.
5417 total_full_size += ApproximateOffsetOf(v, *file_ptr, end, caller);
5418 }
5419 }
5420
5421 return total_full_size;
5422 }
5423
5424 uint64_t VersionSet::ApproximateOffsetOf(Version* v, const FdWithKeyRange& f,
5425 const Slice& key,
5426 TableReaderCaller caller) {
5427 // pre-condition
5428 assert(v);
5429 const auto& icmp = v->cfd_->internal_comparator();
5430
5431 uint64_t result = 0;
5432 if (icmp.Compare(f.largest_key, key) <= 0) {
5433 // Entire file is before "key", so just add the file size
5434 result = f.fd.GetFileSize();
5435 } else if (icmp.Compare(f.smallest_key, key) > 0) {
5436 // Entire file is after "key", so ignore
5437 result = 0;
5438 } else {
5439 // "key" falls in the range for this table. Add the
5440 // approximate offset of "key" within the table.
5441 TableCache* table_cache = v->cfd_->table_cache();
5442 if (table_cache != nullptr) {
5443 result = table_cache->ApproximateOffsetOf(
5444 key, f.file_metadata->fd, caller, icmp,
5445 v->GetMutableCFOptions().prefix_extractor.get());
5446 }
5447 }
5448 return result;
5449 }
5450
5451 uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f,
5452 const Slice& start, const Slice& end,
5453 TableReaderCaller caller) {
5454 // pre-condition
5455 assert(v);
5456 const auto& icmp = v->cfd_->internal_comparator();
5457 assert(icmp.Compare(start, end) <= 0);
5458
5459 if (icmp.Compare(f.largest_key, start) <= 0 ||
5460 icmp.Compare(f.smallest_key, end) > 0) {
5461 // Entire file is before or after the start/end keys range
5462 return 0;
5463 }
5464
5465 if (icmp.Compare(f.smallest_key, start) >= 0) {
5466 // Start of the range is before the file start - approximate by end offset
5467 return ApproximateOffsetOf(v, f, end, caller);
5468 }
5469
5470 if (icmp.Compare(f.largest_key, end) < 0) {
5471 // End of the range is after the file end - approximate by subtracting
5472 // start offset from the file size
5473 uint64_t start_offset = ApproximateOffsetOf(v, f, start, caller);
5474 assert(f.fd.GetFileSize() >= start_offset);
5475 return f.fd.GetFileSize() - start_offset;
5476 }
5477
5478 // The interval falls entirely in the range for this file.
5479 TableCache* table_cache = v->cfd_->table_cache();
5480 if (table_cache == nullptr) {
5481 return 0;
5482 }
5483 return table_cache->ApproximateSize(
5484 start, end, f.file_metadata->fd, caller, icmp,
5485 v->GetMutableCFOptions().prefix_extractor.get());
5486 }
5487
5488 void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_table_files,
5489 std::vector<uint64_t>* live_blob_files) const {
5490 assert(live_table_files);
5491 assert(live_blob_files);
5492
5493 // pre-calculate space requirement
5494 size_t total_table_files = 0;
5495 size_t total_blob_files = 0;
5496
5497 assert(column_family_set_);
5498 for (auto cfd : *column_family_set_) {
5499 assert(cfd);
5500
5501 if (!cfd->initialized()) {
5502 continue;
5503 }
5504
5505 Version* const dummy_versions = cfd->dummy_versions();
5506 assert(dummy_versions);
5507
5508 for (Version* v = dummy_versions->next_; v != dummy_versions;
5509 v = v->next_) {
5510 assert(v);
5511
5512 const auto* vstorage = v->storage_info();
5513 assert(vstorage);
5514
5515 for (int level = 0; level < vstorage->num_levels(); ++level) {
5516 total_table_files += vstorage->LevelFiles(level).size();
5517 }
5518
5519 total_blob_files += vstorage->GetBlobFiles().size();
5520 }
5521 }
5522
5523 // reserve the final size up front so each vector is extended only once
5524 live_table_files->reserve(live_table_files->size() + total_table_files);
5525 live_blob_files->reserve(live_blob_files->size() + total_blob_files);
5526
5527 assert(column_family_set_);
5528 for (auto cfd : *column_family_set_) {
5529 assert(cfd);
5530 if (!cfd->initialized()) {
5531 continue;
5532 }
5533
5534 auto* current = cfd->current();
5535 bool found_current = false;
5536
5537 Version* const dummy_versions = cfd->dummy_versions();
5538 assert(dummy_versions);
5539
5540 for (Version* v = dummy_versions->next_; v != dummy_versions;
5541 v = v->next_) {
5542 v->AddLiveFiles(live_table_files, live_blob_files);
5543 if (v == current) {
5544 found_current = true;
5545 }
5546 }
5547
5548 if (!found_current && current != nullptr) {
5549 // Should never happen unless it is a bug.
5550 assert(false);
5551 current->AddLiveFiles(live_table_files, live_blob_files);
5552 }
5553 }
5554 }
5555
5556 InternalIterator* VersionSet::MakeInputIterator(
5557 const ReadOptions& read_options, const Compaction* c,
5558 RangeDelAggregator* range_del_agg,
5559 const FileOptions& file_options_compactions) {
5560 auto cfd = c->column_family_data();
5561 // Level-0 files have to be merged together. For other levels,
5562 // we will make a concatenating iterator per level.
5563 // TODO(opt): use concatenating iterator for level-0 if there is no overlap
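// Consequently, a level-0 input contributes one iterator per file, while
// every other input level contributes a single concatenating iterator; the
// "space" computed below is sized accordingly.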
5564 const size_t space = (c->level() == 0 ? c->input_levels(0)->num_files +
5565 c->num_input_levels() - 1
5566 : c->num_input_levels());
5567 InternalIterator** list = new InternalIterator* [space];
5568 size_t num = 0;
5569 for (size_t which = 0; which < c->num_input_levels(); which++) {
5570 if (c->input_levels(which)->num_files != 0) {
5571 if (c->level(which) == 0) {
5572 const LevelFilesBrief* flevel = c->input_levels(which);
5573 for (size_t i = 0; i < flevel->num_files; i++) {
5574 list[num++] = cfd->table_cache()->NewIterator(
5575 read_options, file_options_compactions,
5576 cfd->internal_comparator(), *flevel->files[i].file_metadata,
5577 range_del_agg, c->mutable_cf_options()->prefix_extractor.get(),
5578 /*table_reader_ptr=*/nullptr,
5579 /*file_read_hist=*/nullptr, TableReaderCaller::kCompaction,
5580 /*arena=*/nullptr,
5581 /*skip_filters=*/false,
5582 /*level=*/static_cast<int>(c->level(which)),
5583 MaxFileSizeForL0MetaPin(*c->mutable_cf_options()),
5584 /*smallest_compaction_key=*/nullptr,
5585 /*largest_compaction_key=*/nullptr,
5586 /*allow_unprepared_value=*/false);
5587 }
5588 } else {
5589 // Create concatenating iterator for the files from this level
5590 list[num++] = new LevelIterator(
5591 cfd->table_cache(), read_options, file_options_compactions,
5592 cfd->internal_comparator(), c->input_levels(which),
5593 c->mutable_cf_options()->prefix_extractor.get(),
5594 /*should_sample=*/false,
5595 /*no per level latency histogram=*/nullptr,
5596 TableReaderCaller::kCompaction, /*skip_filters=*/false,
5597 /*level=*/static_cast<int>(c->level(which)), range_del_agg,
5598 c->boundaries(which));
5599 }
5600 }
5601 }
5602 assert(num <= space);
5603 InternalIterator* result =
5604 NewMergingIterator(&c->column_family_data()->internal_comparator(), list,
5605 static_cast<int>(num));
5606 delete[] list;
5607 return result;
5608 }
5609
5610 // verify that the files listed in this compaction are present
5611 // in the current version
5612 bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
5613 #ifndef NDEBUG
5614 Version* version = c->column_family_data()->current();
5615 const VersionStorageInfo* vstorage = version->storage_info();
5616 if (c->input_version() != version) {
5617 ROCKS_LOG_INFO(
5618 db_options_->info_log,
5619 "[%s] compaction output being applied to a different base version from"
5620 " input version",
5621 c->column_family_data()->GetName().c_str());
5622
5623 if (vstorage->compaction_style_ == kCompactionStyleLevel &&
5624 c->start_level() == 0 && c->num_input_levels() > 2U) {
5625 // We are doing an L0->base_level compaction. The assumption is that if
5626 // the base level is not L1, the levels from L1 to base_level - 1 are empty.
5627 // This is ensured by allowing only one compaction from L0 at a time in
5628 // level-based compaction, so that during that time no other compaction or
5629 // flush can put files into those levels.
5630 for (int l = c->start_level() + 1; l < c->output_level(); l++) {
5631 if (vstorage->NumLevelFiles(l) != 0) {
5632 return false;
5633 }
5634 }
5635 }
5636 }
5637
5638 for (size_t input = 0; input < c->num_input_levels(); ++input) {
5639 int level = c->level(input);
5640 for (size_t i = 0; i < c->num_input_files(input); ++i) {
5641 uint64_t number = c->input(input, i)->fd.GetNumber();
5642 bool found = false;
5643 for (size_t j = 0; j < vstorage->files_[level].size(); j++) {
5644 FileMetaData* f = vstorage->files_[level][j];
5645 if (f->fd.GetNumber() == number) {
5646 found = true;
5647 break;
5648 }
5649 }
5650 if (!found) {
5651 return false; // input file not present in the current version
5652 }
5653 }
5654 }
5655 #else
5656 (void)c;
5657 #endif
5658 return true; // everything good
5659 }
5660
5661 Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
5662 FileMetaData** meta,
5663 ColumnFamilyData** cfd) {
5664 for (auto cfd_iter : *column_family_set_) {
5665 if (!cfd_iter->initialized()) {
5666 continue;
5667 }
5668 Version* version = cfd_iter->current();
5669 const auto* vstorage = version->storage_info();
5670 for (int level = 0; level < vstorage->num_levels(); level++) {
5671 for (const auto& file : vstorage->LevelFiles(level)) {
5672 if (file->fd.GetNumber() == number) {
5673 *meta = file;
5674 *filelevel = level;
5675 *cfd = cfd_iter;
5676 return Status::OK();
5677 }
5678 }
5679 }
5680 }
5681 return Status::NotFound("File not present in any level");
5682 }
5683
5684 void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
5685 for (auto cfd : *column_family_set_) {
5686 if (cfd->IsDropped() || !cfd->initialized()) {
5687 continue;
5688 }
5689 for (int level = 0; level < cfd->NumberLevels(); level++) {
5690 for (const auto& file :
5691 cfd->current()->storage_info()->LevelFiles(level)) {
5692 LiveFileMetaData filemetadata;
5693 filemetadata.column_family_name = cfd->GetName();
5694 uint32_t path_id = file->fd.GetPathId();
5695 if (path_id < cfd->ioptions()->cf_paths.size()) {
5696 filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
5697 } else {
5698 assert(!cfd->ioptions()->cf_paths.empty());
5699 filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
5700 }
5701 const uint64_t file_number = file->fd.GetNumber();
5702 filemetadata.name = MakeTableFileName("", file_number);
5703 filemetadata.file_number = file_number;
5704 filemetadata.level = level;
5705 filemetadata.size = static_cast<size_t>(file->fd.GetFileSize());
5706 filemetadata.smallestkey = file->smallest.user_key().ToString();
5707 filemetadata.largestkey = file->largest.user_key().ToString();
5708 filemetadata.smallest_seqno = file->fd.smallest_seqno;
5709 filemetadata.largest_seqno = file->fd.largest_seqno;
5710 filemetadata.num_reads_sampled = file->stats.num_reads_sampled.load(
5711 std::memory_order_relaxed);
5712 filemetadata.being_compacted = file->being_compacted;
5713 filemetadata.num_entries = file->num_entries;
5714 filemetadata.num_deletions = file->num_deletions;
5715 filemetadata.oldest_blob_file_number = file->oldest_blob_file_number;
5716 filemetadata.file_checksum = file->file_checksum;
5717 filemetadata.file_checksum_func_name = file->file_checksum_func_name;
5718 metadata->push_back(filemetadata);
5719 }
5720 }
5721 }
5722 }
5723
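// Hands over the obsolete table files, blob files, and manifests that are
// safe to delete. Files whose number is >= min_pending_output may still
// belong to in-flight writes, so they are kept in the pending lists for a
// later pass; obsolete manifests are always returned.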
5724 void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
5725 std::vector<ObsoleteBlobFileInfo>* blob_files,
5726 std::vector<std::string>* manifest_filenames,
5727 uint64_t min_pending_output) {
5728 assert(files);
5729 assert(blob_files);
5730 assert(manifest_filenames);
5731 assert(files->empty());
5732 assert(blob_files->empty());
5733 assert(manifest_filenames->empty());
5734
5735 std::vector<ObsoleteFileInfo> pending_files;
5736 for (auto& f : obsolete_files_) {
5737 if (f.metadata->fd.GetNumber() < min_pending_output) {
5738 files->emplace_back(std::move(f));
5739 } else {
5740 pending_files.emplace_back(std::move(f));
5741 }
5742 }
5743 obsolete_files_.swap(pending_files);
5744
5745 std::vector<ObsoleteBlobFileInfo> pending_blob_files;
5746 for (auto& blob_file : obsolete_blob_files_) {
5747 if (blob_file.GetBlobFileNumber() < min_pending_output) {
5748 blob_files->emplace_back(std::move(blob_file));
5749 } else {
5750 pending_blob_files.emplace_back(std::move(blob_file));
5751 }
5752 }
5753 obsolete_blob_files_.swap(pending_blob_files);
5754
5755 obsolete_manifests_.swap(*manifest_filenames);
5756 }
5757
5758 ColumnFamilyData* VersionSet::CreateColumnFamily(
5759 const ColumnFamilyOptions& cf_options, const VersionEdit* edit) {
5760 assert(edit->is_column_family_add_);
5761
5762 MutableCFOptions dummy_cf_options;
5763 Version* dummy_versions =
5764 new Version(nullptr, this, file_options_, dummy_cf_options, io_tracer_);
5765 // Ref() dummy version once so that later we can call Unref() to delete it
5766 // by avoiding calling "delete" explicitly (~Version is private)
5767 dummy_versions->Ref();
5768 auto new_cfd = column_family_set_->CreateColumnFamily(
5769 edit->column_family_name_, edit->column_family_, dummy_versions,
5770 cf_options);
5771
5772 Version* v = new Version(new_cfd, this, file_options_,
5773 *new_cfd->GetLatestMutableCFOptions(), io_tracer_,
5774 current_version_number_++);
5775
5776 // Fill level target base information.
5777 v->storage_info()->CalculateBaseBytes(*new_cfd->ioptions(),
5778 *new_cfd->GetLatestMutableCFOptions());
5779 AppendVersion(new_cfd, v);
5780 // GetLatestMutableCFOptions() is safe here without a mutex since the
5781 // cfd is not yet visible to clients.
5782 new_cfd->CreateNewMemtable(*new_cfd->GetLatestMutableCFOptions(),
5783 LastSequence());
5784 new_cfd->SetLogNumber(edit->log_number_);
5785 return new_cfd;
5786 }
5787
5788 uint64_t VersionSet::GetNumLiveVersions(Version* dummy_versions) {
5789 uint64_t count = 0;
5790 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5791 count++;
5792 }
5793 return count;
5794 }
5795
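// Sums the sizes of all SST files referenced by any live Version in the
// linked list, counting each physical file (identified by its packed file
// number and path id) only once even when several versions reference it.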
5796 uint64_t VersionSet::GetTotalSstFilesSize(Version* dummy_versions) {
5797 std::unordered_set<uint64_t> unique_files;
5798 uint64_t total_files_size = 0;
5799 for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) {
5800 VersionStorageInfo* storage_info = v->storage_info();
5801 for (int level = 0; level < storage_info->num_levels_; level++) {
5802 for (const auto& file_meta : storage_info->LevelFiles(level)) {
5803 if (unique_files.find(file_meta->fd.packed_number_and_path_id) ==
5804 unique_files.end()) {
5805 unique_files.insert(file_meta->fd.packed_number_and_path_id);
5806 total_files_size += file_meta->fd.GetFileSize();
5807 }
5808 }
5809 }
5810 }
5811 return total_files_size;
5812 }
5813
5814 Status VersionSet::VerifyFileMetadata(const std::string& fpath,
5815 const FileMetaData& meta) const {
5816 uint64_t fsize = 0;
5817 Status status = fs_->GetFileSize(fpath, IOOptions(), &fsize, nullptr);
5818 if (status.ok()) {
5819 if (fsize != meta.fd.GetFileSize()) {
5820 status = Status::Corruption("File size mismatch: " + fpath);
5821 }
5822 }
5823 return status;
5824 }
5825
5826 ReactiveVersionSet::ReactiveVersionSet(
5827 const std::string& dbname, const ImmutableDBOptions* _db_options,
5828 const FileOptions& _file_options, Cache* table_cache,
5829 WriteBufferManager* write_buffer_manager, WriteController* write_controller,
5830 const std::shared_ptr<IOTracer>& io_tracer)
5831 : VersionSet(dbname, _db_options, _file_options, table_cache,
5832 write_buffer_manager, write_controller,
5833 /*block_cache_tracer=*/nullptr, io_tracer),
5834 number_of_edits_to_skip_(0) {}
5835
5836 ReactiveVersionSet::~ReactiveVersionSet() {}
5837
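// ReactiveVersionSet backs secondary DB instances (see DB::OpenAsSecondary)
// that follow a primary by tailing its MANIFEST. Recover performs the initial
// catch-up from the current MANIFEST; afterwards, ReadAndApply is invoked to
// replay whatever new edits the primary has written.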
5838 Status ReactiveVersionSet::Recover(
5839 const std::vector<ColumnFamilyDescriptor>& column_families,
5840 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
5841 std::unique_ptr<log::Reader::Reporter>* manifest_reporter,
5842 std::unique_ptr<Status>* manifest_reader_status) {
5843 assert(manifest_reader != nullptr);
5844 assert(manifest_reporter != nullptr);
5845 assert(manifest_reader_status != nullptr);
5846
5847 std::unordered_map<std::string, ColumnFamilyOptions> cf_name_to_options;
5848 for (const auto& cf : column_families) {
5849 cf_name_to_options.insert({cf.name, cf.options});
5850 }
5851
5852 // add default column family
5853 auto default_cf_iter = cf_name_to_options.find(kDefaultColumnFamilyName);
5854 if (default_cf_iter == cf_name_to_options.end()) {
5855 return Status::InvalidArgument("Default column family not specified");
5856 }
5857 VersionEdit default_cf_edit;
5858 default_cf_edit.AddColumnFamily(kDefaultColumnFamilyName);
5859 default_cf_edit.SetColumnFamily(0);
5860 ColumnFamilyData* default_cfd =
5861 CreateColumnFamily(default_cf_iter->second, &default_cf_edit);
5862 // In recovery, nobody else can access it, so it's fine to set it to be
5863 // initialized earlier.
5864 default_cfd->set_initialized();
5865 VersionBuilderMap builders;
5866 std::unordered_map<int, std::string> column_families_not_found;
5867 builders.insert(
5868 std::make_pair(0, std::unique_ptr<BaseReferencedVersionBuilder>(
5869 new BaseReferencedVersionBuilder(default_cfd))));
5870
5871 manifest_reader_status->reset(new Status());
5872 manifest_reporter->reset(new LogReporter());
5873 static_cast_with_check<LogReporter>(manifest_reporter->get())->status =
5874 manifest_reader_status->get();
5875 Status s = MaybeSwitchManifest(manifest_reporter->get(), manifest_reader);
5876 log::Reader* reader = manifest_reader->get();
5877
5878 int retry = 0;
5879 VersionEdit version_edit;
5880 while (s.ok() && retry < 1) {
5881 assert(reader != nullptr);
5882 s = ReadAndRecover(*reader, &read_buffer_, cf_name_to_options,
5883 column_families_not_found, builders,
5884 manifest_reader_status->get(), &version_edit);
5885 if (s.ok()) {
5886 bool enough = version_edit.has_next_file_number_ &&
5887 version_edit.has_log_number_ &&
5888 version_edit.has_last_sequence_;
5889 if (enough) {
5890 for (const auto& cf : column_families) {
5891 auto cfd = column_family_set_->GetColumnFamily(cf.name);
5892 if (cfd == nullptr) {
5893 enough = false;
5894 break;
5895 }
5896 }
5897 }
5898 if (enough) {
5899 for (const auto& cf : column_families) {
5900 auto cfd = column_family_set_->GetColumnFamily(cf.name);
5901 assert(cfd != nullptr);
5902 if (!cfd->IsDropped()) {
5903 auto builder_iter = builders.find(cfd->GetID());
5904 assert(builder_iter != builders.end());
5905 auto builder = builder_iter->second->version_builder();
5906 assert(builder != nullptr);
5907 s = builder->LoadTableHandlers(
5908 cfd->internal_stats(), db_options_->max_file_opening_threads,
5909 false /* prefetch_index_and_filter_in_cache */,
5910 true /* is_initial_load */,
5911 cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
5912 MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
5913 if (!s.ok()) {
5914 enough = false;
5915 if (s.IsPathNotFound()) {
5916 s = Status::OK();
5917 }
5918 break;
5919 }
5920 }
5921 }
5922 }
5923 if (enough) {
5924 break;
5925 }
5926 }
5927 ++retry;
5928 }
5929
5930 if (s.ok()) {
5931 if (!version_edit.has_prev_log_number_) {
5932 version_edit.prev_log_number_ = 0;
5933 }
5934 column_family_set_->UpdateMaxColumnFamily(version_edit.max_column_family_);
5935
5936 MarkMinLogNumberToKeep2PC(version_edit.min_log_number_to_keep_);
5937 MarkFileNumberUsed(version_edit.prev_log_number_);
5938 MarkFileNumberUsed(version_edit.log_number_);
5939
5940 for (auto cfd : *column_family_set_) {
5941 assert(builders.count(cfd->GetID()) > 0);
5942 auto builder = builders[cfd->GetID()]->version_builder();
5943 if (!builder->CheckConsistencyForNumLevels()) {
5944 s = Status::InvalidArgument(
5945 "db has more levels than options.num_levels");
5946 break;
5947 }
5948 }
5949 }
5950
5951 if (s.ok()) {
5952 for (auto cfd : *column_family_set_) {
5953 if (cfd->IsDropped()) {
5954 continue;
5955 }
5956 assert(cfd->initialized());
5957 auto builders_iter = builders.find(cfd->GetID());
5958 assert(builders_iter != builders.end());
5959 auto* builder = builders_iter->second->version_builder();
5960
5961 Version* v = new Version(cfd, this, file_options_,
5962 *cfd->GetLatestMutableCFOptions(), io_tracer_,
5963 current_version_number_++);
5964 s = builder->SaveTo(v->storage_info());
5965
5966 if (s.ok()) {
5967 // Install recovered version
5968 v->PrepareApply(*cfd->GetLatestMutableCFOptions(),
5969 !(db_options_->skip_stats_update_on_db_open));
5970 AppendVersion(cfd, v);
5971 } else {
5972 ROCKS_LOG_ERROR(db_options_->info_log,
5973 "[%s]: inconsistent version: %s\n",
5974 cfd->GetName().c_str(), s.ToString().c_str());
5975 delete v;
5976 break;
5977 }
5978 }
5979 }
5980 if (s.ok()) {
5981 next_file_number_.store(version_edit.next_file_number_ + 1);
5982 last_allocated_sequence_ = version_edit.last_sequence_;
5983 last_published_sequence_ = version_edit.last_sequence_;
5984 last_sequence_ = version_edit.last_sequence_;
5985 prev_log_number_ = version_edit.prev_log_number_;
5986 for (auto cfd : *column_family_set_) {
5987 if (cfd->IsDropped()) {
5988 continue;
5989 }
5990 ROCKS_LOG_INFO(db_options_->info_log,
5991 "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
5992 cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
5993 }
5994 }
5995 return s;
5996 }
5997
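// Tails the current MANIFEST and applies any newly appended version edits,
// recording the affected column families in *cfds_changed. When the current
// file yields no more records, MaybeSwitchManifest checks whether the primary
// has rolled over to a new MANIFEST; if so, tailing restarts there, skipping
// the snapshot records at its head (see number_of_edits_to_skip_).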
5998 Status ReactiveVersionSet::ReadAndApply(
5999 InstrumentedMutex* mu,
6000 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader,
6001 std::unordered_set<ColumnFamilyData*>* cfds_changed) {
6002 assert(manifest_reader != nullptr);
6003 assert(cfds_changed != nullptr);
6004 mu->AssertHeld();
6005
6006 Status s;
6007 uint64_t applied_edits = 0;
6008 while (s.ok()) {
6009 Slice record;
6010 std::string scratch;
6011 log::Reader* reader = manifest_reader->get();
6012 std::string old_manifest_path = reader->file()->file_name();
6013 while (reader->ReadRecord(&record, &scratch)) {
6014 VersionEdit edit;
6015 s = edit.DecodeFrom(record);
6016 if (!s.ok()) {
6017 break;
6018 }
6019
6020 // Skip the first VersionEdits of each MANIFEST generated by
6021 // VersionSet::WriteCurrentStateToManifest.
6022 if (number_of_edits_to_skip_ > 0) {
6023 ColumnFamilyData* cfd =
6024 column_family_set_->GetColumnFamily(edit.column_family_);
6025 if (cfd != nullptr && !cfd->IsDropped()) {
6026 --number_of_edits_to_skip_;
6027 }
6028 continue;
6029 }
6030
6031 s = read_buffer_.AddEdit(&edit);
6032 if (!s.ok()) {
6033 break;
6034 }
6035 VersionEdit temp_edit;
6036 if (edit.is_in_atomic_group_) {
6037 if (read_buffer_.IsFull()) {
6038 // Apply edits in an atomic group when we have read all edits in the
6039 // group.
6040 for (auto& e : read_buffer_.replay_buffer()) {
6041 s = ApplyOneVersionEditToBuilder(e, cfds_changed, &temp_edit);
6042 if (!s.ok()) {
6043 break;
6044 }
6045 applied_edits++;
6046 }
6047 if (!s.ok()) {
6048 break;
6049 }
6050 read_buffer_.Clear();
6051 }
6052 } else {
6053 // Apply a normal edit immediately.
6054 s = ApplyOneVersionEditToBuilder(edit, cfds_changed, &temp_edit);
6055 if (s.ok()) {
6056 applied_edits++;
6057 } else {
6058 break;
6059 }
6060 }
6061 }
6062 if (!s.ok()) {
6063 // Clear the buffer if we fail to decode/apply an edit.
6064 read_buffer_.Clear();
6065 }
6066 // It's possible that:
6067 // 1) s.IsCorruption(), indicating the current MANIFEST is corrupted.
6068 // Or the version(s) rebuilt from tailing the MANIFEST are inconsistent.
6069 // 2) we have finished reading the current MANIFEST.
6070 // 3) we have encountered an IOError reading the current MANIFEST.
6071 // We need to look for the next MANIFEST and start from there. If we cannot
6072 // find the next MANIFEST, we should exit the loop.
6073 Status tmp_s = MaybeSwitchManifest(reader->GetReporter(), manifest_reader);
6074 reader = manifest_reader->get();
6075 if (tmp_s.ok()) {
6076 if (reader->file()->file_name() == old_manifest_path) {
6077 // Still processing the same MANIFEST, thus no need to continue this
6078 // loop since no record is available if we have reached here.
6079 break;
6080 } else {
6081 // We have switched to a new MANIFEST whose first records have been
6082 // generated by VersionSet::WriteCurrentStateToManifest. Since the
6083 // secondary instance has already finished recovering upon start, there
6084 // is no need for the secondary to process these records. Actually, if
6085 // the secondary were to replay these records, the secondary may end up
6086 // adding the same SST files AGAIN to each column family, causing
6087 // consistency checks done by VersionBuilder to fail. Therefore, we
6088 // record the number of records to skip at the beginning of the new
6089 // MANIFEST and ignore them.
6090 number_of_edits_to_skip_ = 0;
6091 for (auto* cfd : *column_family_set_) {
6092 if (cfd->IsDropped()) {
6093 continue;
6094 }
6095 // Increase number_of_edits_to_skip by 2 because
6096 // WriteCurrentStateToManifest() writes 2 version edits for each
6097 // column family at the beginning of the newly-generated MANIFEST.
6098 // TODO(yanqin) remove hard-coded value.
6099 if (db_options_->write_dbid_to_manifest) {
6100 number_of_edits_to_skip_ += 3;
6101 } else {
6102 number_of_edits_to_skip_ += 2;
6103 }
6104 }
6105 s = tmp_s;
6106 }
6107 }
6108 }
6109
6110 if (s.ok()) {
6111 for (auto cfd : *column_family_set_) {
6112 auto builder_iter = active_version_builders_.find(cfd->GetID());
6113 if (builder_iter == active_version_builders_.end()) {
6114 continue;
6115 }
6116 auto builder = builder_iter->second->version_builder();
6117 if (!builder->CheckConsistencyForNumLevels()) {
6118 s = Status::InvalidArgument(
6119 "db has more levels than options.num_levels");
6120 break;
6121 }
6122 }
6123 }
6124 TEST_SYNC_POINT_CALLBACK("ReactiveVersionSet::ReadAndApply:AppliedEdits",
6125 &applied_edits);
6126 return s;
6127 }
6128
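// Applies one edit on behalf of the secondary. A VersionBuilder is created
// lazily per column family and kept in active_version_builders_ until a new
// Version is successfully installed (or the family is dropped), at which
// point the builder is erased.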
6129 Status ReactiveVersionSet::ApplyOneVersionEditToBuilder(
6130 VersionEdit& edit, std::unordered_set<ColumnFamilyData*>* cfds_changed,
6131 VersionEdit* version_edit) {
6132 ColumnFamilyData* cfd =
6133 column_family_set_->GetColumnFamily(edit.column_family_);
6134
6135 // If we cannot find this column family in our column family set, then it
6136 // may be a new column family created by the primary after the secondary
6137 // starts. It is also possible that the secondary instance opens only a subset
6138 // of column families. Ignore it for now.
6139 if (nullptr == cfd) {
6140 return Status::OK();
6141 }
6142 if (active_version_builders_.find(edit.column_family_) ==
6143 active_version_builders_.end() &&
6144 !cfd->IsDropped()) {
6145 std::unique_ptr<BaseReferencedVersionBuilder> builder_guard(
6146 new BaseReferencedVersionBuilder(cfd));
6147 active_version_builders_.insert(
6148 std::make_pair(edit.column_family_, std::move(builder_guard)));
6149 }
6150
6151 auto builder_iter = active_version_builders_.find(edit.column_family_);
6152 assert(builder_iter != active_version_builders_.end());
6153 auto builder = builder_iter->second->version_builder();
6154 assert(builder != nullptr);
6155
6156 if (edit.is_column_family_add_) {
6157 // TODO (yanqin) for now the secondary ignores column families created
6158 // after Open. This also simplifies handling of switching to a new MANIFEST
6159 // and processing the snapshot of the system at the beginning of the
6160 // MANIFEST.
6161 } else if (edit.is_column_family_drop_) {
6162 // Drop the column family by setting it to be 'dropped' without destroying
6163 // the column family handle.
6164 // TODO (haoyu) figure out how to handle column family drop for
6165 // secondary instance. (Is it possible that the ref count for cfd is 0 but
6166 // the ref count for its versions is higher than 0?)
6167 cfd->SetDropped();
6168 if (cfd->UnrefAndTryDelete()) {
6169 cfd = nullptr;
6170 }
6171 active_version_builders_.erase(builder_iter);
6172 } else {
6173 Status s = builder->Apply(&edit);
6174 if (!s.ok()) {
6175 return s;
6176 }
6177 }
6178 Status s = ExtractInfoFromVersionEdit(cfd, edit, version_edit);
6179 if (!s.ok()) {
6180 return s;
6181 }
6182
6183 if (cfd != nullptr && !cfd->IsDropped()) {
6184 s = builder->LoadTableHandlers(
6185 cfd->internal_stats(), db_options_->max_file_opening_threads,
6186 false /* prefetch_index_and_filter_in_cache */,
6187 false /* is_initial_load */,
6188 cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
6189 MaxFileSizeForL0MetaPin(*cfd->GetLatestMutableCFOptions()));
6190 TEST_SYNC_POINT_CALLBACK(
6191 "ReactiveVersionSet::ApplyOneVersionEditToBuilder:"
6192 "AfterLoadTableHandlers",
6193 &s);
6194
6195 if (s.ok()) {
6196 auto version = new Version(cfd, this, file_options_,
6197 *cfd->GetLatestMutableCFOptions(), io_tracer_,
6198 current_version_number_++);
6199 s = builder->SaveTo(version->storage_info());
6200 if (s.ok()) {
6201 version->PrepareApply(*cfd->GetLatestMutableCFOptions(), true);
6202 AppendVersion(cfd, version);
6203 active_version_builders_.erase(builder_iter);
6204 if (cfds_changed->count(cfd) == 0) {
6205 cfds_changed->insert(cfd);
6206 }
6207 } else {
6208 delete version;
6209 }
6210 } else if (s.IsPathNotFound()) {
6211 s = Status::OK();
6212 }
6213 // Any other error from LoadTableHandlers is propagated to the caller below.
6214 }
6215
6216 if (s.ok()) {
6217 if (version_edit->HasNextFile()) {
6218 next_file_number_.store(version_edit->next_file_number_ + 1);
6219 }
6220 if (version_edit->has_last_sequence_) {
6221 last_allocated_sequence_ = version_edit->last_sequence_;
6222 last_published_sequence_ = version_edit->last_sequence_;
6223 last_sequence_ = version_edit->last_sequence_;
6224 }
6225 if (version_edit->has_prev_log_number_) {
6226 prev_log_number_ = version_edit->prev_log_number_;
6227 MarkFileNumberUsed(version_edit->prev_log_number_);
6228 }
6229 if (version_edit->has_log_number_) {
6230 MarkFileNumberUsed(version_edit->log_number_);
6231 }
6232 column_family_set_->UpdateMaxColumnFamily(version_edit->max_column_family_);
6233 MarkMinLogNumberToKeep2PC(version_edit->min_log_number_to_keep_);
6234 }
6235 return s;
6236 }
6237
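// The primary may delete an old MANIFEST right after the secondary reads
// CURRENT, in which case opening the named file fails with PathNotFound;
// the loop below then re-reads CURRENT and retries until it opens a manifest
// that still exists (or hits a different error).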
6238 Status ReactiveVersionSet::MaybeSwitchManifest(
6239 log::Reader::Reporter* reporter,
6240 std::unique_ptr<log::FragmentBufferedReader>* manifest_reader) {
6241 assert(manifest_reader != nullptr);
6242 Status s;
6243 do {
6244 std::string manifest_path;
6245 s = GetCurrentManifestPath(dbname_, fs_.get(), &manifest_path,
6246 &manifest_file_number_);
6247 std::unique_ptr<FSSequentialFile> manifest_file;
6248 if (s.ok()) {
6249 if (nullptr == manifest_reader->get() ||
6250 manifest_reader->get()->file()->file_name() != manifest_path) {
6251 TEST_SYNC_POINT(
6252 "ReactiveVersionSet::MaybeSwitchManifest:"
6253 "AfterGetCurrentManifestPath:0");
6254 TEST_SYNC_POINT(
6255 "ReactiveVersionSet::MaybeSwitchManifest:"
6256 "AfterGetCurrentManifestPath:1");
6257 s = fs_->NewSequentialFile(manifest_path,
6258 env_->OptimizeForManifestRead(file_options_),
6259 &manifest_file, nullptr);
6260 } else {
6261 // No need to switch manifest.
6262 break;
6263 }
6264 }
6265 std::unique_ptr<SequentialFileReader> manifest_file_reader;
6266 if (s.ok()) {
6267 manifest_file_reader.reset(new SequentialFileReader(
6268 std::move(manifest_file), manifest_path,
6269 db_options_->log_readahead_size, io_tracer_));
6270 manifest_reader->reset(new log::FragmentBufferedReader(
6271 nullptr, std::move(manifest_file_reader), reporter,
6272 true /* checksum */, 0 /* log_number */));
6273 ROCKS_LOG_INFO(db_options_->info_log, "Switched to new manifest: %s\n",
6274 manifest_path.c_str());
6275 // TODO (yanqin) every time we switch to a new MANIFEST, we clear the
6276 // active_version_builders_ map because we choose to construct the
6277 // versions from scratch, thanks to the first part of each MANIFEST
6278 // written by VersionSet::WriteCurrentStateToManifest. This is not
6279 // necessary, but we choose this at present for the sake of simplicity.
6280 active_version_builders_.clear();
6281 }
6282 } while (s.IsPathNotFound());
6283 return s;
6284 }
6285
6286 } // namespace ROCKSDB_NAMESPACE