// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/db_test_util.h"
#include "file/file_prefetch_buffer.h"
#include "file/file_util.h"
#include "rocksdb/file_system.h"
#include "test_util/sync_point.h"
#ifdef GFLAGS
#include "tools/io_tracer_parser_tool.h"
#endif
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

class MockFS;

class MockRandomAccessFile : public FSRandomAccessFileOwnerWrapper {
 public:
  MockRandomAccessFile(std::unique_ptr<FSRandomAccessFile>& file,
                       bool support_prefetch, std::atomic_int& prefetch_count)
      : FSRandomAccessFileOwnerWrapper(std::move(file)),
        support_prefetch_(support_prefetch),
        prefetch_count_(prefetch_count) {}

  IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options,
                    IODebugContext* dbg) override {
    if (support_prefetch_) {
      prefetch_count_.fetch_add(1);
      return target()->Prefetch(offset, n, options, dbg);
    } else {
      return IOStatus::NotSupported("Prefetch not supported");
    }
  }

 private:
  const bool support_prefetch_;
  std::atomic_int& prefetch_count_;
};

class MockFS : public FileSystemWrapper {
 public:
  explicit MockFS(const std::shared_ptr<FileSystem>& wrapped,
                  bool support_prefetch)
      : FileSystemWrapper(wrapped), support_prefetch_(support_prefetch) {}

  static const char* kClassName() { return "MockFS"; }
  const char* Name() const override { return kClassName(); }

  IOStatus NewRandomAccessFile(const std::string& fname,
                               const FileOptions& opts,
                               std::unique_ptr<FSRandomAccessFile>* result,
                               IODebugContext* dbg) override {
    std::unique_ptr<FSRandomAccessFile> file;
    IOStatus s;
    s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
    result->reset(
        new MockRandomAccessFile(file, support_prefetch_, prefetch_count_));
    return s;
  }

  void ClearPrefetchCount() { prefetch_count_ = 0; }

  bool IsPrefetchCalled() { return prefetch_count_ > 0; }

  int GetPrefetchCount() {
    return prefetch_count_.load(std::memory_order_relaxed);
  }

 private:
  const bool support_prefetch_;
  std::atomic_int prefetch_count_{0};
};
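
// A minimal wiring sketch (it mirrors how the tests below use these mocks;
// all names are the ones defined above, nothing new): MockFS wraps the real
// file system so that every FSRandomAccessFile it returns is a
// MockRandomAccessFile counting Prefetch() calls, and a CompositeEnvWrapper
// turns that file system into an Env the DB can consume:
//
//   std::shared_ptr<MockFS> fs = std::make_shared<MockFS>(
//       env_->GetFileSystem(), /*support_prefetch=*/true);
//   std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
//   Options options = CurrentOptions();
//   options.env = env.get();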

class PrefetchTest
    : public DBTestBase,
      public ::testing::WithParamInterface<std::tuple<bool, bool>> {
 public:
  PrefetchTest() : DBTestBase("prefetch_test", true) {}
};

INSTANTIATE_TEST_CASE_P(PrefetchTest, PrefetchTest,
                        ::testing::Combine(::testing::Bool(),
                                           ::testing::Bool()));

std::string BuildKey(int num, std::string postfix = "") {
  return "my_key_" + std::to_string(num) + postfix;
}
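
// For example, BuildKey(7) returns "my_key_7" and BuildKey(7, "key2") returns
// "my_key_7key2"; the postfix is appended directly, with no separator.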

TEST_P(PrefetchTest, Basic) {
  // First param is if the mockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());
  const int kNumKeys = 1100;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  // create first key range
  WriteBatch batch;
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), "value for range 1 key"));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // create second key range
  batch.Clear();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i, "key2"), "value for range 2 key"));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // delete second key range
  batch.Clear();
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Delete(BuildKey(i, "key2")));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  // compact database
  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  // commenting out the line below causes the example to work correctly
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  if (support_prefetch && !use_direct_io) {
    // If the underlying file system supports prefetch, and directIO is not
    // enabled, make sure prefetch() is called and FilePrefetchBuffer is not
    // used.
    ASSERT_TRUE(fs->IsPrefetchCalled());
    fs->ClearPrefetchCount();
    ASSERT_EQ(0, buff_prefetch_count);
  } else {
    // If the underlying file system doesn't support prefetch, or directIO is
    // enabled, make sure prefetch() is not called and FilePrefetchBuffer is
    // used.
    ASSERT_FALSE(fs->IsPrefetchCalled());
    ASSERT_GT(buff_prefetch_count, 0);
    buff_prefetch_count = 0;
  }

  // count the keys
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    int num_keys = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      num_keys++;
    }
  }

  // Make sure prefetch is called only if the file system supports prefetch.
  if (support_prefetch && !use_direct_io) {
    ASSERT_TRUE(fs->IsPrefetchCalled());
    fs->ClearPrefetchCount();
    ASSERT_EQ(0, buff_prefetch_count);
  } else {
    ASSERT_FALSE(fs->IsPrefetchCalled());
    ASSERT_GT(buff_prefetch_count, 0);
    buff_prefetch_count = 0;
  }
  Close();
}
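
// A sketch of the dichotomy the test above asserts (describing the expected
// behavior, not quoting the implementation): with buffered reads, RocksDB
// forwards prefetching to the file system when it advertises support;
// otherwise, or with direct IO, it falls back to its own FilePrefetchBuffer.
//
//   if (fs_supports_prefetch && !use_direct_io) {
//     file->Prefetch(offset, n, ...);  // counted by MockFS
//   } else {
//     // FilePrefetchBuffer reads ahead into an internal buffer; the
//     // "FilePrefetchBuffer::Prefetch:Start" sync point fires here.
//   }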

#ifndef ROCKSDB_LITE
TEST_P(PrefetchTest, ConfigureAutoMaxReadaheadSize) {
  // First param is if the mockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.disable_auto_compactions = true;
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.max_auto_readahead_size = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });

  // DB open will create table readers unless we reduce the table cache
  // capacity. SanitizeOptions will set max_open_files to minimum of 20. Table
  // cache is allocated with max_open_files - 10 as capacity. So override
  // max_open_files to 11 so the table cache capacity becomes 1. This will
  // prevent file open during DB open and force the file to be opened during
  // iteration.
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = (int*)arg;
        *max_open_files = 11;
      });

  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);

  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  Random rnd(309);
  int key_count = 0;
  const int num_keys_per_level = 100;
  // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299].
  for (int level = 2; level >= 0; level--) {
    key_count = level * num_keys_per_level;
    for (int i = 0; i < num_keys_per_level; ++i) {
      ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(level);
  }
  Close();
  std::vector<int> buff_prefetch_level_count = {0, 0, 0};
  ASSERT_OK(TryReopen(options));
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    fs->ClearPrefetchCount();
    buff_prefetch_count = 0;

    for (int level = 2; level >= 0; level--) {
      key_count = level * num_keys_per_level;
      switch (level) {
        case 0:
          // max_auto_readahead_size is set to 0 so data and index blocks are
          // not prefetched.
          ASSERT_OK(db_->SetOptions(
              {{"block_based_table_factory", "{max_auto_readahead_size=0;}"}}));
          break;
        case 1:
          // max_auto_readahead_size is set to less than
          // initial_auto_readahead_size, so readahead_size stays capped at
          // max_auto_readahead_size.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{max_auto_readahead_size=4096;}"}}));
          break;
        case 2:
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{max_auto_readahead_size=65536;}"}}));
          break;
        default:
          assert(false);
      }

      for (int i = 0; i < num_keys_per_level; ++i) {
        iter->Seek(Key(key_count++));
        iter->Next();
      }

      buff_prefetch_level_count[level] = buff_prefetch_count;
      if (support_prefetch && !use_direct_io) {
        if (level == 0) {
          ASSERT_FALSE(fs->IsPrefetchCalled());
        } else {
          ASSERT_TRUE(fs->IsPrefetchCalled());
        }
        fs->ClearPrefetchCount();
      } else {
        ASSERT_FALSE(fs->IsPrefetchCalled());
        if (level == 0) {
          ASSERT_EQ(buff_prefetch_count, 0);
        } else {
          ASSERT_GT(buff_prefetch_count, 0);
        }
        buff_prefetch_count = 0;
      }
    }
  }

  if (!support_prefetch) {
    ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]);
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
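
// A sketch of the auto-readahead sizing this test relies on (an assumption
// inferred from the option names and the assertions above, not a quote of the
// implementation): implicit readahead starts at initial_auto_readahead_size
// (8KB by default), doubles on each further prefetch, and is clamped by
// max_auto_readahead_size, so max_auto_readahead_size = 0 disables it:
//
//   readahead_size = std::min(max_auto_readahead_size, readahead_size * 2);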

TEST_P(PrefetchTest, ConfigureInternalAutoReadaheadSize) {
  // First param is if the mockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  options.disable_auto_compactions = true;
  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.initial_auto_readahead_size = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  int buff_prefetch_count = 0;
  // DB open will create table readers unless we reduce the table cache
  // capacity. SanitizeOptions will set max_open_files to minimum of 20.
  // Table cache is allocated with max_open_files - 10 as capacity. So
  // override max_open_files to 11 so the table cache capacity becomes 1.
  // This will prevent file open during DB open and force the file to be
  // opened during iteration.
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [&](void* arg) {
        int* max_open_files = (int*)arg;
        *max_open_files = 11;
      });

  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });

  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);

  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  Random rnd(309);
  int key_count = 0;
  const int num_keys_per_level = 100;
  // Level 0 : Keys in range [0, 99], Level 1:[100, 199], Level 2:[200, 299].
  for (int level = 2; level >= 0; level--) {
    key_count = level * num_keys_per_level;
    for (int i = 0; i < num_keys_per_level; ++i) {
      ASSERT_OK(Put(Key(key_count++), rnd.RandomString(500)));
    }
    ASSERT_OK(Flush());
    MoveFilesToLevel(level);
  }
  Close();

  ASSERT_OK(TryReopen(options));
  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    fs->ClearPrefetchCount();
    buff_prefetch_count = 0;
    std::vector<int> buff_prefetch_level_count = {0, 0, 0};

    for (int level = 2; level >= 0; level--) {
      key_count = level * num_keys_per_level;
      switch (level) {
        case 0:
          // initial_auto_readahead_size is set to 0 so data and index blocks
          // are not prefetched.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{initial_auto_readahead_size=0;}"}}));
          break;
        case 1:
          // initial_auto_readahead_size and max_auto_readahead_size are set
          // to the same value, so readahead_size stays the same.
          ASSERT_OK(db_->SetOptions({{"block_based_table_factory",
                                      "{initial_auto_readahead_size=4096;max_"
                                      "auto_readahead_size=4096;}"}}));
          break;
        case 2:
          ASSERT_OK(
              db_->SetOptions({{"block_based_table_factory",
                                "{initial_auto_readahead_size=65536;}"}}));
          break;
        default:
          assert(false);
      }

      for (int i = 0; i < num_keys_per_level; ++i) {
        iter->Seek(Key(key_count++));
        iter->Next();
      }

      buff_prefetch_level_count[level] = buff_prefetch_count;
      if (support_prefetch && !use_direct_io) {
        if (level == 0) {
          ASSERT_FALSE(fs->IsPrefetchCalled());
        } else {
          ASSERT_TRUE(fs->IsPrefetchCalled());
        }
        fs->ClearPrefetchCount();
      } else {
        ASSERT_FALSE(fs->IsPrefetchCalled());
        if (level == 0) {
          ASSERT_EQ(buff_prefetch_count, 0);
        } else {
          ASSERT_GT(buff_prefetch_count, 0);
        }
        buff_prefetch_count = 0;
      }
    }
    if (!support_prefetch) {
      ASSERT_GT(buff_prefetch_level_count[1], buff_prefetch_level_count[2]);
    }
  }
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

TEST_P(PrefetchTest, ConfigureNumFilesReadsForReadaheadSize) {
  // First param is if the mockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  table_options.num_file_reads_for_auto_readahead = 0;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  Close();
  ASSERT_OK(TryReopen(options));

  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    /*
     * Reseek keys from sequential data blocks within the same partitioned
     * index. It will prefetch the data block at the first seek since
     * num_file_reads_for_auto_readahead = 0. Data block size is nearly 4076
     * bytes, so readahead will initially fetch 8 * 1024 bytes more (2 more
     * data blocks).
     */
    iter->Seek(BuildKey(0));  // Prefetch data + index block since
                              // num_file_reads_for_auto_readahead = 0.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1011));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));  // In buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));  // In buffer
    ASSERT_TRUE(iter->Valid());
    // Missed 2 blocks but they are already in buffer so no reset.
    iter->Seek(BuildKey(103));  // Already in buffer.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));  // Prefetch Data.
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 4);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 4);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}
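
// Note on num_file_reads_for_auto_readahead, as exercised above (a reading of
// the test, not a spec): auto-readahead normally waits for that many reads on
// a file before the first prefetch; setting it to 0 makes even the very first
// Seek prefetch. That is why this test counts 4 prefetches where
// PrefetchWhenReseek below, with the default value, counts 3 on the same
// access pattern.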
#endif  // !ROCKSDB_LITE

TEST_P(PrefetchTest, PrefetchWhenReseek) {
  // First param is if the mockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    /*
     * Reseek keys from sequential data blocks within the same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data block size is nearly 4076 bytes, so readahead will initially fetch
     * 8 * 1024 bytes more (2 more data blocks).
     */
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    // Missed 2 blocks but they are already in buffer so no reset.
    iter->Seek(BuildKey(103));  // Already in buffer.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 3);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 3);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from non-sequential data blocks within the same partitioned
     * index. buff_prefetch_count will be 0 in that case.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1048));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from a single data block.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(10));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(100));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek keys from sequential data blocks to set implicit auto readahead
     * and prefetch data, but after that iterate over different
     * (non-sequential) data blocks, which won't prefetch any further data. So
     * buff_prefetch_count will be 1 for the first prefetch.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // This iteration will prefetch buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1008));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(
        BuildKey(996));  // Reseek won't prefetch any data and
                         // readahead_size will be initialized to 8*1024.
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(992));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(989));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }

    // Read sequentially to confirm readahead_size is reset to initial value
    // (2 more data blocks)
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1022));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1026));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(103));  // Prefetch Data
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 2);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 2);
      buff_prefetch_count = 0;
    }
  }
  {
    /* Reseek keys from a sequential partitioned index block. Since
     * partitioned index fetches are sequential, buff_prefetch_count will
     * be 1.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1167));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1334));  // This iteration will prefetch buffer
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1499));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1667));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1847));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1999));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }
  }
  {
    /*
     * Reseek over different keys from different blocks. buff_prefetch_count
     * stays 0.
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    int i = 0;
    int j = 1000;
    do {
      iter->Seek(BuildKey(i));
      if (!iter->Valid()) {
        break;
      }
      i = i + 100;
      iter->Seek(BuildKey(j));
      j = j + 100;
    } while (i < 1000 && j < kNumKeys && iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 0);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 0);
      buff_prefetch_count = 0;
    }
  }
  {
    /* Iterate sequentially over all keys. It will prefetch the buffer. */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
    }
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 13);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 13);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

TEST_P(PrefetchTest, PrefetchWhenReseekwithCache) {
  // First param is if the mockFS supports prefetch or not
  bool support_prefetch =
      std::get<0>(GetParam()) &&
      test::IsPrefetchSupported(env_->GetFileSystem(), dbname_);

  const int kNumKeys = 2000;
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), support_prefetch);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  // Second param is if directIO is enabled or not
  bool use_direct_io = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();

  BlockBasedTableOptions table_options;
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 4MB
  table_options.block_cache = cache;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  int buff_prefetch_count = 0;
  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->EnableProcessing();

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  fs->ClearPrefetchCount();
  buff_prefetch_count = 0;

  {
    /*
     * Reseek keys from sequential data blocks within the same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data block size is nearly 4076 bytes, so readahead will initially fetch
     * 8 * 1024 bytes more (2 more data blocks).
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    // Warm up the cache
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 1);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 1);
      buff_prefetch_count = 0;
    }
  }
  {
    // After caching, blocks will be read from cache (sequential blocks)
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions()));
    iter->Seek(BuildKey(0));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1000));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1004));  // Prefetch data (not in cache).
    ASSERT_TRUE(iter->Valid());
    // Missed one sequential block but the next one is already in buffer so
    // readahead will not be reset.
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    // Prefetch data but blocks are in cache so no prefetch and reset.
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1022));
    ASSERT_TRUE(iter->Valid());
    // Prefetch data with readahead_size = 4 blocks.
    iter->Seek(BuildKey(1026));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(103));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1033));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1037));
    ASSERT_TRUE(iter->Valid());

    if (support_prefetch && !use_direct_io) {
      ASSERT_EQ(fs->GetPrefetchCount(), 3);
      fs->ClearPrefetchCount();
    } else {
      ASSERT_EQ(buff_prefetch_count, 2);
      buff_prefetch_count = 0;
    }
  }

  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
  Close();
}

#ifndef ROCKSDB_LITE
TEST_P(PrefetchTest, DBIterLevelReadAhead) {
  const int kNumKeys = 1000;
  // Set options
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  bool use_direct_io = std::get<0>(GetParam());
  bool is_adaptive_readahead = std::get<1>(GetParam());

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.statistics = CreateDBStatistics();
  options.env = env.get();

  if (use_direct_io) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  int total_keys = 0;
  for (int j = 0; j < 5; j++) {
    for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
      ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
      total_keys++;
    }
    ASSERT_OK(db_->Write(WriteOptions(), &batch));
    ASSERT_OK(Flush());
  }
  MoveFilesToLevel(2);
  int buff_prefetch_count = 0;
  int buff_async_prefetch_count = 0;
  int readahead_carry_over_count = 0;
  int num_sst_files = NumTableFilesAtLevel(2);
  size_t current_readahead_size = 0;

  // Test - Iterate over the keys sequentially.
  {
    SyncPoint::GetInstance()->SetCallBack(
        "FilePrefetchBuffer::Prefetch:Start",
        [&](void*) { buff_prefetch_count++; });

    SyncPoint::GetInstance()->SetCallBack(
        "FilePrefetchBuffer::PrefetchAsyncInternal:Start",
        [&](void*) { buff_async_prefetch_count++; });

    // The callback checks that, since reads are sequential, readahead_size
    // doesn't start over from 8KB when the iterator moves to the next file.
    // It's called num_sst_files - 1 times (excluding the first file).
    SyncPoint::GetInstance()->SetCallBack(
        "BlockPrefetcher::SetReadaheadState", [&](void* arg) {
          readahead_carry_over_count++;
          size_t readahead_size = *reinterpret_cast<size_t*>(arg);
          if (readahead_carry_over_count) {
            ASSERT_GT(readahead_size, 8 * 1024);
          }
        });

    SyncPoint::GetInstance()->SetCallBack(
        "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
          current_readahead_size = *reinterpret_cast<size_t*>(arg);
          ASSERT_GT(current_readahead_size, 0);
        });

    SyncPoint::GetInstance()->EnableProcessing();

    ReadOptions ro;
    if (is_adaptive_readahead) {
      ro.adaptive_readahead = true;
      ro.async_io = true;
    }

    ASSERT_OK(options.statistics->Reset());

    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    int num_keys = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      num_keys++;
    }
    ASSERT_EQ(num_keys, total_keys);

    // For index and data blocks.
    if (is_adaptive_readahead) {
      ASSERT_EQ(readahead_carry_over_count, 2 * (num_sst_files - 1));
      ASSERT_GT(buff_async_prefetch_count, 0);
    } else {
      ASSERT_GT(buff_prefetch_count, 0);
      ASSERT_EQ(readahead_carry_over_count, 0);
    }

    // Check stats to make sure async prefetch is done.
    {
      HistogramData async_read_bytes;
      options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes);
      if (ro.async_io) {
        ASSERT_GT(async_read_bytes.count, 0);
      } else {
        ASSERT_EQ(async_read_bytes.count, 0);
      }
    }

    SyncPoint::GetInstance()->DisableProcessing();
    SyncPoint::GetInstance()->ClearAllCallBacks();
  }
  Close();
}
#endif  // !ROCKSDB_LITE

class PrefetchTest1 : public DBTestBase,
                      public ::testing::WithParamInterface<bool> {
 public:
  PrefetchTest1() : DBTestBase("prefetch_test1", true) {}
};

INSTANTIATE_TEST_CASE_P(PrefetchTest1, PrefetchTest1, ::testing::Bool());

#ifndef ROCKSDB_LITE
TEST_P(PrefetchTest1, NonSequentialReadsWithAdaptiveReadahead) {
  const int kNumKeys = 1000;
  // Set options
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (GetParam()) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }
  BlockBasedTableOptions table_options;
  table_options.no_block_cache = true;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int j = 0; j < 5; j++) {
    for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) {
      ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
    }
    ASSERT_OK(db_->Write(WriteOptions(), &batch));
    ASSERT_OK(Flush());
  }
  MoveFilesToLevel(2);

  int buff_prefetch_count = 0;
  int set_readahead = 0;
  size_t readahead_size = 0;

  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->SetCallBack(
      "BlockPrefetcher::SetReadaheadState",
      [&](void* /*arg*/) { set_readahead++; });
  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::TryReadFromCache",
      [&](void* arg) { readahead_size = *reinterpret_cast<size_t*>(arg); });

  SyncPoint::GetInstance()->EnableProcessing();

  {
    // Iterate until prefetch is done.
    ReadOptions ro;
    ro.adaptive_readahead = true;
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));

    iter->SeekToFirst();
    ASSERT_TRUE(iter->Valid());

    while (iter->Valid() && buff_prefetch_count == 0) {
      iter->Next();
    }

    ASSERT_EQ(readahead_size, 8 * 1024);
    ASSERT_EQ(buff_prefetch_count, 1);
    ASSERT_EQ(set_readahead, 0);
    buff_prefetch_count = 0;

    // Move to the last file and check that readahead size falls back to 8KB.
    // So the next readahead size after prefetch should be 8 * 1024.
    iter->Seek(BuildKey(4004));
    ASSERT_TRUE(iter->Valid());

    while (iter->Valid() && buff_prefetch_count == 0) {
      iter->Next();
    }

    ASSERT_EQ(readahead_size, 8 * 1024);
    ASSERT_EQ(set_readahead, 0);
    ASSERT_EQ(buff_prefetch_count, 1);
  }
  Close();
}
#endif  // !ROCKSDB_LITE

TEST_P(PrefetchTest1, DecreaseReadAheadIfInCache) {
  const int kNumKeys = 2000;
  // Set options
  std::shared_ptr<MockFS> fs =
      std::make_shared<MockFS>(env_->GetFileSystem(), false);
  std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));

  Options options = CurrentOptions();
  options.write_buffer_size = 1024;
  options.create_if_missing = true;
  options.compression = kNoCompression;
  options.env = env.get();
  if (GetParam()) {
    options.use_direct_reads = true;
    options.use_direct_io_for_flush_and_compaction = true;
  }

  options.statistics = CreateDBStatistics();
  BlockBasedTableOptions table_options;
  std::shared_ptr<Cache> cache = NewLRUCache(4 * 1024 * 1024, 2);  // 4MB
  table_options.block_cache = cache;
  table_options.cache_index_and_filter_blocks = false;
  table_options.metadata_block_size = 1024;
  table_options.index_type =
      BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
  options.table_factory.reset(NewBlockBasedTableFactory(table_options));

  Status s = TryReopen(options);
  if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) {
    // If direct IO is not supported, skip the test
    return;
  } else {
    ASSERT_OK(s);
  }

  WriteBatch batch;
  Random rnd(309);
  for (int i = 0; i < kNumKeys; i++) {
    ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000)));
  }
  ASSERT_OK(db_->Write(WriteOptions(), &batch));

  std::string start_key = BuildKey(0);
  std::string end_key = BuildKey(kNumKeys - 1);
  Slice least(start_key.data(), start_key.size());
  Slice greatest(end_key.data(), end_key.size());

  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest));

  int buff_prefetch_count = 0;
  size_t current_readahead_size = 0;
  size_t expected_current_readahead_size = 8 * 1024;
  size_t decrease_readahead_size = 8 * 1024;
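  // Expected bookkeeping used below (it mirrors the test's own arithmetic):
  // when an eligible block turns out to already be in the block cache,
  // readahead_size is decreased by decrease_readahead_size but never drops
  // below that 8KB floor:
  //
  //   expected = std::max(decrease_readahead_size,
  //                       expected >= decrease_readahead_size
  //                           ? expected - decrease_readahead_size
  //                           : 0);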

  SyncPoint::GetInstance()->SetCallBack("FilePrefetchBuffer::Prefetch:Start",
                                        [&](void*) { buff_prefetch_count++; });
  SyncPoint::GetInstance()->SetCallBack(
      "FilePrefetchBuffer::TryReadFromCache", [&](void* arg) {
        current_readahead_size = *reinterpret_cast<size_t*>(arg);
      });

  SyncPoint::GetInstance()->EnableProcessing();
  ReadOptions ro;
  ro.adaptive_readahead = true;
  {
    /*
     * Reseek keys from sequential data blocks within the same partitioned
     * index. After 2 sequential reads it will prefetch the data block.
     * Data block size is nearly 4076 bytes, so readahead will initially
     * fetch 8 * 1024 bytes more (2 more data blocks).
     */
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    // Warm up the cache
    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    buff_prefetch_count = 0;
  }

  {
    ASSERT_OK(options.statistics->Reset());
    // After caching, blocks will be read from cache (sequential blocks)
    auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro));
    iter->Seek(
        BuildKey(0));  // In cache so it will decrease the readahead_size.
    ASSERT_TRUE(iter->Valid());
    expected_current_readahead_size = std::max(
        decrease_readahead_size,
        (expected_current_readahead_size >= decrease_readahead_size
             ? (expected_current_readahead_size - decrease_readahead_size)
             : 0));

    iter->Seek(BuildKey(1000));  // Won't prefetch the block.
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(current_readahead_size, expected_current_readahead_size);

    iter->Seek(BuildKey(1004));  // Prefetch the block.
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
    expected_current_readahead_size *= 2;

    iter->Seek(BuildKey(1011));
    ASSERT_TRUE(iter->Valid());

    // Eligible to prefetch data (not in buffer) but the block is in cache so
    // no prefetch will happen, resulting in a decrease in readahead_size.
    // readahead_size will be 8 * 1024.
    iter->Seek(BuildKey(1015));
    ASSERT_TRUE(iter->Valid());
    expected_current_readahead_size = std::max(
        decrease_readahead_size,
        (expected_current_readahead_size >= decrease_readahead_size
             ? (expected_current_readahead_size - decrease_readahead_size)
             : 0));

    // 1016 is in the same block as 1015. So no change in readahead_size.
    iter->Seek(BuildKey(1016));
    ASSERT_TRUE(iter->Valid());

    // Prefetch data (not in buffer) but found in cache. So decrease
    // readahead_size. Since it would become 0 after decrementing,
    // readahead_size is set back to the initial value.
    iter->Seek(BuildKey(1019));
    ASSERT_TRUE(iter->Valid());
    expected_current_readahead_size = std::max(
        decrease_readahead_size,
        (expected_current_readahead_size >= decrease_readahead_size
             ? (expected_current_readahead_size - decrease_readahead_size)
             : 0));

    // Prefetch next sequential data.
    iter->Seek(BuildKey(1022));
    ASSERT_TRUE(iter->Valid());
    ASSERT_EQ(current_readahead_size, expected_current_readahead_size);
    ASSERT_EQ(buff_prefetch_count, 2);

    buff_prefetch_count = 0;
  }
  Close();
}
1354 | ||
1355 | TEST_P(PrefetchTest1, SeekParallelizationTest) { | |
1356 | const int kNumKeys = 2000; | |
1357 | // Set options | |
1358 | std::shared_ptr<MockFS> fs = | |
1359 | std::make_shared<MockFS>(env_->GetFileSystem(), false); | |
1360 | std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); | |
1361 | ||
1362 | Options options = CurrentOptions(); | |
1363 | options.write_buffer_size = 1024; | |
1364 | options.create_if_missing = true; | |
1365 | options.compression = kNoCompression; | |
1366 | options.env = env.get(); | |
1367 | if (GetParam()) { | |
1368 | options.use_direct_reads = true; | |
1369 | options.use_direct_io_for_flush_and_compaction = true; | |
1370 | } | |
1371 | ||
1372 | options.statistics = CreateDBStatistics(); | |
1373 | BlockBasedTableOptions table_options; | |
1374 | table_options.no_block_cache = true; | |
1375 | table_options.cache_index_and_filter_blocks = false; | |
1376 | table_options.metadata_block_size = 1024; | |
1377 | table_options.index_type = | |
1378 | BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; | |
1379 | options.table_factory.reset(NewBlockBasedTableFactory(table_options)); | |
1380 | ||
1381 | Status s = TryReopen(options); | |
1382 | if (GetParam() && (s.IsNotSupported() || s.IsInvalidArgument())) { | |
1383 | // If direct IO is not supported, skip the test | |
1384 | return; | |
1385 | } else { | |
1386 | ASSERT_OK(s); | |
1387 | } | |
1388 | ||
1389 | WriteBatch batch; | |
1390 | Random rnd(309); | |
1391 | for (int i = 0; i < kNumKeys; i++) { | |
1392 | ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); | |
1393 | } | |
1394 | ASSERT_OK(db_->Write(WriteOptions(), &batch)); | |
1395 | ||
1396 | std::string start_key = BuildKey(0); | |
1397 | std::string end_key = BuildKey(kNumKeys - 1); | |
1398 | Slice least(start_key.data(), start_key.size()); | |
1399 | Slice greatest(end_key.data(), end_key.size()); | |
1400 | ||
1401 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); | |
1402 | ||
1403 | int buff_prefetch_count = 0; | |
1404 | ||
1405 | SyncPoint::GetInstance()->SetCallBack( | |
1406 | "FilePrefetchBuffer::PrefetchAsyncInternal:Start", | |
1407 | [&](void*) { buff_prefetch_count++; }); | |
1408 | ||
1409 | SyncPoint::GetInstance()->EnableProcessing(); | |
1410 | ReadOptions ro; | |
1411 | ro.adaptive_readahead = true; | |
1412 | ro.async_io = true; | |
1413 | ||
1414 | { | |
1415 | ASSERT_OK(options.statistics->Reset()); | |
1416 | // Each block contains around 4 keys. | |
1417 | auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); | |
1418 | iter->Seek(BuildKey(0)); // Prefetch data because of seek parallelization. | |
1419 | ASSERT_TRUE(iter->Valid()); | |
1420 | iter->Next(); | |
1421 | ASSERT_TRUE(iter->Valid()); | |
1422 | iter->Next(); | |
1423 | ASSERT_TRUE(iter->Valid()); | |
1424 | iter->Next(); | |
1425 | ASSERT_TRUE(iter->Valid()); | |
1426 | ||
1427 | // New data block. Since num_file_reads in FilePrefetchBuffer is 2 after | |
1428 | // this read, it does not trigger prefetching yet. | |
1429 | iter->Next(); | |
1430 | ASSERT_TRUE(iter->Valid()); | |
1431 | iter->Next(); | |
1432 | ASSERT_TRUE(iter->Valid()); | |
1433 | iter->Next(); | |
1434 | ASSERT_TRUE(iter->Valid()); | |
1435 | iter->Next(); | |
1436 | ASSERT_TRUE(iter->Valid()); | |
1437 | ||
1438 | // Prefetch data. | |
1439 | iter->Next(); | |
1440 | ASSERT_TRUE(iter->Valid()); | |
1441 | ||
1442 | ASSERT_EQ(buff_prefetch_count, 2); | |
1443 | ||
1444 | // Check stats to make sure async prefetch is done. | |
1445 | { | |
1446 | HistogramData async_read_bytes; | |
1447 | options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); | |
1448 | ASSERT_GT(async_read_bytes.count, 0); | |
1449 | ASSERT_GT(get_perf_context()->number_async_seek, 0); | |
1450 | } | |
1451 | ||
1452 | buff_prefetch_count = 0; | |
1453 | } | |
| SyncPoint::GetInstance()->DisableProcessing(); | |
| SyncPoint::GetInstance()->ClearAllCallBacks(); | |
1454 | Close(); | |
1455 | } | |
1456 | ||
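| // The posix file system consults this hook to decide whether io_uring may | |
| // be used for ReadAsync; overriding it to return true opts this test | |
| // binary in. Whether async reads actually happen still depends on | |
| // platform support, which the tests below detect via a sync point. | |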
1457 | extern "C" bool RocksDbIOUringEnable() { return true; } | |
1458 | ||
1459 | namespace { | |
1460 | #ifndef ROCKSDB_LITE | |
1461 | #ifdef GFLAGS | |
1462 | const int kMaxArgCount = 100; | |
1463 | const size_t kArgBufferSize = 100000; | |
1464 | ||
1465 | void RunIOTracerParserTool(const std::string& trace_file) { | |
1466 | std::vector<std::string> params = {"./io_tracer_parser", | |
1467 | "-io_trace_file=" + trace_file}; | |
1468 | ||
1469 | char arg_buffer[kArgBufferSize]; | |
1470 | char* argv[kMaxArgCount]; | |
1471 | int argc = 0; | |
1472 | int cursor = 0; | |
1473 | for (const auto& arg : params) { | |
1474 | ASSERT_LE(cursor + arg.size() + 1, kArgBufferSize); | |
1475 | ASSERT_LE(argc + 1, kMaxArgCount); | |
1476 | ||
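| // Copy the argument (including its NUL terminator) into the flat buffer | |
| // and record where it starts, building a conventional argv[] layout. | |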
1477 | snprintf(arg_buffer + cursor, arg.size() + 1, "%s", arg.c_str()); | |
1478 | ||
1479 | argv[argc++] = arg_buffer + cursor; | |
1480 | cursor += static_cast<int>(arg.size()) + 1; | |
1481 | } | |
1482 | ASSERT_EQ(0, ROCKSDB_NAMESPACE::io_tracer_parser(argc, argv)); | |
1483 | } | |
1484 | #endif // GFLAGS | |
1485 | #endif // ROCKSDB_LITE | |
1486 | } // namespace | |
1487 | ||
1488 | // Tests the default implementation of ReadAsync API with PosixFileSystem. | |
1489 | TEST_P(PrefetchTest, ReadAsyncWithPosixFS) { | |
1490 | if (mem_env_ || encrypted_env_) { | |
1491 | ROCKSDB_GTEST_SKIP("Test requires a non-mem, non-encrypted environment"); | |
1492 | return; | |
1493 | } | |
1494 | ||
1495 | const int kNumKeys = 1000; | |
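| // With support_prefetch=false, MockFS rejects Prefetch() calls, forcing | |
| // RocksDB to fall back to its internal FilePrefetchBuffer. | |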
1496 | std::shared_ptr<MockFS> fs = std::make_shared<MockFS>( | |
1497 | FileSystem::Default(), /*support_prefetch=*/false); | |
1498 | std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); | |
1499 | ||
1500 | bool use_direct_io = std::get<0>(GetParam()); | |
1501 | Options options = CurrentOptions(); | |
1502 | options.write_buffer_size = 1024; | |
1503 | options.create_if_missing = true; | |
1504 | options.compression = kNoCompression; | |
1505 | options.env = env.get(); | |
1506 | options.statistics = CreateDBStatistics(); | |
1507 | if (use_direct_io) { | |
1508 | options.use_direct_reads = true; | |
1509 | options.use_direct_io_for_flush_and_compaction = true; | |
1510 | } | |
1511 | BlockBasedTableOptions table_options; | |
1512 | table_options.no_block_cache = true; | |
1513 | table_options.cache_index_and_filter_blocks = false; | |
1514 | table_options.metadata_block_size = 1024; | |
1515 | table_options.index_type = | |
1516 | BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; | |
1517 | options.table_factory.reset(NewBlockBasedTableFactory(table_options)); | |
1518 | ||
1519 | Status s = TryReopen(options); | |
1520 | if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { | |
1521 | // If direct IO is not supported, skip the test | |
1522 | return; | |
1523 | } else { | |
1524 | ASSERT_OK(s); | |
1525 | } | |
1526 | ||
1527 | int total_keys = 0; | |
1528 | // Write the keys. | |
1529 | { | |
1530 | WriteBatch batch; | |
1531 | Random rnd(309); | |
1532 | for (int j = 0; j < 5; j++) { | |
1533 | for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { | |
1534 | ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); | |
1535 | total_keys++; | |
1536 | } | |
1537 | ASSERT_OK(db_->Write(WriteOptions(), &batch)); | |
1538 | ASSERT_OK(Flush()); | |
1539 | } | |
1540 | MoveFilesToLevel(2); | |
1541 | } | |
1542 | ||
1543 | int buff_prefetch_count = 0; | |
1544 | bool read_async_called = false; | |
1545 | ReadOptions ro; | |
1546 | ro.adaptive_readahead = true; | |
1547 | ro.async_io = true; | |
1548 | ||
1549 | if (std::get<1>(GetParam())) { | |
1550 | ro.readahead_size = 16 * 1024; | |
1551 | } | |
1552 | ||
1553 | SyncPoint::GetInstance()->SetCallBack( | |
1554 | "FilePrefetchBuffer::PrefetchAsyncInternal:Start", | |
1555 | [&](void*) { buff_prefetch_count++; }); | |
1556 | ||
1557 | SyncPoint::GetInstance()->SetCallBack( | |
1558 | "UpdateResults::io_uring_result", | |
1559 | [&](void* /*arg*/) { read_async_called = true; }); | |
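| // This sync point fires on the io_uring completion path, so | |
| // read_async_called doubles as a runtime probe for io_uring support. | |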
1560 | SyncPoint::GetInstance()->EnableProcessing(); | |
1561 | ||
1562 | // Read the keys. | |
1563 | { | |
1564 | ASSERT_OK(options.statistics->Reset()); | |
1565 | get_perf_context()->Reset(); | |
1566 | ||
1567 | auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); | |
1568 | int num_keys = 0; | |
1569 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
1570 | ASSERT_OK(iter->status()); | |
1571 | num_keys++; | |
1572 | } | |
1573 | ||
1574 | ASSERT_EQ(num_keys, total_keys); | |
1575 | ASSERT_GT(buff_prefetch_count, 0); | |
1576 | ||
1577 | // Check stats to make sure async prefetch is done. | |
1578 | { | |
1579 | HistogramData async_read_bytes; | |
1580 | options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); | |
1581 | HistogramData prefetched_bytes_discarded; | |
1582 | options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, | |
1583 | &prefetched_bytes_discarded); | |
1584 | ||
1585 | // Not all platforms support iouring. In that case, ReadAsync in posix | |
1586 | // won't submit async requests. | |
1587 | if (read_async_called) { | |
1588 | ASSERT_GT(async_read_bytes.count, 0); | |
1589 | } else { | |
1590 | ASSERT_EQ(async_read_bytes.count, 0); | |
1591 | } | |
1592 | ASSERT_GT(prefetched_bytes_discarded.count, 0); | |
1593 | } | |
1594 | ASSERT_EQ(get_perf_context()->number_async_seek, 0); | |
1595 | } | |
1596 | ||
1597 | SyncPoint::GetInstance()->DisableProcessing(); | |
1598 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
1599 | ||
1600 | Close(); | |
1601 | } | |
1602 | ||
1603 | TEST_P(PrefetchTest, MultipleSeekWithPosixFS) { | |
1604 | if (mem_env_ || encrypted_env_) { | |
1605 | ROCKSDB_GTEST_SKIP("Test requires a non-mem, non-encrypted environment"); | |
1606 | return; | |
1607 | } | |
1608 | ||
1609 | const int kNumKeys = 1000; | |
1610 | std::shared_ptr<MockFS> fs = std::make_shared<MockFS>( | |
1611 | FileSystem::Default(), /*support_prefetch=*/false); | |
1612 | std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); | |
1613 | ||
1614 | bool use_direct_io = std::get<0>(GetParam()); | |
1615 | Options options = CurrentOptions(); | |
1616 | options.write_buffer_size = 1024; | |
1617 | options.create_if_missing = true; | |
1618 | options.compression = kNoCompression; | |
1619 | options.env = env.get(); | |
1620 | options.statistics = CreateDBStatistics(); | |
1621 | if (use_direct_io) { | |
1622 | options.use_direct_reads = true; | |
1623 | options.use_direct_io_for_flush_and_compaction = true; | |
1624 | } | |
1625 | BlockBasedTableOptions table_options; | |
1626 | table_options.no_block_cache = true; | |
1627 | table_options.cache_index_and_filter_blocks = false; | |
1628 | table_options.metadata_block_size = 1024; | |
1629 | table_options.index_type = | |
1630 | BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; | |
1631 | options.table_factory.reset(NewBlockBasedTableFactory(table_options)); | |
1632 | ||
1633 | Status s = TryReopen(options); | |
1634 | if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { | |
1635 | // If direct IO is not supported, skip the test | |
1636 | return; | |
1637 | } else { | |
1638 | ASSERT_OK(s); | |
1639 | } | |
1640 | ||
1641 | int total_keys = 0; | |
1642 | // Write the keys. | |
1643 | { | |
1644 | WriteBatch batch; | |
1645 | Random rnd(309); | |
1646 | for (int j = 0; j < 5; j++) { | |
1647 | for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { | |
1648 | ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); | |
1649 | total_keys++; | |
1650 | } | |
1651 | ASSERT_OK(db_->Write(WriteOptions(), &batch)); | |
1652 | ASSERT_OK(Flush()); | |
1653 | } | |
1654 | MoveFilesToLevel(2); | |
1655 | } | |
1656 | ||
1657 | int num_keys_first_batch = 0; | |
1658 | int num_keys_second_batch = 0; | |
1659 | // Calculate number of keys without async_io for correctness validation. | |
1660 | { | |
1661 | auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ReadOptions())); | |
1662 | // First Seek. | |
1663 | iter->Seek(BuildKey(450)); | |
1664 | while (iter->Valid() && num_keys_first_batch < 100) { | |
1665 | ASSERT_OK(iter->status()); | |
1666 | num_keys_first_batch++; | |
1667 | iter->Next(); | |
1668 | } | |
1669 | ASSERT_OK(iter->status()); | |
1670 | ||
1671 | iter->Seek(BuildKey(942)); | |
1672 | while (iter->Valid()) { | |
1673 | ASSERT_OK(iter->status()); | |
1674 | num_keys_second_batch++; | |
1675 | iter->Next(); | |
1676 | } | |
1677 | ASSERT_OK(iter->status()); | |
1678 | } | |
1679 | ||
1680 | int buff_prefetch_count = 0; | |
1681 | bool read_async_called = false; | |
1682 | ReadOptions ro; | |
1683 | ro.adaptive_readahead = true; | |
1684 | ro.async_io = true; | |
1685 | ||
1686 | if (std::get<1>(GetParam())) { | |
1687 | ro.readahead_size = 16 * 1024; | |
1688 | } | |
1689 | ||
1690 | SyncPoint::GetInstance()->SetCallBack( | |
1691 | "FilePrefetchBuffer::PrefetchAsyncInternal:Start", | |
1692 | [&](void*) { buff_prefetch_count++; }); | |
1693 | ||
1694 | SyncPoint::GetInstance()->SetCallBack( | |
1695 | "UpdateResults::io_uring_result", | |
1696 | [&](void* /*arg*/) { read_async_called = true; }); | |
1697 | SyncPoint::GetInstance()->EnableProcessing(); | |
1698 | ||
1699 | // Read the keys using seek. | |
1700 | { | |
1701 | ASSERT_OK(options.statistics->Reset()); | |
1702 | get_perf_context()->Reset(); | |
1703 | ||
1704 | auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); | |
1705 | int num_keys = 0; | |
1706 | // First Seek. | |
1707 | { | |
1708 | iter->Seek(BuildKey(450)); | |
1709 | while (iter->Valid() && num_keys < 100) { | |
1710 | ASSERT_OK(iter->status()); | |
1711 | num_keys++; | |
1712 | iter->Next(); | |
1713 | } | |
1714 | ASSERT_OK(iter->status()); | |
1715 | ASSERT_EQ(num_keys, num_keys_first_batch); | |
1716 | // Check stats to make sure async prefetch is done. | |
1717 | { | |
1718 | HistogramData async_read_bytes; | |
1719 | options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); | |
1720 | ||
1721 | // Not all platforms support iouring. In that case, ReadAsync in posix | |
1722 | // won't submit async requests. | |
1723 | if (read_async_called) { | |
1724 | ASSERT_GT(async_read_bytes.count, 0); | |
1725 | ASSERT_GT(get_perf_context()->number_async_seek, 0); | |
1726 | } else { | |
1727 | ASSERT_EQ(async_read_bytes.count, 0); | |
1728 | ASSERT_EQ(get_perf_context()->number_async_seek, 0); | |
1729 | } | |
1730 | } | |
1731 | } | |
1732 | ||
1733 | // Second Seek. | |
1734 | { | |
1735 | num_keys = 0; | |
1736 | ASSERT_OK(options.statistics->Reset()); | |
1737 | get_perf_context()->Reset(); | |
1738 | ||
1739 | iter->Seek(BuildKey(942)); | |
1740 | while (iter->Valid()) { | |
1741 | ASSERT_OK(iter->status()); | |
1742 | num_keys++; | |
1743 | iter->Next(); | |
1744 | } | |
1745 | ASSERT_OK(iter->status()); | |
1746 | ASSERT_EQ(num_keys, num_keys_second_batch); | |
1747 | ||
1748 | ASSERT_GT(buff_prefetch_count, 0); | |
1749 | ||
1750 | // Check stats to make sure async prefetch is done. | |
1751 | { | |
1752 | HistogramData async_read_bytes; | |
1753 | options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); | |
1754 | HistogramData prefetched_bytes_discarded; | |
1755 | options.statistics->histogramData(PREFETCHED_BYTES_DISCARDED, | |
1756 | &prefetched_bytes_discarded); | |
1757 | ||
1758 | // Not all platforms support iouring. In that case, ReadAsync in posix | |
1759 | // won't submit async requests. | |
1760 | if (read_async_called) { | |
1761 | ASSERT_GT(async_read_bytes.count, 0); | |
1762 | ASSERT_GT(get_perf_context()->number_async_seek, 0); | |
1763 | } else { | |
1764 | ASSERT_EQ(async_read_bytes.count, 0); | |
1765 | ASSERT_EQ(get_perf_context()->number_async_seek, 0); | |
1766 | } | |
1767 | ASSERT_GT(prefetched_bytes_discarded.count, 0); | |
1768 | } | |
1769 | } | |
1770 | } | |
1771 | ||
1772 | SyncPoint::GetInstance()->DisableProcessing(); | |
1773 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
1774 | Close(); | |
1775 | } | |
1776 | ||
1777 | TEST_P(PrefetchTest, SeekParallelizationTest1) { | |
1778 | if (mem_env_ || encrypted_env_) { | |
1779 | ROCKSDB_GTEST_SKIP("Test requires a non-mem, non-encrypted environment"); | |
1780 | return; | |
1781 | } | |
1782 | const int kNumKeys = 2000; | |
1783 | // Set options | |
1784 | std::shared_ptr<MockFS> fs = std::make_shared<MockFS>( | |
1785 | FileSystem::Default(), /*support_prefetch=*/false); | |
1786 | std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); | |
1787 | ||
1788 | bool use_direct_io = std::get<0>(GetParam()); | |
1789 | Options options = CurrentOptions(); | |
1790 | options.write_buffer_size = 1024; | |
1791 | options.create_if_missing = true; | |
1792 | options.compression = kNoCompression; | |
1793 | options.env = env.get(); | |
1794 | if (use_direct_io) { | |
1795 | options.use_direct_reads = true; | |
1796 | options.use_direct_io_for_flush_and_compaction = true; | |
1797 | } | |
1798 | ||
1799 | options.statistics = CreateDBStatistics(); | |
1800 | BlockBasedTableOptions table_options; | |
1801 | table_options.no_block_cache = true; | |
1802 | table_options.cache_index_and_filter_blocks = false; | |
1803 | table_options.metadata_block_size = 1024; | |
1804 | table_options.index_type = | |
1805 | BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; | |
1806 | options.table_factory.reset(NewBlockBasedTableFactory(table_options)); | |
1807 | ||
1808 | Status s = TryReopen(options); | |
1809 | if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { | |
1810 | // If direct IO is not supported, skip the test | |
1811 | return; | |
1812 | } else { | |
1813 | ASSERT_OK(s); | |
1814 | } | |
1815 | ||
1816 | WriteBatch batch; | |
1817 | Random rnd(309); | |
1818 | for (int i = 0; i < kNumKeys; i++) { | |
1819 | ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); | |
1820 | } | |
1821 | ASSERT_OK(db_->Write(WriteOptions(), &batch)); | |
1822 | ||
1823 | std::string start_key = BuildKey(0); | |
1824 | std::string end_key = BuildKey(kNumKeys - 1); | |
1825 | Slice least(start_key.data(), start_key.size()); | |
1826 | Slice greatest(end_key.data(), end_key.size()); | |
1827 | ||
1828 | ASSERT_OK(db_->CompactRange(CompactRangeOptions(), &least, &greatest)); | |
1829 | ||
1830 | int buff_prefetch_count = 0; | |
1831 | ||
1832 | SyncPoint::GetInstance()->SetCallBack( | |
1833 | "FilePrefetchBuffer::PrefetchAsyncInternal:Start", | |
1834 | [&](void*) { buff_prefetch_count++; }); | |
1835 | ||
1836 | bool read_async_called = false; | |
1837 | SyncPoint::GetInstance()->SetCallBack( | |
1838 | "UpdateResults::io_uring_result", | |
1839 | [&](void* /*arg*/) { read_async_called = true; }); | |
1840 | SyncPoint::GetInstance()->EnableProcessing(); | |
1841 | ||
1843 | ReadOptions ro; | |
1844 | ro.adaptive_readahead = true; | |
1845 | ro.async_io = true; | |
1846 | ||
1847 | if (std::get<1>(GetParam())) { | |
1848 | ro.readahead_size = 16 * 1024; | |
1849 | } | |
1850 | ||
1851 | { | |
1852 | ASSERT_OK(options.statistics->Reset()); | |
1853 | // Each block contains around 4 keys. | |
1854 | auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); | |
1855 | iter->Seek(BuildKey(0)); // Prefetch data because of seek parallelization. | |
1856 | ASSERT_TRUE(iter->Valid()); | |
1857 | iter->Next(); | |
1858 | ASSERT_TRUE(iter->Valid()); | |
1859 | iter->Next(); | |
1860 | ASSERT_TRUE(iter->Valid()); | |
1861 | iter->Next(); | |
1862 | ASSERT_TRUE(iter->Valid()); | |
1863 | ||
1864 | // New data block. Since num_file_reads in FilePrefetchBuffer is 2 after | |
1865 | // this read, it does not trigger prefetching yet. | |
1866 | iter->Next(); | |
1867 | ASSERT_TRUE(iter->Valid()); | |
1868 | iter->Next(); | |
1869 | ASSERT_TRUE(iter->Valid()); | |
1870 | iter->Next(); | |
1871 | ASSERT_TRUE(iter->Valid()); | |
1872 | iter->Next(); | |
1873 | ASSERT_TRUE(iter->Valid()); | |
1874 | ||
1875 | // Prefetch data. | |
1876 | iter->Next(); | |
1877 | ASSERT_TRUE(iter->Valid()); | |
1878 | ||
1879 | // Check stats to make sure async prefetch is done. | |
1880 | { | |
1881 | HistogramData async_read_bytes; | |
1882 | options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); | |
1883 | // Not all platforms support iouring. In that case, ReadAsync in posix | |
1884 | // won't submit async requests. | |
1885 | if (read_async_called) { | |
1886 | ASSERT_GT(async_read_bytes.count, 0); | |
1887 | ASSERT_GT(get_perf_context()->number_async_seek, 0); | |
1888 | if (std::get<1>(GetParam())) { | |
1889 | ASSERT_EQ(buff_prefetch_count, 1); | |
1890 | } else { | |
1891 | ASSERT_EQ(buff_prefetch_count, 2); | |
1892 | } | |
1893 | } else { | |
1894 | ASSERT_EQ(async_read_bytes.count, 0); | |
1895 | ASSERT_EQ(get_perf_context()->number_async_seek, 0); | |
1896 | ASSERT_EQ(buff_prefetch_count, 1); | |
1897 | } | |
1898 | } | |
1899 | ||
1900 | buff_prefetch_count = 0; | |
1901 | } | |
| SyncPoint::GetInstance()->DisableProcessing(); | |
| SyncPoint::GetInstance()->ClearAllCallBacks(); | |
1902 | Close(); | |
1903 | } | |
1904 | ||
1905 | #ifndef ROCKSDB_LITE | |
1906 | #ifdef GFLAGS | |
1907 | TEST_P(PrefetchTest, TraceReadAsyncWithCallbackWrapper) { | |
1908 | if (mem_env_ || encrypted_env_) { | |
1909 | ROCKSDB_GTEST_SKIP("Test requires a non-mem, non-encrypted environment"); | |
1910 | return; | |
1911 | } | |
1912 | ||
1913 | const int kNumKeys = 1000; | |
1914 | std::shared_ptr<MockFS> fs = std::make_shared<MockFS>( | |
1915 | FileSystem::Default(), /*support_prefetch=*/false); | |
1916 | std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); | |
1917 | ||
1918 | bool use_direct_io = std::get<0>(GetParam()); | |
1919 | Options options = CurrentOptions(); | |
1920 | options.write_buffer_size = 1024; | |
1921 | options.create_if_missing = true; | |
1922 | options.compression = kNoCompression; | |
1923 | options.env = env.get(); | |
1924 | options.statistics = CreateDBStatistics(); | |
1925 | if (use_direct_io) { | |
1926 | options.use_direct_reads = true; | |
1927 | options.use_direct_io_for_flush_and_compaction = true; | |
1928 | } | |
1929 | BlockBasedTableOptions table_options; | |
1930 | table_options.no_block_cache = true; | |
1931 | table_options.cache_index_and_filter_blocks = false; | |
1932 | table_options.metadata_block_size = 1024; | |
1933 | table_options.index_type = | |
1934 | BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; | |
1935 | options.table_factory.reset(NewBlockBasedTableFactory(table_options)); | |
1936 | ||
1937 | Status s = TryReopen(options); | |
1938 | if (use_direct_io && (s.IsNotSupported() || s.IsInvalidArgument())) { | |
1939 | // If direct IO is not supported, skip the test | |
1940 | return; | |
1941 | } else { | |
1942 | ASSERT_OK(s); | |
1943 | } | |
1944 | ||
1945 | int total_keys = 0; | |
1946 | // Write the keys. | |
1947 | { | |
1948 | WriteBatch batch; | |
1949 | Random rnd(309); | |
1950 | for (int j = 0; j < 5; j++) { | |
1951 | for (int i = j * kNumKeys; i < (j + 1) * kNumKeys; i++) { | |
1952 | ASSERT_OK(batch.Put(BuildKey(i), rnd.RandomString(1000))); | |
1953 | total_keys++; | |
1954 | } | |
1955 | ASSERT_OK(db_->Write(WriteOptions(), &batch)); | |
1956 | ASSERT_OK(Flush()); | |
1957 | } | |
1958 | MoveFilesToLevel(2); | |
1959 | } | |
1960 | ||
1961 | int buff_prefetch_count = 0; | |
1962 | bool read_async_called = false; | |
1963 | ReadOptions ro; | |
1964 | ro.adaptive_readahead = true; | |
1965 | ro.async_io = true; | |
1966 | ||
1967 | if (std::get<1>(GetParam())) { | |
1968 | ro.readahead_size = 16 * 1024; | |
1969 | } | |
1970 | ||
1971 | SyncPoint::GetInstance()->SetCallBack( | |
1972 | "FilePrefetchBuffer::PrefetchAsyncInternal:Start", | |
1973 | [&](void*) { buff_prefetch_count++; }); | |
1974 | ||
1975 | SyncPoint::GetInstance()->SetCallBack( | |
1976 | "UpdateResults::io_uring_result", | |
1977 | [&](void* /*arg*/) { read_async_called = true; }); | |
1978 | SyncPoint::GetInstance()->EnableProcessing(); | |
1979 | ||
1980 | // Read the keys. | |
1981 | { | |
1982 | // Start io_tracing. | |
1983 | WriteOptions write_opt; | |
1984 | TraceOptions trace_opt; | |
1985 | std::unique_ptr<TraceWriter> trace_writer; | |
1986 | std::string trace_file_path = dbname_ + "/io_trace_file"; | |
1987 | ||
1988 | ASSERT_OK( | |
1989 | NewFileTraceWriter(env_, EnvOptions(), trace_file_path, &trace_writer)); | |
1990 | ASSERT_OK(db_->StartIOTrace(trace_opt, std::move(trace_writer))); | |
1991 | ASSERT_OK(options.statistics->Reset()); | |
1992 | ||
1993 | auto iter = std::unique_ptr<Iterator>(db_->NewIterator(ro)); | |
1994 | int num_keys = 0; | |
1995 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { | |
1996 | ASSERT_OK(iter->status()); | |
1997 | num_keys++; | |
1998 | } | |
1999 | ||
2000 | // End the tracing. | |
2001 | ASSERT_OK(db_->EndIOTrace()); | |
2002 | ASSERT_OK(env_->FileExists(trace_file_path)); | |
2003 | ||
2004 | ASSERT_EQ(num_keys, total_keys); | |
2005 | ASSERT_GT(buff_prefetch_count, 0); | |
2006 | ||
2007 | // Check stats to make sure async prefetch is done. | |
2008 | { | |
2009 | HistogramData async_read_bytes; | |
2010 | options.statistics->histogramData(ASYNC_READ_BYTES, &async_read_bytes); | |
2011 | // Not all platforms support iouring. In that case, ReadAsync in posix | |
2012 | // won't submit async requests. | |
2013 | if (read_async_called) { | |
2014 | ASSERT_GT(async_read_bytes.count, 0); | |
2015 | } else { | |
2016 | ASSERT_EQ(async_read_bytes.count, 0); | |
2017 | } | |
2018 | } | |
2019 | ||
2020 | // Check the file to see if ReadAsync is logged. | |
2021 | RunIOTracerParserTool(trace_file_path); | |
2022 | } | |
2023 | ||
2024 | SyncPoint::GetInstance()->DisableProcessing(); | |
2025 | SyncPoint::GetInstance()->ClearAllCallBacks(); | |
2026 | ||
2027 | Close(); | |
2028 | } | |
2029 | #endif // GFLAGS | |
2030 | ||
2031 | class FilePrefetchBufferTest : public testing::Test { | |
2032 | public: | |
2033 | void SetUp() override { | |
2034 | SetupSyncPointsToMockDirectIO(); | |
2035 | env_ = Env::Default(); | |
2036 | fs_ = FileSystem::Default(); | |
2037 | test_dir_ = test::PerThreadDBPath("file_prefetch_buffer_test"); | |
2038 | ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr)); | |
2039 | } | |
2040 | ||
2041 | void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); } | |
2042 | ||
2043 | void Write(const std::string& fname, const std::string& content) { | |
2044 | std::unique_ptr<FSWritableFile> f; | |
2045 | ASSERT_OK(fs_->NewWritableFile(Path(fname), FileOptions(), &f, nullptr)); | |
2046 | ASSERT_OK(f->Append(content, IOOptions(), nullptr)); | |
2047 | ASSERT_OK(f->Close(IOOptions(), nullptr)); | |
2048 | } | |
2049 | ||
2050 | void Read(const std::string& fname, const FileOptions& opts, | |
2051 | std::unique_ptr<RandomAccessFileReader>* reader) { | |
2052 | std::string fpath = Path(fname); | |
2053 | std::unique_ptr<FSRandomAccessFile> f; | |
2054 | ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr)); | |
2055 | reader->reset(new RandomAccessFileReader(std::move(f), fpath, | |
2056 | env_->GetSystemClock().get())); | |
2057 | } | |
2058 | ||
2059 | void AssertResult(const std::string& content, | |
2060 | const std::vector<FSReadRequest>& reqs) { | |
2061 | for (const auto& r : reqs) { | |
2062 | ASSERT_OK(r.status); | |
2063 | ASSERT_EQ(r.len, r.result.size()); | |
2064 | ASSERT_EQ(content.substr(r.offset, r.len), r.result.ToString()); | |
2065 | } | |
2066 | } | |
2067 | ||
2068 | FileSystem* fs() { return fs_.get(); } | |
2069 | ||
2070 | private: | |
2071 | Env* env_; | |
2072 | std::shared_ptr<FileSystem> fs_; | |
2073 | std::string test_dir_; | |
2074 | ||
2075 | std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; } | |
2076 | }; | |
2077 | ||
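| // A minimal usage sketch of the helpers above (hypothetical file name, | |
| // mirroring the pattern the test below follows): | |
| // | |
| //   std::string content = Random(0).RandomString(8192); | |
| //   Write("example", content); | |
| //   std::unique_ptr<RandomAccessFileReader> reader; | |
| //   Read("example", FileOptions(), &reader); | |
| //   // reader can now back a FilePrefetchBuffer, as in the test below. | |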
2078 | TEST_F(FilePrefetchBufferTest, SeekWithBlockCacheHit) { | |
2079 | std::string fname = "seek-with-block-cache-hit"; | |
2080 | Random rand(0); | |
2081 | std::string content = rand.RandomString(32768); | |
2082 | Write(fname, content); | |
2083 | ||
2084 | FileOptions opts; | |
2085 | std::unique_ptr<RandomAccessFileReader> r; | |
2086 | Read(fname, opts, &r); | |
2087 | ||
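| // 16KB initial readahead, capped at 16KB; the trailing bool and counter | |
| // arguments follow the FilePrefetchBuffer constructor (see | |
| // file_prefetch_buffer.h for their exact meaning). | |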
2088 | FilePrefetchBuffer fpb(16384, 16384, true, false, false, 0, 0, fs()); | |
2089 | Slice result; | |
2090 | // Simulate a seek of 4096 bytes at offset 0. Due to the readahead settings, | |
2091 | // it will do two reads of 4096+8192 and 8192 | |
2092 | Status s = fpb.PrefetchAsync(IOOptions(), r.get(), 0, 4096, &result); | |
2093 | ASSERT_EQ(s, Status::TryAgain()); | |
2094 | // Simulate a block cache hit | |
2095 | fpb.UpdateReadPattern(0, 4096, false); | |
2096 | // Now read some data that straddles the two prefetch buffers - offset 8192 to | |
2097 | // 16384 | |
2098 | ASSERT_TRUE(fpb.TryReadFromCacheAsync(IOOptions(), r.get(), 8192, 8192, | |
2099 | &result, &s, Env::IOPriority::IO_LOW)); | |
2100 | } | |
2101 | #endif // ROCKSDB_LITE | |
2102 | } // namespace ROCKSDB_NAMESPACE | |
2103 | ||
2104 | int main(int argc, char** argv) { | |
2105 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); | |
2106 | ::testing::InitGoogleTest(&argc, argv); | |
2107 | ||
2108 | return RUN_ALL_TESTS(); | |
2109 | }