// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
20 #include "db/column_family.h"
21 #include "db/compaction/compaction_iterator.h"
22 #include "db/dbformat.h"
23 #include "db/flush_scheduler.h"
24 #include "db/internal_stats.h"
25 #include "db/job_context.h"
26 #include "db/log_writer.h"
27 #include "db/memtable_list.h"
28 #include "db/range_del_aggregator.h"
29 #include "db/version_edit.h"
30 #include "db/write_controller.h"
31 #include "db/write_thread.h"
32 #include "logging/event_logger.h"
33 #include "options/cf_options.h"
34 #include "options/db_options.h"
35 #include "port/port.h"
36 #include "rocksdb/compaction_filter.h"
37 #include "rocksdb/compaction_job_stats.h"
38 #include "rocksdb/db.h"
39 #include "rocksdb/env.h"
40 #include "rocksdb/memtablerep.h"
41 #include "rocksdb/transaction_log.h"
42 #include "table/scoped_arena_iterator.h"
43 #include "util/autovector.h"
44 #include "util/stop_watch.h"
45 #include "util/thread_local.h"
47 namespace ROCKSDB_NAMESPACE
{
// Forward declaration only — the full SnapshotChecker definition is not
// needed by this header (it is used solely via pointer below).
52 class SnapshotChecker
;
58 // CompactionJob is responsible for executing the compaction. Each (manual or
59 // automated) compaction corresponds to a CompactionJob object, and usually
60 // goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
61 // will divide the compaction into subcompactions and execute them in parallel
// Constructor. `existing_snapshots` is taken by value (the job keeps its own
// copy) and `table_cache` shares ownership via shared_ptr; the remaining
// pointer/reference parameters are raw and presumably borrowed for the
// lifetime of the job — TODO(review): confirm ownership against callers.
// `manual_compaction_paused` defaults to nullptr, in which case no
// manual-compaction pause signal is observed.
65 CompactionJob(int job_id
, Compaction
* compaction
,
66 const ImmutableDBOptions
& db_options
,
67 const FileOptions
& file_options
, VersionSet
* versions
,
68 const std::atomic
<bool>* shutting_down
,
69 const SequenceNumber preserve_deletes_seqnum
,
70 LogBuffer
* log_buffer
, Directory
* db_directory
,
71 Directory
* output_directory
, Statistics
* stats
,
72 InstrumentedMutex
* db_mutex
, ErrorHandler
* db_error_handler
,
73 std::vector
<SequenceNumber
> existing_snapshots
,
74 SequenceNumber earliest_write_conflict_snapshot
,
75 const SnapshotChecker
* snapshot_checker
,
76 std::shared_ptr
<Cache
> table_cache
, EventLogger
* event_logger
,
77 bool paranoid_file_checks
, bool measure_io_stats
,
78 const std::string
& dbname
,
79 CompactionJobStats
* compaction_job_stats
,
80 Env::Priority thread_pri
,
81 const std::atomic
<bool>* manual_compaction_paused
= nullptr);
// Non-copyable and non-movable: move construction, copy construction, and
// copy assignment are all explicitly deleted.
86 CompactionJob(CompactionJob
&& job
) = delete;
87 CompactionJob(const CompactionJob
& job
) = delete;
88 CompactionJob
& operator=(const CompactionJob
& job
) = delete;
90 // REQUIRED: mutex held
91 // Prepare for the compaction by setting up boundaries for each subcompaction
// NOTE(review): the declaration this comment describes (Prepare()) is not
// present in this excerpt — confirm it was not lost in an edit.
93 // REQUIRED mutex not held
94 // Launch threads for each subcompaction and wait for them to finish. After
95 // that, verify table is usable and finally do bookkeeping to unify
96 // subcompaction results
// NOTE(review): the declaration this comment describes (Run()) is likewise
// missing from this excerpt — confirm.
99 // REQUIRED: mutex held
100 // Add compaction input/output to the current version
101 Status
Install(const MutableCFOptions
& mutable_cf_options
);
// Per-subcompaction working state; declared here, defined elsewhere (no
// definition appears in this header).
104 struct SubcompactionState
;
// Presumably folds per-subcompaction statistics into job-level totals —
// TODO(review): confirm against the implementation.
106 void AggregateStatistics();
108 // Generates a histogram representing potential divisions of key ranges from
109 // the input. It adds the starting and/or ending keys of certain input files
110 // to the working set and then finds the approximate size of data in between
111 // each consecutive pair of slices. Then it divides these ranges into
112 // consecutive groups such that each group has a similar size.
113 void GenSubcompactionBoundaries();
115 // update the thread status for starting a compaction.
116 void ReportStartedCompaction(Compaction
* compaction
);
117 void AllocateCompactionOutputFileNumbers();
118 // Call compaction filter. Then iterate through input and compact the
120 void ProcessKeyValueCompaction(SubcompactionState
* sub_compact
);
// Finalizes one output table file. `next_table_min_key` is optional
// (defaults to nullptr).
122 Status
FinishCompactionOutputFile(
123 const Status
& input_status
, SubcompactionState
* sub_compact
,
124 CompactionRangeDelAggregator
* range_del_agg
,
125 CompactionIterationStats
* range_del_out_stats
,
126 const Slice
* next_table_min_key
= nullptr);
127 Status
InstallCompactionResults(const MutableCFOptions
& mutable_cf_options
);
128 void RecordCompactionIOStats();
129 Status
OpenCompactionOutputFile(SubcompactionState
* sub_compact
);
130 void CleanupCompaction();
131 void UpdateCompactionJobStats(
132 const InternalStats::CompactionStats
& stats
) const;
// `compaction_job_stats` is optional (defaults to nullptr).
133 void RecordDroppedKeys(const CompactionIterationStats
& c_iter_stats
,
134 CompactionJobStats
* compaction_job_stats
= nullptr);
136 void UpdateCompactionStats();
137 void UpdateCompactionInputStatsHelper(
138 int* num_files
, uint64_t* bytes_read
, int input_level
);
140 void LogCompaction();
144 // CompactionJob state
// Declared here, defined elsewhere (no definition in this header).
145 struct CompactionState
;
146 CompactionState
* compact_
;
147 CompactionJobStats
* compaction_job_stats_
;
148 InternalStats::CompactionStats compaction_stats_
;
// Most members below cache the identically-named constructor arguments
// (dbname, db_options, file_options, versions, shutting_down, ...).
151 const std::string
& dbname_
;
152 const ImmutableDBOptions
& db_options_
;
153 const FileOptions file_options_
;
157 // env_option optimized for compaction table reads
158 FileOptions file_options_for_read_
;
159 VersionSet
* versions_
;
160 const std::atomic
<bool>* shutting_down_
;
// May be nullptr (the constructor default) — check before dereferencing.
161 const std::atomic
<bool>* manual_compaction_paused_
;
162 const SequenceNumber preserve_deletes_seqnum_
;
163 LogBuffer
* log_buffer_
;
164 Directory
* db_directory_
;
165 Directory
* output_directory_
;
167 InstrumentedMutex
* db_mutex_
;
168 ErrorHandler
* db_error_handler_
;
169 // If there were two snapshots with seq numbers s1 and
170 // s2 and s1 < s2, and if we find two instances of a key k1 then lies
171 // entirely within s1 and s2, then the earlier version of k1 can be safely
172 // deleted because that version is not visible in any snapshot.
173 std::vector
<SequenceNumber
> existing_snapshots_
;
175 // This is the earliest snapshot that could be used for write-conflict
176 // checking by a transaction. For any user-key newer than this snapshot, we
177 // should make sure not to remove evidence that a write occurred.
178 SequenceNumber earliest_write_conflict_snapshot_
;
180 const SnapshotChecker
* const snapshot_checker_
;
182 std::shared_ptr
<Cache
> table_cache_
;
184 EventLogger
* event_logger_
;
186 // Is this compaction creating a file in the bottom most level?
187 bool bottommost_level_
;
188 bool paranoid_file_checks_
;
189 bool measure_io_stats_
;
190 // Stores the Slices that designate the boundaries for each subcompaction
191 std::vector
<Slice
> boundaries_
;
192 // Stores the approx size of keys covered in the range of each subcompaction
193 std::vector
<uint64_t> sizes_
;
194 Env::WriteLifeTimeHint write_hint_
;
195 Env::Priority thread_pri_
;
198 } // namespace ROCKSDB_NAMESPACE