1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "table/block_fetcher.h"
8 #include "db/table_properties_collector.h"
9 #include "env/composite_env_wrapper.h"
10 #include "file/file_util.h"
11 #include "options/options_helper.h"
12 #include "port/port.h"
13 #include "port/stack_trace.h"
14 #include "table/block_based/binary_search_index_reader.h"
15 #include "table/block_based/block_based_table_builder.h"
16 #include "table/block_based/block_based_table_factory.h"
17 #include "table/block_based/block_based_table_reader.h"
18 #include "table/format.h"
19 #include "test_util/testharness.h"
21 namespace ROCKSDB_NAMESPACE
{
24 class CountedMemoryAllocator
: public MemoryAllocator
{
26 const char* Name() const override
{ return "CountedMemoryAllocator"; }
28 void* Allocate(size_t size
) override
{
30 return static_cast<void*>(new char[size
]);
33 void Deallocate(void* p
) override
{
35 delete[] static_cast<char*>(p
);
38 int GetNumAllocations() const { return num_allocations_
; }
39 int GetNumDeallocations() const { return num_deallocations_
; }
42 int num_allocations_
= 0;
43 int num_deallocations_
= 0;
// Counts of memcpys into each kind of destination buffer performed while
// fetching a block; compared against expected values in the tests below.
struct MemcpyStats {
  int num_stack_buf_memcpy = 0;
  int num_heap_buf_memcpy = 0;
  int num_compressed_buf_memcpy = 0;
};

// Counts of buffer allocations made through the counting allocators above.
struct BufAllocationStats {
  int num_heap_buf_allocations = 0;
  int num_compressed_buf_allocations = 0;
};

// Expected statistics for one read mode: memcpy counts plus allocation counts.
struct TestStats {
  MemcpyStats memcpy_stats;
  BufAllocationStats buf_allocation_stats;
};
62 class BlockFetcherTest
: public testing::Test
{
70 // use NumModes as array size to avoid "size of array '...' has non-integral
72 const static int NumModes
= static_cast<int>(Mode::kNumModes
);
75 void SetUp() override
{
76 SetupSyncPointsToMockDirectIO();
77 test_dir_
= test::PerThreadDBPath("block_fetcher_test");
78 env_
= Env::Default();
79 fs_
= FileSystem::Default();
80 ASSERT_OK(fs_
->CreateDir(test_dir_
, IOOptions(), nullptr));
  // Removes the per-test directory (and any table files created in it).
  void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }
85 void AssertSameBlock(const std::string
& block1
, const std::string
& block2
) {
86 ASSERT_EQ(block1
, block2
);
89 // Creates a table with kv pairs (i, i) where i ranges from 0 to 9, inclusive.
90 void CreateTable(const std::string
& table_name
,
91 const CompressionType
& compression_type
) {
92 std::unique_ptr
<WritableFileWriter
> writer
;
93 NewFileWriter(table_name
, &writer
);
95 // Create table builder.
96 ImmutableCFOptions
ioptions(options_
);
97 InternalKeyComparator
comparator(options_
.comparator
);
98 ColumnFamilyOptions
cf_options(options_
);
99 MutableCFOptions
moptions(cf_options
);
100 std::vector
<std::unique_ptr
<IntTblPropCollectorFactory
>> factories
;
101 std::unique_ptr
<TableBuilder
> table_builder(table_factory_
.NewTableBuilder(
102 TableBuilderOptions(ioptions
, moptions
, comparator
, &factories
,
103 compression_type
, 0 /* sample_for_compression */,
104 CompressionOptions(), false /* skip_filters */,
105 kDefaultColumnFamilyName
, -1 /* level */),
106 0 /* column_family_id */, writer
.get()));
109 for (int i
= 0; i
< 9; i
++) {
110 std::string key
= ToInternalKey(std::to_string(i
));
111 std::string value
= std::to_string(i
);
112 table_builder
->Add(key
, value
);
114 ASSERT_OK(table_builder
->Finish());
117 void FetchIndexBlock(const std::string
& table_name
,
118 CountedMemoryAllocator
* heap_buf_allocator
,
119 CountedMemoryAllocator
* compressed_buf_allocator
,
120 MemcpyStats
* memcpy_stats
, BlockContents
* index_block
,
121 std::string
* result
) {
122 FileOptions
fopt(options_
);
123 std::unique_ptr
<RandomAccessFileReader
> file
;
124 NewFileReader(table_name
, fopt
, &file
);
126 // Get handle of the index block.
128 ReadFooter(file
.get(), &footer
);
129 const BlockHandle
& index_handle
= footer
.index_handle();
131 CompressionType compression_type
;
132 FetchBlock(file
.get(), index_handle
, BlockType::kIndex
,
133 false /* compressed */, false /* do_uncompress */,
134 heap_buf_allocator
, compressed_buf_allocator
, index_block
,
135 memcpy_stats
, &compression_type
);
136 ASSERT_EQ(compression_type
, CompressionType::kNoCompression
);
137 result
->assign(index_block
->data
.ToString());
140 // Fetches the first data block in both direct IO and non-direct IO mode.
142 // compressed: whether the data blocks are compressed;
143 // do_uncompress: whether the data blocks should be uncompressed on fetching.
144 // compression_type: the expected compression type.
147 // Block contents are the same.
148 // Bufferr allocation and memory copy statistics are expected.
149 void TestFetchDataBlock(
150 const std::string
& table_name_prefix
, bool compressed
, bool do_uncompress
,
151 std::array
<TestStats
, NumModes
> expected_stats_by_mode
) {
152 for (CompressionType compression_type
: GetSupportedCompressions()) {
153 bool do_compress
= compression_type
!= kNoCompression
;
154 if (compressed
!= do_compress
) continue;
155 std::string compression_type_str
=
156 CompressionTypeToString(compression_type
);
158 std::string table_name
= table_name_prefix
+ compression_type_str
;
159 CreateTable(table_name
, compression_type
);
161 CompressionType expected_compression_type_after_fetch
=
162 (compressed
&& !do_uncompress
) ? compression_type
: kNoCompression
;
164 BlockContents blocks
[NumModes
];
165 std::string block_datas
[NumModes
];
166 MemcpyStats memcpy_stats
[NumModes
];
167 CountedMemoryAllocator heap_buf_allocators
[NumModes
];
168 CountedMemoryAllocator compressed_buf_allocators
[NumModes
];
169 for (int i
= 0; i
< NumModes
; ++i
) {
170 SetMode(static_cast<Mode
>(i
));
171 FetchFirstDataBlock(table_name
, compressed
, do_uncompress
,
172 expected_compression_type_after_fetch
,
173 &heap_buf_allocators
[i
],
174 &compressed_buf_allocators
[i
], &blocks
[i
],
175 &block_datas
[i
], &memcpy_stats
[i
]);
178 for (int i
= 0; i
< NumModes
- 1; ++i
) {
179 AssertSameBlock(block_datas
[i
], block_datas
[i
+ 1]);
182 // Check memcpy and buffer allocation statistics.
183 for (int i
= 0; i
< NumModes
; ++i
) {
184 const TestStats
& expected_stats
= expected_stats_by_mode
[i
];
186 ASSERT_EQ(memcpy_stats
[i
].num_stack_buf_memcpy
,
187 expected_stats
.memcpy_stats
.num_stack_buf_memcpy
);
188 ASSERT_EQ(memcpy_stats
[i
].num_heap_buf_memcpy
,
189 expected_stats
.memcpy_stats
.num_heap_buf_memcpy
);
190 ASSERT_EQ(memcpy_stats
[i
].num_compressed_buf_memcpy
,
191 expected_stats
.memcpy_stats
.num_compressed_buf_memcpy
);
193 ASSERT_EQ(heap_buf_allocators
[i
].GetNumAllocations(),
194 expected_stats
.buf_allocation_stats
.num_heap_buf_allocations
);
196 compressed_buf_allocators
[i
].GetNumAllocations(),
197 expected_stats
.buf_allocation_stats
.num_compressed_buf_allocations
);
199 // The allocated buffers are not deallocated until
200 // the block content is deleted.
201 ASSERT_EQ(heap_buf_allocators
[i
].GetNumDeallocations(), 0);
202 ASSERT_EQ(compressed_buf_allocators
[i
].GetNumDeallocations(), 0);
203 blocks
[i
].allocation
.reset();
204 ASSERT_EQ(heap_buf_allocators
[i
].GetNumDeallocations(),
205 expected_stats
.buf_allocation_stats
.num_heap_buf_allocations
);
207 compressed_buf_allocators
[i
].GetNumDeallocations(),
208 expected_stats
.buf_allocation_stats
.num_compressed_buf_allocations
);
213 void SetMode(Mode mode
) {
215 case Mode::kBufferedRead
:
216 options_
.use_direct_reads
= false;
217 options_
.allow_mmap_reads
= false;
219 case Mode::kBufferedMmap
:
220 options_
.use_direct_reads
= false;
221 options_
.allow_mmap_reads
= true;
223 case Mode::kDirectRead
:
224 options_
.use_direct_reads
= true;
225 options_
.allow_mmap_reads
= false;
227 case Mode::kNumModes
:
233 std::string test_dir_
;
235 std::shared_ptr
<FileSystem
> fs_
;
236 BlockBasedTableFactory table_factory_
;
  // Returns the full path of `fname` inside the per-test directory.
  std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }
241 void WriteToFile(const std::string
& content
, const std::string
& filename
) {
242 std::unique_ptr
<FSWritableFile
> f
;
243 ASSERT_OK(fs_
->NewWritableFile(Path(filename
), FileOptions(), &f
, nullptr));
244 ASSERT_OK(f
->Append(content
, IOOptions(), nullptr));
245 ASSERT_OK(f
->Close(IOOptions(), nullptr));
248 void NewFileWriter(const std::string
& filename
,
249 std::unique_ptr
<WritableFileWriter
>* writer
) {
250 std::string path
= Path(filename
);
251 EnvOptions env_options
;
252 std::unique_ptr
<WritableFile
> file
;
253 ASSERT_OK(env_
->NewWritableFile(path
, &file
, env_options
));
254 writer
->reset(new WritableFileWriter(
255 NewLegacyWritableFileWrapper(std::move(file
)), path
, env_options
));
258 void NewFileReader(const std::string
& filename
, const FileOptions
& opt
,
259 std::unique_ptr
<RandomAccessFileReader
>* reader
) {
260 std::string path
= Path(filename
);
261 std::unique_ptr
<FSRandomAccessFile
> f
;
262 ASSERT_OK(fs_
->NewRandomAccessFile(path
, opt
, &f
, nullptr));
263 reader
->reset(new RandomAccessFileReader(std::move(f
), path
, env_
));
266 void NewTableReader(const ImmutableCFOptions
& ioptions
,
267 const FileOptions
& foptions
,
268 const InternalKeyComparator
& comparator
,
269 const std::string
& table_name
,
270 std::unique_ptr
<BlockBasedTable
>* table
) {
271 std::unique_ptr
<RandomAccessFileReader
> file
;
272 NewFileReader(table_name
, foptions
, &file
);
274 uint64_t file_size
= 0;
275 ASSERT_OK(env_
->GetFileSize(Path(table_name
), &file_size
));
277 std::unique_ptr
<TableReader
> table_reader
;
279 const auto* table_options
=
280 table_factory_
.GetOptions
<BlockBasedTableOptions
>();
281 ASSERT_NE(table_options
, nullptr);
282 ASSERT_OK(BlockBasedTable::Open(ro
, ioptions
, EnvOptions(), *table_options
,
283 comparator
, std::move(file
), file_size
,
286 table
->reset(reinterpret_cast<BlockBasedTable
*>(table_reader
.release()));
289 std::string
ToInternalKey(const std::string
& key
) {
290 InternalKey
internal_key(key
, 0, ValueType::kTypeValue
);
291 return internal_key
.Encode().ToString();
294 void ReadFooter(RandomAccessFileReader
* file
, Footer
* footer
) {
295 uint64_t file_size
= 0;
296 ASSERT_OK(env_
->GetFileSize(file
->file_name(), &file_size
));
298 ASSERT_OK(ReadFooterFromFile(opts
, file
, nullptr /* prefetch_buffer */,
300 kBlockBasedTableMagicNumber
));
303 // NOTE: compression_type returns the compression type of the fetched block
304 // contents, so if the block is fetched and uncompressed, then it's
306 void FetchBlock(RandomAccessFileReader
* file
, const BlockHandle
& block
,
307 BlockType block_type
, bool compressed
, bool do_uncompress
,
308 MemoryAllocator
* heap_buf_allocator
,
309 MemoryAllocator
* compressed_buf_allocator
,
310 BlockContents
* contents
, MemcpyStats
* stats
,
311 CompressionType
* compresstion_type
) {
312 ImmutableCFOptions
ioptions(options_
);
313 ReadOptions roptions
;
314 PersistentCacheOptions persistent_cache_options
;
316 ReadFooter(file
, &footer
);
317 std::unique_ptr
<BlockFetcher
> fetcher(new BlockFetcher(
318 file
, nullptr /* prefetch_buffer */, footer
, roptions
, block
, contents
,
319 ioptions
, do_uncompress
, compressed
, block_type
,
320 UncompressionDict::GetEmptyDict(), persistent_cache_options
,
321 heap_buf_allocator
, compressed_buf_allocator
));
323 ASSERT_OK(fetcher
->ReadBlockContents());
325 stats
->num_stack_buf_memcpy
= fetcher
->TEST_GetNumStackBufMemcpy();
326 stats
->num_heap_buf_memcpy
= fetcher
->TEST_GetNumHeapBufMemcpy();
327 stats
->num_compressed_buf_memcpy
=
328 fetcher
->TEST_GetNumCompressedBufMemcpy();
330 *compresstion_type
= fetcher
->get_compression_type();
333 // NOTE: expected_compression_type is the expected compression
334 // type of the fetched block content, if the block is uncompressed,
335 // then the expected compression type is kNoCompression.
336 void FetchFirstDataBlock(const std::string
& table_name
, bool compressed
,
338 CompressionType expected_compression_type
,
339 MemoryAllocator
* heap_buf_allocator
,
340 MemoryAllocator
* compressed_buf_allocator
,
341 BlockContents
* block
, std::string
* result
,
342 MemcpyStats
* memcpy_stats
) {
343 ImmutableCFOptions
ioptions(options_
);
344 InternalKeyComparator
comparator(options_
.comparator
);
345 FileOptions
foptions(options_
);
347 // Get block handle for the first data block.
348 std::unique_ptr
<BlockBasedTable
> table
;
349 NewTableReader(ioptions
, foptions
, comparator
, table_name
, &table
);
351 std::unique_ptr
<BlockBasedTable::IndexReader
> index_reader
;
353 ASSERT_OK(BinarySearchIndexReader::Create(
354 table
.get(), ro
, nullptr /* prefetch_buffer */, false /* use_cache */,
355 false /* prefetch */, false /* pin */, nullptr /* lookup_context */,
358 std::unique_ptr
<InternalIteratorBase
<IndexValue
>> iter(
359 index_reader
->NewIterator(
360 ReadOptions(), false /* disable_prefix_seek */, nullptr /* iter */,
361 nullptr /* get_context */, nullptr /* lookup_context */));
362 ASSERT_OK(iter
->status());
364 BlockHandle first_block_handle
= iter
->value().handle
;
366 // Fetch first data block.
367 std::unique_ptr
<RandomAccessFileReader
> file
;
368 NewFileReader(table_name
, foptions
, &file
);
369 CompressionType compression_type
;
370 FetchBlock(file
.get(), first_block_handle
, BlockType::kData
, compressed
,
371 do_uncompress
, heap_buf_allocator
, compressed_buf_allocator
,
372 block
, memcpy_stats
, &compression_type
);
373 ASSERT_EQ(compression_type
, expected_compression_type
);
374 result
->assign(block
->data
.ToString());
378 // Skip the following tests in lite mode since direct I/O is unsupported.
381 // Fetch index block under both direct IO and non-direct IO.
383 // the index block contents are the same for both read modes.
384 TEST_F(BlockFetcherTest
, FetchIndexBlock
) {
385 for (CompressionType compression
: GetSupportedCompressions()) {
386 std::string table_name
=
387 "FetchIndexBlock" + CompressionTypeToString(compression
);
388 CreateTable(table_name
, compression
);
390 CountedMemoryAllocator allocator
;
391 MemcpyStats memcpy_stats
;
392 BlockContents indexes
[NumModes
];
393 std::string index_datas
[NumModes
];
394 for (int i
= 0; i
< NumModes
; ++i
) {
395 SetMode(static_cast<Mode
>(i
));
396 FetchIndexBlock(table_name
, &allocator
, &allocator
, &memcpy_stats
,
397 &indexes
[i
], &index_datas
[i
]);
399 for (int i
= 0; i
< NumModes
- 1; ++i
) {
400 AssertSameBlock(index_datas
[i
], index_datas
[i
+ 1]);
405 // Data blocks are not compressed,
406 // fetch data block under direct IO, mmap IO,and non-direct IO.
408 // 1. in non-direct IO mode, allocate a heap buffer and memcpy the block
410 // 2. in direct IO mode, allocate a heap buffer and memcpy from the
411 // direct IO buffer to the heap buffer.
412 TEST_F(BlockFetcherTest
, FetchUncompressedDataBlock
) {
413 TestStats expected_non_mmap_stats
= {
415 0 /* num_stack_buf_memcpy */,
416 1 /* num_heap_buf_memcpy */,
417 0 /* num_compressed_buf_memcpy */,
420 1 /* num_heap_buf_allocations */,
421 0 /* num_compressed_buf_allocations */,
423 TestStats expected_mmap_stats
= {{
424 0 /* num_stack_buf_memcpy */,
425 0 /* num_heap_buf_memcpy */,
426 0 /* num_compressed_buf_memcpy */,
429 0 /* num_heap_buf_allocations */,
430 0 /* num_compressed_buf_allocations */,
432 std::array
<TestStats
, NumModes
> expected_stats_by_mode
{{
433 expected_non_mmap_stats
/* kBufferedRead */,
434 expected_mmap_stats
/* kBufferedMmap */,
435 expected_non_mmap_stats
/* kDirectRead */,
437 TestFetchDataBlock("FetchUncompressedDataBlock", false, false,
438 expected_stats_by_mode
);
441 // Data blocks are compressed,
442 // fetch data block under both direct IO and non-direct IO,
443 // but do not uncompress.
445 // 1. in non-direct IO mode, allocate a compressed buffer and memcpy the block
447 // 2. in direct IO mode, allocate a compressed buffer and memcpy from the
448 // direct IO buffer to the compressed buffer.
449 TEST_F(BlockFetcherTest
, FetchCompressedDataBlock
) {
450 TestStats expected_non_mmap_stats
= {
452 0 /* num_stack_buf_memcpy */,
453 0 /* num_heap_buf_memcpy */,
454 1 /* num_compressed_buf_memcpy */,
457 0 /* num_heap_buf_allocations */,
458 1 /* num_compressed_buf_allocations */,
460 TestStats expected_mmap_stats
= {{
461 0 /* num_stack_buf_memcpy */,
462 0 /* num_heap_buf_memcpy */,
463 0 /* num_compressed_buf_memcpy */,
466 0 /* num_heap_buf_allocations */,
467 0 /* num_compressed_buf_allocations */,
469 std::array
<TestStats
, NumModes
> expected_stats_by_mode
{{
470 expected_non_mmap_stats
/* kBufferedRead */,
471 expected_mmap_stats
/* kBufferedMmap */,
472 expected_non_mmap_stats
/* kDirectRead */,
474 TestFetchDataBlock("FetchCompressedDataBlock", true, false,
475 expected_stats_by_mode
);
478 // Data blocks are compressed,
479 // fetch and uncompress data block under both direct IO and non-direct IO.
481 // 1. in non-direct IO mode, since the block is small, so it's first memcpyed
482 // to the stack buffer, then a heap buffer is allocated and the block is
483 // uncompressed into the heap.
484 // 2. in direct IO mode mode, allocate a heap buffer, then directly uncompress
485 // and memcpy from the direct IO buffer to the heap buffer.
486 TEST_F(BlockFetcherTest
, FetchAndUncompressCompressedDataBlock
) {
487 TestStats expected_buffered_read_stats
= {
489 1 /* num_stack_buf_memcpy */,
490 1 /* num_heap_buf_memcpy */,
491 0 /* num_compressed_buf_memcpy */,
494 1 /* num_heap_buf_allocations */,
495 0 /* num_compressed_buf_allocations */,
497 TestStats expected_mmap_stats
= {{
498 0 /* num_stack_buf_memcpy */,
499 1 /* num_heap_buf_memcpy */,
500 0 /* num_compressed_buf_memcpy */,
503 1 /* num_heap_buf_allocations */,
504 0 /* num_compressed_buf_allocations */,
506 TestStats expected_direct_read_stats
= {
508 0 /* num_stack_buf_memcpy */,
509 1 /* num_heap_buf_memcpy */,
510 0 /* num_compressed_buf_memcpy */,
513 1 /* num_heap_buf_allocations */,
514 0 /* num_compressed_buf_allocations */,
516 std::array
<TestStats
, NumModes
> expected_stats_by_mode
{{
517 expected_buffered_read_stats
,
519 expected_direct_read_stats
,
521 TestFetchDataBlock("FetchAndUncompressCompressedDataBlock", true, true,
522 expected_stats_by_mode
);
525 #endif // ROCKSDB_LITE
528 } // namespace ROCKSDB_NAMESPACE
530 int main(int argc
, char** argv
) {
531 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
532 ::testing::InitGoogleTest(&argc
, argv
);
533 return RUN_ALL_TESTS();