]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/range_del_aggregator.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / range_del_aggregator.cc
1 // Copyright (c) 2018-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/range_del_aggregator.h"
7
8 #include "db/compaction/compaction_iteration_stats.h"
9 #include "db/dbformat.h"
10 #include "db/pinned_iterators_manager.h"
11 #include "db/range_del_aggregator.h"
12 #include "db/range_tombstone_fragmenter.h"
13 #include "db/version_edit.h"
14 #include "rocksdb/comparator.h"
15 #include "rocksdb/types.h"
16 #include "table/internal_iterator.h"
17 #include "table/scoped_arena_iterator.h"
18 #include "table/table_builder.h"
19 #include "util/heap.h"
20 #include "util/kv_map.h"
21 #include "util/vector_iterator.h"
22
23 namespace ROCKSDB_NAMESPACE {
24
// Wraps a fragmented range tombstone iterator and truncates its tombstones to
// the key range of the sstable bounded by `smallest` and `largest`. Either
// bound may be null, meaning unbounded on that side.
TruncatedRangeDelIterator::TruncatedRangeDelIterator(
    std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
    const InternalKeyComparator* icmp, const InternalKey* smallest,
    const InternalKey* largest)
    : iter_(std::move(iter)),
      icmp_(icmp),
      smallest_ikey_(smallest),
      largest_ikey_(largest) {
  if (smallest != nullptr) {
    // Keep a parsed copy of the boundary alive for this iterator's lifetime;
    // smallest_ points at the element stored in pinned_bounds_.
    // NOTE(review): this relies on pinned_bounds_ not invalidating references
    // on emplace_back — confirm its container type in the header.
    pinned_bounds_.emplace_back();
    auto& parsed_smallest = pinned_bounds_.back();
    if (!ParseInternalKey(smallest->Encode(), &parsed_smallest)) {
      assert(false);
    }
    smallest_ = &parsed_smallest;
  }
  if (largest != nullptr) {
    pinned_bounds_.emplace_back();
    auto& parsed_largest = pinned_bounds_.back();
    if (!ParseInternalKey(largest->Encode(), &parsed_largest)) {
      assert(false);
    }
    if (parsed_largest.type == kTypeRangeDeletion &&
        parsed_largest.sequence == kMaxSequenceNumber) {
      // The file boundary has been artificially extended by a range tombstone.
      // We do not need to adjust largest to properly truncate range
      // tombstones that extend past the boundary.
    } else if (parsed_largest.sequence == 0) {
      // The largest key in the sstable has a sequence number of 0. Since we
      // guarantee that no internal keys with the same user key and sequence
      // number can exist in a DB, we know that the largest key in this sstable
      // cannot exist as the smallest key in the next sstable. This further
      // implies that no range tombstone in this sstable covers largest;
      // otherwise, the file boundary would have been artificially extended.
      //
      // Therefore, we will never truncate a range tombstone at largest, so we
      // can leave it unchanged.
    } else {
      // The same user key may straddle two sstable boundaries. To ensure that
      // the truncated end key can cover the largest key in this sstable, reduce
      // its sequence number by 1.
      parsed_largest.sequence -= 1;
    }
    largest_ = &parsed_largest;
  }
}
71
72 bool TruncatedRangeDelIterator::Valid() const {
73 return iter_->Valid() &&
74 (smallest_ == nullptr ||
75 icmp_->Compare(*smallest_, iter_->parsed_end_key()) < 0) &&
76 (largest_ == nullptr ||
77 icmp_->Compare(iter_->parsed_start_key(), *largest_) < 0);
78 }
79
80 void TruncatedRangeDelIterator::Next() { iter_->TopNext(); }
81
82 void TruncatedRangeDelIterator::Prev() { iter_->TopPrev(); }
83
84 void TruncatedRangeDelIterator::InternalNext() { iter_->Next(); }
85
86 // NOTE: target is a user key
87 void TruncatedRangeDelIterator::Seek(const Slice& target) {
88 if (largest_ != nullptr &&
89 icmp_->Compare(*largest_, ParsedInternalKey(target, kMaxSequenceNumber,
90 kTypeRangeDeletion)) <= 0) {
91 iter_->Invalidate();
92 return;
93 }
94 if (smallest_ != nullptr &&
95 icmp_->user_comparator()->Compare(target, smallest_->user_key) < 0) {
96 iter_->Seek(smallest_->user_key);
97 return;
98 }
99 iter_->Seek(target);
100 }
101
102 // NOTE: target is a user key
103 void TruncatedRangeDelIterator::SeekForPrev(const Slice& target) {
104 if (smallest_ != nullptr &&
105 icmp_->Compare(ParsedInternalKey(target, 0, kTypeRangeDeletion),
106 *smallest_) < 0) {
107 iter_->Invalidate();
108 return;
109 }
110 if (largest_ != nullptr &&
111 icmp_->user_comparator()->Compare(largest_->user_key, target) < 0) {
112 iter_->SeekForPrev(largest_->user_key);
113 return;
114 }
115 iter_->SeekForPrev(target);
116 }
117
118 void TruncatedRangeDelIterator::SeekToFirst() {
119 if (smallest_ != nullptr) {
120 iter_->Seek(smallest_->user_key);
121 return;
122 }
123 iter_->SeekToTopFirst();
124 }
125
126 void TruncatedRangeDelIterator::SeekToLast() {
127 if (largest_ != nullptr) {
128 iter_->SeekForPrev(largest_->user_key);
129 return;
130 }
131 iter_->SeekToTopLast();
132 }
133
134 std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
135 TruncatedRangeDelIterator::SplitBySnapshot(
136 const std::vector<SequenceNumber>& snapshots) {
137 using FragmentedIterPair =
138 std::pair<const SequenceNumber,
139 std::unique_ptr<FragmentedRangeTombstoneIterator>>;
140
141 auto split_untruncated_iters = iter_->SplitBySnapshot(snapshots);
142 std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
143 split_truncated_iters;
144 std::for_each(
145 split_untruncated_iters.begin(), split_untruncated_iters.end(),
146 [&](FragmentedIterPair& iter_pair) {
147 std::unique_ptr<TruncatedRangeDelIterator> truncated_iter(
148 new TruncatedRangeDelIterator(std::move(iter_pair.second), icmp_,
149 smallest_ikey_, largest_ikey_));
150 split_truncated_iters.emplace(iter_pair.first,
151 std::move(truncated_iter));
152 });
153 return split_truncated_iters;
154 }
155
// Aggregates truncated range tombstone iterators for forward (ascending
// internal key) queries. Iterators whose current tombstone spans the query
// position live in active_iters_/active_seqnums_; iterators whose tombstones
// all start later wait in inactive_iters_ (see ShouldDelete below).
ForwardRangeDelIterator::ForwardRangeDelIterator(
    const InternalKeyComparator* icmp)
    : icmp_(icmp),
      unused_idx_(0),
      active_seqnums_(SeqMaxComparator()),
      active_iters_(EndKeyMinComparator(icmp)),
      inactive_iters_(StartKeyMinComparator(icmp)) {}
163
// Returns true iff some tombstone covers `parsed`, i.e. contains its user key
// with a strictly higher sequence number. Callers must issue queries in
// ascending internal-key order so iterator positions only move forward.
bool ForwardRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
  // Move active iterators that end before parsed.
  while (!active_iters_.empty() &&
         icmp_->Compare((*active_iters_.top())->end_key(), parsed) <= 0) {
    TruncatedRangeDelIterator* iter = PopActiveIter();
    do {
      // The current tombstone ends at or before parsed, so advance at least
      // once (hence do/while) until it ends after parsed or the iterator is
      // exhausted.
      iter->Next();
    } while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0);
    // Re-classify the iterator as active/inactive relative to parsed.
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }

  // Move inactive iterators that start before parsed.
  while (!inactive_iters_.empty() &&
         icmp_->Compare(inactive_iters_.top()->start_key(), parsed) <= 0) {
    TruncatedRangeDelIterator* iter = PopInactiveIter();
    // Unlike above, the current tombstone may already end after parsed, so
    // zero advances may be needed (plain while loop).
    while (iter->Valid() && icmp_->Compare(iter->end_key(), parsed) <= 0) {
      iter->Next();
    }
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }

  // parsed is deleted iff the largest sequence number among the active
  // tombstones exceeds parsed's own sequence number.
  return active_seqnums_.empty()
             ? false
             : (*active_seqnums_.begin())->seq() > parsed.sequence;
}
191
192 void ForwardRangeDelIterator::Invalidate() {
193 unused_idx_ = 0;
194 active_iters_.clear();
195 active_seqnums_.clear();
196 inactive_iters_.clear();
197 }
198
// Aggregates truncated range tombstone iterators for backward (descending
// internal key) queries; the mirror image of ForwardRangeDelIterator, with
// the heap comparators reversed accordingly.
ReverseRangeDelIterator::ReverseRangeDelIterator(
    const InternalKeyComparator* icmp)
    : icmp_(icmp),
      unused_idx_(0),
      active_seqnums_(SeqMaxComparator()),
      active_iters_(StartKeyMaxComparator(icmp)),
      inactive_iters_(EndKeyMaxComparator(icmp)) {}
206
// Returns true iff some tombstone covers `parsed` (same contract as the
// forward variant). Callers must issue queries in descending internal-key
// order so iterator positions only move backward.
bool ReverseRangeDelIterator::ShouldDelete(const ParsedInternalKey& parsed) {
  // Move active iterators that start after parsed.
  while (!active_iters_.empty() &&
         icmp_->Compare(parsed, (*active_iters_.top())->start_key()) < 0) {
    TruncatedRangeDelIterator* iter = PopActiveIter();
    do {
      // The current tombstone starts after parsed, so step back at least once
      // (hence do/while) until it starts at or before parsed or the iterator
      // is exhausted.
      iter->Prev();
    } while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0);
    // Re-classify the iterator as active/inactive relative to parsed.
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }

  // Move inactive iterators that end after parsed.
  while (!inactive_iters_.empty() &&
         icmp_->Compare(parsed, inactive_iters_.top()->end_key()) < 0) {
    TruncatedRangeDelIterator* iter = PopInactiveIter();
    // Unlike above, the current tombstone may already start at or before
    // parsed, so zero steps may be needed (plain while loop).
    while (iter->Valid() && icmp_->Compare(parsed, iter->start_key()) < 0) {
      iter->Prev();
    }
    PushIter(iter, parsed);
    assert(active_iters_.size() == active_seqnums_.size());
  }

  // parsed is deleted iff the largest sequence number among the active
  // tombstones exceeds parsed's own sequence number.
  return active_seqnums_.empty()
             ? false
             : (*active_seqnums_.begin())->seq() > parsed.sequence;
}
234
235 void ReverseRangeDelIterator::Invalidate() {
236 unused_idx_ = 0;
237 active_iters_.clear();
238 active_seqnums_.clear();
239 inactive_iters_.clear();
240 }
241
242 bool RangeDelAggregator::StripeRep::ShouldDelete(
243 const ParsedInternalKey& parsed, RangeDelPositioningMode mode) {
244 if (!InStripe(parsed.sequence) || IsEmpty()) {
245 return false;
246 }
247 switch (mode) {
248 case RangeDelPositioningMode::kForwardTraversal:
249 InvalidateReverseIter();
250
251 // Pick up previously unseen iterators.
252 for (auto it = std::next(iters_.begin(), forward_iter_.UnusedIdx());
253 it != iters_.end(); ++it, forward_iter_.IncUnusedIdx()) {
254 auto& iter = *it;
255 forward_iter_.AddNewIter(iter.get(), parsed);
256 }
257
258 return forward_iter_.ShouldDelete(parsed);
259 case RangeDelPositioningMode::kBackwardTraversal:
260 InvalidateForwardIter();
261
262 // Pick up previously unseen iterators.
263 for (auto it = std::next(iters_.begin(), reverse_iter_.UnusedIdx());
264 it != iters_.end(); ++it, reverse_iter_.IncUnusedIdx()) {
265 auto& iter = *it;
266 reverse_iter_.AddNewIter(iter.get(), parsed);
267 }
268
269 return reverse_iter_.ShouldDelete(parsed);
270 default:
271 assert(false);
272 return false;
273 }
274 }
275
// Returns true iff any tombstone fragment in this stripe overlaps the
// user-key range [start, end], ignoring sequence numbers.
bool RangeDelAggregator::StripeRep::IsRangeOverlapped(const Slice& start,
                                                      const Slice& end) {
  // The seeks below clobber any forward/reverse traversal positions.
  Invalidate();

  // Set the internal start/end keys so that:
  // - if start_ikey has the same user key and sequence number as the
  // current end key, start_ikey will be considered greater; and
  // - if end_ikey has the same user key and sequence number as the current
  // start key, end_ikey will be considered greater.
  ParsedInternalKey start_ikey(start, kMaxSequenceNumber,
                               static_cast<ValueType>(0));
  ParsedInternalKey end_ikey(end, 0, static_cast<ValueType>(0));
  for (auto& iter : iters_) {
    bool checked_candidate_tombstones = false;
    // Scan fragments from the one preceding `start` until fragments begin
    // past `end`; any fragment intersecting (start_ikey, end_ikey] overlaps.
    for (iter->SeekForPrev(start);
         iter->Valid() && icmp_->Compare(iter->start_key(), end_ikey) <= 0;
         iter->Next()) {
      checked_candidate_tombstones = true;
      if (icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
          icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
        return true;
      }
    }

    if (!checked_candidate_tombstones) {
      // Do an additional check for when the end of the range is the begin
      // key of a tombstone, which we missed earlier since SeekForPrev'ing
      // to the start was invalid.
      iter->SeekForPrev(end);
      if (iter->Valid() && icmp_->Compare(start_ikey, iter->end_key()) < 0 &&
          icmp_->Compare(iter->start_key(), end_ikey) <= 0) {
        return true;
      }
    }
  }
  return false;
}
313
314 void ReadRangeDelAggregator::AddTombstones(
315 std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
316 const InternalKey* smallest, const InternalKey* largest) {
317 if (input_iter == nullptr || input_iter->empty()) {
318 return;
319 }
320 rep_.AddTombstones(
321 std::unique_ptr<TruncatedRangeDelIterator>(new TruncatedRangeDelIterator(
322 std::move(input_iter), icmp_, smallest, largest)));
323 }
324
// Thin forwarder: on the read path a single StripeRep (rep_) holds all
// tombstones, so the per-stripe logic answers the query directly.
bool ReadRangeDelAggregator::ShouldDeleteImpl(const ParsedInternalKey& parsed,
                                              RangeDelPositioningMode mode) {
  return rep_.ShouldDelete(parsed, mode);
}
329
// Returns true iff any tombstone overlaps the user-key range [start, end].
bool ReadRangeDelAggregator::IsRangeOverlapped(const Slice& start,
                                               const Slice& end) {
  // The overlap check seeks iterators arbitrarily, so saved traversal
  // positions must be reset first.
  InvalidateRangeDelMapPositions();
  return rep_.IsRangeOverlapped(start, end);
}
335
336 void CompactionRangeDelAggregator::AddTombstones(
337 std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
338 const InternalKey* smallest, const InternalKey* largest) {
339 if (input_iter == nullptr || input_iter->empty()) {
340 return;
341 }
342 assert(input_iter->lower_bound() == 0);
343 assert(input_iter->upper_bound() == kMaxSequenceNumber);
344 parent_iters_.emplace_back(new TruncatedRangeDelIterator(
345 std::move(input_iter), icmp_, smallest, largest));
346
347 auto split_iters = parent_iters_.back()->SplitBySnapshot(*snapshots_);
348 for (auto& split_iter : split_iters) {
349 auto it = reps_.find(split_iter.first);
350 if (it == reps_.end()) {
351 bool inserted;
352 SequenceNumber upper_bound = split_iter.second->upper_bound();
353 SequenceNumber lower_bound = split_iter.second->lower_bound();
354 std::tie(it, inserted) = reps_.emplace(
355 split_iter.first, StripeRep(icmp_, upper_bound, lower_bound));
356 assert(inserted);
357 }
358 assert(it != reps_.end());
359 it->second.AddTombstones(std::move(split_iter.second));
360 }
361 }
362
363 bool CompactionRangeDelAggregator::ShouldDelete(const ParsedInternalKey& parsed,
364 RangeDelPositioningMode mode) {
365 auto it = reps_.lower_bound(parsed.sequence);
366 if (it == reps_.end()) {
367 return false;
368 }
369 return it->second.ShouldDelete(parsed, mode);
370 }
371
372 namespace {
373
// Merges the truncated range tombstones of several child iterators into one
// ascending stream of tombstone fragments, restricted to user keys in
// [lower_bound, upper_bound] (upper bound inclusive or exclusive per flag).
// Emits each fragment as an internal-key/value pair suitable for feeding
// into a FragmentedRangeTombstoneList (see NewIterator below).
class TruncatedRangeDelMergingIter : public InternalIterator {
 public:
  TruncatedRangeDelMergingIter(
      const InternalKeyComparator* icmp, const Slice* lower_bound,
      const Slice* upper_bound, bool upper_bound_inclusive,
      const std::vector<std::unique_ptr<TruncatedRangeDelIterator>>& children)
      : icmp_(icmp),
        lower_bound_(lower_bound),
        upper_bound_(upper_bound),
        upper_bound_inclusive_(upper_bound_inclusive),
        heap_(StartKeyMinComparator(icmp)) {
    for (auto& child : children) {
      if (child != nullptr) {
        // Children must span the full sequence-number range; snapshot
        // handling happens downstream.
        assert(child->lower_bound() == 0);
        assert(child->upper_bound() == kMaxSequenceNumber);
        children_.push_back(child.get());
      }
    }
  }

  bool Valid() const override {
    // Exhausted once the smallest remaining start key passes upper_bound_.
    return !heap_.empty() && BeforeEndKey(heap_.top());
  }
  Status status() const override { return Status::OK(); }

  void SeekToFirst() override {
    heap_.clear();
    for (auto& child : children_) {
      // Position each child at its first fragment at or after lower_bound_
      // (or its first fragment overall), then seed the min-heap.
      if (lower_bound_ != nullptr) {
        child->Seek(*lower_bound_);
      } else {
        child->SeekToFirst();
      }
      if (child->Valid()) {
        heap_.push(child);
      }
    }
  }

  void Next() override {
    auto* top = heap_.top();
    // Advance past the current fragment, then restore the heap invariant.
    top->InternalNext();
    if (top->Valid()) {
      heap_.replace_top(top);
    } else {
      heap_.pop();
    }
  }

  Slice key() const override {
    auto* top = heap_.top();
    // Re-encode the fragment start as a full internal key carrying the
    // tombstone's sequence number; cur_start_key_ backs the returned Slice.
    cur_start_key_.Set(top->start_key().user_key, top->seq(),
                       kTypeRangeDeletion);
    return cur_start_key_.Encode();
  }

  Slice value() const override {
    auto* top = heap_.top();
    assert(top->end_key().sequence == kMaxSequenceNumber);
    // A range tombstone's value is its end user key.
    return top->end_key().user_key;
  }

  // Unused InternalIterator methods
  void Prev() override { assert(false); }
  void Seek(const Slice& /* target */) override { assert(false); }
  void SeekForPrev(const Slice& /* target */) override { assert(false); }
  void SeekToLast() override { assert(false); }

 private:
  // True iff iter's current fragment starts at or before the configured
  // upper bound (strictly before, when the bound is exclusive).
  bool BeforeEndKey(const TruncatedRangeDelIterator* iter) const {
    if (upper_bound_ == nullptr) {
      return true;
    }
    int cmp = icmp_->user_comparator()->Compare(iter->start_key().user_key,
                                                *upper_bound_);
    return upper_bound_inclusive_ ? cmp <= 0 : cmp < 0;
  }

  const InternalKeyComparator* icmp_;
  const Slice* lower_bound_;
  const Slice* upper_bound_;
  bool upper_bound_inclusive_;
  // Min-heap ordered by fragment start key; the top is the next fragment.
  BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> heap_;
  // Non-owning views of the caller's child iterators.
  std::vector<TruncatedRangeDelIterator*> children_;

  // Scratch buffer backing the Slice returned by key().
  mutable InternalKey cur_start_key_;
};
461
462 } // namespace
463
464 std::unique_ptr<FragmentedRangeTombstoneIterator>
465 CompactionRangeDelAggregator::NewIterator(const Slice* lower_bound,
466 const Slice* upper_bound,
467 bool upper_bound_inclusive) {
468 InvalidateRangeDelMapPositions();
469 std::unique_ptr<TruncatedRangeDelMergingIter> merging_iter(
470 new TruncatedRangeDelMergingIter(icmp_, lower_bound, upper_bound,
471 upper_bound_inclusive, parent_iters_));
472
473 auto fragmented_tombstone_list =
474 std::make_shared<FragmentedRangeTombstoneList>(
475 std::move(merging_iter), *icmp_, true /* for_compaction */,
476 *snapshots_);
477
478 return std::unique_ptr<FragmentedRangeTombstoneIterator>(
479 new FragmentedRangeTombstoneIterator(
480 fragmented_tombstone_list, *icmp_,
481 kMaxSequenceNumber /* upper_bound */));
482 }
483
484 } // namespace ROCKSDB_NAMESPACE