1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "db/blob/blob_file_reader.h"
11 #include "db/blob/blob_contents.h"
12 #include "db/blob/blob_log_format.h"
13 #include "db/blob/blob_log_writer.h"
14 #include "env/mock_env.h"
15 #include "file/filename.h"
16 #include "file/read_write_util.h"
17 #include "file/writable_file_writer.h"
18 #include "options/cf_options.h"
19 #include "rocksdb/env.h"
20 #include "rocksdb/file_system.h"
21 #include "rocksdb/options.h"
22 #include "test_util/sync_point.h"
23 #include "test_util/testharness.h"
24 #include "util/compression.h"
25 #include "utilities/fault_injection_env.h"
27 namespace ROCKSDB_NAMESPACE
{
31 // Creates a test blob file with `num` blobs in it.
32 void WriteBlobFile(const ImmutableOptions
& immutable_options
,
33 uint32_t column_family_id
, bool has_ttl
,
34 const ExpirationRange
& expiration_range_header
,
35 const ExpirationRange
& expiration_range_footer
,
36 uint64_t blob_file_number
, const std::vector
<Slice
>& keys
,
37 const std::vector
<Slice
>& blobs
, CompressionType compression
,
38 std::vector
<uint64_t>& blob_offsets
,
39 std::vector
<uint64_t>& blob_sizes
) {
40 assert(!immutable_options
.cf_paths
.empty());
41 size_t num
= keys
.size();
42 assert(num
== blobs
.size());
43 assert(num
== blob_offsets
.size());
44 assert(num
== blob_sizes
.size());
46 const std::string blob_file_path
=
47 BlobFileName(immutable_options
.cf_paths
.front().path
, blob_file_number
);
48 std::unique_ptr
<FSWritableFile
> file
;
49 ASSERT_OK(NewWritableFile(immutable_options
.fs
.get(), blob_file_path
, &file
,
52 std::unique_ptr
<WritableFileWriter
> file_writer(new WritableFileWriter(
53 std::move(file
), blob_file_path
, FileOptions(), immutable_options
.clock
));
55 constexpr Statistics
* statistics
= nullptr;
56 constexpr bool use_fsync
= false;
57 constexpr bool do_flush
= false;
59 BlobLogWriter
blob_log_writer(std::move(file_writer
), immutable_options
.clock
,
60 statistics
, blob_file_number
, use_fsync
,
63 BlobLogHeader
header(column_family_id
, compression
, has_ttl
,
64 expiration_range_header
);
66 ASSERT_OK(blob_log_writer
.WriteHeader(header
));
68 std::vector
<std::string
> compressed_blobs(num
);
69 std::vector
<Slice
> blobs_to_write(num
);
70 if (kNoCompression
== compression
) {
71 for (size_t i
= 0; i
< num
; ++i
) {
72 blobs_to_write
[i
] = blobs
[i
];
73 blob_sizes
[i
] = blobs
[i
].size();
76 CompressionOptions opts
;
77 CompressionContext
context(compression
);
78 constexpr uint64_t sample_for_compression
= 0;
79 CompressionInfo
info(opts
, context
, CompressionDict::GetEmptyDict(),
80 compression
, sample_for_compression
);
82 constexpr uint32_t compression_format_version
= 2;
84 for (size_t i
= 0; i
< num
; ++i
) {
85 ASSERT_TRUE(CompressData(blobs
[i
], info
, compression_format_version
,
86 &compressed_blobs
[i
]));
87 blobs_to_write
[i
] = compressed_blobs
[i
];
88 blob_sizes
[i
] = compressed_blobs
[i
].size();
92 for (size_t i
= 0; i
< num
; ++i
) {
93 uint64_t key_offset
= 0;
94 ASSERT_OK(blob_log_writer
.AddRecord(keys
[i
], blobs_to_write
[i
], &key_offset
,
99 footer
.blob_count
= num
;
100 footer
.expiration_range
= expiration_range_footer
;
102 std::string checksum_method
;
103 std::string checksum_value
;
105 blob_log_writer
.AppendFooter(footer
, &checksum_method
, &checksum_value
));
108 // Creates a test blob file with a single blob in it. Note: this method
109 // makes it possible to test various corner cases by allowing the caller
110 // to specify the contents of various blob file header/footer fields.
111 void WriteBlobFile(const ImmutableOptions
& immutable_options
,
112 uint32_t column_family_id
, bool has_ttl
,
113 const ExpirationRange
& expiration_range_header
,
114 const ExpirationRange
& expiration_range_footer
,
115 uint64_t blob_file_number
, const Slice
& key
,
116 const Slice
& blob
, CompressionType compression
,
117 uint64_t* blob_offset
, uint64_t* blob_size
) {
118 std::vector
<Slice
> keys
{key
};
119 std::vector
<Slice
> blobs
{blob
};
120 std::vector
<uint64_t> blob_offsets
{0};
121 std::vector
<uint64_t> blob_sizes
{0};
122 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
,
123 expiration_range_header
, expiration_range_footer
,
124 blob_file_number
, keys
, blobs
, compression
, blob_offsets
,
127 *blob_offset
= blob_offsets
[0];
130 *blob_size
= blob_sizes
[0];
134 } // anonymous namespace
136 class BlobFileReaderTest
: public testing::Test
{
138 BlobFileReaderTest() { mock_env_
.reset(MockEnv::Create(Env::Default())); }
139 std::unique_ptr
<Env
> mock_env_
;
142 TEST_F(BlobFileReaderTest
, CreateReaderAndGetBlob
) {
144 options
.env
= mock_env_
.get();
145 options
.cf_paths
.emplace_back(
146 test::PerThreadDBPath(mock_env_
.get(),
147 "BlobFileReaderTest_CreateReaderAndGetBlob"),
149 options
.enable_blob_files
= true;
151 ImmutableOptions
immutable_options(options
);
153 constexpr uint32_t column_family_id
= 1;
154 constexpr bool has_ttl
= false;
155 constexpr ExpirationRange expiration_range
;
156 constexpr uint64_t blob_file_number
= 1;
157 constexpr size_t num_blobs
= 3;
158 const std::vector
<std::string
> key_strs
= {"key1", "key2", "key3"};
159 const std::vector
<std::string
> blob_strs
= {"blob1", "blob2", "blob3"};
161 const std::vector
<Slice
> keys
= {key_strs
[0], key_strs
[1], key_strs
[2]};
162 const std::vector
<Slice
> blobs
= {blob_strs
[0], blob_strs
[1], blob_strs
[2]};
164 std::vector
<uint64_t> blob_offsets(keys
.size());
165 std::vector
<uint64_t> blob_sizes(keys
.size());
167 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
, expiration_range
,
168 expiration_range
, blob_file_number
, keys
, blobs
, kNoCompression
,
169 blob_offsets
, blob_sizes
);
171 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
173 std::unique_ptr
<BlobFileReader
> reader
;
175 ASSERT_OK(BlobFileReader::Create(
176 immutable_options
, FileOptions(), column_family_id
, blob_file_read_hist
,
177 blob_file_number
, nullptr /*IOTracer*/, &reader
));
179 // Make sure the blob can be retrieved with and without checksum verification
180 ReadOptions read_options
;
181 read_options
.verify_checksums
= false;
183 constexpr FilePrefetchBuffer
* prefetch_buffer
= nullptr;
184 constexpr MemoryAllocator
* allocator
= nullptr;
187 std::unique_ptr
<BlobContents
> value
;
188 uint64_t bytes_read
= 0;
190 ASSERT_OK(reader
->GetBlob(read_options
, keys
[0], blob_offsets
[0],
191 blob_sizes
[0], kNoCompression
, prefetch_buffer
,
192 allocator
, &value
, &bytes_read
));
193 ASSERT_NE(value
, nullptr);
194 ASSERT_EQ(value
->data(), blobs
[0]);
195 ASSERT_EQ(bytes_read
, blob_sizes
[0]);
199 size_t total_size
= 0;
201 std::array
<Status
, num_blobs
> statuses_buf
;
202 std::array
<BlobReadRequest
, num_blobs
> requests_buf
;
203 autovector
<std::pair
<BlobReadRequest
*, std::unique_ptr
<BlobContents
>>>
206 for (size_t i
= 0; i
< num_blobs
; ++i
) {
208 BlobReadRequest(keys
[i
], blob_offsets
[i
], blob_sizes
[i
],
209 kNoCompression
, nullptr, &statuses_buf
[i
]);
210 blob_reqs
.emplace_back(&requests_buf
[i
], std::unique_ptr
<BlobContents
>());
213 reader
->MultiGetBlob(read_options
, allocator
, blob_reqs
, &bytes_read
);
215 for (size_t i
= 0; i
< num_blobs
; ++i
) {
216 const auto& result
= blob_reqs
[i
].second
;
218 ASSERT_OK(statuses_buf
[i
]);
219 ASSERT_NE(result
, nullptr);
220 ASSERT_EQ(result
->data(), blobs
[i
]);
221 total_size
+= blob_sizes
[i
];
223 ASSERT_EQ(bytes_read
, total_size
);
226 read_options
.verify_checksums
= true;
229 std::unique_ptr
<BlobContents
> value
;
230 uint64_t bytes_read
= 0;
232 ASSERT_OK(reader
->GetBlob(read_options
, keys
[1], blob_offsets
[1],
233 blob_sizes
[1], kNoCompression
, prefetch_buffer
,
234 allocator
, &value
, &bytes_read
));
235 ASSERT_NE(value
, nullptr);
236 ASSERT_EQ(value
->data(), blobs
[1]);
238 const uint64_t key_size
= keys
[1].size();
239 ASSERT_EQ(bytes_read
,
240 BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size
) +
244 // Invalid offset (too close to start of file)
246 std::unique_ptr
<BlobContents
> value
;
247 uint64_t bytes_read
= 0;
250 ->GetBlob(read_options
, keys
[0], blob_offsets
[0] - 1,
251 blob_sizes
[0], kNoCompression
, prefetch_buffer
,
252 allocator
, &value
, &bytes_read
)
254 ASSERT_EQ(value
, nullptr);
255 ASSERT_EQ(bytes_read
, 0);
258 // Invalid offset (too close to end of file)
260 std::unique_ptr
<BlobContents
> value
;
261 uint64_t bytes_read
= 0;
264 ->GetBlob(read_options
, keys
[2], blob_offsets
[2] + 1,
265 blob_sizes
[2], kNoCompression
, prefetch_buffer
,
266 allocator
, &value
, &bytes_read
)
268 ASSERT_EQ(value
, nullptr);
269 ASSERT_EQ(bytes_read
, 0);
272 // Incorrect compression type
274 std::unique_ptr
<BlobContents
> value
;
275 uint64_t bytes_read
= 0;
278 ->GetBlob(read_options
, keys
[0], blob_offsets
[0],
279 blob_sizes
[0], kZSTD
, prefetch_buffer
, allocator
,
282 ASSERT_EQ(value
, nullptr);
283 ASSERT_EQ(bytes_read
, 0);
286 // Incorrect key size
288 constexpr char shorter_key
[] = "k";
289 std::unique_ptr
<BlobContents
> value
;
290 uint64_t bytes_read
= 0;
293 ->GetBlob(read_options
, shorter_key
,
295 (keys
[0].size() - sizeof(shorter_key
) + 1),
296 blob_sizes
[0], kNoCompression
, prefetch_buffer
,
297 allocator
, &value
, &bytes_read
)
299 ASSERT_EQ(value
, nullptr);
300 ASSERT_EQ(bytes_read
, 0);
303 autovector
<std::reference_wrapper
<const Slice
>> key_refs
;
304 for (const auto& key_ref
: keys
) {
305 key_refs
.emplace_back(std::cref(key_ref
));
307 Slice
shorter_key_slice(shorter_key
, sizeof(shorter_key
) - 1);
308 key_refs
[1] = std::cref(shorter_key_slice
);
310 autovector
<uint64_t> offsets
{
312 blob_offsets
[1] - (keys
[1].size() - key_refs
[1].get().size()),
315 std::array
<Status
, num_blobs
> statuses_buf
;
316 std::array
<BlobReadRequest
, num_blobs
> requests_buf
;
317 autovector
<std::pair
<BlobReadRequest
*, std::unique_ptr
<BlobContents
>>>
320 for (size_t i
= 0; i
< num_blobs
; ++i
) {
322 BlobReadRequest(key_refs
[i
], offsets
[i
], blob_sizes
[i
],
323 kNoCompression
, nullptr, &statuses_buf
[i
]);
324 blob_reqs
.emplace_back(&requests_buf
[i
], std::unique_ptr
<BlobContents
>());
327 reader
->MultiGetBlob(read_options
, allocator
, blob_reqs
, &bytes_read
);
329 for (size_t i
= 0; i
< num_blobs
; ++i
) {
331 ASSERT_TRUE(statuses_buf
[i
].IsCorruption());
333 ASSERT_OK(statuses_buf
[i
]);
340 constexpr char incorrect_key
[] = "foo1";
341 std::unique_ptr
<BlobContents
> value
;
342 uint64_t bytes_read
= 0;
345 ->GetBlob(read_options
, incorrect_key
, blob_offsets
[0],
346 blob_sizes
[0], kNoCompression
, prefetch_buffer
,
347 allocator
, &value
, &bytes_read
)
349 ASSERT_EQ(value
, nullptr);
350 ASSERT_EQ(bytes_read
, 0);
353 autovector
<std::reference_wrapper
<const Slice
>> key_refs
;
354 for (const auto& key_ref
: keys
) {
355 key_refs
.emplace_back(std::cref(key_ref
));
357 Slice
wrong_key_slice(incorrect_key
, sizeof(incorrect_key
) - 1);
358 key_refs
[2] = std::cref(wrong_key_slice
);
360 std::array
<Status
, num_blobs
> statuses_buf
;
361 std::array
<BlobReadRequest
, num_blobs
> requests_buf
;
362 autovector
<std::pair
<BlobReadRequest
*, std::unique_ptr
<BlobContents
>>>
365 for (size_t i
= 0; i
< num_blobs
; ++i
) {
367 BlobReadRequest(key_refs
[i
], blob_offsets
[i
], blob_sizes
[i
],
368 kNoCompression
, nullptr, &statuses_buf
[i
]);
369 blob_reqs
.emplace_back(&requests_buf
[i
], std::unique_ptr
<BlobContents
>());
372 reader
->MultiGetBlob(read_options
, allocator
, blob_reqs
, &bytes_read
);
374 for (size_t i
= 0; i
< num_blobs
; ++i
) {
375 if (i
== num_blobs
- 1) {
376 ASSERT_TRUE(statuses_buf
[i
].IsCorruption());
378 ASSERT_OK(statuses_buf
[i
]);
383 // Incorrect value size
385 std::unique_ptr
<BlobContents
> value
;
386 uint64_t bytes_read
= 0;
389 ->GetBlob(read_options
, keys
[1], blob_offsets
[1],
390 blob_sizes
[1] + 1, kNoCompression
,
391 prefetch_buffer
, allocator
, &value
, &bytes_read
)
393 ASSERT_EQ(value
, nullptr);
394 ASSERT_EQ(bytes_read
, 0);
397 autovector
<std::reference_wrapper
<const Slice
>> key_refs
;
398 for (const auto& key_ref
: keys
) {
399 key_refs
.emplace_back(std::cref(key_ref
));
402 std::array
<Status
, num_blobs
> statuses_buf
;
403 std::array
<BlobReadRequest
, num_blobs
> requests_buf
;
406 BlobReadRequest(key_refs
[0], blob_offsets
[0], blob_sizes
[0],
407 kNoCompression
, nullptr, &statuses_buf
[0]);
409 BlobReadRequest(key_refs
[1], blob_offsets
[1], blob_sizes
[1] + 1,
410 kNoCompression
, nullptr, &statuses_buf
[1]);
412 BlobReadRequest(key_refs
[2], blob_offsets
[2], blob_sizes
[2],
413 kNoCompression
, nullptr, &statuses_buf
[2]);
415 autovector
<std::pair
<BlobReadRequest
*, std::unique_ptr
<BlobContents
>>>
418 for (size_t i
= 0; i
< num_blobs
; ++i
) {
419 blob_reqs
.emplace_back(&requests_buf
[i
], std::unique_ptr
<BlobContents
>());
422 reader
->MultiGetBlob(read_options
, allocator
, blob_reqs
, &bytes_read
);
424 for (size_t i
= 0; i
< num_blobs
; ++i
) {
426 ASSERT_OK(statuses_buf
[i
]);
428 ASSERT_TRUE(statuses_buf
[i
].IsCorruption());
434 TEST_F(BlobFileReaderTest
, Malformed
) {
435 // Write a blob file consisting of nothing but a header, and make sure we
436 // detect the error when we open it for reading
439 options
.env
= mock_env_
.get();
440 options
.cf_paths
.emplace_back(
441 test::PerThreadDBPath(mock_env_
.get(), "BlobFileReaderTest_Malformed"),
443 options
.enable_blob_files
= true;
445 ImmutableOptions
immutable_options(options
);
447 constexpr uint32_t column_family_id
= 1;
448 constexpr uint64_t blob_file_number
= 1;
451 constexpr bool has_ttl
= false;
452 constexpr ExpirationRange expiration_range
;
454 const std::string blob_file_path
=
455 BlobFileName(immutable_options
.cf_paths
.front().path
, blob_file_number
);
457 std::unique_ptr
<FSWritableFile
> file
;
458 ASSERT_OK(NewWritableFile(immutable_options
.fs
.get(), blob_file_path
, &file
,
461 std::unique_ptr
<WritableFileWriter
> file_writer(
462 new WritableFileWriter(std::move(file
), blob_file_path
, FileOptions(),
463 immutable_options
.clock
));
465 constexpr Statistics
* statistics
= nullptr;
466 constexpr bool use_fsync
= false;
467 constexpr bool do_flush
= false;
469 BlobLogWriter
blob_log_writer(std::move(file_writer
),
470 immutable_options
.clock
, statistics
,
471 blob_file_number
, use_fsync
, do_flush
);
473 BlobLogHeader
header(column_family_id
, kNoCompression
, has_ttl
,
476 ASSERT_OK(blob_log_writer
.WriteHeader(header
));
479 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
481 std::unique_ptr
<BlobFileReader
> reader
;
483 ASSERT_TRUE(BlobFileReader::Create(immutable_options
, FileOptions(),
484 column_family_id
, blob_file_read_hist
,
485 blob_file_number
, nullptr /*IOTracer*/,
490 TEST_F(BlobFileReaderTest
, TTL
) {
492 options
.env
= mock_env_
.get();
493 options
.cf_paths
.emplace_back(
494 test::PerThreadDBPath(mock_env_
.get(), "BlobFileReaderTest_TTL"), 0);
495 options
.enable_blob_files
= true;
497 ImmutableOptions
immutable_options(options
);
499 constexpr uint32_t column_family_id
= 1;
500 constexpr bool has_ttl
= true;
501 constexpr ExpirationRange expiration_range
;
502 constexpr uint64_t blob_file_number
= 1;
503 constexpr char key
[] = "key";
504 constexpr char blob
[] = "blob";
506 uint64_t blob_offset
= 0;
507 uint64_t blob_size
= 0;
509 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
, expiration_range
,
510 expiration_range
, blob_file_number
, key
, blob
, kNoCompression
,
511 &blob_offset
, &blob_size
);
513 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
515 std::unique_ptr
<BlobFileReader
> reader
;
517 ASSERT_TRUE(BlobFileReader::Create(immutable_options
, FileOptions(),
518 column_family_id
, blob_file_read_hist
,
519 blob_file_number
, nullptr /*IOTracer*/,
524 TEST_F(BlobFileReaderTest
, ExpirationRangeInHeader
) {
526 options
.env
= mock_env_
.get();
527 options
.cf_paths
.emplace_back(
528 test::PerThreadDBPath(mock_env_
.get(),
529 "BlobFileReaderTest_ExpirationRangeInHeader"),
531 options
.enable_blob_files
= true;
533 ImmutableOptions
immutable_options(options
);
535 constexpr uint32_t column_family_id
= 1;
536 constexpr bool has_ttl
= false;
537 const ExpirationRange
expiration_range_header(
538 1, 2); // can be made constexpr when we adopt C++14
539 constexpr ExpirationRange expiration_range_footer
;
540 constexpr uint64_t blob_file_number
= 1;
541 constexpr char key
[] = "key";
542 constexpr char blob
[] = "blob";
544 uint64_t blob_offset
= 0;
545 uint64_t blob_size
= 0;
547 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
,
548 expiration_range_header
, expiration_range_footer
,
549 blob_file_number
, key
, blob
, kNoCompression
, &blob_offset
,
552 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
554 std::unique_ptr
<BlobFileReader
> reader
;
556 ASSERT_TRUE(BlobFileReader::Create(immutable_options
, FileOptions(),
557 column_family_id
, blob_file_read_hist
,
558 blob_file_number
, nullptr /*IOTracer*/,
563 TEST_F(BlobFileReaderTest
, ExpirationRangeInFooter
) {
565 options
.env
= mock_env_
.get();
566 options
.cf_paths
.emplace_back(
567 test::PerThreadDBPath(mock_env_
.get(),
568 "BlobFileReaderTest_ExpirationRangeInFooter"),
570 options
.enable_blob_files
= true;
572 ImmutableOptions
immutable_options(options
);
574 constexpr uint32_t column_family_id
= 1;
575 constexpr bool has_ttl
= false;
576 constexpr ExpirationRange expiration_range_header
;
577 const ExpirationRange
expiration_range_footer(
578 1, 2); // can be made constexpr when we adopt C++14
579 constexpr uint64_t blob_file_number
= 1;
580 constexpr char key
[] = "key";
581 constexpr char blob
[] = "blob";
583 uint64_t blob_offset
= 0;
584 uint64_t blob_size
= 0;
586 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
,
587 expiration_range_header
, expiration_range_footer
,
588 blob_file_number
, key
, blob
, kNoCompression
, &blob_offset
,
591 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
593 std::unique_ptr
<BlobFileReader
> reader
;
595 ASSERT_TRUE(BlobFileReader::Create(immutable_options
, FileOptions(),
596 column_family_id
, blob_file_read_hist
,
597 blob_file_number
, nullptr /*IOTracer*/,
602 TEST_F(BlobFileReaderTest
, IncorrectColumnFamily
) {
604 options
.env
= mock_env_
.get();
605 options
.cf_paths
.emplace_back(
606 test::PerThreadDBPath(mock_env_
.get(),
607 "BlobFileReaderTest_IncorrectColumnFamily"),
609 options
.enable_blob_files
= true;
611 ImmutableOptions
immutable_options(options
);
613 constexpr uint32_t column_family_id
= 1;
614 constexpr bool has_ttl
= false;
615 constexpr ExpirationRange expiration_range
;
616 constexpr uint64_t blob_file_number
= 1;
617 constexpr char key
[] = "key";
618 constexpr char blob
[] = "blob";
620 uint64_t blob_offset
= 0;
621 uint64_t blob_size
= 0;
623 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
, expiration_range
,
624 expiration_range
, blob_file_number
, key
, blob
, kNoCompression
,
625 &blob_offset
, &blob_size
);
627 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
629 std::unique_ptr
<BlobFileReader
> reader
;
631 constexpr uint32_t incorrect_column_family_id
= 2;
633 ASSERT_TRUE(BlobFileReader::Create(immutable_options
, FileOptions(),
634 incorrect_column_family_id
,
635 blob_file_read_hist
, blob_file_number
,
636 nullptr /*IOTracer*/, &reader
)
640 TEST_F(BlobFileReaderTest
, BlobCRCError
) {
642 options
.env
= mock_env_
.get();
643 options
.cf_paths
.emplace_back(
644 test::PerThreadDBPath(mock_env_
.get(), "BlobFileReaderTest_BlobCRCError"),
646 options
.enable_blob_files
= true;
648 ImmutableOptions
immutable_options(options
);
650 constexpr uint32_t column_family_id
= 1;
651 constexpr bool has_ttl
= false;
652 constexpr ExpirationRange expiration_range
;
653 constexpr uint64_t blob_file_number
= 1;
654 constexpr char key
[] = "key";
655 constexpr char blob
[] = "blob";
657 uint64_t blob_offset
= 0;
658 uint64_t blob_size
= 0;
660 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
, expiration_range
,
661 expiration_range
, blob_file_number
, key
, blob
, kNoCompression
,
662 &blob_offset
, &blob_size
);
664 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
666 std::unique_ptr
<BlobFileReader
> reader
;
668 ASSERT_OK(BlobFileReader::Create(
669 immutable_options
, FileOptions(), column_family_id
, blob_file_read_hist
,
670 blob_file_number
, nullptr /*IOTracer*/, &reader
));
672 SyncPoint::GetInstance()->SetCallBack(
673 "BlobFileReader::VerifyBlob:CheckBlobCRC", [](void* arg
) {
674 BlobLogRecord
* const record
= static_cast<BlobLogRecord
*>(arg
);
677 record
->blob_crc
= 0xfaceb00c;
680 SyncPoint::GetInstance()->EnableProcessing();
682 constexpr FilePrefetchBuffer
* prefetch_buffer
= nullptr;
683 constexpr MemoryAllocator
* allocator
= nullptr;
685 std::unique_ptr
<BlobContents
> value
;
686 uint64_t bytes_read
= 0;
689 ->GetBlob(ReadOptions(), key
, blob_offset
, blob_size
,
690 kNoCompression
, prefetch_buffer
, allocator
, &value
,
693 ASSERT_EQ(value
, nullptr);
694 ASSERT_EQ(bytes_read
, 0);
696 SyncPoint::GetInstance()->DisableProcessing();
697 SyncPoint::GetInstance()->ClearAllCallBacks();
700 TEST_F(BlobFileReaderTest
, Compression
) {
701 if (!Snappy_Supported()) {
706 options
.env
= mock_env_
.get();
707 options
.cf_paths
.emplace_back(
708 test::PerThreadDBPath(mock_env_
.get(), "BlobFileReaderTest_Compression"),
710 options
.enable_blob_files
= true;
712 ImmutableOptions
immutable_options(options
);
714 constexpr uint32_t column_family_id
= 1;
715 constexpr bool has_ttl
= false;
716 constexpr ExpirationRange expiration_range
;
717 constexpr uint64_t blob_file_number
= 1;
718 constexpr char key
[] = "key";
719 constexpr char blob
[] = "blob";
721 uint64_t blob_offset
= 0;
722 uint64_t blob_size
= 0;
724 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
, expiration_range
,
725 expiration_range
, blob_file_number
, key
, blob
,
726 kSnappyCompression
, &blob_offset
, &blob_size
);
728 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
730 std::unique_ptr
<BlobFileReader
> reader
;
732 ASSERT_OK(BlobFileReader::Create(
733 immutable_options
, FileOptions(), column_family_id
, blob_file_read_hist
,
734 blob_file_number
, nullptr /*IOTracer*/, &reader
));
736 // Make sure the blob can be retrieved with and without checksum verification
737 ReadOptions read_options
;
738 read_options
.verify_checksums
= false;
740 constexpr FilePrefetchBuffer
* prefetch_buffer
= nullptr;
741 constexpr MemoryAllocator
* allocator
= nullptr;
744 std::unique_ptr
<BlobContents
> value
;
745 uint64_t bytes_read
= 0;
747 ASSERT_OK(reader
->GetBlob(read_options
, key
, blob_offset
, blob_size
,
748 kSnappyCompression
, prefetch_buffer
, allocator
,
749 &value
, &bytes_read
));
750 ASSERT_NE(value
, nullptr);
751 ASSERT_EQ(value
->data(), blob
);
752 ASSERT_EQ(bytes_read
, blob_size
);
755 read_options
.verify_checksums
= true;
758 std::unique_ptr
<BlobContents
> value
;
759 uint64_t bytes_read
= 0;
761 ASSERT_OK(reader
->GetBlob(read_options
, key
, blob_offset
, blob_size
,
762 kSnappyCompression
, prefetch_buffer
, allocator
,
763 &value
, &bytes_read
));
764 ASSERT_NE(value
, nullptr);
765 ASSERT_EQ(value
->data(), blob
);
767 constexpr uint64_t key_size
= sizeof(key
) - 1;
768 ASSERT_EQ(bytes_read
,
769 BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size
) +
774 TEST_F(BlobFileReaderTest
, UncompressionError
) {
775 if (!Snappy_Supported()) {
780 options
.env
= mock_env_
.get();
781 options
.cf_paths
.emplace_back(
782 test::PerThreadDBPath(mock_env_
.get(),
783 "BlobFileReaderTest_UncompressionError"),
785 options
.enable_blob_files
= true;
787 ImmutableOptions
immutable_options(options
);
789 constexpr uint32_t column_family_id
= 1;
790 constexpr bool has_ttl
= false;
791 constexpr ExpirationRange expiration_range
;
792 constexpr uint64_t blob_file_number
= 1;
793 constexpr char key
[] = "key";
794 constexpr char blob
[] = "blob";
796 uint64_t blob_offset
= 0;
797 uint64_t blob_size
= 0;
799 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
, expiration_range
,
800 expiration_range
, blob_file_number
, key
, blob
,
801 kSnappyCompression
, &blob_offset
, &blob_size
);
803 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
805 std::unique_ptr
<BlobFileReader
> reader
;
807 ASSERT_OK(BlobFileReader::Create(
808 immutable_options
, FileOptions(), column_family_id
, blob_file_read_hist
,
809 blob_file_number
, nullptr /*IOTracer*/, &reader
));
811 SyncPoint::GetInstance()->SetCallBack(
812 "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", [](void* arg
) {
813 CacheAllocationPtr
* const output
=
814 static_cast<CacheAllocationPtr
*>(arg
);
820 SyncPoint::GetInstance()->EnableProcessing();
822 constexpr FilePrefetchBuffer
* prefetch_buffer
= nullptr;
823 constexpr MemoryAllocator
* allocator
= nullptr;
825 std::unique_ptr
<BlobContents
> value
;
826 uint64_t bytes_read
= 0;
829 ->GetBlob(ReadOptions(), key
, blob_offset
, blob_size
,
830 kSnappyCompression
, prefetch_buffer
, allocator
,
833 ASSERT_EQ(value
, nullptr);
834 ASSERT_EQ(bytes_read
, 0);
836 SyncPoint::GetInstance()->DisableProcessing();
837 SyncPoint::GetInstance()->ClearAllCallBacks();
840 class BlobFileReaderIOErrorTest
841 : public testing::Test
,
842 public testing::WithParamInterface
<std::string
> {
844 BlobFileReaderIOErrorTest() : sync_point_(GetParam()) {
845 mock_env_
.reset(MockEnv::Create(Env::Default()));
846 fault_injection_env_
.reset(new FaultInjectionTestEnv(mock_env_
.get()));
849 std::unique_ptr
<Env
> mock_env_
;
850 std::unique_ptr
<FaultInjectionTestEnv
> fault_injection_env_
;
851 std::string sync_point_
;
854 INSTANTIATE_TEST_CASE_P(BlobFileReaderTest
, BlobFileReaderIOErrorTest
,
855 ::testing::ValuesIn(std::vector
<std::string
>{
856 "BlobFileReader::OpenFile:GetFileSize",
857 "BlobFileReader::OpenFile:NewRandomAccessFile",
858 "BlobFileReader::ReadHeader:ReadFromFile",
859 "BlobFileReader::ReadFooter:ReadFromFile",
860 "BlobFileReader::GetBlob:ReadFromFile"}));
862 TEST_P(BlobFileReaderIOErrorTest
, IOError
) {
863 // Simulates an I/O error during the specified step
866 options
.env
= fault_injection_env_
.get();
867 options
.cf_paths
.emplace_back(
868 test::PerThreadDBPath(fault_injection_env_
.get(),
869 "BlobFileReaderIOErrorTest_IOError"),
871 options
.enable_blob_files
= true;
873 ImmutableOptions
immutable_options(options
);
875 constexpr uint32_t column_family_id
= 1;
876 constexpr bool has_ttl
= false;
877 constexpr ExpirationRange expiration_range
;
878 constexpr uint64_t blob_file_number
= 1;
879 constexpr char key
[] = "key";
880 constexpr char blob
[] = "blob";
882 uint64_t blob_offset
= 0;
883 uint64_t blob_size
= 0;
885 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
, expiration_range
,
886 expiration_range
, blob_file_number
, key
, blob
, kNoCompression
,
887 &blob_offset
, &blob_size
);
889 SyncPoint::GetInstance()->SetCallBack(sync_point_
, [this](void* /* arg */) {
890 fault_injection_env_
->SetFilesystemActive(false,
891 Status::IOError(sync_point_
));
893 SyncPoint::GetInstance()->EnableProcessing();
895 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
897 std::unique_ptr
<BlobFileReader
> reader
;
899 const Status s
= BlobFileReader::Create(
900 immutable_options
, FileOptions(), column_family_id
, blob_file_read_hist
,
901 blob_file_number
, nullptr /*IOTracer*/, &reader
);
903 const bool fail_during_create
=
904 (sync_point_
!= "BlobFileReader::GetBlob:ReadFromFile");
906 if (fail_during_create
) {
907 ASSERT_TRUE(s
.IsIOError());
911 constexpr FilePrefetchBuffer
* prefetch_buffer
= nullptr;
912 constexpr MemoryAllocator
* allocator
= nullptr;
914 std::unique_ptr
<BlobContents
> value
;
915 uint64_t bytes_read
= 0;
918 ->GetBlob(ReadOptions(), key
, blob_offset
, blob_size
,
919 kNoCompression
, prefetch_buffer
, allocator
,
922 ASSERT_EQ(value
, nullptr);
923 ASSERT_EQ(bytes_read
, 0);
926 SyncPoint::GetInstance()->DisableProcessing();
927 SyncPoint::GetInstance()->ClearAllCallBacks();
930 class BlobFileReaderDecodingErrorTest
931 : public testing::Test
,
932 public testing::WithParamInterface
<std::string
> {
934 BlobFileReaderDecodingErrorTest() : sync_point_(GetParam()) {
935 mock_env_
.reset(MockEnv::Create(Env::Default()));
938 std::unique_ptr
<Env
> mock_env_
;
939 std::string sync_point_
;
942 INSTANTIATE_TEST_CASE_P(BlobFileReaderTest
, BlobFileReaderDecodingErrorTest
,
943 ::testing::ValuesIn(std::vector
<std::string
>{
944 "BlobFileReader::ReadHeader:TamperWithResult",
945 "BlobFileReader::ReadFooter:TamperWithResult",
946 "BlobFileReader::GetBlob:TamperWithResult"}));
948 TEST_P(BlobFileReaderDecodingErrorTest
, DecodingError
) {
950 options
.env
= mock_env_
.get();
951 options
.cf_paths
.emplace_back(
952 test::PerThreadDBPath(mock_env_
.get(),
953 "BlobFileReaderDecodingErrorTest_DecodingError"),
955 options
.enable_blob_files
= true;
957 ImmutableOptions
immutable_options(options
);
959 constexpr uint32_t column_family_id
= 1;
960 constexpr bool has_ttl
= false;
961 constexpr ExpirationRange expiration_range
;
962 constexpr uint64_t blob_file_number
= 1;
963 constexpr char key
[] = "key";
964 constexpr char blob
[] = "blob";
966 uint64_t blob_offset
= 0;
967 uint64_t blob_size
= 0;
969 WriteBlobFile(immutable_options
, column_family_id
, has_ttl
, expiration_range
,
970 expiration_range
, blob_file_number
, key
, blob
, kNoCompression
,
971 &blob_offset
, &blob_size
);
973 SyncPoint::GetInstance()->SetCallBack(sync_point_
, [](void* arg
) {
974 Slice
* const slice
= static_cast<Slice
*>(arg
);
976 assert(!slice
->empty());
978 slice
->remove_prefix(1);
981 SyncPoint::GetInstance()->EnableProcessing();
983 constexpr HistogramImpl
* blob_file_read_hist
= nullptr;
985 std::unique_ptr
<BlobFileReader
> reader
;
987 const Status s
= BlobFileReader::Create(
988 immutable_options
, FileOptions(), column_family_id
, blob_file_read_hist
,
989 blob_file_number
, nullptr /*IOTracer*/, &reader
);
991 const bool fail_during_create
=
992 sync_point_
!= "BlobFileReader::GetBlob:TamperWithResult";
994 if (fail_during_create
) {
995 ASSERT_TRUE(s
.IsCorruption());
999 constexpr FilePrefetchBuffer
* prefetch_buffer
= nullptr;
1000 constexpr MemoryAllocator
* allocator
= nullptr;
1002 std::unique_ptr
<BlobContents
> value
;
1003 uint64_t bytes_read
= 0;
1006 ->GetBlob(ReadOptions(), key
, blob_offset
, blob_size
,
1007 kNoCompression
, prefetch_buffer
, allocator
,
1008 &value
, &bytes_read
)
1010 ASSERT_EQ(value
, nullptr);
1011 ASSERT_EQ(bytes_read
, 0);
1014 SyncPoint::GetInstance()->DisableProcessing();
1015 SyncPoint::GetInstance()->ClearAllCallBacks();
1018 } // namespace ROCKSDB_NAMESPACE
1020 int main(int argc
, char** argv
) {
1021 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1022 ::testing::InitGoogleTest(&argc
, argv
);
1023 return RUN_ALL_TESTS();