1 // Copyright (c) Meta Platforms, Inc. and affiliates.
3 // This source code is licensed under both the GPLv2 (found in the
4 // COPYING file in the root directory) and Apache 2.0 License
5 // (found in the LICENSE.Apache file in the root directory).
7 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
8 // Use of this source code is governed by a BSD-style license that can be
9 // found in the LICENSE file. See the AUTHORS file for names of contributors.
15 #include "db/blob/blob_file_addition.h"
16 #include "db/blob/blob_garbage_meter.h"
17 #include "db/compaction/compaction.h"
18 #include "db/compaction/compaction_iterator.h"
19 #include "db/compaction/compaction_outputs.h"
20 #include "db/internal_stats.h"
21 #include "db/output_validator.h"
22 #include "db/range_del_aggregator.h"
24 namespace ROCKSDB_NAMESPACE
{
// Maintains state and outputs for each sub-compaction.
// It contains two `CompactionOutputs`:
//  1. one for the normal output files;
//  2. another for the penultimate level outputs.
// A `current` pointer tracks the current output group. When `AddToOutput()`
// is called, it checks which output the current compaction_iterator key
// belongs to and points `current` at that output group. By default `current`
// points to the normal compaction_outputs; if the compaction_iterator key
// should be placed on the penultimate level, `current` is changed to point to
// `penultimate_level_outputs`.
// Later operations use `Current()` to get the target group.
//
// +----------+        +-----------------------------+      +---------+
// | *current |------> |     compaction_outputs      |----->| output  |
// +----------+        +-----------------------------+      +---------+
//      |                                                   | output  |
//      |                                                   +---------+
//      |                                                   |  ...    |
//      |
//      |              +-----------------------------+      +---------+
//      +------------> |  penultimate_level_outputs  |----->| output  |
//                     +-----------------------------+      +---------+
//                                                          |  ...    |
50 class SubcompactionState
{
52 const Compaction
* compaction
;
54 // The boundaries of the key-range this compaction is interested in. No two
55 // sub-compactions may have overlapping key-ranges.
56 // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
57 const std::optional
<Slice
> start
, end
;
59 // The return status of this sub-compaction
62 // The return IO Status of this sub-compaction
65 // Notify on sub-compaction completion only if listener was notified on
66 // sub-compaction begin.
67 bool notify_on_subcompaction_completion
= false;
69 // compaction job stats for this sub-compaction
70 CompactionJobStats compaction_job_stats
;
72 // sub-compaction job id, which is used to identify different sub-compaction
73 // within the same compaction job.
74 const uint32_t sub_job_id
;
76 Slice
SmallestUserKey() const;
78 Slice
LargestUserKey() const;
80 // Get all outputs from the subcompaction. For per_key_placement compaction,
81 // it returns both the last level outputs and penultimate level outputs.
82 OutputIterator
GetOutputs() const;
84 // Assign range dels aggregator, for each range_del, it can only be assigned
85 // to one output level, for per_key_placement, it's going to be the
87 void AssignRangeDelAggregator(
88 std::unique_ptr
<CompactionRangeDelAggregator
>&& range_del_agg
) {
89 if (compaction
->SupportsPerKeyPlacement()) {
90 penultimate_level_outputs_
.AssignRangeDelAggregator(
91 std::move(range_del_agg
));
93 compaction_outputs_
.AssignRangeDelAggregator(std::move(range_del_agg
));
97 void RemoveLastEmptyOutput() {
98 compaction_outputs_
.RemoveLastEmptyOutput();
99 penultimate_level_outputs_
.RemoveLastEmptyOutput();
103 void BuildSubcompactionJobInfo(
104 SubcompactionJobInfo
& subcompaction_job_info
) const {
105 const Compaction
* c
= compaction
;
106 const ColumnFamilyData
* cfd
= c
->column_family_data();
108 subcompaction_job_info
.cf_id
= cfd
->GetID();
109 subcompaction_job_info
.cf_name
= cfd
->GetName();
110 subcompaction_job_info
.status
= status
;
111 subcompaction_job_info
.subcompaction_job_id
= static_cast<int>(sub_job_id
);
112 subcompaction_job_info
.base_input_level
= c
->start_level();
113 subcompaction_job_info
.output_level
= c
->output_level();
114 subcompaction_job_info
.stats
= compaction_job_stats
;
116 #endif // !ROCKSDB_LITE
118 SubcompactionState() = delete;
119 SubcompactionState(const SubcompactionState
&) = delete;
120 SubcompactionState
& operator=(const SubcompactionState
&) = delete;
122 SubcompactionState(Compaction
* c
, const std::optional
<Slice
> _start
,
123 const std::optional
<Slice
> _end
, uint32_t _sub_job_id
)
127 sub_job_id(_sub_job_id
),
128 compaction_outputs_(c
, /*is_penultimate_level=*/false),
129 penultimate_level_outputs_(c
, /*is_penultimate_level=*/true) {
130 assert(compaction
!= nullptr);
131 // Set output split key (used for RoundRobin feature) only for normal
132 // compaction_outputs, output to penultimate_level feature doesn't support
133 // RoundRobin feature (and may never going to be supported, because for
134 // RoundRobin, the data time is mostly naturally sorted, no need to have
135 // per-key placement with output_to_penultimate_level).
136 compaction_outputs_
.SetOutputSlitKey(start
, end
);
139 SubcompactionState(SubcompactionState
&& state
) noexcept
140 : compaction(state
.compaction
),
143 status(std::move(state
.status
)),
144 io_status(std::move(state
.io_status
)),
145 notify_on_subcompaction_completion(
146 state
.notify_on_subcompaction_completion
),
147 compaction_job_stats(std::move(state
.compaction_job_stats
)),
148 sub_job_id(state
.sub_job_id
),
149 compaction_outputs_(std::move(state
.compaction_outputs_
)),
150 penultimate_level_outputs_(std::move(state
.penultimate_level_outputs_
)),
151 is_current_penultimate_level_(state
.is_current_penultimate_level_
),
152 has_penultimate_level_outputs_(state
.has_penultimate_level_outputs_
) {
153 current_outputs_
= is_current_penultimate_level_
154 ? &penultimate_level_outputs_
155 : &compaction_outputs_
;
158 bool HasPenultimateLevelOutputs() const {
159 return has_penultimate_level_outputs_
||
160 penultimate_level_outputs_
.HasRangeDel();
163 bool IsCurrentPenultimateLevel() const {
164 return is_current_penultimate_level_
;
167 // Add all the new files from this compaction to version_edit
168 void AddOutputsEdit(VersionEdit
* out_edit
) const {
169 for (const auto& file
: penultimate_level_outputs_
.outputs_
) {
170 out_edit
->AddFile(compaction
->GetPenultimateLevel(), file
.meta
);
172 for (const auto& file
: compaction_outputs_
.outputs_
) {
173 out_edit
->AddFile(compaction
->output_level(), file
.meta
);
177 void Cleanup(Cache
* cache
);
179 void AggregateCompactionStats(
180 InternalStats::CompactionStatsFull
& compaction_stats
) const;
182 CompactionOutputs
& Current() const {
183 assert(current_outputs_
);
184 return *current_outputs_
;
187 // Add compaction_iterator key/value to the `Current` output group.
188 Status
AddToOutput(const CompactionIterator
& iter
,
189 const CompactionFileOpenFunc
& open_file_func
,
190 const CompactionFileCloseFunc
& close_file_func
);
192 // Close all compaction output files, both output_to_penultimate_level outputs
193 // and normal outputs.
194 Status
CloseCompactionFiles(const Status
& curr_status
,
195 const CompactionFileOpenFunc
& open_file_func
,
196 const CompactionFileCloseFunc
& close_file_func
) {
197 // Call FinishCompactionOutputFile() even if status is not ok: it needs to
198 // close the output file.
199 Status s
= penultimate_level_outputs_
.CloseOutput(
200 curr_status
, open_file_func
, close_file_func
);
201 s
= compaction_outputs_
.CloseOutput(s
, open_file_func
, close_file_func
);
206 // State kept for output being generated
207 CompactionOutputs compaction_outputs_
;
208 CompactionOutputs penultimate_level_outputs_
;
209 CompactionOutputs
* current_outputs_
= &compaction_outputs_
;
210 bool is_current_penultimate_level_
= false;
211 bool has_penultimate_level_outputs_
= false;
214 } // namespace ROCKSDB_NAMESPACE