// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_source.h"

#include <cassert>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>

#include "cache/charged_cache.h"
#include "cache/compressed_secondary_cache.h"
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "db/db_test_util.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "options/cf_options.h"
#include "rocksdb/options.h"
#include "util/compression.h"
#include "util/random.h"

namespace ROCKSDB_NAMESPACE {

namespace {

// Creates a test blob file with `num` blobs in it.
void WriteBlobFile(const ImmutableOptions& immutable_options,
                   uint32_t column_family_id, bool has_ttl,
                   const ExpirationRange& expiration_range_header,
                   const ExpirationRange& expiration_range_footer,
                   uint64_t blob_file_number, const std::vector<Slice>& keys,
                   const std::vector<Slice>& blobs, CompressionType compression,
                   std::vector<uint64_t>& blob_offsets,
                   std::vector<uint64_t>& blob_sizes) {
  assert(!immutable_options.cf_paths.empty());
  size_t num = keys.size();
  assert(num == blobs.size());
  assert(num == blob_offsets.size());
  assert(num == blob_sizes.size());

  const std::string blob_file_path =
      BlobFileName(immutable_options.cf_paths.front().path, blob_file_number);
  std::unique_ptr<FSWritableFile> file;
  ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file,
                            FileOptions()));

  std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
      std::move(file), blob_file_path, FileOptions(), immutable_options.clock));

  constexpr Statistics* statistics = nullptr;
  constexpr bool use_fsync = false;
  constexpr bool do_flush = false;

  BlobLogWriter blob_log_writer(std::move(file_writer), immutable_options.clock,
                                statistics, blob_file_number, use_fsync,
                                do_flush);

  BlobLogHeader header(column_family_id, compression, has_ttl,
                       expiration_range_header);

  ASSERT_OK(blob_log_writer.WriteHeader(header));

  std::vector<std::string> compressed_blobs(num);
  std::vector<Slice> blobs_to_write(num);
  if (kNoCompression == compression) {
    for (size_t i = 0; i < num; ++i) {
      blobs_to_write[i] = blobs[i];
      blob_sizes[i] = blobs[i].size();
    }
  } else {
    CompressionOptions opts;
    CompressionContext context(compression);
    constexpr uint64_t sample_for_compression = 0;
    CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(),
                         compression, sample_for_compression);

    constexpr uint32_t compression_format_version = 2;

    for (size_t i = 0; i < num; ++i) {
      ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version,
                               &compressed_blobs[i]));
      blobs_to_write[i] = compressed_blobs[i];
      blob_sizes[i] = compressed_blobs[i].size();
    }
  }

  for (size_t i = 0; i < num; ++i) {
    uint64_t key_offset = 0;
    ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset,
                                        &blob_offsets[i]));
  }

  BlobLogFooter footer;
  footer.blob_count = num;
  footer.expiration_range = expiration_range_footer;

  std::string checksum_method;
  std::string checksum_value;
  ASSERT_OK(
      blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value));
}
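
// For reference, the helper above produces the standard blob log layout, so a
// test can compute the resulting file size up front. This is just a sketch of
// the arithmetic used by the tests below; the authoritative constants live in
// blob_log_format.h:
//
//   file_size = BlobLogHeader::kSize +
//               sum over i of (BlobLogRecord::kHeaderSize + keys[i].size() +
//                              blobs[i].size()) +
//               BlobLogFooter::kSize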

}  // anonymous namespace

class BlobSourceTest : public DBTestBase {
 public:
  explicit BlobSourceTest()
      : DBTestBase("blob_source_test", /*env_do_fsync=*/true) {
    options_.env = env_;
    options_.enable_blob_files = true;
    options_.create_if_missing = true;

    LRUCacheOptions co;
    co.capacity = 8 << 20;
    co.num_shard_bits = 2;
    co.metadata_charge_policy = kDontChargeCacheMetadata;
    co.high_pri_pool_ratio = 0.2;
    co.low_pri_pool_ratio = 0.2;
    options_.blob_cache = NewLRUCache(co);
    options_.lowest_used_cache_tier = CacheTier::kVolatileTier;

    assert(db_->GetDbIdentity(db_id_).ok());
    assert(db_->GetDbSessionId(db_session_id_).ok());
  }

  Options options_;
  std::string db_id_;
  std::string db_session_id_;
};
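
// The tests below share a common setup pattern: write a blob file with
// WriteBlobFile(), then wire up a BlobSource by hand. A minimal sketch
// (capacity and the histogram/tracer arguments vary from test to test):
//
//   std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);
//   auto blob_file_cache = std::make_unique<BlobFileCache>(
//       backing_cache.get(), &immutable_options, &file_options,
//       column_family_id, nullptr /*HistogramImpl*/, nullptr /*IOTracer*/);
//   BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
//                          blob_file_cache.get());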

TEST_F(BlobSourceTest, GetBlobsFromCache) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0);

  options_.statistics = CreateDBStatistics();
  Statistics* statistics = options_.statistics.get();
  assert(statistics);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr uint64_t blob_file_number = 1;
  constexpr size_t num_blobs = 16;

  std::vector<std::string> key_strs;
  std::vector<std::string> blob_strs;

  for (size_t i = 0; i < num_blobs; ++i) {
    key_strs.push_back("key" + std::to_string(i));
    blob_strs.push_back("blob" + std::to_string(i));
  }

  std::vector<Slice> keys;
  std::vector<Slice> blobs;

  uint64_t file_size = BlobLogHeader::kSize;
  for (size_t i = 0; i < num_blobs; ++i) {
    keys.push_back({key_strs[i]});
    blobs.push_back({blob_strs[i]});
    file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
  }
  file_size += BlobLogFooter::kSize;

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
                expiration_range, blob_file_number, keys, blobs, kNoCompression,
                blob_offsets, blob_sizes);

  constexpr size_t capacity = 1024;
  std::shared_ptr<Cache> backing_cache =
      NewLRUCache(capacity);  // Blob file cache

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ReadOptions read_options;
  read_options.verify_checksums = true;

  constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;

  {
    // GetBlob
    std::vector<PinnableSlice> values(keys.size());
    uint64_t bytes_read = 0;
    uint64_t blob_bytes = 0;
    uint64_t total_bytes = 0;

    read_options.fill_cache = false;
    get_perf_context()->Reset();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer, &values[i],
                                    &bytes_read));
      ASSERT_EQ(values[i], blobs[i]);
      ASSERT_TRUE(values[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
      total_bytes += bytes_read;
    }

    // Accessed the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache; with fill_cache == false, all of these
    // accesses miss.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, num_blobs);
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, total_bytes);
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);

    read_options.fill_cache = true;
    blob_bytes = 0;
    total_bytes = 0;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer, &values[i],
                                    &bytes_read));
      ASSERT_EQ(values[i], blobs[i]);
      ASSERT_TRUE(values[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      blob_bytes += blob_sizes[i];
      total_bytes += bytes_read;
      ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, i);
      ASSERT_EQ((int)get_perf_context()->blob_read_count, i + 1);
      ASSERT_EQ((int)get_perf_context()->blob_read_byte, total_bytes);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));

      ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, i + 1);
      ASSERT_EQ((int)get_perf_context()->blob_read_count, i + 1);
      ASSERT_EQ((int)get_perf_context()->blob_read_byte, total_bytes);
    }

    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_blobs);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, num_blobs);
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, total_bytes);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), num_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), blob_bytes);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE),
              blob_bytes);

    read_options.fill_cache = true;
    total_bytes = 0;
    blob_bytes = 0;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer, &values[i],
                                    &bytes_read));
      ASSERT_EQ(values[i], blobs[i]);
      ASSERT_TRUE(values[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));
      total_bytes += bytes_read;    // on-disk blob record size
      blob_bytes += blob_sizes[i];  // cached blob value size
    }

    // Accessed the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache; all of these accesses hit.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_blobs * 3);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // without i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // without i/o

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_bytes * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);

    // Cache-only GetBlob
    read_options.read_tier = ReadTier::kBlockCacheTier;
    total_bytes = 0;
    blob_bytes = 0;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer, &values[i],
                                    &bytes_read));
      ASSERT_EQ(values[i], blobs[i]);
      ASSERT_TRUE(values[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));
      total_bytes += bytes_read;
      blob_bytes += blob_sizes[i];
    }

    // Accessed the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache; all of these accesses hit.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_blobs * 3);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // without i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // without i/o

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_bytes * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }

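  // Note: EraseUnRefEntries() purges every cache entry that is not currently
  // referenced, so the cache-only block below starts from a cold blob cache
  // even though the blobs were just inserted with fill_cache == true.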
  options_.blob_cache->EraseUnRefEntries();

  {
    // Cache-only GetBlob
    std::vector<PinnableSlice> values(keys.size());
    uint64_t bytes_read = 0;

    read_options.read_tier = ReadTier::kBlockCacheTier;
    read_options.fill_cache = true;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_TRUE(blob_source
                      .GetBlob(read_options, keys[i], blob_file_number,
                               blob_offsets[i], file_size, blob_sizes[i],
                               kNoCompression, prefetch_buffer, &values[i],
                               &bytes_read)
                      .IsIncomplete());
      ASSERT_TRUE(values[i].empty());
      ASSERT_FALSE(values[i].IsPinned());
      ASSERT_EQ(bytes_read, 0);

      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
    }

    // Accessed the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache; with the cache empty, all of these
    // accesses miss.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }

  {
    // GetBlob from non-existing file
    std::vector<PinnableSlice> values(keys.size());
    uint64_t bytes_read = 0;
    uint64_t file_number = 100;  // non-existing file

    read_options.read_tier = ReadTier::kReadAllTier;
    read_options.fill_cache = true;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_TRUE(blob_source
                      .GetBlob(read_options, keys[i], file_number,
                               blob_offsets[i], file_size, blob_sizes[i],
                               kNoCompression, prefetch_buffer, &values[i],
                               &bytes_read)
                      .IsIOError());
      ASSERT_TRUE(values[i].empty());
      ASSERT_FALSE(values[i].IsPinned());
      ASSERT_EQ(bytes_read, 0);

      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                blob_offsets[i]));
    }

    // Accessed the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache; all of these accesses miss.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }
}
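
// A note on the ticker arithmetic in the test above: each TEST_BlobInCache()
// probe performs a real blob cache lookup, so one loop iteration touches the
// cache three times (probe, GetBlob, probe). That is why BLOB_DB_CACHE_MISS /
// BLOB_DB_CACHE_HIT are compared against num_blobs * 3 rather than num_blobs.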

TEST_F(BlobSourceTest, GetCompressedBlobs) {
  if (!Snappy_Supported()) {
    return;
  }

  const CompressionType compression = kSnappyCompression;

  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_GetCompressedBlobs"), 0);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr size_t num_blobs = 256;

  std::vector<std::string> key_strs;
  std::vector<std::string> blob_strs;

  for (size_t i = 0; i < num_blobs; ++i) {
    key_strs.push_back("key" + std::to_string(i));
    blob_strs.push_back("blob" + std::to_string(i));
  }

  std::vector<Slice> keys;
  std::vector<Slice> blobs;

  for (size_t i = 0; i < num_blobs; ++i) {
    keys.push_back({key_strs[i]});
    blobs.push_back({blob_strs[i]});
  }

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  constexpr size_t capacity = 1024;
  auto backing_cache = NewLRUCache(capacity);  // Blob file cache

  FileOptions file_options;
  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          column_family_id, nullptr /*HistogramImpl*/, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ReadOptions read_options;
  read_options.verify_checksums = true;

  uint64_t bytes_read = 0;
  std::vector<PinnableSlice> values(keys.size());

  {
    // Snappy Compression
    const uint64_t file_number = 1;

    read_options.read_tier = ReadTier::kReadAllTier;

    WriteBlobFile(immutable_options, column_family_id, has_ttl,
                  expiration_range, expiration_range, file_number, keys, blobs,
                  compression, blob_offsets, blob_sizes);

    CacheHandleGuard<BlobFileReader> blob_file_reader;
    ASSERT_OK(blob_source.GetBlobFileReader(file_number, &blob_file_reader));
    ASSERT_NE(blob_file_reader.GetValue(), nullptr);

    const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
    ASSERT_EQ(blob_file_reader.GetValue()->GetCompressionType(), compression);

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_NE(blobs[i].size() /*uncompressed size*/,
                blob_sizes[i] /*compressed size*/);
    }

    read_options.fill_cache = true;
    read_options.read_tier = ReadTier::kReadAllTier;
    get_perf_context()->Reset();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                blob_offsets[i]));
      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    compression, nullptr /*prefetch_buffer*/,
                                    &values[i], &bytes_read));
      ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
      ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[i]));
    }

    ASSERT_GE((int)get_perf_context()->blob_decompress_time, 0);

    read_options.read_tier = ReadTier::kBlockCacheTier;
    get_perf_context()->Reset();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[i]));

      // Compressed blob size is passed in GetBlob
      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    compression, nullptr /*prefetch_buffer*/,
                                    &values[i], &bytes_read));
      ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
      ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[i]));
    }

    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);
  }
}
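
// The pattern above relies on the blob cache storing uncompressed blobs: the
// first pass reads Snappy-compressed records from the file and decompresses
// them on the way into the cache, so the second, cache-only pass can assert
// that blob_decompress_time stays at zero.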

TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromMultiFiles"),
      0);

  options_.statistics = CreateDBStatistics();
  Statistics* statistics = options_.statistics.get();
  assert(statistics);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr uint64_t blob_files = 2;
  constexpr size_t num_blobs = 32;

  std::vector<std::string> key_strs;
  std::vector<std::string> blob_strs;

  for (size_t i = 0; i < num_blobs; ++i) {
    key_strs.push_back("key" + std::to_string(i));
    blob_strs.push_back("blob" + std::to_string(i));
  }

  std::vector<Slice> keys;
  std::vector<Slice> blobs;

  uint64_t file_size = BlobLogHeader::kSize;
  uint64_t blob_value_bytes = 0;
  for (size_t i = 0; i < num_blobs; ++i) {
    keys.push_back({key_strs[i]});
    blobs.push_back({blob_strs[i]});
    blob_value_bytes += blobs[i].size();
    file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
  }
  file_size += BlobLogFooter::kSize;
  const uint64_t blob_records_bytes =
      file_size - BlobLogHeader::kSize - BlobLogFooter::kSize;

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  {
    // Write key/blob pairs to multiple blob files.
    for (size_t i = 0; i < blob_files; ++i) {
      const uint64_t file_number = i + 1;
      WriteBlobFile(immutable_options, column_family_id, has_ttl,
                    expiration_range, expiration_range, file_number, keys,
                    blobs, kNoCompression, blob_offsets, blob_sizes);
    }
  }

  constexpr size_t capacity = 10;
  std::shared_ptr<Cache> backing_cache =
      NewLRUCache(capacity);  // Blob file cache

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ReadOptions read_options;
  read_options.verify_checksums = true;

  uint64_t bytes_read = 0;

  {
    // MultiGetBlob
    read_options.fill_cache = true;
    read_options.read_tier = ReadTier::kReadAllTier;

    autovector<BlobFileReadRequests> blob_reqs;
    std::array<autovector<BlobReadRequest>, blob_files> blob_reqs_in_file;
    std::array<PinnableSlice, num_blobs * blob_files> value_buf;
    std::array<Status, num_blobs * blob_files> statuses_buf;

    for (size_t i = 0; i < blob_files; ++i) {
      const uint64_t file_number = i + 1;
      for (size_t j = 0; j < num_blobs; ++j) {
        blob_reqs_in_file[i].emplace_back(
            keys[j], blob_offsets[j], blob_sizes[j], kNoCompression,
            &value_buf[i * num_blobs + j], &statuses_buf[i * num_blobs + j]);
      }
      blob_reqs.emplace_back(file_number, file_size, blob_reqs_in_file[i]);
    }

    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);

    for (size_t i = 0; i < blob_files; ++i) {
      const uint64_t file_number = i + 1;
      for (size_t j = 0; j < num_blobs; ++j) {
        ASSERT_OK(statuses_buf[i * num_blobs + j]);
        ASSERT_EQ(value_buf[i * num_blobs + j], blobs[j]);
        ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                 blob_offsets[j]));
      }
    }

    // Retrieved all blobs from 2 blob files twice via MultiGetBlob and
    // TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count,
              num_blobs * blob_files);
    ASSERT_EQ((int)get_perf_context()->blob_read_count,
              num_blobs * blob_files);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte,
              blob_records_bytes * blob_files);  // blocking i/o
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS),
              num_blobs * blob_files);  // MultiGetBlob
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT),
              num_blobs * blob_files);  // TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD),
              num_blobs * blob_files);  // MultiGetBlob
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_value_bytes * blob_files);  // TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE),
              blob_value_bytes * blob_files);  // MultiGetBlob

    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    autovector<BlobReadRequest> fake_blob_reqs_in_file;
    std::array<PinnableSlice, num_blobs> fake_value_buf;
    std::array<Status, num_blobs> fake_statuses_buf;

    const uint64_t fake_file_number = 100;
    for (size_t i = 0; i < num_blobs; ++i) {
      fake_blob_reqs_in_file.emplace_back(
          keys[i], blob_offsets[i], blob_sizes[i], kNoCompression,
          &fake_value_buf[i], &fake_statuses_buf[i]);
    }

    // Add a fake multi-get blob request.
    blob_reqs.emplace_back(fake_file_number, file_size, fake_blob_reqs_in_file);

    blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);

    // Check the real blob read requests.
    for (size_t i = 0; i < blob_files; ++i) {
      const uint64_t file_number = i + 1;
      for (size_t j = 0; j < num_blobs; ++j) {
        ASSERT_OK(statuses_buf[i * num_blobs + j]);
        ASSERT_EQ(value_buf[i * num_blobs + j], blobs[j]);
        ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                 blob_offsets[j]));
      }
    }

    // Check the fake blob request.
    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(fake_statuses_buf[i].IsIOError());
      ASSERT_TRUE(fake_value_buf[i].empty());
      ASSERT_FALSE(blob_source.TEST_BlobInCache(fake_file_number, file_size,
                                                blob_offsets[i]));
    }

    // Retrieved all blobs from 3 blob files (including the fake one) twice
    // via MultiGetBlob and TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count,
              num_blobs * blob_files * 2);
    ASSERT_EQ((int)get_perf_context()->blob_read_count,
              0);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte,
              0);  // blocking i/o
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    // Fake blob requests: MultiGetBlob and TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
    // Real blob requests: MultiGetBlob and TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT),
              num_blobs * blob_files * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    // Real blob requests: MultiGetBlob and TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_value_bytes * blob_files * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }
}
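
// For reference, the request layout consumed by MultiGetBlob above: an
// autovector of per-file batches, each bundling a file number, the file size,
// and the individual read requests against that file. A minimal sketch:
//
//   autovector<BlobFileReadRequests> blob_reqs;
//   blob_reqs.emplace_back(file_number, file_size, reqs_for_that_file);
//   blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);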

TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);

  options_.statistics = CreateDBStatistics();
  Statistics* statistics = options_.statistics.get();
  assert(statistics);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr uint64_t blob_file_number = 1;
  constexpr size_t num_blobs = 16;

  std::vector<std::string> key_strs;
  std::vector<std::string> blob_strs;

  for (size_t i = 0; i < num_blobs; ++i) {
    key_strs.push_back("key" + std::to_string(i));
    blob_strs.push_back("blob" + std::to_string(i));
  }

  std::vector<Slice> keys;
  std::vector<Slice> blobs;

  uint64_t file_size = BlobLogHeader::kSize;
  for (size_t i = 0; i < num_blobs; ++i) {
    keys.push_back({key_strs[i]});
    blobs.push_back({blob_strs[i]});
    file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
  }
  file_size += BlobLogFooter::kSize;

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
                expiration_range, blob_file_number, keys, blobs, kNoCompression,
                blob_offsets, blob_sizes);

  constexpr size_t capacity = 10;
  std::shared_ptr<Cache> backing_cache =
      NewLRUCache(capacity);  // Blob file cache

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ReadOptions read_options;
  read_options.verify_checksums = true;

  constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;

  {
    // MultiGetBlobFromOneFile
    uint64_t bytes_read = 0;
    std::array<Status, num_blobs> statuses_buf;
    std::array<PinnableSlice, num_blobs> value_buf;
    autovector<BlobReadRequest> blob_reqs;

    for (size_t i = 0; i < num_blobs; i += 2) {  // even index
      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
                             kNoCompression, &value_buf[i], &statuses_buf[i]);
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
    }

    read_options.fill_cache = true;
    read_options.read_tier = ReadTier::kReadAllTier;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    // Get half of the blobs.
    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
                                        file_size, blob_reqs, &bytes_read);

    uint64_t fs_read_bytes = 0;
    uint64_t ca_read_bytes = 0;
    for (size_t i = 0; i < num_blobs; ++i) {
      if (i % 2 == 0) {
        ASSERT_OK(statuses_buf[i]);
        ASSERT_EQ(value_buf[i], blobs[i]);
        ASSERT_TRUE(value_buf[i].IsPinned());
        fs_read_bytes +=
            blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize;
        ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                 blob_offsets[i]));
        ca_read_bytes += blob_sizes[i];
      } else {
        statuses_buf[i].PermitUncheckedError();
        ASSERT_TRUE(value_buf[i].empty());
        ASSERT_FALSE(value_buf[i].IsPinned());
        ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                  blob_offsets[i]));
      }
    }

    constexpr int num_even_blobs = num_blobs / 2;
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_even_blobs);
    ASSERT_EQ((int)get_perf_context()->blob_read_count,
              num_even_blobs);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte,
              fs_read_bytes);  // blocking i/o
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_even_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), num_even_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              ca_read_bytes);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE),
              ca_read_bytes);

    // Get the rest of the blobs.
    for (size_t i = 1; i < num_blobs; i += 2) {  // odd index
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer,
                                    &value_buf[i], &bytes_read));
      ASSERT_EQ(value_buf[i], blobs[i]);
      ASSERT_TRUE(value_buf[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));
    }

    // Cache-only MultiGetBlobFromOneFile
    read_options.read_tier = ReadTier::kBlockCacheTier;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    blob_reqs.clear();
    for (size_t i = 0; i < num_blobs; ++i) {
      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
                             kNoCompression, &value_buf[i], &statuses_buf[i]);
    }

    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
                                        file_size, blob_reqs, &bytes_read);

    uint64_t blob_bytes = 0;
    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_OK(statuses_buf[i]);
      ASSERT_EQ(value_buf[i], blobs[i]);
      ASSERT_TRUE(value_buf[i].IsPinned());
      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));
      blob_bytes += blob_sizes[i];
    }

    // Accessed the blob cache num_blobs * 2 times via MultiGetBlobFromOneFile
    // and TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_blobs * 2);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // blocking i/o
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_blobs * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_bytes * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }

  options_.blob_cache->EraseUnRefEntries();

  {
    // Cache-only MultiGetBlobFromOneFile
    uint64_t bytes_read = 0;
    read_options.read_tier = ReadTier::kBlockCacheTier;

    std::array<Status, num_blobs> statuses_buf;
    std::array<PinnableSlice, num_blobs> value_buf;
    autovector<BlobReadRequest> blob_reqs;

    for (size_t i = 0; i < num_blobs; i++) {
      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
                             kNoCompression, &value_buf[i], &statuses_buf[i]);
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
    }

    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
                                        file_size, blob_reqs, &bytes_read);

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(statuses_buf[i].IsIncomplete());
      ASSERT_TRUE(value_buf[i].empty());
      ASSERT_FALSE(value_buf[i].IsPinned());
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
    }

    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }

  {
    // MultiGetBlobFromOneFile from a non-existing file
    uint64_t bytes_read = 0;
    uint64_t non_existing_file_number = 100;
    read_options.read_tier = ReadTier::kReadAllTier;

    std::array<Status, num_blobs> statuses_buf;
    std::array<PinnableSlice, num_blobs> value_buf;
    autovector<BlobReadRequest> blob_reqs;

    for (size_t i = 0; i < num_blobs; i++) {
      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
                             kNoCompression, &value_buf[i], &statuses_buf[i]);
      ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
                                                file_size, blob_offsets[i]));
    }

    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    blob_source.MultiGetBlobFromOneFile(read_options, non_existing_file_number,
                                        file_size, blob_reqs, &bytes_read);

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(statuses_buf[i].IsIOError());
      ASSERT_TRUE(value_buf[i].empty());
      ASSERT_FALSE(value_buf[i].IsPinned());
      ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
                                                file_size, blob_offsets[i]));
    }

    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }
}
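
// The read tiers exercised above behave differently on a cache miss:
// ReadTier::kReadAllTier falls back to file I/O, while
// ReadTier::kBlockCacheTier never touches the file and reports the miss as
// Status::Incomplete, which is what the cache-only blocks assert.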

class BlobSecondaryCacheTest : public DBTestBase {
 public:
  explicit BlobSecondaryCacheTest()
      : DBTestBase("blob_secondary_cache_test", /*env_do_fsync=*/true) {
    options_.env = env_;
    options_.enable_blob_files = true;
    options_.create_if_missing = true;

    // Set a small primary cache capacity so that entries get evicted from it
    // and the secondary cache is exercised properly.
    lru_cache_opts_.capacity = 1024;
    lru_cache_opts_.num_shard_bits = 0;
    lru_cache_opts_.strict_capacity_limit = true;
    lru_cache_opts_.metadata_charge_policy = kDontChargeCacheMetadata;
    lru_cache_opts_.high_pri_pool_ratio = 0.2;
    lru_cache_opts_.low_pri_pool_ratio = 0.2;

    secondary_cache_opts_.capacity = 8 << 20;  // 8 MB
    secondary_cache_opts_.num_shard_bits = 0;
    secondary_cache_opts_.metadata_charge_policy =
        kDefaultCacheMetadataChargePolicy;

    // Read blobs from the secondary cache if they are not in the primary
    // cache.
    options_.lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier;

    assert(db_->GetDbIdentity(db_id_).ok());
    assert(db_->GetDbSessionId(db_session_id_).ok());
  }

  Options options_;

  LRUCacheOptions lru_cache_opts_;
  CompressedSecondaryCacheOptions secondary_cache_opts_;

  std::string db_id_;
  std::string db_session_id_;
};

TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
  if (!Snappy_Supported()) {
    return;
  }

  secondary_cache_opts_.compression_type = kSnappyCompression;
  lru_cache_opts_.secondary_cache =
      NewCompressedSecondaryCache(secondary_cache_opts_);
  options_.blob_cache = NewLRUCache(lru_cache_opts_);

  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(
          env_, "BlobSecondaryCacheTest_GetBlobsFromSecondaryCache"),
      0);

  options_.statistics = CreateDBStatistics();
  Statistics* statistics = options_.statistics.get();
  assert(statistics);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr uint64_t file_number = 1;

  Random rnd(301);

  std::vector<std::string> key_strs{"key0", "key1"};
  std::vector<std::string> blob_strs{rnd.RandomString(512),
                                     rnd.RandomString(768)};

  std::vector<Slice> keys{key_strs[0], key_strs[1]};
  std::vector<Slice> blobs{blob_strs[0], blob_strs[1]};

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
                expiration_range, file_number, keys, blobs, kNoCompression,
                blob_offsets, blob_sizes);

  constexpr size_t capacity = 1024;
  std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
      backing_cache.get(), &immutable_options, &file_options, column_family_id,
      blob_file_read_hist, nullptr /*IOTracer*/));

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  CacheHandleGuard<BlobFileReader> file_reader;
  ASSERT_OK(blob_source.GetBlobFileReader(file_number, &file_reader));
  ASSERT_NE(file_reader.GetValue(), nullptr);
  const uint64_t file_size = file_reader.GetValue()->GetFileSize();
  ASSERT_EQ(file_reader.GetValue()->GetCompressionType(), kNoCompression);

  ReadOptions read_options;
  read_options.verify_checksums = true;

  auto blob_cache = options_.blob_cache;
  auto secondary_cache = lru_cache_opts_.secondary_cache;

  Cache::CreateCallback create_cb = [](const void* buf, size_t size,
                                       void** out_obj,
                                       size_t* charge) -> Status {
    CacheAllocationPtr allocation(new char[size]);

    return BlobContents::CreateCallback(std::move(allocation), buf, size,
                                        out_obj, charge);
  };

  {
    // GetBlob
    std::vector<PinnableSlice> values(keys.size());

    read_options.fill_cache = true;
    get_perf_context()->Reset();

    // key0 should be filled to the primary cache from the blob file.
    ASSERT_OK(blob_source.GetBlob(read_options, keys[0], file_number,
                                  blob_offsets[0], file_size, blob_sizes[0],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &values[0], nullptr /* bytes_read */));
    // Release cache handle
    values[0].Reset();

    // key0 should be evicted and key0's dummy item is inserted into the
    // secondary cache. key1 should be filled to the primary cache from the
    // blob file.
    ASSERT_OK(blob_source.GetBlob(read_options, keys[1], file_number,
                                  blob_offsets[1], file_size, blob_sizes[1],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &values[1], nullptr /* bytes_read */));

    // Release cache handle
    values[1].Reset();

    // key0 should be filled to the primary cache from the blob file. key1
    // should be evicted and key1's dummy item is inserted into the secondary
    // cache.
    ASSERT_OK(blob_source.GetBlob(read_options, keys[0], file_number,
                                  blob_offsets[0], file_size, blob_sizes[0],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &values[0], nullptr /* bytes_read */));
    ASSERT_EQ(values[0], blobs[0]);
    ASSERT_TRUE(
        blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[0]));

    // Release cache handle
    values[0].Reset();

    // key0 should be evicted and inserted into the secondary cache.
    // key1 should be filled to the primary cache from the blob file.
    ASSERT_OK(blob_source.GetBlob(read_options, keys[1], file_number,
                                  blob_offsets[1], file_size, blob_sizes[1],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &values[1], nullptr /* bytes_read */));
    ASSERT_EQ(values[1], blobs[1]);
    ASSERT_TRUE(
        blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[1]));

    // Release cache handle
    values[1].Reset();

    OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);

    // blob_cache here only looks at the primary cache since we didn't provide
    // the cache item helper for the secondary cache. However, since key0 is
    // demoted to the secondary cache, we shouldn't be able to find it in the
    // primary cache.
    {
      CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[0]);
      const Slice key0 = cache_key.AsSlice();
      auto handle0 = blob_cache->Lookup(key0, statistics);
      ASSERT_EQ(handle0, nullptr);

      // key0's item should be in the secondary cache.
      bool is_in_sec_cache = false;
      auto sec_handle0 =
          secondary_cache->Lookup(key0, create_cb, true,
                                  /*advise_erase=*/true, is_in_sec_cache);
      ASSERT_FALSE(is_in_sec_cache);
      ASSERT_NE(sec_handle0, nullptr);
      ASSERT_TRUE(sec_handle0->IsReady());
      auto value = static_cast<BlobContents*>(sec_handle0->Value());
      ASSERT_NE(value, nullptr);
      ASSERT_EQ(value->data(), blobs[0]);
      delete value;

      // key0 doesn't exist in the blob cache although key0's dummy
      // item exists in the secondary cache.
      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                blob_offsets[0]));
    }

    // key1 should exist in the primary cache. key1's dummy item exists
    // in the secondary cache.
    {
      CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[1]);
      const Slice key1 = cache_key.AsSlice();
      auto handle1 = blob_cache->Lookup(key1, statistics);
      ASSERT_NE(handle1, nullptr);
      blob_cache->Release(handle1);

      bool is_in_sec_cache = false;
      auto sec_handle1 =
          secondary_cache->Lookup(key1, create_cb, true,
                                  /*advise_erase=*/true, is_in_sec_cache);
      ASSERT_FALSE(is_in_sec_cache);
      ASSERT_EQ(sec_handle1, nullptr);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[1]));
    }

    {
      // Fetch key0 from the blob file to the primary cache.
      // key1 is evicted and inserted into the secondary cache.
      ASSERT_OK(blob_source.GetBlob(
          read_options, keys[0], file_number, blob_offsets[0], file_size,
          blob_sizes[0], kNoCompression, nullptr /* prefetch_buffer */,
          &values[0], nullptr /* bytes_read */));
      ASSERT_EQ(values[0], blobs[0]);

      // Release cache handle
      values[0].Reset();

      // key0 should be in the primary cache.
      CacheKey cache_key0 = base_cache_key.WithOffset(blob_offsets[0]);
      const Slice key0 = cache_key0.AsSlice();
      auto handle0 = blob_cache->Lookup(key0, statistics);
      ASSERT_NE(handle0, nullptr);
      auto value = static_cast<BlobContents*>(blob_cache->Value(handle0));
      ASSERT_NE(value, nullptr);
      ASSERT_EQ(value->data(), blobs[0]);
      blob_cache->Release(handle0);

      // key1 is not in the primary cache and is in the secondary cache.
      CacheKey cache_key1 = base_cache_key.WithOffset(blob_offsets[1]);
      const Slice key1 = cache_key1.AsSlice();
      auto handle1 = blob_cache->Lookup(key1, statistics);
      ASSERT_EQ(handle1, nullptr);

      // Erase key0 from the primary cache.
      blob_cache->Erase(key0);
      handle0 = blob_cache->Lookup(key0, statistics);
      ASSERT_EQ(handle0, nullptr);

      // key1 promotion should succeed due to the primary cache being empty.
      // We didn't call the secondary cache's Lookup() here, because it would
      // remove the key without being able to promote it to the primary cache.
      // Instead, we use the end-to-end blob source API to read key1.
      // In TEST_BlobInCache, key1's dummy item is inserted into the primary
      // cache and a standalone handle is checked by GetValue().
      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[1]));

      // key1's dummy handle is in the primary cache and key1's item is still
      // in the secondary cache. So, the primary cache's Lookup() without
      // secondary cache support cannot see it. (NOTE: The dummy handle used
      // to be a leaky abstraction but not anymore.)
      handle1 = blob_cache->Lookup(key1, statistics);
      ASSERT_EQ(handle1, nullptr);

      // But after another access, it is promoted to the primary cache.
      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[1]));

      // And Lookup() can find it (without secondary cache support).
      handle1 = blob_cache->Lookup(key1, statistics);
      ASSERT_NE(handle1, nullptr);
      ASSERT_NE(blob_cache->Value(handle1), nullptr);
      blob_cache->Release(handle1);
    }
  }
}
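
// To summarize the lifecycle exercised above (as observed through the test's
// assertions): an entry evicted from the strict-capacity LRU primary cache is
// demoted to the compressed secondary cache; a later lookup through the blob
// source yields a standalone handle backed by the secondary cache, and only a
// subsequent access promotes the entry back into the primary cache, where a
// plain Lookup() can see it again.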

class BlobSourceCacheReservationTest : public DBTestBase {
 public:
  explicit BlobSourceCacheReservationTest()
      : DBTestBase("blob_source_cache_reservation_test",
                   /*env_do_fsync=*/true) {
    options_.env = env_;
    options_.enable_blob_files = true;
    options_.create_if_missing = true;

    LRUCacheOptions co;
    co.capacity = kCacheCapacity;
    co.num_shard_bits = kNumShardBits;
    co.metadata_charge_policy = kDontChargeCacheMetadata;

    co.high_pri_pool_ratio = 0.0;
    co.low_pri_pool_ratio = 0.0;
    std::shared_ptr<Cache> blob_cache = NewLRUCache(co);

    co.high_pri_pool_ratio = 0.5;
    co.low_pri_pool_ratio = 0.5;
    std::shared_ptr<Cache> block_cache = NewLRUCache(co);

    options_.blob_cache = blob_cache;
    options_.lowest_used_cache_tier = CacheTier::kVolatileTier;

    BlockBasedTableOptions block_based_options;
    block_based_options.no_block_cache = false;
    block_based_options.block_cache = block_cache;
    block_based_options.cache_usage_options.options_overrides.insert(
        {CacheEntryRole::kBlobCache,
         {/* charged = */ CacheEntryRoleOptions::Decision::kEnabled}});
    options_.table_factory.reset(
        NewBlockBasedTableFactory(block_based_options));

    assert(db_->GetDbIdentity(db_id_).ok());
    assert(db_->GetDbSessionId(db_session_id_).ok());
  }

  void GenerateKeysAndBlobs() {
    for (size_t i = 0; i < kNumBlobs; ++i) {
      key_strs_.push_back("key" + std::to_string(i));
      blob_strs_.push_back("blob" + std::to_string(i));
    }

    blob_file_size_ = BlobLogHeader::kSize;
    for (size_t i = 0; i < kNumBlobs; ++i) {
      keys_.push_back({key_strs_[i]});
      blobs_.push_back({blob_strs_[i]});
      blob_file_size_ +=
          BlobLogRecord::kHeaderSize + keys_[i].size() + blobs_[i].size();
    }
    blob_file_size_ += BlobLogFooter::kSize;
  }

  static constexpr std::size_t kSizeDummyEntry = CacheReservationManagerImpl<
      CacheEntryRole::kBlobCache>::GetDummyEntrySize();
  static constexpr std::size_t kCacheCapacity = 1 * kSizeDummyEntry;
  static constexpr int kNumShardBits = 0;  // 2^0 shard

  static constexpr uint32_t kColumnFamilyId = 1;
  static constexpr bool kHasTTL = false;
  static constexpr uint64_t kBlobFileNumber = 1;
  static constexpr size_t kNumBlobs = 16;

  std::vector<Slice> keys_;
  std::vector<Slice> blobs_;
  std::vector<std::string> key_strs_;
  std::vector<std::string> blob_strs_;
  uint64_t blob_file_size_;

  Options options_;
  std::string db_id_;
  std::string db_session_id_;
};
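
// Background for the tests below, in rough terms: with blob cache charging
// enabled (CacheEntryRole::kBlobCache above), blob cache usage is charged
// against the block cache through a cache reservation manager that reserves
// space in fixed-size "dummy entry" increments. Since kCacheCapacity is
// exactly one dummy entry, the reserved size can only ever be 0 or
// kSizeDummyEntry, which is what the assertions below verify.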

#ifndef ROCKSDB_LITE
TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(
          env_, "BlobSourceCacheReservationTest_SimpleCacheReservation"),
      0);

  GenerateKeysAndBlobs();

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr ExpirationRange expiration_range;

  std::vector<uint64_t> blob_offsets(keys_.size());
  std::vector<uint64_t> blob_sizes(keys_.size());

  WriteBlobFile(immutable_options, kColumnFamilyId, kHasTTL, expiration_range,
                expiration_range, kBlobFileNumber, keys_, blobs_,
                kNoCompression, blob_offsets, blob_sizes);

  constexpr size_t capacity = 10;
  std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ConcurrentCacheReservationManager* cache_res_mgr =
      static_cast<ChargedCache*>(blob_source.GetBlobCache())
          ->TEST_GetCacheReservationManager();
  ASSERT_NE(cache_res_mgr, nullptr);

  ReadOptions read_options;
  read_options.verify_checksums = true;

  {
    read_options.fill_cache = false;

    std::vector<PinnableSlice> values(keys_.size());

    for (size_t i = 0; i < kNumBlobs; ++i) {
      ASSERT_OK(blob_source.GetBlob(
          read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
          blob_file_size_, blob_sizes[i], kNoCompression,
          nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));
      ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), 0);
    }
  }

  {
    read_options.fill_cache = true;

    std::vector<PinnableSlice> values(keys_.size());

    // kNumBlobs is 16, so the total blob cache usage is less than a single
    // dummy entry. Therefore, the cache reservation manager only reserves one
    // dummy entry here.
    uint64_t blob_bytes = 0;
    for (size_t i = 0; i < kNumBlobs; ++i) {
      ASSERT_OK(blob_source.GetBlob(
          read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
          blob_file_size_, blob_sizes[i], kNoCompression,
          nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));

      size_t charge = 0;
      ASSERT_TRUE(blob_source.TEST_BlobInCache(kBlobFileNumber, blob_file_size_,
                                               blob_offsets[i], &charge));

      blob_bytes += charge;
      ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(),
                options_.blob_cache->GetUsage());
    }
  }

  {
    OffsetableCacheKey base_cache_key(db_id_, db_session_id_, kBlobFileNumber);
    size_t blob_bytes = options_.blob_cache->GetUsage();

    for (size_t i = 0; i < kNumBlobs; ++i) {
      size_t charge = 0;
      ASSERT_TRUE(blob_source.TEST_BlobInCache(kBlobFileNumber, blob_file_size_,
                                               blob_offsets[i], &charge));

      CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[i]);
      // We don't call options_.blob_cache->Erase() here; the cache wrapper's
      // Erase() method must be called instead so that the cache usage is
      // updated after the entry is erased.
      blob_source.GetBlobCache()->Erase(cache_key.AsSlice());
      if (i == kNumBlobs - 1) {
        // All the blobs got removed from the cache. cache_res_mgr should not
        // reserve any space for them.
        ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
      } else {
        ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
      }
      blob_bytes -= charge;
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(),
                options_.blob_cache->GetUsage());
    }
  }
}

TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservationOnFullCache) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(
          env_,
          "BlobSourceCacheReservationTest_IncreaseCacheReservationOnFullCache"),
      0);

  GenerateKeysAndBlobs();

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);
  constexpr size_t blob_size = kSizeDummyEntry / (kNumBlobs / 2);
  for (size_t i = 0; i < kNumBlobs; ++i) {
    blob_file_size_ -= blobs_[i].size();  // old blob size
    blob_strs_[i].resize(blob_size, '@');
    blobs_[i] = Slice(blob_strs_[i]);
    blob_file_size_ += blobs_[i].size();  // new blob size
  }

  std::vector<uint64_t> blob_offsets(keys_.size());
  std::vector<uint64_t> blob_sizes(keys_.size());

  constexpr ExpirationRange expiration_range;
  WriteBlobFile(immutable_options, kColumnFamilyId, kHasTTL, expiration_range,
                expiration_range, kBlobFileNumber, keys_, blobs_,
                kNoCompression, blob_offsets, blob_sizes);

  constexpr size_t capacity = 10;
  std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ConcurrentCacheReservationManager* cache_res_mgr =
      static_cast<ChargedCache*>(blob_source.GetBlobCache())
          ->TEST_GetCacheReservationManager();
  ASSERT_NE(cache_res_mgr, nullptr);

  ReadOptions read_options;
  read_options.verify_checksums = true;

  {
    read_options.fill_cache = false;

    std::vector<PinnableSlice> values(keys_.size());

    for (size_t i = 0; i < kNumBlobs; ++i) {
      ASSERT_OK(blob_source.GetBlob(
          read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
          blob_file_size_, blob_sizes[i], kNoCompression,
          nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));
      ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), 0);
    }
  }

  {
    read_options.fill_cache = true;

    std::vector<PinnableSlice> values(keys_.size());

    // Since we resized each blob to be kSizeDummyEntry / (kNumBlobs / 2), we
    // can't fit all the blobs in the cache at the same time, which means we
    // should observe cache evictions once we reach the cache's capacity.
    // Due to the overhead of the cache and the BlobContents objects, as well
    // as jemalloc bin sizes, this happens after inserting seven blobs.
    uint64_t blob_bytes = 0;
    for (size_t i = 0; i < kNumBlobs; ++i) {
      ASSERT_OK(blob_source.GetBlob(
          read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
          blob_file_size_, blob_sizes[i], kNoCompression,
          nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));

      // Release cache handle
      values[i].Reset();

      if (i < kNumBlobs / 2 - 1) {
        size_t charge = 0;
        ASSERT_TRUE(blob_source.TEST_BlobInCache(
            kBlobFileNumber, blob_file_size_, blob_offsets[i], &charge));

        blob_bytes += charge;
      }

      ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(),
                options_.blob_cache->GetUsage());
    }
  }
}
#endif  // ROCKSDB_LITE

}  // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
  ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}