]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/compaction_iterator.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rocksdb / db / compaction_iterator.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#include "db/compaction_iterator.h"
11fdf7f2
TL
7
8#include "db/snapshot_checker.h"
9#include "port/likely.h"
10#include "rocksdb/listener.h"
7c673cae 11#include "table/internal_iterator.h"
494da23a
TL
12#include "util/sync_point.h"
13
// Helper macros for snapshot-visibility checks. A sequence number is only
// *definitely* in (or out of) a snapshot when the numeric comparison holds
// AND the optional snapshot_checker_ (used by WritePrepared transactions)
// agrees; otherwise the answer is indeterminate and callers must be
// conservative.
#define DEFINITELY_IN_SNAPSHOT(seq, snapshot)                       \
  ((seq) <= (snapshot) &&                                           \
   (snapshot_checker_ == nullptr ||                                 \
    LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
           SnapshotCheckerResult::kInSnapshot)))

#define DEFINITELY_NOT_IN_SNAPSHOT(seq, snapshot)                     \
  ((seq) > (snapshot) ||                                              \
   (snapshot_checker_ != nullptr &&                                   \
    UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
             SnapshotCheckerResult::kNotInSnapshot)))

// True iff seq is visible in the earliest (smallest) tracked snapshot.
#define IN_EARLIEST_SNAPSHOT(seq) \
  ((seq) <= earliest_snapshot_ && \
   (snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq))))
7c673cae
FG
29
30namespace rocksdb {
31
7c673cae
FG
32CompactionIterator::CompactionIterator(
33 InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
34 SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
11fdf7f2
TL
35 SequenceNumber earliest_write_conflict_snapshot,
36 const SnapshotChecker* snapshot_checker, Env* env,
37 bool report_detailed_time, bool expect_valid_internal_key,
494da23a 38 CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
11fdf7f2
TL
39 const CompactionFilter* compaction_filter,
40 const std::atomic<bool>* shutting_down,
41 const SequenceNumber preserve_deletes_seqnum)
7c673cae
FG
42 : CompactionIterator(
43 input, cmp, merge_helper, last_sequence, snapshots,
11fdf7f2
TL
44 earliest_write_conflict_snapshot, snapshot_checker, env,
45 report_detailed_time, expect_valid_internal_key, range_del_agg,
7c673cae
FG
46 std::unique_ptr<CompactionProxy>(
47 compaction ? new CompactionProxy(compaction) : nullptr),
11fdf7f2 48 compaction_filter, shutting_down, preserve_deletes_seqnum) {}
7c673cae
FG
49
50CompactionIterator::CompactionIterator(
51 InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
11fdf7f2
TL
52 SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
53 SequenceNumber earliest_write_conflict_snapshot,
54 const SnapshotChecker* snapshot_checker, Env* env,
55 bool report_detailed_time, bool expect_valid_internal_key,
494da23a 56 CompactionRangeDelAggregator* range_del_agg,
7c673cae
FG
57 std::unique_ptr<CompactionProxy> compaction,
58 const CompactionFilter* compaction_filter,
11fdf7f2
TL
59 const std::atomic<bool>* shutting_down,
60 const SequenceNumber preserve_deletes_seqnum)
7c673cae
FG
61 : input_(input),
62 cmp_(cmp),
63 merge_helper_(merge_helper),
64 snapshots_(snapshots),
65 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
11fdf7f2 66 snapshot_checker_(snapshot_checker),
7c673cae 67 env_(env),
11fdf7f2 68 report_detailed_time_(report_detailed_time),
7c673cae
FG
69 expect_valid_internal_key_(expect_valid_internal_key),
70 range_del_agg_(range_del_agg),
71 compaction_(std::move(compaction)),
72 compaction_filter_(compaction_filter),
7c673cae 73 shutting_down_(shutting_down),
11fdf7f2 74 preserve_deletes_seqnum_(preserve_deletes_seqnum),
11fdf7f2
TL
75 current_user_key_sequence_(0),
76 current_user_key_snapshot_(0),
77 merge_out_iter_(merge_helper_),
78 current_key_committed_(false) {
7c673cae 79 assert(compaction_filter_ == nullptr || compaction_ != nullptr);
494da23a 80 assert(snapshots_ != nullptr);
7c673cae
FG
81 bottommost_level_ =
82 compaction_ == nullptr ? false : compaction_->bottommost_level();
83 if (compaction_ != nullptr) {
84 level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
85 }
7c673cae
FG
86 if (snapshots_->size() == 0) {
87 // optimize for fast path if there are no snapshots
88 visible_at_tip_ = true;
494da23a 89 earliest_snapshot_iter_ = snapshots_->end();
7c673cae
FG
90 earliest_snapshot_ = kMaxSequenceNumber;
91 latest_snapshot_ = 0;
92 } else {
93 visible_at_tip_ = false;
494da23a 94 earliest_snapshot_iter_ = snapshots_->begin();
7c673cae
FG
95 earliest_snapshot_ = snapshots_->at(0);
96 latest_snapshot_ = snapshots_->back();
97 }
11fdf7f2
TL
98#ifndef NDEBUG
99 // findEarliestVisibleSnapshot assumes this ordering.
100 for (size_t i = 1; i < snapshots_->size(); ++i) {
494da23a 101 assert(snapshots_->at(i - 1) < snapshots_->at(i));
11fdf7f2
TL
102 }
103#endif
7c673cae 104 input_->SetPinnedItersMgr(&pinned_iters_mgr_);
494da23a 105 TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
7c673cae
FG
106}
107
108CompactionIterator::~CompactionIterator() {
109 // input_ Iteartor lifetime is longer than pinned_iters_mgr_ lifetime
110 input_->SetPinnedItersMgr(nullptr);
111}
112
113void CompactionIterator::ResetRecordCounts() {
114 iter_stats_.num_record_drop_user = 0;
115 iter_stats_.num_record_drop_hidden = 0;
116 iter_stats_.num_record_drop_obsolete = 0;
117 iter_stats_.num_record_drop_range_del = 0;
118 iter_stats_.num_range_del_drop_obsolete = 0;
11fdf7f2 119 iter_stats_.num_optimized_del_drop_obsolete = 0;
7c673cae
FG
120}
121
122void CompactionIterator::SeekToFirst() {
123 NextFromInput();
124 PrepareOutput();
125}
126
127void CompactionIterator::Next() {
128 // If there is a merge output, return it before continuing to process the
129 // input.
130 if (merge_out_iter_.Valid()) {
131 merge_out_iter_.Next();
132
133 // Check if we returned all records of the merge output.
134 if (merge_out_iter_.Valid()) {
135 key_ = merge_out_iter_.key();
136 value_ = merge_out_iter_.value();
11fdf7f2
TL
137 bool valid_key __attribute__((__unused__));
138 valid_key = ParseInternalKey(key_, &ikey_);
7c673cae
FG
139 // MergeUntil stops when it encounters a corrupt key and does not
140 // include them in the result, so we expect the keys here to be valid.
141 assert(valid_key);
142 // Keep current_key_ in sync.
143 current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
144 key_ = current_key_.GetInternalKey();
145 ikey_.user_key = current_key_.GetUserKey();
146 valid_ = true;
147 } else {
148 // We consumed all pinned merge operands, release pinned iterators
149 pinned_iters_mgr_.ReleasePinnedData();
150 // MergeHelper moves the iterator to the first record after the merged
151 // records, so even though we reached the end of the merge output, we do
152 // not want to advance the iterator.
153 NextFromInput();
154 }
155 } else {
156 // Only advance the input iterator if there is no merge output and the
157 // iterator is not already at the next record.
158 if (!at_next_) {
159 input_->Next();
160 }
161 NextFromInput();
162 }
163
164 if (valid_) {
165 // Record that we've outputted a record for the current key.
166 has_outputted_key_ = true;
167 }
168
169 PrepareOutput();
170}
171
11fdf7f2
TL
172void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
173 Slice* skip_until) {
174 if (compaction_filter_ != nullptr &&
494da23a 175 (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
11fdf7f2
TL
176 // If the user has specified a compaction filter and the sequence
177 // number is greater than any external snapshot, then invoke the
178 // filter. If the return value of the compaction filter is true,
179 // replace the entry with a deletion marker.
180 CompactionFilter::Decision filter;
181 compaction_filter_value_.clear();
182 compaction_filter_skip_until_.Clear();
183 CompactionFilter::ValueType value_type =
184 ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
185 : CompactionFilter::ValueType::kBlobIndex;
186 // Hack: pass internal key to BlobIndexCompactionFilter since it needs
187 // to get sequence number.
188 Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
189 {
190 StopWatchNano timer(env_, report_detailed_time_);
191 filter = compaction_filter_->FilterV2(
192 compaction_->level(), filter_key, value_type, value_,
193 &compaction_filter_value_, compaction_filter_skip_until_.rep());
194 iter_stats_.total_filter_time +=
195 env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
196 }
197
198 if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
199 cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
200 0) {
201 // Can't skip to a key smaller than the current one.
202 // Keep the key as per FilterV2 documentation.
203 filter = CompactionFilter::Decision::kKeep;
204 }
205
206 if (filter == CompactionFilter::Decision::kRemove) {
207 // convert the current key to a delete; key_ is pointing into
208 // current_key_ at this point, so updating current_key_ updates key()
209 ikey_.type = kTypeDeletion;
210 current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
211 // no value associated with delete
212 value_.clear();
213 iter_stats_.num_record_drop_user++;
214 } else if (filter == CompactionFilter::Decision::kChangeValue) {
215 value_ = compaction_filter_value_;
216 } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
217 *need_skip = true;
218 compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
219 kValueTypeForSeek);
220 *skip_until = compaction_filter_skip_until_.Encode();
221 }
222 }
223}
224
7c673cae
FG
225void CompactionIterator::NextFromInput() {
226 at_next_ = false;
227 valid_ = false;
228
229 while (!valid_ && input_->Valid() && !IsShuttingDown()) {
230 key_ = input_->key();
231 value_ = input_->value();
232 iter_stats_.num_input_records++;
233
234 if (!ParseInternalKey(key_, &ikey_)) {
235 // If `expect_valid_internal_key_` is false, return the corrupted key
236 // and let the caller decide what to do with it.
237 // TODO(noetzli): We should have a more elegant solution for this.
238 if (expect_valid_internal_key_) {
239 assert(!"Corrupted internal key not expected.");
240 status_ = Status::Corruption("Corrupted internal key not expected.");
241 break;
242 }
243 key_ = current_key_.SetInternalKey(key_);
244 has_current_user_key_ = false;
245 current_user_key_sequence_ = kMaxSequenceNumber;
246 current_user_key_snapshot_ = 0;
247 iter_stats_.num_input_corrupt_records++;
248 valid_ = true;
249 break;
250 }
494da23a 251 TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
7c673cae
FG
252
253 // Update input statistics
254 if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
255 iter_stats_.num_input_deletion_records++;
256 }
257 iter_stats_.total_input_raw_key_bytes += key_.size();
258 iter_stats_.total_input_raw_value_bytes += value_.size();
259
260 // If need_skip is true, we should seek the input iterator
261 // to internal key skip_until and continue from there.
262 bool need_skip = false;
263 // Points either into compaction_filter_skip_until_ or into
264 // merge_helper_->compaction_filter_skip_until_.
265 Slice skip_until;
266
267 // Check whether the user key changed. After this if statement current_key_
268 // is a copy of the current input key (maybe converted to a delete by the
269 // compaction filter). ikey_.user_key is pointing to the copy.
270 if (!has_current_user_key_ ||
271 !cmp_->Equal(ikey_.user_key, current_user_key_)) {
272 // First occurrence of this user key
273 // Copy key for output
274 key_ = current_key_.SetInternalKey(key_, &ikey_);
275 current_user_key_ = ikey_.user_key;
276 has_current_user_key_ = true;
277 has_outputted_key_ = false;
278 current_user_key_sequence_ = kMaxSequenceNumber;
279 current_user_key_snapshot_ = 0;
494da23a 280 current_key_committed_ = KeyCommitted(ikey_.sequence);
11fdf7f2
TL
281
282 // Apply the compaction filter to the first committed version of the user
283 // key.
284 if (current_key_committed_) {
285 InvokeFilterIfNeeded(&need_skip, &skip_until);
7c673cae
FG
286 }
287 } else {
7c673cae
FG
288 // Update the current key to reflect the new sequence number/type without
289 // copying the user key.
290 // TODO(rven): Compaction filter does not process keys in this path
291 // Need to have the compaction filter process multiple versions
292 // if we have versions on both sides of a snapshot
293 current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
294 key_ = current_key_.GetInternalKey();
295 ikey_.user_key = current_key_.GetUserKey();
11fdf7f2
TL
296
297 // Note that newer version of a key is ordered before older versions. If a
298 // newer version of a key is committed, so as the older version. No need
299 // to query snapshot_checker_ in that case.
300 if (UNLIKELY(!current_key_committed_)) {
301 assert(snapshot_checker_ != nullptr);
494da23a 302 current_key_committed_ = KeyCommitted(ikey_.sequence);
11fdf7f2
TL
303 // Apply the compaction filter to the first committed version of the
304 // user key.
305 if (current_key_committed_) {
306 InvokeFilterIfNeeded(&need_skip, &skip_until);
307 }
308 }
309 }
310
311 if (UNLIKELY(!current_key_committed_)) {
312 assert(snapshot_checker_ != nullptr);
313 valid_ = true;
314 break;
7c673cae
FG
315 }
316
317 // If there are no snapshots, then this kv affect visibility at tip.
318 // Otherwise, search though all existing snapshots to find the earliest
319 // snapshot that is affected by this kv.
11fdf7f2
TL
320 SequenceNumber last_sequence __attribute__((__unused__));
321 last_sequence = current_user_key_sequence_;
7c673cae
FG
322 current_user_key_sequence_ = ikey_.sequence;
323 SequenceNumber last_snapshot = current_user_key_snapshot_;
324 SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
325 current_user_key_snapshot_ =
326 visible_at_tip_
327 ? earliest_snapshot_
328 : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);
329
330 if (need_skip) {
331 // This case is handled below.
332 } else if (clear_and_output_next_key_) {
333 // In the previous iteration we encountered a single delete that we could
334 // not compact out. We will keep this Put, but can drop it's data.
335 // (See Optimization 3, below.)
336 assert(ikey_.type == kTypeValue);
337 assert(current_user_key_snapshot_ == last_snapshot);
338
339 value_.clear();
340 valid_ = true;
341 clear_and_output_next_key_ = false;
342 } else if (ikey_.type == kTypeSingleDeletion) {
343 // We can compact out a SingleDelete if:
344 // 1) We encounter the corresponding PUT -OR- we know that this key
345 // doesn't appear past this output level
346 // =AND=
347 // 2) We've already returned a record in this snapshot -OR-
348 // there are no earlier earliest_write_conflict_snapshot.
349 //
350 // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
351 // allow Transactions to do write-conflict checking (if we compacted away
352 // all keys, then we wouldn't know that a write happened in this
353 // snapshot). If there is no earlier snapshot, then we know that there
354 // are no active transactions that need to know about any writes.
355 //
356 // Optimization 3:
357 // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
358 // true, then we must output a SingleDelete. In this case, we will decide
359 // to also output the PUT. While we are compacting less by outputting the
360 // PUT now, hopefully this will lead to better compaction in the future
361 // when Rule 2 is later true (Ie, We are hoping we can later compact out
362 // both the SingleDelete and the Put, while we couldn't if we only
363 // outputted the SingleDelete now).
364 // In this case, we can save space by removing the PUT's value as it will
365 // never be read.
366 //
367 // Deletes and Merges are not supported on the same key that has a
368 // SingleDelete as it is not possible to correctly do any partial
369 // compaction of such a combination of operations. The result of mixing
370 // those operations for a given key is documented as being undefined. So
371 // we can choose how to handle such a combinations of operations. We will
372 // try to compact out as much as we can in these cases.
373 // We will report counts on these anomalous cases.
374
375 // The easiest way to process a SingleDelete during iteration is to peek
376 // ahead at the next key.
377 ParsedInternalKey next_ikey;
378 input_->Next();
379
380 // Check whether the next key exists, is not corrupt, and is the same key
381 // as the single delete.
382 if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
383 cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
384 // Check whether the next key belongs to the same snapshot as the
385 // SingleDelete.
494da23a
TL
386 if (prev_snapshot == 0 ||
387 DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot)) {
7c673cae
FG
388 if (next_ikey.type == kTypeSingleDeletion) {
389 // We encountered two SingleDeletes in a row. This could be due to
390 // unexpected user input.
391 // Skip the first SingleDelete and let the next iteration decide how
392 // to handle the second SingleDelete
393
394 // First SingleDelete has been skipped since we already called
395 // input_->Next().
396 ++iter_stats_.num_record_drop_obsolete;
397 ++iter_stats_.num_single_del_mismatch;
11fdf7f2 398 } else if (has_outputted_key_ ||
494da23a
TL
399 DEFINITELY_IN_SNAPSHOT(
400 ikey_.sequence, earliest_write_conflict_snapshot_)) {
7c673cae
FG
401 // Found a matching value, we can drop the single delete and the
402 // value. It is safe to drop both records since we've already
403 // outputted a key in this snapshot, or there is no earlier
404 // snapshot (Rule 2 above).
405
406 // Note: it doesn't matter whether the second key is a Put or if it
407 // is an unexpected Merge or Delete. We will compact it out
408 // either way. We will maintain counts of how many mismatches
409 // happened
494da23a
TL
410 if (next_ikey.type != kTypeValue &&
411 next_ikey.type != kTypeBlobIndex) {
7c673cae
FG
412 ++iter_stats_.num_single_del_mismatch;
413 }
414
415 ++iter_stats_.num_record_drop_hidden;
416 ++iter_stats_.num_record_drop_obsolete;
417 // Already called input_->Next() once. Call it a second time to
418 // skip past the second key.
419 input_->Next();
420 } else {
421 // Found a matching value, but we cannot drop both keys since
422 // there is an earlier snapshot and we need to leave behind a record
423 // to know that a write happened in this snapshot (Rule 2 above).
424 // Clear the value and output the SingleDelete. (The value will be
425 // outputted on the next iteration.)
426
427 // Setting valid_ to true will output the current SingleDelete
428 valid_ = true;
429
430 // Set up the Put to be outputted in the next iteration.
431 // (Optimization 3).
432 clear_and_output_next_key_ = true;
433 }
434 } else {
435 // We hit the next snapshot without hitting a put, so the iterator
436 // returns the single delete.
437 valid_ = true;
438 }
439 } else {
440 // We are at the end of the input, could not parse the next key, or hit
441 // a different key. The iterator returns the single delete if the key
442 // possibly exists beyond the current output level. We set
443 // has_current_user_key to false so that if the iterator is at the next
444 // key, we do not compare it again against the previous key at the next
445 // iteration. If the next key is corrupt, we return before the
446 // comparison, so the value of has_current_user_key does not matter.
447 has_current_user_key_ = false;
494da23a 448 if (compaction_ != nullptr && IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
7c673cae
FG
449 compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
450 &level_ptrs_)) {
451 // Key doesn't exist outside of this range.
452 // Can compact out this SingleDelete.
453 ++iter_stats_.num_record_drop_obsolete;
454 ++iter_stats_.num_single_del_fallthru;
11fdf7f2
TL
455 if (!bottommost_level_) {
456 ++iter_stats_.num_optimized_del_drop_obsolete;
457 }
7c673cae
FG
458 } else {
459 // Output SingleDelete
460 valid_ = true;
461 }
462 }
463
464 if (valid_) {
465 at_next_ = true;
466 }
494da23a
TL
467 } else if (last_snapshot == current_user_key_snapshot_ ||
468 (last_snapshot > 0 &&
469 last_snapshot < current_user_key_snapshot_)) {
7c673cae
FG
470 // If the earliest snapshot is which this key is visible in
471 // is the same as the visibility of a previous instance of the
472 // same key, then this kv is not visible in any snapshot.
473 // Hidden by an newer entry for same user key
7c673cae
FG
474 //
475 // Note: Dropping this key will not affect TransactionDB write-conflict
476 // checking since there has already been a record returned for this key
477 // in this snapshot.
478 assert(last_sequence >= current_user_key_sequence_);
494da23a
TL
479
480 // Note2: if last_snapshot < current_user_key_snapshot, it can only
481 // mean last_snapshot is released between we process last value and
482 // this value, and findEarliestVisibleSnapshot returns the next snapshot
483 // as current_user_key_snapshot. In this case last value and current
484 // value are both in current_user_key_snapshot currently.
485 // Although last_snapshot is released we might still get a definitive
486 // response when key sequence number changes, e.g., when seq is determined
487 // too old and visible in all snapshots.
488 assert(last_snapshot == current_user_key_snapshot_ ||
489 (snapshot_checker_ != nullptr &&
490 snapshot_checker_->CheckInSnapshot(current_user_key_sequence_,
491 last_snapshot) !=
492 SnapshotCheckerResult::kNotInSnapshot));
493
7c673cae
FG
494 ++iter_stats_.num_record_drop_hidden; // (A)
495 input_->Next();
496 } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
494da23a 497 IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
11fdf7f2 498 ikeyNotNeededForIncrementalSnapshot() &&
7c673cae
FG
499 compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
500 &level_ptrs_)) {
501 // TODO(noetzli): This is the only place where we use compaction_
502 // (besides the constructor). We should probably get rid of this
503 // dependency and find a way to do similar filtering during flushes.
504 //
505 // For this user key:
506 // (1) there is no data in higher levels
507 // (2) data in lower levels will have larger sequence numbers
508 // (3) data in layers that are being compacted here and have
509 // smaller sequence numbers will be dropped in the next
510 // few iterations of this loop (by rule (A) above).
511 // Therefore this deletion marker is obsolete and can be dropped.
512 //
513 // Note: Dropping this Delete will not affect TransactionDB
514 // write-conflict checking since it is earlier than any snapshot.
11fdf7f2
TL
515 //
516 // It seems that we can also drop deletion later than earliest snapshot
517 // given that:
518 // (1) The deletion is earlier than earliest_write_conflict_snapshot, and
519 // (2) No value exist earlier than the deletion.
7c673cae 520 ++iter_stats_.num_record_drop_obsolete;
11fdf7f2
TL
521 if (!bottommost_level_) {
522 ++iter_stats_.num_optimized_del_drop_obsolete;
523 }
524 input_->Next();
525 } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
526 ikeyNotNeededForIncrementalSnapshot()) {
527 // Handle the case where we have a delete key at the bottom most level
528 // We can skip outputting the key iff there are no subsequent puts for this
529 // key
530 ParsedInternalKey next_ikey;
7c673cae 531 input_->Next();
11fdf7f2
TL
532 // Skip over all versions of this key that happen to occur in the same snapshot
533 // range as the delete
494da23a 534 while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
11fdf7f2 535 cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
494da23a
TL
536 (prev_snapshot == 0 ||
537 DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
11fdf7f2
TL
538 input_->Next();
539 }
540 // If you find you still need to output a row with this key, we need to output the
541 // delete too
542 if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
543 cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
544 valid_ = true;
545 at_next_ = true;
546 }
7c673cae
FG
547 } else if (ikey_.type == kTypeMerge) {
548 if (!merge_helper_->HasOperator()) {
549 status_ = Status::InvalidArgument(
550 "merge_operator is not properly initialized.");
551 return;
552 }
553
554 pinned_iters_mgr_.StartPinning();
555 // We know the merge type entry is not hidden, otherwise we would
556 // have hit (A)
557 // We encapsulate the merge related state machine in a different
558 // object to minimize change to the existing flow.
559 Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
560 prev_snapshot, bottommost_level_);
561 merge_out_iter_.SeekToFirst();
562
563 if (!s.ok() && !s.IsMergeInProgress()) {
564 status_ = s;
565 return;
566 } else if (merge_out_iter_.Valid()) {
567 // NOTE: key, value, and ikey_ refer to old entries.
568 // These will be correctly set below.
569 key_ = merge_out_iter_.key();
570 value_ = merge_out_iter_.value();
11fdf7f2
TL
571 bool valid_key __attribute__((__unused__));
572 valid_key = ParseInternalKey(key_, &ikey_);
7c673cae
FG
573 // MergeUntil stops when it encounters a corrupt key and does not
574 // include them in the result, so we expect the keys here to valid.
575 assert(valid_key);
576 // Keep current_key_ in sync.
577 current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
578 key_ = current_key_.GetInternalKey();
579 ikey_.user_key = current_key_.GetUserKey();
580 valid_ = true;
581 } else {
582 // all merge operands were filtered out. reset the user key, since the
583 // batch consumed by the merge operator should not shadow any keys
584 // coming after the merges
585 has_current_user_key_ = false;
586 pinned_iters_mgr_.ReleasePinnedData();
587
588 if (merge_helper_->FilteredUntil(&skip_until)) {
589 need_skip = true;
590 }
591 }
592 } else {
593 // 1. new user key -OR-
594 // 2. different snapshot stripe
595 bool should_delete = range_del_agg_->ShouldDelete(
11fdf7f2 596 key_, RangeDelPositioningMode::kForwardTraversal);
7c673cae
FG
597 if (should_delete) {
598 ++iter_stats_.num_record_drop_hidden;
599 ++iter_stats_.num_record_drop_range_del;
600 input_->Next();
601 } else {
602 valid_ = true;
603 }
604 }
605
606 if (need_skip) {
607 input_->Seek(skip_until);
608 }
609 }
610
611 if (!valid_ && IsShuttingDown()) {
612 status_ = Status::ShutdownInProgress();
613 }
614}
615
616void CompactionIterator::PrepareOutput() {
617 // Zeroing out the sequence number leads to better compression.
618 // If this is the bottommost level (no files in lower levels)
619 // and the earliest snapshot is larger than this seqno
620 // and the userkey differs from the last userkey in compaction
621 // then we can squash the seqno to zero.
11fdf7f2 622 //
7c673cae
FG
623 // This is safe for TransactionDB write-conflict checking since transactions
624 // only care about sequence number larger than any active snapshots.
11fdf7f2
TL
625 //
626 // Can we do the same for levels above bottom level as long as
627 // KeyNotExistsBeyondOutputLevel() return true?
494da23a
TL
628 if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
629 ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ &&
630 IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
7c673cae
FG
631 assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
632 ikey_.sequence = 0;
633 current_key_.UpdateInternalKey(0, ikey_.type);
634 }
635}
636
637inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
638 SequenceNumber in, SequenceNumber* prev_snapshot) {
639 assert(snapshots_->size());
11fdf7f2
TL
640 auto snapshots_iter = std::lower_bound(
641 snapshots_->begin(), snapshots_->end(), in);
642 if (snapshots_iter == snapshots_->begin()) {
643 *prev_snapshot = 0;
644 } else {
645 *prev_snapshot = *std::prev(snapshots_iter);
646 assert(*prev_snapshot < in);
647 }
494da23a
TL
648 if (snapshot_checker_ == nullptr) {
649 return snapshots_iter != snapshots_->end()
650 ? *snapshots_iter : kMaxSequenceNumber;
651 }
652 bool has_released_snapshot = !released_snapshots_.empty();
11fdf7f2
TL
653 for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
654 auto cur = *snapshots_iter;
655 assert(in <= cur);
494da23a
TL
656 // Skip if cur is in released_snapshots.
657 if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
658 continue;
659 }
660 auto res = snapshot_checker_->CheckInSnapshot(in, cur);
661 if (res == SnapshotCheckerResult::kInSnapshot) {
7c673cae 662 return cur;
494da23a
TL
663 } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
664 released_snapshots_.insert(cur);
7c673cae 665 }
11fdf7f2 666 *prev_snapshot = cur;
7c673cae 667 }
7c673cae
FG
668 return kMaxSequenceNumber;
669}
670
11fdf7f2
TL
671// used in 2 places - prevents deletion markers to be dropped if they may be
672// needed and disables seqnum zero-out in PrepareOutput for recent keys.
673inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
674 return (!compaction_->preserve_deletes()) ||
675 (ikey_.sequence < preserve_deletes_seqnum_);
676}
677
494da23a
TL
678bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
679 assert(snapshot_checker_ != nullptr);
680 assert(earliest_snapshot_ == kMaxSequenceNumber ||
681 (earliest_snapshot_iter_ != snapshots_->end() &&
682 *earliest_snapshot_iter_ == earliest_snapshot_));
683 auto in_snapshot =
684 snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
685 while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) {
686 // Avoid the the current earliest_snapshot_ being return as
687 // earliest visible snapshot for the next value. So if a value's sequence
688 // is zero-ed out by PrepareOutput(), the next value will be compact out.
689 released_snapshots_.insert(earliest_snapshot_);
690 earliest_snapshot_iter_++;
691
692 if (earliest_snapshot_iter_ == snapshots_->end()) {
693 earliest_snapshot_ = kMaxSequenceNumber;
694 } else {
695 earliest_snapshot_ = *earliest_snapshot_iter_;
696 }
697 in_snapshot =
698 snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
699 }
700 assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased);
701 return in_snapshot == SnapshotCheckerResult::kInSnapshot;
702}
703
7c673cae 704} // namespace rocksdb