]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/blob/blob_file_reader_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / blob / blob_file_reader_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include "db/blob/blob_file_reader.h"
7
8 #include <cassert>
9 #include <string>
10
11 #include "db/blob/blob_contents.h"
12 #include "db/blob/blob_log_format.h"
13 #include "db/blob/blob_log_writer.h"
14 #include "env/mock_env.h"
15 #include "file/filename.h"
16 #include "file/read_write_util.h"
17 #include "file/writable_file_writer.h"
18 #include "options/cf_options.h"
19 #include "rocksdb/env.h"
20 #include "rocksdb/file_system.h"
21 #include "rocksdb/options.h"
22 #include "test_util/sync_point.h"
23 #include "test_util/testharness.h"
24 #include "util/compression.h"
25 #include "utilities/fault_injection_env.h"
26
27 namespace ROCKSDB_NAMESPACE {
28
29 namespace {
30
31 // Creates a test blob file with `num` blobs in it.
32 void WriteBlobFile(const ImmutableOptions& immutable_options,
33 uint32_t column_family_id, bool has_ttl,
34 const ExpirationRange& expiration_range_header,
35 const ExpirationRange& expiration_range_footer,
36 uint64_t blob_file_number, const std::vector<Slice>& keys,
37 const std::vector<Slice>& blobs, CompressionType compression,
38 std::vector<uint64_t>& blob_offsets,
39 std::vector<uint64_t>& blob_sizes) {
40 assert(!immutable_options.cf_paths.empty());
41 size_t num = keys.size();
42 assert(num == blobs.size());
43 assert(num == blob_offsets.size());
44 assert(num == blob_sizes.size());
45
46 const std::string blob_file_path =
47 BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
48 std::unique_ptr<FSWritableFile> file;
49 ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
50 FileOptions()));
51
52 std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
53 std::move(file), blob_file_path, FileOptions(), immutable_options.clock));
54
55 constexpr Statistics* statistics = nullptr;
56 constexpr bool use_fsync = false;
57 constexpr bool do_flush = false;
58
59 BlobLogWriter blob_log_writer(std::move(file_writer), immutable_options.clock,
60 statistics, blob_file_number, use_fsync,
61 do_flush);
62
63 BlobLogHeader header(column_family_id, compression, has_ttl,
64 expiration_range_header);
65
66 ASSERT_OK(blob_log_writer.WriteHeader(header));
67
68 std::vector<std::string> compressed_blobs(num);
69 std::vector<Slice> blobs_to_write(num);
70 if (kNoCompression == compression) {
71 for (size_t i = 0; i < num; ++i) {
72 blobs_to_write[i] = blobs[i];
73 blob_sizes[i] = blobs[i].size();
74 }
75 } else {
76 CompressionOptions opts;
77 CompressionContext context(compression);
78 constexpr uint64_t sample_for_compression = 0;
79 CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
80 compression, sample_for_compression);
81
82 constexpr uint32_t compression_format_version = 2;
83
84 for (size_t i = 0; i < num; ++i) {
85 ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version,
86 &compressed_blobs[i]));
87 blobs_to_write[i] = compressed_blobs[i];
88 blob_sizes[i] = compressed_blobs[i].size();
89 }
90 }
91
92 for (size_t i = 0; i < num; ++i) {
93 uint64_t key_offset = 0;
94 ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset,
95 &blob_offsets[i]));
96 }
97
98 BlobLogFooter footer;
99 footer.blob_count = num;
100 footer.expiration_range = expiration_range_footer;
101
102 std::string checksum_method;
103 std::string checksum_value;
104 ASSERT_OK(
105 blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value));
106 }
107
108 // Creates a test blob file with a single blob in it. Note: this method
109 // makes it possible to test various corner cases by allowing the caller
110 // to specify the contents of various blob file header/footer fields.
111 void WriteBlobFile(const ImmutableOptions& immutable_options,
112 uint32_t column_family_id, bool has_ttl,
113 const ExpirationRange& expiration_range_header,
114 const ExpirationRange& expiration_range_footer,
115 uint64_t blob_file_number, const Slice& key,
116 const Slice& blob, CompressionType compression,
117 uint64_t* blob_offset, uint64_t* blob_size) {
118 std::vector<Slice> keys{key};
119 std::vector<Slice> blobs{blob};
120 std::vector<uint64_t> blob_offsets{0};
121 std::vector<uint64_t> blob_sizes{0};
122 WriteBlobFile(immutable_options, column_family_id, has_ttl,
123 expiration_range_header, expiration_range_footer,
124 blob_file_number, keys, blobs, compression, blob_offsets,
125 blob_sizes);
126 if (blob_offset) {
127 *blob_offset = blob_offsets[0];
128 }
129 if (blob_size) {
130 *blob_size = blob_sizes[0];
131 }
132 }
133
134 } // anonymous namespace
135
136 class BlobFileReaderTest : public testing::Test {
137 protected:
138 BlobFileReaderTest() { mock_env_.reset(MockEnv::Create(Env::Default())); }
139 std::unique_ptr<Env> mock_env_;
140 };
141
142 TEST_F(BlobFileReaderTest, CreateReaderAndGetBlob) {
143 Options options;
144 options.env = mock_env_.get();
145 options.cf_paths.emplace_back(
146 test::PerThreadDBPath(mock_env_.get(),
147 "BlobFileReaderTest_CreateReaderAndGetBlob"),
148 0);
149 options.enable_blob_files = true;
150
151 ImmutableOptions immutable_options(options);
152
153 constexpr uint32_t column_family_id = 1;
154 constexpr bool has_ttl = false;
155 constexpr ExpirationRange expiration_range;
156 constexpr uint64_t blob_file_number = 1;
157 constexpr size_t num_blobs = 3;
158 const std::vector<std::string> key_strs = {"key1", "key2", "key3"};
159 const std::vector<std::string> blob_strs = {"blob1", "blob2", "blob3"};
160
161 const std::vector<Slice> keys = {key_strs[0], key_strs[1], key_strs[2]};
162 const std::vector<Slice> blobs = {blob_strs[0], blob_strs[1], blob_strs[2]};
163
164 std::vector<uint64_t> blob_offsets(keys.size());
165 std::vector<uint64_t> blob_sizes(keys.size());
166
167 WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
168 expiration_range, blob_file_number, keys, blobs, kNoCompression,
169 blob_offsets, blob_sizes);
170
171 constexpr HistogramImpl* blob_file_read_hist = nullptr;
172
173 std::unique_ptr<BlobFileReader> reader;
174
175 ASSERT_OK(BlobFileReader::Create(
176 immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
177 blob_file_number, nullptr /*IOTracer*/, &reader));
178
179 // Make sure the blob can be retrieved with and without checksum verification
180 ReadOptions read_options;
181 read_options.verify_checksums = false;
182
183 constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
184 constexpr MemoryAllocator* allocator = nullptr;
185
186 {
187 std::unique_ptr<BlobContents> value;
188 uint64_t bytes_read = 0;
189
190 ASSERT_OK(reader->GetBlob(read_options, keys[0], blob_offsets[0],
191 blob_sizes[0], kNoCompression, prefetch_buffer,
192 allocator, &value, &bytes_read));
193 ASSERT_NE(value, nullptr);
194 ASSERT_EQ(value->data(), blobs[0]);
195 ASSERT_EQ(bytes_read, blob_sizes[0]);
196
197 // MultiGetBlob
198 bytes_read = 0;
199 size_t total_size = 0;
200
201 std::array<Status, num_blobs> statuses_buf;
202 std::array<BlobReadRequest, num_blobs> requests_buf;
203 autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
204 blob_reqs;
205
206 for (size_t i = 0; i < num_blobs; ++i) {
207 requests_buf[i] =
208 BlobReadRequest(keys[i], blob_offsets[i], blob_sizes[i],
209 kNoCompression, nullptr, &statuses_buf[i]);
210 blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
211 }
212
213 reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
214
215 for (size_t i = 0; i < num_blobs; ++i) {
216 const auto& result = blob_reqs[i].second;
217
218 ASSERT_OK(statuses_buf[i]);
219 ASSERT_NE(result, nullptr);
220 ASSERT_EQ(result->data(), blobs[i]);
221 total_size += blob_sizes[i];
222 }
223 ASSERT_EQ(bytes_read, total_size);
224 }
225
226 read_options.verify_checksums = true;
227
228 {
229 std::unique_ptr<BlobContents> value;
230 uint64_t bytes_read = 0;
231
232 ASSERT_OK(reader->GetBlob(read_options, keys[1], blob_offsets[1],
233 blob_sizes[1], kNoCompression, prefetch_buffer,
234 allocator, &value, &bytes_read));
235 ASSERT_NE(value, nullptr);
236 ASSERT_EQ(value->data(), blobs[1]);
237
238 const uint64_t key_size = keys[1].size();
239 ASSERT_EQ(bytes_read,
240 BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
241 blob_sizes[1]);
242 }
243
244 // Invalid offset (too close to start of file)
245 {
246 std::unique_ptr<BlobContents> value;
247 uint64_t bytes_read = 0;
248
249 ASSERT_TRUE(reader
250 ->GetBlob(read_options, keys[0], blob_offsets[0] - 1,
251 blob_sizes[0], kNoCompression, prefetch_buffer,
252 allocator, &value, &bytes_read)
253 .IsCorruption());
254 ASSERT_EQ(value, nullptr);
255 ASSERT_EQ(bytes_read, 0);
256 }
257
258 // Invalid offset (too close to end of file)
259 {
260 std::unique_ptr<BlobContents> value;
261 uint64_t bytes_read = 0;
262
263 ASSERT_TRUE(reader
264 ->GetBlob(read_options, keys[2], blob_offsets[2] + 1,
265 blob_sizes[2], kNoCompression, prefetch_buffer,
266 allocator, &value, &bytes_read)
267 .IsCorruption());
268 ASSERT_EQ(value, nullptr);
269 ASSERT_EQ(bytes_read, 0);
270 }
271
272 // Incorrect compression type
273 {
274 std::unique_ptr<BlobContents> value;
275 uint64_t bytes_read = 0;
276
277 ASSERT_TRUE(reader
278 ->GetBlob(read_options, keys[0], blob_offsets[0],
279 blob_sizes[0], kZSTD, prefetch_buffer, allocator,
280 &value, &bytes_read)
281 .IsCorruption());
282 ASSERT_EQ(value, nullptr);
283 ASSERT_EQ(bytes_read, 0);
284 }
285
286 // Incorrect key size
287 {
288 constexpr char shorter_key[] = "k";
289 std::unique_ptr<BlobContents> value;
290 uint64_t bytes_read = 0;
291
292 ASSERT_TRUE(reader
293 ->GetBlob(read_options, shorter_key,
294 blob_offsets[0] -
295 (keys[0].size() - sizeof(shorter_key) + 1),
296 blob_sizes[0], kNoCompression, prefetch_buffer,
297 allocator, &value, &bytes_read)
298 .IsCorruption());
299 ASSERT_EQ(value, nullptr);
300 ASSERT_EQ(bytes_read, 0);
301
302 // MultiGetBlob
303 autovector<std::reference_wrapper<const Slice>> key_refs;
304 for (const auto& key_ref : keys) {
305 key_refs.emplace_back(std::cref(key_ref));
306 }
307 Slice shorter_key_slice(shorter_key, sizeof(shorter_key) - 1);
308 key_refs[1] = std::cref(shorter_key_slice);
309
310 autovector<uint64_t> offsets{
311 blob_offsets[0],
312 blob_offsets[1] - (keys[1].size() - key_refs[1].get().size()),
313 blob_offsets[2]};
314
315 std::array<Status, num_blobs> statuses_buf;
316 std::array<BlobReadRequest, num_blobs> requests_buf;
317 autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
318 blob_reqs;
319
320 for (size_t i = 0; i < num_blobs; ++i) {
321 requests_buf[i] =
322 BlobReadRequest(key_refs[i], offsets[i], blob_sizes[i],
323 kNoCompression, nullptr, &statuses_buf[i]);
324 blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
325 }
326
327 reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
328
329 for (size_t i = 0; i < num_blobs; ++i) {
330 if (i == 1) {
331 ASSERT_TRUE(statuses_buf[i].IsCorruption());
332 } else {
333 ASSERT_OK(statuses_buf[i]);
334 }
335 }
336 }
337
338 // Incorrect key
339 {
340 constexpr char incorrect_key[] = "foo1";
341 std::unique_ptr<BlobContents> value;
342 uint64_t bytes_read = 0;
343
344 ASSERT_TRUE(reader
345 ->GetBlob(read_options, incorrect_key, blob_offsets[0],
346 blob_sizes[0], kNoCompression, prefetch_buffer,
347 allocator, &value, &bytes_read)
348 .IsCorruption());
349 ASSERT_EQ(value, nullptr);
350 ASSERT_EQ(bytes_read, 0);
351
352 // MultiGetBlob
353 autovector<std::reference_wrapper<const Slice>> key_refs;
354 for (const auto& key_ref : keys) {
355 key_refs.emplace_back(std::cref(key_ref));
356 }
357 Slice wrong_key_slice(incorrect_key, sizeof(incorrect_key) - 1);
358 key_refs[2] = std::cref(wrong_key_slice);
359
360 std::array<Status, num_blobs> statuses_buf;
361 std::array<BlobReadRequest, num_blobs> requests_buf;
362 autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
363 blob_reqs;
364
365 for (size_t i = 0; i < num_blobs; ++i) {
366 requests_buf[i] =
367 BlobReadRequest(key_refs[i], blob_offsets[i], blob_sizes[i],
368 kNoCompression, nullptr, &statuses_buf[i]);
369 blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
370 }
371
372 reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
373
374 for (size_t i = 0; i < num_blobs; ++i) {
375 if (i == num_blobs - 1) {
376 ASSERT_TRUE(statuses_buf[i].IsCorruption());
377 } else {
378 ASSERT_OK(statuses_buf[i]);
379 }
380 }
381 }
382
383 // Incorrect value size
384 {
385 std::unique_ptr<BlobContents> value;
386 uint64_t bytes_read = 0;
387
388 ASSERT_TRUE(reader
389 ->GetBlob(read_options, keys[1], blob_offsets[1],
390 blob_sizes[1] + 1, kNoCompression,
391 prefetch_buffer, allocator, &value, &bytes_read)
392 .IsCorruption());
393 ASSERT_EQ(value, nullptr);
394 ASSERT_EQ(bytes_read, 0);
395
396 // MultiGetBlob
397 autovector<std::reference_wrapper<const Slice>> key_refs;
398 for (const auto& key_ref : keys) {
399 key_refs.emplace_back(std::cref(key_ref));
400 }
401
402 std::array<Status, num_blobs> statuses_buf;
403 std::array<BlobReadRequest, num_blobs> requests_buf;
404
405 requests_buf[0] =
406 BlobReadRequest(key_refs[0], blob_offsets[0], blob_sizes[0],
407 kNoCompression, nullptr, &statuses_buf[0]);
408 requests_buf[1] =
409 BlobReadRequest(key_refs[1], blob_offsets[1], blob_sizes[1] + 1,
410 kNoCompression, nullptr, &statuses_buf[1]);
411 requests_buf[2] =
412 BlobReadRequest(key_refs[2], blob_offsets[2], blob_sizes[2],
413 kNoCompression, nullptr, &statuses_buf[2]);
414
415 autovector<std::pair<BlobReadRequest*, std::unique_ptr<BlobContents>>>
416 blob_reqs;
417
418 for (size_t i = 0; i < num_blobs; ++i) {
419 blob_reqs.emplace_back(&requests_buf[i], std::unique_ptr<BlobContents>());
420 }
421
422 reader->MultiGetBlob(read_options, allocator, blob_reqs, &bytes_read);
423
424 for (size_t i = 0; i < num_blobs; ++i) {
425 if (i != 1) {
426 ASSERT_OK(statuses_buf[i]);
427 } else {
428 ASSERT_TRUE(statuses_buf[i].IsCorruption());
429 }
430 }
431 }
432 }
433
434 TEST_F(BlobFileReaderTest, Malformed) {
435 // Write a blob file consisting of nothing but a header, and make sure we
436 // detect the error when we open it for reading
437
438 Options options;
439 options.env = mock_env_.get();
440 options.cf_paths.emplace_back(
441 test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_Malformed"),
442 0);
443 options.enable_blob_files = true;
444
445 ImmutableOptions immutable_options(options);
446
447 constexpr uint32_t column_family_id = 1;
448 constexpr uint64_t blob_file_number = 1;
449
450 {
451 constexpr bool has_ttl = false;
452 constexpr ExpirationRange expiration_range;
453
454 const std::string blob_file_path =
455 BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
456
457 std::unique_ptr<FSWritableFile> file;
458 ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
459 FileOptions()));
460
461 std::unique_ptr<WritableFileWriter> file_writer(
462 new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
463 immutable_options.clock));
464
465 constexpr Statistics* statistics = nullptr;
466 constexpr bool use_fsync = false;
467 constexpr bool do_flush = false;
468
469 BlobLogWriter blob_log_writer(std::move(file_writer),
470 immutable_options.clock, statistics,
471 blob_file_number, use_fsync, do_flush);
472
473 BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
474 expiration_range);
475
476 ASSERT_OK(blob_log_writer.WriteHeader(header));
477 }
478
479 constexpr HistogramImpl* blob_file_read_hist = nullptr;
480
481 std::unique_ptr<BlobFileReader> reader;
482
483 ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
484 column_family_id, blob_file_read_hist,
485 blob_file_number, nullptr /*IOTracer*/,
486 &reader)
487 .IsCorruption());
488 }
489
490 TEST_F(BlobFileReaderTest, TTL) {
491 Options options;
492 options.env = mock_env_.get();
493 options.cf_paths.emplace_back(
494 test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_TTL"), 0);
495 options.enable_blob_files = true;
496
497 ImmutableOptions immutable_options(options);
498
499 constexpr uint32_t column_family_id = 1;
500 constexpr bool has_ttl = true;
501 constexpr ExpirationRange expiration_range;
502 constexpr uint64_t blob_file_number = 1;
503 constexpr char key[] = "key";
504 constexpr char blob[] = "blob";
505
506 uint64_t blob_offset = 0;
507 uint64_t blob_size = 0;
508
509 WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
510 expiration_range, blob_file_number, key, blob, kNoCompression,
511 &blob_offset, &blob_size);
512
513 constexpr HistogramImpl* blob_file_read_hist = nullptr;
514
515 std::unique_ptr<BlobFileReader> reader;
516
517 ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
518 column_family_id, blob_file_read_hist,
519 blob_file_number, nullptr /*IOTracer*/,
520 &reader)
521 .IsCorruption());
522 }
523
524 TEST_F(BlobFileReaderTest, ExpirationRangeInHeader) {
525 Options options;
526 options.env = mock_env_.get();
527 options.cf_paths.emplace_back(
528 test::PerThreadDBPath(mock_env_.get(),
529 "BlobFileReaderTest_ExpirationRangeInHeader"),
530 0);
531 options.enable_blob_files = true;
532
533 ImmutableOptions immutable_options(options);
534
535 constexpr uint32_t column_family_id = 1;
536 constexpr bool has_ttl = false;
537 const ExpirationRange expiration_range_header(
538 1, 2); // can be made constexpr when we adopt C++14
539 constexpr ExpirationRange expiration_range_footer;
540 constexpr uint64_t blob_file_number = 1;
541 constexpr char key[] = "key";
542 constexpr char blob[] = "blob";
543
544 uint64_t blob_offset = 0;
545 uint64_t blob_size = 0;
546
547 WriteBlobFile(immutable_options, column_family_id, has_ttl,
548 expiration_range_header, expiration_range_footer,
549 blob_file_number, key, blob, kNoCompression, &blob_offset,
550 &blob_size);
551
552 constexpr HistogramImpl* blob_file_read_hist = nullptr;
553
554 std::unique_ptr<BlobFileReader> reader;
555
556 ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
557 column_family_id, blob_file_read_hist,
558 blob_file_number, nullptr /*IOTracer*/,
559 &reader)
560 .IsCorruption());
561 }
562
563 TEST_F(BlobFileReaderTest, ExpirationRangeInFooter) {
564 Options options;
565 options.env = mock_env_.get();
566 options.cf_paths.emplace_back(
567 test::PerThreadDBPath(mock_env_.get(),
568 "BlobFileReaderTest_ExpirationRangeInFooter"),
569 0);
570 options.enable_blob_files = true;
571
572 ImmutableOptions immutable_options(options);
573
574 constexpr uint32_t column_family_id = 1;
575 constexpr bool has_ttl = false;
576 constexpr ExpirationRange expiration_range_header;
577 const ExpirationRange expiration_range_footer(
578 1, 2); // can be made constexpr when we adopt C++14
579 constexpr uint64_t blob_file_number = 1;
580 constexpr char key[] = "key";
581 constexpr char blob[] = "blob";
582
583 uint64_t blob_offset = 0;
584 uint64_t blob_size = 0;
585
586 WriteBlobFile(immutable_options, column_family_id, has_ttl,
587 expiration_range_header, expiration_range_footer,
588 blob_file_number, key, blob, kNoCompression, &blob_offset,
589 &blob_size);
590
591 constexpr HistogramImpl* blob_file_read_hist = nullptr;
592
593 std::unique_ptr<BlobFileReader> reader;
594
595 ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
596 column_family_id, blob_file_read_hist,
597 blob_file_number, nullptr /*IOTracer*/,
598 &reader)
599 .IsCorruption());
600 }
601
602 TEST_F(BlobFileReaderTest, IncorrectColumnFamily) {
603 Options options;
604 options.env = mock_env_.get();
605 options.cf_paths.emplace_back(
606 test::PerThreadDBPath(mock_env_.get(),
607 "BlobFileReaderTest_IncorrectColumnFamily"),
608 0);
609 options.enable_blob_files = true;
610
611 ImmutableOptions immutable_options(options);
612
613 constexpr uint32_t column_family_id = 1;
614 constexpr bool has_ttl = false;
615 constexpr ExpirationRange expiration_range;
616 constexpr uint64_t blob_file_number = 1;
617 constexpr char key[] = "key";
618 constexpr char blob[] = "blob";
619
620 uint64_t blob_offset = 0;
621 uint64_t blob_size = 0;
622
623 WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
624 expiration_range, blob_file_number, key, blob, kNoCompression,
625 &blob_offset, &blob_size);
626
627 constexpr HistogramImpl* blob_file_read_hist = nullptr;
628
629 std::unique_ptr<BlobFileReader> reader;
630
631 constexpr uint32_t incorrect_column_family_id = 2;
632
633 ASSERT_TRUE(BlobFileReader::Create(immutable_options, FileOptions(),
634 incorrect_column_family_id,
635 blob_file_read_hist, blob_file_number,
636 nullptr /*IOTracer*/, &reader)
637 .IsCorruption());
638 }
639
640 TEST_F(BlobFileReaderTest, BlobCRCError) {
641 Options options;
642 options.env = mock_env_.get();
643 options.cf_paths.emplace_back(
644 test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_BlobCRCError"),
645 0);
646 options.enable_blob_files = true;
647
648 ImmutableOptions immutable_options(options);
649
650 constexpr uint32_t column_family_id = 1;
651 constexpr bool has_ttl = false;
652 constexpr ExpirationRange expiration_range;
653 constexpr uint64_t blob_file_number = 1;
654 constexpr char key[] = "key";
655 constexpr char blob[] = "blob";
656
657 uint64_t blob_offset = 0;
658 uint64_t blob_size = 0;
659
660 WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
661 expiration_range, blob_file_number, key, blob, kNoCompression,
662 &blob_offset, &blob_size);
663
664 constexpr HistogramImpl* blob_file_read_hist = nullptr;
665
666 std::unique_ptr<BlobFileReader> reader;
667
668 ASSERT_OK(BlobFileReader::Create(
669 immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
670 blob_file_number, nullptr /*IOTracer*/, &reader));
671
672 SyncPoint::GetInstance()->SetCallBack(
673 "BlobFileReader::VerifyBlob:CheckBlobCRC", [](void* arg) {
674 BlobLogRecord* const record = static_cast<BlobLogRecord*>(arg);
675 assert(record);
676
677 record->blob_crc = 0xfaceb00c;
678 });
679
680 SyncPoint::GetInstance()->EnableProcessing();
681
682 constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
683 constexpr MemoryAllocator* allocator = nullptr;
684
685 std::unique_ptr<BlobContents> value;
686 uint64_t bytes_read = 0;
687
688 ASSERT_TRUE(reader
689 ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
690 kNoCompression, prefetch_buffer, allocator, &value,
691 &bytes_read)
692 .IsCorruption());
693 ASSERT_EQ(value, nullptr);
694 ASSERT_EQ(bytes_read, 0);
695
696 SyncPoint::GetInstance()->DisableProcessing();
697 SyncPoint::GetInstance()->ClearAllCallBacks();
698 }
699
700 TEST_F(BlobFileReaderTest, Compression) {
701 if (!Snappy_Supported()) {
702 return;
703 }
704
705 Options options;
706 options.env = mock_env_.get();
707 options.cf_paths.emplace_back(
708 test::PerThreadDBPath(mock_env_.get(), "BlobFileReaderTest_Compression"),
709 0);
710 options.enable_blob_files = true;
711
712 ImmutableOptions immutable_options(options);
713
714 constexpr uint32_t column_family_id = 1;
715 constexpr bool has_ttl = false;
716 constexpr ExpirationRange expiration_range;
717 constexpr uint64_t blob_file_number = 1;
718 constexpr char key[] = "key";
719 constexpr char blob[] = "blob";
720
721 uint64_t blob_offset = 0;
722 uint64_t blob_size = 0;
723
724 WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
725 expiration_range, blob_file_number, key, blob,
726 kSnappyCompression, &blob_offset, &blob_size);
727
728 constexpr HistogramImpl* blob_file_read_hist = nullptr;
729
730 std::unique_ptr<BlobFileReader> reader;
731
732 ASSERT_OK(BlobFileReader::Create(
733 immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
734 blob_file_number, nullptr /*IOTracer*/, &reader));
735
736 // Make sure the blob can be retrieved with and without checksum verification
737 ReadOptions read_options;
738 read_options.verify_checksums = false;
739
740 constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
741 constexpr MemoryAllocator* allocator = nullptr;
742
743 {
744 std::unique_ptr<BlobContents> value;
745 uint64_t bytes_read = 0;
746
747 ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
748 kSnappyCompression, prefetch_buffer, allocator,
749 &value, &bytes_read));
750 ASSERT_NE(value, nullptr);
751 ASSERT_EQ(value->data(), blob);
752 ASSERT_EQ(bytes_read, blob_size);
753 }
754
755 read_options.verify_checksums = true;
756
757 {
758 std::unique_ptr<BlobContents> value;
759 uint64_t bytes_read = 0;
760
761 ASSERT_OK(reader->GetBlob(read_options, key, blob_offset, blob_size,
762 kSnappyCompression, prefetch_buffer, allocator,
763 &value, &bytes_read));
764 ASSERT_NE(value, nullptr);
765 ASSERT_EQ(value->data(), blob);
766
767 constexpr uint64_t key_size = sizeof(key) - 1;
768 ASSERT_EQ(bytes_read,
769 BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
770 blob_size);
771 }
772 }
773
774 TEST_F(BlobFileReaderTest, UncompressionError) {
775 if (!Snappy_Supported()) {
776 return;
777 }
778
779 Options options;
780 options.env = mock_env_.get();
781 options.cf_paths.emplace_back(
782 test::PerThreadDBPath(mock_env_.get(),
783 "BlobFileReaderTest_UncompressionError"),
784 0);
785 options.enable_blob_files = true;
786
787 ImmutableOptions immutable_options(options);
788
789 constexpr uint32_t column_family_id = 1;
790 constexpr bool has_ttl = false;
791 constexpr ExpirationRange expiration_range;
792 constexpr uint64_t blob_file_number = 1;
793 constexpr char key[] = "key";
794 constexpr char blob[] = "blob";
795
796 uint64_t blob_offset = 0;
797 uint64_t blob_size = 0;
798
799 WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
800 expiration_range, blob_file_number, key, blob,
801 kSnappyCompression, &blob_offset, &blob_size);
802
803 constexpr HistogramImpl* blob_file_read_hist = nullptr;
804
805 std::unique_ptr<BlobFileReader> reader;
806
807 ASSERT_OK(BlobFileReader::Create(
808 immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
809 blob_file_number, nullptr /*IOTracer*/, &reader));
810
811 SyncPoint::GetInstance()->SetCallBack(
812 "BlobFileReader::UncompressBlobIfNeeded:TamperWithResult", [](void* arg) {
813 CacheAllocationPtr* const output =
814 static_cast<CacheAllocationPtr*>(arg);
815 assert(output);
816
817 output->reset();
818 });
819
820 SyncPoint::GetInstance()->EnableProcessing();
821
822 constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
823 constexpr MemoryAllocator* allocator = nullptr;
824
825 std::unique_ptr<BlobContents> value;
826 uint64_t bytes_read = 0;
827
828 ASSERT_TRUE(reader
829 ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
830 kSnappyCompression, prefetch_buffer, allocator,
831 &value, &bytes_read)
832 .IsCorruption());
833 ASSERT_EQ(value, nullptr);
834 ASSERT_EQ(bytes_read, 0);
835
836 SyncPoint::GetInstance()->DisableProcessing();
837 SyncPoint::GetInstance()->ClearAllCallBacks();
838 }
839
840 class BlobFileReaderIOErrorTest
841 : public testing::Test,
842 public testing::WithParamInterface<std::string> {
843 protected:
844 BlobFileReaderIOErrorTest() : sync_point_(GetParam()) {
845 mock_env_.reset(MockEnv::Create(Env::Default()));
846 fault_injection_env_.reset(new FaultInjectionTestEnv(mock_env_.get()));
847 }
848
849 std::unique_ptr<Env> mock_env_;
850 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
851 std::string sync_point_;
852 };
853
854 INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderIOErrorTest,
855 ::testing::ValuesIn(std::vector<std::string>{
856 "BlobFileReader::OpenFile:GetFileSize",
857 "BlobFileReader::OpenFile:NewRandomAccessFile",
858 "BlobFileReader::ReadHeader:ReadFromFile",
859 "BlobFileReader::ReadFooter:ReadFromFile",
860 "BlobFileReader::GetBlob:ReadFromFile"}));
861
862 TEST_P(BlobFileReaderIOErrorTest, IOError) {
863 // Simulates an I/O error during the specified step
864
865 Options options;
866 options.env = fault_injection_env_.get();
867 options.cf_paths.emplace_back(
868 test::PerThreadDBPath(fault_injection_env_.get(),
869 "BlobFileReaderIOErrorTest_IOError"),
870 0);
871 options.enable_blob_files = true;
872
873 ImmutableOptions immutable_options(options);
874
875 constexpr uint32_t column_family_id = 1;
876 constexpr bool has_ttl = false;
877 constexpr ExpirationRange expiration_range;
878 constexpr uint64_t blob_file_number = 1;
879 constexpr char key[] = "key";
880 constexpr char blob[] = "blob";
881
882 uint64_t blob_offset = 0;
883 uint64_t blob_size = 0;
884
885 WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
886 expiration_range, blob_file_number, key, blob, kNoCompression,
887 &blob_offset, &blob_size);
888
889 SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
890 fault_injection_env_->SetFilesystemActive(false,
891 Status::IOError(sync_point_));
892 });
893 SyncPoint::GetInstance()->EnableProcessing();
894
895 constexpr HistogramImpl* blob_file_read_hist = nullptr;
896
897 std::unique_ptr<BlobFileReader> reader;
898
899 const Status s = BlobFileReader::Create(
900 immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
901 blob_file_number, nullptr /*IOTracer*/, &reader);
902
903 const bool fail_during_create =
904 (sync_point_ != "BlobFileReader::GetBlob:ReadFromFile");
905
906 if (fail_during_create) {
907 ASSERT_TRUE(s.IsIOError());
908 } else {
909 ASSERT_OK(s);
910
911 constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
912 constexpr MemoryAllocator* allocator = nullptr;
913
914 std::unique_ptr<BlobContents> value;
915 uint64_t bytes_read = 0;
916
917 ASSERT_TRUE(reader
918 ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
919 kNoCompression, prefetch_buffer, allocator,
920 &value, &bytes_read)
921 .IsIOError());
922 ASSERT_EQ(value, nullptr);
923 ASSERT_EQ(bytes_read, 0);
924 }
925
926 SyncPoint::GetInstance()->DisableProcessing();
927 SyncPoint::GetInstance()->ClearAllCallBacks();
928 }
929
930 class BlobFileReaderDecodingErrorTest
931 : public testing::Test,
932 public testing::WithParamInterface<std::string> {
933 protected:
934 BlobFileReaderDecodingErrorTest() : sync_point_(GetParam()) {
935 mock_env_.reset(MockEnv::Create(Env::Default()));
936 }
937
938 std::unique_ptr<Env> mock_env_;
939 std::string sync_point_;
940 };
941
942 INSTANTIATE_TEST_CASE_P(BlobFileReaderTest, BlobFileReaderDecodingErrorTest,
943 ::testing::ValuesIn(std::vector<std::string>{
944 "BlobFileReader::ReadHeader:TamperWithResult",
945 "BlobFileReader::ReadFooter:TamperWithResult",
946 "BlobFileReader::GetBlob:TamperWithResult"}));
947
948 TEST_P(BlobFileReaderDecodingErrorTest, DecodingError) {
949 Options options;
950 options.env = mock_env_.get();
951 options.cf_paths.emplace_back(
952 test::PerThreadDBPath(mock_env_.get(),
953 "BlobFileReaderDecodingErrorTest_DecodingError"),
954 0);
955 options.enable_blob_files = true;
956
957 ImmutableOptions immutable_options(options);
958
959 constexpr uint32_t column_family_id = 1;
960 constexpr bool has_ttl = false;
961 constexpr ExpirationRange expiration_range;
962 constexpr uint64_t blob_file_number = 1;
963 constexpr char key[] = "key";
964 constexpr char blob[] = "blob";
965
966 uint64_t blob_offset = 0;
967 uint64_t blob_size = 0;
968
969 WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
970 expiration_range, blob_file_number, key, blob, kNoCompression,
971 &blob_offset, &blob_size);
972
973 SyncPoint::GetInstance()->SetCallBack(sync_point_, [](void* arg) {
974 Slice* const slice = static_cast<Slice*>(arg);
975 assert(slice);
976 assert(!slice->empty());
977
978 slice->remove_prefix(1);
979 });
980
981 SyncPoint::GetInstance()->EnableProcessing();
982
983 constexpr HistogramImpl* blob_file_read_hist = nullptr;
984
985 std::unique_ptr<BlobFileReader> reader;
986
987 const Status s = BlobFileReader::Create(
988 immutable_options, FileOptions(), column_family_id, blob_file_read_hist,
989 blob_file_number, nullptr /*IOTracer*/, &reader);
990
991 const bool fail_during_create =
992 sync_point_ != "BlobFileReader::GetBlob:TamperWithResult";
993
994 if (fail_during_create) {
995 ASSERT_TRUE(s.IsCorruption());
996 } else {
997 ASSERT_OK(s);
998
999 constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
1000 constexpr MemoryAllocator* allocator = nullptr;
1001
1002 std::unique_ptr<BlobContents> value;
1003 uint64_t bytes_read = 0;
1004
1005 ASSERT_TRUE(reader
1006 ->GetBlob(ReadOptions(), key, blob_offset, blob_size,
1007 kNoCompression, prefetch_buffer, allocator,
1008 &value, &bytes_read)
1009 .IsCorruption());
1010 ASSERT_EQ(value, nullptr);
1011 ASSERT_EQ(bytes_read, 0);
1012 }
1013
1014 SyncPoint::GetInstance()->DisableProcessing();
1015 SyncPoint::GetInstance()->ClearAllCallBacks();
1016 }
1017
1018 } // namespace ROCKSDB_NAMESPACE
1019
1020 int main(int argc, char** argv) {
1021 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1022 ::testing::InitGoogleTest(&argc, argv);
1023 return RUN_ALL_TESTS();
1024 }