]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | ||
6 | #ifndef ROCKSDB_LITE | |
7 | ||
8 | #include "utilities/blob_db/blob_compaction_filter.h" | |
9 | #include "db/dbformat.h" | |
10 | ||
11 | namespace rocksdb { | |
12 | namespace blob_db { | |
13 | ||
14 | namespace { | |
15 | ||
16 | // CompactionFilter that drops blob indexes from the base DB once they are expired or evicted, and reports the removed counts and sizes to Statistics on destruction. | |
17 | class BlobIndexCompactionFilter : public CompactionFilter { | |
18 | public: | |
19 | BlobIndexCompactionFilter(BlobCompactionContext context, | |
20 | uint64_t current_time, Statistics* statistics) | |
21 | : context_(context), | |
22 | current_time_(current_time), | |
23 | statistics_(statistics) {} | |
24 | ||
494da23a | 25 | ~BlobIndexCompactionFilter() override { |
11fdf7f2 TL |
26 | RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_); |
27 | RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_); | |
28 | RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_); | |
29 | RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_); | |
30 | } | |
31 | ||
494da23a | 32 | const char* Name() const override { return "BlobIndexCompactionFilter"; } |
11fdf7f2 TL |
33 | |
34 | // Filter expired blob indexes regardless of snapshots. | |
494da23a | 35 | bool IgnoreSnapshots() const override { return true; } |
11fdf7f2 | 36 | |
494da23a TL |
37 | Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type, |
38 | const Slice& value, std::string* /*new_value*/, | |
39 | std::string* /*skip_until*/) const override { | |
11fdf7f2 TL |
40 | if (value_type != kBlobIndex) { |
41 | return Decision::kKeep; | |
42 | } | |
43 | BlobIndex blob_index; | |
44 | Status s = blob_index.DecodeFrom(value); | |
45 | if (!s.ok()) { | |
46 | // Unable to decode the blob index; keep the value unchanged. | |
47 | return Decision::kKeep; | |
48 | } | |
49 | if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { | |
50 | // Expired: a TTL is set and its expiration is not later than current_time_. | |
51 | expired_count_++; | |
52 | expired_size_ += key.size() + value.size(); | |
53 | return Decision::kRemove; | |
54 | } | |
55 | if (!blob_index.IsInlined() && | |
56 | blob_index.file_number() < context_.next_file_number && | |
57 | context_.current_blob_files.count(blob_index.file_number()) == 0) { | |
58 | // Corresponding blob file gone. Could have been garbage collected or | |
59 | // evicted by FIFO eviction. | |
60 | evicted_count_++; | |
61 | evicted_size_ += key.size() + value.size(); | |
62 | return Decision::kRemove; | |
63 | } | |
64 | if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() && | |
65 | blob_index.expiration() < context_.evict_expiration_up_to) { | |
66 | // Hack: the internal key is passed to BlobIndexCompactionFilter so that it | |
67 | // can read the sequence number. | |
68 | ParsedInternalKey ikey; | |
69 | bool ok = ParseInternalKey(key, &ikey); | |
70 | // Remove keys that could have been removed by the last FIFO eviction. | |
71 | // If parsing the key fails, ignore the error and fall through to keep the entry. | |
72 | if (ok && ikey.sequence < context_.fifo_eviction_seq) { | |
73 | evicted_count_++; | |
74 | evicted_size_ += key.size() + value.size(); | |
75 | return Decision::kRemove; | |
76 | } | |
77 | } | |
78 | return Decision::kKeep; | |
79 | } | |
80 | ||
81 | private: | |
82 | BlobCompactionContext context_; | |
83 | const uint64_t current_time_; | |
84 | Statistics* statistics_; | |
85 | // It is safe not to use std::atomic here since the compaction filter, created | |
86 | // from a compaction filter factory, will not be called from multiple threads. | |
87 | mutable uint64_t expired_count_ = 0; | |
88 | mutable uint64_t expired_size_ = 0; | |
89 | mutable uint64_t evicted_count_ = 0; | |
90 | mutable uint64_t evicted_size_ = 0; | |
91 | }; | |
92 | ||
93 | } // anonymous namespace | |
94 | ||
95 | std::unique_ptr<CompactionFilter> | |
96 | BlobIndexCompactionFilterFactory::CreateCompactionFilter( | |
97 | const CompactionFilter::Context& /*context*/) { | |
98 | int64_t current_time = 0; | |
99 | Status s = env_->GetCurrentTime(¤t_time); | |
100 | if (!s.ok()) { | |
101 | return nullptr; | |
102 | } | |
103 | assert(current_time >= 0); | |
104 | ||
105 | BlobCompactionContext context; | |
106 | blob_db_impl_->GetCompactionContext(&context); | |
107 | ||
108 | return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter( | |
109 | context, static_cast<uint64_t>(current_time), statistics_)); | |
110 | } | |
111 | ||
112 | } // namespace blob_db | |
113 | } // namespace rocksdb | |
114 | #endif // ROCKSDB_LITE |