// ceph/src/rocksdb/cache/compressed_secondary_cache.cc
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#include "cache/compressed_secondary_cache.h"

#include <algorithm>
#include <cstdint>
#include <memory>

#include "memory/memory_allocator.h"
#include "monitoring/perf_context_imp.h"
#include "util/compression.h"
#include "util/string_util.h"

namespace ROCKSDB_NAMESPACE {

CompressedSecondaryCache::CompressedSecondaryCache(
    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
    double high_pri_pool_ratio, double low_pri_pool_ratio,
    std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
    CacheMetadataChargePolicy metadata_charge_policy,
    CompressionType compression_type, uint32_t compress_format_version,
    bool enable_custom_split_merge)
    : cache_options_(capacity, num_shard_bits, strict_capacity_limit,
                     high_pri_pool_ratio, low_pri_pool_ratio, memory_allocator,
                     use_adaptive_mutex, metadata_charge_policy,
                     compression_type, compress_format_version,
                     enable_custom_split_merge) {
  cache_ =
      NewLRUCache(capacity, num_shard_bits, strict_capacity_limit,
                  high_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
                  metadata_charge_policy, low_pri_pool_ratio);
}

CompressedSecondaryCache::~CompressedSecondaryCache() { cache_.reset(); }

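// Lookup: probe the internal LRU cache for `key`. On a hit, the stored
// payload (optionally chunked and/or compressed) is reassembled, uncompressed
// when a compression type is configured, and passed to `create_cb` to build
// the object returned through the result handle. If the caller sets
// `advise_erase`, the entry is erased and replaced by a zero-charge dummy
// handle so that a later Insert() of the same key stores the value again;
// otherwise the entry stays resident and `is_in_sec_cache` is set to true.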
std::unique_ptr<SecondaryCacheResultHandle> CompressedSecondaryCache::Lookup(
    const Slice& key, const Cache::CreateCallback& create_cb, bool /*wait*/,
    bool advise_erase, bool& is_in_sec_cache) {
  std::unique_ptr<SecondaryCacheResultHandle> handle;
  is_in_sec_cache = false;
  Cache::Handle* lru_handle = cache_->Lookup(key);
  if (lru_handle == nullptr) {
    return nullptr;
  }

  void* handle_value = cache_->Value(lru_handle);
  if (handle_value == nullptr) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
    return nullptr;
  }

  CacheAllocationPtr* ptr{nullptr};
  CacheAllocationPtr merged_value;
  size_t handle_value_charge{0};
  if (cache_options_.enable_custom_split_merge) {
    CacheValueChunk* value_chunk_ptr =
        reinterpret_cast<CacheValueChunk*>(handle_value);
    merged_value = MergeChunksIntoValue(value_chunk_ptr, handle_value_charge);
    ptr = &merged_value;
  } else {
    ptr = reinterpret_cast<CacheAllocationPtr*>(handle_value);
    handle_value_charge = cache_->GetCharge(lru_handle);
  }

  Status s;
  void* value{nullptr};
  size_t charge{0};
  if (cache_options_.compression_type == kNoCompression) {
    s = create_cb(ptr->get(), handle_value_charge, &value, &charge);
  } else {
    UncompressionContext uncompression_context(cache_options_.compression_type);
    UncompressionInfo uncompression_info(uncompression_context,
                                         UncompressionDict::GetEmptyDict(),
                                         cache_options_.compression_type);

    size_t uncompressed_size{0};
    CacheAllocationPtr uncompressed = UncompressData(
        uncompression_info, (char*)ptr->get(), handle_value_charge,
        &uncompressed_size, cache_options_.compress_format_version,
        cache_options_.memory_allocator.get());

    if (!uncompressed) {
      cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
      return nullptr;
    }
    s = create_cb(uncompressed.get(), uncompressed_size, &value, &charge);
  }

  if (!s.ok()) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
    return nullptr;
  }

  if (advise_erase) {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/true);
    // Insert a dummy handle.
    cache_
        ->Insert(key, /*value=*/nullptr, /*charge=*/0,
                 GetDeletionCallback(cache_options_.enable_custom_split_merge))
        .PermitUncheckedError();
  } else {
    is_in_sec_cache = true;
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
  }
  handle.reset(new CompressedSecondaryCacheResultHandle(value, charge));
  return handle;
}

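// Insert: admission uses a two-step protocol. The first time a key arrives,
// only a zero-charge dummy handle is inserted and the value is dropped; the
// value is actually saved via the helper callbacks, compressed, and
// (optionally) split into chunks only when the same key is inserted again.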
Status CompressedSecondaryCache::Insert(const Slice& key, void* value,
                                        const Cache::CacheItemHelper* helper) {
  if (value == nullptr) {
    return Status::InvalidArgument();
  }

  Cache::Handle* lru_handle = cache_->Lookup(key);
  Cache::DeleterFn del_cb =
      GetDeletionCallback(cache_options_.enable_custom_split_merge);
  if (lru_handle == nullptr) {
    PERF_COUNTER_ADD(compressed_sec_cache_insert_dummy_count, 1);
    // Insert a dummy handle if the handle is evicted for the first time.
    return cache_->Insert(key, /*value=*/nullptr, /*charge=*/0, del_cb);
  } else {
    cache_->Release(lru_handle, /*erase_if_last_ref=*/false);
  }

  size_t size = (*helper->size_cb)(value);
  CacheAllocationPtr ptr =
      AllocateBlock(size, cache_options_.memory_allocator.get());

  Status s = (*helper->saveto_cb)(value, 0, size, ptr.get());
  if (!s.ok()) {
    return s;
  }
  Slice val(ptr.get(), size);

  std::string compressed_val;
  if (cache_options_.compression_type != kNoCompression) {
    PERF_COUNTER_ADD(compressed_sec_cache_uncompressed_bytes, size);
    CompressionOptions compression_opts;
    CompressionContext compression_context(cache_options_.compression_type);
    uint64_t sample_for_compression{0};
    CompressionInfo compression_info(
        compression_opts, compression_context, CompressionDict::GetEmptyDict(),
        cache_options_.compression_type, sample_for_compression);

    bool success =
        CompressData(val, compression_info,
                     cache_options_.compress_format_version, &compressed_val);

    if (!success) {
      return Status::Corruption("Error compressing value.");
    }

    val = Slice(compressed_val);
    size = compressed_val.size();
    PERF_COUNTER_ADD(compressed_sec_cache_compressed_bytes, size);

    if (!cache_options_.enable_custom_split_merge) {
      ptr = AllocateBlock(size, cache_options_.memory_allocator.get());
      memcpy(ptr.get(), compressed_val.data(), size);
    }
  }

  PERF_COUNTER_ADD(compressed_sec_cache_insert_real_count, 1);
  if (cache_options_.enable_custom_split_merge) {
    size_t charge{0};
    CacheValueChunk* value_chunks_head =
        SplitValueIntoChunks(val, cache_options_.compression_type, charge);
    return cache_->Insert(key, value_chunks_head, charge, del_cb);
  } else {
    CacheAllocationPtr* buf = new CacheAllocationPtr(std::move(ptr));
    return cache_->Insert(key, buf, size, del_cb);
  }
}

void CompressedSecondaryCache::Erase(const Slice& key) { cache_->Erase(key); }

Status CompressedSecondaryCache::SetCapacity(size_t capacity) {
  MutexLock l(&capacity_mutex_);
  cache_options_.capacity = capacity;
  cache_->SetCapacity(capacity);
  return Status::OK();
}

Status CompressedSecondaryCache::GetCapacity(size_t& capacity) {
  MutexLock l(&capacity_mutex_);
  capacity = cache_options_.capacity;
  return Status::OK();
}

std::string CompressedSecondaryCache::GetPrintableOptions() const {
  std::string ret;
  ret.reserve(20000);
  const int kBufferSize{200};
  char buffer[kBufferSize];
  ret.append(cache_->GetPrintableOptions());
  snprintf(buffer, kBufferSize, "    compression_type : %s\n",
           CompressionTypeToString(cache_options_.compression_type).c_str());
  ret.append(buffer);
  snprintf(buffer, kBufferSize, "    compress_format_version : %d\n",
           cache_options_.compress_format_version);
  ret.append(buffer);
  return ret;
}

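// SplitValueIntoChunks: copy `value` into a singly linked list of
// CacheValueChunk nodes whose allocation sizes line up with the allocator bin
// sizes listed in malloc_bin_sizes_, so each chunk wastes as little allocator
// slack as possible. As a worked example (assuming 4096 and 8192 are adjacent
// entries in malloc_bin_sizes_, which is defined in the header): a remaining
// payload with a predicted chunk size of 5000 bytes is cut down to a
// 4096-byte node, and the bytes that do not fit are carried into the next
// chunk on the following loop iteration. When the value is uncompressed, very
// small, very large, or already close to a bin size, the remaining payload is
// placed in a single exactly-sized allocation instead.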
CompressedSecondaryCache::CacheValueChunk*
CompressedSecondaryCache::SplitValueIntoChunks(const Slice& value,
                                               CompressionType compression_type,
                                               size_t& charge) {
  assert(!value.empty());
  const char* src_ptr = value.data();
  size_t src_size{value.size()};

  CacheValueChunk dummy_head = CacheValueChunk();
  CacheValueChunk* current_chunk = &dummy_head;
  // Do not split when value size is large or there is no compression.
  size_t predicted_chunk_size{0};
  size_t actual_chunk_size{0};
  size_t tmp_size{0};
  while (src_size > 0) {
    predicted_chunk_size = sizeof(CacheValueChunk) - 1 + src_size;
    auto upper =
        std::upper_bound(malloc_bin_sizes_.begin(), malloc_bin_sizes_.end(),
                         predicted_chunk_size);
    // Do not split when value size is too small, too large, close to a bin
    // size, or there is no compression.
    if (upper == malloc_bin_sizes_.begin() ||
        upper == malloc_bin_sizes_.end() ||
        *upper - predicted_chunk_size < malloc_bin_sizes_.front() ||
        compression_type == kNoCompression) {
      tmp_size = predicted_chunk_size;
    } else {
      tmp_size = *(--upper);
    }

    CacheValueChunk* new_chunk =
        reinterpret_cast<CacheValueChunk*>(new char[tmp_size]);
    current_chunk->next = new_chunk;
    current_chunk = current_chunk->next;
    actual_chunk_size = tmp_size - sizeof(CacheValueChunk) + 1;
    memcpy(current_chunk->data, src_ptr, actual_chunk_size);
    current_chunk->size = actual_chunk_size;
    src_ptr += actual_chunk_size;
    src_size -= actual_chunk_size;
    charge += tmp_size;
  }
  current_chunk->next = nullptr;

  return dummy_head.next;
}

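// MergeChunksIntoValue: inverse of SplitValueIntoChunks. One pass over the
// chunk list sums the payload sizes into `charge`, a contiguous block of that
// size is allocated, and a second pass copies each chunk's payload back into
// that buffer.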
CacheAllocationPtr CompressedSecondaryCache::MergeChunksIntoValue(
    const void* chunks_head, size_t& charge) {
  const CacheValueChunk* head =
      reinterpret_cast<const CacheValueChunk*>(chunks_head);
  const CacheValueChunk* current_chunk = head;
  charge = 0;
  while (current_chunk != nullptr) {
    charge += current_chunk->size;
    current_chunk = current_chunk->next;
  }

  CacheAllocationPtr ptr =
      AllocateBlock(charge, cache_options_.memory_allocator.get());
  current_chunk = head;
  size_t pos{0};
  while (current_chunk != nullptr) {
    memcpy(ptr.get() + pos, current_chunk->data, current_chunk->size);
    pos += current_chunk->size;
    current_chunk = current_chunk->next;
  }

  return ptr;
}

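// GetDeletionCallback: the deleter must match the storage layout chosen at
// insertion time. With custom split/merge enabled, the cached value is the
// head of a CacheValueChunk list and each chunk is freed individually;
// otherwise it is a heap-allocated CacheAllocationPtr that is deleted
// directly.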
Cache::DeleterFn CompressedSecondaryCache::GetDeletionCallback(
    bool enable_custom_split_merge) {
  if (enable_custom_split_merge) {
    return [](const Slice& /*key*/, void* obj) {
      CacheValueChunk* chunks_head = reinterpret_cast<CacheValueChunk*>(obj);
      while (chunks_head != nullptr) {
        CacheValueChunk* tmp_chunk = chunks_head;
        chunks_head = chunks_head->next;
        tmp_chunk->Free();
        obj = nullptr;
      };
    };
  } else {
    return [](const Slice& /*key*/, void* obj) {
      delete reinterpret_cast<CacheAllocationPtr*>(obj);
      obj = nullptr;
    };
  }
}

std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
    size_t capacity, int num_shard_bits, bool strict_capacity_limit,
    double high_pri_pool_ratio, double low_pri_pool_ratio,
    std::shared_ptr<MemoryAllocator> memory_allocator, bool use_adaptive_mutex,
    CacheMetadataChargePolicy metadata_charge_policy,
    CompressionType compression_type, uint32_t compress_format_version,
    bool enable_custom_split_merge) {
  return std::make_shared<CompressedSecondaryCache>(
      capacity, num_shard_bits, strict_capacity_limit, high_pri_pool_ratio,
      low_pri_pool_ratio, memory_allocator, use_adaptive_mutex,
      metadata_charge_policy, compression_type, compress_format_version,
      enable_custom_split_merge);
}

std::shared_ptr<SecondaryCache> NewCompressedSecondaryCache(
    const CompressedSecondaryCacheOptions& opts) {
  // The secondary_cache is disabled for this LRUCache instance.
  assert(opts.secondary_cache == nullptr);
  return NewCompressedSecondaryCache(
      opts.capacity, opts.num_shard_bits, opts.strict_capacity_limit,
      opts.high_pri_pool_ratio, opts.low_pri_pool_ratio, opts.memory_allocator,
      opts.use_adaptive_mutex, opts.metadata_charge_policy,
      opts.compression_type, opts.compress_format_version,
      opts.enable_custom_split_merge);
}

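// A minimal usage sketch, not part of the upstream file: it shows how the
// factory above is typically wired into a primary block cache through the
// public options structs from rocksdb/cache.h. The helper name
// ExampleBlockCacheWithCompressedSecondary, the capacities, and the LZ4
// choice are illustrative assumptions only.
inline std::shared_ptr<Cache> ExampleBlockCacheWithCompressedSecondary() {
  // Secondary tier: 64 MiB of LZ4-compressed entries.
  CompressedSecondaryCacheOptions sec_opts;
  sec_opts.capacity = 64 << 20;
  sec_opts.compression_type = kLZ4Compression;
  std::shared_ptr<SecondaryCache> sec = NewCompressedSecondaryCache(sec_opts);

  // Primary tier: a 32 MiB LRU cache whose evictions spill into `sec`.
  LRUCacheOptions pri_opts;
  pri_opts.capacity = 32 << 20;
  pri_opts.secondary_cache = sec;
  return NewLRUCache(pri_opts);
}
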
}  // namespace ROCKSDB_NAMESPACE