// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "table/block_fetcher.h"

#include "db/table_properties_collector.h"
#include "env/composite_env_wrapper.h"
#include "file/file_util.h"
#include "options/options_helper.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "table/block_based/binary_search_index_reader.h"
#include "table/block_based/block_based_table_builder.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/format.h"
#include "test_util/testharness.h"

namespace ROCKSDB_NAMESPACE {
namespace {

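// A MemoryAllocator that counts allocations and deallocations so the tests
// below can verify how many heap/compressed buffers BlockFetcher allocates
// and when they are released.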
class CountedMemoryAllocator : public MemoryAllocator {
 public:
  const char* Name() const override { return "CountedMemoryAllocator"; }

  void* Allocate(size_t size) override {
    num_allocations_++;
    return static_cast<void*>(new char[size]);
  }

  void Deallocate(void* p) override {
    num_deallocations_++;
    delete[] static_cast<char*>(p);
  }

  int GetNumAllocations() const { return num_allocations_; }
  int GetNumDeallocations() const { return num_deallocations_; }

 private:
  int num_allocations_ = 0;
  int num_deallocations_ = 0;
};

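// Counts of memcpy operations into the stack buffer, a heap buffer, or a
// separately allocated compressed buffer during a block fetch.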
struct MemcpyStats {
  int num_stack_buf_memcpy;
  int num_heap_buf_memcpy;
  int num_compressed_buf_memcpy;
};

struct BufAllocationStats {
  int num_heap_buf_allocations;
  int num_compressed_buf_allocations;
};

struct TestStats {
  MemcpyStats memcpy_stats;
  BufAllocationStats buf_allocation_stats;
};

class BlockFetcherTest : public testing::Test {
 public:
  enum class Mode {
    kBufferedRead = 0,
    kBufferedMmap,
    kDirectRead,
    kNumModes,
  };
  // use NumModes as array size to avoid "size of array '...' has non-integral
  // type" errors.
  const static int NumModes = static_cast<int>(Mode::kNumModes);

 protected:
  void SetUp() override {
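    // Direct I/O is mocked via sync points so that the kDirectRead mode can
    // run even in environments where O_DIRECT may not be supported.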
    SetupSyncPointsToMockDirectIO();
    test_dir_ = test::PerThreadDBPath("block_fetcher_test");
    env_ = Env::Default();
    fs_ = FileSystem::Default();
    ASSERT_OK(fs_->CreateDir(test_dir_, IOOptions(), nullptr));
  }

  void TearDown() override { EXPECT_OK(DestroyDir(env_, test_dir_)); }

  void AssertSameBlock(const std::string& block1, const std::string& block2) {
    ASSERT_EQ(block1, block2);
  }

  // Creates a table with kv pairs (i, i) where i ranges from 0 to 8,
  // inclusive.
  void CreateTable(const std::string& table_name,
                   const CompressionType& compression_type) {
    std::unique_ptr<WritableFileWriter> writer;
    NewFileWriter(table_name, &writer);

    // Create table builder.
    ImmutableCFOptions ioptions(options_);
    InternalKeyComparator comparator(options_.comparator);
    ColumnFamilyOptions cf_options(options_);
    MutableCFOptions moptions(cf_options);
    std::vector<std::unique_ptr<IntTblPropCollectorFactory>> factories;
    std::unique_ptr<TableBuilder> table_builder(table_factory_.NewTableBuilder(
        TableBuilderOptions(ioptions, moptions, comparator, &factories,
                            compression_type, 0 /* sample_for_compression */,
                            CompressionOptions(), false /* skip_filters */,
                            kDefaultColumnFamilyName, -1 /* level */),
        0 /* column_family_id */, writer.get()));

    // Build table.
    for (int i = 0; i < 9; i++) {
      std::string key = ToInternalKey(std::to_string(i));
      std::string value = std::to_string(i);
      table_builder->Add(key, value);
    }
    ASSERT_OK(table_builder->Finish());
  }

  void FetchIndexBlock(const std::string& table_name,
                       CountedMemoryAllocator* heap_buf_allocator,
                       CountedMemoryAllocator* compressed_buf_allocator,
                       MemcpyStats* memcpy_stats, BlockContents* index_block,
                       std::string* result) {
    FileOptions fopt(options_);
    std::unique_ptr<RandomAccessFileReader> file;
    NewFileReader(table_name, fopt, &file);

    // Get handle of the index block.
    Footer footer;
    ReadFooter(file.get(), &footer);
    const BlockHandle& index_handle = footer.index_handle();

    CompressionType compression_type;
    FetchBlock(file.get(), index_handle, BlockType::kIndex,
               false /* compressed */, false /* do_uncompress */,
               heap_buf_allocator, compressed_buf_allocator, index_block,
               memcpy_stats, &compression_type);
    ASSERT_EQ(compression_type, CompressionType::kNoCompression);
    result->assign(index_block->data.ToString());
  }

  // Fetches the first data block in buffered, mmap, and direct IO read modes.
  //
  // compressed: whether the data blocks are compressed;
  // do_uncompress: whether the data blocks should be uncompressed on fetching;
  // compression_type: the expected compression type.
  //
  // Expects:
  // Block contents are the same across all read modes.
  // Buffer allocation and memory copy statistics match the expected values.
  void TestFetchDataBlock(
      const std::string& table_name_prefix, bool compressed, bool do_uncompress,
      std::array<TestStats, NumModes> expected_stats_by_mode) {
    for (CompressionType compression_type : GetSupportedCompressions()) {
      bool do_compress = compression_type != kNoCompression;
      if (compressed != do_compress) continue;
      std::string compression_type_str =
          CompressionTypeToString(compression_type);

      std::string table_name = table_name_prefix + compression_type_str;
      CreateTable(table_name, compression_type);

      CompressionType expected_compression_type_after_fetch =
          (compressed && !do_uncompress) ? compression_type : kNoCompression;

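      // Fetch the first data block once per read mode (buffered, mmap,
      // direct) and record the per-mode memcpy/allocation statistics.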
      BlockContents blocks[NumModes];
      std::string block_datas[NumModes];
      MemcpyStats memcpy_stats[NumModes];
      CountedMemoryAllocator heap_buf_allocators[NumModes];
      CountedMemoryAllocator compressed_buf_allocators[NumModes];
      for (int i = 0; i < NumModes; ++i) {
        SetMode(static_cast<Mode>(i));
        FetchFirstDataBlock(table_name, compressed, do_uncompress,
                            expected_compression_type_after_fetch,
                            &heap_buf_allocators[i],
                            &compressed_buf_allocators[i], &blocks[i],
                            &block_datas[i], &memcpy_stats[i]);
      }

      for (int i = 0; i < NumModes - 1; ++i) {
        AssertSameBlock(block_datas[i], block_datas[i + 1]);
      }

      // Check memcpy and buffer allocation statistics.
      for (int i = 0; i < NumModes; ++i) {
        const TestStats& expected_stats = expected_stats_by_mode[i];

        ASSERT_EQ(memcpy_stats[i].num_stack_buf_memcpy,
                  expected_stats.memcpy_stats.num_stack_buf_memcpy);
        ASSERT_EQ(memcpy_stats[i].num_heap_buf_memcpy,
                  expected_stats.memcpy_stats.num_heap_buf_memcpy);
        ASSERT_EQ(memcpy_stats[i].num_compressed_buf_memcpy,
                  expected_stats.memcpy_stats.num_compressed_buf_memcpy);

        ASSERT_EQ(heap_buf_allocators[i].GetNumAllocations(),
                  expected_stats.buf_allocation_stats.num_heap_buf_allocations);
        ASSERT_EQ(
            compressed_buf_allocators[i].GetNumAllocations(),
            expected_stats.buf_allocation_stats.num_compressed_buf_allocations);

        // The allocated buffers are not deallocated until
        // the block content is deleted.
        ASSERT_EQ(heap_buf_allocators[i].GetNumDeallocations(), 0);
        ASSERT_EQ(compressed_buf_allocators[i].GetNumDeallocations(), 0);
        blocks[i].allocation.reset();
        ASSERT_EQ(heap_buf_allocators[i].GetNumDeallocations(),
                  expected_stats.buf_allocation_stats.num_heap_buf_allocations);
        ASSERT_EQ(
            compressed_buf_allocators[i].GetNumDeallocations(),
            expected_stats.buf_allocation_stats.num_compressed_buf_allocations);
      }
    }
  }

  void SetMode(Mode mode) {
    switch (mode) {
      case Mode::kBufferedRead:
        options_.use_direct_reads = false;
        options_.allow_mmap_reads = false;
        break;
      case Mode::kBufferedMmap:
        options_.use_direct_reads = false;
        options_.allow_mmap_reads = true;
        break;
      case Mode::kDirectRead:
        options_.use_direct_reads = true;
        options_.allow_mmap_reads = false;
        break;
      case Mode::kNumModes:
        assert(false);
    }
  }

 private:
  std::string test_dir_;
  Env* env_;
  std::shared_ptr<FileSystem> fs_;
  BlockBasedTableFactory table_factory_;
  Options options_;

  std::string Path(const std::string& fname) { return test_dir_ + "/" + fname; }

  void WriteToFile(const std::string& content, const std::string& filename) {
    std::unique_ptr<FSWritableFile> f;
    ASSERT_OK(fs_->NewWritableFile(Path(filename), FileOptions(), &f, nullptr));
    ASSERT_OK(f->Append(content, IOOptions(), nullptr));
    ASSERT_OK(f->Close(IOOptions(), nullptr));
  }

  void NewFileWriter(const std::string& filename,
                     std::unique_ptr<WritableFileWriter>* writer) {
    std::string path = Path(filename);
    EnvOptions env_options;
    std::unique_ptr<WritableFile> file;
    ASSERT_OK(env_->NewWritableFile(path, &file, env_options));
    writer->reset(new WritableFileWriter(
        NewLegacyWritableFileWrapper(std::move(file)), path, env_options));
  }

  void NewFileReader(const std::string& filename, const FileOptions& opt,
                     std::unique_ptr<RandomAccessFileReader>* reader) {
    std::string path = Path(filename);
    std::unique_ptr<FSRandomAccessFile> f;
    ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
    reader->reset(new RandomAccessFileReader(std::move(f), path, env_));
  }

  void NewTableReader(const ImmutableCFOptions& ioptions,
                      const FileOptions& foptions,
                      const InternalKeyComparator& comparator,
                      const std::string& table_name,
                      std::unique_ptr<BlockBasedTable>* table) {
    std::unique_ptr<RandomAccessFileReader> file;
    NewFileReader(table_name, foptions, &file);

    uint64_t file_size = 0;
    ASSERT_OK(env_->GetFileSize(Path(table_name), &file_size));

    std::unique_ptr<TableReader> table_reader;
    ReadOptions ro;
    const auto* table_options =
        table_factory_.GetOptions<BlockBasedTableOptions>();
    ASSERT_NE(table_options, nullptr);
    ASSERT_OK(BlockBasedTable::Open(ro, ioptions, EnvOptions(), *table_options,
                                    comparator, std::move(file), file_size,
                                    &table_reader));

    table->reset(reinterpret_cast<BlockBasedTable*>(table_reader.release()));
  }

  std::string ToInternalKey(const std::string& key) {
    InternalKey internal_key(key, 0, ValueType::kTypeValue);
    return internal_key.Encode().ToString();
  }

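  // Reads the table footer, which holds the handles of the metaindex and
  // index blocks.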
  void ReadFooter(RandomAccessFileReader* file, Footer* footer) {
    uint64_t file_size = 0;
    ASSERT_OK(env_->GetFileSize(file->file_name(), &file_size));
    IOOptions opts;
    ASSERT_OK(ReadFooterFromFile(opts, file, nullptr /* prefetch_buffer */,
                                 file_size, footer,
                                 kBlockBasedTableMagicNumber));
  }

  // NOTE: compression_type returns the compression type of the fetched block
  // contents, so if the block is fetched and uncompressed, then it's
  // kNoCompression.
  void FetchBlock(RandomAccessFileReader* file, const BlockHandle& block,
                  BlockType block_type, bool compressed, bool do_uncompress,
                  MemoryAllocator* heap_buf_allocator,
                  MemoryAllocator* compressed_buf_allocator,
                  BlockContents* contents, MemcpyStats* stats,
                  CompressionType* compression_type) {
    ImmutableCFOptions ioptions(options_);
    ReadOptions roptions;
    PersistentCacheOptions persistent_cache_options;
    Footer footer;
    ReadFooter(file, &footer);
    std::unique_ptr<BlockFetcher> fetcher(new BlockFetcher(
        file, nullptr /* prefetch_buffer */, footer, roptions, block, contents,
        ioptions, do_uncompress, compressed, block_type,
        UncompressionDict::GetEmptyDict(), persistent_cache_options,
        heap_buf_allocator, compressed_buf_allocator));

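    // ReadBlockContents performs the actual read (and optional
    // decompression); the TEST_GetNum*Memcpy counters then report which
    // intermediate buffers were copied into.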
    ASSERT_OK(fetcher->ReadBlockContents());

    stats->num_stack_buf_memcpy = fetcher->TEST_GetNumStackBufMemcpy();
    stats->num_heap_buf_memcpy = fetcher->TEST_GetNumHeapBufMemcpy();
    stats->num_compressed_buf_memcpy =
        fetcher->TEST_GetNumCompressedBufMemcpy();

    *compression_type = fetcher->get_compression_type();
  }

  // NOTE: expected_compression_type is the expected compression
  // type of the fetched block content; if the block is uncompressed,
  // then the expected compression type is kNoCompression.
  void FetchFirstDataBlock(const std::string& table_name, bool compressed,
                           bool do_uncompress,
                           CompressionType expected_compression_type,
                           MemoryAllocator* heap_buf_allocator,
                           MemoryAllocator* compressed_buf_allocator,
                           BlockContents* block, std::string* result,
                           MemcpyStats* memcpy_stats) {
    ImmutableCFOptions ioptions(options_);
    InternalKeyComparator comparator(options_.comparator);
    FileOptions foptions(options_);

    // Get block handle for the first data block.
    std::unique_ptr<BlockBasedTable> table;
    NewTableReader(ioptions, foptions, comparator, table_name, &table);

    std::unique_ptr<BlockBasedTable::IndexReader> index_reader;
    ReadOptions ro;
    ASSERT_OK(BinarySearchIndexReader::Create(
        table.get(), ro, nullptr /* prefetch_buffer */, false /* use_cache */,
        false /* prefetch */, false /* pin */, nullptr /* lookup_context */,
        &index_reader));

    std::unique_ptr<InternalIteratorBase<IndexValue>> iter(
        index_reader->NewIterator(
            ReadOptions(), false /* disable_prefix_seek */, nullptr /* iter */,
            nullptr /* get_context */, nullptr /* lookup_context */));
    ASSERT_OK(iter->status());
    iter->SeekToFirst();
    BlockHandle first_block_handle = iter->value().handle;

    // Fetch first data block.
    std::unique_ptr<RandomAccessFileReader> file;
    NewFileReader(table_name, foptions, &file);
    CompressionType compression_type;
    FetchBlock(file.get(), first_block_handle, BlockType::kData, compressed,
               do_uncompress, heap_buf_allocator, compressed_buf_allocator,
               block, memcpy_stats, &compression_type);
    ASSERT_EQ(compression_type, expected_compression_type);
    result->assign(block->data.ToString());
  }
};

// Skip the following tests in lite mode since direct I/O is unsupported.
#ifndef ROCKSDB_LITE

// Fetch the index block under direct IO, mmap IO, and non-direct IO.
// Expects:
// the index block contents are the same for all read modes.
TEST_F(BlockFetcherTest, FetchIndexBlock) {
  for (CompressionType compression : GetSupportedCompressions()) {
    std::string table_name =
        "FetchIndexBlock" + CompressionTypeToString(compression);
    CreateTable(table_name, compression);

    CountedMemoryAllocator allocator;
    MemcpyStats memcpy_stats;
    BlockContents indexes[NumModes];
    std::string index_datas[NumModes];
    for (int i = 0; i < NumModes; ++i) {
      SetMode(static_cast<Mode>(i));
      FetchIndexBlock(table_name, &allocator, &allocator, &memcpy_stats,
                      &indexes[i], &index_datas[i]);
    }
    for (int i = 0; i < NumModes - 1; ++i) {
      AssertSameBlock(index_datas[i], index_datas[i + 1]);
    }
  }
}

// Data blocks are not compressed,
// fetch data block under direct IO, mmap IO, and non-direct IO.
// Expects:
// 1. in non-direct IO mode, allocate a heap buffer and memcpy the block
//    into the buffer;
// 2. in direct IO mode, allocate a heap buffer and memcpy from the
//    direct IO buffer to the heap buffer.
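// 3. in mmap mode, no buffer is allocated and no memcpy is expected; the
//    block is referenced directly in the memory-mapped file.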
TEST_F(BlockFetcherTest, FetchUncompressedDataBlock) {
  TestStats expected_non_mmap_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          1 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          1 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  TestStats expected_mmap_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          0 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          0 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  std::array<TestStats, NumModes> expected_stats_by_mode{{
      expected_non_mmap_stats /* kBufferedRead */,
      expected_mmap_stats /* kBufferedMmap */,
      expected_non_mmap_stats /* kDirectRead */,
  }};
  TestFetchDataBlock("FetchUncompressedDataBlock", false, false,
                     expected_stats_by_mode);
}

// Data blocks are compressed,
// fetch data block under direct IO, mmap IO, and non-direct IO,
// but do not uncompress.
// Expects:
// 1. in non-direct IO mode, allocate a compressed buffer and memcpy the block
//    into the buffer;
// 2. in direct IO mode, allocate a compressed buffer and memcpy from the
//    direct IO buffer to the compressed buffer.
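// 3. in mmap mode, no buffer is allocated and no memcpy is expected; the
//    compressed block is referenced directly in the memory-mapped file.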
TEST_F(BlockFetcherTest, FetchCompressedDataBlock) {
  TestStats expected_non_mmap_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          0 /* num_heap_buf_memcpy */,
          1 /* num_compressed_buf_memcpy */,
      },
      {
          0 /* num_heap_buf_allocations */,
          1 /* num_compressed_buf_allocations */,
      }};
  TestStats expected_mmap_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          0 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          0 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  std::array<TestStats, NumModes> expected_stats_by_mode{{
      expected_non_mmap_stats /* kBufferedRead */,
      expected_mmap_stats /* kBufferedMmap */,
      expected_non_mmap_stats /* kDirectRead */,
  }};
  TestFetchDataBlock("FetchCompressedDataBlock", true, false,
                     expected_stats_by_mode);
}

// Data blocks are compressed,
// fetch and uncompress data block under direct IO, mmap IO, and non-direct IO.
// Expects:
// 1. in buffered read mode, since the block is small, it is first memcpy'd
//    into the stack buffer, then a heap buffer is allocated and the block is
//    uncompressed into the heap buffer;
// 2. in direct IO mode, allocate a heap buffer, then directly uncompress
//    and memcpy from the direct IO buffer to the heap buffer.
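// 3. in mmap mode, a heap buffer is allocated and the block is uncompressed
//    from the memory-mapped region directly into the heap buffer.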
TEST_F(BlockFetcherTest, FetchAndUncompressCompressedDataBlock) {
  TestStats expected_buffered_read_stats = {
      {
          1 /* num_stack_buf_memcpy */,
          1 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          1 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  TestStats expected_mmap_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          1 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          1 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  TestStats expected_direct_read_stats = {
      {
          0 /* num_stack_buf_memcpy */,
          1 /* num_heap_buf_memcpy */,
          0 /* num_compressed_buf_memcpy */,
      },
      {
          1 /* num_heap_buf_allocations */,
          0 /* num_compressed_buf_allocations */,
      }};
  std::array<TestStats, NumModes> expected_stats_by_mode{{
      expected_buffered_read_stats,
      expected_mmap_stats,
      expected_direct_read_stats,
  }};
  TestFetchDataBlock("FetchAndUncompressCompressedDataBlock", true, true,
                     expected_stats_by_mode);
}

#endif  // ROCKSDB_LITE

}  // namespace
}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}