]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "db/db_iter.h" | |
7c673cae | 11 | #include <string> |
11fdf7f2 | 12 | #include <iostream> |
7c673cae FG |
13 | #include <limits> |
14 | ||
15 | #include "db/dbformat.h" | |
16 | #include "db/merge_context.h" | |
17 | #include "db/merge_helper.h" | |
18 | #include "db/pinned_iterators_manager.h" | |
19 | #include "monitoring/perf_context_imp.h" | |
7c673cae FG |
20 | #include "rocksdb/env.h" |
21 | #include "rocksdb/iterator.h" | |
22 | #include "rocksdb/merge_operator.h" | |
23 | #include "rocksdb/options.h" | |
24 | #include "table/internal_iterator.h" | |
25 | #include "util/arena.h" | |
26 | #include "util/filename.h" | |
27 | #include "util/logging.h" | |
28 | #include "util/mutexlock.h" | |
29 | #include "util/string_util.h" | |
11fdf7f2 | 30 | #include "util/trace_replay.h" |
7c673cae FG |
31 | |
32 | namespace rocksdb { | |
33 | ||
#if 0
// Debugging aid (compiled out): walks `iter` from the first entry to the
// end and prints each key to stderr, decoded as an internal key. Keys that
// fail to decode are reported as corrupt in escaped form.
static void DumpInternalIter(Iterator* iter) {
  iter->SeekToFirst();
  while (iter->Valid()) {
    ParsedInternalKey parsed;
    if (ParseInternalKey(iter->key(), &parsed)) {
      fprintf(stderr, "@ '%s'\n", parsed.DebugString().c_str());
    } else {
      fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
    }
    iter->Next();
  }
}
#endif
46 | ||
47 | // Memtables and sstables that make the DB representation contain | |
48 | // (userkey,seq,type) => uservalue entries. DBIter | |
49 | // combines multiple entries for the same userkey found in the DB | |
50 | // representation into a single entry while accounting for sequence | |
51 | // numbers, deletion markers, overwrites, etc. | |
11fdf7f2 | 52 | class DBIter final: public Iterator { |
7c673cae FG |
53 | public: |
54 | // The following is grossly complicated. TODO: clean it up | |
55 | // Which direction is the iterator currently moving? | |
11fdf7f2 TL |
56 | // (1) When moving forward: |
57 | // (1a) if current_entry_is_merged_ = false, the internal iterator is | |
58 | // positioned at the exact entry that yields this->key(), this->value() | |
59 | // (1b) if current_entry_is_merged_ = true, the internal iterator is | |
60 | // positioned immediately after the last entry that contributed to the | |
61 | // current this->value(). That entry may or may not have key equal to | |
62 | // this->key(). | |
7c673cae FG |
63 | // (2) When moving backwards, the internal iterator is positioned |
64 | // just before all entries whose user key == this->key(). | |
65 | enum Direction { | |
66 | kForward, | |
67 | kReverse | |
68 | }; | |
69 | ||
70 | // LocalStatistics contain Statistics counters that will be aggregated per | |
71 | // each iterator instance and then will be sent to the global statistics when | |
72 | // the iterator is destroyed. | |
73 | // | |
74 | // The purpose of this approach is to avoid perf regression happening | |
75 | // when multiple threads bump the atomic counters from a DBIter::Next(). | |
76 | struct LocalStatistics { | |
77 | explicit LocalStatistics() { ResetCounters(); } | |
78 | ||
79 | void ResetCounters() { | |
80 | next_count_ = 0; | |
81 | next_found_count_ = 0; | |
82 | prev_count_ = 0; | |
83 | prev_found_count_ = 0; | |
84 | bytes_read_ = 0; | |
11fdf7f2 | 85 | skip_count_ = 0; |
7c673cae FG |
86 | } |
87 | ||
88 | void BumpGlobalStatistics(Statistics* global_statistics) { | |
89 | RecordTick(global_statistics, NUMBER_DB_NEXT, next_count_); | |
90 | RecordTick(global_statistics, NUMBER_DB_NEXT_FOUND, next_found_count_); | |
91 | RecordTick(global_statistics, NUMBER_DB_PREV, prev_count_); | |
92 | RecordTick(global_statistics, NUMBER_DB_PREV_FOUND, prev_found_count_); | |
93 | RecordTick(global_statistics, ITER_BYTES_READ, bytes_read_); | |
11fdf7f2 TL |
94 | RecordTick(global_statistics, NUMBER_ITER_SKIP, skip_count_); |
95 | PERF_COUNTER_ADD(iter_read_bytes, bytes_read_); | |
7c673cae FG |
96 | ResetCounters(); |
97 | } | |
98 | ||
99 | // Map to Tickers::NUMBER_DB_NEXT | |
100 | uint64_t next_count_; | |
101 | // Map to Tickers::NUMBER_DB_NEXT_FOUND | |
102 | uint64_t next_found_count_; | |
103 | // Map to Tickers::NUMBER_DB_PREV | |
104 | uint64_t prev_count_; | |
105 | // Map to Tickers::NUMBER_DB_PREV_FOUND | |
106 | uint64_t prev_found_count_; | |
107 | // Map to Tickers::ITER_BYTES_READ | |
108 | uint64_t bytes_read_; | |
11fdf7f2 TL |
109 | // Map to Tickers::NUMBER_ITER_SKIP |
110 | uint64_t skip_count_; | |
7c673cae FG |
111 | }; |
112 | ||
11fdf7f2 TL |
113 | DBIter(Env* _env, const ReadOptions& read_options, |
114 | const ImmutableCFOptions& cf_options, | |
115 | const MutableCFOptions& mutable_cf_options, const Comparator* cmp, | |
7c673cae | 116 | InternalIterator* iter, SequenceNumber s, bool arena_mode, |
11fdf7f2 TL |
117 | uint64_t max_sequential_skip_in_iterations, |
118 | ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, | |
119 | bool allow_blob) | |
7c673cae | 120 | : arena_mode_(arena_mode), |
11fdf7f2 | 121 | env_(_env), |
7c673cae FG |
122 | logger_(cf_options.info_log), |
123 | user_comparator_(cmp), | |
124 | merge_operator_(cf_options.merge_operator), | |
125 | iter_(iter), | |
126 | sequence_(s), | |
127 | direction_(kForward), | |
128 | valid_(false), | |
129 | current_entry_is_merged_(false), | |
130 | statistics_(cf_options.statistics), | |
11fdf7f2 TL |
131 | num_internal_keys_skipped_(0), |
132 | iterate_lower_bound_(read_options.iterate_lower_bound), | |
7c673cae FG |
133 | iterate_upper_bound_(read_options.iterate_upper_bound), |
134 | prefix_same_as_start_(read_options.prefix_same_as_start), | |
135 | pin_thru_lifetime_(read_options.pin_data), | |
136 | total_order_seek_(read_options.total_order_seek), | |
137 | range_del_agg_(cf_options.internal_comparator, s, | |
11fdf7f2 TL |
138 | true /* collapse_deletions */), |
139 | read_callback_(read_callback), | |
140 | db_impl_(db_impl), | |
141 | cfd_(cfd), | |
142 | allow_blob_(allow_blob), | |
143 | is_blob_(false), | |
144 | start_seqnum_(read_options.iter_start_seqnum) { | |
7c673cae | 145 | RecordTick(statistics_, NO_ITERATORS); |
11fdf7f2 | 146 | prefix_extractor_ = mutable_cf_options.prefix_extractor.get(); |
7c673cae FG |
147 | max_skip_ = max_sequential_skip_in_iterations; |
148 | max_skippable_internal_keys_ = read_options.max_skippable_internal_keys; | |
149 | if (pin_thru_lifetime_) { | |
150 | pinned_iters_mgr_.StartPinning(); | |
151 | } | |
152 | if (iter_) { | |
153 | iter_->SetPinnedItersMgr(&pinned_iters_mgr_); | |
154 | } | |
155 | } | |
156 | virtual ~DBIter() { | |
157 | // Release pinned data if any | |
158 | if (pinned_iters_mgr_.PinningEnabled()) { | |
159 | pinned_iters_mgr_.ReleasePinnedData(); | |
160 | } | |
11fdf7f2 TL |
161 | // Compiler warning issue filed: |
162 | // https://github.com/facebook/rocksdb/issues/3013 | |
163 | RecordTick(statistics_, NO_ITERATORS, uint64_t(-1)); | |
164 | ResetInternalKeysSkippedCounter(); | |
7c673cae FG |
165 | local_stats_.BumpGlobalStatistics(statistics_); |
166 | if (!arena_mode_) { | |
167 | delete iter_; | |
168 | } else { | |
169 | iter_->~InternalIterator(); | |
170 | } | |
171 | } | |
172 | virtual void SetIter(InternalIterator* iter) { | |
173 | assert(iter_ == nullptr); | |
174 | iter_ = iter; | |
175 | iter_->SetPinnedItersMgr(&pinned_iters_mgr_); | |
176 | } | |
177 | virtual RangeDelAggregator* GetRangeDelAggregator() { | |
178 | return &range_del_agg_; | |
179 | } | |
180 | ||
181 | virtual bool Valid() const override { return valid_; } | |
182 | virtual Slice key() const override { | |
183 | assert(valid_); | |
11fdf7f2 TL |
184 | if(start_seqnum_ > 0) { |
185 | return saved_key_.GetInternalKey(); | |
186 | } else { | |
187 | return saved_key_.GetUserKey(); | |
188 | } | |
189 | ||
7c673cae FG |
190 | } |
191 | virtual Slice value() const override { | |
192 | assert(valid_); | |
193 | if (current_entry_is_merged_) { | |
194 | // If pinned_value_ is set then the result of merge operator is one of | |
195 | // the merge operands and we should return it. | |
196 | return pinned_value_.data() ? pinned_value_ : saved_value_; | |
197 | } else if (direction_ == kReverse) { | |
198 | return pinned_value_; | |
199 | } else { | |
200 | return iter_->value(); | |
201 | } | |
202 | } | |
203 | virtual Status status() const override { | |
204 | if (status_.ok()) { | |
205 | return iter_->status(); | |
206 | } else { | |
11fdf7f2 | 207 | assert(!valid_); |
7c673cae FG |
208 | return status_; |
209 | } | |
210 | } | |
11fdf7f2 TL |
211 | bool IsBlob() const { |
212 | assert(valid_ && (allow_blob_ || !is_blob_)); | |
213 | return is_blob_; | |
214 | } | |
7c673cae FG |
215 | |
216 | virtual Status GetProperty(std::string prop_name, | |
217 | std::string* prop) override { | |
218 | if (prop == nullptr) { | |
219 | return Status::InvalidArgument("prop is nullptr"); | |
220 | } | |
221 | if (prop_name == "rocksdb.iterator.super-version-number") { | |
222 | // First try to pass the value returned from inner iterator. | |
11fdf7f2 | 223 | return iter_->GetProperty(prop_name, prop); |
7c673cae FG |
224 | } else if (prop_name == "rocksdb.iterator.is-key-pinned") { |
225 | if (valid_) { | |
226 | *prop = (pin_thru_lifetime_ && saved_key_.IsKeyPinned()) ? "1" : "0"; | |
227 | } else { | |
228 | *prop = "Iterator is not valid."; | |
229 | } | |
230 | return Status::OK(); | |
11fdf7f2 TL |
231 | } else if (prop_name == "rocksdb.iterator.internal-key") { |
232 | *prop = saved_key_.GetUserKey().ToString(); | |
233 | return Status::OK(); | |
7c673cae FG |
234 | } |
235 | return Status::InvalidArgument("Undentified property."); | |
236 | } | |
237 | ||
238 | virtual void Next() override; | |
239 | virtual void Prev() override; | |
240 | virtual void Seek(const Slice& target) override; | |
241 | virtual void SeekForPrev(const Slice& target) override; | |
242 | virtual void SeekToFirst() override; | |
243 | virtual void SeekToLast() override; | |
11fdf7f2 TL |
244 | Env* env() { return env_; } |
245 | void set_sequence(uint64_t s) { sequence_ = s; } | |
246 | void set_valid(bool v) { valid_ = v; } | |
7c673cae FG |
247 | |
248 | private: | |
11fdf7f2 TL |
249 | // For all methods in this block: |
250 | // PRE: iter_->Valid() && status_.ok() | |
251 | // Return false if there was an error, and status() is non-ok, valid_ = false; | |
252 | // in this case callers would usually stop what they were doing and return. | |
253 | bool ReverseToForward(); | |
254 | bool ReverseToBackward(); | |
7c673cae FG |
255 | bool FindValueForCurrentKey(); |
256 | bool FindValueForCurrentKeyUsingSeek(); | |
11fdf7f2 TL |
257 | bool FindUserKeyBeforeSavedKey(); |
258 | inline bool FindNextUserEntry(bool skipping, bool prefix_check); | |
259 | bool FindNextUserEntryInternal(bool skipping, bool prefix_check); | |
7c673cae | 260 | bool ParseKey(ParsedInternalKey* key); |
11fdf7f2 TL |
261 | bool MergeValuesNewToOld(); |
262 | ||
263 | void PrevInternal(); | |
7c673cae | 264 | bool TooManyInternalKeysSkipped(bool increment = true); |
11fdf7f2 TL |
265 | bool IsVisible(SequenceNumber sequence); |
266 | ||
267 | // CanReseekToSkip() returns whether the iterator can use the optimization | |
268 | // where it reseek by sequence number to get the next key when there are too | |
269 | // many versions. This is disabled for write unprepared because seeking to | |
270 | // sequence number does not guarantee that it is visible. | |
271 | inline bool CanReseekToSkip(); | |
272 | ||
273 | // MaxVisibleSequenceNumber() returns the maximum visible sequence number | |
274 | // for this snapshot. This sequence number may be greater than snapshot | |
275 | // seqno because uncommitted data written to DB for write unprepared will | |
276 | // have a higher sequence number. | |
277 | inline SequenceNumber MaxVisibleSequenceNumber(); | |
7c673cae FG |
278 | |
279 | // Temporarily pin the blocks that we encounter until ReleaseTempPinnedData() | |
280 | // is called | |
281 | void TempPinData() { | |
282 | if (!pin_thru_lifetime_) { | |
283 | pinned_iters_mgr_.StartPinning(); | |
284 | } | |
285 | } | |
286 | ||
287 | // Release blocks pinned by TempPinData() | |
288 | void ReleaseTempPinnedData() { | |
289 | if (!pin_thru_lifetime_ && pinned_iters_mgr_.PinningEnabled()) { | |
290 | pinned_iters_mgr_.ReleasePinnedData(); | |
291 | } | |
292 | } | |
293 | ||
294 | inline void ClearSavedValue() { | |
295 | if (saved_value_.capacity() > 1048576) { | |
296 | std::string empty; | |
297 | swap(empty, saved_value_); | |
298 | } else { | |
299 | saved_value_.clear(); | |
300 | } | |
301 | } | |
302 | ||
303 | inline void ResetInternalKeysSkippedCounter() { | |
11fdf7f2 TL |
304 | local_stats_.skip_count_ += num_internal_keys_skipped_; |
305 | if (valid_) { | |
306 | local_stats_.skip_count_--; | |
307 | } | |
7c673cae FG |
308 | num_internal_keys_skipped_ = 0; |
309 | } | |
310 | ||
311 | const SliceTransform* prefix_extractor_; | |
312 | bool arena_mode_; | |
313 | Env* const env_; | |
314 | Logger* logger_; | |
315 | const Comparator* const user_comparator_; | |
316 | const MergeOperator* const merge_operator_; | |
317 | InternalIterator* iter_; | |
11fdf7f2 | 318 | SequenceNumber sequence_; |
7c673cae FG |
319 | |
320 | Status status_; | |
321 | IterKey saved_key_; | |
11fdf7f2 TL |
322 | // Reusable internal key data structure. This is only used inside one function |
323 | // and should not be used across functions. Reusing this object can reduce | |
324 | // overhead of calling construction of the function if creating it each time. | |
325 | ParsedInternalKey ikey_; | |
7c673cae FG |
326 | std::string saved_value_; |
327 | Slice pinned_value_; | |
328 | Direction direction_; | |
329 | bool valid_; | |
330 | bool current_entry_is_merged_; | |
331 | // for prefix seek mode to support prev() | |
332 | Statistics* statistics_; | |
333 | uint64_t max_skip_; | |
334 | uint64_t max_skippable_internal_keys_; | |
335 | uint64_t num_internal_keys_skipped_; | |
11fdf7f2 | 336 | const Slice* iterate_lower_bound_; |
7c673cae FG |
337 | const Slice* iterate_upper_bound_; |
338 | IterKey prefix_start_buf_; | |
339 | Slice prefix_start_key_; | |
340 | const bool prefix_same_as_start_; | |
341 | // Means that we will pin all data blocks we read as long the Iterator | |
342 | // is not deleted, will be true if ReadOptions::pin_data is true | |
343 | const bool pin_thru_lifetime_; | |
344 | const bool total_order_seek_; | |
345 | // List of operands for merge operator. | |
346 | MergeContext merge_context_; | |
347 | RangeDelAggregator range_del_agg_; | |
348 | LocalStatistics local_stats_; | |
349 | PinnedIteratorsManager pinned_iters_mgr_; | |
11fdf7f2 TL |
350 | ReadCallback* read_callback_; |
351 | DBImpl* db_impl_; | |
352 | ColumnFamilyData* cfd_; | |
353 | bool allow_blob_; | |
354 | bool is_blob_; | |
355 | // for diff snapshots we want the lower bound on the seqnum; | |
356 | // if this value > 0 iterator will return internal keys | |
357 | SequenceNumber start_seqnum_; | |
7c673cae FG |
358 | |
359 | // No copying allowed | |
360 | DBIter(const DBIter&); | |
361 | void operator=(const DBIter&); | |
362 | }; | |
363 | ||
364 | inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { | |
365 | if (!ParseInternalKey(iter_->key(), ikey)) { | |
366 | status_ = Status::Corruption("corrupted internal key in DBIter"); | |
11fdf7f2 | 367 | valid_ = false; |
7c673cae FG |
368 | ROCKS_LOG_ERROR(logger_, "corrupted internal key in DBIter: %s", |
369 | iter_->key().ToString(true).c_str()); | |
370 | return false; | |
371 | } else { | |
372 | return true; | |
373 | } | |
374 | } | |
375 | ||
376 | void DBIter::Next() { | |
377 | assert(valid_); | |
11fdf7f2 | 378 | assert(status_.ok()); |
7c673cae FG |
379 | |
380 | // Release temporarily pinned blocks from last operation | |
381 | ReleaseTempPinnedData(); | |
382 | ResetInternalKeysSkippedCounter(); | |
11fdf7f2 | 383 | bool ok = true; |
7c673cae | 384 | if (direction_ == kReverse) { |
11fdf7f2 TL |
385 | if (!ReverseToForward()) { |
386 | ok = false; | |
387 | } | |
7c673cae FG |
388 | } else if (iter_->Valid() && !current_entry_is_merged_) { |
389 | // If the current value is not a merge, the iter position is the | |
390 | // current key, which is already returned. We can safely issue a | |
391 | // Next() without checking the current key. | |
392 | // If the current key is a merge, very likely iter already points | |
393 | // to the next internal position. | |
394 | iter_->Next(); | |
395 | PERF_COUNTER_ADD(internal_key_skipped_count, 1); | |
396 | } | |
397 | ||
398 | if (statistics_ != nullptr) { | |
399 | local_stats_.next_count_++; | |
400 | } | |
11fdf7f2 TL |
401 | if (ok && iter_->Valid()) { |
402 | FindNextUserEntry(true /* skipping the current user key */, | |
403 | prefix_same_as_start_); | |
404 | } else { | |
7c673cae | 405 | valid_ = false; |
7c673cae | 406 | } |
7c673cae FG |
407 | if (statistics_ != nullptr && valid_) { |
408 | local_stats_.next_found_count_++; | |
409 | local_stats_.bytes_read_ += (key().size() + value().size()); | |
410 | } | |
411 | } | |
412 | ||
413 | // PRE: saved_key_ has the current user key if skipping | |
414 | // POST: saved_key_ should have the next user key if valid_, | |
415 | // if the current entry is a result of merge | |
416 | // current_entry_is_merged_ => true | |
417 | // saved_value_ => the merged value | |
418 | // | |
419 | // NOTE: In between, saved_key_ can point to a user key that has | |
420 | // a delete marker or a sequence number higher than sequence_ | |
421 | // saved_key_ MUST have a proper user_key before calling this function | |
422 | // | |
423 | // The prefix_check parameter controls whether we check the iterated | |
424 | // keys against the prefix of the seeked key. Set to false when | |
425 | // performing a seek without a key (e.g. SeekToFirst). Set to | |
426 | // prefix_same_as_start_ for other iterations. | |
11fdf7f2 | 427 | inline bool DBIter::FindNextUserEntry(bool skipping, bool prefix_check) { |
7c673cae | 428 | PERF_TIMER_GUARD(find_next_user_entry_time); |
11fdf7f2 | 429 | return FindNextUserEntryInternal(skipping, prefix_check); |
7c673cae FG |
430 | } |
431 | ||
432 | // Actual implementation of DBIter::FindNextUserEntry() | |
11fdf7f2 | 433 | bool DBIter::FindNextUserEntryInternal(bool skipping, bool prefix_check) { |
7c673cae FG |
434 | // Loop until we hit an acceptable entry to yield |
435 | assert(iter_->Valid()); | |
11fdf7f2 | 436 | assert(status_.ok()); |
7c673cae FG |
437 | assert(direction_ == kForward); |
438 | current_entry_is_merged_ = false; | |
439 | ||
440 | // How many times in a row we have skipped an entry with user key less than | |
441 | // or equal to saved_key_. We could skip these entries either because | |
442 | // sequence numbers were too high or because skipping = true. | |
443 | // What saved_key_ contains throughout this method: | |
444 | // - if skipping : saved_key_ contains the key that we need to skip, | |
445 | // and we haven't seen any keys greater than that, | |
446 | // - if num_skipped > 0 : saved_key_ contains the key that we have skipped | |
447 | // num_skipped times, and we haven't seen any keys | |
448 | // greater than that, | |
449 | // - none of the above : saved_key_ can contain anything, it doesn't matter. | |
450 | uint64_t num_skipped = 0; | |
451 | ||
11fdf7f2 | 452 | is_blob_ = false; |
7c673cae | 453 | |
11fdf7f2 TL |
454 | do { |
455 | if (!ParseKey(&ikey_)) { | |
456 | return false; | |
7c673cae FG |
457 | } |
458 | ||
459 | if (iterate_upper_bound_ != nullptr && | |
11fdf7f2 | 460 | user_comparator_->Compare(ikey_.user_key, *iterate_upper_bound_) >= 0) { |
7c673cae FG |
461 | break; |
462 | } | |
463 | ||
464 | if (prefix_extractor_ && prefix_check && | |
11fdf7f2 TL |
465 | prefix_extractor_->Transform(ikey_.user_key) |
466 | .compare(prefix_start_key_) != 0) { | |
7c673cae FG |
467 | break; |
468 | } | |
469 | ||
470 | if (TooManyInternalKeysSkipped()) { | |
11fdf7f2 | 471 | return false; |
7c673cae FG |
472 | } |
473 | ||
11fdf7f2 TL |
474 | if (IsVisible(ikey_.sequence)) { |
475 | if (skipping && user_comparator_->Compare(ikey_.user_key, | |
476 | saved_key_.GetUserKey()) <= 0) { | |
7c673cae FG |
477 | num_skipped++; // skip this entry |
478 | PERF_COUNTER_ADD(internal_key_skipped_count, 1); | |
479 | } else { | |
480 | num_skipped = 0; | |
11fdf7f2 | 481 | switch (ikey_.type) { |
7c673cae FG |
482 | case kTypeDeletion: |
483 | case kTypeSingleDeletion: | |
484 | // Arrange to skip all upcoming entries for this key since | |
485 | // they are hidden by this deletion. | |
11fdf7f2 TL |
486 | // if iterartor specified start_seqnum we |
487 | // 1) return internal key, including the type | |
488 | // 2) return ikey only if ikey.seqnum >= start_seqnum_ | |
489 | // note that if deletion seqnum is < start_seqnum_ we | |
490 | // just skip it like in normal iterator. | |
491 | if (start_seqnum_ > 0 && ikey_.sequence >= start_seqnum_) { | |
492 | saved_key_.SetInternalKey(ikey_); | |
493 | valid_ = true; | |
494 | return true; | |
495 | } else { | |
496 | saved_key_.SetUserKey( | |
497 | ikey_.user_key, | |
498 | !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); | |
7c673cae | 499 | skipping = true; |
7c673cae | 500 | PERF_COUNTER_ADD(internal_delete_skipped_count, 1); |
11fdf7f2 TL |
501 | } |
502 | break; | |
503 | case kTypeValue: | |
504 | case kTypeBlobIndex: | |
505 | if (start_seqnum_ > 0) { | |
506 | // we are taking incremental snapshot here | |
507 | // incremental snapshots aren't supported on DB with range deletes | |
508 | assert(!( | |
509 | (ikey_.type == kTypeBlobIndex) && (start_seqnum_ > 0) | |
510 | )); | |
511 | if (ikey_.sequence >= start_seqnum_) { | |
512 | saved_key_.SetInternalKey(ikey_); | |
513 | valid_ = true; | |
514 | return true; | |
515 | } else { | |
516 | // this key and all previous versions shouldn't be included, | |
517 | // skipping | |
518 | saved_key_.SetUserKey(ikey_.user_key, | |
519 | !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); | |
520 | skipping = true; | |
521 | } | |
7c673cae | 522 | } else { |
11fdf7f2 TL |
523 | saved_key_.SetUserKey( |
524 | ikey_.user_key, | |
525 | !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); | |
526 | if (range_del_agg_.ShouldDelete( | |
527 | ikey_, RangeDelPositioningMode::kForwardTraversal)) { | |
528 | // Arrange to skip all upcoming entries for this key since | |
529 | // they are hidden by this deletion. | |
530 | skipping = true; | |
531 | num_skipped = 0; | |
532 | PERF_COUNTER_ADD(internal_delete_skipped_count, 1); | |
533 | } else if (ikey_.type == kTypeBlobIndex) { | |
534 | if (!allow_blob_) { | |
535 | ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); | |
536 | status_ = Status::NotSupported( | |
537 | "Encounter unexpected blob index. Please open DB with " | |
538 | "rocksdb::blob_db::BlobDB instead."); | |
539 | valid_ = false; | |
540 | return false; | |
541 | } | |
542 | ||
543 | is_blob_ = true; | |
544 | valid_ = true; | |
545 | return true; | |
546 | } else { | |
547 | valid_ = true; | |
548 | return true; | |
549 | } | |
7c673cae FG |
550 | } |
551 | break; | |
552 | case kTypeMerge: | |
553 | saved_key_.SetUserKey( | |
11fdf7f2 TL |
554 | ikey_.user_key, |
555 | !pin_thru_lifetime_ || !iter_->IsKeyPinned() /* copy */); | |
7c673cae | 556 | if (range_del_agg_.ShouldDelete( |
11fdf7f2 | 557 | ikey_, RangeDelPositioningMode::kForwardTraversal)) { |
7c673cae FG |
558 | // Arrange to skip all upcoming entries for this key since |
559 | // they are hidden by this deletion. | |
560 | skipping = true; | |
561 | num_skipped = 0; | |
562 | PERF_COUNTER_ADD(internal_delete_skipped_count, 1); | |
563 | } else { | |
564 | // By now, we are sure the current ikey is going to yield a | |
565 | // value | |
566 | current_entry_is_merged_ = true; | |
567 | valid_ = true; | |
11fdf7f2 | 568 | return MergeValuesNewToOld(); // Go to a different state machine |
7c673cae FG |
569 | } |
570 | break; | |
571 | default: | |
572 | assert(false); | |
573 | break; | |
574 | } | |
575 | } | |
576 | } else { | |
7c673cae FG |
577 | PERF_COUNTER_ADD(internal_recent_skipped_count, 1); |
578 | ||
11fdf7f2 TL |
579 | // This key was inserted after our snapshot was taken. |
580 | // If this happens too many times in a row for the same user key, we want | |
581 | // to seek to the target sequence number. | |
582 | int cmp = | |
583 | user_comparator_->Compare(ikey_.user_key, saved_key_.GetUserKey()); | |
584 | if (cmp == 0 || (skipping && cmp <= 0)) { | |
7c673cae FG |
585 | num_skipped++; |
586 | } else { | |
587 | saved_key_.SetUserKey( | |
11fdf7f2 | 588 | ikey_.user_key, |
7c673cae FG |
589 | !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); |
590 | skipping = false; | |
591 | num_skipped = 0; | |
592 | } | |
593 | } | |
594 | ||
595 | // If we have sequentially iterated via numerous equal keys, then it's | |
596 | // better to seek so that we can avoid too many key comparisons. | |
11fdf7f2 | 597 | if (num_skipped > max_skip_ && CanReseekToSkip()) { |
7c673cae FG |
598 | num_skipped = 0; |
599 | std::string last_key; | |
600 | if (skipping) { | |
601 | // We're looking for the next user-key but all we see are the same | |
602 | // user-key with decreasing sequence numbers. Fast forward to | |
603 | // sequence number 0 and type deletion (the smallest type). | |
604 | AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(), | |
605 | 0, kTypeDeletion)); | |
606 | // Don't set skipping = false because we may still see more user-keys | |
607 | // equal to saved_key_. | |
608 | } else { | |
609 | // We saw multiple entries with this user key and sequence numbers | |
610 | // higher than sequence_. Fast forward to sequence_. | |
611 | // Note that this only covers a case when a higher key was overwritten | |
612 | // many times since our snapshot was taken, not the case when a lot of | |
613 | // different keys were inserted after our snapshot was taken. | |
614 | AppendInternalKey(&last_key, | |
615 | ParsedInternalKey(saved_key_.GetUserKey(), sequence_, | |
616 | kValueTypeForSeek)); | |
617 | } | |
618 | iter_->Seek(last_key); | |
619 | RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); | |
620 | } else { | |
621 | iter_->Next(); | |
622 | } | |
623 | } while (iter_->Valid()); | |
11fdf7f2 | 624 | |
7c673cae | 625 | valid_ = false; |
11fdf7f2 | 626 | return iter_->status().ok(); |
7c673cae FG |
627 | } |
628 | ||
629 | // Merge values of the same user key starting from the current iter_ position | |
630 | // Scan from the newer entries to older entries. | |
631 | // PRE: iter_->key() points to the first merge type entry | |
632 | // saved_key_ stores the user key | |
633 | // POST: saved_value_ has the merged value for the user key | |
634 | // iter_ points to the next entry (or invalid) | |
11fdf7f2 | 635 | bool DBIter::MergeValuesNewToOld() { |
7c673cae FG |
636 | if (!merge_operator_) { |
637 | ROCKS_LOG_ERROR(logger_, "Options::merge_operator is null."); | |
638 | status_ = Status::InvalidArgument("merge_operator_ must be set."); | |
639 | valid_ = false; | |
11fdf7f2 | 640 | return false; |
7c673cae FG |
641 | } |
642 | ||
643 | // Temporarily pin the blocks that hold merge operands | |
644 | TempPinData(); | |
645 | merge_context_.Clear(); | |
646 | // Start the merge process by pushing the first operand | |
647 | merge_context_.PushOperand(iter_->value(), | |
648 | iter_->IsValuePinned() /* operand_pinned */); | |
11fdf7f2 | 649 | TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand"); |
7c673cae FG |
650 | |
651 | ParsedInternalKey ikey; | |
652 | Status s; | |
653 | for (iter_->Next(); iter_->Valid(); iter_->Next()) { | |
11fdf7f2 | 654 | TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand"); |
7c673cae | 655 | if (!ParseKey(&ikey)) { |
11fdf7f2 | 656 | return false; |
7c673cae FG |
657 | } |
658 | ||
659 | if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { | |
660 | // hit the next user key, stop right here | |
661 | break; | |
662 | } else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type || | |
663 | range_del_agg_.ShouldDelete( | |
11fdf7f2 | 664 | ikey, RangeDelPositioningMode::kForwardTraversal)) { |
7c673cae FG |
665 | // hit a delete with the same user key, stop right here |
666 | // iter_ is positioned after delete | |
667 | iter_->Next(); | |
668 | break; | |
669 | } else if (kTypeValue == ikey.type) { | |
670 | // hit a put, merge the put value with operands and store the | |
671 | // final result in saved_value_. We are done! | |
7c673cae FG |
672 | const Slice val = iter_->value(); |
673 | s = MergeHelper::TimedFullMerge( | |
674 | merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(), | |
11fdf7f2 | 675 | &saved_value_, logger_, statistics_, env_, &pinned_value_, true); |
7c673cae | 676 | if (!s.ok()) { |
11fdf7f2 | 677 | valid_ = false; |
7c673cae | 678 | status_ = s; |
11fdf7f2 | 679 | return false; |
7c673cae FG |
680 | } |
681 | // iter_ is positioned after put | |
682 | iter_->Next(); | |
11fdf7f2 TL |
683 | if (!iter_->status().ok()) { |
684 | valid_ = false; | |
685 | return false; | |
686 | } | |
687 | return true; | |
7c673cae FG |
688 | } else if (kTypeMerge == ikey.type) { |
689 | // hit a merge, add the value as an operand and run associative merge. | |
690 | // when complete, add result to operands and continue. | |
691 | merge_context_.PushOperand(iter_->value(), | |
692 | iter_->IsValuePinned() /* operand_pinned */); | |
693 | PERF_COUNTER_ADD(internal_merge_count, 1); | |
11fdf7f2 TL |
694 | } else if (kTypeBlobIndex == ikey.type) { |
695 | if (!allow_blob_) { | |
696 | ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); | |
697 | status_ = Status::NotSupported( | |
698 | "Encounter unexpected blob index. Please open DB with " | |
699 | "rocksdb::blob_db::BlobDB instead."); | |
700 | } else { | |
701 | status_ = | |
702 | Status::NotSupported("Blob DB does not support merge operator."); | |
703 | } | |
704 | valid_ = false; | |
705 | return false; | |
7c673cae FG |
706 | } else { |
707 | assert(false); | |
708 | } | |
709 | } | |
710 | ||
11fdf7f2 TL |
711 | if (!iter_->status().ok()) { |
712 | valid_ = false; | |
713 | return false; | |
714 | } | |
715 | ||
7c673cae FG |
716 | // we either exhausted all internal keys under this user key, or hit |
717 | // a deletion marker. | |
718 | // feed null as the existing value to the merge operator, such that | |
719 | // client can differentiate this scenario and do things accordingly. | |
720 | s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(), | |
721 | nullptr, merge_context_.GetOperands(), | |
722 | &saved_value_, logger_, statistics_, env_, | |
11fdf7f2 | 723 | &pinned_value_, true); |
7c673cae | 724 | if (!s.ok()) { |
11fdf7f2 | 725 | valid_ = false; |
7c673cae | 726 | status_ = s; |
11fdf7f2 | 727 | return false; |
7c673cae | 728 | } |
11fdf7f2 TL |
729 | |
730 | assert(status_.ok()); | |
731 | return true; | |
7c673cae FG |
732 | } |
733 | ||
734 | void DBIter::Prev() { | |
735 | assert(valid_); | |
11fdf7f2 | 736 | assert(status_.ok()); |
7c673cae FG |
737 | ReleaseTempPinnedData(); |
738 | ResetInternalKeysSkippedCounter(); | |
11fdf7f2 | 739 | bool ok = true; |
7c673cae | 740 | if (direction_ == kForward) { |
11fdf7f2 TL |
741 | if (!ReverseToBackward()) { |
742 | ok = false; | |
743 | } | |
744 | } | |
745 | if (ok) { | |
746 | PrevInternal(); | |
7c673cae | 747 | } |
7c673cae FG |
748 | if (statistics_ != nullptr) { |
749 | local_stats_.prev_count_++; | |
750 | if (valid_) { | |
751 | local_stats_.prev_found_count_++; | |
752 | local_stats_.bytes_read_ += (key().size() + value().size()); | |
753 | } | |
754 | } | |
755 | } | |
756 | ||
11fdf7f2 TL |
757 | bool DBIter::ReverseToForward() { |
758 | assert(iter_->status().ok()); | |
759 | ||
760 | // When moving backwards, iter_ is positioned on _previous_ key, which may | |
761 | // not exist or may have different prefix than the current key(). | |
762 | // If that's the case, seek iter_ to current key. | |
763 | if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) { | |
7c673cae FG |
764 | IterKey last_key; |
765 | last_key.SetInternalKey(ParsedInternalKey( | |
766 | saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); | |
767 | iter_->Seek(last_key.GetInternalKey()); | |
768 | } | |
11fdf7f2 | 769 | |
7c673cae | 770 | direction_ = kForward; |
11fdf7f2 TL |
771 | // Skip keys less than the current key() (a.k.a. saved_key_). |
772 | while (iter_->Valid()) { | |
773 | ParsedInternalKey ikey; | |
774 | if (!ParseKey(&ikey)) { | |
775 | return false; | |
776 | } | |
777 | if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) >= | |
778 | 0) { | |
779 | return true; | |
780 | } | |
781 | iter_->Next(); | |
782 | } | |
783 | ||
784 | if (!iter_->status().ok()) { | |
785 | valid_ = false; | |
786 | return false; | |
7c673cae | 787 | } |
11fdf7f2 TL |
788 | |
789 | return true; | |
7c673cae FG |
790 | } |
791 | ||
11fdf7f2 TL |
792 | // Move iter_ to the key before saved_key_. |
793 | bool DBIter::ReverseToBackward() { | |
794 | assert(iter_->status().ok()); | |
795 | ||
796 | // When current_entry_is_merged_ is true, iter_ may be positioned on the next | |
797 | // key, which may not exist or may have prefix different from current. | |
798 | // If that's the case, seek to saved_key_. | |
799 | if (current_entry_is_merged_ && | |
800 | ((prefix_extractor_ != nullptr && !total_order_seek_) || | |
801 | !iter_->Valid())) { | |
7c673cae | 802 | IterKey last_key; |
11fdf7f2 TL |
803 | // Using kMaxSequenceNumber and kValueTypeForSeek |
804 | // (not kValueTypeForSeekForPrev) to seek to a key strictly smaller | |
805 | // than saved_key_. | |
806 | last_key.SetInternalKey(ParsedInternalKey( | |
807 | saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); | |
808 | if (prefix_extractor_ != nullptr && !total_order_seek_) { | |
809 | iter_->SeekForPrev(last_key.GetInternalKey()); | |
810 | } else { | |
811 | // Some iterators may not support SeekForPrev(), so we avoid using it | |
812 | // when prefix seek mode is disabled. This is somewhat expensive | |
813 | // (an extra Prev(), as well as an extra change of direction of iter_), | |
814 | // so we may need to reconsider it later. | |
815 | iter_->Seek(last_key.GetInternalKey()); | |
816 | if (!iter_->Valid() && iter_->status().ok()) { | |
817 | iter_->SeekToLast(); | |
7c673cae | 818 | } |
7c673cae FG |
819 | } |
820 | } | |
7c673cae | 821 | |
7c673cae | 822 | direction_ = kReverse; |
11fdf7f2 | 823 | return FindUserKeyBeforeSavedKey(); |
7c673cae FG |
824 | } |
825 | ||
826 | void DBIter::PrevInternal() { | |
7c673cae FG |
827 | while (iter_->Valid()) { |
828 | saved_key_.SetUserKey( | |
829 | ExtractUserKey(iter_->key()), | |
830 | !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); | |
831 | ||
11fdf7f2 TL |
832 | if (prefix_extractor_ && prefix_same_as_start_ && |
833 | prefix_extractor_->Transform(saved_key_.GetUserKey()) | |
834 | .compare(prefix_start_key_) != 0) { | |
835 | // Current key does not have the same prefix as start | |
836 | valid_ = false; | |
7c673cae FG |
837 | return; |
838 | } | |
839 | ||
11fdf7f2 TL |
840 | if (iterate_lower_bound_ != nullptr && |
841 | user_comparator_->Compare(saved_key_.GetUserKey(), | |
842 | *iterate_lower_bound_) < 0) { | |
843 | // We've iterated earlier than the user-specified lower bound. | |
844 | valid_ = false; | |
7c673cae FG |
845 | return; |
846 | } | |
847 | ||
11fdf7f2 TL |
848 | if (!FindValueForCurrentKey()) { // assigns valid_ |
849 | return; | |
850 | } | |
851 | ||
852 | // Whether or not we found a value for current key, we need iter_ to end up | |
853 | // on a smaller key. | |
854 | if (!FindUserKeyBeforeSavedKey()) { | |
855 | return; | |
7c673cae | 856 | } |
11fdf7f2 TL |
857 | |
858 | if (valid_) { | |
859 | // Found the value. | |
860 | return; | |
861 | } | |
862 | ||
863 | if (TooManyInternalKeysSkipped(false)) { | |
864 | return; | |
7c673cae FG |
865 | } |
866 | } | |
11fdf7f2 | 867 | |
7c673cae | 868 | // We haven't found any key - iterator is not valid |
7c673cae FG |
869 | valid_ = false; |
870 | } | |
871 | ||
11fdf7f2 TL |
872 | // Used for backwards iteration. |
873 | // Looks at the entries with user key saved_key_ and finds the most up-to-date | |
874 | // value for it, or executes a merge, or determines that the value was deleted. | |
875 | // Sets valid_ to true if the value is found and is ready to be presented to | |
876 | // the user through value(). | |
877 | // Sets valid_ to false if the value was deleted, and we should try another key. | |
878 | // Returns false if an error occurred, and !status().ok() and !valid_. | |
879 | // | |
880 | // PRE: iter_ is positioned on the last entry with user key equal to saved_key_. | |
881 | // POST: iter_ is positioned on one of the entries equal to saved_key_, or on | |
882 | // the entry just before them, or on the entry just after them. | |
7c673cae FG |
883 | bool DBIter::FindValueForCurrentKey() { |
884 | assert(iter_->Valid()); | |
885 | merge_context_.Clear(); | |
886 | current_entry_is_merged_ = false; | |
887 | // last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or | |
888 | // kTypeValue) | |
889 | ValueType last_not_merge_type = kTypeDeletion; | |
890 | ValueType last_key_entry_type = kTypeDeletion; | |
891 | ||
7c673cae FG |
892 | // Temporarily pin blocks that hold (merge operands / the value) |
893 | ReleaseTempPinnedData(); | |
894 | TempPinData(); | |
895 | size_t num_skipped = 0; | |
11fdf7f2 TL |
896 | while (iter_->Valid()) { |
897 | ParsedInternalKey ikey; | |
898 | if (!ParseKey(&ikey)) { | |
899 | return false; | |
900 | } | |
901 | ||
902 | if (!IsVisible(ikey.sequence) || | |
903 | !user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { | |
904 | break; | |
905 | } | |
7c673cae FG |
906 | if (TooManyInternalKeysSkipped()) { |
907 | return false; | |
908 | } | |
909 | ||
11fdf7f2 TL |
910 | // This user key has lots of entries. |
911 | // We're going from old to new, and it's taking too long. Let's do a Seek() | |
912 | // and go from new to old. This helps when a key was overwritten many times. | |
913 | if (num_skipped >= max_skip_ && CanReseekToSkip()) { | |
7c673cae FG |
914 | return FindValueForCurrentKeyUsingSeek(); |
915 | } | |
916 | ||
917 | last_key_entry_type = ikey.type; | |
918 | switch (last_key_entry_type) { | |
919 | case kTypeValue: | |
11fdf7f2 | 920 | case kTypeBlobIndex: |
7c673cae | 921 | if (range_del_agg_.ShouldDelete( |
11fdf7f2 | 922 | ikey, RangeDelPositioningMode::kBackwardTraversal)) { |
7c673cae FG |
923 | last_key_entry_type = kTypeRangeDeletion; |
924 | PERF_COUNTER_ADD(internal_delete_skipped_count, 1); | |
925 | } else { | |
926 | assert(iter_->IsValuePinned()); | |
927 | pinned_value_ = iter_->value(); | |
928 | } | |
929 | merge_context_.Clear(); | |
930 | last_not_merge_type = last_key_entry_type; | |
931 | break; | |
932 | case kTypeDeletion: | |
933 | case kTypeSingleDeletion: | |
934 | merge_context_.Clear(); | |
935 | last_not_merge_type = last_key_entry_type; | |
936 | PERF_COUNTER_ADD(internal_delete_skipped_count, 1); | |
937 | break; | |
938 | case kTypeMerge: | |
939 | if (range_del_agg_.ShouldDelete( | |
11fdf7f2 | 940 | ikey, RangeDelPositioningMode::kBackwardTraversal)) { |
7c673cae FG |
941 | merge_context_.Clear(); |
942 | last_key_entry_type = kTypeRangeDeletion; | |
943 | last_not_merge_type = last_key_entry_type; | |
944 | PERF_COUNTER_ADD(internal_delete_skipped_count, 1); | |
945 | } else { | |
946 | assert(merge_operator_ != nullptr); | |
947 | merge_context_.PushOperandBack( | |
948 | iter_->value(), iter_->IsValuePinned() /* operand_pinned */); | |
949 | PERF_COUNTER_ADD(internal_merge_count, 1); | |
950 | } | |
951 | break; | |
952 | default: | |
953 | assert(false); | |
954 | } | |
955 | ||
956 | PERF_COUNTER_ADD(internal_key_skipped_count, 1); | |
7c673cae FG |
957 | iter_->Prev(); |
958 | ++num_skipped; | |
11fdf7f2 TL |
959 | } |
960 | ||
961 | if (!iter_->status().ok()) { | |
962 | valid_ = false; | |
963 | return false; | |
7c673cae FG |
964 | } |
965 | ||
966 | Status s; | |
11fdf7f2 | 967 | is_blob_ = false; |
7c673cae FG |
968 | switch (last_key_entry_type) { |
969 | case kTypeDeletion: | |
970 | case kTypeSingleDeletion: | |
971 | case kTypeRangeDeletion: | |
972 | valid_ = false; | |
11fdf7f2 | 973 | return true; |
7c673cae FG |
974 | case kTypeMerge: |
975 | current_entry_is_merged_ = true; | |
976 | if (last_not_merge_type == kTypeDeletion || | |
977 | last_not_merge_type == kTypeSingleDeletion || | |
978 | last_not_merge_type == kTypeRangeDeletion) { | |
979 | s = MergeHelper::TimedFullMerge( | |
980 | merge_operator_, saved_key_.GetUserKey(), nullptr, | |
981 | merge_context_.GetOperands(), &saved_value_, logger_, statistics_, | |
11fdf7f2 TL |
982 | env_, &pinned_value_, true); |
983 | } else if (last_not_merge_type == kTypeBlobIndex) { | |
984 | if (!allow_blob_) { | |
985 | ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); | |
986 | status_ = Status::NotSupported( | |
987 | "Encounter unexpected blob index. Please open DB with " | |
988 | "rocksdb::blob_db::BlobDB instead."); | |
989 | } else { | |
990 | status_ = | |
991 | Status::NotSupported("Blob DB does not support merge operator."); | |
992 | } | |
993 | valid_ = false; | |
994 | return false; | |
7c673cae FG |
995 | } else { |
996 | assert(last_not_merge_type == kTypeValue); | |
997 | s = MergeHelper::TimedFullMerge( | |
998 | merge_operator_, saved_key_.GetUserKey(), &pinned_value_, | |
999 | merge_context_.GetOperands(), &saved_value_, logger_, statistics_, | |
11fdf7f2 | 1000 | env_, &pinned_value_, true); |
7c673cae FG |
1001 | } |
1002 | break; | |
1003 | case kTypeValue: | |
11fdf7f2 TL |
1004 | // do nothing - we've already has value in pinned_value_ |
1005 | break; | |
1006 | case kTypeBlobIndex: | |
1007 | if (!allow_blob_) { | |
1008 | ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); | |
1009 | status_ = Status::NotSupported( | |
1010 | "Encounter unexpected blob index. Please open DB with " | |
1011 | "rocksdb::blob_db::BlobDB instead."); | |
1012 | valid_ = false; | |
1013 | return false; | |
1014 | } | |
1015 | is_blob_ = true; | |
7c673cae FG |
1016 | break; |
1017 | default: | |
1018 | assert(false); | |
1019 | break; | |
1020 | } | |
7c673cae | 1021 | if (!s.ok()) { |
11fdf7f2 | 1022 | valid_ = false; |
7c673cae | 1023 | status_ = s; |
11fdf7f2 | 1024 | return false; |
7c673cae | 1025 | } |
11fdf7f2 | 1026 | valid_ = true; |
7c673cae FG |
1027 | return true; |
1028 | } | |
1029 | ||
1030 | // This function is used in FindValueForCurrentKey. | |
1031 | // We use Seek() function instead of Prev() to find necessary value | |
11fdf7f2 TL |
1032 | // TODO: This is very similar to FindNextUserEntry() and MergeValuesNewToOld(). |
1033 | // Would be nice to reuse some code. | |
7c673cae FG |
1034 | bool DBIter::FindValueForCurrentKeyUsingSeek() { |
1035 | // FindValueForCurrentKey will enable pinning before calling | |
1036 | // FindValueForCurrentKeyUsingSeek() | |
1037 | assert(pinned_iters_mgr_.PinningEnabled()); | |
1038 | std::string last_key; | |
1039 | AppendInternalKey(&last_key, ParsedInternalKey(saved_key_.GetUserKey(), | |
1040 | sequence_, kValueTypeForSeek)); | |
1041 | iter_->Seek(last_key); | |
1042 | RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); | |
1043 | ||
11fdf7f2 TL |
1044 | // In case read_callback presents, the value we seek to may not be visible. |
1045 | // Find the next value that's visible. | |
7c673cae | 1046 | ParsedInternalKey ikey; |
11fdf7f2 TL |
1047 | while (true) { |
1048 | if (!iter_->Valid()) { | |
1049 | valid_ = false; | |
1050 | return iter_->status().ok(); | |
1051 | } | |
1052 | ||
1053 | if (!ParseKey(&ikey)) { | |
1054 | return false; | |
1055 | } | |
1056 | if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { | |
1057 | // No visible values for this key, even though FindValueForCurrentKey() | |
1058 | // has seen some. This is possible if we're using a tailing iterator, and | |
1059 | // the entries were discarded in a compaction. | |
1060 | valid_ = false; | |
1061 | return true; | |
1062 | } | |
1063 | ||
1064 | if (IsVisible(ikey.sequence)) { | |
1065 | break; | |
1066 | } | |
1067 | ||
1068 | iter_->Next(); | |
1069 | } | |
7c673cae FG |
1070 | |
1071 | if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || | |
1072 | range_del_agg_.ShouldDelete( | |
11fdf7f2 TL |
1073 | ikey, RangeDelPositioningMode::kBackwardTraversal)) { |
1074 | valid_ = false; | |
1075 | return true; | |
1076 | } | |
1077 | if (ikey.type == kTypeBlobIndex && !allow_blob_) { | |
1078 | ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); | |
1079 | status_ = Status::NotSupported( | |
1080 | "Encounter unexpected blob index. Please open DB with " | |
1081 | "rocksdb::blob_db::BlobDB instead."); | |
7c673cae FG |
1082 | valid_ = false; |
1083 | return false; | |
1084 | } | |
11fdf7f2 | 1085 | if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { |
7c673cae FG |
1086 | assert(iter_->IsValuePinned()); |
1087 | pinned_value_ = iter_->value(); | |
1088 | valid_ = true; | |
1089 | return true; | |
1090 | } | |
1091 | ||
1092 | // kTypeMerge. We need to collect all kTypeMerge values and save them | |
1093 | // in operands | |
11fdf7f2 | 1094 | assert(ikey.type == kTypeMerge); |
7c673cae FG |
1095 | current_entry_is_merged_ = true; |
1096 | merge_context_.Clear(); | |
11fdf7f2 TL |
1097 | merge_context_.PushOperand(iter_->value(), |
1098 | iter_->IsValuePinned() /* operand_pinned */); | |
1099 | while (true) { | |
7c673cae | 1100 | iter_->Next(); |
7c673cae | 1101 | |
11fdf7f2 TL |
1102 | if (!iter_->Valid()) { |
1103 | if (!iter_->status().ok()) { | |
1104 | valid_ = false; | |
1105 | return false; | |
1106 | } | |
1107 | break; | |
7c673cae | 1108 | } |
11fdf7f2 TL |
1109 | if (!ParseKey(&ikey)) { |
1110 | return false; | |
1111 | } | |
1112 | if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetUserKey())) { | |
1113 | break; | |
1114 | } | |
1115 | ||
1116 | if (ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion || | |
1117 | range_del_agg_.ShouldDelete( | |
1118 | ikey, RangeDelPositioningMode::kBackwardTraversal)) { | |
1119 | break; | |
1120 | } else if (ikey.type == kTypeValue) { | |
1121 | const Slice val = iter_->value(); | |
1122 | Status s = MergeHelper::TimedFullMerge( | |
1123 | merge_operator_, saved_key_.GetUserKey(), &val, | |
1124 | merge_context_.GetOperands(), &saved_value_, logger_, statistics_, | |
1125 | env_, &pinned_value_, true); | |
1126 | if (!s.ok()) { | |
1127 | valid_ = false; | |
1128 | status_ = s; | |
1129 | return false; | |
1130 | } | |
1131 | valid_ = true; | |
1132 | return true; | |
1133 | } else if (ikey.type == kTypeMerge) { | |
1134 | merge_context_.PushOperand(iter_->value(), | |
1135 | iter_->IsValuePinned() /* operand_pinned */); | |
1136 | PERF_COUNTER_ADD(internal_merge_count, 1); | |
1137 | } else if (ikey.type == kTypeBlobIndex) { | |
1138 | if (!allow_blob_) { | |
1139 | ROCKS_LOG_ERROR(logger_, "Encounter unexpected blob index."); | |
1140 | status_ = Status::NotSupported( | |
1141 | "Encounter unexpected blob index. Please open DB with " | |
1142 | "rocksdb::blob_db::BlobDB instead."); | |
1143 | } else { | |
1144 | status_ = | |
1145 | Status::NotSupported("Blob DB does not support merge operator."); | |
1146 | } | |
1147 | valid_ = false; | |
1148 | return false; | |
1149 | } else { | |
1150 | assert(false); | |
7c673cae | 1151 | } |
7c673cae FG |
1152 | } |
1153 | ||
11fdf7f2 TL |
1154 | Status s = MergeHelper::TimedFullMerge( |
1155 | merge_operator_, saved_key_.GetUserKey(), nullptr, | |
1156 | merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_, | |
1157 | &pinned_value_, true); | |
7c673cae | 1158 | if (!s.ok()) { |
11fdf7f2 | 1159 | valid_ = false; |
7c673cae | 1160 | status_ = s; |
11fdf7f2 | 1161 | return false; |
7c673cae | 1162 | } |
7c673cae | 1163 | |
11fdf7f2 TL |
1164 | // Make sure we leave iter_ in a good state. If it's valid and we don't care |
1165 | // about prefixes, that's already good enough. Otherwise it needs to be | |
1166 | // seeked to the current key. | |
1167 | if ((prefix_extractor_ != nullptr && !total_order_seek_) || !iter_->Valid()) { | |
1168 | if (prefix_extractor_ != nullptr && !total_order_seek_) { | |
1169 | iter_->SeekForPrev(last_key); | |
1170 | } else { | |
1171 | iter_->Seek(last_key); | |
1172 | if (!iter_->Valid() && iter_->status().ok()) { | |
1173 | iter_->SeekToLast(); | |
1174 | } | |
1175 | } | |
1176 | RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); | |
7c673cae | 1177 | } |
11fdf7f2 TL |
1178 | |
1179 | valid_ = true; | |
1180 | return true; | |
7c673cae FG |
1181 | } |
1182 | ||
11fdf7f2 TL |
1183 | // Move backwards until the key smaller than saved_key_. |
1184 | // Changes valid_ only if return value is false. | |
1185 | bool DBIter::FindUserKeyBeforeSavedKey() { | |
1186 | assert(status_.ok()); | |
7c673cae | 1187 | size_t num_skipped = 0; |
11fdf7f2 TL |
1188 | while (iter_->Valid()) { |
1189 | ParsedInternalKey ikey; | |
1190 | if (!ParseKey(&ikey)) { | |
1191 | return false; | |
7c673cae FG |
1192 | } |
1193 | ||
11fdf7f2 TL |
1194 | if (user_comparator_->Compare(ikey.user_key, saved_key_.GetUserKey()) < 0) { |
1195 | return true; | |
7c673cae | 1196 | } |
11fdf7f2 TL |
1197 | |
1198 | if (TooManyInternalKeysSkipped()) { | |
1199 | return false; | |
1200 | } | |
1201 | ||
1202 | assert(ikey.sequence != kMaxSequenceNumber); | |
1203 | if (!IsVisible(ikey.sequence)) { | |
7c673cae FG |
1204 | PERF_COUNTER_ADD(internal_recent_skipped_count, 1); |
1205 | } else { | |
1206 | PERF_COUNTER_ADD(internal_key_skipped_count, 1); | |
1207 | } | |
11fdf7f2 TL |
1208 | |
1209 | if (num_skipped >= max_skip_ && CanReseekToSkip()) { | |
1210 | num_skipped = 0; | |
1211 | IterKey last_key; | |
1212 | last_key.SetInternalKey(ParsedInternalKey( | |
1213 | saved_key_.GetUserKey(), kMaxSequenceNumber, kValueTypeForSeek)); | |
1214 | // It would be more efficient to use SeekForPrev() here, but some | |
1215 | // iterators may not support it. | |
1216 | iter_->Seek(last_key.GetInternalKey()); | |
1217 | RecordTick(statistics_, NUMBER_OF_RESEEKS_IN_ITERATION); | |
1218 | if (!iter_->Valid()) { | |
1219 | break; | |
1220 | } | |
1221 | } else { | |
1222 | ++num_skipped; | |
1223 | } | |
1224 | ||
7c673cae | 1225 | iter_->Prev(); |
7c673cae | 1226 | } |
11fdf7f2 TL |
1227 | |
1228 | if (!iter_->status().ok()) { | |
1229 | valid_ = false; | |
1230 | return false; | |
1231 | } | |
1232 | ||
1233 | return true; | |
7c673cae FG |
1234 | } |
1235 | ||
1236 | bool DBIter::TooManyInternalKeysSkipped(bool increment) { | |
1237 | if ((max_skippable_internal_keys_ > 0) && | |
1238 | (num_internal_keys_skipped_ > max_skippable_internal_keys_)) { | |
1239 | valid_ = false; | |
1240 | status_ = Status::Incomplete("Too many internal keys skipped."); | |
1241 | return true; | |
1242 | } else if (increment) { | |
1243 | num_internal_keys_skipped_++; | |
1244 | } | |
1245 | return false; | |
1246 | } | |
1247 | ||
11fdf7f2 TL |
1248 | bool DBIter::IsVisible(SequenceNumber sequence) { |
1249 | return sequence <= MaxVisibleSequenceNumber() && | |
1250 | (read_callback_ == nullptr || read_callback_->IsVisible(sequence)); | |
1251 | } | |
1252 | ||
1253 | bool DBIter::CanReseekToSkip() { | |
1254 | return read_callback_ == nullptr || | |
1255 | read_callback_->MaxUnpreparedSequenceNumber() == 0; | |
1256 | } | |
1257 | ||
1258 | SequenceNumber DBIter::MaxVisibleSequenceNumber() { | |
1259 | if (read_callback_ == nullptr) { | |
1260 | return sequence_; | |
7c673cae | 1261 | } |
11fdf7f2 TL |
1262 | |
1263 | return std::max(sequence_, read_callback_->MaxUnpreparedSequenceNumber()); | |
7c673cae FG |
1264 | } |
1265 | ||
1266 | void DBIter::Seek(const Slice& target) { | |
1267 | StopWatch sw(env_, statistics_, DB_SEEK); | |
11fdf7f2 | 1268 | status_ = Status::OK(); |
7c673cae FG |
1269 | ReleaseTempPinnedData(); |
1270 | ResetInternalKeysSkippedCounter(); | |
11fdf7f2 TL |
1271 | |
1272 | SequenceNumber seq = MaxVisibleSequenceNumber(); | |
7c673cae | 1273 | saved_key_.Clear(); |
11fdf7f2 TL |
1274 | saved_key_.SetInternalKey(target, seq); |
1275 | ||
1276 | #ifndef ROCKSDB_LITE | |
1277 | if (db_impl_ != nullptr && cfd_ != nullptr) { | |
1278 | db_impl_->TraceIteratorSeek(cfd_->GetID(), target); | |
1279 | } | |
1280 | #endif // ROCKSDB_LITE | |
1281 | ||
1282 | if (iterate_lower_bound_ != nullptr && | |
1283 | user_comparator_->Compare(saved_key_.GetUserKey(), | |
1284 | *iterate_lower_bound_) < 0) { | |
1285 | saved_key_.Clear(); | |
1286 | saved_key_.SetInternalKey(*iterate_lower_bound_, seq); | |
1287 | } | |
7c673cae FG |
1288 | |
1289 | { | |
1290 | PERF_TIMER_GUARD(seek_internal_seek_time); | |
1291 | iter_->Seek(saved_key_.GetInternalKey()); | |
11fdf7f2 | 1292 | range_del_agg_.InvalidateRangeDelMapPositions(); |
7c673cae FG |
1293 | } |
1294 | RecordTick(statistics_, NUMBER_DB_SEEK); | |
1295 | if (iter_->Valid()) { | |
1296 | if (prefix_extractor_ && prefix_same_as_start_) { | |
1297 | prefix_start_key_ = prefix_extractor_->Transform(target); | |
1298 | } | |
1299 | direction_ = kForward; | |
1300 | ClearSavedValue(); | |
1301 | FindNextUserEntry(false /* not skipping */, prefix_same_as_start_); | |
1302 | if (!valid_) { | |
1303 | prefix_start_key_.clear(); | |
1304 | } | |
1305 | if (statistics_ != nullptr) { | |
1306 | if (valid_) { | |
11fdf7f2 | 1307 | // Decrement since we don't want to count this key as skipped |
7c673cae FG |
1308 | RecordTick(statistics_, NUMBER_DB_SEEK_FOUND); |
1309 | RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); | |
11fdf7f2 | 1310 | PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size()); |
7c673cae FG |
1311 | } |
1312 | } | |
1313 | } else { | |
1314 | valid_ = false; | |
1315 | } | |
1316 | ||
1317 | if (valid_ && prefix_extractor_ && prefix_same_as_start_) { | |
1318 | prefix_start_buf_.SetUserKey(prefix_start_key_); | |
1319 | prefix_start_key_ = prefix_start_buf_.GetUserKey(); | |
1320 | } | |
1321 | } | |
1322 | ||
1323 | void DBIter::SeekForPrev(const Slice& target) { | |
1324 | StopWatch sw(env_, statistics_, DB_SEEK); | |
11fdf7f2 | 1325 | status_ = Status::OK(); |
7c673cae FG |
1326 | ReleaseTempPinnedData(); |
1327 | ResetInternalKeysSkippedCounter(); | |
1328 | saved_key_.Clear(); | |
1329 | // now saved_key is used to store internal key. | |
1330 | saved_key_.SetInternalKey(target, 0 /* sequence_number */, | |
1331 | kValueTypeForSeekForPrev); | |
1332 | ||
11fdf7f2 TL |
1333 | if (iterate_upper_bound_ != nullptr && |
1334 | user_comparator_->Compare(saved_key_.GetUserKey(), | |
1335 | *iterate_upper_bound_) >= 0) { | |
1336 | saved_key_.Clear(); | |
1337 | saved_key_.SetInternalKey(*iterate_upper_bound_, kMaxSequenceNumber); | |
1338 | } | |
1339 | ||
7c673cae FG |
1340 | { |
1341 | PERF_TIMER_GUARD(seek_internal_seek_time); | |
1342 | iter_->SeekForPrev(saved_key_.GetInternalKey()); | |
11fdf7f2 | 1343 | range_del_agg_.InvalidateRangeDelMapPositions(); |
7c673cae FG |
1344 | } |
1345 | ||
11fdf7f2 TL |
1346 | #ifndef ROCKSDB_LITE |
1347 | if (db_impl_ != nullptr && cfd_ != nullptr) { | |
1348 | db_impl_->TraceIteratorSeekForPrev(cfd_->GetID(), target); | |
1349 | } | |
1350 | #endif // ROCKSDB_LITE | |
1351 | ||
7c673cae FG |
1352 | RecordTick(statistics_, NUMBER_DB_SEEK); |
1353 | if (iter_->Valid()) { | |
1354 | if (prefix_extractor_ && prefix_same_as_start_) { | |
1355 | prefix_start_key_ = prefix_extractor_->Transform(target); | |
1356 | } | |
1357 | direction_ = kReverse; | |
1358 | ClearSavedValue(); | |
1359 | PrevInternal(); | |
1360 | if (!valid_) { | |
1361 | prefix_start_key_.clear(); | |
1362 | } | |
1363 | if (statistics_ != nullptr) { | |
1364 | if (valid_) { | |
1365 | RecordTick(statistics_, NUMBER_DB_SEEK_FOUND); | |
1366 | RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); | |
11fdf7f2 | 1367 | PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size()); |
7c673cae FG |
1368 | } |
1369 | } | |
1370 | } else { | |
1371 | valid_ = false; | |
1372 | } | |
1373 | if (valid_ && prefix_extractor_ && prefix_same_as_start_) { | |
1374 | prefix_start_buf_.SetUserKey(prefix_start_key_); | |
1375 | prefix_start_key_ = prefix_start_buf_.GetUserKey(); | |
1376 | } | |
1377 | } | |
1378 | ||
1379 | void DBIter::SeekToFirst() { | |
11fdf7f2 TL |
1380 | if (iterate_lower_bound_ != nullptr) { |
1381 | Seek(*iterate_lower_bound_); | |
1382 | return; | |
1383 | } | |
7c673cae FG |
1384 | // Don't use iter_::Seek() if we set a prefix extractor |
1385 | // because prefix seek will be used. | |
11fdf7f2 | 1386 | if (prefix_extractor_ != nullptr && !total_order_seek_) { |
7c673cae FG |
1387 | max_skip_ = std::numeric_limits<uint64_t>::max(); |
1388 | } | |
11fdf7f2 | 1389 | status_ = Status::OK(); |
7c673cae FG |
1390 | direction_ = kForward; |
1391 | ReleaseTempPinnedData(); | |
1392 | ResetInternalKeysSkippedCounter(); | |
1393 | ClearSavedValue(); | |
1394 | ||
1395 | { | |
1396 | PERF_TIMER_GUARD(seek_internal_seek_time); | |
1397 | iter_->SeekToFirst(); | |
11fdf7f2 | 1398 | range_del_agg_.InvalidateRangeDelMapPositions(); |
7c673cae FG |
1399 | } |
1400 | ||
1401 | RecordTick(statistics_, NUMBER_DB_SEEK); | |
1402 | if (iter_->Valid()) { | |
1403 | saved_key_.SetUserKey( | |
1404 | ExtractUserKey(iter_->key()), | |
1405 | !iter_->IsKeyPinned() || !pin_thru_lifetime_ /* copy */); | |
1406 | FindNextUserEntry(false /* not skipping */, false /* no prefix check */); | |
1407 | if (statistics_ != nullptr) { | |
1408 | if (valid_) { | |
1409 | RecordTick(statistics_, NUMBER_DB_SEEK_FOUND); | |
1410 | RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); | |
11fdf7f2 | 1411 | PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size()); |
7c673cae FG |
1412 | } |
1413 | } | |
1414 | } else { | |
1415 | valid_ = false; | |
1416 | } | |
1417 | if (valid_ && prefix_extractor_ && prefix_same_as_start_) { | |
1418 | prefix_start_buf_.SetUserKey( | |
1419 | prefix_extractor_->Transform(saved_key_.GetUserKey())); | |
1420 | prefix_start_key_ = prefix_start_buf_.GetUserKey(); | |
1421 | } | |
1422 | } | |
1423 | ||
1424 | void DBIter::SeekToLast() { | |
11fdf7f2 TL |
1425 | if (iterate_upper_bound_ != nullptr) { |
1426 | // Seek to last key strictly less than ReadOptions.iterate_upper_bound. | |
1427 | SeekForPrev(*iterate_upper_bound_); | |
1428 | if (Valid() && user_comparator_->Equal(*iterate_upper_bound_, key())) { | |
1429 | ReleaseTempPinnedData(); | |
1430 | PrevInternal(); | |
1431 | } | |
1432 | return; | |
1433 | } | |
1434 | ||
7c673cae FG |
1435 | // Don't use iter_::Seek() if we set a prefix extractor |
1436 | // because prefix seek will be used. | |
11fdf7f2 | 1437 | if (prefix_extractor_ != nullptr && !total_order_seek_) { |
7c673cae FG |
1438 | max_skip_ = std::numeric_limits<uint64_t>::max(); |
1439 | } | |
11fdf7f2 | 1440 | status_ = Status::OK(); |
7c673cae FG |
1441 | direction_ = kReverse; |
1442 | ReleaseTempPinnedData(); | |
1443 | ResetInternalKeysSkippedCounter(); | |
1444 | ClearSavedValue(); | |
1445 | ||
1446 | { | |
1447 | PERF_TIMER_GUARD(seek_internal_seek_time); | |
1448 | iter_->SeekToLast(); | |
11fdf7f2 | 1449 | range_del_agg_.InvalidateRangeDelMapPositions(); |
7c673cae | 1450 | } |
11fdf7f2 | 1451 | PrevInternal(); |
7c673cae FG |
1452 | if (statistics_ != nullptr) { |
1453 | RecordTick(statistics_, NUMBER_DB_SEEK); | |
1454 | if (valid_) { | |
1455 | RecordTick(statistics_, NUMBER_DB_SEEK_FOUND); | |
1456 | RecordTick(statistics_, ITER_BYTES_READ, key().size() + value().size()); | |
11fdf7f2 | 1457 | PERF_COUNTER_ADD(iter_read_bytes, key().size() + value().size()); |
7c673cae FG |
1458 | } |
1459 | } | |
1460 | if (valid_ && prefix_extractor_ && prefix_same_as_start_) { | |
1461 | prefix_start_buf_.SetUserKey( | |
1462 | prefix_extractor_->Transform(saved_key_.GetUserKey())); | |
1463 | prefix_start_key_ = prefix_start_buf_.GetUserKey(); | |
1464 | } | |
1465 | } | |
1466 | ||
1467 | Iterator* NewDBIterator(Env* env, const ReadOptions& read_options, | |
1468 | const ImmutableCFOptions& cf_options, | |
11fdf7f2 | 1469 | const MutableCFOptions& mutable_cf_options, |
7c673cae FG |
1470 | const Comparator* user_key_comparator, |
1471 | InternalIterator* internal_iter, | |
1472 | const SequenceNumber& sequence, | |
1473 | uint64_t max_sequential_skip_in_iterations, | |
11fdf7f2 TL |
1474 | ReadCallback* read_callback, DBImpl* db_impl, |
1475 | ColumnFamilyData* cfd, bool allow_blob) { | |
7c673cae | 1476 | DBIter* db_iter = new DBIter( |
11fdf7f2 TL |
1477 | env, read_options, cf_options, mutable_cf_options, user_key_comparator, |
1478 | internal_iter, sequence, false, max_sequential_skip_in_iterations, | |
1479 | read_callback, db_impl, cfd, allow_blob); | |
7c673cae FG |
1480 | return db_iter; |
1481 | } | |
1482 | ||
1483 | ArenaWrappedDBIter::~ArenaWrappedDBIter() { db_iter_->~DBIter(); } | |
1484 | ||
7c673cae FG |
1485 | RangeDelAggregator* ArenaWrappedDBIter::GetRangeDelAggregator() { |
1486 | return db_iter_->GetRangeDelAggregator(); | |
1487 | } | |
1488 | ||
1489 | void ArenaWrappedDBIter::SetIterUnderDBIter(InternalIterator* iter) { | |
1490 | static_cast<DBIter*>(db_iter_)->SetIter(iter); | |
1491 | } | |
1492 | ||
1493 | inline bool ArenaWrappedDBIter::Valid() const { return db_iter_->Valid(); } | |
1494 | inline void ArenaWrappedDBIter::SeekToFirst() { db_iter_->SeekToFirst(); } | |
1495 | inline void ArenaWrappedDBIter::SeekToLast() { db_iter_->SeekToLast(); } | |
1496 | inline void ArenaWrappedDBIter::Seek(const Slice& target) { | |
1497 | db_iter_->Seek(target); | |
1498 | } | |
1499 | inline void ArenaWrappedDBIter::SeekForPrev(const Slice& target) { | |
1500 | db_iter_->SeekForPrev(target); | |
1501 | } | |
1502 | inline void ArenaWrappedDBIter::Next() { db_iter_->Next(); } | |
1503 | inline void ArenaWrappedDBIter::Prev() { db_iter_->Prev(); } | |
1504 | inline Slice ArenaWrappedDBIter::key() const { return db_iter_->key(); } | |
1505 | inline Slice ArenaWrappedDBIter::value() const { return db_iter_->value(); } | |
1506 | inline Status ArenaWrappedDBIter::status() const { return db_iter_->status(); } | |
11fdf7f2 | 1507 | bool ArenaWrappedDBIter::IsBlob() const { return db_iter_->IsBlob(); } |
7c673cae FG |
1508 | inline Status ArenaWrappedDBIter::GetProperty(std::string prop_name, |
1509 | std::string* prop) { | |
11fdf7f2 TL |
1510 | if (prop_name == "rocksdb.iterator.super-version-number") { |
1511 | // First try to pass the value returned from inner iterator. | |
1512 | if (!db_iter_->GetProperty(prop_name, prop).ok()) { | |
1513 | *prop = ToString(sv_number_); | |
1514 | } | |
1515 | return Status::OK(); | |
1516 | } | |
7c673cae FG |
1517 | return db_iter_->GetProperty(prop_name, prop); |
1518 | } | |
11fdf7f2 TL |
1519 | |
1520 | void ArenaWrappedDBIter::Init(Env* env, const ReadOptions& read_options, | |
1521 | const ImmutableCFOptions& cf_options, | |
1522 | const MutableCFOptions& mutable_cf_options, | |
1523 | const SequenceNumber& sequence, | |
1524 | uint64_t max_sequential_skip_in_iteration, | |
1525 | uint64_t version_number, | |
1526 | ReadCallback* read_callback, DBImpl* db_impl, | |
1527 | ColumnFamilyData* cfd, bool allow_blob, | |
1528 | bool allow_refresh) { | |
1529 | auto mem = arena_.AllocateAligned(sizeof(DBIter)); | |
1530 | db_iter_ = new (mem) DBIter(env, read_options, cf_options, mutable_cf_options, | |
1531 | cf_options.user_comparator, nullptr, sequence, | |
1532 | true, max_sequential_skip_in_iteration, | |
1533 | read_callback, db_impl, cfd, allow_blob); | |
1534 | sv_number_ = version_number; | |
1535 | allow_refresh_ = allow_refresh; | |
1536 | } | |
1537 | ||
1538 | Status ArenaWrappedDBIter::Refresh() { | |
1539 | if (cfd_ == nullptr || db_impl_ == nullptr || !allow_refresh_) { | |
1540 | return Status::NotSupported("Creating renew iterator is not allowed."); | |
1541 | } | |
1542 | assert(db_iter_ != nullptr); | |
1543 | // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the | |
1544 | // correct behavior. Will be corrected automatically when we take a snapshot | |
1545 | // here for the case of WritePreparedTxnDB. | |
1546 | SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); | |
1547 | uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); | |
1548 | if (sv_number_ != cur_sv_number) { | |
1549 | Env* env = db_iter_->env(); | |
1550 | db_iter_->~DBIter(); | |
1551 | arena_.~Arena(); | |
1552 | new (&arena_) Arena(); | |
1553 | ||
1554 | SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_->mutex()); | |
1555 | Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options, | |
1556 | latest_seq, sv->mutable_cf_options.max_sequential_skip_in_iterations, | |
1557 | cur_sv_number, read_callback_, db_impl_, cfd_, allow_blob_, | |
1558 | allow_refresh_); | |
1559 | ||
1560 | InternalIterator* internal_iter = db_impl_->NewInternalIterator( | |
1561 | read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator()); | |
1562 | SetIterUnderDBIter(internal_iter); | |
1563 | } else { | |
1564 | db_iter_->set_sequence(latest_seq); | |
1565 | db_iter_->set_valid(false); | |
1566 | } | |
1567 | return Status::OK(); | |
7c673cae FG |
1568 | } |
1569 | ||
1570 | ArenaWrappedDBIter* NewArenaWrappedDbIterator( | |
1571 | Env* env, const ReadOptions& read_options, | |
11fdf7f2 TL |
1572 | const ImmutableCFOptions& cf_options, |
1573 | const MutableCFOptions& mutable_cf_options, const SequenceNumber& sequence, | |
1574 | uint64_t max_sequential_skip_in_iterations, uint64_t version_number, | |
1575 | ReadCallback* read_callback, DBImpl* db_impl, ColumnFamilyData* cfd, | |
1576 | bool allow_blob, bool allow_refresh) { | |
7c673cae | 1577 | ArenaWrappedDBIter* iter = new ArenaWrappedDBIter(); |
11fdf7f2 TL |
1578 | iter->Init(env, read_options, cf_options, mutable_cf_options, sequence, |
1579 | max_sequential_skip_in_iterations, version_number, read_callback, | |
1580 | db_impl, cfd, allow_blob, allow_refresh); | |
1581 | if (db_impl != nullptr && cfd != nullptr && allow_refresh) { | |
1582 | iter->StoreRefreshInfo(read_options, db_impl, cfd, read_callback, | |
1583 | allow_blob); | |
1584 | } | |
7c673cae FG |
1585 | |
1586 | return iter; | |
1587 | } | |
1588 | ||
1589 | } // namespace rocksdb |