1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 // Introduction of SyncPoint effectively disabled building and running this test
12 // which is a pity, it is a good test
13 #if !defined(ROCKSDB_LITE)
15 #include "db/db_test_util.h"
16 #include "db/forward_iterator.h"
17 #include "port/stack_trace.h"
19 namespace ROCKSDB_NAMESPACE
{
// Test fixture shared by the tailing-iterator tests in this file. It derives
// from DBTestBase and hands "/db_tailing_iterator_test" to the base-class
// constructor.
21 class DBTestTailingIterator
: public DBTestBase
{
23 DBTestTailingIterator() : DBTestBase("/db_tailing_iterator_test") {}
// Checks that an iterator created with read_options.tailing set observes a
// record that is written after the iterator was created.
// NOTE(review): the calls that reposition the iterator between the assertions
// are not visible in this excerpt.
26 TEST_F(DBTestTailingIterator
, TailingIteratorSingle
) {
27 ReadOptions read_options
;
// Ask for a tailing iterator.
28 read_options
.tailing
= true;
30 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
// No record has been written by this test yet, so the iterator must be
// invalid.
32 ASSERT_TRUE(!iter
->Valid());
34 // add a record and check that iter can see it
35 ASSERT_OK(db_
->Put(WriteOptions(), "mirko", "fodor"));
37 ASSERT_TRUE(iter
->Valid());
38 ASSERT_EQ(iter
->key().ToString(), "mirko");
// "mirko" is the only key written above; the iterator is expected to be
// invalid again at this point.
41 ASSERT_TRUE(!iter
->Valid());
// Repeatedly writes a new record into column family 1 and asserts, after each
// write, that a single long-lived tailing iterator is valid and positioned on
// the key that was just written.
// NOTE(review): `buf`, `key` and the iterator-positioning call are declared or
// made on lines that are not visible in this excerpt.
44 TEST_F(DBTestTailingIterator
, TailingIteratorKeepAdding
) {
45 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
46 ReadOptions read_options
;
47 read_options
.tailing
= true;
// The iterator is created once, before any record is written, and is reused
// for every iteration of the loop below.
49 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
, handles_
[1]));
// Every record carries the same 1024-byte value of 'a' characters.
50 std::string
value(1024, 'a');
52 const int num_records
= 10000;
53 for (int i
= 0; i
< num_records
; ++i
) {
// Keys are the loop index zero-padded to 16 digits, so they are written in
// increasing lexicographic order.
55 snprintf(buf
, sizeof(buf
), "%016d", i
);
58 ASSERT_OK(Put(1, key
, value
));
// The iterator must see the record written in this iteration.
61 ASSERT_TRUE(iter
->Valid());
62 ASSERT_EQ(iter
->key().compare(key
), 0);
// Exercises two tailing iterators (`iter` and `itern`) on column family 1
// while keys are inserted, first in increasing and then in decreasing order.
// After each insert the test asserts that the iterator is valid and positioned
// on the key that was just written.
// NOTE(review): `buf1`, `buf2`, `key`, and the Seek/Next calls that consume
// `target` are on lines that are not visible in this excerpt.
66 TEST_F(DBTestTailingIterator
, TailingIteratorSeekToNext
) {
67 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
68 ReadOptions read_options
;
69 read_options
.tailing
= true;
// Both iterators are created up front and share the same read options.
71 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
, handles_
[1]));
72 std::unique_ptr
<Iterator
> itern(db_
->NewIterator(read_options
, handles_
[1]));
73 std::string
value(1024, 'a');
75 const int num_records
= 1000;
// Phase 1: insert keys in increasing order (i = 1 .. num_records - 1).
76 for (int i
= 1; i
< num_records
; ++i
) {
// The key text is "00a0" followed by i * 5 zero-padded to 16 digits, i.e. 20
// characters in total.
79 snprintf(buf1
, sizeof(buf1
), "00a0%016d", i
* 5);
82 ASSERT_OK(Put(1, key
, value
));
// The target is two below the key just written (i * 5 - 2), so it falls
// between the previous key and the new one. The length 20 matches the 20
// formatted characters.
88 snprintf(buf2
, sizeof(buf2
), "00a0%016d", i
* 5 - 2);
89 Slice
target(buf2
, 20);
91 ASSERT_TRUE(iter
->Valid());
92 ASSERT_EQ(iter
->key().compare(key
), 0);
// The second iterator must end up on the same, newest key.
98 ASSERT_TRUE(itern
->Valid());
99 ASSERT_EQ(itern
->key().compare(key
), 0);
// Remove all sync-point callbacks and turn sync-point processing off before
// the second phase.
101 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
102 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// Phase 2: insert keys in decreasing order (i = 2 * num_records .. 1).
103 for (int i
= 2 * num_records
; i
> 0; --i
) {
106 snprintf(buf1
, sizeof(buf1
), "00a0%016d", i
* 5);
109 ASSERT_OK(Put(1, key
, value
));
// Same target construction as in phase 1: two below the key just written.
115 snprintf(buf2
, sizeof(buf2
), "00a0%016d", i
* 5 - 2);
116 Slice
target(buf2
, 20);
118 ASSERT_TRUE(iter
->Valid());
119 ASSERT_EQ(iter
->key().compare(key
), 0);
// Exercises tailing iterators with an iterate_upper_bound while records are
// written to column family 1 using small write buffers. Sync-point callbacks
// installed on ForwardIterator call TEST_CheckDeletedIters() and record
// whether the two RenewIterators sync points were reached. The test then
// reopens the DB without a block cache to check a block-cache-only read, and
// finally repeats the decreasing-order insert loop with a fresh iterator.
// NOTE(review): the declarations of `options`, the `buf*` arrays and `key`,
// several Seek/Next calls, and all closing braces are on lines that are not
// visible in this excerpt, so the exact nesting of the statements below cannot
// be confirmed from here.
123 TEST_F(DBTestTailingIterator
, TailingIteratorTrimSeekToNext
) {
124 const uint64_t k150KB
= 150 * 1024;
// Write-buffer settings: 150KB buffers, at most 3 of them, and at least 2
// merged together.
126 options
.write_buffer_size
= k150KB
;
127 options
.max_write_buffer_number
= 3;
128 options
.min_write_buffer_number_to_merge
= 2;
130 CreateAndReopenWithCF({"pikachu"}, options
);
131 ReadOptions read_options
;
132 read_options
.tailing
= true;
// NOTE(review): both counters are declared without initializers. In the
// visible code they are only written through the out-pointers passed to
// TEST_CheckDeletedIters() -- confirm they are always set before the
// ASSERT_LE(num_iters, 1) checks read them.
133 int num_iters
, deleted_iters
;
// The upper bound is the 20-character key "00b0" + sixteen zeros. Every
// "00a0..." key written below sorts before it, and every "00b0..." key written
// below (i >= 1) sorts after it.
136 snprintf(bufe
, sizeof(bufe
), "00b0%016d", 0);
137 Slice
keyu(bufe
, 20);
138 read_options
.iterate_upper_bound
= &keyu
;
// Three iterators share the same read options, including the upper bound.
139 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
, handles_
[1]));
140 std::unique_ptr
<Iterator
> itern(db_
->NewIterator(read_options
, handles_
[1]));
141 std::unique_ptr
<Iterator
> iterh(db_
->NewIterator(read_options
, handles_
[1]));
142 std::string
value(1024, 'a');
// Flags shared with the sync-point callbacks below (captured by reference).
143 bool file_iters_deleted
= false;
144 bool file_iters_renewed_null
= false;
145 bool file_iters_renewed_copy
= false;
// While file_iters_deleted is set, this sync point requires
// TEST_CheckDeletedIters() to return true. The callback argument is cast to
// the ForwardIterator the check is called on.
146 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
147 "ForwardIterator::SeekInternal:Return", [&](void* arg
) {
148 ForwardIterator
* fiter
= reinterpret_cast<ForwardIterator
*>(arg
);
149 ASSERT_TRUE(!file_iters_deleted
||
150 fiter
->TEST_CheckDeletedIters(&deleted_iters
, &num_iters
));
// Same check, installed on the "ForwardIterator::Next:Return" sync point.
152 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
153 "ForwardIterator::Next:Return", [&](void* arg
) {
154 ForwardIterator
* fiter
= reinterpret_cast<ForwardIterator
*>(arg
);
155 ASSERT_TRUE(!file_iters_deleted
||
156 fiter
->TEST_CheckDeletedIters(&deleted_iters
, &num_iters
));
// Record that each of the two RenewIterators sync points was reached at least
// once; both flags are asserted after the loop.
158 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
159 "ForwardIterator::RenewIterators:Null",
160 [&](void* /*arg*/) { file_iters_renewed_null
= true; });
161 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
162 "ForwardIterator::RenewIterators:Copy",
163 [&](void* /*arg*/) { file_iters_renewed_copy
= true; });
164 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
165 const int num_records
= 1000;
166 for (int i
= 1; i
< num_records
; ++i
) {
// Each iteration writes one key below the upper bound ("00a0...") and one key
// above it ("00b0..."), both derived from i * 5.
171 snprintf(buf1
, sizeof(buf1
), "00a0%016d", i
* 5);
172 snprintf(buf3
, sizeof(buf3
), "00b0%016d", i
* 5);
175 ASSERT_OK(Put(1, key
, value
));
176 Slice
keyn(buf3
, 20);
177 ASSERT_OK(Put(1, keyn
, value
));
// NOTE(review): the condition guarding this wait is not visible here.
181 dbfull()->TEST_WaitForCompact();
// Arm the TEST_CheckDeletedIters() check in the sync-point callbacks.
183 file_iters_deleted
= true;
// Target roughly in the middle of the "00a0" keys written so far (i * 5 / 2).
185 snprintf(buf4
, sizeof(buf4
), "00a0%016d", i
* 5 / 2);
186 Slice
target(buf4
, 20);
188 ASSERT_TRUE(iter
->Valid());
// Walk from just past the middle up to the newest key in steps of 5, which is
// the spacing between consecutive "00a0" keys. `iterh` must stay valid.
189 for (int j
= (i
+ 1) * 5 / 2; j
< i
* 5; j
+= 5) {
191 ASSERT_TRUE(iterh
->Valid());
// Disarm the check ...
194 file_iters_deleted
= false;
// ... and arm it again for the checks that follow.
198 file_iters_deleted
= true;
// Target two below the key just written (i * 5 - 2).
199 snprintf(buf2
, sizeof(buf2
), "00a0%016d", i
* 5 - 2);
200 Slice
target(buf2
, 20);
202 ASSERT_TRUE(iter
->Valid());
203 ASSERT_EQ(iter
->key().compare(key
), 0);
// num_iters is filled in by TEST_CheckDeletedIters() from the callbacks above.
204 ASSERT_LE(num_iters
, 1);
206 itern
->SeekToFirst();
210 ASSERT_TRUE(itern
->Valid());
211 ASSERT_EQ(itern
->key().compare(key
), 0);
212 ASSERT_LE(num_iters
, 1);
213 file_iters_deleted
= false;
// Both RenewIterators sync points must have been reached.
215 ASSERT_TRUE(file_iters_renewed_null
);
216 ASSERT_TRUE(file_iters_renewed_copy
);
// Reopen with a table factory that has no block cache, then read with
// read_tier = kBlockCacheTier. The Seek below is expected to leave the
// iterator with an Incomplete status.
220 BlockBasedTableOptions table_options
;
221 table_options
.no_block_cache
= true;
222 table_options
.block_cache_compressed
= nullptr;
223 options
.table_factory
.reset(NewBlockBasedTableFactory(table_options
));
224 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
225 read_options
.read_tier
= kBlockCacheTier
;
226 std::unique_ptr
<Iterator
> iteri(db_
->NewIterator(read_options
, handles_
[1]));
228 snprintf(buf5
, sizeof(buf5
), "00a0%016d", (num_records
/ 2) * 5 - 2);
229 Slice
target1(buf5
, 20);
230 iteri
->Seek(target1
);
231 ASSERT_TRUE(iteri
->status().IsIncomplete());
// Switch back to kReadAllTier and a default-constructed block-based table
// factory, reopen, and replace `iter` with a fresh iterator on the reopened
// DB.
234 read_options
.read_tier
= kReadAllTier
;
235 options
.table_factory
.reset(NewBlockBasedTableFactory());
236 ReopenWithColumnFamilies({"default", "pikachu"}, options
);
237 iter
.reset(db_
->NewIterator(read_options
, handles_
[1]));
// Final phase: insert keys in decreasing order (i = 2 * num_records .. 1) and
// check that `iter` is positioned on each newly written key.
238 for (int i
= 2 * num_records
; i
> 0; --i
) {
241 snprintf(buf1
, sizeof(buf1
), "00a0%016d", i
* 5);
244 ASSERT_OK(Put(1, key
, value
));
250 snprintf(buf2
, sizeof(buf2
), "00a0%016d", i
* 5 - 2);
251 Slice
target(buf2
, 20);
253 ASSERT_TRUE(iter
->Valid());
254 ASSERT_EQ(iter
->key().compare(key
), 0);
// Writes one record, reads it through a tailing iterator and deletes it, then
// writes num_records further records. It asserts that iterating the same
// iterator to the end yields exactly num_records entries.
// NOTE(review): `buf`, `key`, `count`, the flush call, and the
// iterator-positioning calls are on lines that are not visible in this
// excerpt.
258 TEST_F(DBTestTailingIterator
, TailingIteratorDeletes
) {
259 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
260 ReadOptions read_options
;
261 read_options
.tailing
= true;
263 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
, handles_
[1]));
265 // write a single record, read it using the iterator, then delete it
266 ASSERT_OK(Put(1, "0test", "test"));
268 ASSERT_TRUE(iter
->Valid());
269 ASSERT_EQ(iter
->key().ToString(), "0test");
270 ASSERT_OK(Delete(1, "0test"));
272 // write many more records
273 const int num_records
= 10000;
274 std::string
value(1024, 'A');
276 for (int i
= 0; i
< num_records
; ++i
) {
// Keys are "1" followed by the index zero-padded to 15 digits, so every one of
// them sorts after the deleted key "0test".
278 snprintf(buf
, sizeof(buf
), "1%015d", i
);
281 ASSERT_OK(Put(1, key
, value
));
284 // force a flush to make sure that no records are read from memtable
290 // make sure we can read all new records using the existing iterator
// Count the entries from the iterator's current position to the end. The loop
// body is intentionally empty.
292 for (; iter
->Valid(); iter
->Next(), ++count
) ;
// The count must equal the number of records written after the delete.
294 ASSERT_EQ(count
, num_records
);
// Exercises a tailing iterator on a column family that is configured with a
// fixed prefix extractor and a hash-skip-list memtable.
// NOTE(review): the Seek/Next calls between the assertions are not visible in
// this excerpt.
297 TEST_F(DBTestTailingIterator
, TailingIteratorPrefixSeek
) {
298 ReadOptions read_options
;
299 read_options
.tailing
= true;
301 Options options
= CurrentOptions();
302 options
.create_if_missing
= true;
303 options
.disable_auto_compactions
= true;
// NewFixedPrefixTransform(2): presumably the first two bytes of each key form
// its prefix, so "0101" and "0202" have different prefixes -- TODO confirm.
304 options
.prefix_extractor
.reset(NewFixedPrefixTransform(2));
305 options
.memtable_factory
.reset(NewHashSkipListRepFactory(16));
// NOTE(review): presumably required by the hash-skip-list memtable -- confirm.
306 options
.allow_concurrent_memtable_write
= false;
307 DestroyAndReopen(options
);
308 CreateAndReopenWithCF({"pikachu"}, options
);
// The iterator is created before either record is written.
310 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
, handles_
[1]));
311 ASSERT_OK(Put(1, "0101", "test"));
315 ASSERT_OK(Put(1, "0202", "test"));
317 // Seek(0102) shouldn't find any records since 0202 has a different prefix
319 ASSERT_TRUE(!iter
->Valid());
// Here the iterator is expected to be positioned on "0202".
322 ASSERT_TRUE(iter
->Valid());
323 ASSERT_EQ(iter
->key().ToString(), "0202");
326 ASSERT_TRUE(!iter
->Valid());
// Checks a tailing iterator whose read_tier is kBlockCacheTier. Both before
// and after a full-range CompactRange(), the iterator must either be valid or
// report an Incomplete status.
// NOTE(review): the Put and NewIterator calls here take no column-family
// argument, so they presumably target the default column family rather than
// "pikachu" -- confirm. The iterator-positioning calls are not visible in this
// excerpt.
329 TEST_F(DBTestTailingIterator
, TailingIteratorIncomplete
) {
330 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
331 ReadOptions read_options
;
332 read_options
.tailing
= true;
333 read_options
.read_tier
= kBlockCacheTier
;
335 std::string
key("key");
336 std::string
value("value");
338 ASSERT_OK(db_
->Put(WriteOptions(), key
, value
));
340 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
342 // we either see the entry or it's not in cache
343 ASSERT_TRUE(iter
->Valid() || iter
->status().IsIncomplete());
// Compact the whole key range (null begin and end pointers).
345 ASSERT_OK(db_
->CompactRange(CompactRangeOptions(), nullptr, nullptr));
347 // should still be true after compaction
348 ASSERT_TRUE(iter
->Valid() || iter
->status().IsIncomplete());
// Writes NROWS even-numbered keys, seeks a tailing iterator to "00001", and
// expects it to land on "00002". The test then asserts that the iterator is
// still valid and still on the same key.
// NOTE(review): `buf` and the second Seek call are on lines that are not
// visible in this excerpt.
351 TEST_F(DBTestTailingIterator
, TailingIteratorSeekToSame
) {
352 Options options
= CurrentOptions();
// Universal compaction with a write buffer size of 1000.
353 options
.compaction_style
= kCompactionStyleUniversal
;
354 options
.write_buffer_size
= 1000;
355 CreateAndReopenWithCF({"pikachu"}, options
);
357 ReadOptions read_options
;
358 read_options
.tailing
= true;
360 const int NROWS
= 10000;
361 // Write rows with keys 00000, 00002, 00004 etc.
362 for (int i
= 0; i
< NROWS
; ++i
) {
// The key is 2 * i zero-padded to five digits.
364 snprintf(buf
, sizeof(buf
), "%05d", 2*i
);
365 std::string
key(buf
);
366 std::string
value("value");
367 ASSERT_OK(db_
->Put(WriteOptions(), key
, value
));
// The iterator is created only after all rows have been written.
370 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
371 // Seek to 00001. We expect to find 00002.
372 std::string start_key
= "00001";
373 iter
->Seek(start_key
);
374 ASSERT_TRUE(iter
->Valid());
// Remember the key found by the first seek so it can be compared below.
376 std::string found
= iter
->key().ToString();
377 ASSERT_EQ("00002", found
);
379 // Now seek to the same key. The iterator should remain in the same
382 ASSERT_TRUE(iter
->Valid());
383 ASSERT_EQ(found
, iter
->key().ToString());
386 // Sets iterate_upper_bound and verifies that ForwardIterator doesn't call
387 // Seek() on immutable iterators when target key is >= prev_key and all
388 // iterators, including the memtable iterator, are over the upper bound.
// NOTE(review): the Seek/Next calls between the assertions are not visible in
// this excerpt.
389 TEST_F(DBTestTailingIterator
, TailingIteratorUpperBound
) {
390 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
// NOTE(review): the length 3 is one more than the two characters of "20", so
// the bound presumably includes the literal's terminating NUL byte -- confirm
// that this is intended.
392 const Slice
upper_bound("20", 3);
393 ReadOptions read_options
;
394 read_options
.tailing
= true;
395 read_options
.iterate_upper_bound
= &upper_bound
;
// "11" and "12" are below the upper bound; "22" is above it.
397 ASSERT_OK(Put(1, "11", "11"));
398 ASSERT_OK(Put(1, "12", "12"));
399 ASSERT_OK(Put(1, "22", "22"));
400 ASSERT_OK(Flush(1)); // flush all those keys to an immutable SST file
402 // Add another key to the memtable.
403 ASSERT_OK(Put(1, "21", "21"));
405 std::unique_ptr
<Iterator
> it(db_
->NewIterator(read_options
, handles_
[1]));
// Here the iterator is expected to be positioned on "12".
407 ASSERT_TRUE(it
->Valid());
408 ASSERT_EQ("12", it
->key().ToString());
411 // Not valid since "21" is over the upper bound.
412 ASSERT_FALSE(it
->Valid());
414 // This keeps track of the number of times NeedToSeekImmutable() was true.
415 int immutable_seeks
= 0;
// Increment the counter each time the
// "ForwardIterator::SeekInternal:Immutable" sync point fires.
416 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
417 "ForwardIterator::SeekInternal:Immutable",
418 [&](void* /*arg*/) { ++immutable_seeks
; });
420 // Seek to 13. This should not require any immutable seeks.
// Sync-point processing is enabled and then disabled again here, so the
// callback can only fire for whatever runs in between.
421 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
423 ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
// The iterator must be invalid and the callback must not have fired.
425 ASSERT_FALSE(it
->Valid());
426 ASSERT_EQ(0, immutable_seeks
);
// Regression test for a key being skipped when it falls in a gap between two
// files of a lower level. It builds the three-level layout described below and
// asserts that a tailing iterator visits "30", "35" and "40" in that order.
// NOTE(review): the Flush calls and the Seek/Next calls between the assertions
// are not visible in this excerpt.
429 TEST_F(DBTestTailingIterator
, TailingIteratorGap
) {
430 // level 1: [20, 25] [35, 40]
431 // level 2: [10 - 15] [45 - 50]
432 // level 3: [20, 30, 40]
433 // Previously there was a bug in tailing_iterator that if there is a gap in
434 // lower level, the key will be skipped if it is within the range between
435 // the largest key of index n file and the smallest key of index n+1 file
436 // if both files fit in that gap. In this example, 25 < key < 35
437 // https://github.com/facebook/rocksdb/issues/1372
438 CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
440 ReadOptions read_options
;
441 read_options
.tailing
= true;
// Level 3 contents: 20, 30, 40.
443 ASSERT_OK(Put(1, "20", "20"));
444 ASSERT_OK(Put(1, "30", "30"));
445 ASSERT_OK(Put(1, "40", "40"));
447 MoveFilesToLevel(3, 1);
// Level 2 contents: 10, 15 and 45, 50.
449 ASSERT_OK(Put(1, "10", "10"));
450 ASSERT_OK(Put(1, "15", "15"));
452 ASSERT_OK(Put(1, "45", "45"));
453 ASSERT_OK(Put(1, "50", "50"));
455 MoveFilesToLevel(2, 1);
// Level 1 contents: 20, 25 and 35, 40.
457 ASSERT_OK(Put(1, "20", "20"));
458 ASSERT_OK(Put(1, "25", "25"));
460 ASSERT_OK(Put(1, "35", "35"));
461 ASSERT_OK(Put(1, "40", "40"));
463 MoveFilesToLevel(1, 1);
// NOTE(review): `meta` is filled in here but is not read by any line visible
// in this excerpt.
465 ColumnFamilyMetaData meta
;
466 db_
->GetColumnFamilyMetaData(handles_
[1], &meta
);
468 std::unique_ptr
<Iterator
> it(db_
->NewIterator(read_options
, handles_
[1]));
// "30" lies in the 25 < key < 35 gap described above and must not be skipped.
470 ASSERT_TRUE(it
->Valid());
471 ASSERT_EQ("30", it
->key().ToString());
474 ASSERT_TRUE(it
->Valid());
475 ASSERT_EQ("35", it
->key().ToString());
478 ASSERT_TRUE(it
->Valid());
479 ASSERT_EQ("40", it
->key().ToString());
// Writes one key below the upper bound ("aa") and one above it ("zz"), then
// asserts that a tailing iterator created afterwards is valid and positioned
// on "aa".
// NOTE(review): any flush calls and the Seek call preceding the assertions are
// not visible in this excerpt.
482 TEST_F(DBTestTailingIterator
, SeekWithUpperBoundBug
) {
483 ReadOptions read_options
;
484 read_options
.tailing
= true;
// NOTE(review): the length 3 is one more than the two characters of "cc", so
// the bound presumably includes the literal's terminating NUL byte -- confirm.
485 const Slice
upper_bound("cc", 3);
486 read_options
.iterate_upper_bound
= &upper_bound
;
// "aa" sorts below the upper bound; "zz" sorts above it.
490 ASSERT_OK(db_
->Put(WriteOptions(), "aa", "SEEN"));
494 ASSERT_OK(db_
->Put(WriteOptions(), "zz", "NOT-SEEN"));
497 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
500 ASSERT_TRUE(iter
->Valid());
501 ASSERT_EQ(iter
->key().ToString(), "aa");
// Same data set-up as SeekWithUpperBoundBug: "aa" below the upper bound and
// "zz" above it. The test asserts that the tailing iterator is on "aa", then
// invalid, then on "aa" again.
// NOTE(review): any flush calls and the SeekToFirst/Next calls between the
// assertions are not visible in this excerpt.
504 TEST_F(DBTestTailingIterator
, SeekToFirstWithUpperBoundBug
) {
505 ReadOptions read_options
;
506 read_options
.tailing
= true;
// NOTE(review): the length 3 is one more than the two characters of "cc", so
// the bound presumably includes the literal's terminating NUL byte -- confirm.
507 const Slice
upper_bound("cc", 3);
508 read_options
.iterate_upper_bound
= &upper_bound
;
// "aa" sorts below the upper bound; "zz" sorts above it.
512 ASSERT_OK(db_
->Put(WriteOptions(), "aa", "SEEN"));
516 ASSERT_OK(db_
->Put(WriteOptions(), "zz", "NOT-SEEN"));
519 std::unique_ptr
<Iterator
> iter(db_
->NewIterator(read_options
));
// First positioning: the only key below the bound is "aa".
522 ASSERT_TRUE(iter
->Valid());
523 ASSERT_EQ(iter
->key().ToString(), "aa");
// "zz" is above the upper bound, so the iterator is expected to be invalid
// here.
526 ASSERT_FALSE(iter
->Valid());
// Positioning again must find "aa" once more.
529 ASSERT_TRUE(iter
->Valid());
530 ASSERT_EQ(iter
->key().ToString(), "aa");
533 } // namespace ROCKSDB_NAMESPACE
535 #endif // !defined(ROCKSDB_LITE)
537 int main(int argc
, char** argv
) {
538 #if !defined(ROCKSDB_LITE)
539 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
540 ::testing::InitGoogleTest(&argc
, argv
);
541 return RUN_ALL_TESTS();