// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "memtable/inlineskiplist.h"

#include <set>
#include <unordered_set>

#include "rocksdb/env.h"
#include "util/concurrent_arena.h"
#include "util/hash.h"
#include "util/random.h"
#include "util/testharness.h"
// Our test skip list stores 8-byte unsigned integers
typedef uint64_t Key;

// View a Key's bytes as the char* key representation the skip list stores.
// The pointer aliases `key`, so it is only valid while `key` is alive.
static const char* Encode(const uint64_t* key) {
  return reinterpret_cast<const char*>(key);
}

// Recover the integer Key from the skip list's char* representation.
// memcpy (rather than a cast) avoids unaligned-access issues.
static Key Decode(const char* key) {
  Key rv;
  memcpy(&rv, key, sizeof(Key));
  return rv;
}
34 struct TestComparator
{
35 int operator()(const char* a
, const char* b
) const {
36 if (Decode(a
) < Decode(b
)) {
38 } else if (Decode(a
) > Decode(b
)) {
46 typedef InlineSkipList
<TestComparator
> TestInlineSkipList
;
48 class InlineSkipTest
: public testing::Test
{
50 void Insert(TestInlineSkipList
* list
, Key key
) {
51 char* buf
= list
->AllocateKey(sizeof(Key
));
52 memcpy(buf
, &key
, sizeof(Key
));
57 void InsertWithHint(TestInlineSkipList
* list
, Key key
, void** hint
) {
58 char* buf
= list
->AllocateKey(sizeof(Key
));
59 memcpy(buf
, &key
, sizeof(Key
));
60 list
->InsertWithHint(buf
, hint
);
64 void Validate(TestInlineSkipList
* list
) {
66 for (Key key
: keys_
) {
67 ASSERT_TRUE(list
->Contains(Encode(&key
)));
69 // Iterate over the list, make sure keys appears in order and no extra
71 TestInlineSkipList::Iterator
iter(list
);
72 ASSERT_FALSE(iter
.Valid());
74 iter
.Seek(Encode(&zero
));
75 for (Key key
: keys_
) {
76 ASSERT_TRUE(iter
.Valid());
77 ASSERT_EQ(key
, Decode(iter
.key()));
80 ASSERT_FALSE(iter
.Valid());
81 // Validate the list is well-formed.
82 list
->TEST_Validate();
89 TEST_F(InlineSkipTest
, Empty
) {
92 InlineSkipList
<TestComparator
> list(cmp
, &arena
);
94 ASSERT_TRUE(!list
.Contains(Encode(&key
)));
96 InlineSkipList
<TestComparator
>::Iterator
iter(&list
);
97 ASSERT_TRUE(!iter
.Valid());
99 ASSERT_TRUE(!iter
.Valid());
101 iter
.Seek(Encode(&key
));
102 ASSERT_TRUE(!iter
.Valid());
103 iter
.SeekForPrev(Encode(&key
));
104 ASSERT_TRUE(!iter
.Valid());
106 ASSERT_TRUE(!iter
.Valid());
109 TEST_F(InlineSkipTest
, InsertAndLookup
) {
114 ConcurrentArena arena
;
116 InlineSkipList
<TestComparator
> list(cmp
, &arena
);
117 for (int i
= 0; i
< N
; i
++) {
118 Key key
= rnd
.Next() % R
;
119 if (keys
.insert(key
).second
) {
120 char* buf
= list
.AllocateKey(sizeof(Key
));
121 memcpy(buf
, &key
, sizeof(Key
));
126 for (Key i
= 0; i
< R
; i
++) {
127 if (list
.Contains(Encode(&i
))) {
128 ASSERT_EQ(keys
.count(i
), 1U);
130 ASSERT_EQ(keys
.count(i
), 0U);
134 // Simple iterator tests
136 InlineSkipList
<TestComparator
>::Iterator
iter(&list
);
137 ASSERT_TRUE(!iter
.Valid());
140 iter
.Seek(Encode(&zero
));
141 ASSERT_TRUE(iter
.Valid());
142 ASSERT_EQ(*(keys
.begin()), Decode(iter
.key()));
144 uint64_t max_key
= R
- 1;
145 iter
.SeekForPrev(Encode(&max_key
));
146 ASSERT_TRUE(iter
.Valid());
147 ASSERT_EQ(*(keys
.rbegin()), Decode(iter
.key()));
150 ASSERT_TRUE(iter
.Valid());
151 ASSERT_EQ(*(keys
.begin()), Decode(iter
.key()));
154 ASSERT_TRUE(iter
.Valid());
155 ASSERT_EQ(*(keys
.rbegin()), Decode(iter
.key()));
158 // Forward iteration test
159 for (Key i
= 0; i
< R
; i
++) {
160 InlineSkipList
<TestComparator
>::Iterator
iter(&list
);
161 iter
.Seek(Encode(&i
));
163 // Compare against model iterator
164 std::set
<Key
>::iterator model_iter
= keys
.lower_bound(i
);
165 for (int j
= 0; j
< 3; j
++) {
166 if (model_iter
== keys
.end()) {
167 ASSERT_TRUE(!iter
.Valid());
170 ASSERT_TRUE(iter
.Valid());
171 ASSERT_EQ(*model_iter
, Decode(iter
.key()));
178 // Backward iteration test
179 for (Key i
= 0; i
< R
; i
++) {
180 InlineSkipList
<TestComparator
>::Iterator
iter(&list
);
181 iter
.SeekForPrev(Encode(&i
));
183 // Compare against model iterator
184 std::set
<Key
>::iterator model_iter
= keys
.upper_bound(i
);
185 for (int j
= 0; j
< 3; j
++) {
186 if (model_iter
== keys
.begin()) {
187 ASSERT_TRUE(!iter
.Valid());
190 ASSERT_TRUE(iter
.Valid());
191 ASSERT_EQ(*--model_iter
, Decode(iter
.key()));
198 TEST_F(InlineSkipTest
, InsertWithHint_Sequential
) {
199 const int N
= 100000;
202 TestInlineSkipList
list(cmp
, &arena
);
203 void* hint
= nullptr;
204 for (int i
= 0; i
< N
; i
++) {
206 InsertWithHint(&list
, key
, &hint
);
211 TEST_F(InlineSkipTest
, InsertWithHint_MultipleHints
) {
212 const int N
= 100000;
217 TestInlineSkipList
list(cmp
, &arena
);
220 for (int i
= 0; i
< S
; i
++) {
224 for (int i
= 0; i
< N
; i
++) {
225 Key s
= rnd
.Uniform(S
);
226 Key key
= (s
<< 32) + (++last_key
[s
]);
227 InsertWithHint(&list
, key
, &hints
[s
]);
232 TEST_F(InlineSkipTest
, InsertWithHint_MultipleHintsRandom
) {
233 const int N
= 100000;
238 TestInlineSkipList
list(cmp
, &arena
);
240 for (int i
= 0; i
< S
; i
++) {
243 for (int i
= 0; i
< N
; i
++) {
244 Key s
= rnd
.Uniform(S
);
245 Key key
= (s
<< 32) + rnd
.Next();
246 InsertWithHint(&list
, key
, &hints
[s
]);
251 TEST_F(InlineSkipTest
, InsertWithHint_CompatibleWithInsertWithoutHint
) {
252 const int N
= 100000;
258 TestInlineSkipList
list(cmp
, &arena
);
259 std::unordered_set
<Key
> used
;
261 Key without_hint
[S2
];
263 for (int i
= 0; i
< S1
; i
++) {
267 if (used
.insert(s
).second
) {
273 for (int i
= 0; i
< S2
; i
++) {
276 if (used
.insert(s
).second
) {
282 for (int i
= 0; i
< N
; i
++) {
283 Key s
= rnd
.Uniform(S1
+ S2
);
285 Key key
= (with_hint
[s
] << 32) + rnd
.Next();
286 InsertWithHint(&list
, key
, &hints
[s
]);
288 Key key
= (without_hint
[s
- S1
] << 32) + rnd
.Next();
// We want to make sure that with a single writer and multiple
// concurrent readers (with no synchronization other than when a
// reader's iterator is created), the reader always observes all the
// data that was present in the skip list when the iterator was
// constructed.  Because insertions are happening concurrently, we may
// also observe new values that were inserted since the iterator was
// constructed, but we should never miss any values that were present
// at iterator construction time.
//
// We generate multi-part keys:
//     <key,gen,hash>
// where:
//     key is in range [0..K-1]
//     gen is a generation number for key
//     hash is hash(key,gen)
//
// The insertion code picks a random key, sets gen to be 1 + the last
// generation number inserted for that key, and sets hash to Hash(key,gen).
//
// At the beginning of a read, we snapshot the last inserted
// generation number for each key.  We then iterate, including random
// calls to Next() and Seek().  For every key we encounter, we
// check that it is either expected given the initial snapshot or has
// been concurrently added since the iterator started.
319 class ConcurrentTest
{
321 static const uint32_t K
= 8;
324 static uint64_t key(Key key
) { return (key
>> 40); }
325 static uint64_t gen(Key key
) { return (key
>> 8) & 0xffffffffu
; }
326 static uint64_t hash(Key key
) { return key
& 0xff; }
328 static uint64_t HashNumbers(uint64_t k
, uint64_t g
) {
329 uint64_t data
[2] = {k
, g
};
330 return Hash(reinterpret_cast<char*>(data
), sizeof(data
), 0);
333 static Key
MakeKey(uint64_t k
, uint64_t g
) {
334 assert(sizeof(Key
) == sizeof(uint64_t));
335 assert(k
<= K
); // We sometimes pass K to seek to the end of the skiplist
336 assert(g
<= 0xffffffffu
);
337 return ((k
<< 40) | (g
<< 8) | (HashNumbers(k
, g
) & 0xff));
340 static bool IsValidKey(Key k
) {
341 return hash(k
) == (HashNumbers(key(k
), gen(k
)) & 0xff);
344 static Key
RandomTarget(Random
* rnd
) {
345 switch (rnd
->Next() % 10) {
348 return MakeKey(0, 0);
351 return MakeKey(K
, 0);
354 return MakeKey(rnd
->Next() % K
, 0);
358 // Per-key generation
360 std::atomic
<int> generation
[K
];
361 void Set(int k
, int v
) {
362 generation
[k
].store(v
, std::memory_order_release
);
364 int Get(int k
) { return generation
[k
].load(std::memory_order_acquire
); }
367 for (unsigned int k
= 0; k
< K
; k
++) {
373 // Current state of the test
376 ConcurrentArena arena_
;
378 // InlineSkipList is not protected by mu_. We just use a single writer
379 // thread to modify it.
380 InlineSkipList
<TestComparator
> list_
;
383 ConcurrentTest() : list_(TestComparator(), &arena_
) {}
385 // REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep
386 void WriteStep(Random
* rnd
) {
387 const uint32_t k
= rnd
->Next() % K
;
388 const int g
= current_
.Get(k
) + 1;
389 const Key new_key
= MakeKey(k
, g
);
390 char* buf
= list_
.AllocateKey(sizeof(Key
));
391 memcpy(buf
, &new_key
, sizeof(Key
));
396 // REQUIRES: No concurrent calls for the same k
397 void ConcurrentWriteStep(uint32_t k
) {
398 const int g
= current_
.Get(k
) + 1;
399 const Key new_key
= MakeKey(k
, g
);
400 char* buf
= list_
.AllocateKey(sizeof(Key
));
401 memcpy(buf
, &new_key
, sizeof(Key
));
402 list_
.InsertConcurrently(buf
);
403 ASSERT_EQ(g
, current_
.Get(k
) + 1);
407 void ReadStep(Random
* rnd
) {
408 // Remember the initial committed state of the skiplist.
410 for (unsigned int k
= 0; k
< K
; k
++) {
411 initial_state
.Set(k
, current_
.Get(k
));
414 Key pos
= RandomTarget(rnd
);
415 InlineSkipList
<TestComparator
>::Iterator
iter(&list_
);
416 iter
.Seek(Encode(&pos
));
420 current
= MakeKey(K
, 0);
422 current
= Decode(iter
.key());
423 ASSERT_TRUE(IsValidKey(current
)) << current
;
425 ASSERT_LE(pos
, current
) << "should not go backwards";
427 // Verify that everything in [pos,current) was not present in
429 while (pos
< current
) {
430 ASSERT_LT(key(pos
), K
) << pos
;
432 // Note that generation 0 is never inserted, so it is ok if
433 // <*,0,*> is missing.
434 ASSERT_TRUE((gen(pos
) == 0U) ||
435 (gen(pos
) > static_cast<uint64_t>(initial_state
.Get(
436 static_cast<int>(key(pos
))))))
437 << "key: " << key(pos
) << "; gen: " << gen(pos
)
438 << "; initgen: " << initial_state
.Get(static_cast<int>(key(pos
)));
440 // Advance to next key in the valid key space
441 if (key(pos
) < key(current
)) {
442 pos
= MakeKey(key(pos
) + 1, 0);
444 pos
= MakeKey(key(pos
), gen(pos
) + 1);
452 if (rnd
->Next() % 2) {
454 pos
= MakeKey(key(pos
), gen(pos
) + 1);
456 Key new_target
= RandomTarget(rnd
);
457 if (new_target
> pos
) {
459 iter
.Seek(Encode(&new_target
));
465 const uint32_t ConcurrentTest::K
;
467 // Simple test that does single-threaded testing of the ConcurrentTest
469 TEST_F(InlineSkipTest
, ConcurrentReadWithoutThreads
) {
471 Random
rnd(test::RandomSeed());
472 for (int i
= 0; i
< 10000; i
++) {
474 test
.WriteStep(&rnd
);
478 TEST_F(InlineSkipTest
, ConcurrentInsertWithoutThreads
) {
480 Random
rnd(test::RandomSeed());
481 for (int i
= 0; i
< 10000; i
++) {
483 uint32_t base
= rnd
.Next();
484 for (int j
= 0; j
< 4; ++j
) {
485 test
.ConcurrentWriteStep((base
+ j
) % ConcurrentTest::K
);
494 std::atomic
<bool> quit_flag_
;
495 std::atomic
<uint32_t> next_writer_
;
497 enum ReaderState
{ STARTING
, RUNNING
, DONE
};
499 explicit TestState(int s
)
506 void Wait(ReaderState s
) {
508 while (state_
!= s
) {
514 void Change(ReaderState s
) {
521 void AdjustPendingWriters(int delta
) {
523 pending_writers_
+= delta
;
524 if (pending_writers_
== 0) {
530 void WaitForPendingWriters() {
532 while (pending_writers_
!= 0) {
541 int pending_writers_
;
542 port::CondVar state_cv_
;
545 static void ConcurrentReader(void* arg
) {
546 TestState
* state
= reinterpret_cast<TestState
*>(arg
);
547 Random
rnd(state
->seed_
);
549 state
->Change(TestState::RUNNING
);
550 while (!state
->quit_flag_
.load(std::memory_order_acquire
)) {
551 state
->t_
.ReadStep(&rnd
);
554 state
->Change(TestState::DONE
);
557 static void ConcurrentWriter(void* arg
) {
558 TestState
* state
= reinterpret_cast<TestState
*>(arg
);
559 uint32_t k
= state
->next_writer_
++ % ConcurrentTest::K
;
560 state
->t_
.ConcurrentWriteStep(k
);
561 state
->AdjustPendingWriters(-1);
564 static void RunConcurrentRead(int run
) {
565 const int seed
= test::RandomSeed() + (run
* 100);
568 const int kSize
= 1000;
569 for (int i
= 0; i
< N
; i
++) {
570 if ((i
% 100) == 0) {
571 fprintf(stderr
, "Run %d of %d\n", i
, N
);
573 TestState
state(seed
+ 1);
574 Env::Default()->Schedule(ConcurrentReader
, &state
);
575 state
.Wait(TestState::RUNNING
);
576 for (int k
= 0; k
< kSize
; ++k
) {
577 state
.t_
.WriteStep(&rnd
);
579 state
.quit_flag_
.store(true, std::memory_order_release
);
580 state
.Wait(TestState::DONE
);
584 static void RunConcurrentInsert(int run
, int write_parallelism
= 4) {
585 Env::Default()->SetBackgroundThreads(1 + write_parallelism
,
587 const int seed
= test::RandomSeed() + (run
* 100);
590 const int kSize
= 1000;
591 for (int i
= 0; i
< N
; i
++) {
592 if ((i
% 100) == 0) {
593 fprintf(stderr
, "Run %d of %d\n", i
, N
);
595 TestState
state(seed
+ 1);
596 Env::Default()->Schedule(ConcurrentReader
, &state
);
597 state
.Wait(TestState::RUNNING
);
598 for (int k
= 0; k
< kSize
; k
+= write_parallelism
) {
599 state
.next_writer_
= rnd
.Next();
600 state
.AdjustPendingWriters(write_parallelism
);
601 for (int p
= 0; p
< write_parallelism
; ++p
) {
602 Env::Default()->Schedule(ConcurrentWriter
, &state
);
604 state
.WaitForPendingWriters();
606 state
.quit_flag_
.store(true, std::memory_order_release
);
607 state
.Wait(TestState::DONE
);
611 TEST_F(InlineSkipTest
, ConcurrentRead1
) { RunConcurrentRead(1); }
612 TEST_F(InlineSkipTest
, ConcurrentRead2
) { RunConcurrentRead(2); }
613 TEST_F(InlineSkipTest
, ConcurrentRead3
) { RunConcurrentRead(3); }
614 TEST_F(InlineSkipTest
, ConcurrentRead4
) { RunConcurrentRead(4); }
615 TEST_F(InlineSkipTest
, ConcurrentRead5
) { RunConcurrentRead(5); }
616 TEST_F(InlineSkipTest
, ConcurrentInsert1
) { RunConcurrentInsert(1); }
617 TEST_F(InlineSkipTest
, ConcurrentInsert2
) { RunConcurrentInsert(2); }
618 TEST_F(InlineSkipTest
, ConcurrentInsert3
) { RunConcurrentInsert(3); }
620 } // namespace rocksdb
622 int main(int argc
, char** argv
) {
623 ::testing::InitGoogleTest(&argc
, argv
);
624 return RUN_ALL_TESTS();