1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 #include "cache/compressed_secondary_cache.h"
12 #include "memory/memory_allocator.h"
13 #include "monitoring/perf_context_imp.h"
14 #include "util/compression.h"
15 #include "util/string_util.h"
17 namespace ROCKSDB_NAMESPACE
{
19 CompressedSecondaryCache::CompressedSecondaryCache(
20 size_t capacity
, int num_shard_bits
, bool strict_capacity_limit
,
21 double high_pri_pool_ratio
, double low_pri_pool_ratio
,
22 std::shared_ptr
<MemoryAllocator
> memory_allocator
, bool use_adaptive_mutex
,
23 CacheMetadataChargePolicy metadata_charge_policy
,
24 CompressionType compression_type
, uint32_t compress_format_version
,
25 bool enable_custom_split_merge
)
26 : cache_options_(capacity
, num_shard_bits
, strict_capacity_limit
,
27 high_pri_pool_ratio
, low_pri_pool_ratio
, memory_allocator
,
28 use_adaptive_mutex
, metadata_charge_policy
,
29 compression_type
, compress_format_version
,
30 enable_custom_split_merge
) {
32 NewLRUCache(capacity
, num_shard_bits
, strict_capacity_limit
,
33 high_pri_pool_ratio
, memory_allocator
, use_adaptive_mutex
,
34 metadata_charge_policy
, low_pri_pool_ratio
);
37 CompressedSecondaryCache::~CompressedSecondaryCache() { cache_
.reset(); }
39 std::unique_ptr
<SecondaryCacheResultHandle
> CompressedSecondaryCache::Lookup(
40 const Slice
& key
, const Cache::CreateCallback
& create_cb
, bool /*wait*/,
41 bool advise_erase
, bool& is_in_sec_cache
) {
42 std::unique_ptr
<SecondaryCacheResultHandle
> handle
;
43 is_in_sec_cache
= false;
44 Cache::Handle
* lru_handle
= cache_
->Lookup(key
);
45 if (lru_handle
== nullptr) {
49 void* handle_value
= cache_
->Value(lru_handle
);
50 if (handle_value
== nullptr) {
51 cache_
->Release(lru_handle
, /*erase_if_last_ref=*/false);
55 CacheAllocationPtr
* ptr
{nullptr};
56 CacheAllocationPtr merged_value
;
57 size_t handle_value_charge
{0};
58 if (cache_options_
.enable_custom_split_merge
) {
59 CacheValueChunk
* value_chunk_ptr
=
60 reinterpret_cast<CacheValueChunk
*>(handle_value
);
61 merged_value
= MergeChunksIntoValue(value_chunk_ptr
, handle_value_charge
);
64 ptr
= reinterpret_cast<CacheAllocationPtr
*>(handle_value
);
65 handle_value_charge
= cache_
->GetCharge(lru_handle
);
71 if (cache_options_
.compression_type
== kNoCompression
) {
72 s
= create_cb(ptr
->get(), handle_value_charge
, &value
, &charge
);
74 UncompressionContext
uncompression_context(cache_options_
.compression_type
);
75 UncompressionInfo
uncompression_info(uncompression_context
,
76 UncompressionDict::GetEmptyDict(),
77 cache_options_
.compression_type
);
79 size_t uncompressed_size
{0};
80 CacheAllocationPtr uncompressed
= UncompressData(
81 uncompression_info
, (char*)ptr
->get(), handle_value_charge
,
82 &uncompressed_size
, cache_options_
.compress_format_version
,
83 cache_options_
.memory_allocator
.get());
86 cache_
->Release(lru_handle
, /*erase_if_last_ref=*/true);
89 s
= create_cb(uncompressed
.get(), uncompressed_size
, &value
, &charge
);
93 cache_
->Release(lru_handle
, /*erase_if_last_ref=*/true);
98 cache_
->Release(lru_handle
, /*erase_if_last_ref=*/true);
99 // Insert a dummy handle.
101 ->Insert(key
, /*value=*/nullptr, /*charge=*/0,
102 GetDeletionCallback(cache_options_
.enable_custom_split_merge
))
103 .PermitUncheckedError();
105 is_in_sec_cache
= true;
106 cache_
->Release(lru_handle
, /*erase_if_last_ref=*/false);
108 handle
.reset(new CompressedSecondaryCacheResultHandle(value
, charge
));
112 Status
CompressedSecondaryCache::Insert(const Slice
& key
, void* value
,
113 const Cache::CacheItemHelper
* helper
) {
114 if (value
== nullptr) {
115 return Status::InvalidArgument();
118 Cache::Handle
* lru_handle
= cache_
->Lookup(key
);
119 Cache::DeleterFn del_cb
=
120 GetDeletionCallback(cache_options_
.enable_custom_split_merge
);
121 if (lru_handle
== nullptr) {
122 PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count
, 1);
123 // Insert a dummy handle if the handle is evicted for the first time.
124 return cache_
->Insert(key
, /*value=*/nullptr, /*charge=*/0, del_cb
);
126 cache_
->Release(lru_handle
, /*erase_if_last_ref=*/false);
129 size_t size
= (*helper
->size_cb
)(value
);
130 CacheAllocationPtr ptr
=
131 AllocateBlock(size
, cache_options_
.memory_allocator
.get());
133 Status s
= (*helper
->saveto_cb
)(value
, 0, size
, ptr
.get());
137 Slice
val(ptr
.get(), size
);
139 std::string compressed_val
;
140 if (cache_options_
.compression_type
!= kNoCompression
) {
141 PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes
, size
);
142 CompressionOptions compression_opts
;
143 CompressionContext
compression_context(cache_options_
.compression_type
);
144 uint64_t sample_for_compression
{0};
145 CompressionInfo
compression_info(
146 compression_opts
, compression_context
, CompressionDict::GetEmptyDict(),
147 cache_options_
.compression_type
, sample_for_compression
);
150 CompressData(val
, compression_info
,
151 cache_options_
.compress_format_version
, &compressed_val
);
154 return Status::Corruption("Error compressing value.");
157 val
= Slice(compressed_val
);
158 size
= compressed_val
.size();
159 PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes
, size
);
161 if (!cache_options_
.enable_custom_split_merge
) {
162 ptr
= AllocateBlock(size
, cache_options_
.memory_allocator
.get());
163 memcpy(ptr
.get(), compressed_val
.data(), size
);
167 PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count
, 1);
168 if (cache_options_
.enable_custom_split_merge
) {
170 CacheValueChunk
* value_chunks_head
=
171 SplitValueIntoChunks(val
, cache_options_
.compression_type
, charge
);
172 return cache_
->Insert(key
, value_chunks_head
, charge
, del_cb
);
174 CacheAllocationPtr
* buf
= new CacheAllocationPtr(std::move(ptr
));
175 return cache_
->Insert(key
, buf
, size
, del_cb
);
179 void CompressedSecondaryCache::Erase(const Slice
& key
) { cache_
->Erase(key
); }
181 Status
CompressedSecondaryCache::SetCapacity(size_t capacity
) {
182 MutexLock
l(&capacity_mutex_
);
183 cache_options_
.capacity
= capacity
;
184 cache_
->SetCapacity(capacity
);
188 Status
CompressedSecondaryCache::GetCapacity(size_t& capacity
) {
189 MutexLock
l(&capacity_mutex_
);
190 capacity
= cache_options_
.capacity
;
194 std::string
CompressedSecondaryCache::GetPrintableOptions() const {
197 const int kBufferSize
{200};
198 char buffer
[kBufferSize
];
199 ret
.append(cache_
->GetPrintableOptions());
200 snprintf(buffer
, kBufferSize
, " compression_type : %s\n",
201 CompressionTypeToString(cache_options_
.compression_type
).c_str());
203 snprintf(buffer
, kBufferSize
, " compress_format_version : %d\n",
204 cache_options_
.compress_format_version
);
209 CompressedSecondaryCache::CacheValueChunk
*
210 CompressedSecondaryCache::SplitValueIntoChunks(const Slice
& value
,
211 CompressionType compression_type
,
213 assert(!value
.empty());
214 const char* src_ptr
= value
.data();
215 size_t src_size
{value
.size()};
217 CacheValueChunk dummy_head
= CacheValueChunk();
218 CacheValueChunk
* current_chunk
= &dummy_head
;
219 // Do not split when value size is large or there is no compression.
220 size_t predicted_chunk_size
{0};
221 size_t actual_chunk_size
{0};
223 while (src_size
> 0) {
224 predicted_chunk_size
= sizeof(CacheValueChunk
) - 1 + src_size
;
226 std::upper_bound(malloc_bin_sizes_
.begin(), malloc_bin_sizes_
.end(),
227 predicted_chunk_size
);
228 // Do not split when value size is too small, too large, close to a bin
229 // size, or there is no compression.
230 if (upper
== malloc_bin_sizes_
.begin() ||
231 upper
== malloc_bin_sizes_
.end() ||
232 *upper
- predicted_chunk_size
< malloc_bin_sizes_
.front() ||
233 compression_type
== kNoCompression
) {
234 tmp_size
= predicted_chunk_size
;
236 tmp_size
= *(--upper
);
239 CacheValueChunk
* new_chunk
=
240 reinterpret_cast<CacheValueChunk
*>(new char[tmp_size
]);
241 current_chunk
->next
= new_chunk
;
242 current_chunk
= current_chunk
->next
;
243 actual_chunk_size
= tmp_size
- sizeof(CacheValueChunk
) + 1;
244 memcpy(current_chunk
->data
, src_ptr
, actual_chunk_size
);
245 current_chunk
->size
= actual_chunk_size
;
246 src_ptr
+= actual_chunk_size
;
247 src_size
-= actual_chunk_size
;
250 current_chunk
->next
= nullptr;
252 return dummy_head
.next
;
255 CacheAllocationPtr
CompressedSecondaryCache::MergeChunksIntoValue(
256 const void* chunks_head
, size_t& charge
) {
257 const CacheValueChunk
* head
=
258 reinterpret_cast<const CacheValueChunk
*>(chunks_head
);
259 const CacheValueChunk
* current_chunk
= head
;
261 while (current_chunk
!= nullptr) {
262 charge
+= current_chunk
->size
;
263 current_chunk
= current_chunk
->next
;
266 CacheAllocationPtr ptr
=
267 AllocateBlock(charge
, cache_options_
.memory_allocator
.get());
268 current_chunk
= head
;
270 while (current_chunk
!= nullptr) {
271 memcpy(ptr
.get() + pos
, current_chunk
->data
, current_chunk
->size
);
272 pos
+= current_chunk
->size
;
273 current_chunk
= current_chunk
->next
;
279 Cache::DeleterFn
CompressedSecondaryCache::GetDeletionCallback(
280 bool enable_custom_split_merge
) {
281 if (enable_custom_split_merge
) {
282 return [](const Slice
& /*key*/, void* obj
) {
283 CacheValueChunk
* chunks_head
= reinterpret_cast<CacheValueChunk
*>(obj
);
284 while (chunks_head
!= nullptr) {
285 CacheValueChunk
* tmp_chunk
= chunks_head
;
286 chunks_head
= chunks_head
->next
;
292 return [](const Slice
& /*key*/, void* obj
) {
293 delete reinterpret_cast<CacheAllocationPtr
*>(obj
);
299 std::shared_ptr
<SecondaryCache
> NewCompressedSecondaryCache(
300 size_t capacity
, int num_shard_bits
, bool strict_capacity_limit
,
301 double high_pri_pool_ratio
, double low_pri_pool_ratio
,
302 std::shared_ptr
<MemoryAllocator
> memory_allocator
, bool use_adaptive_mutex
,
303 CacheMetadataChargePolicy metadata_charge_policy
,
304 CompressionType compression_type
, uint32_t compress_format_version
,
305 bool enable_custom_split_merge
) {
306 return std::make_shared
<CompressedSecondaryCache
>(
307 capacity
, num_shard_bits
, strict_capacity_limit
, high_pri_pool_ratio
,
308 low_pri_pool_ratio
, memory_allocator
, use_adaptive_mutex
,
309 metadata_charge_policy
, compression_type
, compress_format_version
,
310 enable_custom_split_merge
);
313 std::shared_ptr
<SecondaryCache
> NewCompressedSecondaryCache(
314 const CompressedSecondaryCacheOptions
& opts
) {
315 // The secondary_cache is disabled for this LRUCache instance.
316 assert(opts
.secondary_cache
== nullptr);
317 return NewCompressedSecondaryCache(
318 opts
.capacity
, opts
.num_shard_bits
, opts
.strict_capacity_limit
,
319 opts
.high_pri_pool_ratio
, opts
.low_pri_pool_ratio
, opts
.memory_allocator
,
320 opts
.use_adaptive_mutex
, opts
.metadata_charge_policy
,
321 opts
.compression_type
, opts
.compress_format_version
,
322 opts
.enable_custom_split_merge
);
325 } // namespace ROCKSDB_NAMESPACE