]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/db/blob/db_blob_basic_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / db / blob / db_blob_basic_test.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5
6 #include <array>
7 #include <sstream>
8 #include <string>
9
10 #include "cache/compressed_secondary_cache.h"
11 #include "db/blob/blob_index.h"
12 #include "db/blob/blob_log_format.h"
13 #include "db/db_test_util.h"
14 #include "port/stack_trace.h"
15 #include "test_util/sync_point.h"
16 #include "utilities/fault_injection_env.h"
17
18 namespace ROCKSDB_NAMESPACE {
19
20 class DBBlobBasicTest : public DBTestBase {
21 protected:
22 DBBlobBasicTest()
23 : DBTestBase("db_blob_basic_test", /* env_do_fsync */ false) {}
24 };
25
26 TEST_F(DBBlobBasicTest, GetBlob) {
27 Options options = GetDefaultOptions();
28 options.enable_blob_files = true;
29 options.min_blob_size = 0;
30
31 Reopen(options);
32
33 constexpr char key[] = "key";
34 constexpr char blob_value[] = "blob_value";
35
36 ASSERT_OK(Put(key, blob_value));
37
38 ASSERT_OK(Flush());
39
40 ASSERT_EQ(Get(key), blob_value);
41
42 // Try again with no I/O allowed. The table and the necessary blocks should
43 // already be in their respective caches; however, the blob itself can only be
44 // read from the blob file, so the read should return Incomplete.
45 ReadOptions read_options;
46 read_options.read_tier = kBlockCacheTier;
47
48 PinnableSlice result;
49 ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)
50 .IsIncomplete());
51 }
52
53 TEST_F(DBBlobBasicTest, GetBlobFromCache) {
54 Options options = GetDefaultOptions();
55
56 LRUCacheOptions co;
57 co.capacity = 2 << 20; // 2MB
58 co.num_shard_bits = 2;
59 co.metadata_charge_policy = kDontChargeCacheMetadata;
60 auto backing_cache = NewLRUCache(co);
61
62 options.enable_blob_files = true;
63 options.blob_cache = backing_cache;
64
65 BlockBasedTableOptions block_based_options;
66 block_based_options.no_block_cache = false;
67 block_based_options.block_cache = backing_cache;
68 block_based_options.cache_index_and_filter_blocks = true;
69 options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
70
71 Reopen(options);
72
73 constexpr char key[] = "key";
74 constexpr char blob_value[] = "blob_value";
75
76 ASSERT_OK(Put(key, blob_value));
77
78 ASSERT_OK(Flush());
79
80 ReadOptions read_options;
81
82 read_options.fill_cache = false;
83
84 {
85 PinnableSlice result;
86
87 read_options.read_tier = kReadAllTier;
88 ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result));
89 ASSERT_EQ(result, blob_value);
90
91 result.Reset();
92 read_options.read_tier = kBlockCacheTier;
93
94 // Try again with no I/O allowed. Since we didn't re-fill the cache, the
95 // blob itself can only be read from the blob file, so the read should
96 // return Incomplete.
97 ASSERT_TRUE(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result)
98 .IsIncomplete());
99 ASSERT_TRUE(result.empty());
100 }
101
102 read_options.fill_cache = true;
103
104 {
105 PinnableSlice result;
106
107 read_options.read_tier = kReadAllTier;
108 ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result));
109 ASSERT_EQ(result, blob_value);
110
111 result.Reset();
112 read_options.read_tier = kBlockCacheTier;
113
114 // Try again with no I/O allowed. The table and the necessary blocks/blobs
115 // should already be in their respective caches.
116 ASSERT_OK(db_->Get(read_options, db_->DefaultColumnFamily(), key, &result));
117 ASSERT_EQ(result, blob_value);
118 }
119 }
120
121 TEST_F(DBBlobBasicTest, IterateBlobsFromCache) {
122 Options options = GetDefaultOptions();
123
124 LRUCacheOptions co;
125 co.capacity = 2 << 20; // 2MB
126 co.num_shard_bits = 2;
127 co.metadata_charge_policy = kDontChargeCacheMetadata;
128 auto backing_cache = NewLRUCache(co);
129
130 options.enable_blob_files = true;
131 options.blob_cache = backing_cache;
132
133 BlockBasedTableOptions block_based_options;
134 block_based_options.no_block_cache = false;
135 block_based_options.block_cache = backing_cache;
136 block_based_options.cache_index_and_filter_blocks = true;
137 options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
138
139 options.statistics = CreateDBStatistics();
140
141 Reopen(options);
142
143 int num_blobs = 5;
144 std::vector<std::string> keys;
145 std::vector<std::string> blobs;
146
147 for (int i = 0; i < num_blobs; ++i) {
148 keys.push_back("key" + std::to_string(i));
149 blobs.push_back("blob" + std::to_string(i));
150 ASSERT_OK(Put(keys[i], blobs[i]));
151 }
152 ASSERT_OK(Flush());
153
154 ReadOptions read_options;
155
156 {
157 read_options.fill_cache = false;
158 read_options.read_tier = kReadAllTier;
159
160 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
161 ASSERT_OK(iter->status());
162
163 int i = 0;
164 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
165 ASSERT_OK(iter->status());
166 ASSERT_EQ(iter->key().ToString(), keys[i]);
167 ASSERT_EQ(iter->value().ToString(), blobs[i]);
168 ++i;
169 }
170 ASSERT_EQ(i, num_blobs);
171 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 0);
172 }
173
174 {
175 read_options.fill_cache = false;
176 read_options.read_tier = kBlockCacheTier;
177
178 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
179 ASSERT_OK(iter->status());
180
181 // Try again with no I/O allowed. Since we didn't re-fill the cache,
182 // the blob itself can only be read from the blob file, so iter->Valid()
183 // should be false.
184 iter->SeekToFirst();
185 ASSERT_NOK(iter->status());
186 ASSERT_FALSE(iter->Valid());
187 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 0);
188 }
189
190 {
191 read_options.fill_cache = true;
192 read_options.read_tier = kReadAllTier;
193
194 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
195 ASSERT_OK(iter->status());
196
197 // Read blobs from the file and refill the cache.
198 int i = 0;
199 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
200 ASSERT_OK(iter->status());
201 ASSERT_EQ(iter->key().ToString(), keys[i]);
202 ASSERT_EQ(iter->value().ToString(), blobs[i]);
203 ++i;
204 }
205 ASSERT_EQ(i, num_blobs);
206 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD),
207 num_blobs);
208 }
209
210 {
211 read_options.fill_cache = false;
212 read_options.read_tier = kBlockCacheTier;
213
214 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
215 ASSERT_OK(iter->status());
216
217 // Try again with no I/O allowed. The table and the necessary blocks/blobs
218 // should already be in their respective caches.
219 int i = 0;
220 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
221 ASSERT_OK(iter->status());
222 ASSERT_EQ(iter->key().ToString(), keys[i]);
223 ASSERT_EQ(iter->value().ToString(), blobs[i]);
224 ++i;
225 }
226 ASSERT_EQ(i, num_blobs);
227 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 0);
228 }
229 }
230
231 TEST_F(DBBlobBasicTest, IterateBlobsFromCachePinning) {
232 constexpr size_t min_blob_size = 6;
233
234 Options options = GetDefaultOptions();
235
236 LRUCacheOptions cache_options;
237 cache_options.capacity = 2048;
238 cache_options.num_shard_bits = 0;
239 cache_options.metadata_charge_policy = kDontChargeCacheMetadata;
240
241 options.blob_cache = NewLRUCache(cache_options);
242 options.enable_blob_files = true;
243 options.min_blob_size = min_blob_size;
244
245 Reopen(options);
246
247 // Put then iterate over three key-values. The second value is below the size
248 // limit and is thus stored inline; the other two are stored separately as
249 // blobs. We expect to have something pinned in the cache iff we are
250 // positioned on a blob.
251
252 constexpr char first_key[] = "first_key";
253 constexpr char first_value[] = "long_value";
254 static_assert(sizeof(first_value) - 1 >= min_blob_size,
255 "first_value too short to be stored as blob");
256
257 ASSERT_OK(Put(first_key, first_value));
258
259 constexpr char second_key[] = "second_key";
260 constexpr char second_value[] = "short";
261 static_assert(sizeof(second_value) - 1 < min_blob_size,
262 "second_value too long to be inlined");
263
264 ASSERT_OK(Put(second_key, second_value));
265
266 constexpr char third_key[] = "third_key";
267 constexpr char third_value[] = "other_long_value";
268 static_assert(sizeof(third_value) - 1 >= min_blob_size,
269 "third_value too short to be stored as blob");
270
271 ASSERT_OK(Put(third_key, third_value));
272
273 ASSERT_OK(Flush());
274
275 {
276 ReadOptions read_options;
277 read_options.fill_cache = true;
278
279 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
280
281 iter->SeekToFirst();
282 ASSERT_TRUE(iter->Valid());
283 ASSERT_OK(iter->status());
284 ASSERT_EQ(iter->key(), first_key);
285 ASSERT_EQ(iter->value(), first_value);
286
287 iter->Next();
288 ASSERT_TRUE(iter->Valid());
289 ASSERT_OK(iter->status());
290 ASSERT_EQ(iter->key(), second_key);
291 ASSERT_EQ(iter->value(), second_value);
292
293 iter->Next();
294 ASSERT_TRUE(iter->Valid());
295 ASSERT_OK(iter->status());
296 ASSERT_EQ(iter->key(), third_key);
297 ASSERT_EQ(iter->value(), third_value);
298
299 iter->Next();
300 ASSERT_FALSE(iter->Valid());
301 ASSERT_OK(iter->status());
302 }
303
304 {
305 ReadOptions read_options;
306 read_options.fill_cache = false;
307 read_options.read_tier = kBlockCacheTier;
308
309 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
310
311 iter->SeekToFirst();
312 ASSERT_TRUE(iter->Valid());
313 ASSERT_OK(iter->status());
314 ASSERT_EQ(iter->key(), first_key);
315 ASSERT_EQ(iter->value(), first_value);
316 ASSERT_GT(options.blob_cache->GetPinnedUsage(), 0);
317
318 iter->Next();
319 ASSERT_TRUE(iter->Valid());
320 ASSERT_OK(iter->status());
321 ASSERT_EQ(iter->key(), second_key);
322 ASSERT_EQ(iter->value(), second_value);
323 ASSERT_EQ(options.blob_cache->GetPinnedUsage(), 0);
324
325 iter->Next();
326 ASSERT_TRUE(iter->Valid());
327 ASSERT_OK(iter->status());
328 ASSERT_EQ(iter->key(), third_key);
329 ASSERT_EQ(iter->value(), third_value);
330 ASSERT_GT(options.blob_cache->GetPinnedUsage(), 0);
331
332 iter->Next();
333 ASSERT_FALSE(iter->Valid());
334 ASSERT_OK(iter->status());
335 ASSERT_EQ(options.blob_cache->GetPinnedUsage(), 0);
336 }
337
338 {
339 ReadOptions read_options;
340 read_options.fill_cache = false;
341 read_options.read_tier = kBlockCacheTier;
342
343 std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
344
345 iter->SeekToLast();
346 ASSERT_TRUE(iter->Valid());
347 ASSERT_OK(iter->status());
348 ASSERT_EQ(iter->key(), third_key);
349 ASSERT_EQ(iter->value(), third_value);
350 ASSERT_GT(options.blob_cache->GetPinnedUsage(), 0);
351
352 iter->Prev();
353 ASSERT_TRUE(iter->Valid());
354 ASSERT_OK(iter->status());
355 ASSERT_EQ(iter->key(), second_key);
356 ASSERT_EQ(iter->value(), second_value);
357 ASSERT_EQ(options.blob_cache->GetPinnedUsage(), 0);
358
359 iter->Prev();
360 ASSERT_TRUE(iter->Valid());
361 ASSERT_OK(iter->status());
362 ASSERT_EQ(iter->key(), first_key);
363 ASSERT_EQ(iter->value(), first_value);
364 ASSERT_GT(options.blob_cache->GetPinnedUsage(), 0);
365
366 iter->Prev();
367 ASSERT_FALSE(iter->Valid());
368 ASSERT_OK(iter->status());
369 ASSERT_EQ(options.blob_cache->GetPinnedUsage(), 0);
370 }
371 }
372
373 TEST_F(DBBlobBasicTest, MultiGetBlobs) {
374 constexpr size_t min_blob_size = 6;
375
376 Options options = GetDefaultOptions();
377 options.enable_blob_files = true;
378 options.min_blob_size = min_blob_size;
379
380 Reopen(options);
381
382 // Put then retrieve three key-values. The first value is below the size limit
383 // and is thus stored inline; the other two are stored separately as blobs.
384 constexpr size_t num_keys = 3;
385
386 constexpr char first_key[] = "first_key";
387 constexpr char first_value[] = "short";
388 static_assert(sizeof(first_value) - 1 < min_blob_size,
389 "first_value too long to be inlined");
390
391 ASSERT_OK(Put(first_key, first_value));
392
393 constexpr char second_key[] = "second_key";
394 constexpr char second_value[] = "long_value";
395 static_assert(sizeof(second_value) - 1 >= min_blob_size,
396 "second_value too short to be stored as blob");
397
398 ASSERT_OK(Put(second_key, second_value));
399
400 constexpr char third_key[] = "third_key";
401 constexpr char third_value[] = "other_long_value";
402 static_assert(sizeof(third_value) - 1 >= min_blob_size,
403 "third_value too short to be stored as blob");
404
405 ASSERT_OK(Put(third_key, third_value));
406
407 ASSERT_OK(Flush());
408
409 ReadOptions read_options;
410
411 std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
412
413 {
414 std::array<PinnableSlice, num_keys> values;
415 std::array<Status, num_keys> statuses;
416
417 db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
418 &values[0], &statuses[0]);
419
420 ASSERT_OK(statuses[0]);
421 ASSERT_EQ(values[0], first_value);
422
423 ASSERT_OK(statuses[1]);
424 ASSERT_EQ(values[1], second_value);
425
426 ASSERT_OK(statuses[2]);
427 ASSERT_EQ(values[2], third_value);
428 }
429
430 // Try again with no I/O allowed. The table and the necessary blocks should
431 // already be in their respective caches. The first (inlined) value should be
432 // successfully read; however, the two blob values could only be read from the
433 // blob file, so for those the read should return Incomplete.
434 read_options.read_tier = kBlockCacheTier;
435
436 {
437 std::array<PinnableSlice, num_keys> values;
438 std::array<Status, num_keys> statuses;
439
440 db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
441 &values[0], &statuses[0]);
442
443 ASSERT_OK(statuses[0]);
444 ASSERT_EQ(values[0], first_value);
445
446 ASSERT_TRUE(statuses[1].IsIncomplete());
447
448 ASSERT_TRUE(statuses[2].IsIncomplete());
449 }
450 }
451
452 TEST_F(DBBlobBasicTest, MultiGetBlobsFromCache) {
453 Options options = GetDefaultOptions();
454
455 LRUCacheOptions co;
456 co.capacity = 2 << 20; // 2MB
457 co.num_shard_bits = 2;
458 co.metadata_charge_policy = kDontChargeCacheMetadata;
459 auto backing_cache = NewLRUCache(co);
460
461 constexpr size_t min_blob_size = 6;
462 options.min_blob_size = min_blob_size;
463 options.create_if_missing = true;
464 options.enable_blob_files = true;
465 options.blob_cache = backing_cache;
466
467 BlockBasedTableOptions block_based_options;
468 block_based_options.no_block_cache = false;
469 block_based_options.block_cache = backing_cache;
470 block_based_options.cache_index_and_filter_blocks = true;
471 options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
472
473 DestroyAndReopen(options);
474
475 // Put then retrieve three key-values. The first value is below the size limit
476 // and is thus stored inline; the other two are stored separately as blobs.
477 constexpr size_t num_keys = 3;
478
479 constexpr char first_key[] = "first_key";
480 constexpr char first_value[] = "short";
481 static_assert(sizeof(first_value) - 1 < min_blob_size,
482 "first_value too long to be inlined");
483
484 ASSERT_OK(Put(first_key, first_value));
485
486 constexpr char second_key[] = "second_key";
487 constexpr char second_value[] = "long_value";
488 static_assert(sizeof(second_value) - 1 >= min_blob_size,
489 "second_value too short to be stored as blob");
490
491 ASSERT_OK(Put(second_key, second_value));
492
493 constexpr char third_key[] = "third_key";
494 constexpr char third_value[] = "other_long_value";
495 static_assert(sizeof(third_value) - 1 >= min_blob_size,
496 "third_value too short to be stored as blob");
497
498 ASSERT_OK(Put(third_key, third_value));
499
500 ASSERT_OK(Flush());
501
502 ReadOptions read_options;
503 read_options.fill_cache = false;
504
505 std::array<Slice, num_keys> keys{{first_key, second_key, third_key}};
506
507 {
508 std::array<PinnableSlice, num_keys> values;
509 std::array<Status, num_keys> statuses;
510
511 db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
512 &values[0], &statuses[0]);
513
514 ASSERT_OK(statuses[0]);
515 ASSERT_EQ(values[0], first_value);
516
517 ASSERT_OK(statuses[1]);
518 ASSERT_EQ(values[1], second_value);
519
520 ASSERT_OK(statuses[2]);
521 ASSERT_EQ(values[2], third_value);
522 }
523
524 // Try again with no I/O allowed. The first (inlined) value should be
525 // successfully read; however, the two blob values could only be read from the
526 // blob file, so for those the read should return Incomplete.
527 read_options.read_tier = kBlockCacheTier;
528
529 {
530 std::array<PinnableSlice, num_keys> values;
531 std::array<Status, num_keys> statuses;
532
533 db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
534 &values[0], &statuses[0]);
535
536 ASSERT_OK(statuses[0]);
537 ASSERT_EQ(values[0], first_value);
538
539 ASSERT_TRUE(statuses[1].IsIncomplete());
540
541 ASSERT_TRUE(statuses[2].IsIncomplete());
542 }
543
544 // Fill the cache when reading blobs from the blob file.
545 read_options.read_tier = kReadAllTier;
546 read_options.fill_cache = true;
547
548 {
549 std::array<PinnableSlice, num_keys> values;
550 std::array<Status, num_keys> statuses;
551
552 db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
553 &values[0], &statuses[0]);
554
555 ASSERT_OK(statuses[0]);
556 ASSERT_EQ(values[0], first_value);
557
558 ASSERT_OK(statuses[1]);
559 ASSERT_EQ(values[1], second_value);
560
561 ASSERT_OK(statuses[2]);
562 ASSERT_EQ(values[2], third_value);
563 }
564
565 // Try again with no I/O allowed. All blobs should be successfully read from
566 // the cache.
567 read_options.read_tier = kBlockCacheTier;
568
569 {
570 std::array<PinnableSlice, num_keys> values;
571 std::array<Status, num_keys> statuses;
572
573 db_->MultiGet(read_options, db_->DefaultColumnFamily(), num_keys, &keys[0],
574 &values[0], &statuses[0]);
575
576 ASSERT_OK(statuses[0]);
577 ASSERT_EQ(values[0], first_value);
578
579 ASSERT_OK(statuses[1]);
580 ASSERT_EQ(values[1], second_value);
581
582 ASSERT_OK(statuses[2]);
583 ASSERT_EQ(values[2], third_value);
584 }
585 }
586
587 #ifndef ROCKSDB_LITE
588 TEST_F(DBBlobBasicTest, MultiGetWithDirectIO) {
589 Options options = GetDefaultOptions();
590
591 // First, create an external SST file ["b"].
592 const std::string file_path = dbname_ + "/test.sst";
593 {
594 SstFileWriter sst_file_writer(EnvOptions(), GetDefaultOptions());
595 Status s = sst_file_writer.Open(file_path);
596 ASSERT_OK(s);
597 ASSERT_OK(sst_file_writer.Put("b", "b_value"));
598 ASSERT_OK(sst_file_writer.Finish());
599 }
600
601 options.enable_blob_files = true;
602 options.min_blob_size = 1000;
603 options.use_direct_reads = true;
604 options.allow_ingest_behind = true;
605
606 // Open DB with fixed-prefix sst-partitioner so that compaction will cut
607 // new table file when encountering a new key whose 1-byte prefix changes.
608 constexpr size_t key_len = 1;
609 options.sst_partitioner_factory =
610 NewSstPartitionerFixedPrefixFactory(key_len);
611
612 Status s = TryReopen(options);
613 if (s.IsInvalidArgument()) {
614 ROCKSDB_GTEST_SKIP("This test requires direct IO support");
615 return;
616 }
617 ASSERT_OK(s);
618
619 constexpr size_t num_keys = 3;
620 constexpr size_t blob_size = 3000;
621
622 constexpr char first_key[] = "a";
623 const std::string first_blob(blob_size, 'a');
624 ASSERT_OK(Put(first_key, first_blob));
625
626 constexpr char second_key[] = "b";
627 const std::string second_blob(2 * blob_size, 'b');
628 ASSERT_OK(Put(second_key, second_blob));
629
630 constexpr char third_key[] = "d";
631 const std::string third_blob(blob_size, 'd');
632 ASSERT_OK(Put(third_key, third_blob));
633
634 // first_blob, second_blob and third_blob in the same blob file.
635 // SST Blob file
636 // L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'|
637 // | | | ^ ^ ^
638 // | | | | | |
639 // | | +---------|-------|--------+
640 // | +-----------------|-------+
641 // +-------------------------+
642 ASSERT_OK(Flush());
643
644 constexpr char fourth_key[] = "c";
645 const std::string fourth_blob(blob_size, 'c');
646 ASSERT_OK(Put(fourth_key, fourth_blob));
647 // fourth_blob in another blob file.
648 // SST Blob file SST Blob file
649 // L0 ["a", "b", "d"] |'aaaa', 'bbbb', 'dddd'| ["c"] |'cccc'|
650 // | | | ^ ^ ^ | ^
651 // | | | | | | | |
652 // | | +---------|-------|--------+ +-------+
653 // | +-----------------|-------+
654 // +-------------------------+
655 ASSERT_OK(Flush());
656
657 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
658 /*end=*/nullptr));
659
660 // Due to the above sst partitioner, we get 4 L1 files. The blob files are
661 // unchanged.
662 // |'aaaa', 'bbbb', 'dddd'| |'cccc'|
663 // ^ ^ ^ ^
664 // | | | |
665 // L0 | | | |
666 // L1 ["a"] ["b"] ["c"] | | ["d"] |
667 // | | | | | |
668 // | | +---------|-------|---------------+
669 // | +-----------------|-------+
670 // +-------------------------+
671 ASSERT_EQ(4, NumTableFilesAtLevel(/*level=*/1));
672
673 {
674 // Ingest the external SST file into bottommost level.
675 std::vector<std::string> ext_files{file_path};
676 IngestExternalFileOptions opts;
677 opts.ingest_behind = true;
678 ASSERT_OK(
679 db_->IngestExternalFile(db_->DefaultColumnFamily(), ext_files, opts));
680 }
681
682 // Now the database becomes as follows.
683 // |'aaaa', 'bbbb', 'dddd'| |'cccc'|
684 // ^ ^ ^ ^
685 // | | | |
686 // L0 | | | |
687 // L1 ["a"] ["b"] ["c"] | | ["d"] |
688 // | | | | | |
689 // | | +---------|-------|---------------+
690 // | +-----------------|-------+
691 // +-------------------------+
692 //
693 // L6 ["b"]
694
695 {
696 // Compact ["b"] to bottommost level.
697 Slice begin = Slice(second_key);
698 Slice end = Slice(second_key);
699 CompactRangeOptions cro;
700 cro.bottommost_level_compaction = BottommostLevelCompaction::kForce;
701 ASSERT_OK(db_->CompactRange(cro, &begin, &end));
702 }
703
704 // |'aaaa', 'bbbb', 'dddd'| |'cccc'|
705 // ^ ^ ^ ^
706 // | | | |
707 // L0 | | | |
708 // L1 ["a"] ["c"] | | ["d"] |
709 // | | | | |
710 // | +---------|-------|---------------+
711 // | +-----------------|-------+
712 // +-------|-----------------+
713 // |
714 // L6 ["b"]
715 ASSERT_EQ(3, NumTableFilesAtLevel(/*level=*/1));
716 ASSERT_EQ(1, NumTableFilesAtLevel(/*level=*/6));
717
718 bool called = false;
719 SyncPoint::GetInstance()->ClearAllCallBacks();
720 SyncPoint::GetInstance()->SetCallBack(
721 "RandomAccessFileReader::MultiRead:AlignedReqs", [&](void* arg) {
722 auto* aligned_reqs = static_cast<std::vector<FSReadRequest>*>(arg);
723 assert(aligned_reqs);
724 ASSERT_EQ(1, aligned_reqs->size());
725 called = true;
726 });
727 SyncPoint::GetInstance()->EnableProcessing();
728
729 std::array<Slice, num_keys> keys{{first_key, third_key, second_key}};
730
731 {
732 std::array<PinnableSlice, num_keys> values;
733 std::array<Status, num_keys> statuses;
734
735 // The MultiGet(), when constructing the KeyContexts, will process the keys
736 // in such order: a, d, b. The reason is that ["a"] and ["d"] are in L1,
737 // while ["b"] resides in L6.
738 // Consequently, the original FSReadRequest list prepared by
739 // Version::MultiGetblob() will be for "a", "d" and "b". It is unsorted as
740 // follows:
741 //
742 // ["a", offset=30, len=3033],
743 // ["d", offset=9096, len=3033],
744 // ["b", offset=3063, len=6033]
745 //
746 // If we do not sort them before calling MultiRead() in DirectIO, then the
747 // underlying IO merging logic will yield two requests.
748 //
749 // [offset=0, len=4096] (for "a")
750 // [offset=0, len=12288] (result of merging the request for "d" and "b")
751 //
752 // We need to sort them in Version::MultiGetBlob() so that the underlying
753 // IO merging logic in DirectIO mode works as expected. The correct
754 // behavior will be one aligned request:
755 //
756 // [offset=0, len=12288]
757
758 db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
759 &values[0], &statuses[0]);
760
761 SyncPoint::GetInstance()->DisableProcessing();
762 SyncPoint::GetInstance()->ClearAllCallBacks();
763
764 ASSERT_TRUE(called);
765
766 ASSERT_OK(statuses[0]);
767 ASSERT_EQ(values[0], first_blob);
768
769 ASSERT_OK(statuses[1]);
770 ASSERT_EQ(values[1], third_blob);
771
772 ASSERT_OK(statuses[2]);
773 ASSERT_EQ(values[2], second_blob);
774 }
775 }
776 #endif // !ROCKSDB_LITE
777
778 TEST_F(DBBlobBasicTest, MultiGetBlobsFromMultipleFiles) {
779 Options options = GetDefaultOptions();
780
781 LRUCacheOptions co;
782 co.capacity = 2 << 20; // 2MB
783 co.num_shard_bits = 2;
784 co.metadata_charge_policy = kDontChargeCacheMetadata;
785 auto backing_cache = NewLRUCache(co);
786
787 options.min_blob_size = 0;
788 options.create_if_missing = true;
789 options.enable_blob_files = true;
790 options.blob_cache = backing_cache;
791
792 BlockBasedTableOptions block_based_options;
793 block_based_options.no_block_cache = false;
794 block_based_options.block_cache = backing_cache;
795 block_based_options.cache_index_and_filter_blocks = true;
796 options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
797
798 Reopen(options);
799
800 constexpr size_t kNumBlobFiles = 3;
801 constexpr size_t kNumBlobsPerFile = 3;
802 constexpr size_t kNumKeys = kNumBlobsPerFile * kNumBlobFiles;
803
804 std::vector<std::string> key_strs;
805 std::vector<std::string> value_strs;
806 for (size_t i = 0; i < kNumBlobFiles; ++i) {
807 for (size_t j = 0; j < kNumBlobsPerFile; ++j) {
808 std::string key = "key" + std::to_string(i) + "_" + std::to_string(j);
809 std::string value =
810 "value_as_blob" + std::to_string(i) + "_" + std::to_string(j);
811 ASSERT_OK(Put(key, value));
812 key_strs.push_back(key);
813 value_strs.push_back(value);
814 }
815 ASSERT_OK(Flush());
816 }
817 assert(key_strs.size() == kNumKeys);
818 std::array<Slice, kNumKeys> keys;
819 for (size_t i = 0; i < keys.size(); ++i) {
820 keys[i] = key_strs[i];
821 }
822
823 ReadOptions read_options;
824 read_options.read_tier = kReadAllTier;
825 read_options.fill_cache = false;
826
827 {
828 std::array<PinnableSlice, kNumKeys> values;
829 std::array<Status, kNumKeys> statuses;
830 db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
831 &values[0], &statuses[0]);
832
833 for (size_t i = 0; i < kNumKeys; ++i) {
834 ASSERT_OK(statuses[i]);
835 ASSERT_EQ(value_strs[i], values[i]);
836 }
837 }
838
839 read_options.read_tier = kBlockCacheTier;
840
841 {
842 std::array<PinnableSlice, kNumKeys> values;
843 std::array<Status, kNumKeys> statuses;
844 db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
845 &values[0], &statuses[0]);
846
847 for (size_t i = 0; i < kNumKeys; ++i) {
848 ASSERT_TRUE(statuses[i].IsIncomplete());
849 ASSERT_TRUE(values[i].empty());
850 }
851 }
852
853 read_options.read_tier = kReadAllTier;
854 read_options.fill_cache = true;
855
856 {
857 std::array<PinnableSlice, kNumKeys> values;
858 std::array<Status, kNumKeys> statuses;
859 db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
860 &values[0], &statuses[0]);
861
862 for (size_t i = 0; i < kNumKeys; ++i) {
863 ASSERT_OK(statuses[i]);
864 ASSERT_EQ(value_strs[i], values[i]);
865 }
866 }
867
868 read_options.read_tier = kBlockCacheTier;
869
870 {
871 std::array<PinnableSlice, kNumKeys> values;
872 std::array<Status, kNumKeys> statuses;
873 db_->MultiGet(read_options, db_->DefaultColumnFamily(), kNumKeys, &keys[0],
874 &values[0], &statuses[0]);
875
876 for (size_t i = 0; i < kNumKeys; ++i) {
877 ASSERT_OK(statuses[i]);
878 ASSERT_EQ(value_strs[i], values[i]);
879 }
880 }
881 }
882
883 TEST_F(DBBlobBasicTest, GetBlob_CorruptIndex) {
884 Options options = GetDefaultOptions();
885 options.enable_blob_files = true;
886 options.min_blob_size = 0;
887
888 Reopen(options);
889
890 constexpr char key[] = "key";
891 constexpr char blob[] = "blob";
892
893 ASSERT_OK(Put(key, blob));
894 ASSERT_OK(Flush());
895
896 SyncPoint::GetInstance()->SetCallBack(
897 "Version::Get::TamperWithBlobIndex", [](void* arg) {
898 Slice* const blob_index = static_cast<Slice*>(arg);
899 assert(blob_index);
900 assert(!blob_index->empty());
901 blob_index->remove_prefix(1);
902 });
903 SyncPoint::GetInstance()->EnableProcessing();
904
905 PinnableSlice result;
906 ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
907 .IsCorruption());
908
909 SyncPoint::GetInstance()->DisableProcessing();
910 SyncPoint::GetInstance()->ClearAllCallBacks();
911 }
912
913 TEST_F(DBBlobBasicTest, MultiGetBlob_CorruptIndex) {
914 Options options = GetDefaultOptions();
915 options.enable_blob_files = true;
916 options.min_blob_size = 0;
917 options.create_if_missing = true;
918
919 DestroyAndReopen(options);
920
921 constexpr size_t kNumOfKeys = 3;
922 std::array<std::string, kNumOfKeys> key_strs;
923 std::array<std::string, kNumOfKeys> value_strs;
924 std::array<Slice, kNumOfKeys + 1> keys;
925 for (size_t i = 0; i < kNumOfKeys; ++i) {
926 key_strs[i] = "foo" + std::to_string(i);
927 value_strs[i] = "blob_value" + std::to_string(i);
928 ASSERT_OK(Put(key_strs[i], value_strs[i]));
929 keys[i] = key_strs[i];
930 }
931
932 constexpr char key[] = "key";
933 constexpr char blob[] = "blob";
934 ASSERT_OK(Put(key, blob));
935 keys[kNumOfKeys] = key;
936
937 ASSERT_OK(Flush());
938
939 SyncPoint::GetInstance()->SetCallBack(
940 "Version::MultiGet::TamperWithBlobIndex", [&key](void* arg) {
941 KeyContext* const key_context = static_cast<KeyContext*>(arg);
942 assert(key_context);
943 assert(key_context->key);
944
945 if (*(key_context->key) == key) {
946 Slice* const blob_index = key_context->value;
947 assert(blob_index);
948 assert(!blob_index->empty());
949 blob_index->remove_prefix(1);
950 }
951 });
952 SyncPoint::GetInstance()->EnableProcessing();
953
954 std::array<PinnableSlice, kNumOfKeys + 1> values;
955 std::array<Status, kNumOfKeys + 1> statuses;
956 db_->MultiGet(ReadOptions(), dbfull()->DefaultColumnFamily(), kNumOfKeys + 1,
957 keys.data(), values.data(), statuses.data(),
958 /*sorted_input=*/false);
959 for (size_t i = 0; i < kNumOfKeys + 1; ++i) {
960 if (i != kNumOfKeys) {
961 ASSERT_OK(statuses[i]);
962 ASSERT_EQ("blob_value" + std::to_string(i), values[i]);
963 } else {
964 ASSERT_TRUE(statuses[i].IsCorruption());
965 }
966 }
967
968 SyncPoint::GetInstance()->DisableProcessing();
969 SyncPoint::GetInstance()->ClearAllCallBacks();
970 }
971
972 TEST_F(DBBlobBasicTest, MultiGetBlob_ExceedSoftLimit) {
973 Options options = GetDefaultOptions();
974 options.enable_blob_files = true;
975 options.min_blob_size = 0;
976
977 Reopen(options);
978
979 constexpr size_t kNumOfKeys = 3;
980 std::array<std::string, kNumOfKeys> key_bufs;
981 std::array<std::string, kNumOfKeys> value_bufs;
982 std::array<Slice, kNumOfKeys> keys;
983 for (size_t i = 0; i < kNumOfKeys; ++i) {
984 key_bufs[i] = "foo" + std::to_string(i);
985 value_bufs[i] = "blob_value" + std::to_string(i);
986 ASSERT_OK(Put(key_bufs[i], value_bufs[i]));
987 keys[i] = key_bufs[i];
988 }
989 ASSERT_OK(Flush());
990
991 std::array<PinnableSlice, kNumOfKeys> values;
992 std::array<Status, kNumOfKeys> statuses;
993 ReadOptions read_opts;
994 read_opts.value_size_soft_limit = 1;
995 db_->MultiGet(read_opts, dbfull()->DefaultColumnFamily(), kNumOfKeys,
996 keys.data(), values.data(), statuses.data(),
997 /*sorted_input=*/true);
998 for (const auto& s : statuses) {
999 ASSERT_TRUE(s.IsAborted());
1000 }
1001 }
1002
1003 TEST_F(DBBlobBasicTest, GetBlob_InlinedTTLIndex) {
1004 constexpr uint64_t min_blob_size = 10;
1005
1006 Options options = GetDefaultOptions();
1007 options.enable_blob_files = true;
1008 options.min_blob_size = min_blob_size;
1009
1010 Reopen(options);
1011
1012 constexpr char key[] = "key";
1013 constexpr char blob[] = "short";
1014 static_assert(sizeof(short) - 1 < min_blob_size,
1015 "Blob too long to be inlined");
1016
1017 // Fake an inlined TTL blob index.
1018 std::string blob_index;
1019
1020 constexpr uint64_t expiration = 1234567890;
1021
1022 BlobIndex::EncodeInlinedTTL(&blob_index, expiration, blob);
1023
1024 WriteBatch batch;
1025 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
1026 ASSERT_OK(db_->Write(WriteOptions(), &batch));
1027
1028 ASSERT_OK(Flush());
1029
1030 PinnableSlice result;
1031 ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
1032 .IsCorruption());
1033 }
1034
1035 TEST_F(DBBlobBasicTest, GetBlob_IndexWithInvalidFileNumber) {
1036 Options options = GetDefaultOptions();
1037 options.enable_blob_files = true;
1038 options.min_blob_size = 0;
1039
1040 Reopen(options);
1041
1042 constexpr char key[] = "key";
1043
1044 // Fake a blob index referencing a non-existent blob file.
1045 std::string blob_index;
1046
1047 constexpr uint64_t blob_file_number = 1000;
1048 constexpr uint64_t offset = 1234;
1049 constexpr uint64_t size = 5678;
1050
1051 BlobIndex::EncodeBlob(&blob_index, blob_file_number, offset, size,
1052 kNoCompression);
1053
1054 WriteBatch batch;
1055 ASSERT_OK(WriteBatchInternal::PutBlobIndex(&batch, 0, key, blob_index));
1056 ASSERT_OK(db_->Write(WriteOptions(), &batch));
1057
1058 ASSERT_OK(Flush());
1059
1060 PinnableSlice result;
1061 ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
1062 .IsCorruption());
1063 }
1064
1065 #ifndef ROCKSDB_LITE
1066 TEST_F(DBBlobBasicTest, GenerateIOTracing) {
1067 Options options = GetDefaultOptions();
1068 options.enable_blob_files = true;
1069 options.min_blob_size = 0;
1070 std::string trace_file = dbname_ + "/io_trace_file";
1071
1072 Reopen(options);
1073 {
1074 // Create IO trace file
1075 std::unique_ptr<TraceWriter> trace_writer;
1076 ASSERT_OK(
1077 NewFileTraceWriter(env_, EnvOptions(), trace_file, &trace_writer));
1078 ASSERT_OK(db_->StartIOTrace(TraceOptions(), std::move(trace_writer)));
1079
1080 constexpr char key[] = "key";
1081 constexpr char blob_value[] = "blob_value";
1082
1083 ASSERT_OK(Put(key, blob_value));
1084 ASSERT_OK(Flush());
1085 ASSERT_EQ(Get(key), blob_value);
1086
1087 ASSERT_OK(db_->EndIOTrace());
1088 ASSERT_OK(env_->FileExists(trace_file));
1089 }
1090 {
1091 // Parse trace file to check file operations related to blob files are
1092 // recorded.
1093 std::unique_ptr<TraceReader> trace_reader;
1094 ASSERT_OK(
1095 NewFileTraceReader(env_, EnvOptions(), trace_file, &trace_reader));
1096 IOTraceReader reader(std::move(trace_reader));
1097
1098 IOTraceHeader header;
1099 ASSERT_OK(reader.ReadHeader(&header));
1100 ASSERT_EQ(kMajorVersion, static_cast<int>(header.rocksdb_major_version));
1101 ASSERT_EQ(kMinorVersion, static_cast<int>(header.rocksdb_minor_version));
1102
1103 // Read records.
1104 int blob_files_op_count = 0;
1105 Status status;
1106 while (true) {
1107 IOTraceRecord record;
1108 status = reader.ReadIOOp(&record);
1109 if (!status.ok()) {
1110 break;
1111 }
1112 if (record.file_name.find("blob") != std::string::npos) {
1113 blob_files_op_count++;
1114 }
1115 }
1116 // Assuming blob files will have Append, Close and then Read operations.
1117 ASSERT_GT(blob_files_op_count, 2);
1118 }
1119 }
1120 #endif // !ROCKSDB_LITE
1121
1122 TEST_F(DBBlobBasicTest, BestEffortsRecovery_MissingNewestBlobFile) {
1123 Options options = GetDefaultOptions();
1124 options.enable_blob_files = true;
1125 options.min_blob_size = 0;
1126 options.create_if_missing = true;
1127 Reopen(options);
1128
1129 ASSERT_OK(dbfull()->DisableFileDeletions());
1130 constexpr int kNumTableFiles = 2;
1131 for (int i = 0; i < kNumTableFiles; ++i) {
1132 for (char ch = 'a'; ch != 'c'; ++ch) {
1133 std::string key(1, ch);
1134 ASSERT_OK(Put(key, "value" + std::to_string(i)));
1135 }
1136 ASSERT_OK(Flush());
1137 }
1138
1139 Close();
1140
1141 std::vector<std::string> files;
1142 ASSERT_OK(env_->GetChildren(dbname_, &files));
1143 std::string blob_file_path;
1144 uint64_t max_blob_file_num = kInvalidBlobFileNumber;
1145 for (const auto& fname : files) {
1146 uint64_t file_num = 0;
1147 FileType type;
1148 if (ParseFileName(fname, &file_num, /*info_log_name_prefix=*/"", &type) &&
1149 type == kBlobFile) {
1150 if (file_num > max_blob_file_num) {
1151 max_blob_file_num = file_num;
1152 blob_file_path = dbname_ + "/" + fname;
1153 }
1154 }
1155 }
1156 ASSERT_OK(env_->DeleteFile(blob_file_path));
1157
1158 options.best_efforts_recovery = true;
1159 Reopen(options);
1160 std::string value;
1161 ASSERT_OK(db_->Get(ReadOptions(), "a", &value));
1162 ASSERT_EQ("value" + std::to_string(kNumTableFiles - 2), value);
1163 }
1164
1165 TEST_F(DBBlobBasicTest, GetMergeBlobWithPut) {
1166 Options options = GetDefaultOptions();
1167 options.merge_operator = MergeOperators::CreateStringAppendOperator();
1168 options.enable_blob_files = true;
1169 options.min_blob_size = 0;
1170
1171 Reopen(options);
1172
1173 ASSERT_OK(Put("Key1", "v1"));
1174 ASSERT_OK(Flush());
1175 ASSERT_OK(Merge("Key1", "v2"));
1176 ASSERT_OK(Flush());
1177 ASSERT_OK(Merge("Key1", "v3"));
1178 ASSERT_OK(Flush());
1179
1180 std::string value;
1181 ASSERT_OK(db_->Get(ReadOptions(), "Key1", &value));
1182 ASSERT_EQ(Get("Key1"), "v1,v2,v3");
1183 }
1184
1185 TEST_F(DBBlobBasicTest, MultiGetMergeBlobWithPut) {
1186 constexpr size_t num_keys = 3;
1187
1188 Options options = GetDefaultOptions();
1189 options.merge_operator = MergeOperators::CreateStringAppendOperator();
1190 options.enable_blob_files = true;
1191 options.min_blob_size = 0;
1192
1193 Reopen(options);
1194
1195 ASSERT_OK(Put("Key0", "v0_0"));
1196 ASSERT_OK(Put("Key1", "v1_0"));
1197 ASSERT_OK(Put("Key2", "v2_0"));
1198 ASSERT_OK(Flush());
1199 ASSERT_OK(Merge("Key0", "v0_1"));
1200 ASSERT_OK(Merge("Key1", "v1_1"));
1201 ASSERT_OK(Flush());
1202 ASSERT_OK(Merge("Key0", "v0_2"));
1203 ASSERT_OK(Flush());
1204
1205 std::array<Slice, num_keys> keys{{"Key0", "Key1", "Key2"}};
1206 std::array<PinnableSlice, num_keys> values;
1207 std::array<Status, num_keys> statuses;
1208
1209 db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
1210 &values[0], &statuses[0]);
1211
1212 ASSERT_OK(statuses[0]);
1213 ASSERT_EQ(values[0], "v0_0,v0_1,v0_2");
1214
1215 ASSERT_OK(statuses[1]);
1216 ASSERT_EQ(values[1], "v1_0,v1_1");
1217
1218 ASSERT_OK(statuses[2]);
1219 ASSERT_EQ(values[2], "v2_0");
1220 }
1221
1222 #ifndef ROCKSDB_LITE
1223 TEST_F(DBBlobBasicTest, Properties) {
1224 Options options = GetDefaultOptions();
1225 options.enable_blob_files = true;
1226 options.min_blob_size = 0;
1227
1228 Reopen(options);
1229
1230 constexpr char key1[] = "key1";
1231 constexpr size_t key1_size = sizeof(key1) - 1;
1232
1233 constexpr char key2[] = "key2";
1234 constexpr size_t key2_size = sizeof(key2) - 1;
1235
1236 constexpr char key3[] = "key3";
1237 constexpr size_t key3_size = sizeof(key3) - 1;
1238
1239 constexpr char blob[] = "00000000000000";
1240 constexpr size_t blob_size = sizeof(blob) - 1;
1241
1242 constexpr char longer_blob[] = "00000000000000000000";
1243 constexpr size_t longer_blob_size = sizeof(longer_blob) - 1;
1244
1245 ASSERT_OK(Put(key1, blob));
1246 ASSERT_OK(Put(key2, longer_blob));
1247 ASSERT_OK(Flush());
1248
1249 constexpr size_t first_blob_file_expected_size =
1250 BlobLogHeader::kSize +
1251 BlobLogRecord::CalculateAdjustmentForRecordHeader(key1_size) + blob_size +
1252 BlobLogRecord::CalculateAdjustmentForRecordHeader(key2_size) +
1253 longer_blob_size + BlobLogFooter::kSize;
1254
1255 ASSERT_OK(Put(key3, blob));
1256 ASSERT_OK(Flush());
1257
1258 constexpr size_t second_blob_file_expected_size =
1259 BlobLogHeader::kSize +
1260 BlobLogRecord::CalculateAdjustmentForRecordHeader(key3_size) + blob_size +
1261 BlobLogFooter::kSize;
1262
1263 constexpr size_t total_expected_size =
1264 first_blob_file_expected_size + second_blob_file_expected_size;
1265
1266 // Number of blob files
1267 uint64_t num_blob_files = 0;
1268 ASSERT_TRUE(
1269 db_->GetIntProperty(DB::Properties::kNumBlobFiles, &num_blob_files));
1270 ASSERT_EQ(num_blob_files, 2);
1271
1272 // Total size of live blob files
1273 uint64_t live_blob_file_size = 0;
1274 ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kLiveBlobFileSize,
1275 &live_blob_file_size));
1276 ASSERT_EQ(live_blob_file_size, total_expected_size);
1277
1278 // Total amount of garbage in live blob files
1279 {
1280 uint64_t live_blob_file_garbage_size = 0;
1281 ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kLiveBlobFileGarbageSize,
1282 &live_blob_file_garbage_size));
1283 ASSERT_EQ(live_blob_file_garbage_size, 0);
1284 }
1285
1286 // Total size of all blob files across all versions
1287 // Note: this should be the same as above since we only have one
1288 // version at this point.
1289 uint64_t total_blob_file_size = 0;
1290 ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize,
1291 &total_blob_file_size));
1292 ASSERT_EQ(total_blob_file_size, total_expected_size);
1293
1294 // Delete key2 to create some garbage
1295 ASSERT_OK(Delete(key2));
1296 ASSERT_OK(Flush());
1297
1298 constexpr Slice* begin = nullptr;
1299 constexpr Slice* end = nullptr;
1300 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
1301
1302 constexpr size_t expected_garbage_size =
1303 BlobLogRecord::CalculateAdjustmentForRecordHeader(key2_size) +
1304 longer_blob_size;
1305
1306 constexpr double expected_space_amp =
1307 static_cast<double>(total_expected_size) /
1308 (total_expected_size - expected_garbage_size);
1309
1310 // Blob file stats
1311 std::string blob_stats;
1312 ASSERT_TRUE(db_->GetProperty(DB::Properties::kBlobStats, &blob_stats));
1313
1314 std::ostringstream oss;
1315 oss << "Number of blob files: 2\nTotal size of blob files: "
1316 << total_expected_size
1317 << "\nTotal size of garbage in blob files: " << expected_garbage_size
1318 << "\nBlob file space amplification: " << expected_space_amp << '\n';
1319
1320 ASSERT_EQ(blob_stats, oss.str());
1321
1322 // Total amount of garbage in live blob files
1323 {
1324 uint64_t live_blob_file_garbage_size = 0;
1325 ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kLiveBlobFileGarbageSize,
1326 &live_blob_file_garbage_size));
1327 ASSERT_EQ(live_blob_file_garbage_size, expected_garbage_size);
1328 }
1329 }
1330
1331 TEST_F(DBBlobBasicTest, PropertiesMultiVersion) {
1332 Options options = GetDefaultOptions();
1333 options.enable_blob_files = true;
1334 options.min_blob_size = 0;
1335
1336 Reopen(options);
1337
1338 constexpr char key1[] = "key1";
1339 constexpr char key2[] = "key2";
1340 constexpr char key3[] = "key3";
1341
1342 constexpr size_t key_size = sizeof(key1) - 1;
1343 static_assert(sizeof(key2) - 1 == key_size, "unexpected size: key2");
1344 static_assert(sizeof(key3) - 1 == key_size, "unexpected size: key3");
1345
1346 constexpr char blob[] = "0000000000";
1347 constexpr size_t blob_size = sizeof(blob) - 1;
1348
1349 ASSERT_OK(Put(key1, blob));
1350 ASSERT_OK(Flush());
1351
1352 ASSERT_OK(Put(key2, blob));
1353 ASSERT_OK(Flush());
1354
1355 // Create an iterator to keep the current version alive
1356 std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
1357 ASSERT_OK(iter->status());
1358
1359 // Note: the Delete and subsequent compaction results in the first blob file
1360 // not making it to the final version. (It is still part of the previous
1361 // version kept alive by the iterator though.) On the other hand, the Put
1362 // results in a third blob file.
1363 ASSERT_OK(Delete(key1));
1364 ASSERT_OK(Put(key3, blob));
1365 ASSERT_OK(Flush());
1366
1367 constexpr Slice* begin = nullptr;
1368 constexpr Slice* end = nullptr;
1369 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), begin, end));
1370
1371 // Total size of all blob files across all versions: between the two versions,
1372 // we should have three blob files of the same size with one blob each.
1373 // The version kept alive by the iterator contains the first and the second
1374 // blob file, while the final version contains the second and the third blob
1375 // file. (The second blob file is thus shared by the two versions but should
1376 // be counted only once.)
1377 uint64_t total_blob_file_size = 0;
1378 ASSERT_TRUE(db_->GetIntProperty(DB::Properties::kTotalBlobFileSize,
1379 &total_blob_file_size));
1380 ASSERT_EQ(total_blob_file_size,
1381 3 * (BlobLogHeader::kSize +
1382 BlobLogRecord::CalculateAdjustmentForRecordHeader(key_size) +
1383 blob_size + BlobLogFooter::kSize));
1384 }
1385 #endif // !ROCKSDB_LITE
1386
1387 class DBBlobBasicIOErrorTest : public DBBlobBasicTest,
1388 public testing::WithParamInterface<std::string> {
1389 protected:
1390 DBBlobBasicIOErrorTest() : sync_point_(GetParam()) {
1391 fault_injection_env_.reset(new FaultInjectionTestEnv(env_));
1392 }
1393 ~DBBlobBasicIOErrorTest() { Close(); }
1394
1395 std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
1396 std::string sync_point_;
1397 };
1398
1399 class DBBlobBasicIOErrorMultiGetTest : public DBBlobBasicIOErrorTest {
1400 public:
1401 DBBlobBasicIOErrorMultiGetTest() : DBBlobBasicIOErrorTest() {}
1402 };
1403
1404 INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorTest,
1405 ::testing::ValuesIn(std::vector<std::string>{
1406 "BlobFileReader::OpenFile:NewRandomAccessFile",
1407 "BlobFileReader::GetBlob:ReadFromFile"}));
1408
1409 INSTANTIATE_TEST_CASE_P(DBBlobBasicTest, DBBlobBasicIOErrorMultiGetTest,
1410 ::testing::ValuesIn(std::vector<std::string>{
1411 "BlobFileReader::OpenFile:NewRandomAccessFile",
1412 "BlobFileReader::MultiGetBlob:ReadFromFile"}));
1413
1414 TEST_P(DBBlobBasicIOErrorTest, GetBlob_IOError) {
1415 Options options;
1416 options.env = fault_injection_env_.get();
1417 options.enable_blob_files = true;
1418 options.min_blob_size = 0;
1419
1420 Reopen(options);
1421
1422 constexpr char key[] = "key";
1423 constexpr char blob_value[] = "blob_value";
1424
1425 ASSERT_OK(Put(key, blob_value));
1426
1427 ASSERT_OK(Flush());
1428
1429 SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
1430 fault_injection_env_->SetFilesystemActive(false,
1431 Status::IOError(sync_point_));
1432 });
1433 SyncPoint::GetInstance()->EnableProcessing();
1434
1435 PinnableSlice result;
1436 ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result)
1437 .IsIOError());
1438
1439 SyncPoint::GetInstance()->DisableProcessing();
1440 SyncPoint::GetInstance()->ClearAllCallBacks();
1441 }
1442
1443 TEST_P(DBBlobBasicIOErrorMultiGetTest, MultiGetBlobs_IOError) {
1444 Options options = GetDefaultOptions();
1445 options.env = fault_injection_env_.get();
1446 options.enable_blob_files = true;
1447 options.min_blob_size = 0;
1448
1449 Reopen(options);
1450
1451 constexpr size_t num_keys = 2;
1452
1453 constexpr char first_key[] = "first_key";
1454 constexpr char first_value[] = "first_value";
1455
1456 ASSERT_OK(Put(first_key, first_value));
1457
1458 constexpr char second_key[] = "second_key";
1459 constexpr char second_value[] = "second_value";
1460
1461 ASSERT_OK(Put(second_key, second_value));
1462
1463 ASSERT_OK(Flush());
1464
1465 std::array<Slice, num_keys> keys{{first_key, second_key}};
1466 std::array<PinnableSlice, num_keys> values;
1467 std::array<Status, num_keys> statuses;
1468
1469 SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
1470 fault_injection_env_->SetFilesystemActive(false,
1471 Status::IOError(sync_point_));
1472 });
1473 SyncPoint::GetInstance()->EnableProcessing();
1474
1475 db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys, &keys[0],
1476 &values[0], &statuses[0]);
1477
1478 SyncPoint::GetInstance()->DisableProcessing();
1479 SyncPoint::GetInstance()->ClearAllCallBacks();
1480
1481 ASSERT_TRUE(statuses[0].IsIOError());
1482 ASSERT_TRUE(statuses[1].IsIOError());
1483 }
1484
1485 TEST_P(DBBlobBasicIOErrorMultiGetTest, MultipleBlobFiles) {
1486 Options options = GetDefaultOptions();
1487 options.env = fault_injection_env_.get();
1488 options.enable_blob_files = true;
1489 options.min_blob_size = 0;
1490
1491 Reopen(options);
1492
1493 constexpr size_t num_keys = 2;
1494
1495 constexpr char key1[] = "key1";
1496 constexpr char value1[] = "blob1";
1497
1498 ASSERT_OK(Put(key1, value1));
1499 ASSERT_OK(Flush());
1500
1501 constexpr char key2[] = "key2";
1502 constexpr char value2[] = "blob2";
1503
1504 ASSERT_OK(Put(key2, value2));
1505 ASSERT_OK(Flush());
1506
1507 std::array<Slice, num_keys> keys{{key1, key2}};
1508 std::array<PinnableSlice, num_keys> values;
1509 std::array<Status, num_keys> statuses;
1510
1511 bool first_blob_file = true;
1512 SyncPoint::GetInstance()->SetCallBack(
1513 sync_point_, [&first_blob_file, this](void* /* arg */) {
1514 if (first_blob_file) {
1515 first_blob_file = false;
1516 return;
1517 }
1518 fault_injection_env_->SetFilesystemActive(false,
1519 Status::IOError(sync_point_));
1520 });
1521 SyncPoint::GetInstance()->EnableProcessing();
1522
1523 db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
1524 keys.data(), values.data(), statuses.data());
1525 SyncPoint::GetInstance()->DisableProcessing();
1526 SyncPoint::GetInstance()->ClearAllCallBacks();
1527 ASSERT_OK(statuses[0]);
1528 ASSERT_EQ(value1, values[0]);
1529 ASSERT_TRUE(statuses[1].IsIOError());
1530 }
1531
1532 namespace {
1533
1534 class ReadBlobCompactionFilter : public CompactionFilter {
1535 public:
1536 ReadBlobCompactionFilter() = default;
1537 const char* Name() const override {
1538 return "rocksdb.compaction.filter.read.blob";
1539 }
1540 CompactionFilter::Decision FilterV2(
1541 int /*level*/, const Slice& /*key*/, ValueType value_type,
1542 const Slice& existing_value, std::string* new_value,
1543 std::string* /*skip_until*/) const override {
1544 if (value_type != CompactionFilter::ValueType::kValue) {
1545 return CompactionFilter::Decision::kKeep;
1546 }
1547 assert(new_value);
1548 new_value->assign(existing_value.data(), existing_value.size());
1549 return CompactionFilter::Decision::kChangeValue;
1550 }
1551 };
1552
1553 } // anonymous namespace
1554
1555 TEST_P(DBBlobBasicIOErrorTest, CompactionFilterReadBlob_IOError) {
1556 Options options = GetDefaultOptions();
1557 options.env = fault_injection_env_.get();
1558 options.enable_blob_files = true;
1559 options.min_blob_size = 0;
1560 options.create_if_missing = true;
1561 std::unique_ptr<CompactionFilter> compaction_filter_guard(
1562 new ReadBlobCompactionFilter);
1563 options.compaction_filter = compaction_filter_guard.get();
1564
1565 DestroyAndReopen(options);
1566 constexpr char key[] = "foo";
1567 constexpr char blob_value[] = "foo_blob_value";
1568 ASSERT_OK(Put(key, blob_value));
1569 ASSERT_OK(Flush());
1570
1571 SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* /* arg */) {
1572 fault_injection_env_->SetFilesystemActive(false,
1573 Status::IOError(sync_point_));
1574 });
1575 SyncPoint::GetInstance()->EnableProcessing();
1576
1577 ASSERT_TRUE(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
1578 /*end=*/nullptr)
1579 .IsIOError());
1580
1581 SyncPoint::GetInstance()->DisableProcessing();
1582 SyncPoint::GetInstance()->ClearAllCallBacks();
1583 }
1584
1585 TEST_F(DBBlobBasicTest, WarmCacheWithBlobsDuringFlush) {
1586 Options options = GetDefaultOptions();
1587
1588 LRUCacheOptions co;
1589 co.capacity = 1 << 25;
1590 co.num_shard_bits = 2;
1591 co.metadata_charge_policy = kDontChargeCacheMetadata;
1592 auto backing_cache = NewLRUCache(co);
1593
1594 options.blob_cache = backing_cache;
1595
1596 BlockBasedTableOptions block_based_options;
1597 block_based_options.no_block_cache = false;
1598 block_based_options.block_cache = backing_cache;
1599 block_based_options.cache_index_and_filter_blocks = true;
1600 options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
1601
1602 options.enable_blob_files = true;
1603 options.create_if_missing = true;
1604 options.disable_auto_compactions = true;
1605 options.enable_blob_garbage_collection = true;
1606 options.blob_garbage_collection_age_cutoff = 1.0;
1607 options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
1608 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1609
1610 DestroyAndReopen(options);
1611
1612 constexpr size_t kNumBlobs = 10;
1613 constexpr size_t kValueSize = 100;
1614
1615 std::string value(kValueSize, 'a');
1616
1617 for (size_t i = 1; i <= kNumBlobs; i++) {
1618 ASSERT_OK(Put(std::to_string(i), value));
1619 ASSERT_OK(Put(std::to_string(i + kNumBlobs), value)); // Add some overlap
1620 ASSERT_OK(Flush());
1621 ASSERT_EQ(i * 2, options.statistics->getTickerCount(BLOB_DB_CACHE_ADD));
1622 ASSERT_EQ(value, Get(std::to_string(i)));
1623 ASSERT_EQ(value, Get(std::to_string(i + kNumBlobs)));
1624 ASSERT_EQ(0, options.statistics->getTickerCount(BLOB_DB_CACHE_MISS));
1625 ASSERT_EQ(i * 2, options.statistics->getTickerCount(BLOB_DB_CACHE_HIT));
1626 }
1627
1628 // Verify compaction not counted
1629 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
1630 /*end=*/nullptr));
1631 EXPECT_EQ(kNumBlobs * 2,
1632 options.statistics->getTickerCount(BLOB_DB_CACHE_ADD));
1633 }
1634
1635 #ifndef ROCKSDB_LITE
1636 TEST_F(DBBlobBasicTest, DynamicallyWarmCacheDuringFlush) {
1637 Options options = GetDefaultOptions();
1638
1639 LRUCacheOptions co;
1640 co.capacity = 1 << 25;
1641 co.num_shard_bits = 2;
1642 co.metadata_charge_policy = kDontChargeCacheMetadata;
1643 auto backing_cache = NewLRUCache(co);
1644
1645 options.blob_cache = backing_cache;
1646
1647 BlockBasedTableOptions block_based_options;
1648 block_based_options.no_block_cache = false;
1649 block_based_options.block_cache = backing_cache;
1650 block_based_options.cache_index_and_filter_blocks = true;
1651 options.table_factory.reset(NewBlockBasedTableFactory(block_based_options));
1652
1653 options.enable_blob_files = true;
1654 options.create_if_missing = true;
1655 options.disable_auto_compactions = true;
1656 options.enable_blob_garbage_collection = true;
1657 options.blob_garbage_collection_age_cutoff = 1.0;
1658 options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
1659 options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
1660
1661 DestroyAndReopen(options);
1662
1663 constexpr size_t kNumBlobs = 10;
1664 constexpr size_t kValueSize = 100;
1665
1666 std::string value(kValueSize, 'a');
1667
1668 for (size_t i = 1; i <= 5; i++) {
1669 ASSERT_OK(Put(std::to_string(i), value));
1670 ASSERT_OK(Put(std::to_string(i + kNumBlobs), value)); // Add some overlap
1671 ASSERT_OK(Flush());
1672 ASSERT_EQ(2, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD));
1673
1674 ASSERT_EQ(value, Get(std::to_string(i)));
1675 ASSERT_EQ(value, Get(std::to_string(i + kNumBlobs)));
1676 ASSERT_EQ(0, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD));
1677 ASSERT_EQ(0,
1678 options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS));
1679 ASSERT_EQ(2, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT));
1680 }
1681
1682 ASSERT_OK(dbfull()->SetOptions({{"prepopulate_blob_cache", "kDisable"}}));
1683
1684 for (size_t i = 6; i <= kNumBlobs; i++) {
1685 ASSERT_OK(Put(std::to_string(i), value));
1686 ASSERT_OK(Put(std::to_string(i + kNumBlobs), value)); // Add some overlap
1687 ASSERT_OK(Flush());
1688 ASSERT_EQ(0, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD));
1689
1690 ASSERT_EQ(value, Get(std::to_string(i)));
1691 ASSERT_EQ(value, Get(std::to_string(i + kNumBlobs)));
1692 ASSERT_EQ(2, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD));
1693 ASSERT_EQ(2,
1694 options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS));
1695 ASSERT_EQ(0, options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT));
1696 }
1697
1698 // Verify compaction not counted
1699 ASSERT_OK(db_->CompactRange(CompactRangeOptions(), /*begin=*/nullptr,
1700 /*end=*/nullptr));
1701 EXPECT_EQ(0, options.statistics->getTickerCount(BLOB_DB_CACHE_ADD));
1702 }
1703 #endif // !ROCKSDB_LITE
1704
1705 TEST_F(DBBlobBasicTest, WarmCacheWithBlobsSecondary) {
1706 CompressedSecondaryCacheOptions secondary_cache_opts;
1707 secondary_cache_opts.capacity = 1 << 20;
1708 secondary_cache_opts.num_shard_bits = 0;
1709 secondary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
1710 secondary_cache_opts.compression_type = kNoCompression;
1711
1712 LRUCacheOptions primary_cache_opts;
1713 primary_cache_opts.capacity = 1024;
1714 primary_cache_opts.num_shard_bits = 0;
1715 primary_cache_opts.metadata_charge_policy = kDontChargeCacheMetadata;
1716 primary_cache_opts.secondary_cache =
1717 NewCompressedSecondaryCache(secondary_cache_opts);
1718
1719 Options options = GetDefaultOptions();
1720 options.create_if_missing = true;
1721 options.statistics = CreateDBStatistics();
1722 options.enable_blob_files = true;
1723 options.blob_cache = NewLRUCache(primary_cache_opts);
1724 options.prepopulate_blob_cache = PrepopulateBlobCache::kFlushOnly;
1725
1726 DestroyAndReopen(options);
1727
1728 // Note: only one of the two blobs fit in the primary cache at any given time.
1729 constexpr char first_key[] = "foo";
1730 constexpr size_t first_blob_size = 512;
1731 const std::string first_blob(first_blob_size, 'a');
1732
1733 constexpr char second_key[] = "bar";
1734 constexpr size_t second_blob_size = 768;
1735 const std::string second_blob(second_blob_size, 'b');
1736
1737 // First blob is inserted into primary cache during flush.
1738 ASSERT_OK(Put(first_key, first_blob));
1739 ASSERT_OK(Flush());
1740 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 1);
1741
1742 // Second blob is inserted into primary cache during flush,
1743 // First blob is evicted but only a dummy handle is inserted into secondary
1744 // cache.
1745 ASSERT_OK(Put(second_key, second_blob));
1746 ASSERT_OK(Flush());
1747 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_ADD), 1);
1748
1749 // First blob is inserted into primary cache.
1750 // Second blob is evicted but only a dummy handle is inserted into secondary
1751 // cache.
1752 ASSERT_EQ(Get(first_key), first_blob);
1753 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 1);
1754 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 0);
1755 ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS),
1756 0);
1757 // Second blob is inserted into primary cache,
1758 // First blob is evicted and is inserted into secondary cache.
1759 ASSERT_EQ(Get(second_key), second_blob);
1760 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 1);
1761 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 0);
1762 ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS),
1763 0);
1764
1765 // First blob's dummy item is inserted into primary cache b/c of lookup.
1766 // Second blob is still in primary cache.
1767 ASSERT_EQ(Get(first_key), first_blob);
1768 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 0);
1769 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 1);
1770 ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS),
1771 1);
1772
1773 // First blob's item is inserted into primary cache b/c of lookup.
1774 // Second blob is evicted and inserted into secondary cache.
1775 ASSERT_EQ(Get(first_key), first_blob);
1776 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_MISS), 0);
1777 ASSERT_EQ(options.statistics->getAndResetTickerCount(BLOB_DB_CACHE_HIT), 1);
1778 ASSERT_EQ(options.statistics->getAndResetTickerCount(SECONDARY_CACHE_HITS),
1779 1);
1780 }
1781
1782 } // namespace ROCKSDB_NAMESPACE
1783
1784 int main(int argc, char** argv) {
1785 ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
1786 ::testing::InitGoogleTest(&argc, argv);
1787 RegisterCustomObjects(argc, argv);
1788 return RUN_ALL_TESTS();
1789 }