1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include <algorithm>
11 #include <cinttypes>
12 #include <functional>
13 #include <list>
14 #include <memory>
15 #include <random>
16 #include <set>
17 #include <thread>
18 #include <utility>
19 #include <vector>
20
21 #include "db/builder.h"
22 #include "db/compaction/compaction_job.h"
23 #include "db/db_impl/db_impl.h"
24 #include "db/db_iter.h"
25 #include "db/dbformat.h"
26 #include "db/error_handler.h"
27 #include "db/event_helpers.h"
28 #include "db/log_reader.h"
29 #include "db/log_writer.h"
30 #include "db/memtable.h"
31 #include "db/memtable_list.h"
32 #include "db/merge_context.h"
33 #include "db/merge_helper.h"
34 #include "db/range_del_aggregator.h"
35 #include "db/version_set.h"
36 #include "file/filename.h"
37 #include "file/read_write_util.h"
38 #include "file/sst_file_manager_impl.h"
39 #include "file/writable_file_writer.h"
40 #include "logging/log_buffer.h"
41 #include "logging/logging.h"
42 #include "monitoring/iostats_context_imp.h"
43 #include "monitoring/perf_context_imp.h"
44 #include "monitoring/thread_status_util.h"
45 #include "port/port.h"
46 #include "rocksdb/db.h"
47 #include "rocksdb/env.h"
48 #include "rocksdb/statistics.h"
49 #include "rocksdb/status.h"
50 #include "rocksdb/table.h"
51 #include "table/block_based/block.h"
52 #include "table/block_based/block_based_table_factory.h"
53 #include "table/merging_iterator.h"
54 #include "table/table_builder.h"
55 #include "test_util/sync_point.h"
56 #include "util/coding.h"
57 #include "util/mutexlock.h"
58 #include "util/random.h"
59 #include "util/stop_watch.h"
60 #include "util/string_util.h"
61
62 namespace ROCKSDB_NAMESPACE {
63
64 const char* GetCompactionReasonString(CompactionReason compaction_reason) {
65 switch (compaction_reason) {
66 case CompactionReason::kUnknown:
67 return "Unknown";
68 case CompactionReason::kLevelL0FilesNum:
69 return "LevelL0FilesNum";
70 case CompactionReason::kLevelMaxLevelSize:
71 return "LevelMaxLevelSize";
72 case CompactionReason::kUniversalSizeAmplification:
73 return "UniversalSizeAmplification";
74 case CompactionReason::kUniversalSizeRatio:
75 return "UniversalSizeRatio";
76 case CompactionReason::kUniversalSortedRunNum:
77 return "UniversalSortedRunNum";
78 case CompactionReason::kFIFOMaxSize:
79 return "FIFOMaxSize";
80 case CompactionReason::kFIFOReduceNumFiles:
81 return "FIFOReduceNumFiles";
82 case CompactionReason::kFIFOTtl:
83 return "FIFOTtl";
84 case CompactionReason::kManualCompaction:
85 return "ManualCompaction";
86 case CompactionReason::kFilesMarkedForCompaction:
87 return "FilesMarkedForCompaction";
88 case CompactionReason::kBottommostFiles:
89 return "BottommostFiles";
90 case CompactionReason::kTtl:
91 return "Ttl";
92 case CompactionReason::kFlush:
93 return "Flush";
94 case CompactionReason::kExternalSstIngestion:
95 return "ExternalSstIngestion";
96 case CompactionReason::kPeriodicCompaction:
97 return "PeriodicCompaction";
98 case CompactionReason::kNumOfReasons:
99 // fall through
100 default:
101 assert(false);
102 return "Invalid";
103 }
104 }
105
106 // Maintains state for each sub-compaction
107 struct CompactionJob::SubcompactionState {
108 const Compaction* compaction;
109 std::unique_ptr<CompactionIterator> c_iter;
110
111 // The boundaries of the key-range this compaction is interested in. No two
112 // subcompactions may have overlapping key-ranges.
113 // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
114 Slice *start, *end;
115
116 // The return status of this subcompaction
117 Status status;
118
119 // Files produced by this subcompaction
120 struct Output {
121 FileMetaData meta;
122 bool finished;
123 std::shared_ptr<const TableProperties> table_properties;
124 };
125
126 // State kept for output being generated
127 std::vector<Output> outputs;
128 std::unique_ptr<WritableFileWriter> outfile;
129 std::unique_ptr<TableBuilder> builder;
130 Output* current_output() {
131 if (outputs.empty()) {
132 // This subcompaction's output could be empty if compaction was aborted
133 // before this subcompaction had a chance to generate any output files.
134 // When subcompactions are executed sequentially this is more likely, and
135 // it is particularly likely for the later subcompactions to be empty.
136 // Once they are run in parallel however it should be much rarer.
137 return nullptr;
138 } else {
139 return &outputs.back();
140 }
141 }
142
143 uint64_t current_output_file_size;
144
145 // State during the subcompaction
146 uint64_t total_bytes;
147 uint64_t num_output_records;
148 CompactionJobStats compaction_job_stats;
149 uint64_t approx_size;
150 // An index used to speed up ShouldStopBefore().
151 size_t grandparent_index = 0;
152 // The number of bytes overlapping between the current output and
153 // grandparent files used in ShouldStopBefore().
154 uint64_t overlapped_bytes = 0;
155 // A flag that determines whether a key has been seen in ShouldStopBefore()
156 bool seen_key = false;
157
158 SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
159 uint64_t size = 0)
160 : compaction(c),
161 start(_start),
162 end(_end),
163 outfile(nullptr),
164 builder(nullptr),
165 current_output_file_size(0),
166 total_bytes(0),
167 num_output_records(0),
168 approx_size(size),
169 grandparent_index(0),
170 overlapped_bytes(0),
171 seen_key(false) {
172 assert(compaction != nullptr);
173 }
174
175 SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }
176
177 SubcompactionState& operator=(SubcompactionState&& o) {
178 compaction = std::move(o.compaction);
179 start = std::move(o.start);
180 end = std::move(o.end);
181 status = std::move(o.status);
182 outputs = std::move(o.outputs);
183 outfile = std::move(o.outfile);
184 builder = std::move(o.builder);
185 current_output_file_size = std::move(o.current_output_file_size);
186 total_bytes = std::move(o.total_bytes);
187 num_output_records = std::move(o.num_output_records);
188 compaction_job_stats = std::move(o.compaction_job_stats);
189 approx_size = std::move(o.approx_size);
190 grandparent_index = std::move(o.grandparent_index);
191 overlapped_bytes = std::move(o.overlapped_bytes);
192 seen_key = std::move(o.seen_key);
193 return *this;
194 }
195
196 // Deleted because the std::unique_ptr members cannot be copied.
197 SubcompactionState(const SubcompactionState&) = delete;
198
199 SubcompactionState& operator=(const SubcompactionState&) = delete;
200
201 // Returns true iff we should stop building the current output
202 // before processing "internal_key".
203 bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
204 const InternalKeyComparator* icmp =
205 &compaction->column_family_data()->internal_comparator();
206 const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
207
208 // Scan to find earliest grandparent file that contains key.
209 while (grandparent_index < grandparents.size() &&
210 icmp->Compare(internal_key,
211 grandparents[grandparent_index]->largest.Encode()) >
212 0) {
213 if (seen_key) {
214 overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
215 }
216 assert(grandparent_index + 1 >= grandparents.size() ||
217 icmp->Compare(
218 grandparents[grandparent_index]->largest.Encode(),
219 grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
220 grandparent_index++;
221 }
222 seen_key = true;
223
224 if (overlapped_bytes + curr_file_size >
225 compaction->max_compaction_bytes()) {
226 // Too much overlap for current output; start new output
227 overlapped_bytes = 0;
228 return true;
229 }
230
231 return false;
232 }
233 };
234
235 // Maintains state for the entire compaction
236 struct CompactionJob::CompactionState {
237 Compaction* const compaction;
238
239 // REQUIRED: subcompaction states are stored in order of increasing
240 // key-range
241 std::vector<CompactionJob::SubcompactionState> sub_compact_states;
242 Status status;
243
244 uint64_t total_bytes;
245 uint64_t num_output_records;
246
247 explicit CompactionState(Compaction* c)
248 : compaction(c),
249 total_bytes(0),
250 num_output_records(0) {}
251
252 size_t NumOutputFiles() {
253 size_t total = 0;
254 for (auto& s : sub_compact_states) {
255 total += s.outputs.size();
256 }
257 return total;
258 }
259
260 Slice SmallestUserKey() {
261 for (const auto& sub_compact_state : sub_compact_states) {
262 if (!sub_compact_state.outputs.empty() &&
263 sub_compact_state.outputs[0].finished) {
264 return sub_compact_state.outputs[0].meta.smallest.user_key();
265 }
266 }
267 // If there is no finished output, return an empty slice.
268 return Slice(nullptr, 0);
269 }
270
271 Slice LargestUserKey() {
272 for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
273 ++it) {
274 if (!it->outputs.empty() && it->current_output()->finished) {
275 assert(it->current_output() != nullptr);
276 return it->current_output()->meta.largest.user_key();
277 }
278 }
279 // If there is no finished output, return an empty slice.
280 return Slice(nullptr, 0);
281 }
282 };
283
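// Fold each subcompaction's byte and record totals into the compaction-level
// aggregates, and merge the per-subcompaction job stats if they are tracked.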
284 void CompactionJob::AggregateStatistics() {
285 for (SubcompactionState& sc : compact_->sub_compact_states) {
286 compact_->total_bytes += sc.total_bytes;
287 compact_->num_output_records += sc.num_output_records;
288 }
289 if (compaction_job_stats_) {
290 for (SubcompactionState& sc : compact_->sub_compact_states) {
291 compaction_job_stats_->Add(sc.compaction_job_stats);
292 }
293 }
294 }
295
296 CompactionJob::CompactionJob(
297 int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
298 const FileOptions& file_options, VersionSet* versions,
299 const std::atomic<bool>* shutting_down,
300 const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
301 Directory* db_directory, Directory* output_directory, Statistics* stats,
302 InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
303 std::vector<SequenceNumber> existing_snapshots,
304 SequenceNumber earliest_write_conflict_snapshot,
305 const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
306 EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
307 const std::string& dbname, CompactionJobStats* compaction_job_stats,
308 Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused)
309 : job_id_(job_id),
310 compact_(new CompactionState(compaction)),
311 compaction_job_stats_(compaction_job_stats),
312 compaction_stats_(compaction->compaction_reason(), 1),
313 dbname_(dbname),
314 db_options_(db_options),
315 file_options_(file_options),
316 env_(db_options.env),
317 fs_(db_options.fs.get()),
318 file_options_for_read_(
319 fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
320 versions_(versions),
321 shutting_down_(shutting_down),
322 manual_compaction_paused_(manual_compaction_paused),
323 preserve_deletes_seqnum_(preserve_deletes_seqnum),
324 log_buffer_(log_buffer),
325 db_directory_(db_directory),
326 output_directory_(output_directory),
327 stats_(stats),
328 db_mutex_(db_mutex),
329 db_error_handler_(db_error_handler),
330 existing_snapshots_(std::move(existing_snapshots)),
331 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
332 snapshot_checker_(snapshot_checker),
333 table_cache_(std::move(table_cache)),
334 event_logger_(event_logger),
335 bottommost_level_(false),
336 paranoid_file_checks_(paranoid_file_checks),
337 measure_io_stats_(measure_io_stats),
338 write_hint_(Env::WLTH_NOT_SET),
339 thread_pri_(thread_pri) {
340 assert(log_buffer_ != nullptr);
341 const auto* cfd = compact_->compaction->column_family_data();
342 ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
343 db_options_.enable_thread_tracking);
344 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
345 ReportStartedCompaction(compaction);
346 }
347
348 CompactionJob::~CompactionJob() {
349 assert(compact_ == nullptr);
350 ThreadStatusUtil::ResetThreadStatus();
351 }
352
353 void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
354 const auto* cfd = compact_->compaction->column_family_data();
355 ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
356 db_options_.enable_thread_tracking);
357
358 ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
359 job_id_);
360
361 ThreadStatusUtil::SetThreadOperationProperty(
362 ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
363 (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
364 compact_->compaction->output_level());
365
366 // In the current design, a CompactionJob is always created
367 // for non-trivial compaction.
368 assert(compaction->IsTrivialMove() == false ||
369 compaction->is_manual_compaction() == true);
370
371 ThreadStatusUtil::SetThreadOperationProperty(
372 ThreadStatus::COMPACTION_PROP_FLAGS,
373 compaction->is_manual_compaction() +
374 (compaction->deletion_compaction() << 1));
375
376 ThreadStatusUtil::SetThreadOperationProperty(
377 ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
378 compaction->CalculateTotalInputSize());
379
380 IOSTATS_RESET(bytes_written);
381 IOSTATS_RESET(bytes_read);
382 ThreadStatusUtil::SetThreadOperationProperty(
383 ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
384 ThreadStatusUtil::SetThreadOperationProperty(
385 ThreadStatus::COMPACTION_BYTES_READ, 0);
386
387 // Set the thread operation after operation properties
388 // to ensure GetThreadList() can always show them all together.
389 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
390
391 if (compaction_job_stats_) {
392 compaction_job_stats_->is_manual_compaction =
393 compaction->is_manual_compaction();
394 }
395 }
396
397 void CompactionJob::Prepare() {
398 AutoThreadOperationStageUpdater stage_updater(
399 ThreadStatus::STAGE_COMPACTION_PREPARE);
400
401 // Generate file_levels_ for compaction before making the iterator
402 auto* c = compact_->compaction;
403 assert(c->column_family_data() != nullptr);
404 assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
405 compact_->compaction->level()) > 0);
406
407 write_hint_ =
408 c->column_family_data()->CalculateSSTWriteHint(c->output_level());
409 bottommost_level_ = c->bottommost_level();
410
411 if (c->ShouldFormSubcompactions()) {
412 {
413 StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
414 GenSubcompactionBoundaries();
415 }
416 assert(sizes_.size() == boundaries_.size() + 1);
417
418 for (size_t i = 0; i <= boundaries_.size(); i++) {
419 Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
420 Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
421 compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
422 }
423 RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
424 compact_->sub_compact_states.size());
425 } else {
426 compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
427 }
428 }
429
430 struct RangeWithSize {
431 Range range;
432 uint64_t size;
433
434 RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
435 : range(a, b), size(s) {}
436 };
437
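// Collect candidate key boundaries from the input files, estimate the amount
// of data between consecutive boundaries, and group the resulting ranges into
// at most max_subcompactions() roughly equally sized subcompactions.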
438 void CompactionJob::GenSubcompactionBoundaries() {
439 auto* c = compact_->compaction;
440 auto* cfd = c->column_family_data();
441 const Comparator* cfd_comparator = cfd->user_comparator();
442 std::vector<Slice> bounds;
443 int start_lvl = c->start_level();
444 int out_lvl = c->output_level();
445
446 // Add the starting and/or ending key of certain input files as a potential
447 // boundary
448 for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
449 int lvl = c->level(lvl_idx);
450 if (lvl >= start_lvl && lvl <= out_lvl) {
451 const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
452 size_t num_files = flevel->num_files;
453
454 if (num_files == 0) {
455 continue;
456 }
457
458 if (lvl == 0) {
459 // For level 0 add the starting and ending key of each file since the
460 // files may have greatly differing key ranges (not range-partitioned)
461 for (size_t i = 0; i < num_files; i++) {
462 bounds.emplace_back(flevel->files[i].smallest_key);
463 bounds.emplace_back(flevel->files[i].largest_key);
464 }
465 } else {
466 // For all other levels add the smallest/largest key in the level to
467 // encompass the range covered by that level
468 bounds.emplace_back(flevel->files[0].smallest_key);
469 bounds.emplace_back(flevel->files[num_files - 1].largest_key);
470 if (lvl == out_lvl) {
471 // For the last level include the starting keys of all files since
472 // the last level is the largest and probably has the widest key
473 // range. Since it's range partitioned, the ending key of one file
474 // and the starting key of the next are very close (or identical).
475 for (size_t i = 1; i < num_files; i++) {
476 bounds.emplace_back(flevel->files[i].smallest_key);
477 }
478 }
479 }
480 }
481 }
482
483 std::sort(bounds.begin(), bounds.end(),
484 [cfd_comparator](const Slice& a, const Slice& b) -> bool {
485 return cfd_comparator->Compare(ExtractUserKey(a),
486 ExtractUserKey(b)) < 0;
487 });
488 // Remove duplicated entries from bounds
489 bounds.erase(
490 std::unique(bounds.begin(), bounds.end(),
491 [cfd_comparator](const Slice& a, const Slice& b) -> bool {
492 return cfd_comparator->Compare(ExtractUserKey(a),
493 ExtractUserKey(b)) == 0;
494 }),
495 bounds.end());
496
497 // Combine consecutive pairs of boundaries into ranges with an approximate
498 // size of data covered by keys in that range
499 uint64_t sum = 0;
500 std::vector<RangeWithSize> ranges;
501 // Get input version from CompactionState since it's already referenced
502 // earlier in Compaction::SetInputVersion and will not change when
503 // db_mutex_ is released below
504 auto* v = compact_->compaction->input_version();
505 for (auto it = bounds.begin();;) {
506 const Slice a = *it;
507 ++it;
508
509 if (it == bounds.end()) {
510 break;
511 }
512
513 const Slice b = *it;
514
515 // ApproximateSize could potentially create a table reader iterator to
516 // seek to the index block and may incur I/O cost in the process. Unlock
517 // the db mutex to reduce contention
518 db_mutex_->Unlock();
519 uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
520 b, start_lvl, out_lvl + 1,
521 TableReaderCaller::kCompaction);
522 db_mutex_->Lock();
523 ranges.emplace_back(a, b, size);
524 sum += size;
525 }
526
527 // Group the ranges into subcompactions
528 const double min_file_fill_percent = 4.0 / 5;
529 int base_level = v->storage_info()->base_level();
530 uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
531 sum / min_file_fill_percent /
532 MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
533 c->immutable_cf_options()->compaction_style, base_level,
534 c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
535 uint64_t subcompactions =
536 std::min({static_cast<uint64_t>(ranges.size()),
537 static_cast<uint64_t>(c->max_subcompactions()),
538 max_output_files});
539
540 if (subcompactions > 1) {
541 double mean = sum * 1.0 / subcompactions;
542 // Greedily add ranges to the subcompaction until the sum of the ranges'
543 // sizes becomes >= the expected mean size of a subcompaction
544 sum = 0;
545 for (size_t i = 0; i < ranges.size() - 1; i++) {
546 sum += ranges[i].size;
547 if (subcompactions == 1) {
548 // If there's only one left to schedule then it goes to the end so no
549 // need to put an end boundary
550 continue;
551 }
552 if (sum >= mean) {
553 boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
554 sizes_.emplace_back(sum);
555 subcompactions--;
556 sum = 0;
557 }
558 }
559 sizes_.emplace_back(sum + ranges.back().size);
560 } else {
561 // Only one range so its size is the total sum of sizes computed above
562 sizes_.emplace_back(sum);
563 }
564 }
565
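// Execute the subcompactions (the first one on the calling thread, the rest
// on their own threads), verify the generated output tables, and aggregate
// the per-subcompaction statistics. The final status is stored in compact_.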
566 Status CompactionJob::Run() {
567 AutoThreadOperationStageUpdater stage_updater(
568 ThreadStatus::STAGE_COMPACTION_RUN);
569 TEST_SYNC_POINT("CompactionJob::Run():Start");
570 log_buffer_->FlushBufferToLog();
571 LogCompaction();
572
573 const size_t num_threads = compact_->sub_compact_states.size();
574 assert(num_threads > 0);
575 const uint64_t start_micros = env_->NowMicros();
576
577 // Launch a thread for each of subcompactions 1...num_threads-1
578 std::vector<port::Thread> thread_pool;
579 thread_pool.reserve(num_threads - 1);
580 for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
581 thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
582 &compact_->sub_compact_states[i]);
583 }
584
585 // Always schedule the first subcompaction (whether or not there are also
586 // others) in the current thread to be efficient with resources
587 ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
588
589 // Wait for all other threads (if there are any) to finish execution
590 for (auto& thread : thread_pool) {
591 thread.join();
592 }
593
594 compaction_stats_.micros = env_->NowMicros() - start_micros;
595 compaction_stats_.cpu_micros = 0;
596 for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
597 compaction_stats_.cpu_micros +=
598 compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
599 }
600
601 RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
602 RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
603 compaction_stats_.cpu_micros);
604
605 TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
606
607 // Check if any thread encountered an error during execution
608 Status status;
609 for (const auto& state : compact_->sub_compact_states) {
610 if (!state.status.ok()) {
611 status = state.status;
612 break;
613 }
614 }
615
616 if (status.ok() && output_directory_) {
617 status = output_directory_->Fsync();
618 }
619
620 if (status.ok()) {
621 thread_pool.clear();
622 std::vector<const FileMetaData*> files_meta;
623 for (const auto& state : compact_->sub_compact_states) {
624 for (const auto& output : state.outputs) {
625 files_meta.emplace_back(&output.meta);
626 }
627 }
628 ColumnFamilyData* cfd = compact_->compaction->column_family_data();
629 auto prefix_extractor =
630 compact_->compaction->mutable_cf_options()->prefix_extractor.get();
631 std::atomic<size_t> next_file_meta_idx(0);
632 auto verify_table = [&](Status& output_status) {
633 while (true) {
634 size_t file_idx = next_file_meta_idx.fetch_add(1);
635 if (file_idx >= files_meta.size()) {
636 break;
637 }
638 // Verify that the table is usable
639 // We set for_compaction to false and don't OptimizeForCompactionTableRead
640 // here because this is a special case after we finish building the table.
641 // Regardless of whether use_direct_io_for_flush_and_compaction is true,
642 // we regard this verification as user reads, since the goal is to cache
643 // the table here for further user reads
644 InternalIterator* iter = cfd->table_cache()->NewIterator(
645 ReadOptions(), file_options_, cfd->internal_comparator(),
646 *files_meta[file_idx], /*range_del_agg=*/nullptr, prefix_extractor,
647 /*table_reader_ptr=*/nullptr,
648 cfd->internal_stats()->GetFileReadHist(
649 compact_->compaction->output_level()),
650 TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
651 /*skip_filters=*/false, compact_->compaction->output_level(),
652 /*smallest_compaction_key=*/nullptr,
653 /*largest_compaction_key=*/nullptr);
654 auto s = iter->status();
655
656 if (s.ok() && paranoid_file_checks_) {
657 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
658 s = iter->status();
659 }
660
661 delete iter;
662
663 if (!s.ok()) {
664 output_status = s;
665 break;
666 }
667 }
668 };
669 for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
670 thread_pool.emplace_back(verify_table,
671 std::ref(compact_->sub_compact_states[i].status));
672 }
673 verify_table(compact_->sub_compact_states[0].status);
674 for (auto& thread : thread_pool) {
675 thread.join();
676 }
677 for (const auto& state : compact_->sub_compact_states) {
678 if (!state.status.ok()) {
679 status = state.status;
680 break;
681 }
682 }
683 }
684
685 TablePropertiesCollection tp;
686 for (const auto& state : compact_->sub_compact_states) {
687 for (const auto& output : state.outputs) {
688 auto fn =
689 TableFileName(state.compaction->immutable_cf_options()->cf_paths,
690 output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
691 tp[fn] = output.table_properties;
692 }
693 }
694 compact_->compaction->SetOutputTableProperties(std::move(tp));
695
696 // Finish up all book-keeping to unify the subcompaction results
697 AggregateStatistics();
698 UpdateCompactionStats();
699 RecordCompactionIOStats();
700 LogFlush(db_options_.info_log);
701 TEST_SYNC_POINT("CompactionJob::Run():End");
702
703 compact_->status = status;
704 return status;
705 }
706
707 Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
708 AutoThreadOperationStageUpdater stage_updater(
709 ThreadStatus::STAGE_COMPACTION_INSTALL);
710 db_mutex_->AssertHeld();
711 Status status = compact_->status;
712 ColumnFamilyData* cfd = compact_->compaction->column_family_data();
713 cfd->internal_stats()->AddCompactionStats(
714 compact_->compaction->output_level(), thread_pri_, compaction_stats_);
715
716 if (status.ok()) {
717 status = InstallCompactionResults(mutable_cf_options);
718 }
719 VersionStorageInfo::LevelSummaryStorage tmp;
720 auto vstorage = cfd->current()->storage_info();
721 const auto& stats = compaction_stats_;
722
723 double read_write_amp = 0.0;
724 double write_amp = 0.0;
725 double bytes_read_per_sec = 0;
726 double bytes_written_per_sec = 0;
727
728 if (stats.bytes_read_non_output_levels > 0) {
729 read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
730 stats.bytes_read_non_output_levels) /
731 static_cast<double>(stats.bytes_read_non_output_levels);
732 write_amp = stats.bytes_written /
733 static_cast<double>(stats.bytes_read_non_output_levels);
734 }
735 if (stats.micros > 0) {
736 bytes_read_per_sec =
737 (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
738 static_cast<double>(stats.micros);
739 bytes_written_per_sec =
740 stats.bytes_written / static_cast<double>(stats.micros);
741 }
742
743 ROCKS_LOG_BUFFER(
744 log_buffer_,
745 "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
746 "files in(%d, %d) out(%d) "
747 "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
748 "write-amplify(%.1f) %s, records in: %" PRIu64
749 ", records dropped: %" PRIu64 " output_compression: %s\n",
750 cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec,
751 bytes_written_per_sec, compact_->compaction->output_level(),
752 stats.num_input_files_in_non_output_levels,
753 stats.num_input_files_in_output_level, stats.num_output_files,
754 stats.bytes_read_non_output_levels / 1048576.0,
755 stats.bytes_read_output_level / 1048576.0,
756 stats.bytes_written / 1048576.0, read_write_amp, write_amp,
757 status.ToString().c_str(), stats.num_input_records,
758 stats.num_dropped_records,
759 CompressionTypeToString(compact_->compaction->output_compression())
760 .c_str());
761
762 UpdateCompactionJobStats(stats);
763
764 auto stream = event_logger_->LogToBuffer(log_buffer_);
765 stream << "job" << job_id_ << "event"
766 << "compaction_finished"
767 << "compaction_time_micros" << stats.micros
768 << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
769 << compact_->compaction->output_level() << "num_output_files"
770 << compact_->NumOutputFiles() << "total_output_size"
771 << compact_->total_bytes << "num_input_records"
772 << stats.num_input_records << "num_output_records"
773 << compact_->num_output_records << "num_subcompactions"
774 << compact_->sub_compact_states.size() << "output_compression"
775 << CompressionTypeToString(compact_->compaction->output_compression());
776
777 if (compaction_job_stats_ != nullptr) {
778 stream << "num_single_delete_mismatches"
779 << compaction_job_stats_->num_single_del_mismatch;
780 stream << "num_single_delete_fallthrough"
781 << compaction_job_stats_->num_single_del_fallthru;
782 }
783
784 if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
785 stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
786 stream << "file_range_sync_nanos"
787 << compaction_job_stats_->file_range_sync_nanos;
788 stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
789 stream << "file_prepare_write_nanos"
790 << compaction_job_stats_->file_prepare_write_nanos;
791 }
792
793 stream << "lsm_state";
794 stream.StartArray();
795 for (int level = 0; level < vstorage->num_levels(); ++level) {
796 stream << vstorage->NumLevelFiles(level);
797 }
798 stream.EndArray();
799
800 CleanupCompaction();
801 return status;
802 }
803
804 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
805 assert(sub_compact != nullptr);
806
807 uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
808
809 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
810
811 // Create compaction filter and fail the compaction if
812 // IgnoreSnapshots() = false because it is not supported anymore
813 const CompactionFilter* compaction_filter =
814 cfd->ioptions()->compaction_filter;
815 std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
816 if (compaction_filter == nullptr) {
817 compaction_filter_from_factory =
818 sub_compact->compaction->CreateCompactionFilter();
819 compaction_filter = compaction_filter_from_factory.get();
820 }
821 if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
822 sub_compact->status = Status::NotSupported(
823 "CompactionFilter::IgnoreSnapshots() = false is not supported "
824 "anymore.");
825 return;
826 }
827
828 CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
829 existing_snapshots_);
830
831 // Although the v2 aggregator is what the level iterator(s) know about,
832 // the AddTombstones calls will be propagated down to the v1 aggregator.
833 std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
834 sub_compact->compaction, &range_del_agg, file_options_for_read_));
835
836 AutoThreadOperationStageUpdater stage_updater(
837 ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
838
839 // I/O measurement variables
840 PerfLevel prev_perf_level = PerfLevel::kEnableTime;
841 const uint64_t kRecordStatsEvery = 1000;
842 uint64_t prev_write_nanos = 0;
843 uint64_t prev_fsync_nanos = 0;
844 uint64_t prev_range_sync_nanos = 0;
845 uint64_t prev_prepare_write_nanos = 0;
846 uint64_t prev_cpu_write_nanos = 0;
847 uint64_t prev_cpu_read_nanos = 0;
848 if (measure_io_stats_) {
849 prev_perf_level = GetPerfLevel();
850 SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
851 prev_write_nanos = IOSTATS(write_nanos);
852 prev_fsync_nanos = IOSTATS(fsync_nanos);
853 prev_range_sync_nanos = IOSTATS(range_sync_nanos);
854 prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
855 prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
856 prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
857 }
858
859 MergeHelper merge(
860 env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
861 compaction_filter, db_options_.info_log.get(),
862 false /* internal key corruption is expected */,
863 existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
864 snapshot_checker_, compact_->compaction->level(),
865 db_options_.statistics.get());
866
867 TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
868 TEST_SYNC_POINT_CALLBACK(
869 "CompactionJob::Run():PausingManualCompaction:1",
870 reinterpret_cast<void*>(
871 const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
872
873 Slice* start = sub_compact->start;
874 Slice* end = sub_compact->end;
875 if (start != nullptr) {
876 IterKey start_iter;
877 start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
878 input->Seek(start_iter.GetInternalKey());
879 } else {
880 input->SeekToFirst();
881 }
882
883 Status status;
884 sub_compact->c_iter.reset(new CompactionIterator(
885 input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
886 &existing_snapshots_, earliest_write_conflict_snapshot_,
887 snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
888 &range_del_agg, sub_compact->compaction, compaction_filter,
889 shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_,
890 db_options_.info_log));
891 auto c_iter = sub_compact->c_iter.get();
892 c_iter->SeekToFirst();
893 if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
894 // ShouldStopBefore() maintains state based on keys processed so far. The
895 // compaction loop below always calls it on the "next" key, so it would
896 // never see the first key. Feed it the first key here.
897 sub_compact->ShouldStopBefore(c_iter->key(),
898 sub_compact->current_output_file_size);
899 }
900 const auto& c_iter_stats = c_iter->iter_stats();
901
902 while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
903 // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
904 // returns true.
905 const Slice& key = c_iter->key();
906 const Slice& value = c_iter->value();
907
908 // If an end key (exclusive) is specified, check if the current key is
909 // >= it, and exit if so because the iterator is then out of its range
910 if (end != nullptr &&
911 cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
912 break;
913 }
914 if (c_iter_stats.num_input_records % kRecordStatsEvery ==
915 kRecordStatsEvery - 1) {
916 RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
917 c_iter->ResetRecordCounts();
918 RecordCompactionIOStats();
919 }
920
921 // Open output file if necessary
922 if (sub_compact->builder == nullptr) {
923 status = OpenCompactionOutputFile(sub_compact);
924 if (!status.ok()) {
925 break;
926 }
927 }
928 assert(sub_compact->builder != nullptr);
929 assert(sub_compact->current_output() != nullptr);
930 sub_compact->builder->Add(key, value);
931 sub_compact->current_output_file_size = sub_compact->builder->FileSize();
932 const ParsedInternalKey& ikey = c_iter->ikey();
933 sub_compact->current_output()->meta.UpdateBoundaries(
934 key, value, ikey.sequence, ikey.type);
935 sub_compact->num_output_records++;
936
937 // Close output file if it is big enough. Two possibilities determine it's
938 // time to close it: (1) the current key should be this file's last key, (2)
939 // the next key should not be in this file.
940 //
941 // TODO(aekmekji): determine if file should be closed earlier than this
942 // during subcompactions (i.e. if output size, estimated by input size, is
943 // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
944 // and 0.6MB instead of 1MB and 0.2MB)
945 bool output_file_ended = false;
946 Status input_status;
947 if (sub_compact->compaction->output_level() != 0 &&
948 sub_compact->current_output_file_size >=
949 sub_compact->compaction->max_output_file_size()) {
950 // (1) this key terminates the file. For historical reasons, the iterator
951 // status before advancing will be given to FinishCompactionOutputFile().
952 input_status = input->status();
953 output_file_ended = true;
954 }
955 TEST_SYNC_POINT_CALLBACK(
956 "CompactionJob::Run():PausingManualCompaction:2",
957 reinterpret_cast<void*>(
958 const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
959 c_iter->Next();
960 if (c_iter->status().IsManualCompactionPaused()) {
961 break;
962 }
963 if (!output_file_ended && c_iter->Valid() &&
964 sub_compact->compaction->output_level() != 0 &&
965 sub_compact->ShouldStopBefore(c_iter->key(),
966 sub_compact->current_output_file_size) &&
967 sub_compact->builder != nullptr) {
968 // (2) this key belongs to the next file. For historical reasons, the
969 // iterator status after advancing will be given to
970 // FinishCompactionOutputFile().
971 input_status = input->status();
972 output_file_ended = true;
973 }
974 if (output_file_ended) {
975 const Slice* next_key = nullptr;
976 if (c_iter->Valid()) {
977 next_key = &c_iter->key();
978 }
979 CompactionIterationStats range_del_out_stats;
980 status =
981 FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
982 &range_del_out_stats, next_key);
983 RecordDroppedKeys(range_del_out_stats,
984 &sub_compact->compaction_job_stats);
985 }
986 }
987
988 sub_compact->compaction_job_stats.num_input_deletion_records =
989 c_iter_stats.num_input_deletion_records;
990 sub_compact->compaction_job_stats.num_corrupt_keys =
991 c_iter_stats.num_input_corrupt_records;
992 sub_compact->compaction_job_stats.num_single_del_fallthru =
993 c_iter_stats.num_single_del_fallthru;
994 sub_compact->compaction_job_stats.num_single_del_mismatch =
995 c_iter_stats.num_single_del_mismatch;
996 sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
997 c_iter_stats.total_input_raw_key_bytes;
998 sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
999 c_iter_stats.total_input_raw_value_bytes;
1000
1001 RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
1002 c_iter_stats.total_filter_time);
1003 RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1004 RecordCompactionIOStats();
1005
1006 if (status.ok() && cfd->IsDropped()) {
1007 status =
1008 Status::ColumnFamilyDropped("Column family dropped during compaction");
1009 }
1010 if ((status.ok() || status.IsColumnFamilyDropped()) &&
1011 shutting_down_->load(std::memory_order_relaxed)) {
1012 status = Status::ShutdownInProgress("Database shutdown");
1013 }
1014 if ((status.ok() || status.IsColumnFamilyDropped()) &&
1015 (manual_compaction_paused_ &&
1016 manual_compaction_paused_->load(std::memory_order_relaxed))) {
1017 status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1018 }
1019 if (status.ok()) {
1020 status = input->status();
1021 }
1022 if (status.ok()) {
1023 status = c_iter->status();
1024 }
1025
1026 if (status.ok() && sub_compact->builder == nullptr &&
1027 sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
1028 // handle subcompaction containing only range deletions
1029 status = OpenCompactionOutputFile(sub_compact);
1030 }
1031
1032 // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1033 // close the output file.
1034 if (sub_compact->builder != nullptr) {
1035 CompactionIterationStats range_del_out_stats;
1036 Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
1037 &range_del_out_stats);
1038 if (status.ok()) {
1039 status = s;
1040 }
1041 RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
1042 }
1043
1044 sub_compact->compaction_job_stats.cpu_micros =
1045 env_->NowCPUNanos() / 1000 - prev_cpu_micros;
1046
1047 if (measure_io_stats_) {
1048 sub_compact->compaction_job_stats.file_write_nanos +=
1049 IOSTATS(write_nanos) - prev_write_nanos;
1050 sub_compact->compaction_job_stats.file_fsync_nanos +=
1051 IOSTATS(fsync_nanos) - prev_fsync_nanos;
1052 sub_compact->compaction_job_stats.file_range_sync_nanos +=
1053 IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
1054 sub_compact->compaction_job_stats.file_prepare_write_nanos +=
1055 IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
1056 sub_compact->compaction_job_stats.cpu_micros -=
1057 (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
1058 IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
1059 1000;
1060 if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
1061 SetPerfLevel(prev_perf_level);
1062 }
1063 }
1064
1065 sub_compact->c_iter.reset();
1066 input.reset();
1067 sub_compact->status = status;
1068 }
1069
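// Record tick counters for the keys dropped by the compaction iterator and
// mirror the relevant counts into the per-subcompaction job stats.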
1070 void CompactionJob::RecordDroppedKeys(
1071 const CompactionIterationStats& c_iter_stats,
1072 CompactionJobStats* compaction_job_stats) {
1073 if (c_iter_stats.num_record_drop_user > 0) {
1074 RecordTick(stats_, COMPACTION_KEY_DROP_USER,
1075 c_iter_stats.num_record_drop_user);
1076 }
1077 if (c_iter_stats.num_record_drop_hidden > 0) {
1078 RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
1079 c_iter_stats.num_record_drop_hidden);
1080 if (compaction_job_stats) {
1081 compaction_job_stats->num_records_replaced +=
1082 c_iter_stats.num_record_drop_hidden;
1083 }
1084 }
1085 if (c_iter_stats.num_record_drop_obsolete > 0) {
1086 RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
1087 c_iter_stats.num_record_drop_obsolete);
1088 if (compaction_job_stats) {
1089 compaction_job_stats->num_expired_deletion_records +=
1090 c_iter_stats.num_record_drop_obsolete;
1091 }
1092 }
1093 if (c_iter_stats.num_record_drop_range_del > 0) {
1094 RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
1095 c_iter_stats.num_record_drop_range_del);
1096 }
1097 if (c_iter_stats.num_range_del_drop_obsolete > 0) {
1098 RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
1099 c_iter_stats.num_range_del_drop_obsolete);
1100 }
1101 if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
1102 RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
1103 c_iter_stats.num_optimized_del_drop_obsolete);
1104 }
1105 }
1106
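// Finalize the current output table: add the range tombstones that fall
// within its bounds, finish (or abandon) the table builder, sync and close
// the file, and report the newly created table.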
1107 Status CompactionJob::FinishCompactionOutputFile(
1108 const Status& input_status, SubcompactionState* sub_compact,
1109 CompactionRangeDelAggregator* range_del_agg,
1110 CompactionIterationStats* range_del_out_stats,
1111 const Slice* next_table_min_key /* = nullptr */) {
1112 AutoThreadOperationStageUpdater stage_updater(
1113 ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
1114 assert(sub_compact != nullptr);
1115 assert(sub_compact->outfile);
1116 assert(sub_compact->builder != nullptr);
1117 assert(sub_compact->current_output() != nullptr);
1118
1119 uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
1120 assert(output_number != 0);
1121
1122 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1123 const Comparator* ucmp = cfd->user_comparator();
1124
1125 // Check for iterator errors
1126 Status s = input_status;
1127 auto meta = &sub_compact->current_output()->meta;
1128 assert(meta != nullptr);
1129 if (s.ok()) {
1130 Slice lower_bound_guard, upper_bound_guard;
1131 std::string smallest_user_key;
1132 const Slice *lower_bound, *upper_bound;
1133 bool lower_bound_from_sub_compact = false;
1134 if (sub_compact->outputs.size() == 1) {
1135 // For the first output table, include range tombstones before the min key
1136 // but after the subcompaction boundary.
1137 lower_bound = sub_compact->start;
1138 lower_bound_from_sub_compact = true;
1139 } else if (meta->smallest.size() > 0) {
1140 // For subsequent output tables, only include range tombstones from min
1141 // key onwards since the previous file was extended to contain range
1142 // tombstones falling before min key.
1143 smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
1144 lower_bound_guard = Slice(smallest_user_key);
1145 lower_bound = &lower_bound_guard;
1146 } else {
1147 lower_bound = nullptr;
1148 }
1149 if (next_table_min_key != nullptr) {
1150 // This may be the last file in the subcompaction in some cases, so we
1151 // need to compare the end key of subcompaction with the next file start
1152 // key. When the end key is chosen by the subcompaction, we know that
1153 // it must be the biggest key in output file. Therefore, it is safe to
1154 // use the smaller key as the upper bound of the output file, to ensure
1155 // that there is no overlapping between different output files.
1156 upper_bound_guard = ExtractUserKey(*next_table_min_key);
1157 if (sub_compact->end != nullptr &&
1158 ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
1159 upper_bound = sub_compact->end;
1160 } else {
1161 upper_bound = &upper_bound_guard;
1162 }
1163 } else {
1164 // This is the last file in the subcompaction, so extend until the
1165 // subcompaction ends.
1166 upper_bound = sub_compact->end;
1167 }
1168 auto earliest_snapshot = kMaxSequenceNumber;
1169 if (existing_snapshots_.size() > 0) {
1170 earliest_snapshot = existing_snapshots_[0];
1171 }
1172 bool has_overlapping_endpoints;
1173 if (upper_bound != nullptr && meta->largest.size() > 0) {
1174 has_overlapping_endpoints =
1175 ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
1176 } else {
1177 has_overlapping_endpoints = false;
1178 }
1179
1180 // The end key of the subcompaction must be bigger than or equal to the
1181 // upper bound. If the end of the subcompaction is null or the upper bound
1182 // is null, it means that this file is the last file in the compaction. So
1183 // there will be no overlap between this file and others.
1184 assert(sub_compact->end == nullptr ||
1185 upper_bound == nullptr ||
1186 ucmp->Compare(*upper_bound, *sub_compact->end) <= 0);
1187 auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
1188 has_overlapping_endpoints);
1189 // Position the range tombstone output iterator. There may be tombstone
1190 // fragments that are entirely out of range, so make sure that we do not
1191 // include those.
1192 if (lower_bound != nullptr) {
1193 it->Seek(*lower_bound);
1194 } else {
1195 it->SeekToFirst();
1196 }
1197 for (; it->Valid(); it->Next()) {
1198 auto tombstone = it->Tombstone();
1199 if (upper_bound != nullptr) {
1200 int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
1201 if ((has_overlapping_endpoints && cmp < 0) ||
1202 (!has_overlapping_endpoints && cmp <= 0)) {
1203 // Tombstones starting after upper_bound only need to be included in
1204 // the next table. If the current SST ends before upper_bound, i.e.,
1205 // `has_overlapping_endpoints == false`, we can also skip over range
1206 // tombstones that start exactly at upper_bound. Such range tombstones
1207 // will be included in the next file and are not relevant to the point
1208 // keys or endpoints of the current file.
1209 break;
1210 }
1211 }
1212
1213 if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
1214 // TODO(andrewkr): tombstones that span multiple output files are
1215 // counted for each compaction output file, so lots of double counting.
1216 range_del_out_stats->num_range_del_drop_obsolete++;
1217 range_del_out_stats->num_record_drop_obsolete++;
1218 continue;
1219 }
1220
1221 auto kv = tombstone.Serialize();
1222 assert(lower_bound == nullptr ||
1223 ucmp->Compare(*lower_bound, kv.second) < 0);
1224 sub_compact->builder->Add(kv.first.Encode(), kv.second);
1225 InternalKey smallest_candidate = std::move(kv.first);
1226 if (lower_bound != nullptr &&
1227 ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
1228 // Pretend the smallest key has the same user key as lower_bound
1229 // (the max key in the previous table or subcompaction) in order for
1230 // files to appear key-space partitioned.
1231 //
1232 // When lower_bound is chosen by a subcompaction, we know that
1233 // subcompactions over smaller keys cannot contain any keys at
1234 // lower_bound. We also know that smaller subcompactions exist, because
1235 // otherwise the subcompaction would be unbounded on the left. As a
1236 // result, we know that no other files on the output level will contain
1237 // actual keys at lower_bound (an output file may have a largest key of
1238 // lower_bound@kMaxSequenceNumber, but this only indicates a large range
1239 // tombstone was truncated). Therefore, it is safe to use the
1240 // tombstone's sequence number, to ensure that keys at lower_bound at
1241 // lower levels are covered by truncated tombstones.
1242 //
1243 // If lower_bound was chosen by the smallest data key in the file,
1244 // choose lowest seqnum so this file's smallest internal key comes after
1245 // the previous file's largest. The fake seqnum is OK because the read
1246 // path's file-picking code only considers user key.
1247 smallest_candidate = InternalKey(
1248 *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
1249 kTypeRangeDeletion);
1250 }
1251 InternalKey largest_candidate = tombstone.SerializeEndKey();
1252 if (upper_bound != nullptr &&
1253 ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
1254 // Pretend the largest key has the same user key as upper_bound (the
1255 // min key in the following table or subcompaction) in order for files
1256 // to appear key-space partitioned.
1257 //
1258 // Choose highest seqnum so this file's largest internal key comes
1259 // before the next file's/subcompaction's smallest. The fake seqnum is
1260 // OK because the read path's file-picking code only considers the user
1261 // key portion.
1262 //
1263 // Note Seek() also creates InternalKey with (user_key,
1264 // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
1265 // kTypeRangeDeletion (0xF), so the range tombstone comes before the
1266 // Seek() key in InternalKey's ordering. So Seek() will look in the
1267 // next file for the user key.
1268 largest_candidate =
1269 InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
1270 }
1271 #ifndef NDEBUG
1272 SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
1273 if (meta->smallest.size() > 0) {
1274 smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
1275 }
1276 #endif
1277 meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
1278 tombstone.seq_,
1279 cfd->internal_comparator());
1280
1281 // The smallest key in a file is used for range tombstone truncation, so
1282 // it cannot have a seqnum of 0 (unless the smallest data key in a file
1283 // has a seqnum of 0). Otherwise, the truncated tombstone may expose
1284 // deleted keys at lower levels.
1285 assert(smallest_ikey_seqnum == 0 ||
1286 ExtractInternalKeyFooter(meta->smallest.Encode()) !=
1287 PackSequenceAndType(0, kTypeRangeDeletion));
1288 }
1289 meta->marked_for_compaction = sub_compact->builder->NeedCompact();
1290 }
1291 const uint64_t current_entries = sub_compact->builder->NumEntries();
1292 if (s.ok()) {
1293 s = sub_compact->builder->Finish();
1294 } else {
1295 sub_compact->builder->Abandon();
1296 }
1297 const uint64_t current_bytes = sub_compact->builder->FileSize();
1298 if (s.ok()) {
1299 // Add the checksum information to file metadata.
1300 meta->file_checksum = sub_compact->builder->GetFileChecksum();
1301 meta->file_checksum_func_name =
1302 sub_compact->builder->GetFileChecksumFuncName();
1303
1304 meta->fd.file_size = current_bytes;
1305 }
1306 sub_compact->current_output()->finished = true;
1307 sub_compact->total_bytes += current_bytes;
1308
1309 // Finish and check for file errors
1310 if (s.ok()) {
1311 StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
1312 s = sub_compact->outfile->Sync(db_options_.use_fsync);
1313 }
1314 if (s.ok()) {
1315 s = sub_compact->outfile->Close();
1316 }
1317 sub_compact->outfile.reset();
1318
1319 TableProperties tp;
1320 if (s.ok()) {
1321 tp = sub_compact->builder->GetTableProperties();
1322 }
1323
1324 if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
1325 // If there is nothing to output, there is no need to generate an sst file.
1326 // This happens when the output level is the bottom level and, at the same
1327 // time, the sub_compact produced no output.
1328 std::string fname =
1329 TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1330 meta->fd.GetNumber(), meta->fd.GetPathId());
1331 env_->DeleteFile(fname);
1332
1333 // Also need to remove the file from outputs, or it will be added to the
1334 // VersionEdit.
1335 assert(!sub_compact->outputs.empty());
1336 sub_compact->outputs.pop_back();
1337 meta = nullptr;
1338 }
1339
1340 if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
1341 // Output to event logger and fire events.
1342 sub_compact->current_output()->table_properties =
1343 std::make_shared<TableProperties>(tp);
1344 ROCKS_LOG_INFO(db_options_.info_log,
1345 "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
1346 " keys, %" PRIu64 " bytes%s",
1347 cfd->GetName().c_str(), job_id_, output_number,
1348 current_entries, current_bytes,
1349 meta->marked_for_compaction ? " (need compaction)" : "");
1350 }
1351 std::string fname;
1352 FileDescriptor output_fd;
1353 uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
1354 if (meta != nullptr) {
1355 fname =
1356 TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1357 meta->fd.GetNumber(), meta->fd.GetPathId());
1358 output_fd = meta->fd;
1359 oldest_blob_file_number = meta->oldest_blob_file_number;
1360 } else {
1361 fname = "(nil)";
1362 }
1363 EventHelpers::LogAndNotifyTableFileCreationFinished(
1364 event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
1365 job_id_, output_fd, oldest_blob_file_number, tp,
1366 TableFileCreationReason::kCompaction, s);
1367
1368 #ifndef ROCKSDB_LITE
1369 // Report new file to SstFileManagerImpl
1370 auto sfm =
1371 static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
1372 if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
1373 sfm->OnAddFile(fname);
1374 if (sfm->IsMaxAllowedSpaceReached()) {
1375 // TODO(ajkr): should we return OK() if max space was reached by the final
1376 // compaction output file (similarly to how flush works when full)?
1377 s = Status::SpaceLimit("Max allowed space was reached");
1378 TEST_SYNC_POINT(
1379 "CompactionJob::FinishCompactionOutputFile:"
1380 "MaxAllowedSpaceReached");
1381 InstrumentedMutexLock l(db_mutex_);
1382 db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
1383 }
1384 }
1385 #endif
1386
1387 sub_compact->builder.reset();
1388 sub_compact->current_output_file_size = 0;
1389 return s;
1390 }
1391
1392 Status CompactionJob::InstallCompactionResults(
1393 const MutableCFOptions& mutable_cf_options) {
1394 db_mutex_->AssertHeld();
1395
1396 auto* compaction = compact_->compaction;
1397 // paranoia: verify that the files that we started with
1398 // still exist in the current version and in the same original level.
1399 // This ensures that a concurrent compaction did not erroneously
1400 // pick the same files to compact.
1401 if (!versions_->VerifyCompactionFileConsistency(compaction)) {
1402 Compaction::InputLevelSummaryBuffer inputs_summary;
1403
1404 ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
1405 compaction->column_family_data()->GetName().c_str(),
1406 job_id_, compaction->InputLevelSummary(&inputs_summary));
1407 return Status::Corruption("Compaction input files inconsistent");
1408 }
1409
1410 {
1411 Compaction::InputLevelSummaryBuffer inputs_summary;
1412 ROCKS_LOG_INFO(
1413 db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
1414 compaction->column_family_data()->GetName().c_str(), job_id_,
1415 compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
1416 }
1417
1418 // Add compaction inputs
1419 compaction->AddInputDeletions(compact_->compaction->edit());
1420
1421 for (const auto& sub_compact : compact_->sub_compact_states) {
1422 for (const auto& out : sub_compact.outputs) {
1423 compaction->edit()->AddFile(compaction->output_level(), out.meta);
1424 }
1425 }
1426 return versions_->LogAndApply(compaction->column_family_data(),
1427 mutable_cf_options, compaction->edit(),
1428 db_mutex_, db_directory_);
1429 }
1430
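// Flush the accumulated IOSTATS byte counters into the statistics object and
// the thread-status properties, then reset them for the next interval.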
1431 void CompactionJob::RecordCompactionIOStats() {
1432 RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
1433 ThreadStatusUtil::IncreaseThreadOperationProperty(
1434 ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
1435 IOSTATS_RESET(bytes_read);
1436 RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
1437 ThreadStatusUtil::IncreaseThreadOperationProperty(
1438 ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
1439 IOSTATS_RESET(bytes_written);
1440 }
1441
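// Create the next output file for this subcompaction: allocate a file
// number, open the writable file, register a new Output entry, and set up
// the TableBuilder that will receive the compacted keys.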
1442 Status CompactionJob::OpenCompactionOutputFile(
1443 SubcompactionState* sub_compact) {
1444 assert(sub_compact != nullptr);
1445 assert(sub_compact->builder == nullptr);
1446 // no need to lock because VersionSet::next_file_number_ is atomic
1447 uint64_t file_number = versions_->NewFileNumber();
1448 std::string fname =
1449 TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1450 file_number, sub_compact->compaction->output_path_id());
1451 // Fire events.
1452 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1453 #ifndef ROCKSDB_LITE
1454 EventHelpers::NotifyTableFileCreationStarted(
1455 cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
1456 TableFileCreationReason::kCompaction);
1457 #endif // !ROCKSDB_LITE
1458 // Make the output file
1459 std::unique_ptr<FSWritableFile> writable_file;
1460 #ifndef NDEBUG
1461 bool syncpoint_arg = file_options_.use_direct_writes;
1462 TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
1463 &syncpoint_arg);
1464 #endif
1465 Status s = NewWritableFile(fs_, fname, &writable_file, file_options_);
1466 if (!s.ok()) {
1467 ROCKS_LOG_ERROR(
1468 db_options_.info_log,
1469 "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
1470 " fails at NewWritableFile with status %s",
1471 sub_compact->compaction->column_family_data()->GetName().c_str(),
1472 job_id_, file_number, s.ToString().c_str());
1473 LogFlush(db_options_.info_log);
1474 EventHelpers::LogAndNotifyTableFileCreationFinished(
1475 event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
1476 fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
1477 TableProperties(), TableFileCreationReason::kCompaction, s);
1478 return s;
1479 }
1480
1481 // Try to figure out the output file's oldest ancestor time.
1482 int64_t temp_current_time = 0;
1483 auto get_time_status = env_->GetCurrentTime(&temp_current_time);
1484 // It is safe to proceed even if GetCurrentTime fails, so log a warning and proceed.
1485 if (!get_time_status.ok()) {
1486 ROCKS_LOG_WARN(db_options_.info_log,
1487 "Failed to get current time. Status: %s",
1488 get_time_status.ToString().c_str());
1489 }
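// Use the minimum "oldest ancester time" recorded across the input files; if
// none of them has one recorded (kMaxUint64), fall back to the current time.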
1490 uint64_t current_time = static_cast<uint64_t>(temp_current_time);
1491 uint64_t oldest_ancester_time =
1492 sub_compact->compaction->MinInputFileOldestAncesterTime();
1493 if (oldest_ancester_time == port::kMaxUint64) {
1494 oldest_ancester_time = current_time;
1495 }
1496
1497 // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
1498 {
1499 SubcompactionState::Output out;
1500 out.meta.fd = FileDescriptor(file_number,
1501 sub_compact->compaction->output_path_id(), 0);
1502 out.meta.oldest_ancester_time = oldest_ancester_time;
1503 out.meta.file_creation_time = current_time;
1504 out.finished = false;
1505 sub_compact->outputs.push_back(out);
1506 }
1507
1508 writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
1509 writable_file->SetWriteLifeTimeHint(write_hint_);
1510 writable_file->SetPreallocationBlockSize(static_cast<size_t>(
1511 sub_compact->compaction->OutputFilePreallocationSize()));
1512 const auto& listeners =
1513 sub_compact->compaction->immutable_cf_options()->listeners;
1514 sub_compact->outfile.reset(
1515 new WritableFileWriter(std::move(writable_file), fname, file_options_,
1516 env_, db_options_.statistics.get(), listeners,
1517 db_options_.sst_file_checksum_func.get()));
1518
1519 // If the column family is configured to optimize filters only for hits,
1520 // we can skip creating filters when this is the bottommost level, where
1521 // the data is going to be found.
1522 bool skip_filters =
1523 cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
1524
1525 sub_compact->builder.reset(NewTableBuilder(
1526 *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
1527 cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
1528 cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
1529 sub_compact->compaction->output_compression(),
1530 0 /*sample_for_compression */,
1531 sub_compact->compaction->output_compression_opts(),
1532 sub_compact->compaction->output_level(), skip_filters,
1533 oldest_ancester_time, 0 /* oldest_key_time */,
1534 sub_compact->compaction->max_output_file_size(), current_time));
1535 LogFlush(db_options_.info_log);
1536 return s;
1537 }
1538
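// Release per-subcompaction resources: abandon any table builder that is
// still open (e.g. after a shutdown mid-compaction) and, for subcompactions
// that failed, evict their uncommitted output files from the table cache
// before deleting the compaction state.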
1539 void CompactionJob::CleanupCompaction() {
1540 for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
1541 const auto& sub_status = sub_compact.status;
1542
1543 if (sub_compact.builder != nullptr) {
1544 // May happen if we get a shutdown call in the middle of compaction
1545 sub_compact.builder->Abandon();
1546 sub_compact.builder.reset();
1547 } else {
1548 assert(!sub_status.ok() || sub_compact.outfile == nullptr);
1549 }
1550 for (const auto& out : sub_compact.outputs) {
1551 // If this file was inserted into the table cache, then remove it
1552 // here because this compaction was not committed.
1553 if (!sub_status.ok()) {
1554 TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
1555 }
1556 }
1557 }
1558 delete compact_;
1559 compact_ = nullptr;
1560 }
1561
1562 #ifndef ROCKSDB_LITE
1563 namespace {
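// Copy at most prefix_length bytes of src into *dst; used below to report the
// truncated smallest/largest output keys in CompactionJobStats.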
1564 void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
1565 assert(prefix_length > 0);
1566 size_t length = src.size() > prefix_length ? prefix_length : src.size();
1567 dst->assign(src.data(), length);
1568 }
1569 } // namespace
1570
1571 #endif // !ROCKSDB_LITE
1572
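// Aggregate per-level input file counts and bytes read, plus output file
// counts, bytes written, and record counts, into compaction_stats_
// (including the number of dropped records).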
1573 void CompactionJob::UpdateCompactionStats() {
1574 Compaction* compaction = compact_->compaction;
1575 compaction_stats_.num_input_files_in_non_output_levels = 0;
1576 compaction_stats_.num_input_files_in_output_level = 0;
1577 for (int input_level = 0;
1578 input_level < static_cast<int>(compaction->num_input_levels());
1579 ++input_level) {
1580 if (compaction->level(input_level) != compaction->output_level()) {
1581 UpdateCompactionInputStatsHelper(
1582 &compaction_stats_.num_input_files_in_non_output_levels,
1583 &compaction_stats_.bytes_read_non_output_levels, input_level);
1584 } else {
1585 UpdateCompactionInputStatsHelper(
1586 &compaction_stats_.num_input_files_in_output_level,
1587 &compaction_stats_.bytes_read_output_level, input_level);
1588 }
1589 }
1590
1591 uint64_t num_output_records = 0;
1592
1593 for (const auto& sub_compact : compact_->sub_compact_states) {
1594 size_t num_output_files = sub_compact.outputs.size();
1595 if (sub_compact.builder != nullptr) {
1596 // An error occurred, so ignore the last output.
1597 assert(num_output_files > 0);
1598 --num_output_files;
1599 }
1600 compaction_stats_.num_output_files += static_cast<int>(num_output_files);
1601
1602 num_output_records += sub_compact.num_output_records;
1603
1604 for (const auto& out : sub_compact.outputs) {
1605 compaction_stats_.bytes_written += out.meta.fd.file_size;
1606 }
1607 }
1608
1609 if (compaction_stats_.num_input_records > num_output_records) {
1610 compaction_stats_.num_dropped_records =
1611 compaction_stats_.num_input_records - num_output_records;
1612 }
1613 }
1614
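// Accumulate the number of files, bytes read, and input records contributed
// by a single input level.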
1615 void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
1616 uint64_t* bytes_read,
1617 int input_level) {
1618 const Compaction* compaction = compact_->compaction;
1619 auto num_input_files = compaction->num_input_files(input_level);
1620 *num_files += static_cast<int>(num_input_files);
1621
1622 for (size_t i = 0; i < num_input_files; ++i) {
1623 const auto* file_meta = compaction->input(input_level, i);
1624 *bytes_read += file_meta->fd.GetFileSize();
1625 compaction_stats_.num_input_records +=
1626 static_cast<uint64_t>(file_meta->num_entries);
1627 }
1628 }
1629
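// Copy the internal per-compaction stats into the user-visible
// CompactionJobStats structure (a no-op in ROCKSDB_LITE builds).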
1630 void CompactionJob::UpdateCompactionJobStats(
1631 const InternalStats::CompactionStats& stats) const {
1632 #ifndef ROCKSDB_LITE
1633 if (compaction_job_stats_) {
1634 compaction_job_stats_->elapsed_micros = stats.micros;
1635
1636 // input information
1637 compaction_job_stats_->total_input_bytes =
1638 stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
1639 compaction_job_stats_->num_input_records = stats.num_input_records;
1640 compaction_job_stats_->num_input_files =
1641 stats.num_input_files_in_non_output_levels +
1642 stats.num_input_files_in_output_level;
1643 compaction_job_stats_->num_input_files_at_output_level =
1644 stats.num_input_files_in_output_level;
1645
1646 // output information
1647 compaction_job_stats_->total_output_bytes = stats.bytes_written;
1648 compaction_job_stats_->num_output_records = compact_->num_output_records;
1649 compaction_job_stats_->num_output_files = stats.num_output_files;
1650
1651 if (compact_->NumOutputFiles() > 0U) {
1652 CopyPrefix(compact_->SmallestUserKey(),
1653 CompactionJobStats::kMaxPrefixLength,
1654 &compaction_job_stats_->smallest_output_key_prefix);
1655 CopyPrefix(compact_->LargestUserKey(),
1656 CompactionJobStats::kMaxPrefixLength,
1657 &compaction_job_stats_->largest_output_key_prefix);
1658 }
1659 }
1660 #else
1661 (void)stats;
1662 #endif // !ROCKSDB_LITE
1663 }
1664
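// Emit the "compaction started" info-log lines and the structured event-log
// entry (job id, compaction reason, input files per level, score, and total
// input data size).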
1665 void CompactionJob::LogCompaction() {
1666 Compaction* compaction = compact_->compaction;
1667 ColumnFamilyData* cfd = compaction->column_family_data();
1668
1669 // Let's check if anything will get logged. Don't prepare all the info if
1670 // we're not logging.
1671 if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
1672 Compaction::InputLevelSummaryBuffer inputs_summary;
1673 ROCKS_LOG_INFO(
1674 db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
1675 cfd->GetName().c_str(), job_id_,
1676 compaction->InputLevelSummary(&inputs_summary), compaction->score());
1677 char scratch[2345];
1678 compaction->Summary(scratch, sizeof(scratch));
1679 ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
1680 cfd->GetName().c_str(), scratch);
1681 // build event logger report
1682 auto stream = event_logger_->Log();
1683 stream << "job" << job_id_ << "event"
1684 << "compaction_started"
1685 << "compaction_reason"
1686 << GetCompactionReasonString(compaction->compaction_reason());
1687 for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
1688 stream << ("files_L" + ToString(compaction->level(i)));
1689 stream.StartArray();
1690 for (auto f : *compaction->inputs(i)) {
1691 stream << f->fd.GetNumber();
1692 }
1693 stream.EndArray();
1694 }
1695 stream << "score" << compaction->score() << "input_data_size"
1696 << compaction->CalculateTotalInputSize();
1697 }
1698 }
1699
1700 } // namespace ROCKSDB_NAMESPACE