ceph/src/rocksdb/db/compaction_iterator.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/compaction_iterator.h"

#include "db/snapshot_checker.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"

namespace rocksdb {

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    RangeDelAggregator* range_del_agg, const Compaction* compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum)
    : CompactionIterator(
          input, cmp, merge_helper, last_sequence, snapshots,
          earliest_write_conflict_snapshot, snapshot_checker, env,
          report_detailed_time, expect_valid_internal_key, range_del_agg,
          std::unique_ptr<CompactionProxy>(
              compaction ? new CompactionProxy(compaction) : nullptr),
          compaction_filter, shutting_down, preserve_deletes_seqnum) {}

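// The public constructor above only wraps the raw Compaction* in a
// CompactionProxy (null when no Compaction object is supplied, e.g. during a
// flush) and delegates to the full constructor below.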
CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    RangeDelAggregator* range_del_agg,
    std::unique_ptr<CompactionProxy> compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum)
    : input_(input),
      cmp_(cmp),
      merge_helper_(merge_helper),
      snapshots_(snapshots),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      env_(env),
      report_detailed_time_(report_detailed_time),
      expect_valid_internal_key_(expect_valid_internal_key),
      range_del_agg_(range_del_agg),
      compaction_(std::move(compaction)),
      compaction_filter_(compaction_filter),
      shutting_down_(shutting_down),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      ignore_snapshots_(false),
      current_user_key_sequence_(0),
      current_user_key_snapshot_(0),
      merge_out_iter_(merge_helper_),
      current_key_committed_(false) {
  assert(compaction_filter_ == nullptr || compaction_ != nullptr);
  bottommost_level_ =
      compaction_ == nullptr ? false : compaction_->bottommost_level();
  if (compaction_ != nullptr) {
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  }

  if (snapshots_->size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = true;
    earliest_snapshot_ = kMaxSequenceNumber;
    latest_snapshot_ = 0;
  } else {
    visible_at_tip_ = false;
    earliest_snapshot_ = snapshots_->at(0);
    latest_snapshot_ = snapshots_->back();
  }
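  // Note: with an empty snapshot list every key is visible at the tip, and
  // earliest_snapshot_ == kMaxSequenceNumber makes the
  // "ikey_.sequence <= earliest_snapshot_" checks below succeed for any key.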
#ifndef NDEBUG
  // findEarliestVisibleSnapshot assumes this ordering.
  for (size_t i = 1; i < snapshots_->size(); ++i) {
    assert(snapshots_->at(i - 1) <= snapshots_->at(i));
  }
#endif
  if (compaction_filter_ != nullptr) {
    if (compaction_filter_->IgnoreSnapshots()) {
      ignore_snapshots_ = true;
    }
  } else {
    ignore_snapshots_ = false;
  }
  input_->SetPinnedItersMgr(&pinned_iters_mgr_);
}

CompactionIterator::~CompactionIterator() {
  // The input_ iterator's lifetime is longer than pinned_iters_mgr_'s.
  input_->SetPinnedItersMgr(nullptr);
}

void CompactionIterator::ResetRecordCounts() {
  iter_stats_.num_record_drop_user = 0;
  iter_stats_.num_record_drop_hidden = 0;
  iter_stats_.num_record_drop_obsolete = 0;
  iter_stats_.num_record_drop_range_del = 0;
  iter_stats_.num_range_del_drop_obsolete = 0;
  iter_stats_.num_optimized_del_drop_obsolete = 0;
}

void CompactionIterator::SeekToFirst() {
  NextFromInput();
  PrepareOutput();
}
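// Note: SeekToFirst() does not seek input_ itself; the caller is expected to
// have positioned input_ at the first internal key before calling it. This
// call only pulls the first valid output and prepares it.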

void CompactionIterator::Next() {
  // If there is a merge output, return it before continuing to process the
  // input.
  if (merge_out_iter_.Valid()) {
    merge_out_iter_.Next();

    // Check if we returned all records of the merge output.
    if (merge_out_iter_.Valid()) {
      key_ = merge_out_iter_.key();
      value_ = merge_out_iter_.value();
      bool valid_key __attribute__((__unused__));
      valid_key = ParseInternalKey(key_, &ikey_);
      // MergeUntil stops when it encounters a corrupt key and does not
      // include them in the result, so we expect the keys here to be valid.
      assert(valid_key);
      // Keep current_key_ in sync.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();
      valid_ = true;
    } else {
      // We consumed all pinned merge operands, release pinned iterators
      pinned_iters_mgr_.ReleasePinnedData();
      // MergeHelper moves the iterator to the first record after the merged
      // records, so even though we reached the end of the merge output, we do
      // not want to advance the iterator.
      NextFromInput();
    }
  } else {
    // Only advance the input iterator if there is no merge output and the
    // iterator is not already at the next record.
    if (!at_next_) {
      input_->Next();
    }
    NextFromInput();
  }

  if (valid_) {
    // Record that we've outputted a record for the current key.
    has_outputted_key_ = true;
  }

  PrepareOutput();
}

void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                              Slice* skip_until) {
  if (compaction_filter_ != nullptr &&
      (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex) &&
      (visible_at_tip_ || ignore_snapshots_ ||
       ikey_.sequence > latest_snapshot_ ||
       (snapshot_checker_ != nullptr &&
        UNLIKELY(!snapshot_checker_->IsInSnapshot(ikey_.sequence,
                                                  latest_snapshot_))))) {
    // If the user has specified a compaction filter and the sequence
    // number is greater than any external snapshot, then invoke the
    // filter. If the return value of the compaction filter is true,
    // replace the entry with a deletion marker.
    CompactionFilter::Decision filter;
    compaction_filter_value_.clear();
    compaction_filter_skip_until_.Clear();
    CompactionFilter::ValueType value_type =
        ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
                                 : CompactionFilter::ValueType::kBlobIndex;
    // Hack: pass the internal key to BlobIndexCompactionFilter since it
    // needs the sequence number.
    Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
    {
      StopWatchNano timer(env_, report_detailed_time_);
      filter = compaction_filter_->FilterV2(
          compaction_->level(), filter_key, value_type, value_,
          &compaction_filter_value_, compaction_filter_skip_until_.rep());
      iter_stats_.total_filter_time +=
          env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
    }

    if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
        cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
            0) {
      // Can't skip to a key smaller than the current one.
      // Keep the key as per FilterV2 documentation.
      filter = CompactionFilter::Decision::kKeep;
    }

    if (filter == CompactionFilter::Decision::kRemove) {
      // convert the current key to a delete; key_ is pointing into
      // current_key_ at this point, so updating current_key_ updates key()
      ikey_.type = kTypeDeletion;
      current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
      // no value associated with delete
      value_.clear();
      iter_stats_.num_record_drop_user++;
    } else if (filter == CompactionFilter::Decision::kChangeValue) {
      value_ = compaction_filter_value_;
    } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
      *need_skip = true;
      compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
                                                       kValueTypeForSeek);
      *skip_until = compaction_filter_skip_until_.Encode();
    }
  }
}
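// Illustrative example (not from the upstream source): if the filter returns
// kRemoveAndSkipUntil with skip_until = "k9" while the iterator is at user
// key "k1", NextFromInput() drops the current entry and seeks the input
// directly to the internal key ("k9", kMaxSequenceNumber, kValueTypeForSeek),
// skipping every entry in between without invoking the filter on them.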

void CompactionIterator::NextFromInput() {
  at_next_ = false;
  valid_ = false;

  while (!valid_ && input_->Valid() && !IsShuttingDown()) {
    key_ = input_->key();
    value_ = input_->value();
    iter_stats_.num_input_records++;

    if (!ParseInternalKey(key_, &ikey_)) {
      // If `expect_valid_internal_key_` is false, return the corrupted key
      // and let the caller decide what to do with it.
      // TODO(noetzli): We should have a more elegant solution for this.
      if (expect_valid_internal_key_) {
        assert(!"Corrupted internal key not expected.");
        status_ = Status::Corruption("Corrupted internal key not expected.");
        break;
      }
      key_ = current_key_.SetInternalKey(key_);
      has_current_user_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      iter_stats_.num_input_corrupt_records++;
      valid_ = true;
      break;
    }

    // Update input statistics
    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
      iter_stats_.num_input_deletion_records++;
    }
    iter_stats_.total_input_raw_key_bytes += key_.size();
    iter_stats_.total_input_raw_value_bytes += value_.size();

    // If need_skip is true, we should seek the input iterator
    // to internal key skip_until and continue from there.
    bool need_skip = false;
    // Points either into compaction_filter_skip_until_ or into
    // merge_helper_->compaction_filter_skip_until_.
    Slice skip_until;

    // Check whether the user key changed. After this if statement current_key_
    // is a copy of the current input key (maybe converted to a delete by the
    // compaction filter). ikey_.user_key is pointing to the copy.
    if (!has_current_user_key_ ||
        !cmp_->Equal(ikey_.user_key, current_user_key_)) {
      // First occurrence of this user key
      // Copy key for output
      key_ = current_key_.SetInternalKey(key_, &ikey_);
      current_user_key_ = ikey_.user_key;
      has_current_user_key_ = true;
      has_outputted_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      current_key_committed_ =
          (snapshot_checker_ == nullptr ||
           snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber));

      // Apply the compaction filter to the first committed version of the user
      // key.
      if (current_key_committed_) {
        InvokeFilterIfNeeded(&need_skip, &skip_until);
      }
    } else {
      // Update the current key to reflect the new sequence number/type without
      // copying the user key.
      // TODO(rven): Compaction filter does not process keys in this path
      // Need to have the compaction filter process multiple versions
      // if we have versions on both sides of a snapshot
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();

      // Note that a newer version of a key is ordered before older versions.
      // If a newer version of a key is committed, so is the older version. No
      // need to query snapshot_checker_ in that case.
      if (UNLIKELY(!current_key_committed_)) {
        assert(snapshot_checker_ != nullptr);
        current_key_committed_ =
            snapshot_checker_->IsInSnapshot(ikey_.sequence, kMaxSequenceNumber);
        // Apply the compaction filter to the first committed version of the
        // user key.
        if (current_key_committed_) {
          InvokeFilterIfNeeded(&need_skip, &skip_until);
        }
      }
    }

    if (UNLIKELY(!current_key_committed_)) {
      assert(snapshot_checker_ != nullptr);
      valid_ = true;
      break;
    }

    // If there are no snapshots, then this kv affects visibility at tip.
    // Otherwise, search through all existing snapshots to find the earliest
    // snapshot that is affected by this kv.
    SequenceNumber last_sequence __attribute__((__unused__));
    last_sequence = current_user_key_sequence_;
    current_user_key_sequence_ = ikey_.sequence;
    SequenceNumber last_snapshot = current_user_key_snapshot_;
    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
    current_user_key_snapshot_ =
        visible_at_tip_
            ? earliest_snapshot_
            : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);

    if (need_skip) {
      // This case is handled below.
    } else if (clear_and_output_next_key_) {
      // In the previous iteration we encountered a single delete that we could
      // not compact out. We will keep this Put, but can drop its data.
      // (See Optimization 3, below.)
      assert(ikey_.type == kTypeValue);
      assert(current_user_key_snapshot_ == last_snapshot);

      value_.clear();
      valid_ = true;
      clear_and_output_next_key_ = false;
    } else if (ikey_.type == kTypeSingleDeletion) {
      // We can compact out a SingleDelete if:
      // 1) We encounter the corresponding PUT -OR- we know that this key
      //    doesn't appear past this output level
      // =AND=
      // 2) We've already returned a record in this snapshot -OR- there is no
      //    snapshot earlier than earliest_write_conflict_snapshot.
      //
      // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
      // allow Transactions to do write-conflict checking (if we compacted away
      // all keys, then we wouldn't know that a write happened in this
      // snapshot). If there is no earlier snapshot, then we know that there
      // are no active transactions that need to know about any writes.
      //
      // Optimization 3:
      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
      // true, then we must output a SingleDelete. In this case, we will decide
      // to also output the PUT. While we are compacting less by outputting the
      // PUT now, hopefully this will lead to better compaction in the future
      // when Rule 2 is later true (i.e., we are hoping we can later compact
      // out both the SingleDelete and the Put, while we couldn't if we only
      // outputted the SingleDelete now).
      // In this case, we can save space by removing the PUT's value as it will
      // never be read.
      //
      // Deletes and Merges are not supported on the same key that has a
      // SingleDelete as it is not possible to correctly do any partial
      // compaction of such a combination of operations. The result of mixing
      // those operations for a given key is documented as being undefined. So
      // we can choose how to handle such combinations of operations. We will
      // try to compact out as much as we can in these cases.
      // We will report counts on these anomalous cases.

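      // Worked example (illustrative, not from the upstream source): with no
      // snapshot_checker_ and SD(a, seq=5) directly followed by PUT(a, seq=3)
      // in the same snapshot stripe:
      //   - if has_outputted_key_ is true, or seq 5 is <=
      //     earliest_write_conflict_snapshot_, both records are dropped;
      //   - otherwise Rule 2 fails, so the SingleDelete is output now and the
      //     PUT is output on the next iteration with its value cleared
      //     (Optimization 3).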
      // The easiest way to process a SingleDelete during iteration is to peek
      // ahead at the next key.
      ParsedInternalKey next_ikey;
      input_->Next();

      // Check whether the next key exists, is not corrupt, and is the same key
      // as the single delete.
      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
        // Check whether the next key belongs to the same snapshot as the
        // SingleDelete.
        if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot ||
            (snapshot_checker_ != nullptr &&
             UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence,
                                                       prev_snapshot)))) {
          if (next_ikey.type == kTypeSingleDeletion) {
            // We encountered two SingleDeletes in a row. This could be due to
            // unexpected user input.
            // Skip the first SingleDelete and let the next iteration decide
            // how to handle the second SingleDelete.

            // First SingleDelete has been skipped since we already called
            // input_->Next().
            ++iter_stats_.num_record_drop_obsolete;
            ++iter_stats_.num_single_del_mismatch;
          } else if (has_outputted_key_ ||
                     (ikey_.sequence <= earliest_write_conflict_snapshot_ &&
                      (snapshot_checker_ == nullptr ||
                       LIKELY(snapshot_checker_->IsInSnapshot(
                           ikey_.sequence,
                           earliest_write_conflict_snapshot_))))) {
            // Found a matching value, we can drop the single delete and the
            // value. It is safe to drop both records since we've already
            // outputted a key in this snapshot, or there is no earlier
            // snapshot (Rule 2 above).

            // Note: it doesn't matter whether the second key is a Put or if it
            // is an unexpected Merge or Delete. We will compact it out
            // either way. We will maintain counts of how many mismatches
            // happened.
            if (next_ikey.type != kTypeValue) {
              ++iter_stats_.num_single_del_mismatch;
            }

            ++iter_stats_.num_record_drop_hidden;
            ++iter_stats_.num_record_drop_obsolete;
            // Already called input_->Next() once. Call it a second time to
            // skip past the second key.
            input_->Next();
          } else {
            // Found a matching value, but we cannot drop both keys since
            // there is an earlier snapshot and we need to leave behind a record
            // to know that a write happened in this snapshot (Rule 2 above).
            // Clear the value and output the SingleDelete. (The value will be
            // outputted on the next iteration.)

            // Setting valid_ to true will output the current SingleDelete
            valid_ = true;

            // Set up the Put to be outputted in the next iteration.
            // (Optimization 3).
            clear_and_output_next_key_ = true;
          }
        } else {
          // We hit the next snapshot without hitting a put, so the iterator
          // returns the single delete.
          valid_ = true;
        }
      } else {
        // We are at the end of the input, could not parse the next key, or hit
        // a different key. The iterator returns the single delete if the key
        // possibly exists beyond the current output level. We set
        // has_current_user_key to false so that if the iterator is at the next
        // key, we do not compare it again against the previous key at the next
        // iteration. If the next key is corrupt, we return before the
        // comparison, so the value of has_current_user_key does not matter.
        has_current_user_key_ = false;
        if (compaction_ != nullptr && ikey_.sequence <= earliest_snapshot_ &&
            (snapshot_checker_ == nullptr ||
             LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
                                                    earliest_snapshot_))) &&
            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                       &level_ptrs_)) {
          // Key doesn't exist outside of this range.
          // Can compact out this SingleDelete.
          ++iter_stats_.num_record_drop_obsolete;
          ++iter_stats_.num_single_del_fallthru;
          if (!bottommost_level_) {
            ++iter_stats_.num_optimized_del_drop_obsolete;
          }
        } else {
          // Output SingleDelete
          valid_ = true;
        }
      }

      if (valid_) {
        at_next_ = true;
      }
    } else if (last_snapshot == current_user_key_snapshot_) {
      // If the earliest snapshot in which this key is visible is the same as
      // the earliest snapshot in which a previous instance of the same key
      // was visible, then this kv is not visible in any snapshot.
      // Hidden by a newer entry for the same user key
      // TODO(noetzli): why not > ?
      //
      // Note: Dropping this key will not affect TransactionDB write-conflict
      // checking since there has already been a record returned for this key
      // in this snapshot.
      assert(last_sequence >= current_user_key_sequence_);
      ++iter_stats_.num_record_drop_hidden;  // (A)
      input_->Next();
    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
               ikey_.sequence <= earliest_snapshot_ &&
               (snapshot_checker_ == nullptr ||
                LIKELY(snapshot_checker_->IsInSnapshot(ikey_.sequence,
                                                       earliest_snapshot_))) &&
               ikeyNotNeededForIncrementalSnapshot() &&
               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                          &level_ptrs_)) {
      // TODO(noetzli): This is the only place where we use compaction_
      // (besides the constructor). We should probably get rid of this
      // dependency and find a way to do similar filtering during flushes.
      //
      // For this user key:
      // (1) there is no data in higher levels
      // (2) data in lower levels will have larger sequence numbers
      // (3) data in layers that are being compacted here and have
      //     smaller sequence numbers will be dropped in the next
      //     few iterations of this loop (by rule (A) above).
      // Therefore this deletion marker is obsolete and can be dropped.
      //
      // Note: Dropping this Delete will not affect TransactionDB
      // write-conflict checking since it is earlier than any snapshot.
      //
      // It seems that we could also drop a deletion newer than the earliest
      // snapshot, given that:
      // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
      // (2) No value exists earlier than the deletion.
      ++iter_stats_.num_record_drop_obsolete;
      if (!bottommost_level_) {
        ++iter_stats_.num_optimized_del_drop_obsolete;
      }
      input_->Next();
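      // Illustrative example (not from the upstream source): with
      // earliest_snapshot_ == 10 and no snapshot_checker_, a Delete at
      // seqnum 5 whose user key does not exist beyond the output level is
      // dropped here, and every older version of that key is then dropped by
      // rule (A) above.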
    } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
               ikeyNotNeededForIncrementalSnapshot()) {
      // Handle the case where we have a delete key at the bottommost level.
      // We can skip outputting the key iff there are no subsequent puts for
      // this key.
      ParsedInternalKey next_ikey;
      input_->Next();
      // Skip over all versions of this key that happen to occur in the same
      // snapshot range as the delete.
      while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
             cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
             (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot ||
              (snapshot_checker_ != nullptr &&
               UNLIKELY(!snapshot_checker_->IsInSnapshot(next_ikey.sequence,
                                                         prev_snapshot))))) {
        input_->Next();
      }
      // If a version of this key still has to be output after the skipping,
      // we need to output the delete as well.
      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
        valid_ = true;
        at_next_ = true;
      }
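      // Illustrative example (not from the upstream source): at the
      // bottommost level with no snapshots, DEL(a,5) PUT(a,4) PUT(a,3) are
      // all skipped: the puts fall in the delete's snapshot stripe and no
      // version of the key remains, so the delete itself is dropped. If a
      // snapshot taken at seqnum 2 also protects a PUT(a,2), the loop stops
      // there and the delete is output so it can still hide that older
      // version.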
    } else if (ikey_.type == kTypeMerge) {
      if (!merge_helper_->HasOperator()) {
        status_ = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        return;
      }

      pinned_iters_mgr_.StartPinning();
      // We know the merge type entry is not hidden, otherwise we would
      // have hit (A).
      // We encapsulate the merge related state machine in a different
      // object to minimize change to the existing flow.
      Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
                                           prev_snapshot, bottommost_level_);
      merge_out_iter_.SeekToFirst();

      if (!s.ok() && !s.IsMergeInProgress()) {
        status_ = s;
        return;
      } else if (merge_out_iter_.Valid()) {
        // NOTE: key, value, and ikey_ refer to old entries.
        // These will be correctly set below.
        key_ = merge_out_iter_.key();
        value_ = merge_out_iter_.value();
        bool valid_key __attribute__((__unused__));
        valid_key = ParseInternalKey(key_, &ikey_);
        // MergeUntil stops when it encounters a corrupt key and does not
        // include them in the result, so we expect the keys here to be valid.
        assert(valid_key);
        // Keep current_key_ in sync.
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
        key_ = current_key_.GetInternalKey();
        ikey_.user_key = current_key_.GetUserKey();
        valid_ = true;
      } else {
        // All merge operands were filtered out. Reset the user key, since the
        // batch consumed by the merge operator should not shadow any keys
        // coming after the merges.
        has_current_user_key_ = false;
        pinned_iters_mgr_.ReleasePinnedData();

        if (merge_helper_->FilteredUntil(&skip_until)) {
          need_skip = true;
        }
      }
    } else {
      // 1. new user key -OR-
      // 2. different snapshot stripe
      bool should_delete = range_del_agg_->ShouldDelete(
          key_, RangeDelPositioningMode::kForwardTraversal);
      if (should_delete) {
        ++iter_stats_.num_record_drop_hidden;
        ++iter_stats_.num_record_drop_range_del;
        input_->Next();
      } else {
        valid_ = true;
      }
    }

    if (need_skip) {
      input_->Seek(skip_until);
    }
  }

  if (!valid_ && IsShuttingDown()) {
    status_ = Status::ShutdownInProgress();
  }
}

void CompactionIterator::PrepareOutput() {
  // Zeroing out the sequence number leads to better compression.
  // If this is the bottommost level (no files in lower levels)
  // and the earliest snapshot is larger than this seqno
  // and the userkey differs from the last userkey in compaction
  // then we can squash the seqno to zero.
  //
  // This is safe for TransactionDB write-conflict checking since transactions
  // only care about sequence numbers larger than any active snapshots.
  //
  // Can we do the same for levels above the bottom level as long as
  // KeyNotExistsBeyondOutputLevel() returns true?
  if ((compaction_ != nullptr &&
       !compaction_->allow_ingest_behind()) &&
      ikeyNotNeededForIncrementalSnapshot() &&
      bottommost_level_ && valid_ && ikey_.sequence <= earliest_snapshot_ &&
      (snapshot_checker_ == nullptr || LIKELY(snapshot_checker_->IsInSnapshot(
                                           ikey_.sequence, earliest_snapshot_))) &&
      ikey_.type != kTypeMerge &&
      !cmp_->Equal(compaction_->GetLargestUserKey(), ikey_.user_key)) {
    assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
    ikey_.sequence = 0;
    current_key_.UpdateInternalKey(0, ikey_.type);
  }
}
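// Illustrative example (not from the upstream source): at the bottommost
// level with no snapshots (earliest_snapshot_ == kMaxSequenceNumber), a
// PUT(a, seq=42) is rewritten as PUT(a, seq=0). Runs of zero sequence
// numbers compress better, and no snapshot or transaction can still need the
// original seqnum.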

inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  auto snapshots_iter = std::lower_bound(
      snapshots_->begin(), snapshots_->end(), in);
  if (snapshots_iter == snapshots_->begin()) {
    *prev_snapshot = 0;
  } else {
    *prev_snapshot = *std::prev(snapshots_iter);
    assert(*prev_snapshot < in);
  }
  for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
    auto cur = *snapshots_iter;
    assert(in <= cur);
    if (snapshot_checker_ == nullptr ||
        snapshot_checker_->IsInSnapshot(in, cur)) {
      return cur;
    }
    *prev_snapshot = cur;
  }
  return kMaxSequenceNumber;
}
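// Worked example (illustrative, not from the upstream source): with
// snapshots_ = {10, 20, 30} and no snapshot_checker_, a key at seqnum 15
// yields *prev_snapshot = 10 and returns 20 (the earliest snapshot that can
// see it); seqnum 35 yields *prev_snapshot = 30 and returns
// kMaxSequenceNumber, i.e. the key is visible only at the tip.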

// Used in two places: it prevents deletion markers from being dropped while
// they may still be needed, and disables seqnum zero-out in PrepareOutput
// for recent keys.
inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
  return (!compaction_->preserve_deletes()) ||
         (ikey_.sequence < preserve_deletes_seqnum_);
}
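// Illustrative example (not from the upstream source): with preserve_deletes
// enabled and preserve_deletes_seqnum_ == 100, a key at seqnum 50 returns
// true (old enough to be optimized away or zeroed), while a key at seqnum
// 150 returns false and is preserved for the incremental snapshot.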

}  // namespace rocksdb