// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/builder.h"
#include "db/compaction/compaction_job.h"
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/range_del_aggregator.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "file/sst_file_manager_impl.h"
#include "file/writable_file_writer.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stop_watch.h"
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
const char* GetCompactionReasonString(CompactionReason compaction_reason) {
  switch (compaction_reason) {
    case CompactionReason::kUnknown:
      return "Unknown";
    case CompactionReason::kLevelL0FilesNum:
      return "LevelL0FilesNum";
    case CompactionReason::kLevelMaxLevelSize:
      return "LevelMaxLevelSize";
    case CompactionReason::kUniversalSizeAmplification:
      return "UniversalSizeAmplification";
    case CompactionReason::kUniversalSizeRatio:
      return "UniversalSizeRatio";
    case CompactionReason::kUniversalSortedRunNum:
      return "UniversalSortedRunNum";
    case CompactionReason::kFIFOMaxSize:
      return "FIFOMaxSize";
    case CompactionReason::kFIFOReduceNumFiles:
      return "FIFOReduceNumFiles";
    case CompactionReason::kFIFOTtl:
      return "FIFOTtl";
    case CompactionReason::kManualCompaction:
      return "ManualCompaction";
    case CompactionReason::kFilesMarkedForCompaction:
      return "FilesMarkedForCompaction";
    case CompactionReason::kBottommostFiles:
      return "BottommostFiles";
    case CompactionReason::kTtl:
      return "Ttl";
    case CompactionReason::kFlush:
      return "Flush";
    case CompactionReason::kExternalSstIngestion:
      return "ExternalSstIngestion";
    case CompactionReason::kPeriodicCompaction:
      return "PeriodicCompaction";
    case CompactionReason::kNumOfReasons:
      // fall through
    default:
      assert(false);
  }
  return "Invalid";
}
// Maintains state for each sub-compaction
struct CompactionJob::SubcompactionState {
  const Compaction* compaction;
  std::unique_ptr<CompactionIterator> c_iter;

  // The boundaries of the key-range this compaction is interested in. No two
  // subcompactions may have overlapping key-ranges.
  // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
  Slice *start, *end;

  // The return status of this subcompaction
  Status status;

  // Files produced by this subcompaction
  struct Output {
    FileMetaData meta;
    bool finished;
    std::shared_ptr<const TableProperties> table_properties;
  };

  // State kept for output being generated
  std::vector<Output> outputs;
  std::unique_ptr<WritableFileWriter> outfile;
  std::unique_ptr<TableBuilder> builder;
  Output* current_output() {
    if (outputs.empty()) {
      // This subcompaction's output could be empty if compaction was aborted
      // before this subcompaction had a chance to generate any output files.
      // When subcompactions are executed sequentially this is more likely and
      // the later subcompactions will be particularly likely to be empty.
      // Once they are run in parallel however it should be much rarer.
      return nullptr;
    } else {
      return &outputs.back();
    }
  }

  uint64_t current_output_file_size;

  // State during the subcompaction
  uint64_t total_bytes;
  uint64_t num_output_records;
  CompactionJobStats compaction_job_stats;
  uint64_t approx_size;
  // An index that is used to speed up ShouldStopBefore().
  size_t grandparent_index = 0;
  // The number of bytes overlapping between the current output and
  // grandparent files used in ShouldStopBefore().
  uint64_t overlapped_bytes = 0;
  // A flag that determines whether the key has been seen in ShouldStopBefore()
  bool seen_key = false;
  SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
                     uint64_t size = 0)
      : compaction(c),
        start(_start),
        end(_end),
        outfile(nullptr),
        builder(nullptr),
        current_output_file_size(0),
        total_bytes(0),
        num_output_records(0),
        approx_size(size),
        grandparent_index(0),
        overlapped_bytes(0),
        seen_key(false) {
    assert(compaction != nullptr);
  }

  SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }

  SubcompactionState& operator=(SubcompactionState&& o) {
    compaction = std::move(o.compaction);
    start = std::move(o.start);
    end = std::move(o.end);
    status = std::move(o.status);
    outputs = std::move(o.outputs);
    outfile = std::move(o.outfile);
    builder = std::move(o.builder);
    current_output_file_size = std::move(o.current_output_file_size);
    total_bytes = std::move(o.total_bytes);
    num_output_records = std::move(o.num_output_records);
    compaction_job_stats = std::move(o.compaction_job_stats);
    approx_size = std::move(o.approx_size);
    grandparent_index = std::move(o.grandparent_index);
    overlapped_bytes = std::move(o.overlapped_bytes);
    seen_key = std::move(o.seen_key);
    return *this;
  }

  // Because member std::unique_ptrs do not have these.
  SubcompactionState(const SubcompactionState&) = delete;

  SubcompactionState& operator=(const SubcompactionState&) = delete;
  // Returns true iff we should stop building the current output
  // before processing "internal_key".
  bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
    const InternalKeyComparator* icmp =
        &compaction->column_family_data()->internal_comparator();
    const std::vector<FileMetaData*>& grandparents = compaction->grandparents();

    // Scan to find the earliest grandparent file that contains key.
    while (grandparent_index < grandparents.size() &&
           icmp->Compare(internal_key,
                         grandparents[grandparent_index]->largest.Encode()) >
               0) {
      if (seen_key) {
        overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
      }
      assert(grandparent_index + 1 >= grandparents.size() ||
             icmp->Compare(
                 grandparents[grandparent_index]->largest.Encode(),
                 grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
      grandparent_index++;
    }
    seen_key = true;

    if (overlapped_bytes + curr_file_size >
        compaction->max_compaction_bytes()) {
      // Too much overlap for current output; start new output
      overlapped_bytes = 0;
      return true;
    }

    return false;
  }
};
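
// Example of the ShouldStopBefore() cutoff above: with max_compaction_bytes
// = 100 MB, once the keys emitted into the current output file overlap
// enough grandparent (output_level + 1) files that overlapped_bytes plus the
// current file size exceeds 100 MB, the file is cut. This bounds the amount
// of grandparent data any single output file overlaps, and therefore the
// size of the future compaction that moves that file down a level.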
// Maintains state for the entire compaction
struct CompactionJob::CompactionState {
  Compaction* const compaction;

  // REQUIRED: subcompaction states are stored in order of increasing
  // key-range
  std::vector<CompactionJob::SubcompactionState> sub_compact_states;
  Status status;

  uint64_t total_bytes;
  uint64_t num_output_records;

  explicit CompactionState(Compaction* c)
      : compaction(c),
        total_bytes(0),
        num_output_records(0) {}

  size_t NumOutputFiles() {
    size_t total = 0;
    for (auto& s : sub_compact_states) {
      total += s.outputs.size();
    }
    return total;
  }
  Slice SmallestUserKey() {
    for (const auto& sub_compact_state : sub_compact_states) {
      if (!sub_compact_state.outputs.empty() &&
          sub_compact_state.outputs[0].finished) {
        return sub_compact_state.outputs[0].meta.smallest.user_key();
      }
    }
    // If there is no finished output, return an empty slice.
    return Slice(nullptr, 0);
  }

  Slice LargestUserKey() {
    for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
         ++it) {
      if (!it->outputs.empty() && it->current_output()->finished) {
        assert(it->current_output() != nullptr);
        return it->current_output()->meta.largest.user_key();
      }
    }
    // If there is no finished output, return an empty slice.
    return Slice(nullptr, 0);
  }
};
void CompactionJob::AggregateStatistics() {
  for (SubcompactionState& sc : compact_->sub_compact_states) {
    compact_->total_bytes += sc.total_bytes;
    compact_->num_output_records += sc.num_output_records;
  }
  if (compaction_job_stats_) {
    for (SubcompactionState& sc : compact_->sub_compact_states) {
      compaction_job_stats_->Add(sc.compaction_job_stats);
    }
  }
}
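
// Note: AggregateStatistics() is called from Run() only after every
// subcompaction thread has been joined, so the per-subcompaction counters
// can be summed without additional synchronization.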
CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, VersionSet* versions,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
    Directory* db_directory, Directory* output_directory, Statistics* stats,
    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
    EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
    const std::string& dbname, CompactionJobStats* compaction_job_stats,
    Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused)
    : job_id_(job_id),
      compact_(new CompactionState(compaction)),
      compaction_job_stats_(compaction_job_stats),
      compaction_stats_(compaction->compaction_reason(), 1),
      dbname_(dbname),
      db_options_(db_options),
      file_options_(file_options),
      env_(db_options.env),
      fs_(db_options.fs.get()),
      file_options_for_read_(
          fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
      versions_(versions),
      shutting_down_(shutting_down),
      manual_compaction_paused_(manual_compaction_paused),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_directory_(output_directory),
      stats_(stats),
      db_mutex_(db_mutex),
      db_error_handler_(db_error_handler),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      table_cache_(std::move(table_cache)),
      event_logger_(event_logger),
      bottommost_level_(false),
      paranoid_file_checks_(paranoid_file_checks),
      measure_io_stats_(measure_io_stats),
      write_hint_(Env::WLTH_NOT_SET),
      thread_pri_(thread_pri) {
  assert(log_buffer_ != nullptr);
  const auto* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  ReportStartedCompaction(compaction);
}
CompactionJob::~CompactionJob() {
  assert(compact_ == nullptr);
  ThreadStatusUtil::ResetThreadStatus();
}
void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
  const auto* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                    db_options_.enable_thread_tracking);

  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
                                               job_id_);

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
      (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
          compact_->compaction->output_level());

  // In the current design, a CompactionJob is always created
  // for non-trivial compaction.
  assert(compaction->IsTrivialMove() == false ||
         compaction->is_manual_compaction() == true);

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_PROP_FLAGS,
      compaction->is_manual_compaction() +
          (compaction->deletion_compaction() << 1));

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
      compaction->CalculateTotalInputSize());

  IOSTATS_RESET(bytes_written);
  IOSTATS_RESET(bytes_read);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, 0);

  // Set the thread operation after operation properties
  // to ensure GetThreadList() can always show them all together.
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

  if (compaction_job_stats_) {
    compaction_job_stats_->is_manual_compaction =
        compaction->is_manual_compaction();
  }
}
void CompactionJob::Prepare() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PREPARE);

  // Generate file_levels_ for compaction before making Iterator
  auto* c = compact_->compaction;
  assert(c->column_family_data() != nullptr);
  assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
             compact_->compaction->level()) > 0);

  write_hint_ =
      c->column_family_data()->CalculateSSTWriteHint(c->output_level());
  bottommost_level_ = c->bottommost_level();

  if (c->ShouldFormSubcompactions()) {
    {
      StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
      GenSubcompactionBoundaries();
    }
    assert(sizes_.size() == boundaries_.size() + 1);

    for (size_t i = 0; i <= boundaries_.size(); i++) {
      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
      compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
    }
    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                      compact_->sub_compact_states.size());
  } else {
    compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
  }
}
struct RangeWithSize {
  Range range;
  uint64_t size;

  RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
      : range(a, b), size(s) {}
};
void CompactionJob::GenSubcompactionBoundaries() {
  auto* c = compact_->compaction;
  auto* cfd = c->column_family_data();
  const Comparator* cfd_comparator = cfd->user_comparator();
  std::vector<Slice> bounds;
  int start_lvl = c->start_level();
  int out_lvl = c->output_level();

  // Add the starting and/or ending key of certain input files as a potential
  // boundary
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
    int lvl = c->level(lvl_idx);
    if (lvl >= start_lvl && lvl <= out_lvl) {
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
      size_t num_files = flevel->num_files;

      if (num_files == 0) {
        continue;
      }

      if (lvl == 0) {
        // For level 0 add the starting and ending key of each file since the
        // files may have greatly differing key ranges (not range-partitioned)
        for (size_t i = 0; i < num_files; i++) {
          bounds.emplace_back(flevel->files[i].smallest_key);
          bounds.emplace_back(flevel->files[i].largest_key);
        }
      } else {
        // For all other levels add the smallest/largest key in the level to
        // encompass the range covered by that level
        bounds.emplace_back(flevel->files[0].smallest_key);
        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
        if (lvl == out_lvl) {
          // For the last level include the starting keys of all files since
          // the last level is the largest and probably has the widest key
          // range. Since it's range partitioned, the ending key of one file
          // and the starting key of the next are very close (or identical).
          for (size_t i = 1; i < num_files; i++) {
            bounds.emplace_back(flevel->files[i].smallest_key);
          }
        }
      }
    }
  }

  std::sort(bounds.begin(), bounds.end(),
            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
              return cfd_comparator->Compare(ExtractUserKey(a),
                                             ExtractUserKey(b)) < 0;
            });
  // Remove duplicated entries from bounds
  bounds.erase(
      std::unique(bounds.begin(), bounds.end(),
                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
                    return cfd_comparator->Compare(ExtractUserKey(a),
                                                   ExtractUserKey(b)) == 0;
                  }),
      bounds.end());

  // Combine consecutive pairs of boundaries into ranges with an approximate
  // size of data covered by keys in that range
  uint64_t sum = 0;
  std::vector<RangeWithSize> ranges;
  // Get input version from CompactionState since it's already referenced
  // earlier in Compaction::SetInputVersion and will not change
  // when db_mutex_ is released below
  auto* v = compact_->compaction->input_version();
  for (auto it = bounds.begin();;) {
    const Slice a = *it;
    ++it;

    if (it == bounds.end()) {
      break;
    }

    const Slice b = *it;

    // ApproximateSize could potentially create table reader iterator to seek
    // to the index block and may incur I/O cost in the process. Unlock db
    // mutex to reduce contention
    db_mutex_->Unlock();
    uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
                                               b, start_lvl, out_lvl + 1,
                                               TableReaderCaller::kCompaction);
    db_mutex_->Lock();
    ranges.emplace_back(a, b, size);
    sum += size;
  }

  // Group the ranges into subcompactions
  const double min_file_fill_percent = 4.0 / 5;
  int base_level = v->storage_info()->base_level();
  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
      sum / min_file_fill_percent /
      MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
                          c->immutable_cf_options()->compaction_style,
                          base_level,
                          c->immutable_cf_options()
                              ->level_compaction_dynamic_level_bytes)));
  uint64_t subcompactions =
      std::min({static_cast<uint64_t>(ranges.size()),
                static_cast<uint64_t>(c->max_subcompactions()),
                max_output_files});

  if (subcompactions > 1) {
    double mean = sum * 1.0 / subcompactions;
    // Greedily add ranges to the subcompaction until the sum of the ranges'
    // sizes becomes >= the expected mean size of a subcompaction
    sum = 0;
    for (size_t i = 0; i < ranges.size() - 1; i++) {
      sum += ranges[i].size;
      if (subcompactions == 1) {
        // If there's only one left to schedule then it goes to the end so no
        // need to put an end boundary
        continue;
      }
      if (sum >= mean) {
        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
        sizes_.emplace_back(sum);
        subcompactions--;
        sum = 0;
      }
    }
    sizes_.emplace_back(sum + ranges.back().size);
  } else {
    // Only one range so its size is the total sum of sizes computed above
    sizes_.emplace_back(sum);
  }
}
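
// Worked example of the grouping above: if the ranges sum to 300 MB and
// subcompactions = 3, then mean = 100 MB. Ranges are accumulated until the
// running sum reaches 100 MB, at which point a boundary is emitted and the
// sum resets; the final subcompaction takes whatever remains. The split
// therefore balances estimated data size, not the number of keys or ranges.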
Status CompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  LogCompaction();

  const size_t num_threads = compact_->sub_compact_states.size();
  assert(num_threads > 0);
  const uint64_t start_micros = env_->NowMicros();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<port::Thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);

  // Wait for all other threads (if there are any) to finish execution
  for (auto& thread : thread_pool) {
    thread.join();
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros;
  compaction_stats_.cpu_micros = 0;
  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
    compaction_stats_.cpu_micros +=
        compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
  }

  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        compaction_stats_.cpu_micros);

  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");

  // Check if any thread encountered an error during execution
  Status status;
  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      status = state.status;
      break;
    }
  }

  if (status.ok() && output_directory_) {
    status = output_directory_->Fsync();
  }

  if (status.ok()) {
    thread_pool.clear();
    std::vector<const FileMetaData*> files_meta;
    for (const auto& state : compact_->sub_compact_states) {
      for (const auto& output : state.outputs) {
        files_meta.emplace_back(&output.meta);
      }
    }
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    auto prefix_extractor =
        compact_->compaction->mutable_cf_options()->prefix_extractor.get();
    std::atomic<size_t> next_file_meta_idx(0);
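    // The verification work below is distributed with a shared atomic
    // counter: each participating thread repeatedly claims the next
    // unverified output file via fetch_add() until all files are taken.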
    auto verify_table = [&](Status& output_status) {
      while (true) {
        size_t file_idx = next_file_meta_idx.fetch_add(1);
        if (file_idx >= files_meta.size()) {
          break;
        }
        // Verify that the table is usable
        // We set for_compaction to false and don't
        // OptimizeForCompactionTableRead here because this is a special case
        // after we finish the table building
        // No matter whether use_direct_io_for_flush_and_compaction is true,
        // we will regard this verification as user reads since the goal is
        // to cache it here for further user reads
        InternalIterator* iter = cfd->table_cache()->NewIterator(
            ReadOptions(), file_options_, cfd->internal_comparator(),
            *files_meta[file_idx], /*range_del_agg=*/nullptr, prefix_extractor,
            /*table_reader_ptr=*/nullptr,
            cfd->internal_stats()->GetFileReadHist(
                compact_->compaction->output_level()),
            TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
            /*skip_filters=*/false, compact_->compaction->output_level(),
            /*smallest_compaction_key=*/nullptr,
            /*largest_compaction_key=*/nullptr);
        auto s = iter->status();

        if (s.ok() && paranoid_file_checks_) {
          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
          s = iter->status();
        }

        delete iter;

        if (!s.ok()) {
          output_status = s;
          break;
        }
      }
    };

    for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
      thread_pool.emplace_back(
          verify_table, std::ref(compact_->sub_compact_states[i].status));
    }
    verify_table(compact_->sub_compact_states[0].status);
    for (auto& thread : thread_pool) {
      thread.join();
    }
    for (const auto& state : compact_->sub_compact_states) {
      if (!state.status.ok()) {
        status = state.status;
        break;
      }
    }
  }

  TablePropertiesCollection tp;
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.outputs) {
      auto fn =
          TableFileName(state.compaction->immutable_cf_options()->cf_paths,
                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
      tp[fn] = output.table_properties;
    }
  }
  compact_->compaction->SetOutputTableProperties(std::move(tp));

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();
  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");

  compact_->status = status;
  return status;
}
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_INSTALL);
  db_mutex_->AssertHeld();
  Status status = compact_->status;
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), thread_pri_, compaction_stats_);

  if (status.ok()) {
    status = InstallCompactionResults(mutable_cf_options);
  }
  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = compaction_stats_;

  double read_write_amp = 0.0;
  double write_amp = 0.0;
  double bytes_read_per_sec = 0;
  double bytes_written_per_sec = 0;

  if (stats.bytes_read_non_output_levels > 0) {
    read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
                      stats.bytes_read_non_output_levels) /
                     static_cast<double>(stats.bytes_read_non_output_levels);
    write_amp = stats.bytes_written /
                static_cast<double>(stats.bytes_read_non_output_levels);
  }
  if (stats.micros > 0) {
    bytes_read_per_sec =
        (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
        static_cast<double>(stats.micros);
    bytes_written_per_sec =
        stats.bytes_written / static_cast<double>(stats.micros);
  }
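
  // Worked example: reading 1 GB from the non-output levels and 2 GB from
  // the output level while writing 2.5 GB gives
  // read_write_amp = (2.5 + 2 + 1) / 1 = 5.5 and write_amp = 2.5 / 1 = 2.5,
  // both relative to the bytes read from the non-output levels.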
745 "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
746 "files in(%d, %d) out(%d) "
747 "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
748 "write-amplify(%.1f) %s, records in: %" PRIu64
749 ", records dropped: %" PRIu64
" output_compression: %s\n",
750 cfd
->GetName().c_str(), vstorage
->LevelSummary(&tmp
), bytes_read_per_sec
,
751 bytes_written_per_sec
, compact_
->compaction
->output_level(),
752 stats
.num_input_files_in_non_output_levels
,
753 stats
.num_input_files_in_output_level
, stats
.num_output_files
,
754 stats
.bytes_read_non_output_levels
/ 1048576.0,
755 stats
.bytes_read_output_level
/ 1048576.0,
756 stats
.bytes_written
/ 1048576.0, read_write_amp
, write_amp
,
757 status
.ToString().c_str(), stats
.num_input_records
,
758 stats
.num_dropped_records
,
759 CompressionTypeToString(compact_
->compaction
->output_compression())
762 UpdateCompactionJobStats(stats
);
764 auto stream
= event_logger_
->LogToBuffer(log_buffer_
);
765 stream
<< "job" << job_id_
<< "event"
766 << "compaction_finished"
767 << "compaction_time_micros" << stats
.micros
768 << "compaction_time_cpu_micros" << stats
.cpu_micros
<< "output_level"
769 << compact_
->compaction
->output_level() << "num_output_files"
770 << compact_
->NumOutputFiles() << "total_output_size"
771 << compact_
->total_bytes
<< "num_input_records"
772 << stats
.num_input_records
<< "num_output_records"
773 << compact_
->num_output_records
<< "num_subcompactions"
774 << compact_
->sub_compact_states
.size() << "output_compression"
775 << CompressionTypeToString(compact_
->compaction
->output_compression());
777 if (compaction_job_stats_
!= nullptr) {
778 stream
<< "num_single_delete_mismatches"
779 << compaction_job_stats_
->num_single_del_mismatch
;
780 stream
<< "num_single_delete_fallthrough"
781 << compaction_job_stats_
->num_single_del_fallthru
;
784 if (measure_io_stats_
&& compaction_job_stats_
!= nullptr) {
785 stream
<< "file_write_nanos" << compaction_job_stats_
->file_write_nanos
;
786 stream
<< "file_range_sync_nanos"
787 << compaction_job_stats_
->file_range_sync_nanos
;
788 stream
<< "file_fsync_nanos" << compaction_job_stats_
->file_fsync_nanos
;
789 stream
<< "file_prepare_write_nanos"
790 << compaction_job_stats_
->file_prepare_write_nanos
;
793 stream
<< "lsm_state";
795 for (int level
= 0; level
< vstorage
->num_levels(); ++level
) {
796 stream
<< vstorage
->NumLevelFiles(level
);
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);

  uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

  // Create compaction filter and fail the compaction if
  // IgnoreSnapshots() = false because it is not supported anymore
  const CompactionFilter* compaction_filter =
      cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (compaction_filter == nullptr) {
    compaction_filter_from_factory =
        sub_compact->compaction->CreateCompactionFilter();
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
    sub_compact->status = Status::NotSupported(
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
        "anymore.");
    return;
  }

  CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
                                             existing_snapshots_);

  // Although the v2 aggregator is what the level iterator(s) know about,
  // the AddTombstones calls will be propagated down to the v1 aggregator.
  std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
      sub_compact->compaction, &range_del_agg, file_options_for_read_));

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  const uint64_t kRecordStatsEvery = 1000;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }
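  // The IOSTATS counters captured above are snapshots rather than zeroed
  // counters; this subcompaction's share is computed at the end of this
  // function as the delta between the final and the captured values.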

  MergeHelper merge(
      env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
      compaction_filter, db_options_.info_log.get(),
      false /* internal key corruption is expected */,
      existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
      snapshot_checker_, compact_->compaction->level(),
      db_options_.statistics.get());

  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::Run():PausingManualCompaction:1",
      reinterpret_cast<void*>(
          const_cast<std::atomic<bool>*>(manual_compaction_paused_)));

  Slice* start = sub_compact->start;
  Slice* end = sub_compact->end;
  if (start != nullptr) {
    IterKey start_iter;
    start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
    input->Seek(start_iter.GetInternalKey());
  } else {
    input->SeekToFirst();
  }
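  // Seeking with kMaxSequenceNumber works because internal keys sort by user
  // key ascending and then by sequence number descending, so the constructed
  // key precedes every real entry for *start and Seek() lands on the first
  // entry whose user key is >= *start.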

  Status status;
  sub_compact->c_iter.reset(new CompactionIterator(
      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
      &existing_snapshots_, earliest_write_conflict_snapshot_,
      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
      &range_del_agg, sub_compact->compaction, compaction_filter,
      shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_,
      db_options_.info_log));
  auto c_iter = sub_compact->c_iter.get();
  c_iter->SeekToFirst();
  if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
    // ShouldStopBefore() maintains state based on keys processed so far. The
    // compaction loop always calls it on the "next" key, thus won't tell it
    // the first key. So we do that here.
    sub_compact->ShouldStopBefore(c_iter->key(),
                                  sub_compact->current_output_file_size);
  }
  const auto& c_iter_stats = c_iter->iter_stats();

  while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
    // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
    // returns true.
    const Slice& key = c_iter->key();
    const Slice& value = c_iter->value();

    // If an end key (exclusive) is specified, check if the current key is
    // >= than it and exit if it is because the iterator is out of its range
    if (end != nullptr &&
        cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
      break;
    }
    if (c_iter_stats.num_input_records % kRecordStatsEvery ==
        kRecordStatsEvery - 1) {
      RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
      c_iter->ResetRecordCounts();
      RecordCompactionIOStats();
    }

    // Open output file if necessary
    if (sub_compact->builder == nullptr) {
      status = OpenCompactionOutputFile(sub_compact);
      if (!status.ok()) {
        break;
      }
    }
    assert(sub_compact->builder != nullptr);
    assert(sub_compact->current_output() != nullptr);
    sub_compact->builder->Add(key, value);
    sub_compact->current_output_file_size = sub_compact->builder->FileSize();
    const ParsedInternalKey& ikey = c_iter->ikey();
    sub_compact->current_output()->meta.UpdateBoundaries(
        key, value, ikey.sequence, ikey.type);
    sub_compact->num_output_records++;

    // Close output file if it is big enough. Two possibilities determine
    // it's time to close it: (1) the current key should be this file's last
    // key, (2) the next key should not be in this file.
    //
    // TODO(aekmekji): determine if file should be closed earlier than this
    // during subcompactions (i.e. if output size, estimated by input size,
    // is going to be 1.2MB and max_output_file_size = 1MB, prefer to have
    // 0.6MB and 0.6MB instead of 1MB and 0.2MB)
    bool output_file_ended = false;
    Status input_status;
    if (sub_compact->compaction->output_level() != 0 &&
        sub_compact->current_output_file_size >=
            sub_compact->compaction->max_output_file_size()) {
      // (1) this key terminates the file. For historical reasons, the
      // iterator status before advancing will be given to
      // FinishCompactionOutputFile().
      input_status = input->status();
      output_file_ended = true;
    }
    TEST_SYNC_POINT_CALLBACK(
        "CompactionJob::Run():PausingManualCompaction:2",
        reinterpret_cast<void*>(
            const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
    c_iter->Next();
    if (c_iter->status().IsManualCompactionPaused()) {
      break;
    }
    if (!output_file_ended && c_iter->Valid() &&
        sub_compact->compaction->output_level() != 0 &&
        sub_compact->ShouldStopBefore(
            c_iter->key(), sub_compact->current_output_file_size) &&
        sub_compact->builder != nullptr) {
      // (2) this key belongs to the next file. For historical reasons, the
      // iterator status after advancing will be given to
      // FinishCompactionOutputFile().
      input_status = input->status();
      output_file_ended = true;
    }
    if (output_file_ended) {
      const Slice* next_key = nullptr;
      if (c_iter->Valid()) {
        next_key = &c_iter->key();
      }
      CompactionIterationStats range_del_out_stats;
      status =
          FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
                                     &range_del_out_stats, next_key);
      RecordDroppedKeys(range_del_out_stats,
                        &sub_compact->compaction_job_stats);
    }
  }
->compaction_job_stats
.num_input_deletion_records
=
989 c_iter_stats
.num_input_deletion_records
;
990 sub_compact
->compaction_job_stats
.num_corrupt_keys
=
991 c_iter_stats
.num_input_corrupt_records
;
992 sub_compact
->compaction_job_stats
.num_single_del_fallthru
=
993 c_iter_stats
.num_single_del_fallthru
;
994 sub_compact
->compaction_job_stats
.num_single_del_mismatch
=
995 c_iter_stats
.num_single_del_mismatch
;
996 sub_compact
->compaction_job_stats
.total_input_raw_key_bytes
+=
997 c_iter_stats
.total_input_raw_key_bytes
;
998 sub_compact
->compaction_job_stats
.total_input_raw_value_bytes
+=
999 c_iter_stats
.total_input_raw_value_bytes
;
1001 RecordTick(stats_
, FILTER_OPERATION_TOTAL_TIME
,
1002 c_iter_stats
.total_filter_time
);
1003 RecordDroppedKeys(c_iter_stats
, &sub_compact
->compaction_job_stats
);
1004 RecordCompactionIOStats();
1006 if (status
.ok() && cfd
->IsDropped()) {
1008 Status::ColumnFamilyDropped("Column family dropped during compaction");
1010 if ((status
.ok() || status
.IsColumnFamilyDropped()) &&
1011 shutting_down_
->load(std::memory_order_relaxed
)) {
1012 status
= Status::ShutdownInProgress("Database shutdown");
1014 if ((status
.ok() || status
.IsColumnFamilyDropped()) &&
1015 (manual_compaction_paused_
&&
1016 manual_compaction_paused_
->load(std::memory_order_relaxed
))) {
1017 status
= Status::Incomplete(Status::SubCode::kManualCompactionPaused
);
1020 status
= input
->status();
1023 status
= c_iter
->status();
1026 if (status
.ok() && sub_compact
->builder
== nullptr &&
1027 sub_compact
->outputs
.size() == 0 && !range_del_agg
.IsEmpty()) {
1028 // handle subcompaction containing only range deletions
1029 status
= OpenCompactionOutputFile(sub_compact
);
1032 // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1033 // close the output file.
1034 if (sub_compact
->builder
!= nullptr) {
1035 CompactionIterationStats range_del_out_stats
;
1036 Status s
= FinishCompactionOutputFile(status
, sub_compact
, &range_del_agg
,
1037 &range_del_out_stats
);
1041 RecordDroppedKeys(range_del_out_stats
, &sub_compact
->compaction_job_stats
);
1044 sub_compact
->compaction_job_stats
.cpu_micros
=
1045 env_
->NowCPUNanos() / 1000 - prev_cpu_micros
;
1047 if (measure_io_stats_
) {
1048 sub_compact
->compaction_job_stats
.file_write_nanos
+=
1049 IOSTATS(write_nanos
) - prev_write_nanos
;
1050 sub_compact
->compaction_job_stats
.file_fsync_nanos
+=
1051 IOSTATS(fsync_nanos
) - prev_fsync_nanos
;
1052 sub_compact
->compaction_job_stats
.file_range_sync_nanos
+=
1053 IOSTATS(range_sync_nanos
) - prev_range_sync_nanos
;
1054 sub_compact
->compaction_job_stats
.file_prepare_write_nanos
+=
1055 IOSTATS(prepare_write_nanos
) - prev_prepare_write_nanos
;
1056 sub_compact
->compaction_job_stats
.cpu_micros
-=
1057 (IOSTATS(cpu_write_nanos
) - prev_cpu_write_nanos
+
1058 IOSTATS(cpu_read_nanos
) - prev_cpu_read_nanos
) /
1060 if (prev_perf_level
!= PerfLevel::kEnableTimeAndCPUTimeExceptForMutex
) {
1061 SetPerfLevel(prev_perf_level
);
1065 sub_compact
->c_iter
.reset();
1067 sub_compact
->status
= status
;
void CompactionJob::RecordDroppedKeys(
    const CompactionIterationStats& c_iter_stats,
    CompactionJobStats* compaction_job_stats) {
  if (c_iter_stats.num_record_drop_user > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_USER,
               c_iter_stats.num_record_drop_user);
  }
  if (c_iter_stats.num_record_drop_hidden > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
               c_iter_stats.num_record_drop_hidden);
    if (compaction_job_stats) {
      compaction_job_stats->num_records_replaced +=
          c_iter_stats.num_record_drop_hidden;
    }
  }
  if (c_iter_stats.num_record_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
               c_iter_stats.num_record_drop_obsolete);
    if (compaction_job_stats) {
      compaction_job_stats->num_expired_deletion_records +=
          c_iter_stats.num_record_drop_obsolete;
    }
  }
  if (c_iter_stats.num_record_drop_range_del > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
               c_iter_stats.num_record_drop_range_del);
  }
  if (c_iter_stats.num_range_del_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
               c_iter_stats.num_range_del_drop_obsolete);
  }
  if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
               c_iter_stats.num_optimized_del_drop_obsolete);
  }
}
Status CompactionJob::FinishCompactionOutputFile(
    const Status& input_status, SubcompactionState* sub_compact,
    CompactionRangeDelAggregator* range_del_agg,
    CompactionIterationStats* range_del_out_stats,
    const Slice* next_table_min_key /* = nullptr */) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
  assert(sub_compact != nullptr);
  assert(sub_compact->outfile);
  assert(sub_compact->builder != nullptr);
  assert(sub_compact->current_output() != nullptr);

  uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
  assert(output_number != 0);

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  const Comparator* ucmp = cfd->user_comparator();

  // Check for iterator errors
  Status s = input_status;
  auto meta = &sub_compact->current_output()->meta;
  assert(meta != nullptr);
  if (s.ok()) {
    Slice lower_bound_guard, upper_bound_guard;
    std::string smallest_user_key;
    const Slice *lower_bound, *upper_bound;
    bool lower_bound_from_sub_compact = false;
    if (sub_compact->outputs.size() == 1) {
      // For the first output table, include range tombstones before the min
      // key but after the subcompaction boundary.
      lower_bound = sub_compact->start;
      lower_bound_from_sub_compact = true;
    } else if (meta->smallest.size() > 0) {
      // For subsequent output tables, only include range tombstones from min
      // key onwards since the previous file was extended to contain range
      // tombstones falling before min key.
      smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
      lower_bound_guard = Slice(smallest_user_key);
      lower_bound = &lower_bound_guard;
    } else {
      lower_bound = nullptr;
    }
    if (next_table_min_key != nullptr) {
      // This may be the last file in the subcompaction in some cases, so we
      // need to compare the end key of the subcompaction with the next file's
      // start key. When the end key is chosen by the subcompaction, we know
      // that it must be the biggest key in the output file. Therefore, it is
      // safe to use the smaller key as the upper bound of the output file, to
      // ensure that there is no overlapping between different output files.
      upper_bound_guard = ExtractUserKey(*next_table_min_key);
      if (sub_compact->end != nullptr &&
          ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
        upper_bound = sub_compact->end;
      } else {
        upper_bound = &upper_bound_guard;
      }
    } else {
      // This is the last file in the subcompaction, so extend until the
      // subcompaction ends.
      upper_bound = sub_compact->end;
    }
    auto earliest_snapshot = kMaxSequenceNumber;
    if (existing_snapshots_.size() > 0) {
      earliest_snapshot = existing_snapshots_[0];
    }
    bool has_overlapping_endpoints;
    if (upper_bound != nullptr && meta->largest.size() > 0) {
      has_overlapping_endpoints =
          ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
    } else {
      has_overlapping_endpoints = false;
    }

    // The end key of the subcompaction must be bigger than or equal to the
    // upper bound. If the end of the subcompaction is null or the upper bound
    // is null, it means that this file is the last file in the compaction. So
    // there will be no overlapping between this file and others.
    assert(sub_compact->end == nullptr || upper_bound == nullptr ||
           ucmp->Compare(*upper_bound, *sub_compact->end) <= 0);
    auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
                                         has_overlapping_endpoints);
    // Position the range tombstone output iterator. There may be tombstone
    // fragments that are entirely out of range, so make sure that we do not
    // include them.
    if (lower_bound != nullptr) {
      it->Seek(*lower_bound);
    } else {
      it->SeekToFirst();
    }
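    // Example of the clamping performed below: a tombstone [c, m) that spans
    // two output files is added to both, with its end pretended to be the
    // first file's upper bound and its start pretended to be the second
    // file's lower bound, so the output files still appear key-space
    // partitioned to the read path.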
    for (; it->Valid(); it->Next()) {
      auto tombstone = it->Tombstone();
      if (upper_bound != nullptr) {
        int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
        if ((has_overlapping_endpoints && cmp < 0) ||
            (!has_overlapping_endpoints && cmp <= 0)) {
          // Tombstones starting after upper_bound only need to be included in
          // the next table. If the current SST ends before upper_bound, i.e.,
          // `has_overlapping_endpoints == false`, we can also skip over range
          // tombstones that start exactly at upper_bound. Such range
          // tombstones will be included in the next file and are not relevant
          // to the point keys or endpoints of the current file.
          break;
        }
      }

      if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
        // TODO(andrewkr): tombstones that span multiple output files are
        // counted for each compaction output file, so lots of double
        // counting.
        range_del_out_stats->num_range_del_drop_obsolete++;
        range_del_out_stats->num_record_drop_obsolete++;
        continue;
      }

      auto kv = tombstone.Serialize();
      assert(lower_bound == nullptr ||
             ucmp->Compare(*lower_bound, kv.second) < 0);
      sub_compact->builder->Add(kv.first.Encode(), kv.second);
      InternalKey smallest_candidate = std::move(kv.first);
      if (lower_bound != nullptr &&
          ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
        // Pretend the smallest key has the same user key as lower_bound
        // (the max key in the previous table or subcompaction) in order for
        // files to appear key-space partitioned.
        //
        // When lower_bound is chosen by a subcompaction, we know that
        // subcompactions over smaller keys cannot contain any keys at
        // lower_bound. We also know that smaller subcompactions exist,
        // because otherwise the subcompaction would be unbounded on the left.
        // As a result, we know that no other files on the output level will
        // contain actual keys at lower_bound (an output file may have a
        // largest key of lower_bound@kMaxSequenceNumber, but this only
        // indicates a large range tombstone was truncated). Therefore, it is
        // safe to use the tombstone's sequence number, to ensure that keys at
        // lower_bound at lower levels are covered by truncated tombstones.
        //
        // If lower_bound was chosen by the smallest data key in the file,
        // choose the lowest seqnum so this file's smallest internal key comes
        // after the previous file's largest. The fake seqnum is OK because
        // the read path's file-picking code only considers the user key.
        smallest_candidate = InternalKey(
            *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
            kTypeRangeDeletion);
      }
      InternalKey largest_candidate = tombstone.SerializeEndKey();
      if (upper_bound != nullptr &&
          ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
        // Pretend the largest key has the same user key as upper_bound (the
        // min key in the following table or subcompaction) in order for files
        // to appear key-space partitioned.
        //
        // Choose the highest seqnum so this file's largest internal key comes
        // before the next file's/subcompaction's smallest. The fake seqnum is
        // OK because the read path's file-picking code only considers the
        // user key portion.
        //
        // Note Seek() also creates InternalKey with (user_key,
        // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
        // kTypeRangeDeletion (0xF), so the range tombstone comes before the
        // Seek() key in InternalKey's ordering. So Seek() will look in the
        // next file for the user key.
        largest_candidate =
            InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
      }

      SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
      if (meta->smallest.size() > 0) {
        smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
      }

      meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
                                     tombstone.seq_,
                                     cfd->internal_comparator());

      // The smallest key in a file is used for range tombstone truncation, so
      // it cannot have a seqnum of 0 (unless the smallest data key in a file
      // has a seqnum of 0). Otherwise, the truncated tombstone may expose
      // deleted keys at lower levels.
      assert(smallest_ikey_seqnum == 0 ||
             ExtractInternalKeyFooter(meta->smallest.Encode()) !=
                 PackSequenceAndType(0, kTypeRangeDeletion));
    }
    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
  }
  const uint64_t current_entries = sub_compact->builder->NumEntries();
  if (s.ok()) {
    s = sub_compact->builder->Finish();
  } else {
    sub_compact->builder->Abandon();
  }
  const uint64_t current_bytes = sub_compact->builder->FileSize();
  if (s.ok()) {
    // Add the checksum information to file metadata.
    meta->file_checksum = sub_compact->builder->GetFileChecksum();
    meta->file_checksum_func_name =
        sub_compact->builder->GetFileChecksumFuncName();

    meta->fd.file_size = current_bytes;
  }
  sub_compact->current_output()->finished = true;
  sub_compact->total_bytes += current_bytes;

  // Finish and check for file errors
  if (s.ok()) {
    StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
    s = sub_compact->outfile->Sync(db_options_.use_fsync);
  }
  if (s.ok()) {
    s = sub_compact->outfile->Close();
  }
  sub_compact->outfile.reset();

  TableProperties tp;
  if (s.ok()) {
    tp = sub_compact->builder->GetTableProperties();
  }

  if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
    // If there is nothing to output, there is no need to generate an sst
    // file. This happens when the output level is the bottom level and the
    // sub_compact outputs nothing.
    std::string fname =
        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                      meta->fd.GetNumber(), meta->fd.GetPathId());
    env_->DeleteFile(fname);

    // Also need to remove the file from outputs, or it will be added to the
    // VersionEdit.
    assert(!sub_compact->outputs.empty());
    sub_compact->outputs.pop_back();
    meta = nullptr;
  }

  if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
    // Output to event logger and fire events.
    sub_compact->current_output()->table_properties =
        std::make_shared<TableProperties>(tp);
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
                   " keys, %" PRIu64 " bytes%s",
                   cfd->GetName().c_str(), job_id_, output_number,
                   current_entries, current_bytes,
                   meta->marked_for_compaction ? " (need compaction)" : "");
  }
  std::string fname;
  FileDescriptor output_fd;
  uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
  if (meta != nullptr) {
    fname =
        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                      meta->fd.GetNumber(), meta->fd.GetPathId());
    output_fd = meta->fd;
    oldest_blob_file_number = meta->oldest_blob_file_number;
  } else {
    fname = "(nil)";
  }
  EventHelpers::LogAndNotifyTableFileCreationFinished(
      event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
      job_id_, output_fd, oldest_blob_file_number, tp,
      TableFileCreationReason::kCompaction, s);

#ifndef ROCKSDB_LITE
  // Report new file to SstFileManagerImpl
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
  if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
    sfm->OnAddFile(fname);
    if (sfm->IsMaxAllowedSpaceReached()) {
      // TODO(ajkr): should we return OK() if max space was reached by the
      // final compaction output file (similarly to how flush works when
      // full)?
      s = Status::SpaceLimit("Max allowed space was reached");
      TEST_SYNC_POINT(
          "CompactionJob::FinishCompactionOutputFile:"
          "MaxAllowedSpaceReached");
      InstrumentedMutexLock l(db_mutex_);
      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
    }
  }
#endif

  sub_compact->builder.reset();
  sub_compact->current_output_file_size = 0;
  return s;
}
Status CompactionJob::InstallCompactionResults(
    const MutableCFOptions& mutable_cf_options) {
  db_mutex_->AssertHeld();

  auto* compaction = compact_->compaction;
  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact_.
  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
    Compaction::InputLevelSummaryBuffer inputs_summary;

    ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
                    compaction->column_family_data()->GetName().c_str(),
                    job_id_, compaction->InputLevelSummary(&inputs_summary));
    return Status::Corruption("Compaction input files inconsistent");
  }

  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    ROCKS_LOG_INFO(
        db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
        compaction->column_family_data()->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
  }

  // Add compaction inputs
  compaction->AddInputDeletions(compact_->compaction->edit());

  for (const auto& sub_compact : compact_->sub_compact_states) {
    for (const auto& out : sub_compact.outputs) {
      compaction->edit()->AddFile(compaction->output_level(), out.meta);
    }
  }
  return versions_->LogAndApply(compaction->column_family_data(),
                                mutable_cf_options, compaction->edit(),
                                db_mutex_, db_directory_);
}
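
// Note: the input-file deletions and output-file additions above travel in a
// single VersionEdit, so LogAndApply() installs the compaction result
// atomically; readers observe either the old set of input files or the new
// set of output files, never an intermediate state.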
void CompactionJob::RecordCompactionIOStats() {
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
  IOSTATS_RESET(bytes_read);
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}
Status CompactionJob::OpenCompactionOutputFile(
    SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);
  assert(sub_compact->builder == nullptr);
  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();
  std::string fname =
      TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                    file_number, sub_compact->compaction->output_path_id());

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
  EventHelpers::NotifyTableFileCreationStarted(
      cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
      TableFileCreationReason::kCompaction);
#endif  // !ROCKSDB_LITE
  // Make the output file
  std::unique_ptr<FSWritableFile> writable_file;
#ifndef NDEBUG
  bool syncpoint_arg = file_options_.use_direct_writes;
  TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
                           &syncpoint_arg);
#endif
  Status s = NewWritableFile(fs_, fname, &writable_file, file_options_);
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        sub_compact->compaction->column_family_data()->GetName().c_str(),
        job_id_, file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    EventHelpers::LogAndNotifyTableFileCreationFinished(
        event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
        fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
        TableProperties(), TableFileCreationReason::kCompaction, s);
    return s;
  }

  // Try to figure out the output file's oldest ancester time.
  int64_t temp_current_time = 0;
  auto get_time_status = env_->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
  if (!get_time_status.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time. Status: %s",
                   get_time_status.ToString().c_str());
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
  uint64_t oldest_ancester_time =
      sub_compact->compaction->MinInputFileOldestAncesterTime();
  if (oldest_ancester_time == port::kMaxUint64) {
    oldest_ancester_time = current_time;
  }

  // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
  {
    SubcompactionState::Output out;
    out.meta.fd = FileDescriptor(file_number,
                                 sub_compact->compaction->output_path_id(), 0);
    out.meta.oldest_ancester_time = oldest_ancester_time;
    out.meta.file_creation_time = current_time;
    out.finished = false;
    sub_compact->outputs.push_back(out);
  }

  writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
  writable_file->SetWriteLifeTimeHint(write_hint_);
  writable_file->SetPreallocationBlockSize(static_cast<size_t>(
      sub_compact->compaction->OutputFilePreallocationSize()));
  const auto& listeners =
      sub_compact->compaction->immutable_cf_options()->listeners;
  sub_compact->outfile.reset(
      new WritableFileWriter(std::move(writable_file), fname, file_options_,
                             env_, db_options_.statistics.get(), listeners,
                             db_options_.sst_file_checksum_func.get()));

  // If the column family flag is to only optimize filters for hits,
  // we can skip creating filters if this is the bottommost_level where
  // data is going to be found
  bool skip_filters =
      cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;

  sub_compact->builder.reset(NewTableBuilder(
      *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
      cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
      cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
      sub_compact->compaction->output_compression(),
      0 /*sample_for_compression */,
      sub_compact->compaction->output_compression_opts(),
      sub_compact->compaction->output_level(), skip_filters,
      oldest_ancester_time, 0 /* oldest_key_time */,
      sub_compact->compaction->max_output_file_size(), current_time));
  LogFlush(db_options_.info_log);
  return s;
}
void CompactionJob::CleanupCompaction() {
  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
    const auto& sub_status = sub_compact.status;

    if (sub_compact.builder != nullptr) {
      // May happen if we get a shutdown call in the middle of compaction
      sub_compact.builder->Abandon();
      sub_compact.builder.reset();
    } else {
      assert(!sub_status.ok() || sub_compact.outfile == nullptr);
    }

    for (const auto& out : sub_compact.outputs) {
      // If this file was inserted into the table cache then remove
      // it here because this compaction was not committed.
      if (!sub_status.ok()) {
        TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
      }
    }
  }
  delete compact_;
  compact_ = nullptr;
}
#ifndef ROCKSDB_LITE
namespace {
void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}
}  // namespace

#endif  // !ROCKSDB_LITE
void CompactionJob::UpdateCompactionStats() {
  Compaction* compaction = compact_->compaction;
  compaction_stats_.num_input_files_in_non_output_levels = 0;
  compaction_stats_.num_input_files_in_output_level = 0;
  for (int input_level = 0;
       input_level < static_cast<int>(compaction->num_input_levels());
       ++input_level) {
    if (compaction->level(input_level) != compaction->output_level()) {
      UpdateCompactionInputStatsHelper(
          &compaction_stats_.num_input_files_in_non_output_levels,
          &compaction_stats_.bytes_read_non_output_levels, input_level);
    } else {
      UpdateCompactionInputStatsHelper(
          &compaction_stats_.num_input_files_in_output_level,
          &compaction_stats_.bytes_read_output_level, input_level);
    }
  }

  uint64_t num_output_records = 0;

  for (const auto& sub_compact : compact_->sub_compact_states) {
    size_t num_output_files = sub_compact.outputs.size();
    if (sub_compact.builder != nullptr) {
      // An error occurred so ignore the last output.
      assert(num_output_files > 0);
      --num_output_files;
    }
    compaction_stats_.num_output_files += static_cast<int>(num_output_files);

    num_output_records += sub_compact.num_output_records;

    for (const auto& out : sub_compact.outputs) {
      compaction_stats_.bytes_written += out.meta.fd.file_size;
    }
  }

  if (compaction_stats_.num_input_records > num_output_records) {
    compaction_stats_.num_dropped_records =
        compaction_stats_.num_input_records - num_output_records;
  }
}
void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
                                                     uint64_t* bytes_read,
                                                     int input_level) {
  const Compaction* compaction = compact_->compaction;
  auto num_input_files = compaction->num_input_files(input_level);
  *num_files += static_cast<int>(num_input_files);

  for (size_t i = 0; i < num_input_files; ++i) {
    const auto* file_meta = compaction->input(input_level, i);
    *bytes_read += file_meta->fd.GetFileSize();
    compaction_stats_.num_input_records +=
        static_cast<uint64_t>(file_meta->num_entries);
  }
}
void CompactionJob::UpdateCompactionJobStats(
    const InternalStats::CompactionStats& stats) const {
#ifndef ROCKSDB_LITE
  if (compaction_job_stats_) {
    compaction_job_stats_->elapsed_micros = stats.micros;

    // input information
    compaction_job_stats_->total_input_bytes =
        stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
    compaction_job_stats_->num_input_records = stats.num_input_records;
    compaction_job_stats_->num_input_files =
        stats.num_input_files_in_non_output_levels +
        stats.num_input_files_in_output_level;
    compaction_job_stats_->num_input_files_at_output_level =
        stats.num_input_files_in_output_level;

    // output information
    compaction_job_stats_->total_output_bytes = stats.bytes_written;
    compaction_job_stats_->num_output_records = compact_->num_output_records;
    compaction_job_stats_->num_output_files = stats.num_output_files;

    if (compact_->NumOutputFiles() > 0U) {
      CopyPrefix(compact_->SmallestUserKey(),
                 CompactionJobStats::kMaxPrefixLength,
                 &compaction_job_stats_->smallest_output_key_prefix);
      CopyPrefix(compact_->LargestUserKey(),
                 CompactionJobStats::kMaxPrefixLength,
                 &compaction_job_stats_->largest_output_key_prefix);
    }
  }
#else
  (void)stats;
#endif  // !ROCKSDB_LITE
}
void CompactionJob::LogCompaction() {
  Compaction* compaction = compact_->compaction;
  ColumnFamilyData* cfd = compaction->column_family_data();

  // Let's check if anything will get logged. Don't prepare all the info if
  // we're not logging
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    ROCKS_LOG_INFO(
        db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
        cfd->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compaction->score());
    char scratch[2345];
    compaction->Summary(scratch, sizeof(scratch));
    ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
                   cfd->GetName().c_str(), scratch);
    // build event logger report
    auto stream = event_logger_->Log();
    stream << "job" << job_id_ << "event"
           << "compaction_started"
           << "compaction_reason"
           << GetCompactionReasonString(compaction->compaction_reason());
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
      stream << ("files_L" + ToString(compaction->level(i)));
      stream.StartArray();
      for (auto f : *compaction->inputs(i)) {
        stream << f->fd.GetNumber();
      }
      stream.EndArray();
    }
    stream << "score" << compaction->score() << "input_data_size"
           << compaction->CalculateTotalInputSize();
  }
}

}  // namespace ROCKSDB_NAMESPACE