// Source: ceph/src/rocksdb/memtable/inlineskiplist.h
// (retrieved via git.proxmox.com gitweb; extraction artifacts repaired below)
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional
4 // grant of patent rights can be found in the PATENTS file in the same
7 // Copyright (c) 2011 The LevelDB Authors. All rights reserved. Use of
8 // this source code is governed by a BSD-style license that can be found
9 // in the LICENSE file. See the AUTHORS file for names of contributors.
11 // InlineSkipList is derived from SkipList (skiplist.h), but it optimizes
12 // the memory layout by requiring that the key storage be allocated through
13 // the skip list instance. For the common case of SkipList<const char*,
14 // Cmp> this saves 1 pointer per skip list node and gives better cache
15 // locality, at the expense of wasted padding from using AllocateAligned
16 // instead of Allocate for the keys. The unused padding will be from
17 // 0 to sizeof(void*)-1 bytes, and the space savings are sizeof(void*)
18 // bytes, so despite the padding the space used is always less than
19 // SkipList<const char*, ..>.
21 // Thread safety -------------
23 // Writes via Insert require external synchronization, most likely a mutex.
24 // InsertConcurrently can be safely called concurrently with reads and
25 // with other concurrent inserts. Reads require a guarantee that the
26 // InlineSkipList will not be destroyed while the read is in progress.
27 // Apart from that, reads progress without any internal locking or
32 // (1) Allocated nodes are never deleted until the InlineSkipList is
33 // destroyed. This is trivially guaranteed by the code since we never
34 // delete any skip list nodes.
36 // (2) The contents of a Node except for the next/prev pointers are
37 // immutable after the Node has been linked into the InlineSkipList.
38 // Only Insert() modifies the list, and it is careful to initialize a
39 // node and use release-stores to publish the nodes in one or more lists.
41 // ... prev vs. next pointer ordering ...
#include <assert.h>
#include <stdlib.h>

#include <atomic>
#include <cstdint>
#include <cstring>

#include "port/port.h"
#include "util/allocator.h"
#include "util/random.h"
55 template <class Comparator
>
56 class InlineSkipList
{
62 static const uint16_t kMaxPossibleHeight
= 32;
64 // Create a new InlineSkipList object that will use "cmp" for comparing
65 // keys, and will allocate memory using "*allocator". Objects allocated
66 // in the allocator must remain allocated for the lifetime of the
68 explicit InlineSkipList(Comparator cmp
, Allocator
* allocator
,
69 int32_t max_height
= 12,
70 int32_t branching_factor
= 4);
72 // Allocates a key and a skip-list node, returning a pointer to the key
73 // portion of the node. This method is thread-safe if the allocator
75 char* AllocateKey(size_t key_size
);
77 // Allocate a splice using allocator.
78 Splice
* AllocateSplice();
80 // Inserts a key allocated by AllocateKey, after the actual key value
81 // has been filled in.
83 // REQUIRES: nothing that compares equal to key is currently in the list.
84 // REQUIRES: no concurrent calls to any of inserts.
85 void Insert(const char* key
);
87 // Inserts a key allocated by AllocateKey with a hint of last insert
88 // position in the skip-list. If hint points to nullptr, a new hint will be
89 // populated, which can be used in subsequent calls.
91 // It can be used to optimize the workload where there are multiple groups
92 // of keys, and each key is likely to insert to a location close to the last
93 // inserted key in the same group. One example is sequential inserts.
95 // REQUIRES: nothing that compares equal to key is currently in the list.
96 // REQUIRES: no concurrent calls to any of inserts.
97 void InsertWithHint(const char* key
, void** hint
);
99 // Like Insert, but external synchronization is not required.
100 void InsertConcurrently(const char* key
);
102 // Inserts a node into the skip list. key must have been allocated by
103 // AllocateKey and then filled in by the caller. If UseCAS is true,
104 // then external synchronization is not required, otherwise this method
105 // may not be called concurrently with any other insertions.
107 // Regardless of whether UseCAS is true, the splice must be owned
108 // exclusively by the current thread. If allow_partial_splice_fix is
109 // true, then the cost of insertion is amortized O(log D), where D is
110 // the distance from the splice to the inserted key (measured as the
111 // number of intervening nodes). Note that this bound is very good for
112 // sequential insertions! If allow_partial_splice_fix is false then
113 // the existing splice will be ignored unless the current key is being
114 // inserted immediately after the splice. allow_partial_splice_fix ==
115 // false has worse running time for the non-sequential case O(log N),
116 // but a better constant factor.
117 template <bool UseCAS
>
118 void Insert(const char* key
, Splice
* splice
, bool allow_partial_splice_fix
);
120 // Returns true iff an entry that compares equal to key is in the list.
121 bool Contains(const char* key
) const;
123 // Return estimated number of entries smaller than `key`.
124 uint64_t EstimateCount(const char* key
) const;
126 // Validate correctness of the skip-list.
127 void TEST_Validate() const;
129 // Iteration over the contents of a skip list
132 // Initialize an iterator over the specified list.
133 // The returned iterator is not valid.
134 explicit Iterator(const InlineSkipList
* list
);
136 // Change the underlying skiplist used for this iterator
137 // This enables us not changing the iterator without deallocating
138 // an old one and then allocating a new one
139 void SetList(const InlineSkipList
* list
);
141 // Returns true iff the iterator is positioned at a valid node.
144 // Returns the key at the current position.
146 const char* key() const;
148 // Advances to the next position.
152 // Advances to the previous position.
156 // Advance to the first entry with a key >= target
157 void Seek(const char* target
);
159 // Retreat to the last entry with a key <= target
160 void SeekForPrev(const char* target
);
162 // Position at the first entry in list.
163 // Final state of iterator is Valid() iff list is not empty.
166 // Position at the last entry in list.
167 // Final state of iterator is Valid() iff list is not empty.
171 const InlineSkipList
* list_
;
173 // Intentionally copyable
177 const uint16_t kMaxHeight_
;
178 const uint16_t kBranching_
;
179 const uint32_t kScaledInverseBranching_
;
181 // Immutable after construction
182 Comparator
const compare_
;
183 Allocator
* const allocator_
; // Allocator used for allocations of nodes
187 // Modified only by Insert(). Read racily by readers, but stale
189 std::atomic
<int> max_height_
; // Height of the entire list
191 // seq_splice_ is a Splice used for insertions in the non-concurrent
192 // case. It caches the prev and next found during the most recent
193 // non-concurrent insertion.
196 inline int GetMaxHeight() const {
197 return max_height_
.load(std::memory_order_relaxed
);
202 Node
* AllocateNode(size_t key_size
, int height
);
204 bool Equal(const char* a
, const char* b
) const {
205 return (compare_(a
, b
) == 0);
208 bool LessThan(const char* a
, const char* b
) const {
209 return (compare_(a
, b
) < 0);
212 // Return true if key is greater than the data stored in "n". Null n
213 // is considered infinite. n should not be head_.
214 bool KeyIsAfterNode(const char* key
, Node
* n
) const;
216 // Returns the earliest node with a key >= key.
217 // Return nullptr if there is no such node.
218 Node
* FindGreaterOrEqual(const char* key
) const;
220 // Return the latest node with a key < key.
221 // Return head_ if there is no such node.
222 // Fills prev[level] with pointer to previous node at "level" for every
223 // level in [0..max_height_-1], if prev is non-null.
224 Node
* FindLessThan(const char* key
, Node
** prev
= nullptr) const;
226 // Return the latest node with a key < key on bottom_level. Start searching
227 // from root node on the level below top_level.
228 // Fills prev[level] with pointer to previous node at "level" for every
229 // level in [bottom_level..top_level-1], if prev is non-null.
230 Node
* FindLessThan(const char* key
, Node
** prev
, Node
* root
, int top_level
,
231 int bottom_level
) const;
233 // Return the last node in the list.
234 // Return head_ if list is empty.
235 Node
* FindLast() const;
237 // Traverses a single level of the list, setting *out_prev to the last
238 // node before the key and *out_next to the first node after. Assumes
239 // that the key is not present in the skip list. On entry, before should
240 // point to a node that is before the key, and after should point to
241 // a node that is after the key. after should be nullptr if a good after
242 // node isn't conveniently available.
243 void FindSpliceForLevel(const char* key
, Node
* before
, Node
* after
, int level
,
244 Node
** out_prev
, Node
** out_next
);
246 // Recomputes Splice levels from highest_level (inclusive) down to
247 // lowest_level (inclusive).
248 void RecomputeSpliceLevels(const char* key
, Splice
* splice
,
249 int recompute_level
);
251 // No copying allowed
252 InlineSkipList(const InlineSkipList
&);
253 InlineSkipList
& operator=(const InlineSkipList
&);
256 // Implementation details follow
258 template <class Comparator
>
259 struct InlineSkipList
<Comparator
>::Splice
{
260 // The invariant of a Splice is that prev_[i+1].key <= prev_[i].key <
261 // next_[i].key <= next_[i+1].key for all i. That means that if a
262 // key is bracketed by prev_[i] and next_[i] then it is bracketed by
263 // all higher levels. It is _not_ required that prev_[i]->Next(i) ==
264 // next_[i] (it probably did at some point in the past, but intervening
265 // or concurrent operations might have inserted nodes in between).
271 // The Node data type is more of a pointer into custom-managed memory than
272 // a traditional C++ struct. The key is stored in the bytes immediately
273 // after the struct, and the next_ pointers for nodes with height > 1 are
274 // stored immediately _before_ the struct. This avoids the need to include
275 // any pointer or sizing data, which reduces per-node memory overheads.
276 template <class Comparator
>
277 struct InlineSkipList
<Comparator
>::Node
{
278 // Stores the height of the node in the memory location normally used for
279 // next_[0]. This is used for passing data from AllocateKey to Insert.
280 void StashHeight(const int height
) {
281 assert(sizeof(int) <= sizeof(next_
[0]));
282 memcpy(&next_
[0], &height
, sizeof(int));
285 // Retrieves the value passed to StashHeight. Undefined after a call
286 // to SetNext or NoBarrier_SetNext.
287 int UnstashHeight() const {
289 memcpy(&rv
, &next_
[0], sizeof(int));
293 const char* Key() const { return reinterpret_cast<const char*>(&next_
[1]); }
295 // Accessors/mutators for links. Wrapped in methods so we can add
296 // the appropriate barriers as necessary, and perform the necessary
297 // addressing trickery for storing links below the Node in memory.
300 // Use an 'acquire load' so that we observe a fully initialized
301 // version of the returned Node.
302 return (next_
[-n
].load(std::memory_order_acquire
));
305 void SetNext(int n
, Node
* x
) {
307 // Use a 'release store' so that anybody who reads through this
308 // pointer observes a fully initialized version of the inserted node.
309 next_
[-n
].store(x
, std::memory_order_release
);
312 bool CASNext(int n
, Node
* expected
, Node
* x
) {
314 return next_
[-n
].compare_exchange_strong(expected
, x
);
317 // No-barrier variants that can be safely used in a few locations.
318 Node
* NoBarrier_Next(int n
) {
320 return next_
[-n
].load(std::memory_order_relaxed
);
323 void NoBarrier_SetNext(int n
, Node
* x
) {
325 next_
[-n
].store(x
, std::memory_order_relaxed
);
328 // Insert node after prev on specific level.
329 void InsertAfter(Node
* prev
, int level
) {
330 // NoBarrier_SetNext() suffices since we will add a barrier when
331 // we publish a pointer to "this" in prev.
332 NoBarrier_SetNext(level
, prev
->NoBarrier_Next(level
));
333 prev
->SetNext(level
, this);
337 // next_[0] is the lowest level link (level 0). Higher levels are
338 // stored _earlier_, so level 1 is at next_[-1].
339 std::atomic
<Node
*> next_
[1];
342 template <class Comparator
>
343 inline InlineSkipList
<Comparator
>::Iterator::Iterator(
344 const InlineSkipList
* list
) {
348 template <class Comparator
>
349 inline void InlineSkipList
<Comparator
>::Iterator::SetList(
350 const InlineSkipList
* list
) {
355 template <class Comparator
>
356 inline bool InlineSkipList
<Comparator
>::Iterator::Valid() const {
357 return node_
!= nullptr;
360 template <class Comparator
>
361 inline const char* InlineSkipList
<Comparator
>::Iterator::key() const {
366 template <class Comparator
>
367 inline void InlineSkipList
<Comparator
>::Iterator::Next() {
369 node_
= node_
->Next(0);
372 template <class Comparator
>
373 inline void InlineSkipList
<Comparator
>::Iterator::Prev() {
374 // Instead of using explicit "prev" links, we just search for the
375 // last node that falls before key.
377 node_
= list_
->FindLessThan(node_
->Key());
378 if (node_
== list_
->head_
) {
383 template <class Comparator
>
384 inline void InlineSkipList
<Comparator
>::Iterator::Seek(const char* target
) {
385 node_
= list_
->FindGreaterOrEqual(target
);
388 template <class Comparator
>
389 inline void InlineSkipList
<Comparator
>::Iterator::SeekForPrev(
390 const char* target
) {
395 while (Valid() && list_
->LessThan(target
, key())) {
400 template <class Comparator
>
401 inline void InlineSkipList
<Comparator
>::Iterator::SeekToFirst() {
402 node_
= list_
->head_
->Next(0);
405 template <class Comparator
>
406 inline void InlineSkipList
<Comparator
>::Iterator::SeekToLast() {
407 node_
= list_
->FindLast();
408 if (node_
== list_
->head_
) {
413 template <class Comparator
>
414 int InlineSkipList
<Comparator
>::RandomHeight() {
415 auto rnd
= Random::GetTLSInstance();
417 // Increase height with probability 1 in kBranching
419 while (height
< kMaxHeight_
&& height
< kMaxPossibleHeight
&&
420 rnd
->Next() < kScaledInverseBranching_
) {
424 assert(height
<= kMaxHeight_
);
425 assert(height
<= kMaxPossibleHeight
);
429 template <class Comparator
>
430 bool InlineSkipList
<Comparator
>::KeyIsAfterNode(const char* key
,
432 // nullptr n is considered infinite
434 return (n
!= nullptr) && (compare_(n
->Key(), key
) < 0);
437 template <class Comparator
>
438 typename InlineSkipList
<Comparator
>::Node
*
439 InlineSkipList
<Comparator
>::FindGreaterOrEqual(const char* key
) const {
440 // Note: It looks like we could reduce duplication by implementing
441 // this function as FindLessThan(key)->Next(0), but we wouldn't be able
442 // to exit early on equality and the result wouldn't even be correct.
443 // A concurrent insert might occur after FindLessThan(key) but before
444 // we get a chance to call Next(0).
446 int level
= GetMaxHeight() - 1;
447 Node
* last_bigger
= nullptr;
449 Node
* next
= x
->Next(level
);
450 // Make sure the lists are sorted
451 assert(x
== head_
|| next
== nullptr || KeyIsAfterNode(next
->Key(), x
));
452 // Make sure we haven't overshot during our search
453 assert(x
== head_
|| KeyIsAfterNode(key
, x
));
454 int cmp
= (next
== nullptr || next
== last_bigger
)
456 : compare_(next
->Key(), key
);
457 if (cmp
== 0 || (cmp
> 0 && level
== 0)) {
459 } else if (cmp
< 0) {
460 // Keep searching in this list
463 // Switch to next list, reuse compare_() result
470 template <class Comparator
>
471 typename InlineSkipList
<Comparator
>::Node
*
472 InlineSkipList
<Comparator
>::FindLessThan(const char* key
, Node
** prev
) const {
473 return FindLessThan(key
, prev
, head_
, GetMaxHeight(), 0);
476 template <class Comparator
>
477 typename InlineSkipList
<Comparator
>::Node
*
478 InlineSkipList
<Comparator
>::FindLessThan(const char* key
, Node
** prev
,
479 Node
* root
, int top_level
,
480 int bottom_level
) const {
481 assert(top_level
> bottom_level
);
482 int level
= top_level
- 1;
484 // KeyIsAfter(key, last_not_after) is definitely false
485 Node
* last_not_after
= nullptr;
487 Node
* next
= x
->Next(level
);
488 assert(x
== head_
|| next
== nullptr || KeyIsAfterNode(next
->Key(), x
));
489 assert(x
== head_
|| KeyIsAfterNode(key
, x
));
490 if (next
!= last_not_after
&& KeyIsAfterNode(key
, next
)) {
491 // Keep searching in this list
494 if (prev
!= nullptr) {
497 if (level
== bottom_level
) {
500 // Switch to next list, reuse KeyIsAfterNode() result
501 last_not_after
= next
;
508 template <class Comparator
>
509 typename InlineSkipList
<Comparator
>::Node
*
510 InlineSkipList
<Comparator
>::FindLast() const {
512 int level
= GetMaxHeight() - 1;
514 Node
* next
= x
->Next(level
);
515 if (next
== nullptr) {
519 // Switch to next list
528 template <class Comparator
>
529 uint64_t InlineSkipList
<Comparator
>::EstimateCount(const char* key
) const {
533 int level
= GetMaxHeight() - 1;
535 assert(x
== head_
|| compare_(x
->Key(), key
) < 0);
536 Node
* next
= x
->Next(level
);
537 if (next
== nullptr || compare_(next
->Key(), key
) >= 0) {
541 // Switch to next list
542 count
*= kBranching_
;
552 template <class Comparator
>
553 InlineSkipList
<Comparator
>::InlineSkipList(const Comparator cmp
,
554 Allocator
* allocator
,
556 int32_t branching_factor
)
557 : kMaxHeight_(max_height
),
558 kBranching_(branching_factor
),
559 kScaledInverseBranching_((Random::kMaxNext
+ 1) / kBranching_
),
561 allocator_(allocator
),
562 head_(AllocateNode(0, max_height
)),
564 seq_splice_(AllocateSplice()) {
565 assert(max_height
> 0 && kMaxHeight_
== static_cast<uint32_t>(max_height
));
566 assert(branching_factor
> 1 &&
567 kBranching_
== static_cast<uint32_t>(branching_factor
));
568 assert(kScaledInverseBranching_
> 0);
570 for (int i
= 0; i
< kMaxHeight_
; ++i
) {
571 head_
->SetNext(i
, nullptr);
575 template <class Comparator
>
576 char* InlineSkipList
<Comparator
>::AllocateKey(size_t key_size
) {
577 return const_cast<char*>(AllocateNode(key_size
, RandomHeight())->Key());
580 template <class Comparator
>
581 typename InlineSkipList
<Comparator
>::Node
*
582 InlineSkipList
<Comparator
>::AllocateNode(size_t key_size
, int height
) {
583 auto prefix
= sizeof(std::atomic
<Node
*>) * (height
- 1);
585 // prefix is space for the height - 1 pointers that we store before
586 // the Node instance (next_[-(height - 1) .. -1]). Node starts at
587 // raw + prefix, and holds the bottom-mode (level 0) skip list pointer
588 // next_[0]. key_size is the bytes for the key, which comes just after
590 char* raw
= allocator_
->AllocateAligned(prefix
+ sizeof(Node
) + key_size
);
591 Node
* x
= reinterpret_cast<Node
*>(raw
+ prefix
);
593 // Once we've linked the node into the skip list we don't actually need
594 // to know its height, because we can implicitly use the fact that we
595 // traversed into a node at level h to known that h is a valid level
596 // for that node. We need to convey the height to the Insert step,
597 // however, so that it can perform the proper links. Since we're not
598 // using the pointers at the moment, StashHeight temporarily borrow
599 // storage from next_[0] for that purpose.
600 x
->StashHeight(height
);
604 template <class Comparator
>
605 typename InlineSkipList
<Comparator
>::Splice
*
606 InlineSkipList
<Comparator
>::AllocateSplice() {
607 // size of prev_ and next_
608 size_t array_size
= sizeof(Node
*) * (kMaxHeight_
+ 1);
609 char* raw
= allocator_
->AllocateAligned(sizeof(Splice
) + array_size
* 2);
610 Splice
* splice
= reinterpret_cast<Splice
*>(raw
);
612 splice
->prev_
= reinterpret_cast<Node
**>(raw
+ sizeof(Splice
));
613 splice
->next_
= reinterpret_cast<Node
**>(raw
+ sizeof(Splice
) + array_size
);
617 template <class Comparator
>
618 void InlineSkipList
<Comparator
>::Insert(const char* key
) {
619 Insert
<false>(key
, seq_splice_
, false);
622 template <class Comparator
>
623 void InlineSkipList
<Comparator
>::InsertConcurrently(const char* key
) {
624 Node
* prev
[kMaxPossibleHeight
];
625 Node
* next
[kMaxPossibleHeight
];
629 Insert
<true>(key
, &splice
, false);
632 template <class Comparator
>
633 void InlineSkipList
<Comparator
>::InsertWithHint(const char* key
, void** hint
) {
634 assert(hint
!= nullptr);
635 Splice
* splice
= reinterpret_cast<Splice
*>(*hint
);
636 if (splice
== nullptr) {
637 splice
= AllocateSplice();
638 *hint
= reinterpret_cast<void*>(splice
);
640 Insert
<false>(key
, splice
, true);
643 template <class Comparator
>
644 void InlineSkipList
<Comparator
>::FindSpliceForLevel(const char* key
,
645 Node
* before
, Node
* after
,
646 int level
, Node
** out_prev
,
649 Node
* next
= before
->Next(level
);
650 assert(before
== head_
|| next
== nullptr ||
651 KeyIsAfterNode(next
->Key(), before
));
652 assert(before
== head_
|| KeyIsAfterNode(key
, before
));
653 if (next
== after
|| !KeyIsAfterNode(key
, next
)) {
663 template <class Comparator
>
664 void InlineSkipList
<Comparator
>::RecomputeSpliceLevels(const char* key
,
666 int recompute_level
) {
667 assert(recompute_level
> 0);
668 assert(recompute_level
<= splice
->height_
);
669 for (int i
= recompute_level
- 1; i
>= 0; --i
) {
670 FindSpliceForLevel(key
, splice
->prev_
[i
+ 1], splice
->next_
[i
+ 1], i
,
671 &splice
->prev_
[i
], &splice
->next_
[i
]);
675 template <class Comparator
>
676 template <bool UseCAS
>
677 void InlineSkipList
<Comparator
>::Insert(const char* key
, Splice
* splice
,
678 bool allow_partial_splice_fix
) {
679 Node
* x
= reinterpret_cast<Node
*>(const_cast<char*>(key
)) - 1;
680 int height
= x
->UnstashHeight();
681 assert(height
>= 1 && height
<= kMaxHeight_
);
683 int max_height
= max_height_
.load(std::memory_order_relaxed
);
684 while (height
> max_height
) {
685 if (max_height_
.compare_exchange_weak(max_height
, height
)) {
686 // successfully updated it
690 // else retry, possibly exiting the loop because somebody else
693 assert(max_height
<= kMaxPossibleHeight
);
695 int recompute_height
= 0;
696 if (splice
->height_
< max_height
) {
697 // Either splice has never been used or max_height has grown since
698 // last use. We could potentially fix it in the latter case, but
700 splice
->prev_
[max_height
] = head_
;
701 splice
->next_
[max_height
] = nullptr;
702 splice
->height_
= max_height
;
703 recompute_height
= max_height
;
705 // Splice is a valid proper-height splice that brackets some
706 // key, but does it bracket this one? We need to validate it and
707 // recompute a portion of the splice (levels 0..recompute_height-1)
708 // that is a superset of all levels that don't bracket the new key.
709 // Several choices are reasonable, because we have to balance the work
710 // saved against the extra comparisons required to validate the Splice.
712 // One strategy is just to recompute all of orig_splice_height if the
713 // bottom level isn't bracketing. This pessimistically assumes that
714 // we will either get a perfect Splice hit (increasing sequential
715 // inserts) or have no locality.
717 // Another strategy is to walk up the Splice's levels until we find
718 // a level that brackets the key. This strategy lets the Splice
719 // hint help for other cases: it turns insertion from O(log N) into
720 // O(log D), where D is the number of nodes in between the key that
721 // produced the Splice and the current insert (insertion is aided
722 // whether the new key is before or after the splice). If you have
723 // a way of using a prefix of the key to map directly to the closest
724 // Splice out of O(sqrt(N)) Splices and we make it so that splices
725 // can also be used as hints during read, then we end up with Oshman's
726 // and Shavit's SkipTrie, which has O(log log N) lookup and insertion
727 // (compare to O(log N) for skip list).
729 // We control the pessimistic strategy with allow_partial_splice_fix.
730 // A good strategy is probably to be pessimistic for seq_splice_,
731 // optimistic if the caller actually went to the work of providing
733 while (recompute_height
< max_height
) {
734 if (splice
->prev_
[recompute_height
]->Next(recompute_height
) !=
735 splice
->next_
[recompute_height
]) {
736 // splice isn't tight at this level, there must have been some inserts
738 // location that didn't update the splice. We might only be a little
740 // the splice is very stale it would be O(N) to fix it. We haven't used
742 // our budget of comparisons, so always move up even if we are
744 // our chances of success.
746 } else if (splice
->prev_
[recompute_height
] != head_
&&
747 !KeyIsAfterNode(key
, splice
->prev_
[recompute_height
])) {
748 // key is from before splice
749 if (allow_partial_splice_fix
) {
750 // skip all levels with the same node without more comparisons
751 Node
* bad
= splice
->prev_
[recompute_height
];
752 while (splice
->prev_
[recompute_height
] == bad
) {
756 // we're pessimistic, recompute everything
757 recompute_height
= max_height
;
759 } else if (KeyIsAfterNode(key
, splice
->next_
[recompute_height
])) {
760 // key is from after splice
761 if (allow_partial_splice_fix
) {
762 Node
* bad
= splice
->next_
[recompute_height
];
763 while (splice
->next_
[recompute_height
] == bad
) {
767 recompute_height
= max_height
;
770 // this level brackets the key, we won!
775 assert(recompute_height
<= max_height
);
776 if (recompute_height
> 0) {
777 RecomputeSpliceLevels(key
, splice
, recompute_height
);
780 bool splice_is_valid
= true;
782 for (int i
= 0; i
< height
; ++i
) {
784 assert(splice
->next_
[i
] == nullptr ||
785 compare_(x
->Key(), splice
->next_
[i
]->Key()) < 0);
786 assert(splice
->prev_
[i
] == head_
||
787 compare_(splice
->prev_
[i
]->Key(), x
->Key()) < 0);
788 x
->NoBarrier_SetNext(i
, splice
->next_
[i
]);
789 if (splice
->prev_
[i
]->CASNext(i
, splice
->next_
[i
], x
)) {
793 // CAS failed, we need to recompute prev and next. It is unlikely
794 // to be helpful to try to use a different level as we redo the
795 // search, because it should be unlikely that lots of nodes have
796 // been inserted between prev[i] and next[i]. No point in using
797 // next[i] as the after hint, because we know it is stale.
798 FindSpliceForLevel(key
, splice
->prev_
[i
], nullptr, i
, &splice
->prev_
[i
],
801 // Since we've narrowed the bracket for level i, we might have
802 // violated the Splice constraint between i and i-1. Make sure
803 // we recompute the whole thing next time.
805 splice_is_valid
= false;
810 for (int i
= 0; i
< height
; ++i
) {
811 if (i
>= recompute_height
&&
812 splice
->prev_
[i
]->Next(i
) != splice
->next_
[i
]) {
813 FindSpliceForLevel(key
, splice
->prev_
[i
], nullptr, i
, &splice
->prev_
[i
],
816 assert(splice
->next_
[i
] == nullptr ||
817 compare_(x
->Key(), splice
->next_
[i
]->Key()) < 0);
818 assert(splice
->prev_
[i
] == head_
||
819 compare_(splice
->prev_
[i
]->Key(), x
->Key()) < 0);
820 assert(splice
->prev_
[i
]->Next(i
) == splice
->next_
[i
]);
821 x
->NoBarrier_SetNext(i
, splice
->next_
[i
]);
822 splice
->prev_
[i
]->SetNext(i
, x
);
825 if (splice_is_valid
) {
826 for (int i
= 0; i
< height
; ++i
) {
827 splice
->prev_
[i
] = x
;
829 assert(splice
->prev_
[splice
->height_
] == head_
);
830 assert(splice
->next_
[splice
->height_
] == nullptr);
831 for (int i
= 0; i
< splice
->height_
; ++i
) {
832 assert(splice
->next_
[i
] == nullptr ||
833 compare_(key
, splice
->next_
[i
]->Key()) < 0);
834 assert(splice
->prev_
[i
] == head_
||
835 compare_(splice
->prev_
[i
]->Key(), key
) <= 0);
836 assert(splice
->prev_
[i
+ 1] == splice
->prev_
[i
] ||
837 splice
->prev_
[i
+ 1] == head_
||
838 compare_(splice
->prev_
[i
+ 1]->Key(), splice
->prev_
[i
]->Key()) <
840 assert(splice
->next_
[i
+ 1] == splice
->next_
[i
] ||
841 splice
->next_
[i
+ 1] == nullptr ||
842 compare_(splice
->next_
[i
]->Key(), splice
->next_
[i
+ 1]->Key()) <
850 template <class Comparator
>
851 bool InlineSkipList
<Comparator
>::Contains(const char* key
) const {
852 Node
* x
= FindGreaterOrEqual(key
);
853 if (x
!= nullptr && Equal(key
, x
->Key())) {
860 template <class Comparator
>
861 void InlineSkipList
<Comparator
>::TEST_Validate() const {
862 // Interate over all levels at the same time, and verify nodes appear in
863 // the right order, and nodes appear in upper level also appear in lower
865 Node
* nodes
[kMaxPossibleHeight
];
866 int max_height
= GetMaxHeight();
867 for (int i
= 0; i
< max_height
; i
++) {
870 while (nodes
[0] != nullptr) {
871 Node
* l0_next
= nodes
[0]->Next(0);
872 if (l0_next
== nullptr) {
875 assert(nodes
[0] == head_
|| compare_(nodes
[0]->Key(), l0_next
->Key()) < 0);
879 while (i
< max_height
) {
880 Node
* next
= nodes
[i
]->Next(i
);
881 if (next
== nullptr) {
884 auto cmp
= compare_(nodes
[0]->Key(), next
->Key());
887 assert(next
== nodes
[0]);
895 for (int i
= 1; i
< max_height
; i
++) {
896 assert(nodes
[i
]->Next(i
) == nullptr);
900 } // namespace rocksdb