]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/memtable/inlineskiplist_test.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / memtable / inlineskiplist_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "memtable/inlineskiplist.h"
11 #include <set>
12 #include <unordered_set>
13 #include "rocksdb/env.h"
14 #include "util/concurrent_arena.h"
15 #include "util/hash.h"
16 #include "util/random.h"
17 #include "util/testharness.h"
18
19 namespace rocksdb {
20
// Our test skip list stores 8-byte unsigned integers
typedef uint64_t Key;

// View a key's bytes as the raw char buffer format the skip list stores.
static const char* Encode(const uint64_t* key) {
  return reinterpret_cast<const char*>(key);
}

// Read a stored key back out of its byte representation.
static Key Decode(const char* key) {
  Key decoded;
  memcpy(&decoded, key, sizeof(decoded));
  return decoded;
}
33
// Orders encoded keys by their decoded 8-byte unsigned integer value.
struct TestComparator {
  int operator()(const char* a, const char* b) const {
    // Decode both operands inline (same memcpy the Decode() helper does).
    uint64_t lhs;
    uint64_t rhs;
    memcpy(&lhs, a, sizeof(lhs));
    memcpy(&rhs, b, sizeof(rhs));
    if (lhs == rhs) {
      return 0;
    }
    return lhs < rhs ? -1 : +1;
  }
};
45
46 typedef InlineSkipList<TestComparator> TestInlineSkipList;
47
48 class InlineSkipTest : public testing::Test {
49 public:
50 void Insert(TestInlineSkipList* list, Key key) {
51 char* buf = list->AllocateKey(sizeof(Key));
52 memcpy(buf, &key, sizeof(Key));
53 list->Insert(buf);
54 keys_.insert(key);
55 }
56
57 void InsertWithHint(TestInlineSkipList* list, Key key, void** hint) {
58 char* buf = list->AllocateKey(sizeof(Key));
59 memcpy(buf, &key, sizeof(Key));
60 list->InsertWithHint(buf, hint);
61 keys_.insert(key);
62 }
63
64 void Validate(TestInlineSkipList* list) {
65 // Check keys exist.
66 for (Key key : keys_) {
67 ASSERT_TRUE(list->Contains(Encode(&key)));
68 }
69 // Iterate over the list, make sure keys appears in order and no extra
70 // keys exist.
71 TestInlineSkipList::Iterator iter(list);
72 ASSERT_FALSE(iter.Valid());
73 Key zero = 0;
74 iter.Seek(Encode(&zero));
75 for (Key key : keys_) {
76 ASSERT_TRUE(iter.Valid());
77 ASSERT_EQ(key, Decode(iter.key()));
78 iter.Next();
79 }
80 ASSERT_FALSE(iter.Valid());
81 // Validate the list is well-formed.
82 list->TEST_Validate();
83 }
84
85 private:
86 std::set<Key> keys_;
87 };
88
89 TEST_F(InlineSkipTest, Empty) {
90 Arena arena;
91 TestComparator cmp;
92 InlineSkipList<TestComparator> list(cmp, &arena);
93 Key key = 10;
94 ASSERT_TRUE(!list.Contains(Encode(&key)));
95
96 InlineSkipList<TestComparator>::Iterator iter(&list);
97 ASSERT_TRUE(!iter.Valid());
98 iter.SeekToFirst();
99 ASSERT_TRUE(!iter.Valid());
100 key = 100;
101 iter.Seek(Encode(&key));
102 ASSERT_TRUE(!iter.Valid());
103 iter.SeekForPrev(Encode(&key));
104 ASSERT_TRUE(!iter.Valid());
105 iter.SeekToLast();
106 ASSERT_TRUE(!iter.Valid());
107 }
108
TEST_F(InlineSkipTest, InsertAndLookup) {
  // Insert up to N distinct random keys drawn from [0, R) and mirror them
  // in a std::set, which then serves as the ground-truth model.
  const int N = 2000;
  const int R = 5000;
  Random rnd(1000);
  std::set<Key> keys;
  ConcurrentArena arena;
  TestComparator cmp;
  InlineSkipList<TestComparator> list(cmp, &arena);
  for (int i = 0; i < N; i++) {
    Key key = rnd.Next() % R;
    if (keys.insert(key).second) {
      char* buf = list.AllocateKey(sizeof(Key));
      memcpy(buf, &key, sizeof(Key));
      list.Insert(buf);
    }
  }

  // Contains() must agree with the model for every candidate key in [0, R).
  for (Key i = 0; i < R; i++) {
    if (list.Contains(Encode(&i))) {
      ASSERT_EQ(keys.count(i), 1U);
    } else {
      ASSERT_EQ(keys.count(i), 0U);
    }
  }

  // Simple iterator tests
  {
    InlineSkipList<TestComparator>::Iterator iter(&list);
    ASSERT_TRUE(!iter.Valid());

    // Seek(0) must land on the smallest stored key.
    uint64_t zero = 0;
    iter.Seek(Encode(&zero));
    ASSERT_TRUE(iter.Valid());
    ASSERT_EQ(*(keys.begin()), Decode(iter.key()));

    // SeekForPrev(max possible key) must land on the largest stored key.
    uint64_t max_key = R - 1;
    iter.SeekForPrev(Encode(&max_key));
    ASSERT_TRUE(iter.Valid());
    ASSERT_EQ(*(keys.rbegin()), Decode(iter.key()));

    iter.SeekToFirst();
    ASSERT_TRUE(iter.Valid());
    ASSERT_EQ(*(keys.begin()), Decode(iter.key()));

    iter.SeekToLast();
    ASSERT_TRUE(iter.Valid());
    ASSERT_EQ(*(keys.rbegin()), Decode(iter.key()));
  }

  // Forward iteration test
  for (Key i = 0; i < R; i++) {
    InlineSkipList<TestComparator>::Iterator iter(&list);
    iter.Seek(Encode(&i));

    // Compare against model iterator
    std::set<Key>::iterator model_iter = keys.lower_bound(i);
    // Only the first 3 entries from each seek position are checked.
    for (int j = 0; j < 3; j++) {
      if (model_iter == keys.end()) {
        ASSERT_TRUE(!iter.Valid());
        break;
      } else {
        ASSERT_TRUE(iter.Valid());
        ASSERT_EQ(*model_iter, Decode(iter.key()));
        ++model_iter;
        iter.Next();
      }
    }
  }

  // Backward iteration test
  for (Key i = 0; i < R; i++) {
    InlineSkipList<TestComparator>::Iterator iter(&list);
    iter.SeekForPrev(Encode(&i));

    // Compare against model iterator
    std::set<Key>::iterator model_iter = keys.upper_bound(i);
    // Walk at most 3 entries backwards from each seek position.
    for (int j = 0; j < 3; j++) {
      if (model_iter == keys.begin()) {
        ASSERT_TRUE(!iter.Valid());
        break;
      } else {
        ASSERT_TRUE(iter.Valid());
        // Pre-decrement first: upper_bound points one past the match.
        ASSERT_EQ(*--model_iter, Decode(iter.key()));
        iter.Prev();
      }
    }
  }
}
197
198 TEST_F(InlineSkipTest, InsertWithHint_Sequential) {
199 const int N = 100000;
200 Arena arena;
201 TestComparator cmp;
202 TestInlineSkipList list(cmp, &arena);
203 void* hint = nullptr;
204 for (int i = 0; i < N; i++) {
205 Key key = i;
206 InsertWithHint(&list, key, &hint);
207 }
208 Validate(&list);
209 }
210
211 TEST_F(InlineSkipTest, InsertWithHint_MultipleHints) {
212 const int N = 100000;
213 const int S = 100;
214 Random rnd(534);
215 Arena arena;
216 TestComparator cmp;
217 TestInlineSkipList list(cmp, &arena);
218 void* hints[S];
219 Key last_key[S];
220 for (int i = 0; i < S; i++) {
221 hints[i] = nullptr;
222 last_key[i] = 0;
223 }
224 for (int i = 0; i < N; i++) {
225 Key s = rnd.Uniform(S);
226 Key key = (s << 32) + (++last_key[s]);
227 InsertWithHint(&list, key, &hints[s]);
228 }
229 Validate(&list);
230 }
231
232 TEST_F(InlineSkipTest, InsertWithHint_MultipleHintsRandom) {
233 const int N = 100000;
234 const int S = 100;
235 Random rnd(534);
236 Arena arena;
237 TestComparator cmp;
238 TestInlineSkipList list(cmp, &arena);
239 void* hints[S];
240 for (int i = 0; i < S; i++) {
241 hints[i] = nullptr;
242 }
243 for (int i = 0; i < N; i++) {
244 Key s = rnd.Uniform(S);
245 Key key = (s << 32) + rnd.Next();
246 InsertWithHint(&list, key, &hints[s]);
247 }
248 Validate(&list);
249 }
250
251 TEST_F(InlineSkipTest, InsertWithHint_CompatibleWithInsertWithoutHint) {
252 const int N = 100000;
253 const int S1 = 100;
254 const int S2 = 100;
255 Random rnd(534);
256 Arena arena;
257 TestComparator cmp;
258 TestInlineSkipList list(cmp, &arena);
259 std::unordered_set<Key> used;
260 Key with_hint[S1];
261 Key without_hint[S2];
262 void* hints[S1];
263 for (int i = 0; i < S1; i++) {
264 hints[i] = nullptr;
265 while (true) {
266 Key s = rnd.Next();
267 if (used.insert(s).second) {
268 with_hint[i] = s;
269 break;
270 }
271 }
272 }
273 for (int i = 0; i < S2; i++) {
274 while (true) {
275 Key s = rnd.Next();
276 if (used.insert(s).second) {
277 without_hint[i] = s;
278 break;
279 }
280 }
281 }
282 for (int i = 0; i < N; i++) {
283 Key s = rnd.Uniform(S1 + S2);
284 if (s < S1) {
285 Key key = (with_hint[s] << 32) + rnd.Next();
286 InsertWithHint(&list, key, &hints[s]);
287 } else {
288 Key key = (without_hint[s - S1] << 32) + rnd.Next();
289 Insert(&list, key);
290 }
291 }
292 Validate(&list);
293 }
294
295 // We want to make sure that with a single writer and multiple
296 // concurrent readers (with no synchronization other than when a
297 // reader's iterator is created), the reader always observes all the
298 // data that was present in the skip list when the iterator was
// constructed. Because insertions are happening concurrently, we may
300 // also observe new values that were inserted since the iterator was
301 // constructed, but we should never miss any values that were present
302 // at iterator construction time.
303 //
304 // We generate multi-part keys:
305 // <key,gen,hash>
306 // where:
307 // key is in range [0..K-1]
308 // gen is a generation number for key
309 // hash is hash(key,gen)
310 //
311 // The insertion code picks a random key, sets gen to be 1 + the last
312 // generation number inserted for that key, and sets hash to Hash(key,gen).
313 //
314 // At the beginning of a read, we snapshot the last inserted
315 // generation number for each key. We then iterate, including random
316 // calls to Next() and Seek(). For every key we encounter, we
317 // check that it is either expected given the initial snapshot or has
318 // been concurrently added since the iterator started.
class ConcurrentTest {
 public:
  static const uint32_t K = 8;

 private:
  // Field extractors for the packed <key,gen,hash> layout described above:
  // key in bits [40,64), gen in bits [8,40), hash checksum in bits [0,8).
  static uint64_t key(Key key) { return (key >> 40); }
  static uint64_t gen(Key key) { return (key >> 8) & 0xffffffffu; }
  static uint64_t hash(Key key) { return key & 0xff; }

  // Hash of the (k, g) pair; its low byte is stored inside the key so a
  // reader can detect torn/corrupt keys via IsValidKey().
  static uint64_t HashNumbers(uint64_t k, uint64_t g) {
    uint64_t data[2] = {k, g};
    return Hash(reinterpret_cast<char*>(data), sizeof(data), 0);
  }

  // Pack k, g and the checksum byte into a single 64-bit Key.
  static Key MakeKey(uint64_t k, uint64_t g) {
    assert(sizeof(Key) == sizeof(uint64_t));
    assert(k <= K);  // We sometimes pass K to seek to the end of the skiplist
    assert(g <= 0xffffffffu);
    return ((k << 40) | (g << 8) | (HashNumbers(k, g) & 0xff));
  }

  // True iff the stored checksum byte matches the recomputed hash.
  static bool IsValidKey(Key k) {
    return hash(k) == (HashNumbers(key(k), gen(k)) & 0xff);
  }

  // Random seek target: begin (~10%), end (~10%), or a random key (~80%).
  static Key RandomTarget(Random* rnd) {
    switch (rnd->Next() % 10) {
      case 0:
        // Seek to beginning
        return MakeKey(0, 0);
      case 1:
        // Seek to end
        return MakeKey(K, 0);
      default:
        // Seek to middle
        return MakeKey(rnd->Next() % K, 0);
    }
  }

  // Per-key generation
  struct State {
    std::atomic<int> generation[K];
    // release/acquire pairing makes a generation published by a writer
    // visible to readers that load it.
    void Set(int k, int v) {
      generation[k].store(v, std::memory_order_release);
    }
    int Get(int k) { return generation[k].load(std::memory_order_acquire); }

    State() {
      for (unsigned int k = 0; k < K; k++) {
        Set(k, 0);
      }
    }
  };

  // Current state of the test
  State current_;

  ConcurrentArena arena_;

  // InlineSkipList is not protected by mu_. We just use a single writer
  // thread to modify it.
  InlineSkipList<TestComparator> list_;

 public:
  ConcurrentTest() : list_(TestComparator(), &arena_) {}

  // Insert the next generation of a random key, then publish it.
  // REQUIRES: No concurrent calls to WriteStep or ConcurrentWriteStep
  void WriteStep(Random* rnd) {
    const uint32_t k = rnd->Next() % K;
    const int g = current_.Get(k) + 1;
    const Key new_key = MakeKey(k, g);
    char* buf = list_.AllocateKey(sizeof(Key));
    memcpy(buf, &new_key, sizeof(Key));
    list_.Insert(buf);
    current_.Set(k, g);
  }

  // Like WriteStep but uses InsertConcurrently, so multiple writers may
  // run at once as long as each works on a distinct k.
  // REQUIRES: No concurrent calls for the same k
  void ConcurrentWriteStep(uint32_t k) {
    const int g = current_.Get(k) + 1;
    const Key new_key = MakeKey(k, g);
    char* buf = list_.AllocateKey(sizeof(Key));
    memcpy(buf, &new_key, sizeof(Key));
    list_.InsertConcurrently(buf);
    ASSERT_EQ(g, current_.Get(k) + 1);
    current_.Set(k, g);
  }

  // One reader validation pass: snapshot the committed generations, then
  // walk the list (mixing Next() and re-Seek()) and check that every gap
  // between visited keys contains only keys NOT present in the snapshot.
  void ReadStep(Random* rnd) {
    // Remember the initial committed state of the skiplist.
    State initial_state;
    for (unsigned int k = 0; k < K; k++) {
      initial_state.Set(k, current_.Get(k));
    }

    Key pos = RandomTarget(rnd);
    InlineSkipList<TestComparator>::Iterator iter(&list_);
    iter.Seek(Encode(&pos));
    while (true) {
      Key current;
      if (!iter.Valid()) {
        // Treat "past the end" as the sentinel key <K,0,...>.
        current = MakeKey(K, 0);
      } else {
        current = Decode(iter.key());
        ASSERT_TRUE(IsValidKey(current)) << current;
      }
      ASSERT_LE(pos, current) << "should not go backwards";

      // Verify that everything in [pos,current) was not present in
      // initial_state.
      while (pos < current) {
        ASSERT_LT(key(pos), K) << pos;

        // Note that generation 0 is never inserted, so it is ok if
        // <*,0,*> is missing.
        ASSERT_TRUE((gen(pos) == 0U) ||
                    (gen(pos) > static_cast<uint64_t>(initial_state.Get(
                                    static_cast<int>(key(pos))))))
            << "key: " << key(pos) << "; gen: " << gen(pos)
            << "; initgen: " << initial_state.Get(static_cast<int>(key(pos)));

        // Advance to next key in the valid key space
        if (key(pos) < key(current)) {
          pos = MakeKey(key(pos) + 1, 0);
        } else {
          pos = MakeKey(key(pos), gen(pos) + 1);
        }
      }

      if (!iter.Valid()) {
        break;
      }

      if (rnd->Next() % 2) {
        iter.Next();
        pos = MakeKey(key(pos), gen(pos) + 1);
      } else {
        Key new_target = RandomTarget(rnd);
        if (new_target > pos) {
          pos = new_target;
          iter.Seek(Encode(&new_target));
        }
      }
    }
  }
};
// Out-of-class definition for the static member (needed when K is
// odr-used, e.g. taken by reference, prior to C++17).
const uint32_t ConcurrentTest::K;
466
467 // Simple test that does single-threaded testing of the ConcurrentTest
468 // scaffolding.
469 TEST_F(InlineSkipTest, ConcurrentReadWithoutThreads) {
470 ConcurrentTest test;
471 Random rnd(test::RandomSeed());
472 for (int i = 0; i < 10000; i++) {
473 test.ReadStep(&rnd);
474 test.WriteStep(&rnd);
475 }
476 }
477
478 TEST_F(InlineSkipTest, ConcurrentInsertWithoutThreads) {
479 ConcurrentTest test;
480 Random rnd(test::RandomSeed());
481 for (int i = 0; i < 10000; i++) {
482 test.ReadStep(&rnd);
483 uint32_t base = rnd.Next();
484 for (int j = 0; j < 4; ++j) {
485 test.ConcurrentWriteStep((base + j) % ConcurrentTest::K);
486 }
487 }
488 }
489
490 class TestState {
491 public:
492 ConcurrentTest t_;
493 int seed_;
494 std::atomic<bool> quit_flag_;
495 std::atomic<uint32_t> next_writer_;
496
497 enum ReaderState { STARTING, RUNNING, DONE };
498
499 explicit TestState(int s)
500 : seed_(s),
501 quit_flag_(false),
502 state_(STARTING),
503 pending_writers_(0),
504 state_cv_(&mu_) {}
505
506 void Wait(ReaderState s) {
507 mu_.Lock();
508 while (state_ != s) {
509 state_cv_.Wait();
510 }
511 mu_.Unlock();
512 }
513
514 void Change(ReaderState s) {
515 mu_.Lock();
516 state_ = s;
517 state_cv_.Signal();
518 mu_.Unlock();
519 }
520
521 void AdjustPendingWriters(int delta) {
522 mu_.Lock();
523 pending_writers_ += delta;
524 if (pending_writers_ == 0) {
525 state_cv_.Signal();
526 }
527 mu_.Unlock();
528 }
529
530 void WaitForPendingWriters() {
531 mu_.Lock();
532 while (pending_writers_ != 0) {
533 state_cv_.Wait();
534 }
535 mu_.Unlock();
536 }
537
538 private:
539 port::Mutex mu_;
540 ReaderState state_;
541 int pending_writers_;
542 port::CondVar state_cv_;
543 };
544
545 static void ConcurrentReader(void* arg) {
546 TestState* state = reinterpret_cast<TestState*>(arg);
547 Random rnd(state->seed_);
548 int64_t reads = 0;
549 state->Change(TestState::RUNNING);
550 while (!state->quit_flag_.load(std::memory_order_acquire)) {
551 state->t_.ReadStep(&rnd);
552 ++reads;
553 }
554 state->Change(TestState::DONE);
555 }
556
557 static void ConcurrentWriter(void* arg) {
558 TestState* state = reinterpret_cast<TestState*>(arg);
559 uint32_t k = state->next_writer_++ % ConcurrentTest::K;
560 state->t_.ConcurrentWriteStep(k);
561 state->AdjustPendingWriters(-1);
562 }
563
// Launches one background reader and performs kSize writes from this
// thread; repeated N times. The reader validates that it never misses a
// key that was present when its iterator was created.
static void RunConcurrentRead(int run) {
  const int seed = test::RandomSeed() + (run * 100);
  Random rnd(seed);
  const int N = 1000;
  const int kSize = 1000;
  for (int i = 0; i < N; i++) {
    if ((i % 100) == 0) {
      fprintf(stderr, "Run %d of %d\n", i, N);
    }
    TestState state(seed + 1);
    Env::Default()->Schedule(ConcurrentReader, &state);
    state.Wait(TestState::RUNNING);
    for (int k = 0; k < kSize; ++k) {
      state.t_.WriteStep(&rnd);
    }
    // Tell the reader to stop, then wait until it acknowledges so the
    // TestState on our stack is safe to destroy.
    state.quit_flag_.store(true, std::memory_order_release);
    state.Wait(TestState::DONE);
  }
}
583
// Like RunConcurrentRead, but the writes are performed by batches of
// write_parallelism background tasks calling InsertConcurrently(); each
// batch completes before the next is scheduled.
static void RunConcurrentInsert(int run, int write_parallelism = 4) {
  // One LOW-priority thread for the reader plus one per parallel writer.
  Env::Default()->SetBackgroundThreads(1 + write_parallelism,
                                       Env::Priority::LOW);
  const int seed = test::RandomSeed() + (run * 100);
  Random rnd(seed);
  const int N = 1000;
  const int kSize = 1000;
  for (int i = 0; i < N; i++) {
    if ((i % 100) == 0) {
      fprintf(stderr, "Run %d of %d\n", i, N);
    }
    TestState state(seed + 1);
    Env::Default()->Schedule(ConcurrentReader, &state);
    state.Wait(TestState::RUNNING);
    for (int k = 0; k < kSize; k += write_parallelism) {
      // Randomize the starting slot so each batch of writers touches a
      // random window of buckets.
      state.next_writer_ = rnd.Next();
      state.AdjustPendingWriters(write_parallelism);
      for (int p = 0; p < write_parallelism; ++p) {
        Env::Default()->Schedule(ConcurrentWriter, &state);
      }
      state.WaitForPendingWriters();
    }
    state.quit_flag_.store(true, std::memory_order_release);
    state.Wait(TestState::DONE);
  }
}
610
// Instantiate each concurrency scenario several times; the run number
// perturbs the random seed used by the driver.
TEST_F(InlineSkipTest, ConcurrentRead1) { RunConcurrentRead(1); }
TEST_F(InlineSkipTest, ConcurrentRead2) { RunConcurrentRead(2); }
TEST_F(InlineSkipTest, ConcurrentRead3) { RunConcurrentRead(3); }
TEST_F(InlineSkipTest, ConcurrentRead4) { RunConcurrentRead(4); }
TEST_F(InlineSkipTest, ConcurrentRead5) { RunConcurrentRead(5); }
TEST_F(InlineSkipTest, ConcurrentInsert1) { RunConcurrentInsert(1); }
TEST_F(InlineSkipTest, ConcurrentInsert2) { RunConcurrentInsert(2); }
TEST_F(InlineSkipTest, ConcurrentInsert3) { RunConcurrentInsert(3); }
619
620 } // namespace rocksdb
621
// Test entry point: initialize gtest and run every registered case.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}