]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "utilities/blob_db/blob_compaction_filter.h" | |
11fdf7f2 | 9 | |
f67539c2 TL |
10 | #include <cinttypes> |
11 | ||
20effc67 TL |
12 | #include "db/dbformat.h" |
13 | #include "test_util/sync_point.h" | |
14 | ||
f67539c2 | 15 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 TL |
16 | namespace blob_db { |
17 | ||
20effc67 TL |
18 | BlobIndexCompactionFilterBase::~BlobIndexCompactionFilterBase() { |
19 | if (blob_file_) { | |
20 | CloseAndRegisterNewBlobFile(); | |
21 | } | |
22 | RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_); | |
23 | RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_); | |
24 | RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_); | |
25 | RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_); | |
26 | } | |
27 | ||
f67539c2 | 28 | CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2( |
20effc67 TL |
29 | int level, const Slice& key, ValueType value_type, const Slice& value, |
30 | std::string* new_value, std::string* skip_until) const { | |
31 | const CompactionFilter* ucf = user_comp_filter(); | |
f67539c2 | 32 | if (value_type != kBlobIndex) { |
20effc67 TL |
33 | if (ucf == nullptr) { |
34 | return Decision::kKeep; | |
35 | } | |
36 | // Apply user compaction filter for inlined data. | |
37 | CompactionFilter::Decision decision = | |
38 | ucf->FilterV2(level, key, value_type, value, new_value, skip_until); | |
39 | if (decision == Decision::kChangeValue) { | |
40 | return HandleValueChange(key, new_value); | |
41 | } | |
42 | return decision; | |
f67539c2 TL |
43 | } |
44 | BlobIndex blob_index; | |
45 | Status s = blob_index.DecodeFrom(value); | |
46 | if (!s.ok()) { | |
47 | // Unable to decode blob index. Keeping the value. | |
48 | return Decision::kKeep; | |
49 | } | |
50 | if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { | |
51 | // Expired | |
52 | expired_count_++; | |
53 | expired_size_ += key.size() + value.size(); | |
54 | return Decision::kRemove; | |
55 | } | |
56 | if (!blob_index.IsInlined() && | |
57 | blob_index.file_number() < context_.next_file_number && | |
58 | context_.current_blob_files.count(blob_index.file_number()) == 0) { | |
59 | // Corresponding blob file gone (most likely, evicted by FIFO eviction). | |
60 | evicted_count_++; | |
61 | evicted_size_ += key.size() + value.size(); | |
62 | return Decision::kRemove; | |
63 | } | |
64 | if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() && | |
65 | blob_index.expiration() < context_.evict_expiration_up_to) { | |
66 | // Hack: Internal key is passed to BlobIndexCompactionFilter for it to | |
67 | // get sequence number. | |
68 | ParsedInternalKey ikey; | |
20effc67 TL |
69 | if (!ParseInternalKey( |
70 | key, &ikey, | |
71 | context_.blob_db_impl->db_options_.allow_data_in_errors) | |
72 | .ok()) { | |
73 | assert(false); | |
74 | return Decision::kKeep; | |
75 | } | |
f67539c2 TL |
76 | // Remove keys that could have been remove by last FIFO eviction. |
77 | // If get error while parsing key, ignore and continue. | |
20effc67 | 78 | if (ikey.sequence < context_.fifo_eviction_seq) { |
f67539c2 TL |
79 | evicted_count_++; |
80 | evicted_size_ += key.size() + value.size(); | |
81 | return Decision::kRemove; | |
82 | } | |
83 | } | |
20effc67 TL |
84 | // Apply user compaction filter for all non-TTL blob data. |
85 | if (ucf != nullptr && !blob_index.HasTTL()) { | |
86 | // Hack: Internal key is passed to BlobIndexCompactionFilter for it to | |
87 | // get sequence number. | |
88 | ParsedInternalKey ikey; | |
89 | if (!ParseInternalKey( | |
90 | key, &ikey, | |
91 | context_.blob_db_impl->db_options_.allow_data_in_errors) | |
92 | .ok()) { | |
93 | assert(false); | |
94 | return Decision::kKeep; | |
95 | } | |
96 | // Read value from blob file. | |
97 | PinnableSlice blob; | |
98 | CompressionType compression_type = kNoCompression; | |
99 | constexpr bool need_decompress = true; | |
100 | if (!ReadBlobFromOldFile(ikey.user_key, blob_index, &blob, need_decompress, | |
101 | &compression_type)) { | |
102 | return Decision::kIOError; | |
103 | } | |
104 | CompactionFilter::Decision decision = ucf->FilterV2( | |
105 | level, ikey.user_key, kValue, blob, new_value, skip_until); | |
106 | if (decision == Decision::kChangeValue) { | |
107 | return HandleValueChange(ikey.user_key, new_value); | |
108 | } | |
109 | return decision; | |
110 | } | |
f67539c2 TL |
111 | return Decision::kKeep; |
112 | } | |
113 | ||
20effc67 TL |
114 | CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange( |
115 | const Slice& key, std::string* new_value) const { | |
116 | BlobDBImpl* const blob_db_impl = context_.blob_db_impl; | |
117 | assert(blob_db_impl); | |
118 | ||
119 | if (new_value->size() < blob_db_impl->bdb_options_.min_blob_size) { | |
120 | // Keep new_value inlined. | |
121 | return Decision::kChangeValue; | |
122 | } | |
123 | if (!OpenNewBlobFileIfNeeded()) { | |
124 | return Decision::kIOError; | |
125 | } | |
126 | Slice new_blob_value(*new_value); | |
127 | std::string compression_output; | |
128 | if (blob_db_impl->bdb_options_.compression != kNoCompression) { | |
129 | new_blob_value = | |
130 | blob_db_impl->GetCompressedSlice(new_blob_value, &compression_output); | |
f67539c2 | 131 | } |
20effc67 TL |
132 | uint64_t new_blob_file_number = 0; |
133 | uint64_t new_blob_offset = 0; | |
134 | if (!WriteBlobToNewFile(key, new_blob_value, &new_blob_file_number, | |
135 | &new_blob_offset)) { | |
136 | return Decision::kIOError; | |
137 | } | |
138 | if (!CloseAndRegisterNewBlobFileIfNeeded()) { | |
139 | return Decision::kIOError; | |
140 | } | |
141 | BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset, | |
142 | new_blob_value.size(), | |
143 | blob_db_impl->bdb_options_.compression); | |
144 | return Decision::kChangeBlobIndex; | |
145 | } | |
f67539c2 | 146 | |
20effc67 TL |
147 | BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() { |
148 | assert(context().blob_db_impl); | |
f67539c2 | 149 | |
20effc67 | 150 | ROCKS_LOG_INFO(context().blob_db_impl->db_options_.info_log, |
f67539c2 TL |
151 | "GC pass finished %s: encountered %" PRIu64 " blobs (%" PRIu64 |
152 | " bytes), relocated %" PRIu64 " blobs (%" PRIu64 | |
153 | " bytes), created %" PRIu64 " new blob file(s)", | |
154 | !gc_stats_.HasError() ? "successfully" : "with failure", | |
155 | gc_stats_.AllBlobs(), gc_stats_.AllBytes(), | |
156 | gc_stats_.RelocatedBlobs(), gc_stats_.RelocatedBytes(), | |
157 | gc_stats_.NewFiles()); | |
158 | ||
159 | RecordTick(statistics(), BLOB_DB_GC_NUM_KEYS_RELOCATED, | |
160 | gc_stats_.RelocatedBlobs()); | |
161 | RecordTick(statistics(), BLOB_DB_GC_BYTES_RELOCATED, | |
162 | gc_stats_.RelocatedBytes()); | |
163 | RecordTick(statistics(), BLOB_DB_GC_NUM_NEW_FILES, gc_stats_.NewFiles()); | |
164 | RecordTick(statistics(), BLOB_DB_GC_FAILURES, gc_stats_.HasError()); | |
165 | } | |
166 | ||
20effc67 TL |
167 | bool BlobIndexCompactionFilterBase::IsBlobFileOpened() const { |
168 | if (blob_file_) { | |
169 | assert(writer_); | |
170 | return true; | |
f67539c2 | 171 | } |
20effc67 | 172 | return false; |
f67539c2 TL |
173 | } |
174 | ||
20effc67 TL |
175 | bool BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded() const { |
176 | if (IsBlobFileOpened()) { | |
f67539c2 TL |
177 | return true; |
178 | } | |
179 | ||
20effc67 | 180 | BlobDBImpl* const blob_db_impl = context_.blob_db_impl; |
f67539c2 TL |
181 | assert(blob_db_impl); |
182 | ||
183 | const Status s = blob_db_impl->CreateBlobFileAndWriter( | |
20effc67 TL |
184 | /* has_ttl */ false, ExpirationRange(), "compaction/GC", &blob_file_, |
185 | &writer_); | |
f67539c2 | 186 | if (!s.ok()) { |
20effc67 TL |
187 | ROCKS_LOG_ERROR( |
188 | blob_db_impl->db_options_.info_log, | |
189 | "Error opening new blob file during compaction/GC, status: %s", | |
190 | s.ToString().c_str()); | |
191 | blob_file_.reset(); | |
192 | writer_.reset(); | |
f67539c2 | 193 | return false; |
11fdf7f2 TL |
194 | } |
195 | ||
f67539c2 TL |
196 | assert(blob_file_); |
197 | assert(writer_); | |
11fdf7f2 | 198 | |
f67539c2 TL |
199 | return true; |
200 | } | |
201 | ||
20effc67 | 202 | bool BlobIndexCompactionFilterBase::ReadBlobFromOldFile( |
f67539c2 | 203 | const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob, |
20effc67 TL |
204 | bool need_decompress, CompressionType* compression_type) const { |
205 | BlobDBImpl* const blob_db_impl = context_.blob_db_impl; | |
f67539c2 TL |
206 | assert(blob_db_impl); |
207 | ||
20effc67 | 208 | Status s = blob_db_impl->GetRawBlobFromFile( |
f67539c2 TL |
209 | key, blob_index.file_number(), blob_index.offset(), blob_index.size(), |
210 | blob, compression_type); | |
211 | ||
212 | if (!s.ok()) { | |
20effc67 TL |
213 | ROCKS_LOG_ERROR( |
214 | blob_db_impl->db_options_.info_log, | |
215 | "Error reading blob during compaction/GC, key: %s (%s), status: %s", | |
216 | key.ToString(/* output_hex */ true).c_str(), | |
217 | blob_index.DebugString(/* output_hex */ true).c_str(), | |
218 | s.ToString().c_str()); | |
f67539c2 TL |
219 | |
220 | return false; | |
221 | } | |
222 | ||
20effc67 TL |
223 | if (need_decompress && *compression_type != kNoCompression) { |
224 | s = blob_db_impl->DecompressSlice(*blob, *compression_type, blob); | |
225 | if (!s.ok()) { | |
226 | ROCKS_LOG_ERROR( | |
227 | blob_db_impl->db_options_.info_log, | |
228 | "Uncompression error during blob read from file: %" PRIu64 | |
229 | " blob_offset: %" PRIu64 " blob_size: %" PRIu64 | |
230 | " key: %s status: '%s'", | |
231 | blob_index.file_number(), blob_index.offset(), blob_index.size(), | |
232 | key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str()); | |
233 | ||
234 | return false; | |
235 | } | |
236 | } | |
237 | ||
f67539c2 TL |
238 | return true; |
239 | } | |
240 | ||
20effc67 | 241 | bool BlobIndexCompactionFilterBase::WriteBlobToNewFile( |
f67539c2 TL |
242 | const Slice& key, const Slice& blob, uint64_t* new_blob_file_number, |
243 | uint64_t* new_blob_offset) const { | |
20effc67 | 244 | TEST_SYNC_POINT("BlobIndexCompactionFilterBase::WriteBlobToNewFile"); |
f67539c2 TL |
245 | assert(new_blob_file_number); |
246 | assert(new_blob_offset); | |
247 | ||
248 | assert(blob_file_); | |
249 | *new_blob_file_number = blob_file_->BlobFileNumber(); | |
250 | ||
251 | assert(writer_); | |
252 | uint64_t new_key_offset = 0; | |
253 | const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset, | |
254 | new_blob_offset); | |
255 | ||
256 | if (!s.ok()) { | |
20effc67 | 257 | const BlobDBImpl* const blob_db_impl = context_.blob_db_impl; |
f67539c2 TL |
258 | assert(blob_db_impl); |
259 | ||
20effc67 TL |
260 | ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log, |
261 | "Error writing blob to new file %s during compaction/GC, " | |
262 | "key: %s, status: %s", | |
263 | blob_file_->PathName().c_str(), | |
264 | key.ToString(/* output_hex */ true).c_str(), | |
265 | s.ToString().c_str()); | |
f67539c2 TL |
266 | return false; |
267 | } | |
268 | ||
269 | const uint64_t new_size = | |
270 | BlobLogRecord::kHeaderSize + key.size() + blob.size(); | |
271 | blob_file_->BlobRecordAdded(new_size); | |
272 | ||
20effc67 | 273 | BlobDBImpl* const blob_db_impl = context_.blob_db_impl; |
f67539c2 TL |
274 | assert(blob_db_impl); |
275 | ||
276 | blob_db_impl->total_blob_size_ += new_size; | |
277 | ||
278 | return true; | |
279 | } | |
280 | ||
20effc67 TL |
281 | bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFileIfNeeded() |
282 | const { | |
283 | const BlobDBImpl* const blob_db_impl = context_.blob_db_impl; | |
f67539c2 TL |
284 | assert(blob_db_impl); |
285 | ||
286 | assert(blob_file_); | |
287 | if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) { | |
288 | return true; | |
289 | } | |
290 | ||
291 | return CloseAndRegisterNewBlobFile(); | |
292 | } | |
293 | ||
20effc67 TL |
294 | bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFile() const { |
295 | BlobDBImpl* const blob_db_impl = context_.blob_db_impl; | |
f67539c2 TL |
296 | assert(blob_db_impl); |
297 | assert(blob_file_); | |
298 | ||
299 | Status s; | |
300 | ||
301 | { | |
302 | WriteLock wl(&blob_db_impl->mutex_); | |
303 | ||
304 | s = blob_db_impl->CloseBlobFile(blob_file_); | |
305 | ||
306 | // Note: we delay registering the new blob file until it's closed to | |
20effc67 | 307 | // prevent FIFO eviction from processing it during compaction/GC. |
f67539c2 TL |
308 | blob_db_impl->RegisterBlobFile(blob_file_); |
309 | } | |
310 | ||
311 | assert(blob_file_->Immutable()); | |
20effc67 TL |
312 | |
313 | if (!s.ok()) { | |
314 | ROCKS_LOG_ERROR( | |
315 | blob_db_impl->db_options_.info_log, | |
316 | "Error closing new blob file %s during compaction/GC, status: %s", | |
317 | blob_file_->PathName().c_str(), s.ToString().c_str()); | |
318 | } | |
319 | ||
f67539c2 | 320 | blob_file_.reset(); |
20effc67 TL |
321 | return s.ok(); |
322 | } | |
f67539c2 | 323 | |
20effc67 TL |
324 | CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput( |
325 | const Slice& key, const Slice& existing_value, | |
326 | std::string* new_value) const { | |
327 | assert(new_value); | |
328 | ||
329 | const BlobDBImpl* const blob_db_impl = context().blob_db_impl; | |
330 | (void)blob_db_impl; | |
331 | ||
332 | assert(blob_db_impl); | |
333 | assert(blob_db_impl->bdb_options_.enable_garbage_collection); | |
334 | ||
335 | BlobIndex blob_index; | |
336 | const Status s = blob_index.DecodeFrom(existing_value); | |
f67539c2 | 337 | if (!s.ok()) { |
20effc67 TL |
338 | gc_stats_.SetError(); |
339 | return BlobDecision::kCorruption; | |
340 | } | |
f67539c2 | 341 | |
20effc67 TL |
342 | if (blob_index.IsInlined()) { |
343 | gc_stats_.AddBlob(blob_index.value().size()); | |
344 | ||
345 | return BlobDecision::kKeep; | |
f67539c2 TL |
346 | } |
347 | ||
20effc67 TL |
348 | gc_stats_.AddBlob(blob_index.size()); |
349 | ||
350 | if (blob_index.HasTTL()) { | |
351 | return BlobDecision::kKeep; | |
352 | } | |
353 | ||
354 | if (blob_index.file_number() >= context_gc_.cutoff_file_number) { | |
355 | return BlobDecision::kKeep; | |
356 | } | |
357 | ||
358 | // Note: each compaction generates its own blob files, which, depending on the | |
359 | // workload, might result in many small blob files. The total number of files | |
360 | // is bounded though (determined by the number of compactions and the blob | |
361 | // file size option). | |
362 | if (!OpenNewBlobFileIfNeeded()) { | |
363 | gc_stats_.SetError(); | |
364 | return BlobDecision::kIOError; | |
365 | } | |
366 | ||
367 | PinnableSlice blob; | |
368 | CompressionType compression_type = kNoCompression; | |
369 | std::string compression_output; | |
370 | if (!ReadBlobFromOldFile(key, blob_index, &blob, false, &compression_type)) { | |
371 | gc_stats_.SetError(); | |
372 | return BlobDecision::kIOError; | |
373 | } | |
374 | ||
375 | // If the compression_type is changed, re-compress it with the new compression | |
376 | // type. | |
377 | if (compression_type != blob_db_impl->bdb_options_.compression) { | |
378 | if (compression_type != kNoCompression) { | |
379 | const Status status = | |
380 | blob_db_impl->DecompressSlice(blob, compression_type, &blob); | |
381 | if (!status.ok()) { | |
382 | gc_stats_.SetError(); | |
383 | return BlobDecision::kCorruption; | |
384 | } | |
385 | } | |
386 | if (blob_db_impl->bdb_options_.compression != kNoCompression) { | |
387 | blob_db_impl->GetCompressedSlice(blob, &compression_output); | |
388 | blob = PinnableSlice(&compression_output); | |
389 | blob.PinSelf(); | |
390 | } | |
391 | } | |
392 | ||
393 | uint64_t new_blob_file_number = 0; | |
394 | uint64_t new_blob_offset = 0; | |
395 | if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) { | |
396 | gc_stats_.SetError(); | |
397 | return BlobDecision::kIOError; | |
398 | } | |
399 | ||
400 | if (!CloseAndRegisterNewBlobFileIfNeeded()) { | |
401 | gc_stats_.SetError(); | |
402 | return BlobDecision::kIOError; | |
403 | } | |
404 | ||
405 | BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset, | |
406 | blob.size(), compression_type); | |
407 | ||
408 | gc_stats_.AddRelocatedBlob(blob_index.size()); | |
409 | ||
410 | return BlobDecision::kChangeValue; | |
411 | } | |
412 | ||
413 | bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const { | |
414 | if (IsBlobFileOpened()) { | |
415 | return true; | |
416 | } | |
417 | bool result = BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded(); | |
418 | if (result) { | |
419 | gc_stats_.AddNewFile(); | |
420 | } | |
421 | return result; | |
422 | } | |
423 | ||
424 | std::unique_ptr<CompactionFilter> | |
425 | BlobIndexCompactionFilterFactoryBase::CreateUserCompactionFilterFromFactory( | |
426 | const CompactionFilter::Context& context) const { | |
427 | std::unique_ptr<CompactionFilter> user_comp_filter_from_factory; | |
428 | if (user_comp_filter_factory_) { | |
429 | user_comp_filter_from_factory = | |
430 | user_comp_filter_factory_->CreateCompactionFilter(context); | |
431 | } | |
432 | return user_comp_filter_from_factory; | |
f67539c2 | 433 | } |
11fdf7f2 TL |
434 | |
435 | std::unique_ptr<CompactionFilter> | |
436 | BlobIndexCompactionFilterFactory::CreateCompactionFilter( | |
20effc67 | 437 | const CompactionFilter::Context& _context) { |
f67539c2 TL |
438 | assert(env()); |
439 | ||
11fdf7f2 | 440 | int64_t current_time = 0; |
f67539c2 | 441 | Status s = env()->GetCurrentTime(¤t_time); |
11fdf7f2 TL |
442 | if (!s.ok()) { |
443 | return nullptr; | |
444 | } | |
445 | assert(current_time >= 0); | |
446 | ||
f67539c2 TL |
447 | assert(blob_db_impl()); |
448 | ||
11fdf7f2 | 449 | BlobCompactionContext context; |
f67539c2 | 450 | blob_db_impl()->GetCompactionContext(&context); |
11fdf7f2 | 451 | |
20effc67 TL |
452 | std::unique_ptr<CompactionFilter> user_comp_filter_from_factory = |
453 | CreateUserCompactionFilterFromFactory(_context); | |
454 | ||
11fdf7f2 | 455 | return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter( |
20effc67 TL |
456 | std::move(context), user_comp_filter(), |
457 | std::move(user_comp_filter_from_factory), current_time, statistics())); | |
f67539c2 TL |
458 | } |
459 | ||
460 | std::unique_ptr<CompactionFilter> | |
461 | BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter( | |
20effc67 | 462 | const CompactionFilter::Context& _context) { |
f67539c2 TL |
463 | assert(env()); |
464 | ||
465 | int64_t current_time = 0; | |
466 | Status s = env()->GetCurrentTime(¤t_time); | |
467 | if (!s.ok()) { | |
468 | return nullptr; | |
469 | } | |
470 | assert(current_time >= 0); | |
471 | ||
472 | assert(blob_db_impl()); | |
473 | ||
474 | BlobCompactionContext context; | |
475 | BlobCompactionContextGC context_gc; | |
476 | blob_db_impl()->GetCompactionContext(&context, &context_gc); | |
477 | ||
20effc67 TL |
478 | std::unique_ptr<CompactionFilter> user_comp_filter_from_factory = |
479 | CreateUserCompactionFilterFromFactory(_context); | |
480 | ||
f67539c2 | 481 | return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilterGC( |
20effc67 TL |
482 | std::move(context), std::move(context_gc), user_comp_filter(), |
483 | std::move(user_comp_filter_from_factory), current_time, statistics())); | |
11fdf7f2 TL |
484 | } |
485 | ||
486 | } // namespace blob_db | |
f67539c2 | 487 | } // namespace ROCKSDB_NAMESPACE |
11fdf7f2 | 488 | #endif // ROCKSDB_LITE |