1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
20 #include "db/column_family.h"
21 #include "db/compaction_iterator.h"
22 #include "db/dbformat.h"
23 #include "db/flush_scheduler.h"
24 #include "db/internal_stats.h"
25 #include "db/job_context.h"
26 #include "db/log_writer.h"
27 #include "db/memtable_list.h"
28 #include "db/range_del_aggregator.h"
29 #include "db/version_edit.h"
30 #include "db/write_controller.h"
31 #include "db/write_thread.h"
32 #include "options/cf_options.h"
33 #include "options/db_options.h"
34 #include "port/port.h"
35 #include "rocksdb/compaction_filter.h"
36 #include "rocksdb/compaction_job_stats.h"
37 #include "rocksdb/db.h"
38 #include "rocksdb/env.h"
39 #include "rocksdb/memtablerep.h"
40 #include "rocksdb/transaction_log.h"
41 #include "table/scoped_arena_iterator.h"
42 #include "util/autovector.h"
43 #include "util/event_logger.h"
44 #include "util/stop_watch.h"
45 #include "util/thread_local.h"
// Forward declaration only; CompactionJob refers to SnapshotChecker solely
// through (const) pointers, so the full definition is not needed here.
class SnapshotChecker;
60 CompactionJob(int job_id
, Compaction
* compaction
,
61 const ImmutableDBOptions
& db_options
,
62 const EnvOptions env_options
, VersionSet
* versions
,
63 const std::atomic
<bool>* shutting_down
,
64 const SequenceNumber preserve_deletes_seqnum
,
65 LogBuffer
* log_buffer
, Directory
* db_directory
,
66 Directory
* output_directory
, Statistics
* stats
,
67 InstrumentedMutex
* db_mutex
, ErrorHandler
* db_error_handler
,
68 std::vector
<SequenceNumber
> existing_snapshots
,
69 SequenceNumber earliest_write_conflict_snapshot
,
70 const SnapshotChecker
* snapshot_checker
,
71 std::shared_ptr
<Cache
> table_cache
, EventLogger
* event_logger
,
72 bool paranoid_file_checks
, bool measure_io_stats
,
73 const std::string
& dbname
,
74 CompactionJobStats
* compaction_job_stats
,
75 Env::Priority thread_pri
);
80 CompactionJob(CompactionJob
&& job
) = delete;
81 CompactionJob(const CompactionJob
& job
) = delete;
82 CompactionJob
& operator=(const CompactionJob
& job
) = delete;
84 // REQUIRED: mutex held
// REQUIRED: mutex not held
// Install step of the compaction job; returns a Status describing the
// outcome.
// NOTE(review): presumably publishes the job's outputs, e.g. via
// InstallCompactionResults() — confirm in the definition.
// REQUIRED: mutex held (presumably db_mutex_ — confirm).
Status Install(const MutableCFOptions& mutable_cf_options);
// Per-subcompaction state; only forward-declared in this header.
struct SubcompactionState;

// Presumably folds per-subcompaction statistics into the job-level totals.
void AggregateStatistics();
// Presumably fills boundaries_ / sizes_, which split the job into
// subcompactions.
void GenSubcompactionBoundaries();

// Update the thread status for starting a compaction.
void ReportStartedCompaction(Compaction* compaction);
// Presumably reserves file numbers for the compaction's output files up
// front.
void AllocateCompactionOutputFileNumbers();
// Call the compaction filter, then iterate through the input and compact the
// key-value pairs belonging to `sub_compact`.
void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
// Presumably finalizes the output file currently open for `sub_compact`.
//   input_status        - status of the input processing so far.
//   range_del_agg       - range-deletion aggregator for this compaction.
//   range_del_out_stats - presumably receives range-deletion output stats.
//   next_table_min_key  - optional (defaults to nullptr); presumably the
//                         smallest key of the next output table.
Status FinishCompactionOutputFile(
    const Status& input_status, SubcompactionState* sub_compact,
    CompactionRangeDelAggregator* range_del_agg,
    CompactionIterationStats* range_del_out_stats,
    const Slice* next_table_min_key = nullptr);
// Presumably records the compaction's results using the given mutable
// column-family options; returns the outcome as a Status.
Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
// Presumably collects the I/O statistics for this compaction.
void RecordCompactionIOStats();
// Presumably creates the next output file for `sub_compact`.
Status OpenCompactionOutputFile(SubcompactionState* sub_compact);
// NOTE(review): presumably releases per-job state such as compact_ —
// confirm in the definition.
void CleanupCompaction();
// Const member: does not modify the job's own members (aside from any
// `mutable` ones); presumably copies `stats` into the caller-supplied
// CompactionJobStats.
void UpdateCompactionJobStats(
    const InternalStats::CompactionStats& stats) const;
// Presumably tallies keys dropped during iteration. `compaction_job_stats`
// is optional (defaults to nullptr).
void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
                       CompactionJobStats* compaction_job_stats = nullptr);

void UpdateCompactionStats();
// `num_files` and `bytes_read` are non-const out-pointers; presumably
// updated with the input-file totals for `input_level`.
void UpdateCompactionInputStatsHelper(
    int* num_files, uint64_t* bytes_read, int input_level);

void LogCompaction();
// CompactionJob state. CompactionState is only forward-declared here.
struct CompactionState;
// NOTE(review): raw pointer; ownership and lifetime are not visible in this
// header. Confirm where it is allocated and released.
CompactionState* compact_;
// Presumably the constructor's `compaction_job_stats` argument (not owned
// through this interface).
CompactionJobStats* compaction_job_stats_;
// Compaction statistics held by value inside the job.
InternalStats::CompactionStats compaction_stats_;
// Database name. This is a reference member, so the string it is bound to
// must outlive this CompactionJob. It is presumably the constructor's
// `dbname` argument; confirm in the definition.
const std::string& dbname_;
// DB-wide immutable options; also a reference, with the same lifetime
// requirement as dbname_.
const ImmutableDBOptions& db_options_;
// Environment options for this job, held by value.
const EnvOptions env_options_;

// EnvOptions optimized for compaction table reads.
// NOTE(review): the identifier is misspelled ("optiosn"). It is left
// unchanged because the out-of-header definitions presumably refer to it by
// this exact name.
EnvOptions env_optiosn_for_read_;
VersionSet* versions_;
// Shutdown flag; read-only through this pointer (presumably polled to stop
// the compaction early).
const std::atomic<bool>* shutting_down_;
const SequenceNumber preserve_deletes_seqnum_;
// The pointers below are raw and not owned through this interface.
// Presumably their targets are owned by the DB; confirm.
LogBuffer* log_buffer_;
Directory* db_directory_;
Directory* output_directory_;
InstrumentedMutex* db_mutex_;
ErrorHandler* db_error_handler_;
// Suppose there are two snapshots with sequence numbers s1 and s2, where
// s1 < s2. If we find two instances of a key k1 that lie entirely between
// s1 and s2, then the earlier version of k1 can be safely deleted, because
// that version is not visible in any snapshot.
std::vector<SequenceNumber> existing_snapshots_;

// This is the earliest snapshot that could be used for write-conflict
// checking by a transaction. For any user-key newer than this snapshot, we
// should make sure not to remove evidence that a write occurred.
SequenceNumber earliest_write_conflict_snapshot_;

// Both the pointer and the pointee are const, so this cannot be re-pointed
// after construction. NOTE(review): may be nullptr; confirm against callers.
const SnapshotChecker* const snapshot_checker_;

// Table cache shared with the caller (shared ownership).
std::shared_ptr<Cache> table_cache_;

EventLogger* event_logger_;

// Has no matching constructor parameter. Presumably derived from the
// Compaction (whether its output goes to the bottommost level).
bool bottommost_level_;
bool paranoid_file_checks_;
bool measure_io_stats_;
// Stores the Slices that designate the boundaries for each subcompaction.
// NOTE(review): if Slice is a non-owning view, the bytes it refers to must
// stay alive while these entries are in use. Confirm.
std::vector<Slice> boundaries_;
// Stores the approximate size of keys covered in the range of each
// subcompaction.
std::vector<uint64_t> sizes_;
// Write lifetime hint; presumably applied to the job's output files.
Env::WriteLifeTimeHint write_hint_;
// Thread priority; presumably the constructor's `thread_pri` argument.
Env::Priority thread_pri_;
178 } // namespace rocksdb