// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/compaction_iterator.h"

#include "db/snapshot_checker.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
#include "table/internal_iterator.h"
#include "util/sync_point.h"

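// A SnapshotChecker may return an indeterminate answer (e.g. the snapshot
// has been released), so these macros only report *definitive* membership:
// DEFINITELY_IN_SNAPSHOT and DEFINITELY_NOT_IN_SNAPSHOT can both be false
// for the same (seq, snapshot) pair, in which case callers must take a
// conservative path.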
#define DEFINITELY_IN_SNAPSHOT(seq, snapshot)                       \
  ((seq) <= (snapshot) &&                                           \
   (snapshot_checker_ == nullptr ||                                 \
    LIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
           SnapshotCheckerResult::kInSnapshot)))

#define DEFINITELY_NOT_IN_SNAPSHOT(seq, snapshot)                     \
  ((seq) > (snapshot) ||                                              \
   (snapshot_checker_ != nullptr &&                                   \
    UNLIKELY(snapshot_checker_->CheckInSnapshot((seq), (snapshot)) == \
             SnapshotCheckerResult::kNotInSnapshot)))

#define IN_EARLIEST_SNAPSHOT(seq) \
  ((seq) <= earliest_snapshot_ && \
   (snapshot_checker_ == nullptr || LIKELY(IsInEarliestSnapshot(seq))))

namespace rocksdb {

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum)
    : CompactionIterator(
          input, cmp, merge_helper, last_sequence, snapshots,
          earliest_write_conflict_snapshot, snapshot_checker, env,
          report_detailed_time, expect_valid_internal_key, range_del_agg,
          std::unique_ptr<CompactionProxy>(
              compaction ? new CompactionProxy(compaction) : nullptr),
          compaction_filter, shutting_down, preserve_deletes_seqnum) {}

CompactionIterator::CompactionIterator(
    InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
    SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, Env* env,
    bool report_detailed_time, bool expect_valid_internal_key,
    CompactionRangeDelAggregator* range_del_agg,
    std::unique_ptr<CompactionProxy> compaction,
    const CompactionFilter* compaction_filter,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum)
    : input_(input),
      cmp_(cmp),
      merge_helper_(merge_helper),
      snapshots_(snapshots),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      env_(env),
      report_detailed_time_(report_detailed_time),
      expect_valid_internal_key_(expect_valid_internal_key),
      range_del_agg_(range_del_agg),
      compaction_(std::move(compaction)),
      compaction_filter_(compaction_filter),
      shutting_down_(shutting_down),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      current_user_key_sequence_(0),
      current_user_key_snapshot_(0),
      merge_out_iter_(merge_helper_),
      current_key_committed_(false) {
  assert(compaction_filter_ == nullptr || compaction_ != nullptr);
  assert(snapshots_ != nullptr);
  bottommost_level_ =
      compaction_ == nullptr ? false : compaction_->bottommost_level();
  if (compaction_ != nullptr) {
    level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
  }
  if (snapshots_->size() == 0) {
    // optimize for fast path if there are no snapshots
    visible_at_tip_ = true;
    earliest_snapshot_iter_ = snapshots_->end();
    earliest_snapshot_ = kMaxSequenceNumber;
    latest_snapshot_ = 0;
  } else {
    visible_at_tip_ = false;
    earliest_snapshot_iter_ = snapshots_->begin();
    earliest_snapshot_ = snapshots_->at(0);
    latest_snapshot_ = snapshots_->back();
  }
#ifndef NDEBUG
  // findEarliestVisibleSnapshot assumes this ordering.
  for (size_t i = 1; i < snapshots_->size(); ++i) {
    assert(snapshots_->at(i - 1) < snapshots_->at(i));
  }
#endif
  input_->SetPinnedItersMgr(&pinned_iters_mgr_);
  TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
}

CompactionIterator::~CompactionIterator() {
  // The input_ iterator's lifetime is longer than the pinned_iters_mgr_
  // lifetime, so unregister the manager before it goes away.
  input_->SetPinnedItersMgr(nullptr);
}

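// Clears the record-drop counters accumulated in iter_stats_; when these are
// sampled and reset is up to the caller.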
void CompactionIterator::ResetRecordCounts() {
  iter_stats_.num_record_drop_user = 0;
  iter_stats_.num_record_drop_hidden = 0;
  iter_stats_.num_record_drop_obsolete = 0;
  iter_stats_.num_record_drop_range_del = 0;
  iter_stats_.num_range_del_drop_obsolete = 0;
  iter_stats_.num_optimized_del_drop_obsolete = 0;
}

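// Note: SeekToFirst does not reposition the underlying input iterator; it
// assumes the caller has already positioned input_ and simply produces the
// first output entry from there.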
void CompactionIterator::SeekToFirst() {
  NextFromInput();
  PrepareOutput();
}

void CompactionIterator::Next() {
  // If there is a merge output, return it before continuing to process the
  // input.
  if (merge_out_iter_.Valid()) {
    merge_out_iter_.Next();

    // Check if we returned all records of the merge output.
    if (merge_out_iter_.Valid()) {
      key_ = merge_out_iter_.key();
      value_ = merge_out_iter_.value();
      bool valid_key __attribute__((__unused__));
      valid_key = ParseInternalKey(key_, &ikey_);
      // MergeUntil stops when it encounters a corrupt key and does not
      // include it in the result, so we expect the keys here to be valid.
      assert(valid_key);
      // Keep current_key_ in sync.
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();
      valid_ = true;
    } else {
      // We consumed all pinned merge operands, release pinned iterators
      pinned_iters_mgr_.ReleasePinnedData();
      // MergeHelper moves the iterator to the first record after the merged
      // records, so even though we reached the end of the merge output, we do
      // not want to advance the iterator.
      NextFromInput();
    }
  } else {
    // Only advance the input iterator if there is no merge output and the
    // iterator is not already at the next record.
    if (!at_next_) {
      input_->Next();
    }
    NextFromInput();
  }

  if (valid_) {
    // Record that we've outputted a record for the current key.
    has_outputted_key_ = true;
  }

  PrepareOutput();
}

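// Applies the compaction filter (if any) to the current value or blob index
// and translates the decision into an in-place update of the current entry:
// kRemove converts it to a deletion marker, kChangeValue swaps in the new
// value, and kRemoveAndSkipUntil asks the caller to seek ahead.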
void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
                                              Slice* skip_until) {
  if (compaction_filter_ != nullptr &&
      (ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
    // If the user has specified a compaction filter and the sequence
    // number is greater than any external snapshot, then invoke the
    // filter. If the return value of the compaction filter is true,
    // replace the entry with a deletion marker.
    CompactionFilter::Decision filter;
    compaction_filter_value_.clear();
    compaction_filter_skip_until_.Clear();
    CompactionFilter::ValueType value_type =
        ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue
                                 : CompactionFilter::ValueType::kBlobIndex;
    // Hack: pass internal key to BlobIndexCompactionFilter since it needs
    // to get sequence number.
    Slice& filter_key = ikey_.type == kTypeValue ? ikey_.user_key : key_;
    {
      StopWatchNano timer(env_, report_detailed_time_);
      filter = compaction_filter_->FilterV2(
          compaction_->level(), filter_key, value_type, value_,
          &compaction_filter_value_, compaction_filter_skip_until_.rep());
      iter_stats_.total_filter_time +=
          env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0;
    }

    if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil &&
        cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <=
            0) {
      // Can't skip to a key smaller than the current one.
      // Keep the key as per FilterV2 documentation.
      filter = CompactionFilter::Decision::kKeep;
    }

    if (filter == CompactionFilter::Decision::kRemove) {
      // convert the current key to a delete; key_ is pointing into
      // current_key_ at this point, so updating current_key_ updates key()
      ikey_.type = kTypeDeletion;
      current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
      // no value associated with delete
      value_.clear();
      iter_stats_.num_record_drop_user++;
    } else if (filter == CompactionFilter::Decision::kChangeValue) {
      value_ = compaction_filter_value_;
    } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
      *need_skip = true;
      compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
                                                       kValueTypeForSeek);
      *skip_until = compaction_filter_skip_until_.Encode();
    }
  }
}

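// Advances the input until an entry that should be emitted (valid_ == true)
// is found, the input is exhausted, or a shutdown/error occurs. Each loop
// iteration either outputs the current entry, drops it (bumping the
// appropriate iter_stats_ counter), or defers the decision by peeking ahead
// (SingleDelete handling) or delegating to MergeHelper (kTypeMerge).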
void CompactionIterator::NextFromInput() {
  at_next_ = false;
  valid_ = false;

  while (!valid_ && input_->Valid() && !IsShuttingDown()) {
    key_ = input_->key();
    value_ = input_->value();
    iter_stats_.num_input_records++;

    if (!ParseInternalKey(key_, &ikey_)) {
      // If `expect_valid_internal_key_` is false, return the corrupted key
      // and let the caller decide what to do with it.
      // TODO(noetzli): We should have a more elegant solution for this.
      if (expect_valid_internal_key_) {
        assert(!"Corrupted internal key not expected.");
        status_ = Status::Corruption("Corrupted internal key not expected.");
        break;
      }
      key_ = current_key_.SetInternalKey(key_);
      has_current_user_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      iter_stats_.num_input_corrupt_records++;
      valid_ = true;
      break;
    }
    TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);

    // Update input statistics
    if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
      iter_stats_.num_input_deletion_records++;
    }
    iter_stats_.total_input_raw_key_bytes += key_.size();
    iter_stats_.total_input_raw_value_bytes += value_.size();

    // If need_skip is true, we should seek the input iterator
    // to internal key skip_until and continue from there.
    bool need_skip = false;
    // Points either into compaction_filter_skip_until_ or into
    // merge_helper_->compaction_filter_skip_until_.
    Slice skip_until;

    // Check whether the user key changed. After this if statement current_key_
    // is a copy of the current input key (maybe converted to a delete by the
    // compaction filter). ikey_.user_key is pointing to the copy.
    if (!has_current_user_key_ ||
        !cmp_->Equal(ikey_.user_key, current_user_key_)) {
      // First occurrence of this user key
      // Copy key for output
      key_ = current_key_.SetInternalKey(key_, &ikey_);
      current_user_key_ = ikey_.user_key;
      has_current_user_key_ = true;
      has_outputted_key_ = false;
      current_user_key_sequence_ = kMaxSequenceNumber;
      current_user_key_snapshot_ = 0;
      current_key_committed_ = KeyCommitted(ikey_.sequence);

      // Apply the compaction filter to the first committed version of the user
      // key.
      if (current_key_committed_) {
        InvokeFilterIfNeeded(&need_skip, &skip_until);
      }
    } else {
      // Update the current key to reflect the new sequence number/type without
      // copying the user key.
      // TODO(rven): Compaction filter does not process keys in this path
      // Need to have the compaction filter process multiple versions
      // if we have versions on both sides of a snapshot
      current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
      key_ = current_key_.GetInternalKey();
      ikey_.user_key = current_key_.GetUserKey();

      // Note that a newer version of a key is ordered before its older
      // versions. If a newer version of a key is committed, so are its older
      // versions. No need to query snapshot_checker_ in that case.
      if (UNLIKELY(!current_key_committed_)) {
        assert(snapshot_checker_ != nullptr);
        current_key_committed_ = KeyCommitted(ikey_.sequence);
        // Apply the compaction filter to the first committed version of the
        // user key.
        if (current_key_committed_) {
          InvokeFilterIfNeeded(&need_skip, &skip_until);
        }
      }
    }

    if (UNLIKELY(!current_key_committed_)) {
      assert(snapshot_checker_ != nullptr);
      valid_ = true;
      break;
    }

    // If there are no snapshots, then this kv affects visibility at the tip.
    // Otherwise, search through all existing snapshots to find the earliest
    // snapshot that is affected by this kv.
    SequenceNumber last_sequence __attribute__((__unused__));
    last_sequence = current_user_key_sequence_;
    current_user_key_sequence_ = ikey_.sequence;
    SequenceNumber last_snapshot = current_user_key_snapshot_;
    SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
    current_user_key_snapshot_ =
        visible_at_tip_
            ? earliest_snapshot_
            : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot);

    if (need_skip) {
      // This case is handled below.
    } else if (clear_and_output_next_key_) {
      // In the previous iteration we encountered a single delete that we could
      // not compact out. We will keep this Put, but can drop its data.
      // (See Optimization 3, below.)
      assert(ikey_.type == kTypeValue);
      assert(current_user_key_snapshot_ == last_snapshot);

      value_.clear();
      valid_ = true;
      clear_and_output_next_key_ = false;
    } else if (ikey_.type == kTypeSingleDeletion) {
      // We can compact out a SingleDelete if:
      // 1) We encounter the corresponding PUT -OR- we know that this key
      //    doesn't appear past this output level
      // =AND=
      // 2) We've already returned a record in this snapshot -OR- there is no
      //    earlier write-conflict snapshot.
      //
      // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to
      // allow Transactions to do write-conflict checking (if we compacted away
      // all keys, then we wouldn't know that a write happened in this
      // snapshot). If there is no earlier snapshot, then we know that there
      // are no active transactions that need to know about any writes.
      //
      // Optimization 3:
      // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT
      // true, then we must output a SingleDelete. In this case, we will decide
      // to also output the PUT. While we are compacting less by outputting the
      // PUT now, hopefully this will lead to better compaction in the future
      // when Rule 2 is later true (i.e., we are hoping we can later compact
      // out both the SingleDelete and the Put, while we couldn't if we only
      // outputted the SingleDelete now).
      // In this case, we can save space by removing the PUT's value as it will
      // never be read.
      //
      // Deletes and Merges are not supported on the same key that has a
      // SingleDelete as it is not possible to correctly do any partial
      // compaction of such a combination of operations. The result of mixing
      // those operations for a given key is documented as being undefined. So
      // we can choose how to handle such combinations of operations. We will
      // try to compact out as much as we can in these cases.
      // We will report counts on these anomalous cases.

      // The easiest way to process a SingleDelete during iteration is to peek
      // ahead at the next key.
      ParsedInternalKey next_ikey;
      input_->Next();

      // Check whether the next key exists, is not corrupt, and is the same key
      // as the single delete.
      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
        // Check whether the next key belongs to the same snapshot as the
        // SingleDelete.
        if (prev_snapshot == 0 ||
            DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot)) {
          if (next_ikey.type == kTypeSingleDeletion) {
            // We encountered two SingleDeletes in a row. This could be due to
            // unexpected user input.
            // Skip the first SingleDelete and let the next iteration decide
            // how to handle the second SingleDelete

            // First SingleDelete has been skipped since we already called
            // input_->Next().
            ++iter_stats_.num_record_drop_obsolete;
            ++iter_stats_.num_single_del_mismatch;
          } else if (has_outputted_key_ ||
                     DEFINITELY_IN_SNAPSHOT(
                         ikey_.sequence, earliest_write_conflict_snapshot_)) {
            // Found a matching value, we can drop the single delete and the
            // value. It is safe to drop both records since we've already
            // outputted a key in this snapshot, or there is no earlier
            // snapshot (Rule 2 above).

            // Note: it doesn't matter whether the second key is a Put or if it
            // is an unexpected Merge or Delete. We will compact it out
            // either way. We will maintain counts of how many mismatches
            // happened
            if (next_ikey.type != kTypeValue &&
                next_ikey.type != kTypeBlobIndex) {
              ++iter_stats_.num_single_del_mismatch;
            }

            ++iter_stats_.num_record_drop_hidden;
            ++iter_stats_.num_record_drop_obsolete;
            // Already called input_->Next() once. Call it a second time to
            // skip past the second key.
            input_->Next();
          } else {
            // Found a matching value, but we cannot drop both keys since
            // there is an earlier snapshot and we need to leave behind a
            // record to know that a write happened in this snapshot (Rule 2
            // above).
            // Clear the value and output the SingleDelete. (The value will be
            // outputted on the next iteration.)

            // Setting valid_ to true will output the current SingleDelete
            valid_ = true;

            // Set up the Put to be outputted in the next iteration.
            // (Optimization 3).
            clear_and_output_next_key_ = true;
          }
        } else {
          // We hit the next snapshot without hitting a put, so the iterator
          // returns the single delete.
          valid_ = true;
        }
      } else {
        // We are at the end of the input, could not parse the next key, or hit
        // a different key. The iterator returns the single delete if the key
        // possibly exists beyond the current output level. We set
        // has_current_user_key to false so that if the iterator is at the next
        // key, we do not compare it again against the previous key at the next
        // iteration. If the next key is corrupt, we return before the
        // comparison, so the value of has_current_user_key does not matter.
        has_current_user_key_ = false;
        if (compaction_ != nullptr && IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
            compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                       &level_ptrs_)) {
          // Key doesn't exist outside of this range.
          // Can compact out this SingleDelete.
          ++iter_stats_.num_record_drop_obsolete;
          ++iter_stats_.num_single_del_fallthru;
          if (!bottommost_level_) {
            ++iter_stats_.num_optimized_del_drop_obsolete;
          }
        } else {
          // Output SingleDelete
          valid_ = true;
        }
      }

      if (valid_) {
        at_next_ = true;
      }
    } else if (last_snapshot == current_user_key_snapshot_ ||
               (last_snapshot > 0 &&
                last_snapshot < current_user_key_snapshot_)) {
      // If the earliest snapshot in which this key is visible is the same as
      // the visibility of a previous instance of the same key, then this kv
      // is not visible in any snapshot: it is hidden by a newer entry for the
      // same user key.
      //
      // Note: Dropping this key will not affect TransactionDB write-conflict
      // checking since there has already been a record returned for this key
      // in this snapshot.
      assert(last_sequence >= current_user_key_sequence_);

      // Note2: if last_snapshot < current_user_key_snapshot, it can only mean
      // last_snapshot was released between the time we processed the last
      // value and this value, and findEarliestVisibleSnapshot returns the
      // next snapshot as current_user_key_snapshot. In this case the last
      // value and the current value are both in current_user_key_snapshot
      // currently.
      // Although last_snapshot is released we might still get a definitive
      // response when the key sequence number changes, e.g., when the seq is
      // determined to be too old and visible in all snapshots.
      assert(last_snapshot == current_user_key_snapshot_ ||
             (snapshot_checker_ != nullptr &&
              snapshot_checker_->CheckInSnapshot(current_user_key_sequence_,
                                                 last_snapshot) !=
                  SnapshotCheckerResult::kNotInSnapshot));

      ++iter_stats_.num_record_drop_hidden;  // (A)
      input_->Next();
    } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
               IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
               ikeyNotNeededForIncrementalSnapshot() &&
               compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
                                                          &level_ptrs_)) {
      // TODO(noetzli): This is the only place where we use compaction_
      // (besides the constructor). We should probably get rid of this
      // dependency and find a way to do similar filtering during flushes.
      //
      // For this user key:
      // (1) there is no data in higher levels
      // (2) data in lower levels will have larger sequence numbers
      // (3) data in layers that are being compacted here and have
      //     smaller sequence numbers will be dropped in the next
      //     few iterations of this loop (by rule (A) above).
      // Therefore this deletion marker is obsolete and can be dropped.
      //
      // Note: Dropping this Delete will not affect TransactionDB
      // write-conflict checking since it is earlier than any snapshot.
      //
      // It seems that we could also drop deletions later than the earliest
      // snapshot given that:
      // (1) The deletion is earlier than earliest_write_conflict_snapshot,
      //     and
      // (2) No value exists earlier than the deletion.
      ++iter_stats_.num_record_drop_obsolete;
      if (!bottommost_level_) {
        ++iter_stats_.num_optimized_del_drop_obsolete;
      }
      input_->Next();
    } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
               ikeyNotNeededForIncrementalSnapshot()) {
      // Handle the case where we have a delete key at the bottommost level.
      // We can skip outputting the key iff there are no subsequent puts for
      // this key.
      ParsedInternalKey next_ikey;
      input_->Next();
      // Skip over all versions of this key that happen to occur in the same
      // snapshot range as the delete.
      while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
             cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
             (prev_snapshot == 0 ||
              DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
        input_->Next();
      }
      // If there is still an entry for this user key after skipping, we need
      // to output the delete too.
      if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
          cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
        valid_ = true;
        at_next_ = true;
      }
    } else if (ikey_.type == kTypeMerge) {
      if (!merge_helper_->HasOperator()) {
        status_ = Status::InvalidArgument(
            "merge_operator is not properly initialized.");
        return;
      }

      pinned_iters_mgr_.StartPinning();
      // We know the merge type entry is not hidden, otherwise we would
      // have hit (A)
      // We encapsulate the merge related state machine in a different
      // object to minimize change to the existing flow.
      Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
                                           prev_snapshot, bottommost_level_);
      merge_out_iter_.SeekToFirst();

      if (!s.ok() && !s.IsMergeInProgress()) {
        status_ = s;
        return;
      } else if (merge_out_iter_.Valid()) {
        // NOTE: key, value, and ikey_ refer to old entries.
        //       These will be correctly set below.
        key_ = merge_out_iter_.key();
        value_ = merge_out_iter_.value();
        bool valid_key __attribute__((__unused__));
        valid_key = ParseInternalKey(key_, &ikey_);
        // MergeUntil stops when it encounters a corrupt key and does not
        // include it in the result, so we expect the keys here to be valid.
        assert(valid_key);
        // Keep current_key_ in sync.
        current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
        key_ = current_key_.GetInternalKey();
        ikey_.user_key = current_key_.GetUserKey();
        valid_ = true;
      } else {
        // All merge operands were filtered out. Reset the user key, since the
        // batch consumed by the merge operator should not shadow any keys
        // coming after the merges.
        has_current_user_key_ = false;
        pinned_iters_mgr_.ReleasePinnedData();

        if (merge_helper_->FilteredUntil(&skip_until)) {
          need_skip = true;
        }
      }
    } else {
      // 1. new user key -OR-
      // 2. different snapshot stripe
      bool should_delete = range_del_agg_->ShouldDelete(
          key_, RangeDelPositioningMode::kForwardTraversal);
      if (should_delete) {
        ++iter_stats_.num_record_drop_hidden;
        ++iter_stats_.num_record_drop_range_del;
        input_->Next();
      } else {
        valid_ = true;
      }
    }

    if (need_skip) {
      input_->Seek(skip_until);
    }
  }

  if (!valid_ && IsShuttingDown()) {
    status_ = Status::ShutdownInProgress();
  }
}

void CompactionIterator::PrepareOutput() {
  // Zeroing out the sequence number leads to better compression.
  // If this is the bottommost level (no files in lower levels)
  // and the earliest snapshot is larger than this seqno
  // and the userkey differs from the last userkey in compaction
  // then we can squash the seqno to zero.
  //
  // This is safe for TransactionDB write-conflict checking since transactions
  // only care about sequence numbers larger than any active snapshots.
  //
  // Can we do the same for levels above the bottom level as long as
  // KeyNotExistsBeyondOutputLevel() returns true?
  if ((compaction_ != nullptr && !compaction_->allow_ingest_behind()) &&
      ikeyNotNeededForIncrementalSnapshot() && bottommost_level_ && valid_ &&
      IN_EARLIEST_SNAPSHOT(ikey_.sequence) && ikey_.type != kTypeMerge) {
    assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
    ikey_.sequence = 0;
    current_key_.UpdateInternalKey(0, ikey_.type);
  }
}

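// Binary-searches the sorted snapshot list for the earliest snapshot that
// sequence number `in` is visible in (the first snapshot >= in), reporting
// the next lower snapshot (or 0 if none) through prev_snapshot. When a
// snapshot_checker_ is present, snapshots that are released or that the
// checker cannot confirm are skipped, so the result may be a later snapshot
// or kMaxSequenceNumber.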
inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot(
    SequenceNumber in, SequenceNumber* prev_snapshot) {
  assert(snapshots_->size());
  auto snapshots_iter = std::lower_bound(
      snapshots_->begin(), snapshots_->end(), in);
  if (snapshots_iter == snapshots_->begin()) {
    *prev_snapshot = 0;
  } else {
    *prev_snapshot = *std::prev(snapshots_iter);
    assert(*prev_snapshot < in);
  }
  if (snapshot_checker_ == nullptr) {
    return snapshots_iter != snapshots_->end()
        ? *snapshots_iter : kMaxSequenceNumber;
  }
  bool has_released_snapshot = !released_snapshots_.empty();
  for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) {
    auto cur = *snapshots_iter;
    assert(in <= cur);
    // Skip if cur is in released_snapshots.
    if (has_released_snapshot && released_snapshots_.count(cur) > 0) {
      continue;
    }
    auto res = snapshot_checker_->CheckInSnapshot(in, cur);
    if (res == SnapshotCheckerResult::kInSnapshot) {
      return cur;
    } else if (res == SnapshotCheckerResult::kSnapshotReleased) {
      released_snapshots_.insert(cur);
    }
    *prev_snapshot = cur;
  }
  return kMaxSequenceNumber;
}

// Used in two places - prevents deletion markers from being dropped if they
// may still be needed, and disables seqnum zero-out in PrepareOutput for
// recent keys.
inline bool CompactionIterator::ikeyNotNeededForIncrementalSnapshot() {
  return (!compaction_->preserve_deletes()) ||
         (ikey_.sequence < preserve_deletes_seqnum_);
}

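// Returns whether `sequence` is definitely visible in earliest_snapshot_
// according to snapshot_checker_. If the checker reports the snapshot as
// released, earliest_snapshot_ is advanced to the next snapshot in the list
// (recording the released one) and the check is retried.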
bool CompactionIterator::IsInEarliestSnapshot(SequenceNumber sequence) {
  assert(snapshot_checker_ != nullptr);
  assert(earliest_snapshot_ == kMaxSequenceNumber ||
         (earliest_snapshot_iter_ != snapshots_->end() &&
          *earliest_snapshot_iter_ == earliest_snapshot_));
  auto in_snapshot =
      snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  while (UNLIKELY(in_snapshot == SnapshotCheckerResult::kSnapshotReleased)) {
    // Avoid the current earliest_snapshot_ being returned as the earliest
    // visible snapshot for the next value. So if a value's sequence number is
    // zeroed out by PrepareOutput(), the next value will be compacted out.
    released_snapshots_.insert(earliest_snapshot_);
    earliest_snapshot_iter_++;

    if (earliest_snapshot_iter_ == snapshots_->end()) {
      earliest_snapshot_ = kMaxSequenceNumber;
    } else {
      earliest_snapshot_ = *earliest_snapshot_iter_;
    }
    in_snapshot =
        snapshot_checker_->CheckInSnapshot(sequence, earliest_snapshot_);
  }
  assert(in_snapshot != SnapshotCheckerResult::kSnapshotReleased);
  return in_snapshot == SnapshotCheckerResult::kInSnapshot;
}

}  // namespace rocksdb