]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/db_tailing_iter_test.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / db / db_tailing_iter_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
// The introduction of SyncPoint effectively disabled building and running
// this test in Release builds, which is a pity: it is a good test.
13 #if !defined(ROCKSDB_LITE)
14
15 #include "db/db_test_util.h"
16 #include "db/forward_iterator.h"
17 #include "port/stack_trace.h"
18
19 namespace ROCKSDB_NAMESPACE {
20
21 class DBTestTailingIterator : public DBTestBase {
22 public:
23 DBTestTailingIterator() : DBTestBase("/db_tailing_iterator_test") {}
24 };
25
26 TEST_F(DBTestTailingIterator, TailingIteratorSingle) {
27 ReadOptions read_options;
28 read_options.tailing = true;
29
30 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
31 iter->SeekToFirst();
32 ASSERT_TRUE(!iter->Valid());
33
34 // add a record and check that iter can see it
35 ASSERT_OK(db_->Put(WriteOptions(), "mirko", "fodor"));
36 iter->SeekToFirst();
37 ASSERT_TRUE(iter->Valid());
38 ASSERT_EQ(iter->key().ToString(), "mirko");
39
40 iter->Next();
41 ASSERT_TRUE(!iter->Valid());
42 }
43
44 TEST_F(DBTestTailingIterator, TailingIteratorKeepAdding) {
45 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
46 ReadOptions read_options;
47 read_options.tailing = true;
48
49 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
50 std::string value(1024, 'a');
51
52 const int num_records = 10000;
53 for (int i = 0; i < num_records; ++i) {
54 char buf[32];
55 snprintf(buf, sizeof(buf), "%016d", i);
56
57 Slice key(buf, 16);
58 ASSERT_OK(Put(1, key, value));
59
60 iter->Seek(key);
61 ASSERT_TRUE(iter->Valid());
62 ASSERT_EQ(iter->key().compare(key), 0);
63 }
64 }
65
66 TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) {
67 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
68 ReadOptions read_options;
69 read_options.tailing = true;
70
71 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
72 std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
73 std::string value(1024, 'a');
74
75 const int num_records = 1000;
76 for (int i = 1; i < num_records; ++i) {
77 char buf1[32];
78 char buf2[32];
79 snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
80
81 Slice key(buf1, 20);
82 ASSERT_OK(Put(1, key, value));
83
84 if (i % 100 == 99) {
85 ASSERT_OK(Flush(1));
86 }
87
88 snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
89 Slice target(buf2, 20);
90 iter->Seek(target);
91 ASSERT_TRUE(iter->Valid());
92 ASSERT_EQ(iter->key().compare(key), 0);
93 if (i == 1) {
94 itern->SeekToFirst();
95 } else {
96 itern->Next();
97 }
98 ASSERT_TRUE(itern->Valid());
99 ASSERT_EQ(itern->key().compare(key), 0);
100 }
101 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
102 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
103 for (int i = 2 * num_records; i > 0; --i) {
104 char buf1[32];
105 char buf2[32];
106 snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
107
108 Slice key(buf1, 20);
109 ASSERT_OK(Put(1, key, value));
110
111 if (i % 100 == 99) {
112 ASSERT_OK(Flush(1));
113 }
114
115 snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
116 Slice target(buf2, 20);
117 iter->Seek(target);
118 ASSERT_TRUE(iter->Valid());
119 ASSERT_EQ(iter->key().compare(key), 0);
120 }
121 }
122
123 TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
124 const uint64_t k150KB = 150 * 1024;
125 Options options;
126 options.write_buffer_size = k150KB;
127 options.max_write_buffer_number = 3;
128 options.min_write_buffer_number_to_merge = 2;
129 options.env = env_;
130 CreateAndReopenWithCF({"pikachu"}, options);
131 ReadOptions read_options;
132 read_options.tailing = true;
133 int num_iters, deleted_iters;
134
135 char bufe[32];
136 snprintf(bufe, sizeof(bufe), "00b0%016d", 0);
137 Slice keyu(bufe, 20);
138 read_options.iterate_upper_bound = &keyu;
139 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
140 std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
141 std::unique_ptr<Iterator> iterh(db_->NewIterator(read_options, handles_[1]));
142 std::string value(1024, 'a');
143 bool file_iters_deleted = false;
144 bool file_iters_renewed_null = false;
145 bool file_iters_renewed_copy = false;
146 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
147 "ForwardIterator::SeekInternal:Return", [&](void* arg) {
148 ForwardIterator* fiter = reinterpret_cast<ForwardIterator*>(arg);
149 ASSERT_TRUE(!file_iters_deleted ||
150 fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters));
151 });
152 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
153 "ForwardIterator::Next:Return", [&](void* arg) {
154 ForwardIterator* fiter = reinterpret_cast<ForwardIterator*>(arg);
155 ASSERT_TRUE(!file_iters_deleted ||
156 fiter->TEST_CheckDeletedIters(&deleted_iters, &num_iters));
157 });
158 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
159 "ForwardIterator::RenewIterators:Null",
160 [&](void* /*arg*/) { file_iters_renewed_null = true; });
161 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
162 "ForwardIterator::RenewIterators:Copy",
163 [&](void* /*arg*/) { file_iters_renewed_copy = true; });
164 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
165 const int num_records = 1000;
166 for (int i = 1; i < num_records; ++i) {
167 char buf1[32];
168 char buf2[32];
169 char buf3[32];
170 char buf4[32];
171 snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
172 snprintf(buf3, sizeof(buf3), "00b0%016d", i * 5);
173
174 Slice key(buf1, 20);
175 ASSERT_OK(Put(1, key, value));
176 Slice keyn(buf3, 20);
177 ASSERT_OK(Put(1, keyn, value));
178
179 if (i % 100 == 99) {
180 ASSERT_OK(Flush(1));
181 dbfull()->TEST_WaitForCompact();
182 if (i == 299) {
183 file_iters_deleted = true;
184 }
185 snprintf(buf4, sizeof(buf4), "00a0%016d", i * 5 / 2);
186 Slice target(buf4, 20);
187 iterh->Seek(target);
188 ASSERT_TRUE(iter->Valid());
189 for (int j = (i + 1) * 5 / 2; j < i * 5; j += 5) {
190 iterh->Next();
191 ASSERT_TRUE(iterh->Valid());
192 }
193 if (i == 299) {
194 file_iters_deleted = false;
195 }
196 }
197
198 file_iters_deleted = true;
199 snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
200 Slice target(buf2, 20);
201 iter->Seek(target);
202 ASSERT_TRUE(iter->Valid());
203 ASSERT_EQ(iter->key().compare(key), 0);
204 ASSERT_LE(num_iters, 1);
205 if (i == 1) {
206 itern->SeekToFirst();
207 } else {
208 itern->Next();
209 }
210 ASSERT_TRUE(itern->Valid());
211 ASSERT_EQ(itern->key().compare(key), 0);
212 ASSERT_LE(num_iters, 1);
213 file_iters_deleted = false;
214 }
215 ASSERT_TRUE(file_iters_renewed_null);
216 ASSERT_TRUE(file_iters_renewed_copy);
217 iter = nullptr;
218 itern = nullptr;
219 iterh = nullptr;
220 BlockBasedTableOptions table_options;
221 table_options.no_block_cache = true;
222 table_options.block_cache_compressed = nullptr;
223 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
224 ReopenWithColumnFamilies({"default", "pikachu"}, options);
225 read_options.read_tier = kBlockCacheTier;
226 std::unique_ptr<Iterator> iteri(db_->NewIterator(read_options, handles_[1]));
227 char buf5[32];
228 snprintf(buf5, sizeof(buf5), "00a0%016d", (num_records / 2) * 5 - 2);
229 Slice target1(buf5, 20);
230 iteri->Seek(target1);
231 ASSERT_TRUE(iteri->status().IsIncomplete());
232 iteri = nullptr;
233
234 read_options.read_tier = kReadAllTier;
235 options.table_factory.reset(NewBlockBasedTableFactory());
236 ReopenWithColumnFamilies({"default", "pikachu"}, options);
237 iter.reset(db_->NewIterator(read_options, handles_[1]));
238 for (int i = 2 * num_records; i > 0; --i) {
239 char buf1[32];
240 char buf2[32];
241 snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
242
243 Slice key(buf1, 20);
244 ASSERT_OK(Put(1, key, value));
245
246 if (i % 100 == 99) {
247 ASSERT_OK(Flush(1));
248 }
249
250 snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
251 Slice target(buf2, 20);
252 iter->Seek(target);
253 ASSERT_TRUE(iter->Valid());
254 ASSERT_EQ(iter->key().compare(key), 0);
255 }
256 }
257
258 TEST_F(DBTestTailingIterator, TailingIteratorDeletes) {
259 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
260 ReadOptions read_options;
261 read_options.tailing = true;
262
263 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
264
265 // write a single record, read it using the iterator, then delete it
266 ASSERT_OK(Put(1, "0test", "test"));
267 iter->SeekToFirst();
268 ASSERT_TRUE(iter->Valid());
269 ASSERT_EQ(iter->key().ToString(), "0test");
270 ASSERT_OK(Delete(1, "0test"));
271
272 // write many more records
273 const int num_records = 10000;
274 std::string value(1024, 'A');
275
276 for (int i = 0; i < num_records; ++i) {
277 char buf[32];
278 snprintf(buf, sizeof(buf), "1%015d", i);
279
280 Slice key(buf, 16);
281 ASSERT_OK(Put(1, key, value));
282 }
283
284 // force a flush to make sure that no records are read from memtable
285 ASSERT_OK(Flush(1));
286
287 // skip "0test"
288 iter->Next();
289
290 // make sure we can read all new records using the existing iterator
291 int count = 0;
292 for (; iter->Valid(); iter->Next(), ++count) ;
293
294 ASSERT_EQ(count, num_records);
295 }
296
297 TEST_F(DBTestTailingIterator, TailingIteratorPrefixSeek) {
298 ReadOptions read_options;
299 read_options.tailing = true;
300
301 Options options = CurrentOptions();
302 options.create_if_missing = true;
303 options.disable_auto_compactions = true;
304 options.prefix_extractor.reset(NewFixedPrefixTransform(2));
305 options.memtable_factory.reset(NewHashSkipListRepFactory(16));
306 options.allow_concurrent_memtable_write = false;
307 DestroyAndReopen(options);
308 CreateAndReopenWithCF({"pikachu"}, options);
309
310 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
311 ASSERT_OK(Put(1, "0101", "test"));
312
313 ASSERT_OK(Flush(1));
314
315 ASSERT_OK(Put(1, "0202", "test"));
316
317 // Seek(0102) shouldn't find any records since 0202 has a different prefix
318 iter->Seek("0102");
319 ASSERT_TRUE(!iter->Valid());
320
321 iter->Seek("0202");
322 ASSERT_TRUE(iter->Valid());
323 ASSERT_EQ(iter->key().ToString(), "0202");
324
325 iter->Next();
326 ASSERT_TRUE(!iter->Valid());
327 }
328
329 TEST_F(DBTestTailingIterator, TailingIteratorIncomplete) {
330 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
331 ReadOptions read_options;
332 read_options.tailing = true;
333 read_options.read_tier = kBlockCacheTier;
334
335 std::string key("key");
336 std::string value("value");
337
338 ASSERT_OK(db_->Put(WriteOptions(), key, value));
339
340 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
341 iter->SeekToFirst();
342 // we either see the entry or it's not in cache
343 ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
344
345 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
346 iter->SeekToFirst();
347 // should still be true after compaction
348 ASSERT_TRUE(iter->Valid() || iter->status().IsIncomplete());
349 }
350
351 TEST_F(DBTestTailingIterator, TailingIteratorSeekToSame) {
352 Options options = CurrentOptions();
353 options.compaction_style = kCompactionStyleUniversal;
354 options.write_buffer_size = 1000;
355 CreateAndReopenWithCF({"pikachu"}, options);
356
357 ReadOptions read_options;
358 read_options.tailing = true;
359
360 const int NROWS = 10000;
361 // Write rows with keys 00000, 00002, 00004 etc.
362 for (int i = 0; i < NROWS; ++i) {
363 char buf[100];
364 snprintf(buf, sizeof(buf), "%05d", 2*i);
365 std::string key(buf);
366 std::string value("value");
367 ASSERT_OK(db_->Put(WriteOptions(), key, value));
368 }
369
370 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
371 // Seek to 00001. We expect to find 00002.
372 std::string start_key = "00001";
373 iter->Seek(start_key);
374 ASSERT_TRUE(iter->Valid());
375
376 std::string found = iter->key().ToString();
377 ASSERT_EQ("00002", found);
378
379 // Now seek to the same key. The iterator should remain in the same
380 // position.
381 iter->Seek(found);
382 ASSERT_TRUE(iter->Valid());
383 ASSERT_EQ(found, iter->key().ToString());
384 }
385
386 // Sets iterate_upper_bound and verifies that ForwardIterator doesn't call
387 // Seek() on immutable iterators when target key is >= prev_key and all
388 // iterators, including the memtable iterator, are over the upper bound.
389 TEST_F(DBTestTailingIterator, TailingIteratorUpperBound) {
390 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
391
392 const Slice upper_bound("20", 3);
393 ReadOptions read_options;
394 read_options.tailing = true;
395 read_options.iterate_upper_bound = &upper_bound;
396
397 ASSERT_OK(Put(1, "11", "11"));
398 ASSERT_OK(Put(1, "12", "12"));
399 ASSERT_OK(Put(1, "22", "22"));
400 ASSERT_OK(Flush(1)); // flush all those keys to an immutable SST file
401
402 // Add another key to the memtable.
403 ASSERT_OK(Put(1, "21", "21"));
404
405 std::unique_ptr<Iterator> it(db_->NewIterator(read_options, handles_[1]));
406 it->Seek("12");
407 ASSERT_TRUE(it->Valid());
408 ASSERT_EQ("12", it->key().ToString());
409
410 it->Next();
411 // Not valid since "21" is over the upper bound.
412 ASSERT_FALSE(it->Valid());
413
414 // This keeps track of the number of times NeedToSeekImmutable() was true.
415 int immutable_seeks = 0;
416 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
417 "ForwardIterator::SeekInternal:Immutable",
418 [&](void* /*arg*/) { ++immutable_seeks; });
419
420 // Seek to 13. This should not require any immutable seeks.
421 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
422 it->Seek("13");
423 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
424
425 ASSERT_FALSE(it->Valid());
426 ASSERT_EQ(0, immutable_seeks);
427 }
428
429 TEST_F(DBTestTailingIterator, TailingIteratorGap) {
430 // level 1: [20, 25] [35, 40]
431 // level 2: [10 - 15] [45 - 50]
432 // level 3: [20, 30, 40]
433 // Previously there is a bug in tailing_iterator that if there is a gap in
434 // lower level, the key will be skipped if it is within the range between
435 // the largest key of index n file and the smallest key of index n+1 file
436 // if both file fit in that gap. In this example, 25 < key < 35
437 // https://github.com/facebook/rocksdb/issues/1372
438 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
439
440 ReadOptions read_options;
441 read_options.tailing = true;
442
443 ASSERT_OK(Put(1, "20", "20"));
444 ASSERT_OK(Put(1, "30", "30"));
445 ASSERT_OK(Put(1, "40", "40"));
446 ASSERT_OK(Flush(1));
447 MoveFilesToLevel(3, 1);
448
449 ASSERT_OK(Put(1, "10", "10"));
450 ASSERT_OK(Put(1, "15", "15"));
451 ASSERT_OK(Flush(1));
452 ASSERT_OK(Put(1, "45", "45"));
453 ASSERT_OK(Put(1, "50", "50"));
454 ASSERT_OK(Flush(1));
455 MoveFilesToLevel(2, 1);
456
457 ASSERT_OK(Put(1, "20", "20"));
458 ASSERT_OK(Put(1, "25", "25"));
459 ASSERT_OK(Flush(1));
460 ASSERT_OK(Put(1, "35", "35"));
461 ASSERT_OK(Put(1, "40", "40"));
462 ASSERT_OK(Flush(1));
463 MoveFilesToLevel(1, 1);
464
465 ColumnFamilyMetaData meta;
466 db_->GetColumnFamilyMetaData(handles_[1], &meta);
467
468 std::unique_ptr<Iterator> it(db_->NewIterator(read_options, handles_[1]));
469 it->Seek("30");
470 ASSERT_TRUE(it->Valid());
471 ASSERT_EQ("30", it->key().ToString());
472
473 it->Next();
474 ASSERT_TRUE(it->Valid());
475 ASSERT_EQ("35", it->key().ToString());
476
477 it->Next();
478 ASSERT_TRUE(it->Valid());
479 ASSERT_EQ("40", it->key().ToString());
480 }
481
482 TEST_F(DBTestTailingIterator, SeekWithUpperBoundBug) {
483 ReadOptions read_options;
484 read_options.tailing = true;
485 const Slice upper_bound("cc", 3);
486 read_options.iterate_upper_bound = &upper_bound;
487
488
489 // 1st L0 file
490 ASSERT_OK(db_->Put(WriteOptions(), "aa", "SEEN"));
491 ASSERT_OK(Flush());
492
493 // 2nd L0 file
494 ASSERT_OK(db_->Put(WriteOptions(), "zz", "NOT-SEEN"));
495 ASSERT_OK(Flush());
496
497 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
498
499 iter->Seek("aa");
500 ASSERT_TRUE(iter->Valid());
501 ASSERT_EQ(iter->key().ToString(), "aa");
502 }
503
504 TEST_F(DBTestTailingIterator, SeekToFirstWithUpperBoundBug) {
505 ReadOptions read_options;
506 read_options.tailing = true;
507 const Slice upper_bound("cc", 3);
508 read_options.iterate_upper_bound = &upper_bound;
509
510
511 // 1st L0 file
512 ASSERT_OK(db_->Put(WriteOptions(), "aa", "SEEN"));
513 ASSERT_OK(Flush());
514
515 // 2nd L0 file
516 ASSERT_OK(db_->Put(WriteOptions(), "zz", "NOT-SEEN"));
517 ASSERT_OK(Flush());
518
519 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
520
521 iter->SeekToFirst();
522 ASSERT_TRUE(iter->Valid());
523 ASSERT_EQ(iter->key().ToString(), "aa");
524
525 iter->Next();
526 ASSERT_FALSE(iter->Valid());
527
528 iter->SeekToFirst();
529 ASSERT_TRUE(iter->Valid());
530 ASSERT_EQ(iter->key().ToString(), "aa");
531 }
532
533 } // namespace ROCKSDB_NAMESPACE
534
535 #endif // !defined(ROCKSDB_LITE)
536
537 int main(int argc, char** argv) {
538 #if !defined(ROCKSDB_LITE)
539 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
540 ::testing::InitGoogleTest(&argc, argv);
541 return RUN_ALL_TESTS();
542 #else
543 (void) argc;
544 (void) argv;
545 return 0;
546 #endif
547 }