// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_test_util.h"
#include "file/file_prefetch_buffer.h"
#include "file/file_util.h"
#include "rocksdb/file_system.h"
#include "test_util/sync_point.h"
#ifdef GFLAGS
#include "tools/io_tracer_parser_tool.h"
#endif
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

class MockFS;

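// Wraps a real FSRandomAccessFile and either forwards Prefetch() to the
// underlying file (counting each call) or reports it as unsupported, so the
// tests below can observe which prefetching path RocksDB takes.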
class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
 public:
  MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
                       bool support_prefetch, std::atomic_int& prefetch_count)
      : FSRandomAccessFileOwnerWrapper(std::move(file)),
        support_prefetch_(support_prefetch),
        prefetch_count_(prefetch_count) {}

  IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
                    IODebugContext* dbg) override {
    if (support_prefetch_) {
      prefetch_count_.fetch_add(1);
      return target()->Prefetch(offset, n, options, dbg);
    } else {
      return IOStatus::NotSupported("Prefetch not supported");
    }
  }

 private:
  const bool support_prefetch_;
  std::atomic_int& prefetch_count_;
};

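// A FileSystemWrapper whose random-access files are MockRandomAccessFiles.
// The support_prefetch flag injected here decides whether the file system
// advertises Prefetch() support; prefetch_count_ records how often it is hit.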
class MockFS : public FileSystemWrapper {
 public:
  explicit MockFS(const std::shared_ptr<FileSystem>& wrapped,
                  bool support_prefetch)
      : FileSystemWrapper(wrapped), support_prefetch_(support_prefetch) {}

  static const char* kClassName() { return "MockFS"; }
  const char* Name() const override { return kClassName(); }

  IOStatus NewRandomAccessFile(const std::string& fname,
                               const FileOptions& opts,
                               std::unique_ptr<FSRandomAccessFile>* result,
                               IODebugContext* dbg) override {
    std::unique_ptr<FSRandomAccessFile> file;
    IOStatus s;
    s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
    result->reset(
        new MockRandomAccessFile(file, support_prefetch_, prefetch_count_));
    return s;
  }

  void ClearPrefetchCount() { prefetch_count_ = 0; }

  bool IsPrefetchCalled() { return prefetch_count_ > 0; }

  int GetPrefetchCount() {
    return prefetch_count_.load(std::memory_order_relaxed);
  }

 private:
  const bool support_prefetch_;
  std::atomic_int prefetch_count_{0};
};

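// Typical wiring, as used throughout the tests below: wrap the base
// FileSystem in a MockFS and hand it to the DB through a CompositeEnvWrapper:
//
//   std::shared_ptr<MockFS> fs =
//       std::make_shared<MockFS>(env_->GetFileSystem(), /*support_prefetch=*/true);
//   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
//   options.env = env.get();

// Parameterized over <support_prefetch, use_direct_io>; every combination of
// the two booleans is instantiated below.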
class PrefetchTest
    : public DBTestBase,
      public ::testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  PrefetchTest() : DBTestBase("prefetch_test", true) {}
};

INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
                        ::testing::Combine(::testing::Bool(),
                                           ::testing::Bool()));

std::string BuildKey(int num, std::string postfix = "") {
  return "my_key_" + std::to_string(num) + postfix;
}

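// Exercises the basic read path: when the file system supports prefetch and
// direct IO is off, RocksDB should call FileSystem::Prefetch(); otherwise it
// should fall back to its internal FilePrefetchBuffer.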
TEST_P(PrefetchTest, Basic) {
  // First param is whether the MockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is whether direct IO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());
  const int kNumKeys = 1100;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  // create first key range
  WriteBatch batch;
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), "value for range 1 key"));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // create second key range
  batch.Clear();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i, "key2"), "value for range 2 key"));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // delete second key range
  batch.Clear();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Delete(BuildKey(i, "key2")));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // compact database
  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  // commenting out the line below causes the example to work correctly
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  if (support_prefetch && !use_direct_io) {
    // If the underlying file system supports prefetch and direct IO is not
    // enabled, make sure prefetch() is called and FilePrefetchBuffer is not
    // used.
    ASSERT_TRUE(fs->IsPrefetchCalled());
    fs->ClearPrefetchCount();
    ASSERT_EQ(0, buff_prefetch_count);
  } else {
    // If the underlying file system doesn't support prefetch, or direct IO is
    // enabled, make sure prefetch() is not called and FilePrefetchBuffer is
    // used.
    ASSERT_FALSE(fs->IsPrefetchCalled());
    ASSERT_GT(buff_prefetch_count, 0);
    buff_prefetch_count = 0;
  }

  // count the keys
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    int num_keys = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      num_keys++;
    }
  }

  // Make sure prefetch is called only if the file system supports prefetch.
  if (support_prefetch && !use_direct_io) {
    ASSERT_TRUE(fs->IsPrefetchCalled());
    fs->ClearPrefetchCount();
    ASSERT_EQ(0, buff_prefetch_count);
  } else {
    ASSERT_FALSE(fs->IsPrefetchCalled());
    ASSERT_GT(buff_prefetch_count, 0);
    buff_prefetch_count = 0;
  }
  Close();
}

#ifndef ROCKSDB_LITE
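// Verifies that max_auto_readahead_size can be reconfigured at runtime via
// SetOptions() and that it caps the automatic readahead window (0 disables
// prefetching entirely).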
TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
  // First param is whether the MockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is whether direct IO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.disable_auto_compactions = true;
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.max_auto_readahead_size = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });

  // DB open will create table readers unless we reduce the table cache
  // capacity. SanitizeOptions will set max_open_files to minimum of 20. Table
  // cache is allocated with max_open_files - 10 as capacity. So override
  // max_open_files to 11 so table cache capacity will become 1. This will
  // prevent file open during DB open and force the file to be opened during
  // Iteration.
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = (int*)arg;
        *max_open_files = 11;
      });

  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);

  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  Random rnd(309);
  int key_count = 0;
  const int num_keys_per_level = 100;
  // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299].
  for (int level = 2; level >= 0; level--) {
    key_count = level * num_keys_per_level;
    for (int i = 0; i < num_keys_per_level; ++i) {
      ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(level);
  }
  Close();
  std::vector<int> buff_prefetch_level_count = {0, 0, 0};
  TryReopen(options);
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    fs->ClearPrefetchCount();
    buff_prefetch_count = 0;

    for (int level = 2; level >= 0; level--) {
      key_count = level * num_keys_per_level;
      switch (level) {
        case 0:
          // max_auto_readahead_size is set to 0 so data and index blocks are
          // not prefetched.
          ASSERT_OK(db_->SetOptions(
              {{"block_based_table_factory", "{max_auto_readahead_size=0;}"}}));
          break;
        case 1:
          // max_auto_readahead_size is set to a value smaller than
          // initial_auto_readahead_size, so readahead_size is clamped to
          // max_auto_readahead_size.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{max_auto_readahead_size=4096;}"}}));
          break;
        case 2:
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{max_auto_readahead_size=65536;}"}}));
          break;
        default:
          assert(false);
      }

      for (int i = 0; i < num_keys_per_level; ++i) {
        iter->Seek(Key(key_count++));
        iter->Next();
      }

      buff_prefetch_level_count[level] = buff_prefetch_count;
      if (support_prefetch && !use_direct_io) {
        if (level == 0) {
          ASSERT_FALSE(fs->IsPrefetchCalled());
        } else {
          ASSERT_TRUE(fs->IsPrefetchCalled());
        }
        fs->ClearPrefetchCount();
      } else {
        ASSERT_FALSE(fs->IsPrefetchCalled());
        if (level == 0) {
          ASSERT_EQ(buff_prefetch_count, 0);
        } else {
          ASSERT_GT(buff_prefetch_count, 0);
        }
        buff_prefetch_count = 0;
      }
    }
  }

  if (!support_prefetch) {
    ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

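// Verifies that initial_auto_readahead_size can be reconfigured at runtime
// via SetOptions() and controls where the automatic readahead window starts.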
TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
  // First param is whether the MockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is whether direct IO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.disable_auto_compactions = true;
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.initial_auto_readahead_size = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  int buff_prefetch_count = 0;
  // DB open will create table readers unless we reduce the table cache
  // capacity. SanitizeOptions will set max_open_files to minimum of 20.
  // Table cache is allocated with max_open_files - 10 as capacity. So
  // override max_open_files to 11 so table cache capacity will become 1.
  // This will prevent file open during DB open and force the file to be
  // opened during Iteration.
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = (int*)arg;
        *max_open_files = 11;
      });

  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });

  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);

  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  Random rnd(309);
  int key_count = 0;
  const int num_keys_per_level = 100;
  // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299].
  for (int level = 2; level >= 0; level--) {
    key_count = level * num_keys_per_level;
    for (int i = 0; i < num_keys_per_level; ++i) {
      ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(level);
  }
  Close();

  TryReopen(options);
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    fs->ClearPrefetchCount();
    buff_prefetch_count = 0;
    std::vector<int> buff_prefetch_level_count = {0, 0, 0};

    for (int level = 2; level >= 0; level--) {
      key_count = level * num_keys_per_level;
      switch (level) {
        case 0:
          // initial_auto_readahead_size is set to 0 so data and index blocks
          // are not prefetched.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{initial_auto_readahead_size=0;}"}}));
          break;
        case 1:
          // initial_auto_readahead_size and max_auto_readahead_size are set to
          // the same value, so readahead_size stays the same.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{initial_auto_readahead_size=4096;max_"
                                      "auto_readahead_size=4096;}"}}));
          break;
        case 2:
          ASSERT_OK(
              db_->SetOptions({{"block_based_table_factory",
                                "{initial_auto_readahead_size=65536;}"}}));
          break;
        default:
          assert(false);
      }

      for (int i = 0; i < num_keys_per_level; ++i) {
        iter->Seek(Key(key_count++));
        iter->Next();
      }

      buff_prefetch_level_count[level] = buff_prefetch_count;
      if (support_prefetch && !use_direct_io) {
        if (level == 0) {
          ASSERT_FALSE(fs->IsPrefetchCalled());
        } else {
          ASSERT_TRUE(fs->IsPrefetchCalled());
        }
        fs->ClearPrefetchCount();
      } else {
        ASSERT_FALSE(fs->IsPrefetchCalled());
        if (level == 0) {
          ASSERT_EQ(buff_prefetch_count, 0);
        } else {
          ASSERT_GT(buff_prefetch_count, 0);
        }
        buff_prefetch_count = 0;
      }
    }
    if (!support_prefetch) {
      ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]);
    }
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

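// With num_file_reads_for_auto_readahead = 0, auto readahead should kick in
// on the very first read instead of waiting for two sequential reads.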
TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
  // First param is whether the MockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is whether direct IO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.num_file_reads_for_auto_readahead = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  Close();
  TryReopen(options);

  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    /*
     * Reseek keys from sequential data blocks within the same partitioned
     * index. The data block is prefetched on the first seek because
     * num_file_reads_for_auto_readahead = 0. Data block size is nearly 4076
     * bytes, so readahead initially fetches 8 * 1024 bytes more (2 more data
     * blocks).
     */
    iter->Seek(BuildKey(0));  // Prefetch data + index block since
                              // num_file_reads_for_auto_readahead = 0.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1011));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));  // In buffer
    ASSERT_TRUE(iter->Valid());
    // Missed 2 blocks but they are already in buffer so no reset.
    iter->Seek(BuildKey(103));  // Already in buffer.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));  // Prefetch Data.
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 4);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 4);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
#endif  // !ROCKSDB_LITE

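// Exercises implicit auto readahead: two sequential block reads arm the
// prefetcher, after which reseeks within the same partitioned index keep
// growing the readahead window, while non-sequential reseeks reset it.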
TEST_P(PrefetchTest, PrefetchWhenReseek) {
  // First param is whether the MockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is whether direct IO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    /*
     * Reseek keys from sequential data blocks within the same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data block size is nearly 4076 bytes, so readahead initially fetches
     * 8 * 1024 bytes more (2 more data blocks).
     */
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    // Missed 2 blocks but they are already in buffer so no reset.
    iter->Seek(BuildKey(103));  // Already in buffer.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 3);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 3);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from non-sequential data blocks within the same partitioned
     * index. buff_prefetch_count will be 0 in that case.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1048));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys within a single data block.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(10));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(100));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from sequential data blocks to arm implicit auto readahead
     * and prefetch data, then iterate over different (non-sequential) data
     * blocks, which won't prefetch any further data. So buff_prefetch_count
     * will be 1 for the first part.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // This iteration will prefetch buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(
        BuildKey(996));  // Reseek won't prefetch any data and
                         // readahead_size will be initialized to 8*1024.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(992));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(989));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }

    // Read sequentially to confirm readahead_size is reset to initial value (2
    // more data blocks)
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1022));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1026));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(103));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 2);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 2);
      buff_prefetch_count = 0;
    }
  }
  {
    /* Reseek keys from a sequential partitioned index block. Since
     * partitioned index fetches are sequential, buff_prefetch_count will be 1.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1167));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1334));  // This iteration will prefetch buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1499));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1667));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1847));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1999));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek over different keys from different blocks. buff_prefetch_count
     * stays 0.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    int i = 0;
    int j = 1000;
    do {
      iter->Seek(BuildKey(i));
      if (!iter->Valid()) {
        break;
      }
      i = i + 100;
      iter->Seek(BuildKey(j));
      j = j + 100;
    } while (i < 1000 && j < kNumKeys && iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /* Iterate sequentially over all keys. It will prefetch the buffer. */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    }
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 13);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 13);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

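// Same reseek scenario as above, but with a block cache: blocks served from
// cache should not trigger prefetching, so the counters come out lower.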
TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
  // First param is whether the MockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is whether direct IO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 4 MB
  table_options.block_cache = cache;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    /*
     * Reseek keys from sequential data blocks within the same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data block size is nearly 4076 bytes, so readahead initially fetches
     * 8 * 1024 bytes more (2 more data blocks).
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    // Warm up the cache
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }
  }
  {
    // After caching, blocks will be read from cache (sequential blocks)
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // Prefetch data (not in cache).
    ASSERT_TRUE(iter->Valid());
    // Missed one sequential block, but the next one is already in buffer so
    // readahead will not be reset.
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    // Eligible to prefetch data, but the blocks are in cache so no prefetch
    // happens.
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1022));
    ASSERT_TRUE(iter->Valid());
    // Prefetch data with readahead_size = 4 blocks.
    iter->Seek(BuildKey(1026));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(103));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1037));
    ASSERT_TRUE(iter->Valid());

    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 3);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 2);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

#ifndef ROCKSDB_LITE
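// Iterates over keys spread across several L2 files and checks that with
// adaptive_readahead the readahead state is carried over from one file to the
// next instead of restarting at 8KB.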
TEST_P(PrefetchTest, DBIterLevelReadAhead) {
  const int kNumKeys = 1000;
  // Set options
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  bool use_direct_io = std::get<0>(GetParam());
  bool is_adaptive_readahead = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.statistics = CreateDBStatistics();
  options.env = env.get();

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  int total_keys = 0;
  for (int j = 0; j < 5; j++) {
    for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
      ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
      total_keys++;
    }
    ASSERT_OK(db_->Write(WriteOptions(), &batch));
    ASSERT_OK(Flush());
  }
  MoveFilesToLevel(2);
  int buff_prefetch_count = 0;
  int buff_async_prefetch_count = 0;
  int readahead_carry_over_count = 0;
  int num_sst_files = NumTableFilesAtLevel(2);
  size_t current_readahead_size = 0;

  // Test - Iterate over the keys sequentially.
  {
    SyncPoint::GetInstance()->SetCallBack(
        "FilePrefetchBuffer::Prefetch:Start",
        [&](void*) { buff_prefetch_count++; });

    SyncPoint::GetInstance()->SetCallBack(
        "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
        [&](void*) { buff_async_prefetch_count++; });

    // Since reads are sequential, the callback checks that readahead_size
    // doesn't restart from 8KB when the iterator moves to the next file. It
    // fires num_sst_files - 1 times each for index and data blocks (i.e., for
    // every file except the first).
    SyncPoint::GetInstance()->SetCallBack(
        "BlockPrefetcher::SetReadaheadState", [&](void* arg) {
          readahead_carry_over_count++;
          size_t readahead_size = *reinterpret_cast<size_t*>(arg);
          if (readahead_carry_over_count) {
            ASSERT_GT(readahead_size, 8 * 1024);
          }
        });

    SyncPoint::GetInstance()->SetCallBack(
        "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
          current_readahead_size = *reinterpret_cast<size_t*>(arg);
          ASSERT_GT(current_readahead_size, 0);
        });

    SyncPoint::GetInstance()->EnableProcessing();

    ReadOptions ro;
    if (is_adaptive_readahead) {
      ro.adaptive_readahead = true;
      ro.async_io = true;
    }

    ASSERT_OK(options.statistics->Reset());

    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    int num_keys = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      num_keys++;
    }
    ASSERT_EQ(num_keys, total_keys);

    // For index and data blocks.
    if (is_adaptive_readahead) {
      ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
      ASSERT_GT(buff_async_prefetch_count, 0);
    } else {
      ASSERT_GT(buff_prefetch_count, 0);
      ASSERT_EQ(readahead_carry_over_count, 0);
    }

    // Check stats to make sure async prefetch is done.
    {
      HistogramData async_read_bytes;
      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
      if (ro.async_io) {
        ASSERT_GT(async_read_bytes.count, 0);
      } else {
        ASSERT_EQ(async_read_bytes.count, 0);
      }
    }

    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
  }
  Close();
}
#endif  //! ROCKSDB_LITE

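// Single-boolean parameterization: the parameter toggles direct IO.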
class PrefetchTest1 : public DBTestBase,
                      public ::testing::WithParamInterface<bool> {
 public:
  PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
};

INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool());

#ifndef ROCKSDB_LITE
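// With adaptive_readahead, a non-sequential read (a seek into a different
// file) should drop the readahead size back to its initial 8KB.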
TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
  const int kNumKeys = 1000;
  // Set options
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (GetParam()) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int j = 0; j < 5; j++) {
    for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
      ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
    }
    ASSERT_OK(db_->Write(WriteOptions(), &batch));
    ASSERT_OK(Flush());
  }
  MoveFilesToLevel(2);

  int buff_prefetch_count = 0;
  int set_readahead = 0;
  size_t readahead_size = 0;

  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->SetCallBack(
      "BlockPrefetcher::SetReadaheadState",
      [&](void* /*arg*/) { set_readahead++; });
  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::TryReadFromCache",
      [&](void* arg) { readahead_size = *reinterpret_cast<size_t*>(arg); });

  SyncPoint::GetInstance()->EnableProcessing();

  {
    // Iterate until prefetch is done.
    ReadOptions ro;
    ro.adaptive_readahead = true;
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));

    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());

    while (iter->Valid() && buff_prefetch_count == 0) {
      iter->Next();
    }

    ASSERT_EQ(readahead_size, 8 * 1024);
    ASSERT_EQ(buff_prefetch_count, 1);
    ASSERT_EQ(set_readahead, 0);
    buff_prefetch_count = 0;

    // Move to the last file and check that the readahead size falls back to
    // 8KB. So the next readahead size after prefetch should be 8 * 1024.
    iter->Seek(BuildKey(4004));
    ASSERT_TRUE(iter->Valid());

    while (iter->Valid() && buff_prefetch_count == 0) {
      iter->Next();
    }

    ASSERT_EQ(readahead_size, 8 * 1024);
    ASSERT_EQ(set_readahead, 0);
    ASSERT_EQ(buff_prefetch_count, 1);
  }
  Close();
}
#endif  //! ROCKSDB_LITE

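// With adaptive_readahead and a block cache, cache hits should shrink the
// readahead window by decrease_readahead_size steps rather than growing it.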
TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
  const int kNumKeys = 2000;
  // Set options
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (GetParam()) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions table_options;
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 4 MB
  table_options.block_cache = cache;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  int buff_prefetch_count = 0;
  size_t current_readahead_size = 0;
  size_t expected_current_readahead_size = 8 * 1024;
  size_t decrease_readahead_size = 8 * 1024;

  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
        current_readahead_size = *reinterpret_cast<size_t*>(arg);
      });

  SyncPoint::GetInstance()->EnableProcessing();
  ReadOptions ro;
  ro.adaptive_readahead = true;
  {
    /*
     * Reseek keys from sequential data blocks within the same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data block size is nearly 4076 bytes, so readahead initially fetches
     * 8 * 1024 bytes more (2 more data blocks).
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    // Warm up the cache
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    buff_prefetch_count = 0;
  }

  {
    ASSERT_OK(options.statistics->Reset());
    // After caching, blocks will be read from cache (sequential blocks)
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    iter->Seek(
        BuildKey(0));  // In cache so it will decrease the readahead_size.
    ASSERT_TRUE(iter->Valid());
    expected_current_readahead_size = std::max(
        decrease_readahead_size,
        (expected_current_readahead_size >= decrease_readahead_size
             ? (expected_current_readahead_size - decrease_readahead_size)
             : 0));

    iter->Seek(BuildKey(1000));  // Won't prefetch the block.
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(current_readahead_size, expected_current_readahead_size);

    iter->Seek(BuildKey(1004));  // Prefetch the block.
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
    expected_current_readahead_size *= 2;

    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());

    // Eligible to prefetch data (not in buffer), but the block is in cache so
    // no prefetch will happen and readahead_size will decrease.
    // readahead_size will be 8 * 1024.
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    expected_current_readahead_size = std::max(
        decrease_readahead_size,
        (expected_current_readahead_size >= decrease_readahead_size
             ? (expected_current_readahead_size - decrease_readahead_size)
             : 0));

    // 1016 is in the same block as 1015. So no change in readahead_size.
    iter->Seek(BuildKey(1016));
    ASSERT_TRUE(iter->Valid());

    // Prefetch data (not in buffer) but found in cache. So decrease
    // readahead_size. Since it would reach 0 after decrementing,
    // readahead_size is reset to its initial value.
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    expected_current_readahead_size = std::max(
        decrease_readahead_size,
        (expected_current_readahead_size >= decrease_readahead_size
             ? (expected_current_readahead_size - decrease_readahead_size)
             : 0));

    // Prefetch next sequential data.
    iter->Seek(BuildKey(1022));
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
    ASSERT_EQ(buff_prefetch_count, 2);

    buff_prefetch_count = 0;
  }
  Close();
}

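// With async_io enabled, a Seek that triggers prefetching issues reads
// through PrefetchAsyncInternal (seek parallelization); the assertions below
// count those prefetches.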
TEST_P(PrefetchTest1, SeekParallelizationTest) {
  const int kNumKeys = 2000;
  // Set options
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (GetParam()) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  int buff_prefetch_count = 0;

  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
      [&](void*) { buff_prefetch_count++; });

  SyncPoint::GetInstance()->EnableProcessing();
  ReadOptions ro;
  ro.adaptive_readahead = true;
  ro.async_io = true;

  {
    ASSERT_OK(options.statistics->Reset());
    // Each block contains around 4 keys.
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    iter->Seek(BuildKey(0));  // Prefetch data because of seek parallelization.
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());

    // New data block. Since num_file_reads in FilePrefetchBuffer after this
    // read is 2, it won't prefetch.
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());

    // Prefetch data.
    iter->Next();
    ASSERT_TRUE(iter->Valid());

    ASSERT_EQ(buff_prefetch_count, 2);

    // Check stats to make sure async prefetch is done.
    {
      HistogramData async_read_bytes;
      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
      ASSERT_GT(async_read_bytes.count, 0);
      ASSERT_GT(get_perf_context()->number_async_seek, 0);
    }

    buff_prefetch_count = 0;
  }
  Close();
}

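// Runtime hook consulted by RocksDB's Posix file system; returning true opts
// these tests in to io_uring-based async reads where the platform supports
// them.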
extern "C" bool RocksDbIOUringEnable() { return true; }

namespace {
#ifndef ROCKSDB_LITE
#ifdef GFLAGS
const int kMaxArgCount = 100;
const size_t kArgBufferSize = 100000;

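// Packs the parser arguments into a flat argv-style buffer and runs the
// io_tracer_parser tool over the given trace file.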
void RunIOTracerParserTool(std::string trace_file) {
  std::vector<std::string> params = {"./io_tracer_parser",
                                     "-io_trace_file=" + trace_file};

  char arg_buffer[kArgBufferSize];
  char* argv[kMaxArgCount];
  int argc = 0;
  int cursor = 0;
  for (const auto& arg : params) {
    ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize);
    ASSERT_LE(argc + 1, kMaxArgCount);

    snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str());

    argv[argc++] = arg_buffer + cursor;
    cursor += static_cast<int>(arg.size()) + 1;
  }
  ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv));
}
#endif  // GFLAGS
#endif  // ROCKSDB_LITE
}  // namespace

// Tests the default implementation of the ReadAsync API with PosixFileSystem.
TEST_P(PrefetchTest, ReadAsyncWithPosixFS) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
    return;
  }

  const int kNumKeys = 1000;
  std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
      FileSystem::Default(), /*support_prefetch=*/false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  bool use_direct_io = std::get<0>(GetParam());
  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.statistics = CreateDBStatistics();
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  int total_keys = 0;
  // Write the keys.
  {
    WriteBatch batch;
    Random rnd(309);
    for (int j = 0; j < 5; j++) {
      for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
        ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
        total_keys++;
      }
      ASSERT_OK(db_->Write(WriteOptions(), &batch));
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(2);
  }

  int buff_prefetch_count = 0;
  bool read_async_called = false;
  ReadOptions ro;
  ro.adaptive_readahead = true;
  ro.async_io = true;

  if (std::get<1>(GetParam())) {
    ro.readahead_size = 16 * 1024;
  }

  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
      [&](void*) { buff_prefetch_count++; });

  SyncPoint::GetInstance()->SetCallBack(
      "UpdateResults::io_uring_result",
      [&](void* /*arg*/) { read_async_called = true; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Read the keys.
  {
    ASSERT_OK(options.statistics->Reset());
    get_perf_context()->Reset();

    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    int num_keys = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      num_keys++;
    }

    ASSERT_EQ(num_keys, total_keys);
    ASSERT_GT(buff_prefetch_count, 0);

    // Check stats to make sure async prefetch is done.
    {
      HistogramData async_read_bytes;
      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
      HistogramData prefetched_bytes_discarded;
      options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
                                        &prefetched_bytes_discarded);

      // Not all platforms support io_uring. In that case, ReadAsync in posix
      // won't submit async requests.
      if (read_async_called) {
        ASSERT_GT(async_read_bytes.count, 0);
      } else {
        ASSERT_EQ(async_read_bytes.count, 0);
      }
      ASSERT_GT(prefetched_bytes_discarded.count, 0);
    }
    ASSERT_EQ(get_perf_context()->number_async_seek, 0);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  Close();
}

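// Validates async_io results against a plain ReadOptions baseline: two seek
// batches must return the same key counts with and without async prefetching.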
1603TEST_P(PrefetchTest, MultipleSeekWithPosixFS) {
1604 if (mem_env_ || encrypted_env_) {
1605 ROCKSDB_GTEST_SKIP("Test requires non-mem or non-encrypted environment");
1606 return;
1607 }
1608
1609 const int kNumKeys = 1000;
1610 std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
1611 FileSystem::Default(), /*support_prefetch=*/false);
1612 std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
1613
1614 bool use_direct_io = std::get<0>(GetParam());
1615 Options options = CurrentOptions();
1616 options.write_buffer_size = 1024;
1617 options.create_if_missing = true;
1618 options.compression = kNoCompression;
1619 options.env = env.get();
1620 options.statistics = CreateDBStatistics();
1621 if (use_direct_io) {
1622 options.use_direct_reads = true;
1623 options.use_direct_io_for_flush_and_compaction = true;
1624 }
1625 BlockBasedTableOptions table_options;
1626 table_options.no_block_cache = true;
1627 table_options.cache_index_and_filter_blocks = false;
1628 table_options.metadata_block_size = 1024;
1629 table_options.index_type =
1630 BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
1631 options.table_factory.reset(NewBlockBasedTableFactory(table_options));
1632
1633 Status s = TryReopen(options);
1634 if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
1635 // If direct IO is not supported, skip the test
1636 return;
1637 } else {
1638 ASSERT_OK(s);
1639 }
1640
  int total_keys = 0;
  // Write the keys.
  {
    WriteBatch batch;
    Random rnd(309);
    for (int j = 0; j < 5; j++) {
      for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
        ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
        total_keys++;
      }
      ASSERT_OK(db_->Write(WriteOptions(), &batch));
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(2);
  }

  int num_keys_first_batch = 0;
  int num_keys_second_batch = 0;
  // Calculate the number of keys without async_io for correctness validation.
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    // First Seek.
    iter->Seek(BuildKey(450));
    while (iter->Valid() && num_keys_first_batch < 100) {
      ASSERT_OK(iter->status());
      num_keys_first_batch++;
      iter->Next();
    }
    ASSERT_OK(iter->status());

    iter->Seek(BuildKey(942));
    while (iter->Valid()) {
      ASSERT_OK(iter->status());
      num_keys_second_batch++;
      iter->Next();
    }
    ASSERT_OK(iter->status());
  }

  int buff_prefetch_count = 0;
  bool read_async_called = false;
  ReadOptions ro;
  ro.adaptive_readahead = true;
  ro.async_io = true;

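  // The second test parameter additionally pins an explicit readahead_size on
  // top of adaptive readahead.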
  if (std::get<1>(GetParam())) {
    ro.readahead_size = 16 * 1024;
  }

  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
      [&](void*) { buff_prefetch_count++; });

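  // "UpdateResults::io_uring_result" fires only when io_uring actually
  // completes a request, so read_async_called doubles as a probe for
  // platform io_uring support.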
  SyncPoint::GetInstance()->SetCallBack(
      "UpdateResults::io_uring_result",
      [&](void* /*arg*/) { read_async_called = true; });
  SyncPoint::GetInstance()->EnableProcessing();

  // Read the keys using seek.
  {
    ASSERT_OK(options.statistics->Reset());
    get_perf_context()->Reset();

    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    int num_keys = 0;
    // First Seek.
    {
      iter->Seek(BuildKey(450));
      while (iter->Valid() && num_keys < 100) {
        ASSERT_OK(iter->status());
        num_keys++;
        iter->Next();
      }
      ASSERT_OK(iter->status());
      ASSERT_EQ(num_keys, num_keys_first_batch);
      // Check stats to make sure async prefetch is done.
      {
        HistogramData async_read_bytes;
        options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);

        // Not all platforms support iouring. In that case, ReadAsync in posix
        // won't submit async requests.
        if (read_async_called) {
          ASSERT_GT(async_read_bytes.count, 0);
          ASSERT_GT(get_perf_context()->number_async_seek, 0);
        } else {
          ASSERT_EQ(async_read_bytes.count, 0);
          ASSERT_EQ(get_perf_context()->number_async_seek, 0);
        }
      }
    }

    // Second Seek.
    {
      num_keys = 0;
      ASSERT_OK(options.statistics->Reset());
      get_perf_context()->Reset();

      iter->Seek(BuildKey(942));
      while (iter->Valid()) {
        ASSERT_OK(iter->status());
        num_keys++;
        iter->Next();
      }
      ASSERT_OK(iter->status());
      ASSERT_EQ(num_keys, num_keys_second_batch);

      ASSERT_GT(buff_prefetch_count, 0);

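      // The second seek jumps well past the data prefetched while scanning
      // the first range, so some of those prefetched bytes should be
      // discarded.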
      // Check stats to make sure async prefetch is done.
      {
        HistogramData async_read_bytes;
        options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
        HistogramData prefetched_bytes_discarded;
        options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED,
                                          &prefetched_bytes_discarded);

        // Not all platforms support iouring. In that case, ReadAsync in posix
        // won't submit async requests.
        if (read_async_called) {
          ASSERT_GT(async_read_bytes.count, 0);
          ASSERT_GT(get_perf_context()->number_async_seek, 0);
        } else {
          ASSERT_EQ(async_read_bytes.count, 0);
          ASSERT_EQ(get_perf_context()->number_async_seek, 0);
        }
        ASSERT_GT(prefetched_bytes_discarded.count, 0);
      }
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

TEST_P(PrefetchTest, SeekParallelizationTest1) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem and non-encrypted environment");
    return;
  }
  const int kNumKeys = 2000;
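  // With async_io enabled, a seek may fill two prefetch buffers in parallel
  // ("seek parallelization"); this test pins down how many prefetches the
  // scan below triggers.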
  // Set options.
  std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
      FileSystem::Default(), /*support_prefetch=*/false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  bool use_direct_io = std::get<0>(GetParam());
  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test.
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

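  // Compact the whole key range into a single sorted run so the scan below
  // reads from a deterministic file layout.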
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  int buff_prefetch_count = 0;

  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
      [&](void*) { buff_prefetch_count++; });

  bool read_async_called = false;
  SyncPoint::GetInstance()->SetCallBack(
      "UpdateResults::io_uring_result",
      [&](void* /*arg*/) { read_async_called = true; });
  SyncPoint::GetInstance()->EnableProcessing();

  ReadOptions ro;
  ro.adaptive_readahead = true;
  ro.async_io = true;

  if (std::get<1>(GetParam())) {
    ro.readahead_size = 16 * 1024;
  }

  {
    ASSERT_OK(options.statistics->Reset());
    // Each block contains around 4 keys.
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    iter->Seek(BuildKey(0));  // Prefetch data because of seek parallelization.
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());

    // New data block. Since num_file_reads in FilePrefetchBuffer is 2 after
    // this read, prefetching is not triggered.
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());
    iter->Next();
    ASSERT_TRUE(iter->Valid());

    // Prefetch data.
    iter->Next();
    ASSERT_TRUE(iter->Valid());

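    // With an explicit 16KB readahead_size one prefetch covers this scan;
    // adaptive readahead starts smaller, so it needs a second prefetch.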
    // Check stats to make sure async prefetch is done.
    {
      HistogramData async_read_bytes;
      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
      // Not all platforms support iouring. In that case, ReadAsync in posix
      // won't submit async requests.
      if (read_async_called) {
        ASSERT_GT(async_read_bytes.count, 0);
        ASSERT_GT(get_perf_context()->number_async_seek, 0);
        if (std::get<1>(GetParam())) {
          ASSERT_EQ(buff_prefetch_count, 1);
        } else {
          ASSERT_EQ(buff_prefetch_count, 2);
        }
      } else {
        ASSERT_EQ(async_read_bytes.count, 0);
        ASSERT_EQ(get_perf_context()->number_async_seek, 0);
        ASSERT_EQ(buff_prefetch_count, 1);
      }
    }

    buff_prefetch_count = 0;
  }
  Close();
}

#ifndef ROCKSDB_LITE
#ifdef GFLAGS
TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) {
  if (mem_env_ || encrypted_env_) {
    ROCKSDB_GTEST_SKIP("Test requires non-mem and non-encrypted environment");
    return;
  }

  const int kNumKeys = 1000;
  std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
      FileSystem::Default(), /*support_prefetch=*/false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  bool use_direct_io = std::get<0>(GetParam());
  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.statistics = CreateDBStatistics();
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test.
    return;
  } else {
    ASSERT_OK(s);
  }

  int total_keys = 0;
  // Write the keys.
  {
    WriteBatch batch;
    Random rnd(309);
    for (int j = 0; j < 5; j++) {
      for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
        ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
        total_keys++;
      }
      ASSERT_OK(db_->Write(WriteOptions(), &batch));
      ASSERT_OK(Flush());
    }
    MoveFilesToLevel(2);
  }

  int buff_prefetch_count = 0;
  bool read_async_called = false;
  ReadOptions ro;
  ro.adaptive_readahead = true;
  ro.async_io = true;

  if (std::get<1>(GetParam())) {
    ro.readahead_size = 16 * 1024;
  }

  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
      [&](void*) { buff_prefetch_count++; });

  SyncPoint::GetInstance()->SetCallBack(
      "UpdateResults::io_uring_result",
      [&](void* /*arg*/) { read_async_called = true; });
  SyncPoint::GetInstance()->EnableProcessing();

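  // Run the scan under an active IO trace, then parse the trace file to
  // verify that async reads issued via the callback wrapper were logged.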
  // Read the keys.
  {
    // Start io_tracing.
    TraceOptions trace_opt;
    std::unique_ptr<TraceWriter> trace_writer;
    std::string trace_file_path = dbname_ + "/io_trace_file";

    ASSERT_OK(
        NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer));
    ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer)));
    ASSERT_OK(options.statistics->Reset());

    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    int num_keys = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      num_keys++;
    }

    // End the tracing.
    ASSERT_OK(db_->EndIOTrace());
    ASSERT_OK(env_->FileExists(trace_file_path));

    ASSERT_EQ(num_keys, total_keys);
    ASSERT_GT(buff_prefetch_count, 0);

    // Check stats to make sure async prefetch is done.
    {
      HistogramData async_read_bytes;
      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
      // Not all platforms support iouring. In that case, ReadAsync in posix
      // won't submit async requests.
      if (read_async_called) {
        ASSERT_GT(async_read_bytes.count, 0);
      } else {
        ASSERT_EQ(async_read_bytes.count, 0);
      }
    }

    // Check the trace file to see if ReadAsync is logged.
    RunIOTracerParserTool(trace_file_path);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();

  Close();
}
#endif  // GFLAGS

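// Unit-test harness for FilePrefetchBuffer: files are written into a per-test
// directory and read back through a real RandomAccessFileReader, with direct
// IO mocked out via sync points.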
class FilePrefetchBufferTest : public testing::Test {
 public:
  void SetUp() override {
    SetupSyncPointsToMockDirectIO();
    env_ = Env::Default();
    fs_ = FileSystem::Default();
    test_dir_ = test::PerThreadDBPath("file_prefetch_buffer_test");
    ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
  }

  void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }

  void Write(const std::string& fname, const std::string& content) {
    std::unique_ptr<FSWritableFile> f;
    ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr));
    ASSERT_OK(f->Append(content, IOOptions(), nullptr));
    ASSERT_OK(f->Close(IOOptions(), nullptr));
  }

  void Read(const std::string& fname, const FileOptions& opts,
            std::unique_ptr<RandomAccessFileReader>* reader) {
    std::string fpath = Path(fname);
    std::unique_ptr<FSRandomAccessFile> f;
    ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
    reader->reset(new RandomAccessFileReader(std::move(f), fpath,
                                             env_->GetSystemClock().get()));
  }

  void AssertResult(const std::string& content,
                    const std::vector<FSReadRequest>& reqs) {
    for (const auto& r : reqs) {
      ASSERT_OK(r.status);
      ASSERT_EQ(r.len, r.result.size());
      ASSERT_EQ(content.substr(r.offset, r.len), r.result.ToString());
    }
  }

  FileSystem* fs() { return fs_.get(); }

 private:
  Env* env_;
  std::shared_ptr<FileSystem> fs_;
  std::string test_dir_;

  std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
};

TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) {
  std::string fname = "seek-with-block-cache-hit";
  Random rand(0);
  std::string content = rand.RandomString(32768);
  Write(fname, content);

  FileOptions opts;
  std::unique_ptr<RandomAccessFileReader> r;
  Read(fname, opts, &r);

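  // 16KB initial and maximum readahead, prefetching enabled, min-offset
  // tracking and implicit auto-readahead disabled (argument order assumed
  // from this version's FilePrefetchBuffer constructor).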
  FilePrefetchBuffer fpb(16384, 16384, true, false, false, 0, 0, fs());
  Slice result;
  // Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings,
  // it will do two reads of 4096+8192 and 8192 bytes.
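  // PrefetchAsync is expected to return TryAgain here: the data is not
  // buffered yet and an asynchronous request is queued instead.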
  Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 0, 4096, &result);
  ASSERT_EQ(s, Status::TryAgain());
  // Simulate a block cache hit.
  fpb.UpdateReadPattern(0, 4096, false);
  // Now read some data that straddles the two prefetch buffers - offsets 8192
  // to 16384.
  ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), 8192, 8192,
                                        &result, &s, Env::IOPriority::IO_LOW));
}
#endif  // ROCKSDB_LITE
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);

  return RUN_ALL_TESTS();
}