]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
20effc67 TL |
10 | #include "db/compaction/compaction_job.h" |
11 | ||
7c673cae | 12 | #include <algorithm> |
f67539c2 | 13 | #include <cinttypes> |
7c673cae FG |
14 | #include <functional> |
15 | #include <list> | |
16 | #include <memory> | |
17 | #include <random> | |
18 | #include <set> | |
19 | #include <thread> | |
20 | #include <utility> | |
21 | #include <vector> | |
22 | ||
20effc67 TL |
23 | #include "db/blob/blob_file_addition.h" |
24 | #include "db/blob/blob_file_builder.h" | |
7c673cae | 25 | #include "db/builder.h" |
f67539c2 | 26 | #include "db/db_impl/db_impl.h" |
7c673cae FG |
27 | #include "db/db_iter.h" |
28 | #include "db/dbformat.h" | |
11fdf7f2 | 29 | #include "db/error_handler.h" |
7c673cae FG |
30 | #include "db/event_helpers.h" |
31 | #include "db/log_reader.h" | |
32 | #include "db/log_writer.h" | |
33 | #include "db/memtable.h" | |
34 | #include "db/memtable_list.h" | |
35 | #include "db/merge_context.h" | |
36 | #include "db/merge_helper.h" | |
20effc67 | 37 | #include "db/output_validator.h" |
494da23a | 38 | #include "db/range_del_aggregator.h" |
7c673cae | 39 | #include "db/version_set.h" |
f67539c2 TL |
40 | #include "file/filename.h" |
41 | #include "file/read_write_util.h" | |
42 | #include "file/sst_file_manager_impl.h" | |
43 | #include "file/writable_file_writer.h" | |
44 | #include "logging/log_buffer.h" | |
45 | #include "logging/logging.h" | |
7c673cae FG |
46 | #include "monitoring/iostats_context_imp.h" |
47 | #include "monitoring/perf_context_imp.h" | |
48 | #include "monitoring/thread_status_util.h" | |
7c673cae FG |
49 | #include "port/port.h" |
50 | #include "rocksdb/db.h" | |
51 | #include "rocksdb/env.h" | |
20effc67 | 52 | #include "rocksdb/sst_partitioner.h" |
7c673cae FG |
53 | #include "rocksdb/statistics.h" |
54 | #include "rocksdb/status.h" | |
55 | #include "rocksdb/table.h" | |
f67539c2 TL |
56 | #include "table/block_based/block.h" |
57 | #include "table/block_based/block_based_table_factory.h" | |
7c673cae FG |
58 | #include "table/merging_iterator.h" |
59 | #include "table/table_builder.h" | |
f67539c2 | 60 | #include "test_util/sync_point.h" |
7c673cae | 61 | #include "util/coding.h" |
20effc67 | 62 | #include "util/hash.h" |
7c673cae FG |
63 | #include "util/mutexlock.h" |
64 | #include "util/random.h" | |
7c673cae FG |
65 | #include "util/stop_watch.h" |
66 | #include "util/string_util.h" | |
7c673cae | 67 | |
f67539c2 | 68 | namespace ROCKSDB_NAMESPACE { |
7c673cae | 69 | |
11fdf7f2 TL |
70 | const char* GetCompactionReasonString(CompactionReason compaction_reason) { |
71 | switch (compaction_reason) { | |
72 | case CompactionReason::kUnknown: | |
73 | return "Unknown"; | |
74 | case CompactionReason::kLevelL0FilesNum: | |
75 | return "LevelL0FilesNum"; | |
76 | case CompactionReason::kLevelMaxLevelSize: | |
77 | return "LevelMaxLevelSize"; | |
78 | case CompactionReason::kUniversalSizeAmplification: | |
79 | return "UniversalSizeAmplification"; | |
80 | case CompactionReason::kUniversalSizeRatio: | |
81 | return "UniversalSizeRatio"; | |
82 | case CompactionReason::kUniversalSortedRunNum: | |
83 | return "UniversalSortedRunNum"; | |
84 | case CompactionReason::kFIFOMaxSize: | |
85 | return "FIFOMaxSize"; | |
86 | case CompactionReason::kFIFOReduceNumFiles: | |
87 | return "FIFOReduceNumFiles"; | |
88 | case CompactionReason::kFIFOTtl: | |
89 | return "FIFOTtl"; | |
90 | case CompactionReason::kManualCompaction: | |
91 | return "ManualCompaction"; | |
92 | case CompactionReason::kFilesMarkedForCompaction: | |
93 | return "FilesMarkedForCompaction"; | |
94 | case CompactionReason::kBottommostFiles: | |
95 | return "BottommostFiles"; | |
96 | case CompactionReason::kTtl: | |
97 | return "Ttl"; | |
98 | case CompactionReason::kFlush: | |
99 | return "Flush"; | |
100 | case CompactionReason::kExternalSstIngestion: | |
101 | return "ExternalSstIngestion"; | |
f67539c2 TL |
102 | case CompactionReason::kPeriodicCompaction: |
103 | return "PeriodicCompaction"; | |
11fdf7f2 TL |
104 | case CompactionReason::kNumOfReasons: |
105 | // fall through | |
106 | default: | |
107 | assert(false); | |
108 | return "Invalid"; | |
109 | } | |
110 | } | |
111 | ||
7c673cae FG |
112 | // Maintains state for each sub-compaction |
113 | struct CompactionJob::SubcompactionState { | |
114 | const Compaction* compaction; | |
115 | std::unique_ptr<CompactionIterator> c_iter; | |
116 | ||
117 | // The boundaries of the key-range this compaction is interested in. No two | |
118 | // subcompactions may have overlapping key-ranges. | |
119 | // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded | |
120 | Slice *start, *end; | |
121 | ||
122 | // The return status of this subcompaction | |
123 | Status status; | |
124 | ||
20effc67 TL |
125 | // The return IO Status of this subcompaction |
126 | IOStatus io_status; | |
127 | ||
7c673cae FG |
128 | // Files produced by this subcompaction |
129 | struct Output { | |
20effc67 TL |
130 | Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp, |
131 | bool _enable_order_check, bool _enable_hash) | |
132 | : meta(std::move(_meta)), | |
133 | validator(_icmp, _enable_order_check, _enable_hash), | |
134 | finished(false) {} | |
7c673cae | 135 | FileMetaData meta; |
20effc67 | 136 | OutputValidator validator; |
7c673cae FG |
137 | bool finished; |
138 | std::shared_ptr<const TableProperties> table_properties; | |
139 | }; | |
140 | ||
141 | // State kept for output being generated | |
142 | std::vector<Output> outputs; | |
20effc67 | 143 | std::vector<BlobFileAddition> blob_file_additions; |
7c673cae FG |
144 | std::unique_ptr<WritableFileWriter> outfile; |
145 | std::unique_ptr<TableBuilder> builder; | |
20effc67 | 146 | |
7c673cae FG |
147 | Output* current_output() { |
148 | if (outputs.empty()) { | |
20effc67 | 149 | // This subcompaction's output could be empty if compaction was aborted |
7c673cae FG |
150 | // before this subcompaction had a chance to generate any output files. |
151 | // When subcompactions are executed sequentially this is more likely and | |
152 | // will be particulalry likely for the later subcompactions to be empty. | |
153 | // Once they are run in parallel however it should be much rarer. | |
154 | return nullptr; | |
155 | } else { | |
156 | return &outputs.back(); | |
157 | } | |
158 | } | |
159 | ||
20effc67 | 160 | uint64_t current_output_file_size = 0; |
7c673cae FG |
161 | |
162 | // State during the subcompaction | |
20effc67 TL |
163 | uint64_t total_bytes = 0; |
164 | uint64_t num_output_records = 0; | |
7c673cae | 165 | CompactionJobStats compaction_job_stats; |
20effc67 | 166 | uint64_t approx_size = 0; |
7c673cae FG |
167 | // An index that used to speed up ShouldStopBefore(). |
168 | size_t grandparent_index = 0; | |
169 | // The number of bytes overlapping between the current output and | |
170 | // grandparent files used in ShouldStopBefore(). | |
171 | uint64_t overlapped_bytes = 0; | |
172 | // A flag determine whether the key has been seen in ShouldStopBefore() | |
173 | bool seen_key = false; | |
7c673cae | 174 | |
20effc67 TL |
175 | SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size) |
176 | : compaction(c), start(_start), end(_end), approx_size(size) { | |
7c673cae FG |
177 | assert(compaction != nullptr); |
178 | } | |
179 | ||
20effc67 TL |
180 | // Adds the key and value to the builder |
181 | // If paranoid is true, adds the key-value to the paranoid hash | |
182 | Status AddToBuilder(const Slice& key, const Slice& value) { | |
183 | auto curr = current_output(); | |
184 | assert(builder != nullptr); | |
185 | assert(curr != nullptr); | |
186 | Status s = curr->validator.Add(key, value); | |
187 | if (!s.ok()) { | |
188 | return s; | |
189 | } | |
190 | builder->Add(key, value); | |
191 | return Status::OK(); | |
7c673cae FG |
192 | } |
193 | ||
7c673cae FG |
194 | // Returns true iff we should stop building the current output |
195 | // before processing "internal_key". | |
196 | bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) { | |
197 | const InternalKeyComparator* icmp = | |
198 | &compaction->column_family_data()->internal_comparator(); | |
199 | const std::vector<FileMetaData*>& grandparents = compaction->grandparents(); | |
200 | ||
201 | // Scan to find earliest grandparent file that contains key. | |
202 | while (grandparent_index < grandparents.size() && | |
203 | icmp->Compare(internal_key, | |
204 | grandparents[grandparent_index]->largest.Encode()) > | |
205 | 0) { | |
206 | if (seen_key) { | |
207 | overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize(); | |
208 | } | |
209 | assert(grandparent_index + 1 >= grandparents.size() || | |
210 | icmp->Compare( | |
211 | grandparents[grandparent_index]->largest.Encode(), | |
212 | grandparents[grandparent_index + 1]->smallest.Encode()) <= 0); | |
213 | grandparent_index++; | |
214 | } | |
215 | seen_key = true; | |
216 | ||
217 | if (overlapped_bytes + curr_file_size > | |
218 | compaction->max_compaction_bytes()) { | |
219 | // Too much overlap for current output; start new output | |
220 | overlapped_bytes = 0; | |
221 | return true; | |
222 | } | |
223 | ||
224 | return false; | |
225 | } | |
226 | }; | |
227 | ||
228 | // Maintains state for the entire compaction | |
229 | struct CompactionJob::CompactionState { | |
230 | Compaction* const compaction; | |
231 | ||
232 | // REQUIRED: subcompaction states are stored in order of increasing | |
233 | // key-range | |
234 | std::vector<CompactionJob::SubcompactionState> sub_compact_states; | |
235 | Status status; | |
236 | ||
20effc67 TL |
237 | size_t num_output_files = 0; |
238 | uint64_t total_bytes = 0; | |
239 | size_t num_blob_output_files = 0; | |
240 | uint64_t total_blob_bytes = 0; | |
241 | uint64_t num_output_records = 0; | |
7c673cae | 242 | |
20effc67 | 243 | explicit CompactionState(Compaction* c) : compaction(c) {} |
7c673cae FG |
244 | |
245 | Slice SmallestUserKey() { | |
246 | for (const auto& sub_compact_state : sub_compact_states) { | |
247 | if (!sub_compact_state.outputs.empty() && | |
248 | sub_compact_state.outputs[0].finished) { | |
249 | return sub_compact_state.outputs[0].meta.smallest.user_key(); | |
250 | } | |
251 | } | |
252 | // If there is no finished output, return an empty slice. | |
253 | return Slice(nullptr, 0); | |
254 | } | |
255 | ||
256 | Slice LargestUserKey() { | |
257 | for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend(); | |
258 | ++it) { | |
259 | if (!it->outputs.empty() && it->current_output()->finished) { | |
260 | assert(it->current_output() != nullptr); | |
261 | return it->current_output()->meta.largest.user_key(); | |
262 | } | |
263 | } | |
264 | // If there is no finished output, return an empty slice. | |
265 | return Slice(nullptr, 0); | |
266 | } | |
267 | }; | |
268 | ||
269 | void CompactionJob::AggregateStatistics() { | |
20effc67 TL |
270 | assert(compact_); |
271 | ||
7c673cae | 272 | for (SubcompactionState& sc : compact_->sub_compact_states) { |
20effc67 TL |
273 | auto& outputs = sc.outputs; |
274 | ||
275 | if (!outputs.empty() && !outputs.back().meta.fd.file_size) { | |
276 | // An error occurred, so ignore the last output. | |
277 | outputs.pop_back(); | |
278 | } | |
279 | ||
280 | compact_->num_output_files += outputs.size(); | |
7c673cae | 281 | compact_->total_bytes += sc.total_bytes; |
20effc67 TL |
282 | |
283 | const auto& blobs = sc.blob_file_additions; | |
284 | ||
285 | compact_->num_blob_output_files += blobs.size(); | |
286 | ||
287 | for (const auto& blob : blobs) { | |
288 | compact_->total_blob_bytes += blob.GetTotalBlobBytes(); | |
7c673cae | 289 | } |
20effc67 TL |
290 | |
291 | compact_->num_output_records += sc.num_output_records; | |
292 | ||
293 | compaction_job_stats_->Add(sc.compaction_job_stats); | |
7c673cae FG |
294 | } |
295 | } | |
296 | ||
// Creates a compaction job for `compaction`. The job takes ownership of a new
// CompactionState wrapping `compaction` (compact_), copies or stores the
// provided options and collaborators, and publishes the job to
// ThreadStatusUtil via ReportStartedCompaction().
//
// `compaction_job_stats` and `log_buffer` must be non-null (asserted below).
CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, VersionSet* versions,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
    FSDirectory* db_directory, FSDirectory* output_directory,
    FSDirectory* blob_output_directory, Statistics* stats,
    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
    EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
    const std::string& dbname, CompactionJobStats* compaction_job_stats,
    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
    const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
    const std::string& db_session_id, std::string full_history_ts_low)
    : job_id_(job_id),
      compact_(new CompactionState(compaction)),
      compaction_job_stats_(compaction_job_stats),
      compaction_stats_(compaction->compaction_reason(), 1),
      dbname_(dbname),
      db_id_(db_id),
      db_session_id_(db_session_id),
      db_options_(db_options),
      file_options_(file_options),
      env_(db_options.env),
      io_tracer_(io_tracer),
      fs_(db_options.fs, io_tracer),
      // Uses fs_ and db_options_, so relies on both being declared before
      // file_options_for_read_ in the class.
      // NOTE(review): confirm member declaration order in compaction_job.h.
      file_options_for_read_(
          fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
      versions_(versions),
      shutting_down_(shutting_down),
      manual_compaction_paused_(manual_compaction_paused),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_directory_(output_directory),
      blob_output_directory_(blob_output_directory),
      stats_(stats),
      db_mutex_(db_mutex),
      db_error_handler_(db_error_handler),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      table_cache_(std::move(table_cache)),
      event_logger_(event_logger),
      // Real value is computed in Prepare() from the compaction.
      bottommost_level_(false),
      paranoid_file_checks_(paranoid_file_checks),
      measure_io_stats_(measure_io_stats),
      // Real value is computed in Prepare() via CalculateSSTWriteHint().
      write_hint_(Env::WLTH_NOT_SET),
      thread_pri_(thread_pri),
      full_history_ts_low_(std::move(full_history_ts_low)) {
  assert(compaction_job_stats_ != nullptr);
  assert(log_buffer_ != nullptr);
  const auto* cfd = compact_->compaction->column_family_data();
  // Register this thread's column family and operation; note that
  // ReportStartedCompaction() below sets both again, after the properties.
  ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  ReportStartedCompaction(compaction);
}
357 | ||
358 | CompactionJob::~CompactionJob() { | |
359 | assert(compact_ == nullptr); | |
360 | ThreadStatusUtil::ResetThreadStatus(); | |
361 | } | |
362 | ||
11fdf7f2 | 363 | void CompactionJob::ReportStartedCompaction(Compaction* compaction) { |
7c673cae FG |
364 | const auto* cfd = compact_->compaction->column_family_data(); |
365 | ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env, | |
366 | db_options_.enable_thread_tracking); | |
367 | ||
11fdf7f2 TL |
368 | ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID, |
369 | job_id_); | |
7c673cae FG |
370 | |
371 | ThreadStatusUtil::SetThreadOperationProperty( | |
372 | ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL, | |
373 | (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) + | |
374 | compact_->compaction->output_level()); | |
375 | ||
376 | // In the current design, a CompactionJob is always created | |
377 | // for non-trivial compaction. | |
378 | assert(compaction->IsTrivialMove() == false || | |
379 | compaction->is_manual_compaction() == true); | |
380 | ||
381 | ThreadStatusUtil::SetThreadOperationProperty( | |
382 | ThreadStatus::COMPACTION_PROP_FLAGS, | |
383 | compaction->is_manual_compaction() + | |
384 | (compaction->deletion_compaction() << 1)); | |
385 | ||
386 | ThreadStatusUtil::SetThreadOperationProperty( | |
387 | ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, | |
388 | compaction->CalculateTotalInputSize()); | |
389 | ||
390 | IOSTATS_RESET(bytes_written); | |
391 | IOSTATS_RESET(bytes_read); | |
392 | ThreadStatusUtil::SetThreadOperationProperty( | |
393 | ThreadStatus::COMPACTION_BYTES_WRITTEN, 0); | |
394 | ThreadStatusUtil::SetThreadOperationProperty( | |
395 | ThreadStatus::COMPACTION_BYTES_READ, 0); | |
396 | ||
397 | // Set the thread operation after operation properties | |
398 | // to ensure GetThreadList() can always show them all together. | |
11fdf7f2 | 399 | ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); |
7c673cae | 400 | |
20effc67 TL |
401 | compaction_job_stats_->is_manual_compaction = |
402 | compaction->is_manual_compaction(); | |
403 | compaction_job_stats_->is_full_compaction = compaction->is_full_compaction(); | |
7c673cae FG |
404 | } |
405 | ||
406 | void CompactionJob::Prepare() { | |
407 | AutoThreadOperationStageUpdater stage_updater( | |
408 | ThreadStatus::STAGE_COMPACTION_PREPARE); | |
409 | ||
410 | // Generate file_levels_ for compaction berfore making Iterator | |
411 | auto* c = compact_->compaction; | |
412 | assert(c->column_family_data() != nullptr); | |
11fdf7f2 TL |
413 | assert(c->column_family_data()->current()->storage_info()->NumLevelFiles( |
414 | compact_->compaction->level()) > 0); | |
7c673cae | 415 | |
11fdf7f2 TL |
416 | write_hint_ = |
417 | c->column_family_data()->CalculateSSTWriteHint(c->output_level()); | |
7c673cae FG |
418 | bottommost_level_ = c->bottommost_level(); |
419 | ||
420 | if (c->ShouldFormSubcompactions()) { | |
494da23a TL |
421 | { |
422 | StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME); | |
423 | GenSubcompactionBoundaries(); | |
424 | } | |
7c673cae FG |
425 | assert(sizes_.size() == boundaries_.size() + 1); |
426 | ||
427 | for (size_t i = 0; i <= boundaries_.size(); i++) { | |
428 | Slice* start = i == 0 ? nullptr : &boundaries_[i - 1]; | |
429 | Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i]; | |
430 | compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]); | |
431 | } | |
494da23a TL |
432 | RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, |
433 | compact_->sub_compact_states.size()); | |
7c673cae | 434 | } else { |
20effc67 TL |
435 | constexpr Slice* start = nullptr; |
436 | constexpr Slice* end = nullptr; | |
437 | constexpr uint64_t size = 0; | |
438 | ||
439 | compact_->sub_compact_states.emplace_back(c, start, end, size); | |
7c673cae FG |
440 | } |
441 | } | |
442 | ||
443 | struct RangeWithSize { | |
444 | Range range; | |
445 | uint64_t size; | |
446 | ||
447 | RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0) | |
448 | : range(a, b), size(s) {} | |
449 | }; | |
450 | ||
7c673cae FG |
451 | void CompactionJob::GenSubcompactionBoundaries() { |
452 | auto* c = compact_->compaction; | |
453 | auto* cfd = c->column_family_data(); | |
454 | const Comparator* cfd_comparator = cfd->user_comparator(); | |
455 | std::vector<Slice> bounds; | |
456 | int start_lvl = c->start_level(); | |
457 | int out_lvl = c->output_level(); | |
458 | ||
459 | // Add the starting and/or ending key of certain input files as a potential | |
460 | // boundary | |
461 | for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) { | |
462 | int lvl = c->level(lvl_idx); | |
463 | if (lvl >= start_lvl && lvl <= out_lvl) { | |
464 | const LevelFilesBrief* flevel = c->input_levels(lvl_idx); | |
465 | size_t num_files = flevel->num_files; | |
466 | ||
467 | if (num_files == 0) { | |
468 | continue; | |
469 | } | |
470 | ||
471 | if (lvl == 0) { | |
472 | // For level 0 add the starting and ending key of each file since the | |
473 | // files may have greatly differing key ranges (not range-partitioned) | |
474 | for (size_t i = 0; i < num_files; i++) { | |
475 | bounds.emplace_back(flevel->files[i].smallest_key); | |
476 | bounds.emplace_back(flevel->files[i].largest_key); | |
477 | } | |
478 | } else { | |
479 | // For all other levels add the smallest/largest key in the level to | |
480 | // encompass the range covered by that level | |
481 | bounds.emplace_back(flevel->files[0].smallest_key); | |
482 | bounds.emplace_back(flevel->files[num_files - 1].largest_key); | |
483 | if (lvl == out_lvl) { | |
484 | // For the last level include the starting keys of all files since | |
485 | // the last level is the largest and probably has the widest key | |
486 | // range. Since it's range partitioned, the ending key of one file | |
487 | // and the starting key of the next are very close (or identical). | |
488 | for (size_t i = 1; i < num_files; i++) { | |
489 | bounds.emplace_back(flevel->files[i].smallest_key); | |
490 | } | |
491 | } | |
492 | } | |
493 | } | |
494 | } | |
495 | ||
496 | std::sort(bounds.begin(), bounds.end(), | |
11fdf7f2 TL |
497 | [cfd_comparator](const Slice& a, const Slice& b) -> bool { |
498 | return cfd_comparator->Compare(ExtractUserKey(a), | |
499 | ExtractUserKey(b)) < 0; | |
500 | }); | |
7c673cae | 501 | // Remove duplicated entries from bounds |
11fdf7f2 TL |
502 | bounds.erase( |
503 | std::unique(bounds.begin(), bounds.end(), | |
504 | [cfd_comparator](const Slice& a, const Slice& b) -> bool { | |
505 | return cfd_comparator->Compare(ExtractUserKey(a), | |
506 | ExtractUserKey(b)) == 0; | |
507 | }), | |
508 | bounds.end()); | |
7c673cae FG |
509 | |
510 | // Combine consecutive pairs of boundaries into ranges with an approximate | |
511 | // size of data covered by keys in that range | |
512 | uint64_t sum = 0; | |
513 | std::vector<RangeWithSize> ranges; | |
494da23a TL |
514 | // Get input version from CompactionState since it's already referenced |
515 | // earlier in SetInputVersioCompaction::SetInputVersion and will not change | |
516 | // when db_mutex_ is released below | |
517 | auto* v = compact_->compaction->input_version(); | |
7c673cae FG |
518 | for (auto it = bounds.begin();;) { |
519 | const Slice a = *it; | |
f67539c2 | 520 | ++it; |
7c673cae FG |
521 | |
522 | if (it == bounds.end()) { | |
523 | break; | |
524 | } | |
525 | ||
526 | const Slice b = *it; | |
494da23a TL |
527 | |
528 | // ApproximateSize could potentially create table reader iterator to seek | |
529 | // to the index block and may incur I/O cost in the process. Unlock db | |
530 | // mutex to reduce contention | |
531 | db_mutex_->Unlock(); | |
f67539c2 TL |
532 | uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a, |
533 | b, start_lvl, out_lvl + 1, | |
534 | TableReaderCaller::kCompaction); | |
494da23a | 535 | db_mutex_->Lock(); |
7c673cae FG |
536 | ranges.emplace_back(a, b, size); |
537 | sum += size; | |
538 | } | |
539 | ||
540 | // Group the ranges into subcompactions | |
541 | const double min_file_fill_percent = 4.0 / 5; | |
11fdf7f2 TL |
542 | int base_level = v->storage_info()->base_level(); |
543 | uint64_t max_output_files = static_cast<uint64_t>(std::ceil( | |
544 | sum / min_file_fill_percent / | |
545 | MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl, | |
546 | c->immutable_cf_options()->compaction_style, base_level, | |
547 | c->immutable_cf_options()->level_compaction_dynamic_level_bytes))); | |
7c673cae FG |
548 | uint64_t subcompactions = |
549 | std::min({static_cast<uint64_t>(ranges.size()), | |
11fdf7f2 | 550 | static_cast<uint64_t>(c->max_subcompactions()), |
7c673cae FG |
551 | max_output_files}); |
552 | ||
553 | if (subcompactions > 1) { | |
554 | double mean = sum * 1.0 / subcompactions; | |
555 | // Greedily add ranges to the subcompaction until the sum of the ranges' | |
556 | // sizes becomes >= the expected mean size of a subcompaction | |
557 | sum = 0; | |
20effc67 | 558 | for (size_t i = 0; i + 1 < ranges.size(); i++) { |
7c673cae FG |
559 | sum += ranges[i].size; |
560 | if (subcompactions == 1) { | |
561 | // If there's only one left to schedule then it goes to the end so no | |
562 | // need to put an end boundary | |
563 | continue; | |
564 | } | |
565 | if (sum >= mean) { | |
566 | boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit)); | |
567 | sizes_.emplace_back(sum); | |
568 | subcompactions--; | |
569 | sum = 0; | |
570 | } | |
571 | } | |
572 | sizes_.emplace_back(sum + ranges.back().size); | |
573 | } else { | |
574 | // Only one range so its size is the total sum of sizes computed above | |
575 | sizes_.emplace_back(sum); | |
576 | } | |
577 | } | |
578 | ||
// Executes the compaction prepared by Prepare() and returns its status (also
// stored in compact_->status). The phases are:
//   1. Run every subcompaction: subcompaction 0 on the calling thread, the
//      others on one new thread each, then join them all.
//   2. Record wall-clock and summed CPU time into compaction_stats_ and the
//      COMPACTION_TIME / COMPACTION_CPU_TIME histograms.
//   3. Take the first failing subcompaction's status/io_status. If none
//      failed, fsync the output directory (and the blob output directory
//      when new blob files were written to a different directory).
//   4. If still OK, verify every output table by opening an iterator on it.
//      With paranoid_file_checks_, also re-scan it through a fresh
//      OutputValidator and compare against the one recorded while writing.
//   5. Publish output table properties and aggregate statistics.
Status CompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  LogCompaction();

  const size_t num_threads = compact_->sub_compact_states.size();
  assert(num_threads > 0);
  const uint64_t start_micros = env_->NowMicros();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<port::Thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);

  // Wait for all other threads (if there are any) to finish execution
  for (auto& thread : thread_pool) {
    thread.join();
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros;
  compaction_stats_.cpu_micros = 0;
  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
    compaction_stats_.cpu_micros +=
        compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
  }

  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        compaction_stats_.cpu_micros);

  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");

  // Check if any thread encountered an error during execution
  Status status;
  IOStatus io_s;
  bool wrote_new_blob_files = false;

  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      status = state.status;
      io_s = state.io_status;
      break;
    }

    if (!state.blob_file_additions.empty()) {
      wrote_new_blob_files = true;
    }
  }

  // First chance to record an IO error: the failing subcompaction's, if any.
  // io_status_ keeps the earliest non-OK value it is given.
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  // Make the newly written files durable only if every subcompaction
  // succeeded.
  if (status.ok()) {
    constexpr IODebugContext* dbg = nullptr;

    if (output_directory_) {
      io_s = output_directory_->Fsync(IOOptions(), dbg);
    }

    // Blob files only need a separate fsync when they live in a different
    // directory than the table files.
    if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
        blob_output_directory_ != output_directory_) {
      io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
    }
  }
  // Second chance: an fsync failure, if no earlier IO error was recorded.
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    status = io_s;
  }
  if (status.ok()) {
    // Reuse thread_pool (its threads were all joined above) for verification.
    thread_pool.clear();
    std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
    for (const auto& state : compact_->sub_compact_states) {
      for (const auto& output : state.outputs) {
        files_output.emplace_back(&output);
      }
    }
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    auto prefix_extractor =
        compact_->compaction->mutable_cf_options()->prefix_extractor.get();
    // Shared work counter: each verifier claims the next unverified file.
    std::atomic<size_t> next_file_idx(0);
    // Verifies files until none are left or one fails. The first failure is
    // written to `output_status`, which is the status slot of the
    // subcompaction whose index matches this worker. That is not necessarily
    // the subcompaction that produced the failing file.
    auto verify_table = [&](Status& output_status) {
      while (true) {
        size_t file_idx = next_file_idx.fetch_add(1);
        if (file_idx >= files_output.size()) {
          break;
        }
        // Verify that the table is usable
        // We set for_compaction to false and don't OptimizeForCompactionTableRead
        // here because this is a special case after we finish the table building
        // No matter whether use_direct_io_for_flush_and_compaction is true,
        // we will regard this verification as user reads since the goal is
        // to cache it here for further user reads
        ReadOptions read_options;
        InternalIterator* iter = cfd->table_cache()->NewIterator(
            read_options, file_options_, cfd->internal_comparator(),
            files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
            prefix_extractor,
            /*table_reader_ptr=*/nullptr,
            cfd->internal_stats()->GetFileReadHist(
                compact_->compaction->output_level()),
            TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
            /*skip_filters=*/false, compact_->compaction->output_level(),
            MaxFileSizeForL0MetaPin(
                *compact_->compaction->mutable_cf_options()),
            /*smallest_compaction_key=*/nullptr,
            /*largest_compaction_key=*/nullptr,
            /*allow_unprepared_value=*/false);
        auto s = iter->status();

        if (s.ok() && paranoid_file_checks_) {
          // Re-read the whole file and compare its validator state (ordering
          // and hash both enabled) with the one built while writing it.
          OutputValidator validator(cfd->internal_comparator(),
                                    /*_enable_order_check=*/true,
                                    /*_enable_hash=*/true);
          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
            s = validator.Add(iter->key(), iter->value());
            if (!s.ok()) {
              break;
            }
          }
          if (s.ok()) {
            s = iter->status();
          }
          if (s.ok() &&
              !validator.CompareValidator(files_output[file_idx]->validator)) {
            s = Status::Corruption("Paranoid checksums do not match");
          }
        }

        delete iter;

        if (!s.ok()) {
          output_status = s;
          break;
        }
      }
    };
    // One verifier per subcompaction: workers 1..n-1 on new threads, worker 0
    // on the calling thread.
    for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
      thread_pool.emplace_back(verify_table,
                               std::ref(compact_->sub_compact_states[i].status));
    }
    verify_table(compact_->sub_compact_states[0].status);
    for (auto& thread : thread_pool) {
      thread.join();
    }
    for (const auto& state : compact_->sub_compact_states) {
      if (!state.status.ok()) {
        status = state.status;
        break;
      }
    }
  }

  // Map each output file name to its table properties.
  TablePropertiesCollection tp;
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.outputs) {
      auto fn =
          TableFileName(state.compaction->immutable_cf_options()->cf_paths,
                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
      tp[fn] = output.table_properties;
    }
  }
  compact_->compaction->SetOutputTableProperties(std::move(tp));

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();

  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");

  compact_->status = status;
  return status;
}
764 | ||
765 | Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) { | |
20effc67 TL |
766 | assert(compact_); |
767 | ||
7c673cae FG |
768 | AutoThreadOperationStageUpdater stage_updater( |
769 | ThreadStatus::STAGE_COMPACTION_INSTALL); | |
770 | db_mutex_->AssertHeld(); | |
771 | Status status = compact_->status; | |
20effc67 | 772 | |
7c673cae | 773 | ColumnFamilyData* cfd = compact_->compaction->column_family_data(); |
20effc67 TL |
774 | assert(cfd); |
775 | ||
7c673cae | 776 | cfd->internal_stats()->AddCompactionStats( |
494da23a | 777 | compact_->compaction->output_level(), thread_pri_, compaction_stats_); |
7c673cae FG |
778 | |
779 | if (status.ok()) { | |
780 | status = InstallCompactionResults(mutable_cf_options); | |
781 | } | |
20effc67 TL |
782 | if (!versions_->io_status().ok()) { |
783 | io_status_ = versions_->io_status(); | |
784 | } | |
785 | ||
7c673cae FG |
786 | VersionStorageInfo::LevelSummaryStorage tmp; |
787 | auto vstorage = cfd->current()->storage_info(); | |
788 | const auto& stats = compaction_stats_; | |
789 | ||
790 | double read_write_amp = 0.0; | |
791 | double write_amp = 0.0; | |
11fdf7f2 TL |
792 | double bytes_read_per_sec = 0; |
793 | double bytes_written_per_sec = 0; | |
794 | ||
7c673cae FG |
795 | if (stats.bytes_read_non_output_levels > 0) { |
796 | read_write_amp = (stats.bytes_written + stats.bytes_read_output_level + | |
797 | stats.bytes_read_non_output_levels) / | |
798 | static_cast<double>(stats.bytes_read_non_output_levels); | |
799 | write_amp = stats.bytes_written / | |
800 | static_cast<double>(stats.bytes_read_non_output_levels); | |
801 | } | |
11fdf7f2 TL |
802 | if (stats.micros > 0) { |
803 | bytes_read_per_sec = | |
804 | (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) / | |
805 | static_cast<double>(stats.micros); | |
806 | bytes_written_per_sec = | |
807 | stats.bytes_written / static_cast<double>(stats.micros); | |
808 | } | |
809 | ||
20effc67 TL |
810 | const std::string& column_family_name = cfd->GetName(); |
811 | ||
7c673cae FG |
812 | ROCKS_LOG_BUFFER( |
813 | log_buffer_, | |
814 | "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " | |
815 | "files in(%d, %d) out(%d) " | |
816 | "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) " | |
11fdf7f2 TL |
817 | "write-amplify(%.1f) %s, records in: %" PRIu64 |
818 | ", records dropped: %" PRIu64 " output_compression: %s\n", | |
20effc67 TL |
819 | column_family_name.c_str(), vstorage->LevelSummary(&tmp), |
820 | bytes_read_per_sec, bytes_written_per_sec, | |
821 | compact_->compaction->output_level(), | |
7c673cae FG |
822 | stats.num_input_files_in_non_output_levels, |
823 | stats.num_input_files_in_output_level, stats.num_output_files, | |
824 | stats.bytes_read_non_output_levels / 1048576.0, | |
825 | stats.bytes_read_output_level / 1048576.0, | |
826 | stats.bytes_written / 1048576.0, read_write_amp, write_amp, | |
827 | status.ToString().c_str(), stats.num_input_records, | |
11fdf7f2 TL |
828 | stats.num_dropped_records, |
829 | CompressionTypeToString(compact_->compaction->output_compression()) | |
830 | .c_str()); | |
7c673cae | 831 | |
20effc67 TL |
832 | const auto& blob_files = vstorage->GetBlobFiles(); |
833 | if (!blob_files.empty()) { | |
834 | ROCKS_LOG_BUFFER(log_buffer_, | |
835 | "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 | |
836 | "\n", | |
837 | column_family_name.c_str(), blob_files.begin()->first, | |
838 | blob_files.rbegin()->first); | |
839 | } | |
840 | ||
7c673cae FG |
841 | UpdateCompactionJobStats(stats); |
842 | ||
843 | auto stream = event_logger_->LogToBuffer(log_buffer_); | |
11fdf7f2 TL |
844 | stream << "job" << job_id_ << "event" |
845 | << "compaction_finished" | |
f67539c2 TL |
846 | << "compaction_time_micros" << stats.micros |
847 | << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level" | |
848 | << compact_->compaction->output_level() << "num_output_files" | |
20effc67 TL |
849 | << compact_->num_output_files << "total_output_size" |
850 | << compact_->total_bytes; | |
7c673cae | 851 | |
20effc67 TL |
852 | if (compact_->num_blob_output_files > 0) { |
853 | stream << "num_blob_output_files" << compact_->num_blob_output_files | |
854 | << "total_blob_output_size" << compact_->total_blob_bytes; | |
7c673cae FG |
855 | } |
856 | ||
20effc67 TL |
857 | stream << "num_input_records" << stats.num_input_records |
858 | << "num_output_records" << compact_->num_output_records | |
859 | << "num_subcompactions" << compact_->sub_compact_states.size() | |
860 | << "output_compression" | |
861 | << CompressionTypeToString(compact_->compaction->output_compression()); | |
862 | ||
863 | stream << "num_single_delete_mismatches" | |
864 | << compaction_job_stats_->num_single_del_mismatch; | |
865 | stream << "num_single_delete_fallthrough" | |
866 | << compaction_job_stats_->num_single_del_fallthru; | |
867 | ||
868 | if (measure_io_stats_) { | |
7c673cae FG |
869 | stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos; |
870 | stream << "file_range_sync_nanos" | |
871 | << compaction_job_stats_->file_range_sync_nanos; | |
872 | stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos; | |
873 | stream << "file_prepare_write_nanos" | |
874 | << compaction_job_stats_->file_prepare_write_nanos; | |
875 | } | |
876 | ||
877 | stream << "lsm_state"; | |
878 | stream.StartArray(); | |
879 | for (int level = 0; level < vstorage->num_levels(); ++level) { | |
880 | stream << vstorage->NumLevelFiles(level); | |
881 | } | |
882 | stream.EndArray(); | |
883 | ||
20effc67 TL |
884 | if (!blob_files.empty()) { |
885 | stream << "blob_file_head" << blob_files.begin()->first; | |
886 | stream << "blob_file_tail" << blob_files.rbegin()->first; | |
887 | } | |
888 | ||
7c673cae FG |
889 | CleanupCompaction(); |
890 | return status; | |
891 | } | |
892 | ||
// Processes the key/value stream of a single subcompaction.
//
// Builds a CompactionIterator over the subcompaction's input, which is
// limited to [sub_compact->start, sub_compact->end) when those bounds are set.
// Every entry the iterator yields is appended to the current output SST file.
// Output files are opened on demand and closed when any of these applies:
//   - the size limit is reached;
//   - the SST partitioner requires a split;
//   - ShouldStopBefore() requests one.
// When enable_blob_files is set, a BlobFileBuilder is created and handed to
// the CompactionIterator.
//
// Nothing is returned: the final status is stored in sub_compact->status.
// That includes the early NotSupported status for compaction filters that do
// not ignore snapshots.
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  assert(sub_compact);
  assert(sub_compact->compaction);

  // Thread CPU time at entry, in microseconds; used at the end to compute
  // this subcompaction's cpu_micros.
  uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

  // Create compaction filter and fail the compaction if
  // IgnoreSnapshots() = false because it is not supported anymore
  const CompactionFilter* compaction_filter =
      cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (compaction_filter == nullptr) {
    compaction_filter_from_factory =
        sub_compact->compaction->CreateCompactionFilter();
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
    sub_compact->status = Status::NotSupported(
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
        "anymore.");
    return;
  }

  CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
                                             existing_snapshots_);
  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  // Although the v2 aggregator is what the level iterator(s) know about,
  // the AddTombstones calls will be propagated down to the v1 aggregator.
  std::unique_ptr<InternalIterator> input(
      versions_->MakeInputIterator(read_options, sub_compact->compaction,
                                   &range_del_agg, file_options_for_read_));

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);

  // I/O measurement variables
  // When measure_io_stats_ is set, the current IOSTATS counters are
  // snapshotted here and the deltas are added to compaction_job_stats at the
  // end. The caller's perf level is saved so it can be restored afterwards.
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  const uint64_t kRecordStatsEvery = 1000;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  // The last argument before the level is the latest existing snapshot,
  // or 0 when there are no snapshots.
  MergeHelper merge(
      env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
      compaction_filter, db_options_.info_log.get(),
      false /* internal key corruption is expected */,
      existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
      snapshot_checker_, compact_->compaction->level(),
      db_options_.statistics.get());

  const MutableCFOptions* mutable_cf_options =
      sub_compact->compaction->mutable_cf_options();
  assert(mutable_cf_options);

  // Handed to BlobFileBuilder below and not read again in this function;
  // presumably collects the paths of blob files the builder creates.
  std::vector<std::string> blob_file_paths;

  // Only created when blob files are enabled for this column family;
  // otherwise the CompactionIterator receives a null builder.
  std::unique_ptr<BlobFileBuilder> blob_file_builder(
      mutable_cf_options->enable_blob_files
          ? new BlobFileBuilder(
                versions_, env_, fs_.get(),
                sub_compact->compaction->immutable_cf_options(),
                mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
                cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
                &blob_file_paths, &sub_compact->blob_file_additions)
          : nullptr);

  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::Run():PausingManualCompaction:1",
      reinterpret_cast<void*>(
          const_cast<std::atomic<int>*>(manual_compaction_paused_)));

  // Position the raw input at this subcompaction's start key when one is
  // given. Pairing it with kMaxSequenceNumber / kValueTypeForSeek presumably
  // targets the first internal entry for that user key.
  Slice* start = sub_compact->start;
  Slice* end = sub_compact->end;
  if (start != nullptr) {
    IterKey start_iter;
    start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
    input->Seek(start_iter.GetInternalKey());
  } else {
    input->SeekToFirst();
  }

  Status status;
  const std::string* const full_history_ts_low =
      full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
  sub_compact->c_iter.reset(new CompactionIterator(
      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
      &existing_snapshots_, earliest_write_conflict_snapshot_,
      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
      /*expect_valid_internal_key=*/true, &range_del_agg,
      blob_file_builder.get(), db_options_.allow_data_in_errors,
      sub_compact->compaction, compaction_filter, shutting_down_,
      preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
      full_history_ts_low));
  auto c_iter = sub_compact->c_iter.get();
  c_iter->SeekToFirst();
  if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
    // ShouldStopBefore() maintains state based on keys processed so far. The
    // compaction loop always calls it on the "next" key, thus won't tell it the
    // first key. So we do that here.
    sub_compact->ShouldStopBefore(c_iter->key(),
                                  sub_compact->current_output_file_size);
  }
  const auto& c_iter_stats = c_iter->iter_stats();

  // No partitioner is used for L0 output; for other levels the compaction may
  // supply one. last_key_for_partitioner holds the user key most recently
  // added to the output, for the partitioner to compare with the next key.
  std::unique_ptr<SstPartitioner> partitioner =
      sub_compact->compaction->output_level() == 0
          ? nullptr
          : sub_compact->compaction->CreateSstPartitioner();
  std::string last_key_for_partitioner;

  // Main loop: runs until an error, a column family drop, or the compaction
  // iterator is exhausted (or the subcompaction's end key is reached below).
  while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
    // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
    // returns true.
    const Slice& key = c_iter->key();
    const Slice& value = c_iter->value();

    // If an end key (exclusive) is specified, check if the current key is
    // >= than it and exit if it is because the iterator is out of its range
    if (end != nullptr &&
        cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
      break;
    }
    // Every kRecordStatsEvery input records, flush the dropped-key counters
    // and I/O stats, and reset the iterator's record counts.
    if (c_iter_stats.num_input_records % kRecordStatsEvery ==
        kRecordStatsEvery - 1) {
      RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
      c_iter->ResetRecordCounts();
      RecordCompactionIOStats();
    }

    // Open output file if necessary
    if (sub_compact->builder == nullptr) {
      status = OpenCompactionOutputFile(sub_compact);
      if (!status.ok()) {
        break;
      }
    }
    status = sub_compact->AddToBuilder(key, value);
    if (!status.ok()) {
      break;
    }

    // Track the output file's estimated size and key/seqno boundaries.
    sub_compact->current_output_file_size =
        sub_compact->builder->EstimatedFileSize();
    const ParsedInternalKey& ikey = c_iter->ikey();
    sub_compact->current_output()->meta.UpdateBoundaries(
        key, value, ikey.sequence, ikey.type);
    sub_compact->num_output_records++;

    // Close output file if it is big enough. Two possibilities determine it's
    // time to close it: (1) the current key should be this file's last key, (2)
    // the next key should not be in this file.
    //
    // TODO(aekmekji): determine if file should be closed earlier than this
    // during subcompactions (i.e. if output size, estimated by input size, is
    // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
    // and 0.6MB instead of 1MB and 0.2MB)
    bool output_file_ended = false;
    if (sub_compact->compaction->output_level() != 0 &&
        sub_compact->current_output_file_size >=
            sub_compact->compaction->max_output_file_size()) {
      // (1) this key terminates the file. For historical reasons, the iterator
      // status before advancing will be given to FinishCompactionOutputFile().
      output_file_ended = true;
    }
    TEST_SYNC_POINT_CALLBACK(
        "CompactionJob::Run():PausingManualCompaction:2",
        reinterpret_cast<void*>(
            const_cast<std::atomic<int>*>(manual_compaction_paused_)));
    // Copy the user key before Next(), since the iterator's current key
    // changes once it advances.
    if (partitioner.get()) {
      last_key_for_partitioner.assign(c_iter->user_key().data_,
                                      c_iter->user_key().size_);
    }
    c_iter->Next();
    if (c_iter->status().IsManualCompactionPaused()) {
      break;
    }
    if (!output_file_ended && c_iter->Valid()) {
      if (((partitioner.get() &&
            partitioner->ShouldPartition(PartitionerRequest(
                last_key_for_partitioner, c_iter->user_key(),
                sub_compact->current_output_file_size)) == kRequired) ||
           (sub_compact->compaction->output_level() != 0 &&
            sub_compact->ShouldStopBefore(
                c_iter->key(), sub_compact->current_output_file_size))) &&
          sub_compact->builder != nullptr) {
        // (2) this key belongs to the next file. For historical reasons, the
        // iterator status after advancing will be given to
        // FinishCompactionOutputFile().
        output_file_ended = true;
      }
    }
    if (output_file_ended) {
      // The next key (if any) is passed so the file's range tombstones can be
      // bounded by where the next file starts.
      const Slice* next_key = nullptr;
      if (c_iter->Valid()) {
        next_key = &c_iter->key();
      }
      CompactionIterationStats range_del_out_stats;
      // A failure here ends the loop via the status.ok() check in the while
      // condition.
      status = FinishCompactionOutputFile(input->status(), sub_compact,
                                          &range_del_agg, &range_del_out_stats,
                                          next_key);
      RecordDroppedKeys(range_del_out_stats,
                        &sub_compact->compaction_job_stats);
    }
  }

  // Copy the iterator's cumulative counters into the per-subcompaction stats.
  sub_compact->compaction_job_stats.num_input_deletion_records =
      c_iter_stats.num_input_deletion_records;
  sub_compact->compaction_job_stats.num_corrupt_keys =
      c_iter_stats.num_input_corrupt_records;
  sub_compact->compaction_job_stats.num_single_del_fallthru =
      c_iter_stats.num_single_del_fallthru;
  sub_compact->compaction_job_stats.num_single_del_mismatch =
      c_iter_stats.num_single_del_mismatch;
  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
      c_iter_stats.total_input_raw_key_bytes;
  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
      c_iter_stats.total_input_raw_value_bytes;

  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
             c_iter_stats.total_filter_time);
  RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
  RecordCompactionIOStats();

  // Map cancellation conditions to explicit statuses. A shutdown or a manual
  // pause overrides a ColumnFamilyDropped status but not any other error.
  if (status.ok() && cfd->IsDropped()) {
    status =
        Status::ColumnFamilyDropped("Column family dropped during compaction");
  }
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_relaxed)) {
    status = Status::ShutdownInProgress("Database shutdown");
  }
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
      (manual_compaction_paused_ &&
       manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
    status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  // Surface iterator errors only when nothing else has failed.
  if (status.ok()) {
    status = input->status();
  }
  if (status.ok()) {
    status = c_iter->status();
  }

  if (status.ok() && sub_compact->builder == nullptr &&
      sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
    // handle subcompaction containing only range deletions
    status = OpenCompactionOutputFile(sub_compact);
  }

  // Call FinishCompactionOutputFile() even if status is not ok: it needs to
  // close the output file.
  if (sub_compact->builder != nullptr) {
    CompactionIterationStats range_del_out_stats;
    Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
                                          &range_del_out_stats);
    // Keep the first error; a finish error only replaces an OK status.
    if (!s.ok() && status.ok()) {
      status = s;
    }
    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
  }

  // Finish blob output only on success; the builder is destroyed either way.
  if (blob_file_builder) {
    if (status.ok()) {
      status = blob_file_builder->Finish();
    }

    blob_file_builder.reset();
  }

  // CPU time spent in this function. When I/O stats are measured, the CPU
  // time attributed to file reads and writes is subtracted below.
  sub_compact->compaction_job_stats.cpu_micros =
      env_->NowCPUNanos() / 1000 - prev_cpu_micros;

  // Add the I/O timing deltas since entry and restore the caller's perf
  // level if this function changed it.
  if (measure_io_stats_) {
    sub_compact->compaction_job_stats.file_write_nanos +=
        IOSTATS(write_nanos) - prev_write_nanos;
    sub_compact->compaction_job_stats.file_fsync_nanos +=
        IOSTATS(fsync_nanos) - prev_fsync_nanos;
    sub_compact->compaction_job_stats.file_range_sync_nanos +=
        IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
    sub_compact->compaction_job_stats.file_prepare_write_nanos +=
        IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
    sub_compact->compaction_job_stats.cpu_micros -=
        (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
         IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
        1000;
    if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
      SetPerfLevel(prev_perf_level);
    }
  }
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
  // On an earlier failure the iterator statuses above may not have been
  // read; mark them as checked for status-checking builds.
  if (!status.ok()) {
    if (sub_compact->c_iter) {
      sub_compact->c_iter->status().PermitUncheckedError();
    }
    if (input) {
      input->status().PermitUncheckedError();
    }
  }
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED

  // Release the compaction iterator before its input iterator: c_iter was
  // constructed with input.get().
  sub_compact->c_iter.reset();
  input.reset();
  sub_compact->status = status;
}
1221 | ||
1222 | void CompactionJob::RecordDroppedKeys( | |
1223 | const CompactionIterationStats& c_iter_stats, | |
1224 | CompactionJobStats* compaction_job_stats) { | |
1225 | if (c_iter_stats.num_record_drop_user > 0) { | |
1226 | RecordTick(stats_, COMPACTION_KEY_DROP_USER, | |
1227 | c_iter_stats.num_record_drop_user); | |
1228 | } | |
1229 | if (c_iter_stats.num_record_drop_hidden > 0) { | |
1230 | RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, | |
1231 | c_iter_stats.num_record_drop_hidden); | |
1232 | if (compaction_job_stats) { | |
1233 | compaction_job_stats->num_records_replaced += | |
1234 | c_iter_stats.num_record_drop_hidden; | |
1235 | } | |
1236 | } | |
1237 | if (c_iter_stats.num_record_drop_obsolete > 0) { | |
1238 | RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, | |
1239 | c_iter_stats.num_record_drop_obsolete); | |
1240 | if (compaction_job_stats) { | |
1241 | compaction_job_stats->num_expired_deletion_records += | |
1242 | c_iter_stats.num_record_drop_obsolete; | |
1243 | } | |
1244 | } | |
1245 | if (c_iter_stats.num_record_drop_range_del > 0) { | |
1246 | RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL, | |
1247 | c_iter_stats.num_record_drop_range_del); | |
1248 | } | |
1249 | if (c_iter_stats.num_range_del_drop_obsolete > 0) { | |
1250 | RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE, | |
1251 | c_iter_stats.num_range_del_drop_obsolete); | |
1252 | } | |
11fdf7f2 TL |
1253 | if (c_iter_stats.num_optimized_del_drop_obsolete > 0) { |
1254 | RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE, | |
1255 | c_iter_stats.num_optimized_del_drop_obsolete); | |
1256 | } | |
7c673cae FG |
1257 | } |
1258 | ||
1259 | Status CompactionJob::FinishCompactionOutputFile( | |
1260 | const Status& input_status, SubcompactionState* sub_compact, | |
494da23a | 1261 | CompactionRangeDelAggregator* range_del_agg, |
7c673cae FG |
1262 | CompactionIterationStats* range_del_out_stats, |
1263 | const Slice* next_table_min_key /* = nullptr */) { | |
1264 | AutoThreadOperationStageUpdater stage_updater( | |
1265 | ThreadStatus::STAGE_COMPACTION_SYNC_FILE); | |
1266 | assert(sub_compact != nullptr); | |
1267 | assert(sub_compact->outfile); | |
1268 | assert(sub_compact->builder != nullptr); | |
1269 | assert(sub_compact->current_output() != nullptr); | |
1270 | ||
1271 | uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber(); | |
1272 | assert(output_number != 0); | |
1273 | ||
11fdf7f2 TL |
1274 | ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); |
1275 | const Comparator* ucmp = cfd->user_comparator(); | |
20effc67 TL |
1276 | std::string file_checksum = kUnknownFileChecksum; |
1277 | std::string file_checksum_func_name = kUnknownFileChecksumFuncName; | |
11fdf7f2 | 1278 | |
7c673cae FG |
1279 | // Check for iterator errors |
1280 | Status s = input_status; | |
1281 | auto meta = &sub_compact->current_output()->meta; | |
11fdf7f2 | 1282 | assert(meta != nullptr); |
7c673cae FG |
1283 | if (s.ok()) { |
1284 | Slice lower_bound_guard, upper_bound_guard; | |
11fdf7f2 | 1285 | std::string smallest_user_key; |
7c673cae | 1286 | const Slice *lower_bound, *upper_bound; |
494da23a | 1287 | bool lower_bound_from_sub_compact = false; |
7c673cae FG |
1288 | if (sub_compact->outputs.size() == 1) { |
1289 | // For the first output table, include range tombstones before the min key | |
1290 | // but after the subcompaction boundary. | |
1291 | lower_bound = sub_compact->start; | |
494da23a | 1292 | lower_bound_from_sub_compact = true; |
7c673cae FG |
1293 | } else if (meta->smallest.size() > 0) { |
1294 | // For subsequent output tables, only include range tombstones from min | |
1295 | // key onwards since the previous file was extended to contain range | |
1296 | // tombstones falling before min key. | |
11fdf7f2 TL |
1297 | smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/); |
1298 | lower_bound_guard = Slice(smallest_user_key); | |
7c673cae FG |
1299 | lower_bound = &lower_bound_guard; |
1300 | } else { | |
1301 | lower_bound = nullptr; | |
1302 | } | |
1303 | if (next_table_min_key != nullptr) { | |
494da23a TL |
1304 | // This may be the last file in the subcompaction in some cases, so we |
1305 | // need to compare the end key of subcompaction with the next file start | |
1306 | // key. When the end key is chosen by the subcompaction, we know that | |
1307 | // it must be the biggest key in output file. Therefore, it is safe to | |
1308 | // use the smaller key as the upper bound of the output file, to ensure | |
1309 | // that there is no overlapping between different output files. | |
7c673cae | 1310 | upper_bound_guard = ExtractUserKey(*next_table_min_key); |
494da23a TL |
1311 | if (sub_compact->end != nullptr && |
1312 | ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) { | |
1313 | upper_bound = sub_compact->end; | |
1314 | } else { | |
1315 | upper_bound = &upper_bound_guard; | |
1316 | } | |
7c673cae FG |
1317 | } else { |
1318 | // This is the last file in the subcompaction, so extend until the | |
1319 | // subcompaction ends. | |
1320 | upper_bound = sub_compact->end; | |
1321 | } | |
11fdf7f2 TL |
1322 | auto earliest_snapshot = kMaxSequenceNumber; |
1323 | if (existing_snapshots_.size() > 0) { | |
1324 | earliest_snapshot = existing_snapshots_[0]; | |
1325 | } | |
494da23a TL |
1326 | bool has_overlapping_endpoints; |
1327 | if (upper_bound != nullptr && meta->largest.size() > 0) { | |
1328 | has_overlapping_endpoints = | |
1329 | ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0; | |
1330 | } else { | |
1331 | has_overlapping_endpoints = false; | |
1332 | } | |
1333 | ||
1334 | // The end key of the subcompaction must be bigger or equal to the upper | |
1335 | // bound. If the end of subcompaction is null or the upper bound is null, | |
1336 | // it means that this file is the last file in the compaction. So there | |
1337 | // will be no overlapping between this file and others. | |
1338 | assert(sub_compact->end == nullptr || | |
1339 | upper_bound == nullptr || | |
1340 | ucmp->Compare(*upper_bound , *sub_compact->end) <= 0); | |
1341 | auto it = range_del_agg->NewIterator(lower_bound, upper_bound, | |
1342 | has_overlapping_endpoints); | |
1343 | // Position the range tombstone output iterator. There may be tombstone | |
1344 | // fragments that are entirely out of range, so make sure that we do not | |
1345 | // include those. | |
11fdf7f2 TL |
1346 | if (lower_bound != nullptr) { |
1347 | it->Seek(*lower_bound); | |
494da23a TL |
1348 | } else { |
1349 | it->SeekToFirst(); | |
11fdf7f2 | 1350 | } |
20effc67 | 1351 | TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1"); |
11fdf7f2 TL |
1352 | for (; it->Valid(); it->Next()) { |
1353 | auto tombstone = it->Tombstone(); | |
494da23a TL |
1354 | if (upper_bound != nullptr) { |
1355 | int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_); | |
1356 | if ((has_overlapping_endpoints && cmp < 0) || | |
1357 | (!has_overlapping_endpoints && cmp <= 0)) { | |
1358 | // Tombstones starting after upper_bound only need to be included in | |
1359 | // the next table. If the current SST ends before upper_bound, i.e., | |
1360 | // `has_overlapping_endpoints == false`, we can also skip over range | |
1361 | // tombstones that start exactly at upper_bound. Such range tombstones | |
1362 | // will be included in the next file and are not relevant to the point | |
1363 | // keys or endpoints of the current file. | |
1364 | break; | |
1365 | } | |
11fdf7f2 TL |
1366 | } |
1367 | ||
1368 | if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) { | |
1369 | // TODO(andrewkr): tombstones that span multiple output files are | |
1370 | // counted for each compaction output file, so lots of double counting. | |
1371 | range_del_out_stats->num_range_del_drop_obsolete++; | |
1372 | range_del_out_stats->num_record_drop_obsolete++; | |
1373 | continue; | |
1374 | } | |
1375 | ||
1376 | auto kv = tombstone.Serialize(); | |
494da23a TL |
1377 | assert(lower_bound == nullptr || |
1378 | ucmp->Compare(*lower_bound, kv.second) < 0); | |
20effc67 | 1379 | // Range tombstone is not supported by output validator yet. |
11fdf7f2 TL |
1380 | sub_compact->builder->Add(kv.first.Encode(), kv.second); |
1381 | InternalKey smallest_candidate = std::move(kv.first); | |
1382 | if (lower_bound != nullptr && | |
1383 | ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) { | |
1384 | // Pretend the smallest key has the same user key as lower_bound | |
1385 | // (the max key in the previous table or subcompaction) in order for | |
1386 | // files to appear key-space partitioned. | |
1387 | // | |
494da23a TL |
1388 | // When lower_bound is chosen by a subcompaction, we know that |
1389 | // subcompactions over smaller keys cannot contain any keys at | |
1390 | // lower_bound. We also know that smaller subcompactions exist, because | |
1391 | // otherwise the subcompaction woud be unbounded on the left. As a | |
1392 | // result, we know that no other files on the output level will contain | |
1393 | // actual keys at lower_bound (an output file may have a largest key of | |
1394 | // lower_bound@kMaxSequenceNumber, but this only indicates a large range | |
1395 | // tombstone was truncated). Therefore, it is safe to use the | |
1396 | // tombstone's sequence number, to ensure that keys at lower_bound at | |
1397 | // lower levels are covered by truncated tombstones. | |
1398 | // | |
1399 | // If lower_bound was chosen by the smallest data key in the file, | |
1400 | // choose lowest seqnum so this file's smallest internal key comes after | |
1401 | // the previous file's largest. The fake seqnum is OK because the read | |
1402 | // path's file-picking code only considers user key. | |
1403 | smallest_candidate = InternalKey( | |
1404 | *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0, | |
1405 | kTypeRangeDeletion); | |
11fdf7f2 TL |
1406 | } |
1407 | InternalKey largest_candidate = tombstone.SerializeEndKey(); | |
1408 | if (upper_bound != nullptr && | |
1409 | ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) { | |
1410 | // Pretend the largest key has the same user key as upper_bound (the | |
1411 | // min key in the following table or subcompaction) in order for files | |
1412 | // to appear key-space partitioned. | |
1413 | // | |
1414 | // Choose highest seqnum so this file's largest internal key comes | |
1415 | // before the next file's/subcompaction's smallest. The fake seqnum is | |
1416 | // OK because the read path's file-picking code only considers the user | |
1417 | // key portion. | |
1418 | // | |
1419 | // Note Seek() also creates InternalKey with (user_key, | |
1420 | // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of | |
1421 | // kTypeRangeDeletion (0xF), so the range tombstone comes before the | |
1422 | // Seek() key in InternalKey's ordering. So Seek() will look in the | |
1423 | // next file for the user key. | |
1424 | largest_candidate = | |
1425 | InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion); | |
1426 | } | |
494da23a TL |
1427 | #ifndef NDEBUG |
1428 | SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber; | |
1429 | if (meta->smallest.size() > 0) { | |
1430 | smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode()); | |
1431 | } | |
1432 | #endif | |
11fdf7f2 TL |
1433 | meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate, |
1434 | tombstone.seq_, | |
1435 | cfd->internal_comparator()); | |
494da23a TL |
1436 | |
1437 | // The smallest key in a file is used for range tombstone truncation, so | |
1438 | // it cannot have a seqnum of 0 (unless the smallest data key in a file | |
1439 | // has a seqnum of 0). Otherwise, the truncated tombstone may expose | |
1440 | // deleted keys at lower levels. | |
1441 | assert(smallest_ikey_seqnum == 0 || | |
1442 | ExtractInternalKeyFooter(meta->smallest.Encode()) != | |
1443 | PackSequenceAndType(0, kTypeRangeDeletion)); | |
11fdf7f2 | 1444 | } |
7c673cae FG |
1445 | } |
1446 | const uint64_t current_entries = sub_compact->builder->NumEntries(); | |
7c673cae FG |
1447 | if (s.ok()) { |
1448 | s = sub_compact->builder->Finish(); | |
1449 | } else { | |
1450 | sub_compact->builder->Abandon(); | |
1451 | } | |
20effc67 TL |
1452 | IOStatus io_s = sub_compact->builder->io_status(); |
1453 | if (s.ok()) { | |
1454 | s = io_s; | |
1455 | } | |
7c673cae | 1456 | const uint64_t current_bytes = sub_compact->builder->FileSize(); |
11fdf7f2 TL |
1457 | if (s.ok()) { |
1458 | meta->fd.file_size = current_bytes; | |
20effc67 | 1459 | meta->marked_for_compaction = sub_compact->builder->NeedCompact(); |
11fdf7f2 | 1460 | } |
7c673cae FG |
1461 | sub_compact->current_output()->finished = true; |
1462 | sub_compact->total_bytes += current_bytes; | |
1463 | ||
1464 | // Finish and check for file errors | |
1465 | if (s.ok()) { | |
1466 | StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS); | |
20effc67 TL |
1467 | io_s = sub_compact->outfile->Sync(db_options_.use_fsync); |
1468 | } | |
1469 | if (s.ok() && io_s.ok()) { | |
1470 | io_s = sub_compact->outfile->Close(); | |
1471 | } | |
1472 | if (s.ok() && io_s.ok()) { | |
1473 | // Add the checksum information to file metadata. | |
1474 | meta->file_checksum = sub_compact->outfile->GetFileChecksum(); | |
1475 | meta->file_checksum_func_name = | |
1476 | sub_compact->outfile->GetFileChecksumFuncName(); | |
1477 | file_checksum = meta->file_checksum; | |
1478 | file_checksum_func_name = meta->file_checksum_func_name; | |
7c673cae FG |
1479 | } |
1480 | if (s.ok()) { | |
20effc67 TL |
1481 | s = io_s; |
1482 | } | |
1483 | if (sub_compact->io_status.ok()) { | |
1484 | sub_compact->io_status = io_s; | |
1485 | // Since this error is really a copy of the | |
1486 | // "normal" status, it does not also need to be checked | |
1487 | sub_compact->io_status.PermitUncheckedError(); | |
7c673cae FG |
1488 | } |
1489 | sub_compact->outfile.reset(); | |
1490 | ||
7c673cae | 1491 | TableProperties tp; |
11fdf7f2 TL |
1492 | if (s.ok()) { |
1493 | tp = sub_compact->builder->GetTableProperties(); | |
1494 | } | |
7c673cae | 1495 | |
11fdf7f2 TL |
1496 | if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) { |
1497 | // If there is nothing to output, no necessary to generate a sst file. | |
1498 | // This happens when the output level is bottom level, at the same time | |
1499 | // the sub_compact output nothing. | |
1500 | std::string fname = | |
1501 | TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths, | |
1502 | meta->fd.GetNumber(), meta->fd.GetPathId()); | |
1503 | env_->DeleteFile(fname); | |
1504 | ||
1505 | // Also need to remove the file from outputs, or it will be added to the | |
1506 | // VersionEdit. | |
1507 | assert(!sub_compact->outputs.empty()); | |
1508 | sub_compact->outputs.pop_back(); | |
1509 | meta = nullptr; | |
1510 | } | |
7c673cae | 1511 | |
11fdf7f2 | 1512 | if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) { |
7c673cae | 1513 | // Output to event logger and fire events. |
11fdf7f2 TL |
1514 | sub_compact->current_output()->table_properties = |
1515 | std::make_shared<TableProperties>(tp); | |
1516 | ROCKS_LOG_INFO(db_options_.info_log, | |
1517 | "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 | |
1518 | " keys, %" PRIu64 " bytes%s", | |
1519 | cfd->GetName().c_str(), job_id_, output_number, | |
1520 | current_entries, current_bytes, | |
1521 | meta->marked_for_compaction ? " (need compaction)" : ""); | |
1522 | } | |
1523 | std::string fname; | |
1524 | FileDescriptor output_fd; | |
f67539c2 | 1525 | uint64_t oldest_blob_file_number = kInvalidBlobFileNumber; |
11fdf7f2 TL |
1526 | if (meta != nullptr) { |
1527 | fname = | |
1528 | TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths, | |
1529 | meta->fd.GetNumber(), meta->fd.GetPathId()); | |
1530 | output_fd = meta->fd; | |
f67539c2 | 1531 | oldest_blob_file_number = meta->oldest_blob_file_number; |
11fdf7f2 TL |
1532 | } else { |
1533 | fname = "(nil)"; | |
7c673cae | 1534 | } |
7c673cae FG |
1535 | EventHelpers::LogAndNotifyTableFileCreationFinished( |
1536 | event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, | |
f67539c2 | 1537 | job_id_, output_fd, oldest_blob_file_number, tp, |
20effc67 TL |
1538 | TableFileCreationReason::kCompaction, s, file_checksum, |
1539 | file_checksum_func_name); | |
7c673cae FG |
1540 | |
1541 | #ifndef ROCKSDB_LITE | |
1542 | // Report new file to SstFileManagerImpl | |
1543 | auto sfm = | |
1544 | static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get()); | |
11fdf7f2 | 1545 | if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) { |
20effc67 TL |
1546 | Status add_s = sfm->OnAddFile(fname); |
1547 | if (!add_s.ok() && s.ok()) { | |
1548 | s = add_s; | |
1549 | } | |
7c673cae | 1550 | if (sfm->IsMaxAllowedSpaceReached()) { |
11fdf7f2 TL |
1551 | // TODO(ajkr): should we return OK() if max space was reached by the final |
1552 | // compaction output file (similarly to how flush works when full)? | |
1553 | s = Status::SpaceLimit("Max allowed space was reached"); | |
1554 | TEST_SYNC_POINT( | |
1555 | "CompactionJob::FinishCompactionOutputFile:" | |
1556 | "MaxAllowedSpaceReached"); | |
7c673cae | 1557 | InstrumentedMutexLock l(db_mutex_); |
20effc67 TL |
1558 | // Should handle return error? |
1559 | db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction) | |
1560 | .PermitUncheckedError(); | |
7c673cae FG |
1561 | } |
1562 | } | |
1563 | #endif | |
1564 | ||
1565 | sub_compact->builder.reset(); | |
1566 | sub_compact->current_output_file_size = 0; | |
1567 | return s; | |
1568 | } | |
1569 | ||
1570 | Status CompactionJob::InstallCompactionResults( | |
1571 | const MutableCFOptions& mutable_cf_options) { | |
20effc67 TL |
1572 | assert(compact_); |
1573 | ||
7c673cae FG |
1574 | db_mutex_->AssertHeld(); |
1575 | ||
1576 | auto* compaction = compact_->compaction; | |
20effc67 TL |
1577 | assert(compaction); |
1578 | ||
7c673cae FG |
1579 | // paranoia: verify that the files that we started with |
1580 | // still exist in the current version and in the same original level. | |
1581 | // This ensures that a concurrent compaction did not erroneously | |
1582 | // pick the same files to compact_. | |
1583 | if (!versions_->VerifyCompactionFileConsistency(compaction)) { | |
1584 | Compaction::InputLevelSummaryBuffer inputs_summary; | |
1585 | ||
1586 | ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted", | |
1587 | compaction->column_family_data()->GetName().c_str(), | |
1588 | job_id_, compaction->InputLevelSummary(&inputs_summary)); | |
1589 | return Status::Corruption("Compaction input files inconsistent"); | |
1590 | } | |
1591 | ||
1592 | { | |
1593 | Compaction::InputLevelSummaryBuffer inputs_summary; | |
20effc67 TL |
1594 | ROCKS_LOG_INFO(db_options_.info_log, |
1595 | "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes", | |
1596 | compaction->column_family_data()->GetName().c_str(), job_id_, | |
1597 | compaction->InputLevelSummary(&inputs_summary), | |
1598 | compact_->total_bytes + compact_->total_blob_bytes); | |
7c673cae FG |
1599 | } |
1600 | ||
20effc67 TL |
1601 | VersionEdit* const edit = compaction->edit(); |
1602 | assert(edit); | |
1603 | ||
11fdf7f2 | 1604 | // Add compaction inputs |
20effc67 | 1605 | compaction->AddInputDeletions(edit); |
7c673cae FG |
1606 | |
1607 | for (const auto& sub_compact : compact_->sub_compact_states) { | |
1608 | for (const auto& out : sub_compact.outputs) { | |
20effc67 TL |
1609 | edit->AddFile(compaction->output_level(), out.meta); |
1610 | } | |
1611 | ||
1612 | for (const auto& blob : sub_compact.blob_file_additions) { | |
1613 | edit->AddBlobFile(blob); | |
7c673cae FG |
1614 | } |
1615 | } | |
20effc67 | 1616 | |
7c673cae | 1617 | return versions_->LogAndApply(compaction->column_family_data(), |
20effc67 TL |
1618 | mutable_cf_options, edit, db_mutex_, |
1619 | db_directory_); | |
7c673cae FG |
1620 | } |
1621 | ||
1622 | void CompactionJob::RecordCompactionIOStats() { | |
1623 | RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read)); | |
20effc67 TL |
1624 | RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written)); |
1625 | CompactionReason compaction_reason = | |
1626 | compact_->compaction->compaction_reason(); | |
1627 | if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) { | |
1628 | RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read)); | |
1629 | RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written)); | |
1630 | } else if (compaction_reason == CompactionReason::kPeriodicCompaction) { | |
1631 | RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read)); | |
1632 | RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written)); | |
1633 | } else if (compaction_reason == CompactionReason::kTtl) { | |
1634 | RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read)); | |
1635 | RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written)); | |
1636 | } | |
7c673cae FG |
1637 | ThreadStatusUtil::IncreaseThreadOperationProperty( |
1638 | ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read)); | |
1639 | IOSTATS_RESET(bytes_read); | |
7c673cae FG |
1640 | ThreadStatusUtil::IncreaseThreadOperationProperty( |
1641 | ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written)); | |
1642 | IOSTATS_RESET(bytes_written); | |
1643 | } | |
1644 | ||
1645 | Status CompactionJob::OpenCompactionOutputFile( | |
1646 | SubcompactionState* sub_compact) { | |
1647 | assert(sub_compact != nullptr); | |
1648 | assert(sub_compact->builder == nullptr); | |
1649 | // no need to lock because VersionSet::next_file_number_ is atomic | |
1650 | uint64_t file_number = versions_->NewFileNumber(); | |
11fdf7f2 TL |
1651 | std::string fname = |
1652 | TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths, | |
1653 | file_number, sub_compact->compaction->output_path_id()); | |
7c673cae FG |
1654 | // Fire events. |
1655 | ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); | |
1656 | #ifndef ROCKSDB_LITE | |
1657 | EventHelpers::NotifyTableFileCreationStarted( | |
1658 | cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_, | |
1659 | TableFileCreationReason::kCompaction); | |
1660 | #endif // !ROCKSDB_LITE | |
1661 | // Make the output file | |
f67539c2 | 1662 | std::unique_ptr<FSWritableFile> writable_file; |
11fdf7f2 | 1663 | #ifndef NDEBUG |
f67539c2 | 1664 | bool syncpoint_arg = file_options_.use_direct_writes; |
7c673cae | 1665 | TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile", |
11fdf7f2 TL |
1666 | &syncpoint_arg); |
1667 | #endif | |
20effc67 TL |
1668 | Status s; |
1669 | IOStatus io_s = | |
1670 | NewWritableFile(fs_.get(), fname, &writable_file, file_options_); | |
1671 | s = io_s; | |
1672 | if (sub_compact->io_status.ok()) { | |
1673 | sub_compact->io_status = io_s; | |
1674 | // Since this error is really a copy of the io_s that is checked below as s, | |
1675 | // it does not also need to be checked. | |
1676 | sub_compact->io_status.PermitUncheckedError(); | |
1677 | } | |
7c673cae FG |
1678 | if (!s.ok()) { |
1679 | ROCKS_LOG_ERROR( | |
1680 | db_options_.info_log, | |
1681 | "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64 | |
1682 | " fails at NewWritableFile with status %s", | |
1683 | sub_compact->compaction->column_family_data()->GetName().c_str(), | |
1684 | job_id_, file_number, s.ToString().c_str()); | |
1685 | LogFlush(db_options_.info_log); | |
1686 | EventHelpers::LogAndNotifyTableFileCreationFinished( | |
1687 | event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), | |
f67539c2 | 1688 | fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber, |
20effc67 TL |
1689 | TableProperties(), TableFileCreationReason::kCompaction, s, |
1690 | kUnknownFileChecksum, kUnknownFileChecksumFuncName); | |
7c673cae FG |
1691 | return s; |
1692 | } | |
1693 | ||
f67539c2 TL |
1694 | // Try to figure out the output file's oldest ancester time. |
1695 | int64_t temp_current_time = 0; | |
1696 | auto get_time_status = env_->GetCurrentTime(&temp_current_time); | |
1697 | // Safe to proceed even if GetCurrentTime fails. So, log and proceed. | |
1698 | if (!get_time_status.ok()) { | |
1699 | ROCKS_LOG_WARN(db_options_.info_log, | |
1700 | "Failed to get current time. Status: %s", | |
1701 | get_time_status.ToString().c_str()); | |
1702 | } | |
1703 | uint64_t current_time = static_cast<uint64_t>(temp_current_time); | |
1704 | uint64_t oldest_ancester_time = | |
1705 | sub_compact->compaction->MinInputFileOldestAncesterTime(); | |
1706 | if (oldest_ancester_time == port::kMaxUint64) { | |
1707 | oldest_ancester_time = current_time; | |
1708 | } | |
7c673cae | 1709 | |
f67539c2 TL |
1710 | // Initialize a SubcompactionState::Output and add it to sub_compact->outputs |
1711 | { | |
20effc67 TL |
1712 | FileMetaData meta; |
1713 | meta.fd = FileDescriptor(file_number, | |
1714 | sub_compact->compaction->output_path_id(), 0); | |
1715 | meta.oldest_ancester_time = oldest_ancester_time; | |
1716 | meta.file_creation_time = current_time; | |
1717 | sub_compact->outputs.emplace_back( | |
1718 | std::move(meta), cfd->internal_comparator(), | |
1719 | /*enable_order_check=*/ | |
1720 | sub_compact->compaction->mutable_cf_options() | |
1721 | ->check_flush_compaction_key_order, | |
1722 | /*enable_hash=*/paranoid_file_checks_); | |
f67539c2 TL |
1723 | } |
1724 | ||
1725 | writable_file->SetIOPriority(Env::IOPriority::IO_LOW); | |
11fdf7f2 | 1726 | writable_file->SetWriteLifeTimeHint(write_hint_); |
7c673cae FG |
1727 | writable_file->SetPreallocationBlockSize(static_cast<size_t>( |
1728 | sub_compact->compaction->OutputFilePreallocationSize())); | |
494da23a TL |
1729 | const auto& listeners = |
1730 | sub_compact->compaction->immutable_cf_options()->listeners; | |
20effc67 TL |
1731 | sub_compact->outfile.reset(new WritableFileWriter( |
1732 | std::move(writable_file), fname, file_options_, env_, io_tracer_, | |
1733 | db_options_.statistics.get(), listeners, | |
1734 | db_options_.file_checksum_gen_factory.get())); | |
7c673cae FG |
1735 | |
1736 | // If the Column family flag is to only optimize filters for hits, | |
1737 | // we can skip creating filters if this is the bottommost_level where | |
1738 | // data is going to be found | |
1739 | bool skip_filters = | |
1740 | cfd->ioptions()->optimize_filters_for_hits && bottommost_level_; | |
11fdf7f2 | 1741 | |
7c673cae | 1742 | sub_compact->builder.reset(NewTableBuilder( |
11fdf7f2 TL |
1743 | *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()), |
1744 | cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), | |
1745 | cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(), | |
1746 | sub_compact->compaction->output_compression(), | |
494da23a | 1747 | 0 /*sample_for_compression */, |
11fdf7f2 | 1748 | sub_compact->compaction->output_compression_opts(), |
494da23a | 1749 | sub_compact->compaction->output_level(), skip_filters, |
f67539c2 | 1750 | oldest_ancester_time, 0 /* oldest_key_time */, |
20effc67 TL |
1751 | sub_compact->compaction->max_output_file_size(), current_time, db_id_, |
1752 | db_session_id_)); | |
7c673cae FG |
1753 | LogFlush(db_options_.info_log); |
1754 | return s; | |
1755 | } | |
1756 | ||
1757 | void CompactionJob::CleanupCompaction() { | |
1758 | for (SubcompactionState& sub_compact : compact_->sub_compact_states) { | |
1759 | const auto& sub_status = sub_compact.status; | |
1760 | ||
1761 | if (sub_compact.builder != nullptr) { | |
1762 | // May happen if we get a shutdown call in the middle of compaction | |
1763 | sub_compact.builder->Abandon(); | |
1764 | sub_compact.builder.reset(); | |
1765 | } else { | |
1766 | assert(!sub_status.ok() || sub_compact.outfile == nullptr); | |
1767 | } | |
1768 | for (const auto& out : sub_compact.outputs) { | |
1769 | // If this file was inserted into the table cache then remove | |
1770 | // them here because this compaction was not committed. | |
1771 | if (!sub_status.ok()) { | |
1772 | TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber()); | |
1773 | } | |
1774 | } | |
20effc67 TL |
1775 | // TODO: sub_compact.io_status is not checked like status. Not sure if thats |
1776 | // intentional. So ignoring the io_status as of now. | |
1777 | sub_compact.io_status.PermitUncheckedError(); | |
7c673cae FG |
1778 | } |
1779 | delete compact_; | |
1780 | compact_ = nullptr; | |
1781 | } | |
1782 | ||
1783 | #ifndef ROCKSDB_LITE | |
1784 | namespace { | |
11fdf7f2 | 1785 | void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) { |
7c673cae FG |
1786 | assert(prefix_length > 0); |
1787 | size_t length = src.size() > prefix_length ? prefix_length : src.size(); | |
1788 | dst->assign(src.data(), length); | |
1789 | } | |
1790 | } // namespace | |
1791 | ||
1792 | #endif // !ROCKSDB_LITE | |
1793 | ||
1794 | void CompactionJob::UpdateCompactionStats() { | |
20effc67 TL |
1795 | assert(compact_); |
1796 | ||
7c673cae FG |
1797 | Compaction* compaction = compact_->compaction; |
1798 | compaction_stats_.num_input_files_in_non_output_levels = 0; | |
1799 | compaction_stats_.num_input_files_in_output_level = 0; | |
1800 | for (int input_level = 0; | |
1801 | input_level < static_cast<int>(compaction->num_input_levels()); | |
1802 | ++input_level) { | |
1803 | if (compaction->level(input_level) != compaction->output_level()) { | |
1804 | UpdateCompactionInputStatsHelper( | |
1805 | &compaction_stats_.num_input_files_in_non_output_levels, | |
11fdf7f2 | 1806 | &compaction_stats_.bytes_read_non_output_levels, input_level); |
7c673cae FG |
1807 | } else { |
1808 | UpdateCompactionInputStatsHelper( | |
1809 | &compaction_stats_.num_input_files_in_output_level, | |
11fdf7f2 | 1810 | &compaction_stats_.bytes_read_output_level, input_level); |
7c673cae FG |
1811 | } |
1812 | } | |
1813 | ||
20effc67 TL |
1814 | compaction_stats_.num_output_files = |
1815 | static_cast<int>(compact_->num_output_files) + | |
1816 | static_cast<int>(compact_->num_blob_output_files); | |
1817 | compaction_stats_.bytes_written = | |
1818 | compact_->total_bytes + compact_->total_blob_bytes; | |
f67539c2 | 1819 | |
20effc67 | 1820 | if (compaction_stats_.num_input_records > compact_->num_output_records) { |
f67539c2 | 1821 | compaction_stats_.num_dropped_records = |
20effc67 | 1822 | compaction_stats_.num_input_records - compact_->num_output_records; |
7c673cae FG |
1823 | } |
1824 | } | |
1825 | ||
11fdf7f2 TL |
1826 | void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files, |
1827 | uint64_t* bytes_read, | |
1828 | int input_level) { | |
7c673cae FG |
1829 | const Compaction* compaction = compact_->compaction; |
1830 | auto num_input_files = compaction->num_input_files(input_level); | |
1831 | *num_files += static_cast<int>(num_input_files); | |
1832 | ||
1833 | for (size_t i = 0; i < num_input_files; ++i) { | |
1834 | const auto* file_meta = compaction->input(input_level, i); | |
1835 | *bytes_read += file_meta->fd.GetFileSize(); | |
1836 | compaction_stats_.num_input_records += | |
1837 | static_cast<uint64_t>(file_meta->num_entries); | |
1838 | } | |
1839 | } | |
1840 | ||
1841 | void CompactionJob::UpdateCompactionJobStats( | |
1842 | const InternalStats::CompactionStats& stats) const { | |
1843 | #ifndef ROCKSDB_LITE | |
20effc67 TL |
1844 | compaction_job_stats_->elapsed_micros = stats.micros; |
1845 | ||
1846 | // input information | |
1847 | compaction_job_stats_->total_input_bytes = | |
1848 | stats.bytes_read_non_output_levels + stats.bytes_read_output_level; | |
1849 | compaction_job_stats_->num_input_records = stats.num_input_records; | |
1850 | compaction_job_stats_->num_input_files = | |
1851 | stats.num_input_files_in_non_output_levels + | |
1852 | stats.num_input_files_in_output_level; | |
1853 | compaction_job_stats_->num_input_files_at_output_level = | |
1854 | stats.num_input_files_in_output_level; | |
1855 | ||
1856 | // output information | |
1857 | compaction_job_stats_->total_output_bytes = stats.bytes_written; | |
1858 | compaction_job_stats_->num_output_records = compact_->num_output_records; | |
1859 | compaction_job_stats_->num_output_files = stats.num_output_files; | |
1860 | ||
1861 | if (stats.num_output_files > 0) { | |
1862 | CopyPrefix(compact_->SmallestUserKey(), | |
1863 | CompactionJobStats::kMaxPrefixLength, | |
1864 | &compaction_job_stats_->smallest_output_key_prefix); | |
1865 | CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength, | |
1866 | &compaction_job_stats_->largest_output_key_prefix); | |
7c673cae | 1867 | } |
11fdf7f2 TL |
1868 | #else |
1869 | (void)stats; | |
7c673cae FG |
1870 | #endif // !ROCKSDB_LITE |
1871 | } | |
1872 | ||
1873 | void CompactionJob::LogCompaction() { | |
1874 | Compaction* compaction = compact_->compaction; | |
1875 | ColumnFamilyData* cfd = compaction->column_family_data(); | |
1876 | ||
1877 | // Let's check if anything will get logged. Don't prepare all the info if | |
1878 | // we're not logging | |
1879 | if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) { | |
1880 | Compaction::InputLevelSummaryBuffer inputs_summary; | |
1881 | ROCKS_LOG_INFO( | |
1882 | db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f", | |
1883 | cfd->GetName().c_str(), job_id_, | |
1884 | compaction->InputLevelSummary(&inputs_summary), compaction->score()); | |
1885 | char scratch[2345]; | |
1886 | compaction->Summary(scratch, sizeof(scratch)); | |
1887 | ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n", | |
1888 | cfd->GetName().c_str(), scratch); | |
1889 | // build event logger report | |
1890 | auto stream = event_logger_->Log(); | |
1891 | stream << "job" << job_id_ << "event" | |
11fdf7f2 TL |
1892 | << "compaction_started" |
1893 | << "compaction_reason" | |
1894 | << GetCompactionReasonString(compaction->compaction_reason()); | |
7c673cae FG |
1895 | for (size_t i = 0; i < compaction->num_input_levels(); ++i) { |
1896 | stream << ("files_L" + ToString(compaction->level(i))); | |
1897 | stream.StartArray(); | |
1898 | for (auto f : *compaction->inputs(i)) { | |
1899 | stream << f->fd.GetNumber(); | |
1900 | } | |
1901 | stream.EndArray(); | |
1902 | } | |
1903 | stream << "score" << compaction->score() << "input_data_size" | |
1904 | << compaction->CalculateTotalInputSize(); | |
1905 | } | |
1906 | } | |
1907 | ||
f67539c2 | 1908 | } // namespace ROCKSDB_NAMESPACE |