]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / utilities / blob_db / blob_compaction_filter.cc
CommitLineData
11fdf7f2
TL
1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
5
6#ifndef ROCKSDB_LITE
7
8#include "utilities/blob_db/blob_compaction_filter.h"
11fdf7f2 9
f67539c2
TL
10#include <cinttypes>
11
20effc67
TL
12#include "db/dbformat.h"
13#include "test_util/sync_point.h"
14
f67539c2 15namespace ROCKSDB_NAMESPACE {
11fdf7f2
TL
16namespace blob_db {
17
20effc67
TL
18BlobIndexCompactionFilterBase::~BlobIndexCompactionFilterBase() {
19 if (blob_file_) {
20 CloseAndRegisterNewBlobFile();
21 }
22 RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
23 RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
24 RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
25 RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
26}
27
f67539c2 28CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
20effc67
TL
29 int level, const Slice& key, ValueType value_type, const Slice& value,
30 std::string* new_value, std::string* skip_until) const {
31 const CompactionFilter* ucf = user_comp_filter();
f67539c2 32 if (value_type != kBlobIndex) {
20effc67
TL
33 if (ucf == nullptr) {
34 return Decision::kKeep;
35 }
36 // Apply user compaction filter for inlined data.
37 CompactionFilter::Decision decision =
38 ucf->FilterV2(level, key, value_type, value, new_value, skip_until);
39 if (decision == Decision::kChangeValue) {
40 return HandleValueChange(key, new_value);
41 }
42 return decision;
f67539c2
TL
43 }
44 BlobIndex blob_index;
45 Status s = blob_index.DecodeFrom(value);
46 if (!s.ok()) {
47 // Unable to decode blob index. Keeping the value.
48 return Decision::kKeep;
49 }
50 if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
51 // Expired
52 expired_count_++;
53 expired_size_ += key.size() + value.size();
54 return Decision::kRemove;
55 }
56 if (!blob_index.IsInlined() &&
57 blob_index.file_number() < context_.next_file_number &&
58 context_.current_blob_files.count(blob_index.file_number()) == 0) {
59 // Corresponding blob file gone (most likely, evicted by FIFO eviction).
60 evicted_count_++;
61 evicted_size_ += key.size() + value.size();
62 return Decision::kRemove;
63 }
64 if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
65 blob_index.expiration() < context_.evict_expiration_up_to) {
66 // Hack: Internal key is passed to BlobIndexCompactionFilter for it to
67 // get sequence number.
68 ParsedInternalKey ikey;
20effc67
TL
69 if (!ParseInternalKey(
70 key, &ikey,
71 context_.blob_db_impl->db_options_.allow_data_in_errors)
72 .ok()) {
73 assert(false);
74 return Decision::kKeep;
75 }
f67539c2
TL
76 // Remove keys that could have been remove by last FIFO eviction.
77 // If get error while parsing key, ignore and continue.
20effc67 78 if (ikey.sequence < context_.fifo_eviction_seq) {
f67539c2
TL
79 evicted_count_++;
80 evicted_size_ += key.size() + value.size();
81 return Decision::kRemove;
82 }
83 }
20effc67
TL
84 // Apply user compaction filter for all non-TTL blob data.
85 if (ucf != nullptr && !blob_index.HasTTL()) {
86 // Hack: Internal key is passed to BlobIndexCompactionFilter for it to
87 // get sequence number.
88 ParsedInternalKey ikey;
89 if (!ParseInternalKey(
90 key, &ikey,
91 context_.blob_db_impl->db_options_.allow_data_in_errors)
92 .ok()) {
93 assert(false);
94 return Decision::kKeep;
95 }
96 // Read value from blob file.
97 PinnableSlice blob;
98 CompressionType compression_type = kNoCompression;
99 constexpr bool need_decompress = true;
100 if (!ReadBlobFromOldFile(ikey.user_key, blob_index, &blob, need_decompress,
101 &compression_type)) {
102 return Decision::kIOError;
103 }
104 CompactionFilter::Decision decision = ucf->FilterV2(
105 level, ikey.user_key, kValue, blob, new_value, skip_until);
106 if (decision == Decision::kChangeValue) {
107 return HandleValueChange(ikey.user_key, new_value);
108 }
109 return decision;
110 }
f67539c2
TL
111 return Decision::kKeep;
112}
113
20effc67
TL
114CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange(
115 const Slice& key, std::string* new_value) const {
116 BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
117 assert(blob_db_impl);
118
119 if (new_value->size() < blob_db_impl->bdb_options_.min_blob_size) {
120 // Keep new_value inlined.
121 return Decision::kChangeValue;
122 }
123 if (!OpenNewBlobFileIfNeeded()) {
124 return Decision::kIOError;
125 }
126 Slice new_blob_value(*new_value);
127 std::string compression_output;
128 if (blob_db_impl->bdb_options_.compression != kNoCompression) {
129 new_blob_value =
130 blob_db_impl->GetCompressedSlice(new_blob_value, &compression_output);
f67539c2 131 }
20effc67
TL
132 uint64_t new_blob_file_number = 0;
133 uint64_t new_blob_offset = 0;
134 if (!WriteBlobToNewFile(key, new_blob_value, &new_blob_file_number,
135 &new_blob_offset)) {
136 return Decision::kIOError;
137 }
138 if (!CloseAndRegisterNewBlobFileIfNeeded()) {
139 return Decision::kIOError;
140 }
141 BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
142 new_blob_value.size(),
143 blob_db_impl->bdb_options_.compression);
144 return Decision::kChangeBlobIndex;
145}
f67539c2 146
20effc67
TL
147BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() {
148 assert(context().blob_db_impl);
f67539c2 149
20effc67 150 ROCKS_LOG_INFO(context().blob_db_impl->db_options_.info_log,
f67539c2
TL
151 "GC pass finished %s: encountered %" PRIu64 " blobs (%" PRIu64
152 " bytes), relocated %" PRIu64 " blobs (%" PRIu64
153 " bytes), created %" PRIu64 " new blob file(s)",
154 !gc_stats_.HasError() ? "successfully" : "with failure",
155 gc_stats_.AllBlobs(), gc_stats_.AllBytes(),
156 gc_stats_.RelocatedBlobs(), gc_stats_.RelocatedBytes(),
157 gc_stats_.NewFiles());
158
159 RecordTick(statistics(), BLOB_DB_GC_NUM_KEYS_RELOCATED,
160 gc_stats_.RelocatedBlobs());
161 RecordTick(statistics(), BLOB_DB_GC_BYTES_RELOCATED,
162 gc_stats_.RelocatedBytes());
163 RecordTick(statistics(), BLOB_DB_GC_NUM_NEW_FILES, gc_stats_.NewFiles());
164 RecordTick(statistics(), BLOB_DB_GC_FAILURES, gc_stats_.HasError());
165}
166
20effc67
TL
167bool BlobIndexCompactionFilterBase::IsBlobFileOpened() const {
168 if (blob_file_) {
169 assert(writer_);
170 return true;
f67539c2 171 }
20effc67 172 return false;
f67539c2
TL
173}
174
20effc67
TL
175bool BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded() const {
176 if (IsBlobFileOpened()) {
f67539c2
TL
177 return true;
178 }
179
20effc67 180 BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
f67539c2
TL
181 assert(blob_db_impl);
182
183 const Status s = blob_db_impl->CreateBlobFileAndWriter(
20effc67
TL
184 /* has_ttl */ false, ExpirationRange(), "compaction/GC", &blob_file_,
185 &writer_);
f67539c2 186 if (!s.ok()) {
20effc67
TL
187 ROCKS_LOG_ERROR(
188 blob_db_impl->db_options_.info_log,
189 "Error opening new blob file during compaction/GC, status: %s",
190 s.ToString().c_str());
191 blob_file_.reset();
192 writer_.reset();
f67539c2 193 return false;
11fdf7f2
TL
194 }
195
f67539c2
TL
196 assert(blob_file_);
197 assert(writer_);
11fdf7f2 198
f67539c2
TL
199 return true;
200}
201
20effc67 202bool BlobIndexCompactionFilterBase::ReadBlobFromOldFile(
f67539c2 203 const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob,
20effc67
TL
204 bool need_decompress, CompressionType* compression_type) const {
205 BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
f67539c2
TL
206 assert(blob_db_impl);
207
20effc67 208 Status s = blob_db_impl->GetRawBlobFromFile(
f67539c2
TL
209 key, blob_index.file_number(), blob_index.offset(), blob_index.size(),
210 blob, compression_type);
211
212 if (!s.ok()) {
20effc67
TL
213 ROCKS_LOG_ERROR(
214 blob_db_impl->db_options_.info_log,
215 "Error reading blob during compaction/GC, key: %s (%s), status: %s",
216 key.ToString(/* output_hex */ true).c_str(),
217 blob_index.DebugString(/* output_hex */ true).c_str(),
218 s.ToString().c_str());
f67539c2
TL
219
220 return false;
221 }
222
20effc67
TL
223 if (need_decompress && *compression_type != kNoCompression) {
224 s = blob_db_impl->DecompressSlice(*blob, *compression_type, blob);
225 if (!s.ok()) {
226 ROCKS_LOG_ERROR(
227 blob_db_impl->db_options_.info_log,
228 "Uncompression error during blob read from file: %" PRIu64
229 " blob_offset: %" PRIu64 " blob_size: %" PRIu64
230 " key: %s status: '%s'",
231 blob_index.file_number(), blob_index.offset(), blob_index.size(),
232 key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
233
234 return false;
235 }
236 }
237
f67539c2
TL
238 return true;
239}
240
20effc67 241bool BlobIndexCompactionFilterBase::WriteBlobToNewFile(
f67539c2
TL
242 const Slice& key, const Slice& blob, uint64_t* new_blob_file_number,
243 uint64_t* new_blob_offset) const {
20effc67 244 TEST_SYNC_POINT("BlobIndexCompactionFilterBase::WriteBlobToNewFile");
f67539c2
TL
245 assert(new_blob_file_number);
246 assert(new_blob_offset);
247
248 assert(blob_file_);
249 *new_blob_file_number = blob_file_->BlobFileNumber();
250
251 assert(writer_);
252 uint64_t new_key_offset = 0;
253 const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset,
254 new_blob_offset);
255
256 if (!s.ok()) {
20effc67 257 const BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
f67539c2
TL
258 assert(blob_db_impl);
259
20effc67
TL
260 ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
261 "Error writing blob to new file %s during compaction/GC, "
262 "key: %s, status: %s",
263 blob_file_->PathName().c_str(),
264 key.ToString(/* output_hex */ true).c_str(),
265 s.ToString().c_str());
f67539c2
TL
266 return false;
267 }
268
269 const uint64_t new_size =
270 BlobLogRecord::kHeaderSize + key.size() + blob.size();
271 blob_file_->BlobRecordAdded(new_size);
272
20effc67 273 BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
f67539c2
TL
274 assert(blob_db_impl);
275
276 blob_db_impl->total_blob_size_ += new_size;
277
278 return true;
279}
280
20effc67
TL
281bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFileIfNeeded()
282 const {
283 const BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
f67539c2
TL
284 assert(blob_db_impl);
285
286 assert(blob_file_);
287 if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) {
288 return true;
289 }
290
291 return CloseAndRegisterNewBlobFile();
292}
293
20effc67
TL
294bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFile() const {
295 BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
f67539c2
TL
296 assert(blob_db_impl);
297 assert(blob_file_);
298
299 Status s;
300
301 {
302 WriteLock wl(&blob_db_impl->mutex_);
303
304 s = blob_db_impl->CloseBlobFile(blob_file_);
305
306 // Note: we delay registering the new blob file until it's closed to
20effc67 307 // prevent FIFO eviction from processing it during compaction/GC.
f67539c2
TL
308 blob_db_impl->RegisterBlobFile(blob_file_);
309 }
310
311 assert(blob_file_->Immutable());
20effc67
TL
312
313 if (!s.ok()) {
314 ROCKS_LOG_ERROR(
315 blob_db_impl->db_options_.info_log,
316 "Error closing new blob file %s during compaction/GC, status: %s",
317 blob_file_->PathName().c_str(), s.ToString().c_str());
318 }
319
f67539c2 320 blob_file_.reset();
20effc67
TL
321 return s.ok();
322}
f67539c2 323
20effc67
TL
324CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
325 const Slice& key, const Slice& existing_value,
326 std::string* new_value) const {
327 assert(new_value);
328
329 const BlobDBImpl* const blob_db_impl = context().blob_db_impl;
330 (void)blob_db_impl;
331
332 assert(blob_db_impl);
333 assert(blob_db_impl->bdb_options_.enable_garbage_collection);
334
335 BlobIndex blob_index;
336 const Status s = blob_index.DecodeFrom(existing_value);
f67539c2 337 if (!s.ok()) {
20effc67
TL
338 gc_stats_.SetError();
339 return BlobDecision::kCorruption;
340 }
f67539c2 341
20effc67
TL
342 if (blob_index.IsInlined()) {
343 gc_stats_.AddBlob(blob_index.value().size());
344
345 return BlobDecision::kKeep;
f67539c2
TL
346 }
347
20effc67
TL
348 gc_stats_.AddBlob(blob_index.size());
349
350 if (blob_index.HasTTL()) {
351 return BlobDecision::kKeep;
352 }
353
354 if (blob_index.file_number() >= context_gc_.cutoff_file_number) {
355 return BlobDecision::kKeep;
356 }
357
358 // Note: each compaction generates its own blob files, which, depending on the
359 // workload, might result in many small blob files. The total number of files
360 // is bounded though (determined by the number of compactions and the blob
361 // file size option).
362 if (!OpenNewBlobFileIfNeeded()) {
363 gc_stats_.SetError();
364 return BlobDecision::kIOError;
365 }
366
367 PinnableSlice blob;
368 CompressionType compression_type = kNoCompression;
369 std::string compression_output;
370 if (!ReadBlobFromOldFile(key, blob_index, &blob, false, &compression_type)) {
371 gc_stats_.SetError();
372 return BlobDecision::kIOError;
373 }
374
375 // If the compression_type is changed, re-compress it with the new compression
376 // type.
377 if (compression_type != blob_db_impl->bdb_options_.compression) {
378 if (compression_type != kNoCompression) {
379 const Status status =
380 blob_db_impl->DecompressSlice(blob, compression_type, &blob);
381 if (!status.ok()) {
382 gc_stats_.SetError();
383 return BlobDecision::kCorruption;
384 }
385 }
386 if (blob_db_impl->bdb_options_.compression != kNoCompression) {
387 blob_db_impl->GetCompressedSlice(blob, &compression_output);
388 blob = PinnableSlice(&compression_output);
389 blob.PinSelf();
390 }
391 }
392
393 uint64_t new_blob_file_number = 0;
394 uint64_t new_blob_offset = 0;
395 if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) {
396 gc_stats_.SetError();
397 return BlobDecision::kIOError;
398 }
399
400 if (!CloseAndRegisterNewBlobFileIfNeeded()) {
401 gc_stats_.SetError();
402 return BlobDecision::kIOError;
403 }
404
405 BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
406 blob.size(), compression_type);
407
408 gc_stats_.AddRelocatedBlob(blob_index.size());
409
410 return BlobDecision::kChangeValue;
411}
412
413bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const {
414 if (IsBlobFileOpened()) {
415 return true;
416 }
417 bool result = BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded();
418 if (result) {
419 gc_stats_.AddNewFile();
420 }
421 return result;
422}
423
424std::unique_ptr<CompactionFilter>
425BlobIndexCompactionFilterFactoryBase::CreateUserCompactionFilterFromFactory(
426 const CompactionFilter::Context& context) const {
427 std::unique_ptr<CompactionFilter> user_comp_filter_from_factory;
428 if (user_comp_filter_factory_) {
429 user_comp_filter_from_factory =
430 user_comp_filter_factory_->CreateCompactionFilter(context);
431 }
432 return user_comp_filter_from_factory;
f67539c2 433}
11fdf7f2
TL
434
435std::unique_ptr<CompactionFilter>
436BlobIndexCompactionFilterFactory::CreateCompactionFilter(
20effc67 437 const CompactionFilter::Context& _context) {
f67539c2
TL
438 assert(env());
439
11fdf7f2 440 int64_t current_time = 0;
f67539c2 441 Status s = env()->GetCurrentTime(&current_time);
11fdf7f2
TL
442 if (!s.ok()) {
443 return nullptr;
444 }
445 assert(current_time >= 0);
446
f67539c2
TL
447 assert(blob_db_impl());
448
11fdf7f2 449 BlobCompactionContext context;
f67539c2 450 blob_db_impl()->GetCompactionContext(&context);
11fdf7f2 451
20effc67
TL
452 std::unique_ptr<CompactionFilter> user_comp_filter_from_factory =
453 CreateUserCompactionFilterFromFactory(_context);
454
11fdf7f2 455 return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
20effc67
TL
456 std::move(context), user_comp_filter(),
457 std::move(user_comp_filter_from_factory), current_time, statistics()));
f67539c2
TL
458}
459
460std::unique_ptr<CompactionFilter>
461BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter(
20effc67 462 const CompactionFilter::Context& _context) {
f67539c2
TL
463 assert(env());
464
465 int64_t current_time = 0;
466 Status s = env()->GetCurrentTime(&current_time);
467 if (!s.ok()) {
468 return nullptr;
469 }
470 assert(current_time >= 0);
471
472 assert(blob_db_impl());
473
474 BlobCompactionContext context;
475 BlobCompactionContextGC context_gc;
476 blob_db_impl()->GetCompactionContext(&context, &context_gc);
477
20effc67
TL
478 std::unique_ptr<CompactionFilter> user_comp_filter_from_factory =
479 CreateUserCompactionFilterFromFactory(_context);
480
f67539c2 481 return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilterGC(
20effc67
TL
482 std::move(context), std::move(context_gc), user_comp_filter(),
483 std::move(user_comp_filter_from_factory), current_time, statistics()));
11fdf7f2
TL
484}
485
486} // namespace blob_db
f67539c2 487} // namespace ROCKSDB_NAMESPACE
11fdf7f2 488#endif // ROCKSDB_LITE