// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "db/blob/blob_source.h"

#include <array>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "cache/charged_cache.h"
#include "cache/compressed_secondary_cache.h"
#include "db/blob/blob_contents.h"
#include "db/blob/blob_file_cache.h"
#include "db/blob/blob_file_reader.h"
#include "db/blob/blob_log_format.h"
#include "db/blob/blob_log_writer.h"
#include "db/db_test_util.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "options/cf_options.h"
#include "rocksdb/options.h"
#include "util/compression.h"
#include "util/random.h"
28 | ||
29 | namespace ROCKSDB_NAMESPACE { | |
30 | ||
31 | namespace { | |
32 | ||
33 | // Creates a test blob file with `num` blobs in it. | |
34 | void WriteBlobFile(const ImmutableOptions& immutable_options, | |
35 | uint32_t column_family_id, bool has_ttl, | |
36 | const ExpirationRange& expiration_range_header, | |
37 | const ExpirationRange& expiration_range_footer, | |
38 | uint64_t blob_file_number, const std::vector<Slice>& keys, | |
39 | const std::vector<Slice>& blobs, CompressionType compression, | |
40 | std::vector<uint64_t>& blob_offsets, | |
41 | std::vector<uint64_t>& blob_sizes) { | |
42 | assert(!immutable_options.cf_paths.empty()); | |
43 | size_t num = keys.size(); | |
44 | assert(num == blobs.size()); | |
45 | assert(num == blob_offsets.size()); | |
46 | assert(num == blob_sizes.size()); | |
47 | ||
48 | const std::string blob_file_path = | |
49 | BlobFileName(immutable_options.cf_paths.front().path, blob_file_number); | |
50 | std::unique_ptr<FSWritableFile> file; | |
51 | ASSERT_OK(NewWritableFile(immutable_options.fs.get(), blob_file_path, &file, | |
52 | FileOptions())); | |
53 | ||
54 | std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter( | |
55 | std::move(file), blob_file_path, FileOptions(), immutable_options.clock)); | |
56 | ||
57 | constexpr Statistics* statistics = nullptr; | |
58 | constexpr bool use_fsync = false; | |
59 | constexpr bool do_flush = false; | |
60 | ||
61 | BlobLogWriter blob_log_writer(std::move(file_writer), immutable_options.clock, | |
62 | statistics, blob_file_number, use_fsync, | |
63 | do_flush); | |
64 | ||
65 | BlobLogHeader header(column_family_id, compression, has_ttl, | |
66 | expiration_range_header); | |
67 | ||
68 | ASSERT_OK(blob_log_writer.WriteHeader(header)); | |
69 | ||
70 | std::vector<std::string> compressed_blobs(num); | |
71 | std::vector<Slice> blobs_to_write(num); | |
72 | if (kNoCompression == compression) { | |
73 | for (size_t i = 0; i < num; ++i) { | |
74 | blobs_to_write[i] = blobs[i]; | |
75 | blob_sizes[i] = blobs[i].size(); | |
76 | } | |
77 | } else { | |
78 | CompressionOptions opts; | |
79 | CompressionContext context(compression); | |
80 | constexpr uint64_t sample_for_compression = 0; | |
81 | CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), | |
82 | compression, sample_for_compression); | |
83 | ||
84 | constexpr uint32_t compression_format_version = 2; | |
85 | ||
86 | for (size_t i = 0; i < num; ++i) { | |
87 | ASSERT_TRUE(CompressData(blobs[i], info, compression_format_version, | |
88 | &compressed_blobs[i])); | |
89 | blobs_to_write[i] = compressed_blobs[i]; | |
90 | blob_sizes[i] = compressed_blobs[i].size(); | |
91 | } | |
92 | } | |
93 | ||
94 | for (size_t i = 0; i < num; ++i) { | |
95 | uint64_t key_offset = 0; | |
96 | ASSERT_OK(blob_log_writer.AddRecord(keys[i], blobs_to_write[i], &key_offset, | |
97 | &blob_offsets[i])); | |
98 | } | |
99 | ||
100 | BlobLogFooter footer; | |
101 | footer.blob_count = num; | |
102 | footer.expiration_range = expiration_range_footer; | |
103 | ||
104 | std::string checksum_method; | |
105 | std::string checksum_value; | |
106 | ASSERT_OK( | |
107 | blob_log_writer.AppendFooter(footer, &checksum_method, &checksum_value)); | |
108 | } | |
109 | ||
110 | } // anonymous namespace | |
111 | ||
112 | class BlobSourceTest : public DBTestBase { | |
113 | protected: | |
114 | public: | |
115 | explicit BlobSourceTest() | |
116 | : DBTestBase("blob_source_test", /*env_do_fsync=*/true) { | |
117 | options_.env = env_; | |
118 | options_.enable_blob_files = true; | |
119 | options_.create_if_missing = true; | |
120 | ||
121 | LRUCacheOptions co; | |
122 | co.capacity = 8 << 20; | |
123 | co.num_shard_bits = 2; | |
124 | co.metadata_charge_policy = kDontChargeCacheMetadata; | |
125 | co.high_pri_pool_ratio = 0.2; | |
126 | co.low_pri_pool_ratio = 0.2; | |
127 | options_.blob_cache = NewLRUCache(co); | |
128 | options_.lowest_used_cache_tier = CacheTier::kVolatileTier; | |
129 | ||
130 | assert(db_->GetDbIdentity(db_id_).ok()); | |
131 | assert(db_->GetDbSessionId(db_session_id_).ok()); | |
132 | } | |
133 | ||
134 | Options options_; | |
135 | std::string db_id_; | |
136 | std::string db_session_id_; | |
137 | }; | |
138 | ||
// Exercises BlobSource::GetBlob against a single uncompressed blob file,
// checking blob cache behavior (fill_cache on/off, cache-only reads, and
// reads from a nonexistent file) via perf context counters and statistics
// tickers.
TEST_F(BlobSourceTest, GetBlobsFromCache) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_GetBlobsFromCache"), 0);

  options_.statistics = CreateDBStatistics();
  Statistics* statistics = options_.statistics.get();
  assert(statistics);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr uint64_t blob_file_number = 1;
  constexpr size_t num_blobs = 16;

  std::vector<std::string> key_strs;
  std::vector<std::string> blob_strs;

  for (size_t i = 0; i < num_blobs; ++i) {
    key_strs.push_back("key" + std::to_string(i));
    blob_strs.push_back("blob" + std::to_string(i));
  }

  std::vector<Slice> keys;
  std::vector<Slice> blobs;

  // Compute the expected on-disk file size: header + one record per blob
  // (record header + key + value) + footer.
  uint64_t file_size = BlobLogHeader::kSize;
  for (size_t i = 0; i < num_blobs; ++i) {
    keys.push_back({key_strs[i]});
    blobs.push_back({blob_strs[i]});
    file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
  }
  file_size += BlobLogFooter::kSize;

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
                expiration_range, blob_file_number, keys, blobs, kNoCompression,
                blob_offsets, blob_sizes);

  constexpr size_t capacity = 1024;
  std::shared_ptr<Cache> backing_cache =
      NewLRUCache(capacity);  // Blob file cache

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ReadOptions read_options;
  read_options.verify_checksums = true;

  constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;

  {
    // GetBlob
    std::vector<PinnableSlice> values(keys.size());
    uint64_t bytes_read = 0;
    uint64_t blob_bytes = 0;
    uint64_t total_bytes = 0;

    // Pass 1: fill_cache = false. Every read must hit the file, and nothing
    // is inserted into the blob cache.
    read_options.fill_cache = false;
    get_perf_context()->Reset();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer, &values[i],
                                    &bytes_read));
      ASSERT_EQ(values[i], blobs[i]);
      ASSERT_TRUE(values[i].IsPinned());
      // bytes_read covers the whole on-disk record, not just the value.
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      // Still not cached, since fill_cache is false.
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
      total_bytes += bytes_read;
    }

    // Retrieved the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, num_blobs);
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, total_bytes);
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);

    // Pass 2: fill_cache = true. Each GetBlob reads from the file once and
    // inserts the blob into the cache; the trailing TEST_BlobInCache is a hit.
    read_options.fill_cache = true;
    blob_bytes = 0;
    total_bytes = 0;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer, &values[i],
                                    &bytes_read));
      ASSERT_EQ(values[i], blobs[i]);
      ASSERT_TRUE(values[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      blob_bytes += blob_sizes[i];
      total_bytes += bytes_read;
      // One cache hit per completed prior iteration (the trailing probe).
      ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, i);
      ASSERT_EQ((int)get_perf_context()->blob_read_count, i + 1);
      ASSERT_EQ((int)get_perf_context()->blob_read_byte, total_bytes);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));

      ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, i + 1);
      ASSERT_EQ((int)get_perf_context()->blob_read_count, i + 1);
      ASSERT_EQ((int)get_perf_context()->blob_read_byte, total_bytes);
    }

    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_blobs);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, num_blobs);
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, total_bytes);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), num_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), blob_bytes);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE),
              blob_bytes);

    // Pass 3: everything is now cached; GetBlob should be served entirely
    // from the blob cache (no file I/O).
    read_options.fill_cache = true;
    total_bytes = 0;
    blob_bytes = 0;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer, &values[i],
                                    &bytes_read));
      ASSERT_EQ(values[i], blobs[i]);
      ASSERT_TRUE(values[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));
      total_bytes += bytes_read;    // on-disk blob record size
      blob_bytes += blob_sizes[i];  // cached blob value size
    }

    // Retrieved the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_blobs * 3);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // without i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // without i/o

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_bytes * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);

    // Cache-only GetBlob
    // Pass 4: kBlockCacheTier still succeeds because the blobs are cached.
    read_options.read_tier = ReadTier::kBlockCacheTier;
    total_bytes = 0;
    blob_bytes = 0;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer, &values[i],
                                    &bytes_read));
      ASSERT_EQ(values[i], blobs[i]);
      ASSERT_TRUE(values[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));
      total_bytes += bytes_read;
      blob_bytes += blob_sizes[i];
    }

    // Retrieved the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_blobs * 3);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // without i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // without i/o

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_bytes * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }

  // Drop all cached blobs so the cache-only reads below must miss.
  options_.blob_cache->EraseUnRefEntries();

  {
    // Cache-only GetBlob
    // With an empty cache and kBlockCacheTier, GetBlob must return
    // Incomplete without touching the file.
    std::vector<PinnableSlice> values(keys.size());
    uint64_t bytes_read = 0;

    read_options.read_tier = ReadTier::kBlockCacheTier;
    read_options.fill_cache = true;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_TRUE(blob_source
                      .GetBlob(read_options, keys[i], blob_file_number,
                               blob_offsets[i], file_size, blob_sizes[i],
                               kNoCompression, prefetch_buffer, &values[i],
                               &bytes_read)
                      .IsIncomplete());
      ASSERT_TRUE(values[i].empty());
      ASSERT_FALSE(values[i].IsPinned());
      ASSERT_EQ(bytes_read, 0);

      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
    }

    // Retrieved the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }

  {
    // GetBlob from non-existing file
    // Reading a file number that was never written must surface an IOError
    // and leave the cache untouched.
    std::vector<PinnableSlice> values(keys.size());
    uint64_t bytes_read = 0;
    uint64_t file_number = 100;  // non-existing file

    read_options.read_tier = ReadTier::kReadAllTier;
    read_options.fill_cache = true;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_TRUE(blob_source
                      .GetBlob(read_options, keys[i], file_number,
                               blob_offsets[i], file_size, blob_sizes[i],
                               kNoCompression, prefetch_buffer, &values[i],
                               &bytes_read)
                      .IsIOError());
      ASSERT_TRUE(values[i].empty());
      ASSERT_FALSE(values[i].IsPinned());
      ASSERT_EQ(bytes_read, 0);

      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                blob_offsets[i]));
    }

    // Retrieved the blob cache num_blobs * 3 times via TEST_BlobInCache,
    // GetBlob, and TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 3);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }
}
452 | ||
// Exercises BlobSource::GetBlob on a Snappy-compressed blob file: verifies
// blobs are decompressed on read, that decompressed values are cached, and
// that subsequent cache-only reads skip decompression.
TEST_F(BlobSourceTest, GetCompressedBlobs) {
  // Skip when the build has no Snappy support.
  if (!Snappy_Supported()) {
    return;
  }

  const CompressionType compression = kSnappyCompression;

  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_GetCompressedBlobs"), 0);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr size_t num_blobs = 256;

  std::vector<std::string> key_strs;
  std::vector<std::string> blob_strs;

  for (size_t i = 0; i < num_blobs; ++i) {
    key_strs.push_back("key" + std::to_string(i));
    blob_strs.push_back("blob" + std::to_string(i));
  }

  std::vector<Slice> keys;
  std::vector<Slice> blobs;

  for (size_t i = 0; i < num_blobs; ++i) {
    keys.push_back({key_strs[i]});
    blobs.push_back({blob_strs[i]});
  }

  // Filled in by WriteBlobFile with on-disk offsets and compressed sizes.
  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  constexpr size_t capacity = 1024;
  auto backing_cache = NewLRUCache(capacity);  // Blob file cache

  FileOptions file_options;
  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          column_family_id, nullptr /*HistogramImpl*/, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ReadOptions read_options;
  read_options.verify_checksums = true;

  uint64_t bytes_read = 0;
  std::vector<PinnableSlice> values(keys.size());

  {
    // Snappy Compression
    const uint64_t file_number = 1;

    read_options.read_tier = ReadTier::kReadAllTier;

    WriteBlobFile(immutable_options, column_family_id, has_ttl,
                  expiration_range, expiration_range, file_number, keys, blobs,
                  compression, blob_offsets, blob_sizes);

    // The actual file size is taken from the reader rather than computed,
    // since the records hold compressed payloads.
    CacheHandleGuard<BlobFileReader> blob_file_reader;
    ASSERT_OK(blob_source.GetBlobFileReader(file_number, &blob_file_reader));
    ASSERT_NE(blob_file_reader.GetValue(), nullptr);

    const uint64_t file_size = blob_file_reader.GetValue()->GetFileSize();
    ASSERT_EQ(blob_file_reader.GetValue()->GetCompressionType(), compression);

    // Sanity check: compression actually changed every blob's size.
    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_NE(blobs[i].size() /*uncompressed size*/,
                blob_sizes[i] /*compressed size*/);
    }

    // Pass 1: read from the file, decompress, and populate the cache.
    read_options.fill_cache = true;
    read_options.read_tier = ReadTier::kReadAllTier;
    get_perf_context()->Reset();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                blob_offsets[i]));
      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    compression, nullptr /*prefetch_buffer*/,
                                    &values[i], &bytes_read));
      ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
      ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[i]));
    }

    ASSERT_GE((int)get_perf_context()->blob_decompress_time, 0);

    // Pass 2: cache-only reads return the already-decompressed values, so
    // no decompression time accrues.
    read_options.read_tier = ReadTier::kBlockCacheTier;
    get_perf_context()->Reset();

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[i]));

      // Compressed blob size is passed in GetBlob
      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    compression, nullptr /*prefetch_buffer*/,
                                    &values[i], &bytes_read));
      ASSERT_EQ(values[i], blobs[i] /*uncompressed blob*/);
      ASSERT_NE(values[i].size(), blob_sizes[i] /*compressed size*/);
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[i]));
    }

    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);
  }
}
577 | ||
// Exercises BlobSource::MultiGetBlob across multiple blob files, including a
// batch that mixes real files with a nonexistent one, verifying statuses,
// cache population, and perf/ticker accounting.
TEST_F(BlobSourceTest, MultiGetBlobsFromMultiFiles) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromMultiFiles"),
      0);

  options_.statistics = CreateDBStatistics();
  Statistics* statistics = options_.statistics.get();
  assert(statistics);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr uint64_t blob_files = 2;
  constexpr size_t num_blobs = 32;

  std::vector<std::string> key_strs;
  std::vector<std::string> blob_strs;

  for (size_t i = 0; i < num_blobs; ++i) {
    key_strs.push_back("key" + std::to_string(i));
    blob_strs.push_back("blob" + std::to_string(i));
  }

  std::vector<Slice> keys;
  std::vector<Slice> blobs;

  // Compute the expected per-file size; every file gets the same contents,
  // so one size applies to all of them.
  uint64_t file_size = BlobLogHeader::kSize;
  uint64_t blob_value_bytes = 0;
  for (size_t i = 0; i < num_blobs; ++i) {
    keys.push_back({key_strs[i]});
    blobs.push_back({blob_strs[i]});
    blob_value_bytes += blobs[i].size();
    file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
  }
  file_size += BlobLogFooter::kSize;
  const uint64_t blob_records_bytes =
      file_size - BlobLogHeader::kSize - BlobLogFooter::kSize;

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  {
    // Write key/blob pairs to multiple blob files.
    for (size_t i = 0; i < blob_files; ++i) {
      const uint64_t file_number = i + 1;
      WriteBlobFile(immutable_options, column_family_id, has_ttl,
                    expiration_range, expiration_range, file_number, keys,
                    blobs, kNoCompression, blob_offsets, blob_sizes);
    }
  }

  constexpr size_t capacity = 10;
  std::shared_ptr<Cache> backing_cache =
      NewLRUCache(capacity);  // Blob file cache

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ReadOptions read_options;
  read_options.verify_checksums = true;

  uint64_t bytes_read = 0;

  {
    // MultiGetBlob
    read_options.fill_cache = true;
    read_options.read_tier = ReadTier::kReadAllTier;

    // Build one BlobFileReadRequests entry per file; each request points at
    // its own slot in value_buf/statuses_buf.
    autovector<BlobFileReadRequests> blob_reqs;
    std::array<autovector<BlobReadRequest>, blob_files> blob_reqs_in_file;
    std::array<PinnableSlice, num_blobs * blob_files> value_buf;
    std::array<Status, num_blobs * blob_files> statuses_buf;

    for (size_t i = 0; i < blob_files; ++i) {
      const uint64_t file_number = i + 1;
      for (size_t j = 0; j < num_blobs; ++j) {
        blob_reqs_in_file[i].emplace_back(
            keys[j], blob_offsets[j], blob_sizes[j], kNoCompression,
            &value_buf[i * num_blobs + j], &statuses_buf[i * num_blobs + j]);
      }
      blob_reqs.emplace_back(file_number, file_size, blob_reqs_in_file[i]);
    }

    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);

    // Every request succeeded and every blob is now cached.
    for (size_t i = 0; i < blob_files; ++i) {
      const uint64_t file_number = i + 1;
      for (size_t j = 0; j < num_blobs; ++j) {
        ASSERT_OK(statuses_buf[i * num_blobs + j]);
        ASSERT_EQ(value_buf[i * num_blobs + j], blobs[j]);
        ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                 blob_offsets[j]));
      }
    }

    // Retrieved all blobs from 2 blob files twice via MultiGetBlob and
    // TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count,
              num_blobs * blob_files);
    ASSERT_EQ((int)get_perf_context()->blob_read_count,
              num_blobs * blob_files);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte,
              blob_records_bytes * blob_files);  // blocking i/o
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS),
              num_blobs * blob_files);  // MultiGetBlob
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT),
              num_blobs * blob_files);  // TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD),
              num_blobs * blob_files);  // MultiGetBlob
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_value_bytes * blob_files);  // TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE),
              blob_value_bytes * blob_files);  // MultiGetBlob

    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    // Second batch: the real files (now fully cached) plus requests against
    // a file number that was never written.
    autovector<BlobReadRequest> fake_blob_reqs_in_file;
    std::array<PinnableSlice, num_blobs> fake_value_buf;
    std::array<Status, num_blobs> fake_statuses_buf;

    const uint64_t fake_file_number = 100;
    for (size_t i = 0; i < num_blobs; ++i) {
      fake_blob_reqs_in_file.emplace_back(
          keys[i], blob_offsets[i], blob_sizes[i], kNoCompression,
          &fake_value_buf[i], &fake_statuses_buf[i]);
    }

    // Add a fake multi-get blob request.
    blob_reqs.emplace_back(fake_file_number, file_size, fake_blob_reqs_in_file);

    blob_source.MultiGetBlob(read_options, blob_reqs, &bytes_read);

    // Check the real blob read requests.
    for (size_t i = 0; i < blob_files; ++i) {
      const uint64_t file_number = i + 1;
      for (size_t j = 0; j < num_blobs; ++j) {
        ASSERT_OK(statuses_buf[i * num_blobs + j]);
        ASSERT_EQ(value_buf[i * num_blobs + j], blobs[j]);
        ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                 blob_offsets[j]));
      }
    }

    // Check the fake blob request.
    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(fake_statuses_buf[i].IsIOError());
      ASSERT_TRUE(fake_value_buf[i].empty());
      ASSERT_FALSE(blob_source.TEST_BlobInCache(fake_file_number, file_size,
                                                blob_offsets[i]));
    }

    // Retrieved all blobs from 3 blob files (including the fake one) twice
    // via MultiGetBlob and TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count,
              num_blobs * blob_files * 2);
    ASSERT_EQ((int)get_perf_context()->blob_read_count,
              0);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte,
              0);  // blocking i/o
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    // Fake blob requests: MultiGetBlob and TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
    // Real blob requests: MultiGetBlob and TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT),
              num_blobs * blob_files * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    // Real blob requests: MultiGetBlob and TEST_BlobInCache
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_value_bytes * blob_files * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }
}
771 | ||
// Exercises BlobSource::MultiGetBlobFromOneFile end to end: a cold batched
// read that fills the blob cache, a warm cache-only batched read, a
// cache-only read after eviction (expects Status::Incomplete), and a batched
// read against a nonexistent blob file (expects Status::IOError). Each phase
// cross-checks perf context counters and BLOB_DB_CACHE_* tickers.
TEST_F(BlobSourceTest, MultiGetBlobsFromCache) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(env_, "BlobSourceTest_MultiGetBlobsFromCache"), 0);

  options_.statistics = CreateDBStatistics();
  Statistics* statistics = options_.statistics.get();
  assert(statistics);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr uint64_t blob_file_number = 1;
  constexpr size_t num_blobs = 16;

  std::vector<std::string> key_strs;
  std::vector<std::string> blob_strs;

  for (size_t i = 0; i < num_blobs; ++i) {
    key_strs.push_back("key" + std::to_string(i));
    blob_strs.push_back("blob" + std::to_string(i));
  }

  std::vector<Slice> keys;
  std::vector<Slice> blobs;

  // Precompute the exact on-disk file size from the blob log format so the
  // assertions below can reason about byte counts.
  uint64_t file_size = BlobLogHeader::kSize;
  for (size_t i = 0; i < num_blobs; ++i) {
    keys.push_back({key_strs[i]});
    blobs.push_back({blob_strs[i]});
    file_size += BlobLogRecord::kHeaderSize + keys[i].size() + blobs[i].size();
  }
  file_size += BlobLogFooter::kSize;

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
                expiration_range, blob_file_number, keys, blobs, kNoCompression,
                blob_offsets, blob_sizes);

  constexpr size_t capacity = 10;
  std::shared_ptr<Cache> backing_cache =
      NewLRUCache(capacity);  // Blob file cache

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          column_family_id, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  ReadOptions read_options;
  read_options.verify_checksums = true;

  constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;

  {
    // MultiGetBlobFromOneFile: cold batched read of the even-indexed blobs,
    // filling the blob cache; odd-indexed blobs are then read one by one via
    // GetBlob, and finally a fully-warm cache-only batched read is verified.
    uint64_t bytes_read = 0;
    std::array<Status, num_blobs> statuses_buf;
    std::array<PinnableSlice, num_blobs> value_buf;
    autovector<BlobReadRequest> blob_reqs;

    for (size_t i = 0; i < num_blobs; i += 2) {  // even index
      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
                             kNoCompression, &value_buf[i], &statuses_buf[i]);
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
    }

    read_options.fill_cache = true;
    read_options.read_tier = ReadTier::kReadAllTier;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    // Get half of blobs
    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
                                        file_size, blob_reqs, &bytes_read);

    // fs_read_bytes tracks bytes read from the file system (record header +
    // key + blob); ca_read_bytes tracks blob payload bytes served from cache.
    uint64_t fs_read_bytes = 0;
    uint64_t ca_read_bytes = 0;
    for (size_t i = 0; i < num_blobs; ++i) {
      if (i % 2 == 0) {
        ASSERT_OK(statuses_buf[i]);
        ASSERT_EQ(value_buf[i], blobs[i]);
        ASSERT_TRUE(value_buf[i].IsPinned());
        fs_read_bytes +=
            blob_sizes[i] + keys[i].size() + BlobLogRecord::kHeaderSize;
        ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                 blob_offsets[i]));
        ca_read_bytes += blob_sizes[i];
      } else {
        // Odd-indexed requests were never issued; their statuses are
        // intentionally left unchecked.
        statuses_buf[i].PermitUncheckedError();
        ASSERT_TRUE(value_buf[i].empty());
        ASSERT_FALSE(value_buf[i].IsPinned());
        ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                  blob_offsets[i]));
      }
    }

    constexpr int num_even_blobs = num_blobs / 2;
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_even_blobs);
    ASSERT_EQ((int)get_perf_context()->blob_read_count,
              num_even_blobs);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte,
              fs_read_bytes);  // blocking i/o
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_even_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), num_even_blobs);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              ca_read_bytes);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE),
              ca_read_bytes);

    // Get the rest of blobs
    for (size_t i = 1; i < num_blobs; i += 2) {  // odd index
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));

      ASSERT_OK(blob_source.GetBlob(read_options, keys[i], blob_file_number,
                                    blob_offsets[i], file_size, blob_sizes[i],
                                    kNoCompression, prefetch_buffer,
                                    &value_buf[i], &bytes_read));
      ASSERT_EQ(value_buf[i], blobs[i]);
      ASSERT_TRUE(value_buf[i].IsPinned());
      ASSERT_EQ(bytes_read,
                BlobLogRecord::kHeaderSize + keys[i].size() + blob_sizes[i]);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));
    }

    // Cache-only MultiGetBlobFromOneFile: everything is now cached, so a
    // kBlockCacheTier read must succeed with zero file-system I/O.
    read_options.read_tier = ReadTier::kBlockCacheTier;
    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    blob_reqs.clear();
    for (size_t i = 0; i < num_blobs; ++i) {
      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
                             kNoCompression, &value_buf[i], &statuses_buf[i]);
    }

    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
                                        file_size, blob_reqs, &bytes_read);

    uint64_t blob_bytes = 0;
    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_OK(statuses_buf[i]);
      ASSERT_EQ(value_buf[i], blobs[i]);
      ASSERT_TRUE(value_buf[i].IsPinned());
      ASSERT_TRUE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                               blob_offsets[i]));
      blob_bytes += blob_sizes[i];
    }

    // Retrieved the blob cache num_blobs * 2 times via GetBlob and
    // TEST_BlobInCache.
    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, num_blobs * 2);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // blocking i/o
    ASSERT_GE((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), num_blobs * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ),
              blob_bytes * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }

  // Empty the blob cache so the cache-only read below has nothing to hit.
  options_.blob_cache->EraseUnRefEntries();

  {
    // Cache-only MultiGetBlobFromOneFile against an empty cache: every
    // request must come back Status::Incomplete with no cache insertions.
    uint64_t bytes_read = 0;
    read_options.read_tier = ReadTier::kBlockCacheTier;

    std::array<Status, num_blobs> statuses_buf;
    std::array<PinnableSlice, num_blobs> value_buf;
    autovector<BlobReadRequest> blob_reqs;

    for (size_t i = 0; i < num_blobs; i++) {
      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
                             kNoCompression, &value_buf[i], &statuses_buf[i]);
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
    }

    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    blob_source.MultiGetBlobFromOneFile(read_options, blob_file_number,
                                        file_size, blob_reqs, &bytes_read);

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(statuses_buf[i].IsIncomplete());
      ASSERT_TRUE(value_buf[i].empty());
      ASSERT_FALSE(value_buf[i].IsPinned());
      ASSERT_FALSE(blob_source.TEST_BlobInCache(blob_file_number, file_size,
                                                blob_offsets[i]));
    }

    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    // Each blob was probed twice (MultiGetBlobFromOneFile + TEST_BlobInCache),
    // hence num_blobs * 2 misses.
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }

  {
    // MultiGetBlobFromOneFile from non-existing file: all requests must fail
    // with Status::IOError and nothing may be inserted into the cache.
    uint64_t bytes_read = 0;
    uint64_t non_existing_file_number = 100;
    read_options.read_tier = ReadTier::kReadAllTier;

    std::array<Status, num_blobs> statuses_buf;
    std::array<PinnableSlice, num_blobs> value_buf;
    autovector<BlobReadRequest> blob_reqs;

    for (size_t i = 0; i < num_blobs; i++) {
      blob_reqs.emplace_back(keys[i], blob_offsets[i], blob_sizes[i],
                             kNoCompression, &value_buf[i], &statuses_buf[i]);
      ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
                                                file_size, blob_offsets[i]));
    }

    get_perf_context()->Reset();
    statistics->Reset().PermitUncheckedError();

    blob_source.MultiGetBlobFromOneFile(read_options, non_existing_file_number,
                                        file_size, blob_reqs, &bytes_read);

    for (size_t i = 0; i < num_blobs; ++i) {
      ASSERT_TRUE(statuses_buf[i].IsIOError());
      ASSERT_TRUE(value_buf[i].empty());
      ASSERT_FALSE(value_buf[i].IsPinned());
      ASSERT_FALSE(blob_source.TEST_BlobInCache(non_existing_file_number,
                                                file_size, blob_offsets[i]));
    }

    ASSERT_EQ((int)get_perf_context()->blob_cache_hit_count, 0);
    ASSERT_EQ((int)get_perf_context()->blob_read_count, 0);  // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_read_byte, 0);   // blocking i/o
    ASSERT_EQ((int)get_perf_context()->blob_checksum_time, 0);
    ASSERT_EQ((int)get_perf_context()->blob_decompress_time, 0);

    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_MISS), num_blobs * 2);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_HIT), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_ADD), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_READ), 0);
    ASSERT_EQ(statistics->getTickerCount(BLOB_DB_CACHE_BYTES_WRITE), 0);
  }
}
1044 | ||
1045 | class BlobSecondaryCacheTest : public DBTestBase { | |
1046 | protected: | |
1047 | public: | |
1048 | explicit BlobSecondaryCacheTest() | |
1049 | : DBTestBase("blob_secondary_cache_test", /*env_do_fsync=*/true) { | |
1050 | options_.env = env_; | |
1051 | options_.enable_blob_files = true; | |
1052 | options_.create_if_missing = true; | |
1053 | ||
1054 | // Set a small cache capacity to evict entries from the cache, and to test | |
1055 | // that secondary cache is used properly. | |
1056 | lru_cache_opts_.capacity = 1024; | |
1057 | lru_cache_opts_.num_shard_bits = 0; | |
1058 | lru_cache_opts_.strict_capacity_limit = true; | |
1059 | lru_cache_opts_.metadata_charge_policy = kDontChargeCacheMetadata; | |
1060 | lru_cache_opts_.high_pri_pool_ratio = 0.2; | |
1061 | lru_cache_opts_.low_pri_pool_ratio = 0.2; | |
1062 | ||
1063 | secondary_cache_opts_.capacity = 8 << 20; // 8 MB | |
1064 | secondary_cache_opts_.num_shard_bits = 0; | |
1065 | secondary_cache_opts_.metadata_charge_policy = | |
1066 | kDefaultCacheMetadataChargePolicy; | |
1067 | ||
1068 | // Read blobs from the secondary cache if they are not in the primary cache | |
1069 | options_.lowest_used_cache_tier = CacheTier::kNonVolatileBlockTier; | |
1070 | ||
1071 | assert(db_->GetDbIdentity(db_id_).ok()); | |
1072 | assert(db_->GetDbSessionId(db_session_id_).ok()); | |
1073 | } | |
1074 | ||
1075 | Options options_; | |
1076 | ||
1077 | LRUCacheOptions lru_cache_opts_; | |
1078 | CompressedSecondaryCacheOptions secondary_cache_opts_; | |
1079 | ||
1080 | std::string db_id_; | |
1081 | std::string db_session_id_; | |
1082 | }; | |
1083 | ||
// Verifies the promotion/demotion dance between the tiny primary blob cache
// (capacity 1024, strict) and the compressed secondary cache: a ~512-byte and
// a ~768-byte blob cannot coexist in the primary cache, so each GetBlob of
// one key evicts the other into the secondary cache. The test then inspects
// both tiers directly via Cache::Lookup and SecondaryCache::Lookup.
// NOTE(review): the assertions depend on the exact eviction behavior of the
// strict-capacity LRU cache; the statement order below is load-bearing.
TEST_F(BlobSecondaryCacheTest, GetBlobsFromSecondaryCache) {
  if (!Snappy_Supported()) {
    return;
  }

  secondary_cache_opts_.compression_type = kSnappyCompression;
  lru_cache_opts_.secondary_cache =
      NewCompressedSecondaryCache(secondary_cache_opts_);
  options_.blob_cache = NewLRUCache(lru_cache_opts_);

  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(
          env_, "BlobSecondaryCacheTest_GetBlobsFromSecondaryCache"),
      0);

  options_.statistics = CreateDBStatistics();
  Statistics* statistics = options_.statistics.get();
  assert(statistics);

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr uint32_t column_family_id = 1;
  constexpr bool has_ttl = false;
  constexpr ExpirationRange expiration_range;
  constexpr uint64_t file_number = 1;

  Random rnd(301);

  // Two blobs sized so that both cannot fit in the 1024-byte primary cache
  // at the same time.
  std::vector<std::string> key_strs{"key0", "key1"};
  std::vector<std::string> blob_strs{rnd.RandomString(512),
                                     rnd.RandomString(768)};

  std::vector<Slice> keys{key_strs[0], key_strs[1]};
  std::vector<Slice> blobs{blob_strs[0], blob_strs[1]};

  std::vector<uint64_t> blob_offsets(keys.size());
  std::vector<uint64_t> blob_sizes(keys.size());

  WriteBlobFile(immutable_options, column_family_id, has_ttl, expiration_range,
                expiration_range, file_number, keys, blobs, kNoCompression,
                blob_offsets, blob_sizes);

  constexpr size_t capacity = 1024;
  std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache(new BlobFileCache(
      backing_cache.get(), &immutable_options, &file_options, column_family_id,
      blob_file_read_hist, nullptr /*IOTracer*/));

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  CacheHandleGuard<BlobFileReader> file_reader;
  ASSERT_OK(blob_source.GetBlobFileReader(file_number, &file_reader));
  ASSERT_NE(file_reader.GetValue(), nullptr);
  const uint64_t file_size = file_reader.GetValue()->GetFileSize();
  ASSERT_EQ(file_reader.GetValue()->GetCompressionType(), kNoCompression);

  ReadOptions read_options;
  read_options.verify_checksums = true;

  auto blob_cache = options_.blob_cache;
  auto secondary_cache = lru_cache_opts_.secondary_cache;

  // Callback used by the direct secondary-cache Lookup()s below to
  // materialize a BlobContents object from the stored bytes.
  Cache::CreateCallback create_cb = [](const void* buf, size_t size,
                                       void** out_obj,
                                       size_t* charge) -> Status {
    CacheAllocationPtr allocation(new char[size]);

    return BlobContents::CreateCallback(std::move(allocation), buf, size,
                                        out_obj, charge);
  };

  {
    // GetBlob
    std::vector<PinnableSlice> values(keys.size());

    read_options.fill_cache = true;
    get_perf_context()->Reset();

    // key0 should be filled to the primary cache from the blob file.
    ASSERT_OK(blob_source.GetBlob(read_options, keys[0], file_number,
                                  blob_offsets[0], file_size, blob_sizes[0],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &values[0], nullptr /* bytes_read */));
    // Release cache handle
    values[0].Reset();

    // key0 should be evicted and key0's dummy item is inserted into secondary
    // cache. key1 should be filled to the primary cache from the blob file.
    ASSERT_OK(blob_source.GetBlob(read_options, keys[1], file_number,
                                  blob_offsets[1], file_size, blob_sizes[1],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &values[1], nullptr /* bytes_read */));

    // Release cache handle
    values[1].Reset();

    // key0 should be filled to the primary cache from the blob file. key1
    // should be evicted and key1's dummy item is inserted into secondary cache.
    ASSERT_OK(blob_source.GetBlob(read_options, keys[0], file_number,
                                  blob_offsets[0], file_size, blob_sizes[0],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &values[0], nullptr /* bytes_read */));
    ASSERT_EQ(values[0], blobs[0]);
    ASSERT_TRUE(
        blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[0]));

    // Release cache handle
    values[0].Reset();

    // key0 should be evicted and is inserted into secondary cache.
    // key1 should be filled to the primary cache from the blob file.
    ASSERT_OK(blob_source.GetBlob(read_options, keys[1], file_number,
                                  blob_offsets[1], file_size, blob_sizes[1],
                                  kNoCompression, nullptr /* prefetch_buffer */,
                                  &values[1], nullptr /* bytes_read */));
    ASSERT_EQ(values[1], blobs[1]);
    ASSERT_TRUE(
        blob_source.TEST_BlobInCache(file_number, file_size, blob_offsets[1]));

    // Release cache handle
    values[1].Reset();

    OffsetableCacheKey base_cache_key(db_id_, db_session_id_, file_number);

    // blob_cache here only looks at the primary cache since we didn't provide
    // the cache item helper for the secondary cache. However, since key0 is
    // demoted to the secondary cache, we shouldn't be able to find it in the
    // primary cache.
    {
      CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[0]);
      const Slice key0 = cache_key.AsSlice();
      auto handle0 = blob_cache->Lookup(key0, statistics);
      ASSERT_EQ(handle0, nullptr);

      // key0's item should be in the secondary cache.
      bool is_in_sec_cache = false;
      auto sec_handle0 =
          secondary_cache->Lookup(key0, create_cb, true,
                                  /*advise_erase=*/true, is_in_sec_cache);
      ASSERT_FALSE(is_in_sec_cache);
      ASSERT_NE(sec_handle0, nullptr);
      ASSERT_TRUE(sec_handle0->IsReady());
      // Lookup() transferred ownership of the materialized BlobContents to
      // us; delete it after checking the payload.
      auto value = static_cast<BlobContents*>(sec_handle0->Value());
      ASSERT_NE(value, nullptr);
      ASSERT_EQ(value->data(), blobs[0]);
      delete value;

      // key0 doesn't exist in the blob cache although key0's dummy
      // item exists in the secondary cache.
      ASSERT_FALSE(blob_source.TEST_BlobInCache(file_number, file_size,
                                                blob_offsets[0]));
    }

    // key1 should exist in the primary cache. key1's dummy item exists
    // in the secondary cache.
    {
      CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[1]);
      const Slice key1 = cache_key.AsSlice();
      auto handle1 = blob_cache->Lookup(key1, statistics);
      ASSERT_NE(handle1, nullptr);
      blob_cache->Release(handle1);

      // Only a dummy item for key1 is in the secondary cache, so Lookup()
      // cannot return the real value.
      bool is_in_sec_cache = false;
      auto sec_handle1 =
          secondary_cache->Lookup(key1, create_cb, true,
                                  /*advise_erase=*/true, is_in_sec_cache);
      ASSERT_FALSE(is_in_sec_cache);
      ASSERT_EQ(sec_handle1, nullptr);

      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[1]));
    }

    {
      // fetch key0 from the blob file to the primary cache.
      // key1 is evicted and inserted into the secondary cache.
      ASSERT_OK(blob_source.GetBlob(
          read_options, keys[0], file_number, blob_offsets[0], file_size,
          blob_sizes[0], kNoCompression, nullptr /* prefetch_buffer */,
          &values[0], nullptr /* bytes_read */));
      ASSERT_EQ(values[0], blobs[0]);

      // Release cache handle
      values[0].Reset();

      // key0 should be in the primary cache.
      CacheKey cache_key0 = base_cache_key.WithOffset(blob_offsets[0]);
      const Slice key0 = cache_key0.AsSlice();
      auto handle0 = blob_cache->Lookup(key0, statistics);
      ASSERT_NE(handle0, nullptr);
      auto value = static_cast<BlobContents*>(blob_cache->Value(handle0));
      ASSERT_NE(value, nullptr);
      ASSERT_EQ(value->data(), blobs[0]);
      blob_cache->Release(handle0);

      // key1 is not in the primary cache and is in the secondary cache.
      CacheKey cache_key1 = base_cache_key.WithOffset(blob_offsets[1]);
      const Slice key1 = cache_key1.AsSlice();
      auto handle1 = blob_cache->Lookup(key1, statistics);
      ASSERT_EQ(handle1, nullptr);

      // erase key0 from the primary cache.
      blob_cache->Erase(key0);
      handle0 = blob_cache->Lookup(key0, statistics);
      ASSERT_EQ(handle0, nullptr);

      // key1 promotion should succeed due to the primary cache being empty. We
      // didn't call secondary cache's Lookup() here, because it will remove the
      // key but it won't be able to promote the key to the primary cache.
      // Instead we use the end-to-end blob source API to read key1.
      // In function TEST_BlobInCache, key1's dummy item is inserted into the
      // primary cache and a standalone handle is checked by GetValue().
      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[1]));

      // key1's dummy handle is in the primary cache and key1's item is still
      // in the secondary cache. So, the primary cache's Lookup() without
      // secondary cache support cannot see it. (NOTE: The dummy handle used
      // to be a leaky abstraction but not anymore.)
      handle1 = blob_cache->Lookup(key1, statistics);
      ASSERT_EQ(handle1, nullptr);

      // But after another access, it is promoted to primary cache
      ASSERT_TRUE(blob_source.TEST_BlobInCache(file_number, file_size,
                                               blob_offsets[1]));

      // And Lookup() can find it (without secondary cache support)
      handle1 = blob_cache->Lookup(key1, statistics);
      ASSERT_NE(handle1, nullptr);
      ASSERT_NE(blob_cache->Value(handle1), nullptr);
      blob_cache->Release(handle1);
    }
  }
}
1325 | ||
1326 | class BlobSourceCacheReservationTest : public DBTestBase { | |
1327 | public: | |
1328 | explicit BlobSourceCacheReservationTest() | |
1329 | : DBTestBase("blob_source_cache_reservation_test", | |
1330 | /*env_do_fsync=*/true) { | |
1331 | options_.env = env_; | |
1332 | options_.enable_blob_files = true; | |
1333 | options_.create_if_missing = true; | |
1334 | ||
1335 | LRUCacheOptions co; | |
1336 | co.capacity = kCacheCapacity; | |
1337 | co.num_shard_bits = kNumShardBits; | |
1338 | co.metadata_charge_policy = kDontChargeCacheMetadata; | |
1339 | ||
1340 | co.high_pri_pool_ratio = 0.0; | |
1341 | co.low_pri_pool_ratio = 0.0; | |
1342 | std::shared_ptr<Cache> blob_cache = NewLRUCache(co); | |
1343 | ||
1344 | co.high_pri_pool_ratio = 0.5; | |
1345 | co.low_pri_pool_ratio = 0.5; | |
1346 | std::shared_ptr<Cache> block_cache = NewLRUCache(co); | |
1347 | ||
1348 | options_.blob_cache = blob_cache; | |
1349 | options_.lowest_used_cache_tier = CacheTier::kVolatileTier; | |
1350 | ||
1351 | BlockBasedTableOptions block_based_options; | |
1352 | block_based_options.no_block_cache = false; | |
1353 | block_based_options.block_cache = block_cache; | |
1354 | block_based_options.cache_usage_options.options_overrides.insert( | |
1355 | {CacheEntryRole::kBlobCache, | |
1356 | {/* charged = */ CacheEntryRoleOptions::Decision::kEnabled}}); | |
1357 | options_.table_factory.reset( | |
1358 | NewBlockBasedTableFactory(block_based_options)); | |
1359 | ||
1360 | assert(db_->GetDbIdentity(db_id_).ok()); | |
1361 | assert(db_->GetDbSessionId(db_session_id_).ok()); | |
1362 | } | |
1363 | ||
1364 | void GenerateKeysAndBlobs() { | |
1365 | for (size_t i = 0; i < kNumBlobs; ++i) { | |
1366 | key_strs_.push_back("key" + std::to_string(i)); | |
1367 | blob_strs_.push_back("blob" + std::to_string(i)); | |
1368 | } | |
1369 | ||
1370 | blob_file_size_ = BlobLogHeader::kSize; | |
1371 | for (size_t i = 0; i < kNumBlobs; ++i) { | |
1372 | keys_.push_back({key_strs_[i]}); | |
1373 | blobs_.push_back({blob_strs_[i]}); | |
1374 | blob_file_size_ += | |
1375 | BlobLogRecord::kHeaderSize + keys_[i].size() + blobs_[i].size(); | |
1376 | } | |
1377 | blob_file_size_ += BlobLogFooter::kSize; | |
1378 | } | |
1379 | ||
1380 | static constexpr std::size_t kSizeDummyEntry = CacheReservationManagerImpl< | |
1381 | CacheEntryRole::kBlobCache>::GetDummyEntrySize(); | |
1382 | static constexpr std::size_t kCacheCapacity = 1 * kSizeDummyEntry; | |
1383 | static constexpr int kNumShardBits = 0; // 2^0 shard | |
1384 | ||
1385 | static constexpr uint32_t kColumnFamilyId = 1; | |
1386 | static constexpr bool kHasTTL = false; | |
1387 | static constexpr uint64_t kBlobFileNumber = 1; | |
1388 | static constexpr size_t kNumBlobs = 16; | |
1389 | ||
1390 | std::vector<Slice> keys_; | |
1391 | std::vector<Slice> blobs_; | |
1392 | std::vector<std::string> key_strs_; | |
1393 | std::vector<std::string> blob_strs_; | |
1394 | uint64_t blob_file_size_; | |
1395 | ||
1396 | Options options_; | |
1397 | std::string db_id_; | |
1398 | std::string db_session_id_; | |
1399 | }; | |
1400 | ||
1401 | #ifndef ROCKSDB_LITE | |
// Verifies ChargedCache reservation accounting for the blob cache: no
// reservation when fill_cache is off, a single dummy entry reserved once
// blobs are cached (total usage stays below one dummy entry), and release of
// the reservation as entries are erased through the cache wrapper.
TEST_F(BlobSourceCacheReservationTest, SimpleCacheReservation) {
  options_.cf_paths.emplace_back(
      test::PerThreadDBPath(
          env_, "BlobSourceCacheReservationTest_SimpleCacheReservation"),
      0);

  GenerateKeysAndBlobs();

  DestroyAndReopen(options_);

  ImmutableOptions immutable_options(options_);

  constexpr ExpirationRange expiration_range;

  std::vector<uint64_t> blob_offsets(keys_.size());
  std::vector<uint64_t> blob_sizes(keys_.size());

  WriteBlobFile(immutable_options, kColumnFamilyId, kHasTTL, expiration_range,
                expiration_range, kBlobFileNumber, keys_, blobs_,
                kNoCompression, blob_offsets, blob_sizes);

  constexpr size_t capacity = 10;
  std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity);

  FileOptions file_options;
  constexpr HistogramImpl* blob_file_read_hist = nullptr;

  std::unique_ptr<BlobFileCache> blob_file_cache =
      std::make_unique<BlobFileCache>(
          backing_cache.get(), &immutable_options, &file_options,
          kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/);

  BlobSource blob_source(&immutable_options, db_id_, db_session_id_,
                         blob_file_cache.get());

  // The fixture enables blob cache charging, so the blob cache is wrapped in
  // a ChargedCache; grab its reservation manager for the checks below.
  ConcurrentCacheReservationManager* cache_res_mgr =
      static_cast<ChargedCache*>(blob_source.GetBlobCache())
          ->TEST_GetCacheReservationManager();
  ASSERT_NE(cache_res_mgr, nullptr);

  ReadOptions read_options;
  read_options.verify_checksums = true;

  {
    // With fill_cache disabled, reads must not create any cache reservation.
    read_options.fill_cache = false;

    std::vector<PinnableSlice> values(keys_.size());

    for (size_t i = 0; i < kNumBlobs; ++i) {
      ASSERT_OK(blob_source.GetBlob(
          read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
          blob_file_size_, blob_sizes[i], kNoCompression,
          nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));
      ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), 0);
    }
  }

  {
    read_options.fill_cache = true;

    std::vector<PinnableSlice> values(keys_.size());

    // num_blobs is 16, so the total blob cache usage is less than a single
    // dummy entry. Therefore, cache reservation manager only reserves one dummy
    // entry here.
    uint64_t blob_bytes = 0;
    for (size_t i = 0; i < kNumBlobs; ++i) {
      ASSERT_OK(blob_source.GetBlob(
          read_options, keys_[i], kBlobFileNumber, blob_offsets[i],
          blob_file_size_, blob_sizes[i], kNoCompression,
          nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */));

      size_t charge = 0;
      ASSERT_TRUE(blob_source.TEST_BlobInCache(kBlobFileNumber, blob_file_size_,
                                               blob_offsets[i], &charge));

      blob_bytes += charge;
      ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(),
                options_.blob_cache->GetUsage());
    }
  }

  {
    OffsetableCacheKey base_cache_key(db_id_, db_session_id_, kBlobFileNumber);
    size_t blob_bytes = options_.blob_cache->GetUsage();

    // Erase the cached blobs one by one and check that memory accounting
    // shrinks accordingly, with the dummy entry released only after the last
    // blob is gone.
    for (size_t i = 0; i < kNumBlobs; ++i) {
      size_t charge = 0;
      ASSERT_TRUE(blob_source.TEST_BlobInCache(kBlobFileNumber, blob_file_size_,
                                               blob_offsets[i], &charge));

      CacheKey cache_key = base_cache_key.WithOffset(blob_offsets[i]);
      // We didn't call options_.blob_cache->Erase() here, this is because
      // the cache wrapper's Erase() method must be called to update the
      // cache usage after erasing the cache entry.
      blob_source.GetBlobCache()->Erase(cache_key.AsSlice());
      if (i == kNumBlobs - 1) {
        // All the blobs got removed from the cache. cache_res_mgr should not
        // reserve any space for them.
        ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0);
      } else {
        ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry);
      }
      blob_bytes -= charge;
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes);
      ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(),
                options_.blob_cache->GetUsage());
    }
  }
}
1515 | ||
1516 | TEST_F(BlobSourceCacheReservationTest, IncreaseCacheReservationOnFullCache) { | |
1517 | options_.cf_paths.emplace_back( | |
1518 | test::PerThreadDBPath( | |
1519 | env_, | |
1520 | "BlobSourceCacheReservationTest_IncreaseCacheReservationOnFullCache"), | |
1521 | 0); | |
1522 | ||
1523 | GenerateKeysAndBlobs(); | |
1524 | ||
1525 | DestroyAndReopen(options_); | |
1526 | ||
1527 | ImmutableOptions immutable_options(options_); | |
1528 | constexpr size_t blob_size = kSizeDummyEntry / (kNumBlobs / 2); | |
1529 | for (size_t i = 0; i < kNumBlobs; ++i) { | |
1530 | blob_file_size_ -= blobs_[i].size(); // old blob size | |
1531 | blob_strs_[i].resize(blob_size, '@'); | |
1532 | blobs_[i] = Slice(blob_strs_[i]); | |
1533 | blob_file_size_ += blobs_[i].size(); // new blob size | |
1534 | } | |
1535 | ||
1536 | std::vector<uint64_t> blob_offsets(keys_.size()); | |
1537 | std::vector<uint64_t> blob_sizes(keys_.size()); | |
1538 | ||
1539 | constexpr ExpirationRange expiration_range; | |
1540 | WriteBlobFile(immutable_options, kColumnFamilyId, kHasTTL, expiration_range, | |
1541 | expiration_range, kBlobFileNumber, keys_, blobs_, | |
1542 | kNoCompression, blob_offsets, blob_sizes); | |
1543 | ||
1544 | constexpr size_t capacity = 10; | |
1545 | std::shared_ptr<Cache> backing_cache = NewLRUCache(capacity); | |
1546 | ||
1547 | FileOptions file_options; | |
1548 | constexpr HistogramImpl* blob_file_read_hist = nullptr; | |
1549 | ||
1550 | std::unique_ptr<BlobFileCache> blob_file_cache = | |
1551 | std::make_unique<BlobFileCache>( | |
1552 | backing_cache.get(), &immutable_options, &file_options, | |
1553 | kColumnFamilyId, blob_file_read_hist, nullptr /*IOTracer*/); | |
1554 | ||
1555 | BlobSource blob_source(&immutable_options, db_id_, db_session_id_, | |
1556 | blob_file_cache.get()); | |
1557 | ||
1558 | ConcurrentCacheReservationManager* cache_res_mgr = | |
1559 | static_cast<ChargedCache*>(blob_source.GetBlobCache()) | |
1560 | ->TEST_GetCacheReservationManager(); | |
1561 | ASSERT_NE(cache_res_mgr, nullptr); | |
1562 | ||
1563 | ReadOptions read_options; | |
1564 | read_options.verify_checksums = true; | |
1565 | ||
1566 | { | |
1567 | read_options.fill_cache = false; | |
1568 | ||
1569 | std::vector<PinnableSlice> values(keys_.size()); | |
1570 | ||
1571 | for (size_t i = 0; i < kNumBlobs; ++i) { | |
1572 | ASSERT_OK(blob_source.GetBlob( | |
1573 | read_options, keys_[i], kBlobFileNumber, blob_offsets[i], | |
1574 | blob_file_size_, blob_sizes[i], kNoCompression, | |
1575 | nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */)); | |
1576 | ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), 0); | |
1577 | ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), 0); | |
1578 | } | |
1579 | } | |
1580 | ||
1581 | { | |
1582 | read_options.fill_cache = true; | |
1583 | ||
1584 | std::vector<PinnableSlice> values(keys_.size()); | |
1585 | ||
1586 | // Since we resized each blob to be kSizeDummyEntry / (num_blobs / 2), we | |
1587 | // can't fit all the blobs in the cache at the same time, which means we | |
1588 | // should observe cache evictions once we reach the cache's capacity. | |
1589 | // Due to the overhead of the cache and the BlobContents objects, as well as | |
1590 | // jemalloc bin sizes, this happens after inserting seven blobs. | |
1591 | uint64_t blob_bytes = 0; | |
1592 | for (size_t i = 0; i < kNumBlobs; ++i) { | |
1593 | ASSERT_OK(blob_source.GetBlob( | |
1594 | read_options, keys_[i], kBlobFileNumber, blob_offsets[i], | |
1595 | blob_file_size_, blob_sizes[i], kNoCompression, | |
1596 | nullptr /* prefetch_buffer */, &values[i], nullptr /* bytes_read */)); | |
1597 | ||
1598 | // Release cache handle | |
1599 | values[i].Reset(); | |
1600 | ||
1601 | if (i < kNumBlobs / 2 - 1) { | |
1602 | size_t charge = 0; | |
1603 | ASSERT_TRUE(blob_source.TEST_BlobInCache( | |
1604 | kBlobFileNumber, blob_file_size_, blob_offsets[i], &charge)); | |
1605 | ||
1606 | blob_bytes += charge; | |
1607 | } | |
1608 | ||
1609 | ASSERT_EQ(cache_res_mgr->GetTotalReservedCacheSize(), kSizeDummyEntry); | |
1610 | ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), blob_bytes); | |
1611 | ASSERT_EQ(cache_res_mgr->GetTotalMemoryUsed(), | |
1612 | options_.blob_cache->GetUsage()); | |
1613 | } | |
1614 | } | |
1615 | } | |
1616 | #endif // ROCKSDB_LITE | |
1617 | ||
1618 | } // namespace ROCKSDB_NAMESPACE | |
1619 | ||
1620 | int main(int argc, char** argv) { | |
1621 | ROCKSDB_NAMESPACE::port::InstallStackTraceHandler(); | |
1622 | ::testing::InitGoogleTest(&argc, argv); | |
1623 | return RUN_ALL_TESTS(); | |
1624 | } |