// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/compaction/compaction_job.h"
23 #include "db/blob/blob_file_addition.h"
24 #include "db/blob/blob_file_builder.h"
25 #include "db/builder.h"
26 #include "db/db_impl/db_impl.h"
27 #include "db/db_iter.h"
28 #include "db/dbformat.h"
29 #include "db/error_handler.h"
30 #include "db/event_helpers.h"
31 #include "db/log_reader.h"
32 #include "db/log_writer.h"
33 #include "db/memtable.h"
34 #include "db/memtable_list.h"
35 #include "db/merge_context.h"
36 #include "db/merge_helper.h"
37 #include "db/output_validator.h"
38 #include "db/range_del_aggregator.h"
39 #include "db/version_set.h"
40 #include "file/filename.h"
41 #include "file/read_write_util.h"
42 #include "file/sst_file_manager_impl.h"
43 #include "file/writable_file_writer.h"
44 #include "logging/log_buffer.h"
45 #include "logging/logging.h"
46 #include "monitoring/iostats_context_imp.h"
47 #include "monitoring/perf_context_imp.h"
48 #include "monitoring/thread_status_util.h"
49 #include "port/port.h"
50 #include "rocksdb/db.h"
51 #include "rocksdb/env.h"
52 #include "rocksdb/sst_partitioner.h"
53 #include "rocksdb/statistics.h"
54 #include "rocksdb/status.h"
55 #include "rocksdb/table.h"
56 #include "table/block_based/block.h"
57 #include "table/block_based/block_based_table_factory.h"
58 #include "table/merging_iterator.h"
59 #include "table/table_builder.h"
60 #include "test_util/sync_point.h"
61 #include "util/coding.h"
62 #include "util/hash.h"
63 #include "util/mutexlock.h"
64 #include "util/random.h"
65 #include "util/stop_watch.h"
66 #include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {

const char* GetCompactionReasonString(CompactionReason compaction_reason) {
  switch (compaction_reason) {
    case CompactionReason::kUnknown:
      return "Unknown";
    case CompactionReason::kLevelL0FilesNum:
      return "LevelL0FilesNum";
    case CompactionReason::kLevelMaxLevelSize:
      return "LevelMaxLevelSize";
    case CompactionReason::kUniversalSizeAmplification:
      return "UniversalSizeAmplification";
    case CompactionReason::kUniversalSizeRatio:
      return "UniversalSizeRatio";
    case CompactionReason::kUniversalSortedRunNum:
      return "UniversalSortedRunNum";
    case CompactionReason::kFIFOMaxSize:
      return "FIFOMaxSize";
    case CompactionReason::kFIFOReduceNumFiles:
      return "FIFOReduceNumFiles";
    case CompactionReason::kFIFOTtl:
      return "FIFOTtl";
    case CompactionReason::kManualCompaction:
      return "ManualCompaction";
    case CompactionReason::kFilesMarkedForCompaction:
      return "FilesMarkedForCompaction";
    case CompactionReason::kBottommostFiles:
      return "BottommostFiles";
    case CompactionReason::kTtl:
      return "Ttl";
    case CompactionReason::kFlush:
      return "Flush";
    case CompactionReason::kExternalSstIngestion:
      return "ExternalSstIngestion";
    case CompactionReason::kPeriodicCompaction:
      return "PeriodicCompaction";
    case CompactionReason::kNumOfReasons:
      // fall through
    default:
      assert(false);
      return "Invalid";
  }
}
// Maintains state for each sub-compaction
struct CompactionJob::SubcompactionState {
  const Compaction* compaction;
  std::unique_ptr<CompactionIterator> c_iter;

  // The boundaries of the key-range this compaction is interested in. No two
  // subcompactions may have overlapping key-ranges.
  // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
  Slice *start, *end;

  // The return status of this subcompaction
  Status status;

  // The return IO Status of this subcompaction
  IOStatus io_status;

  // Files produced by this subcompaction
  struct Output {
    Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
           bool _enable_order_check, bool _enable_hash)
        : meta(std::move(_meta)),
          validator(_icmp, _enable_order_check, _enable_hash),
          finished(false) {}
    FileMetaData meta;
    OutputValidator validator;
    bool finished;
    std::shared_ptr<const TableProperties> table_properties;
  };

  // State kept for output being generated
  std::vector<Output> outputs;
  std::vector<BlobFileAddition> blob_file_additions;
  std::unique_ptr<WritableFileWriter> outfile;
  std::unique_ptr<TableBuilder> builder;
  Output* current_output() {
    if (outputs.empty()) {
      // This subcompaction's output could be empty if compaction was aborted
      // before this subcompaction had a chance to generate any output files.
      // When subcompactions are executed sequentially this is more likely and
      // will be particularly likely for the later subcompactions to be empty.
      // Once they are run in parallel however it should be much rarer.
      return nullptr;
    } else {
      return &outputs.back();
    }
  }
  uint64_t current_output_file_size = 0;

  // State during the subcompaction
  uint64_t total_bytes = 0;
  uint64_t num_output_records = 0;
  CompactionJobStats compaction_job_stats;
  uint64_t approx_size = 0;
  // An index used to speed up ShouldStopBefore().
  size_t grandparent_index = 0;
  // The number of bytes overlapping between the current output and
  // grandparent files used in ShouldStopBefore().
  uint64_t overlapped_bytes = 0;
  // A flag that records whether a key has been seen by ShouldStopBefore()
  bool seen_key = false;
  SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size)
      : compaction(c), start(_start), end(_end), approx_size(size) {
    assert(compaction != nullptr);
  }
  // Adds the key and value to the builder
  // If paranoid is true, adds the key-value to the paranoid hash
  Status AddToBuilder(const Slice& key, const Slice& value) {
    auto curr = current_output();
    assert(builder != nullptr);
    assert(curr != nullptr);
    Status s = curr->validator.Add(key, value);
    if (!s.ok()) {
      return s;
    }
    builder->Add(key, value);
    return Status::OK();
  }
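
  // Why cap grandparent overlap: the less an output file overlaps the
  // grandparent level, the less data a future compaction of that file will
  // have to rewrite, so ShouldStopBefore() below starts a new output file
  // once the accumulated overlap would exceed max_compaction_bytes().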
  // Returns true iff we should stop building the current output
  // before processing "internal_key".
  bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
    const InternalKeyComparator* icmp =
        &compaction->column_family_data()->internal_comparator();
    const std::vector<FileMetaData*>& grandparents = compaction->grandparents();

    // Scan to find the earliest grandparent file that contains the key.
    while (grandparent_index < grandparents.size() &&
           icmp->Compare(internal_key,
                         grandparents[grandparent_index]->largest.Encode()) >
               0) {
      if (seen_key) {
        overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
      }
      assert(grandparent_index + 1 >= grandparents.size() ||
             icmp->Compare(
                 grandparents[grandparent_index]->largest.Encode(),
                 grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
      grandparent_index++;
    }
    seen_key = true;

    if (overlapped_bytes + curr_file_size >
        compaction->max_compaction_bytes()) {
      // Too much overlap for current output; start new output
      overlapped_bytes = 0;
      return true;
    }

    return false;
  }
};
// Maintains state for the entire compaction
struct CompactionJob::CompactionState {
  Compaction* const compaction;

  // REQUIRED: subcompaction states are stored in order of increasing
  // key-range
  std::vector<CompactionJob::SubcompactionState> sub_compact_states;
  Status status;

  size_t num_output_files = 0;
  uint64_t total_bytes = 0;
  size_t num_blob_output_files = 0;
  uint64_t total_blob_bytes = 0;
  uint64_t num_output_records = 0;

  explicit CompactionState(Compaction* c) : compaction(c) {}

  Slice SmallestUserKey() {
    for (const auto& sub_compact_state : sub_compact_states) {
      if (!sub_compact_state.outputs.empty() &&
          sub_compact_state.outputs[0].finished) {
        return sub_compact_state.outputs[0].meta.smallest.user_key();
      }
    }
    // If there is no finished output, return an empty slice.
    return Slice(nullptr, 0);
  }

  Slice LargestUserKey() {
    for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
         ++it) {
      if (!it->outputs.empty() && it->current_output()->finished) {
        assert(it->current_output() != nullptr);
        return it->current_output()->meta.largest.user_key();
      }
    }
    // If there is no finished output, return an empty slice.
    return Slice(nullptr, 0);
  }
};
void CompactionJob::AggregateStatistics() {
  assert(compact_);

  for (SubcompactionState& sc : compact_->sub_compact_states) {
    auto& outputs = sc.outputs;

    if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
      // An error occurred, so ignore the last output.
      outputs.pop_back();
    }

    compact_->num_output_files += outputs.size();
    compact_->total_bytes += sc.total_bytes;

    const auto& blobs = sc.blob_file_additions;

    compact_->num_blob_output_files += blobs.size();

    for (const auto& blob : blobs) {
      compact_->total_blob_bytes += blob.GetTotalBlobBytes();
    }

    compact_->num_output_records += sc.num_output_records;

    compaction_job_stats_->Add(sc.compaction_job_stats);
  }
}
CompactionJob::CompactionJob(
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
    const FileOptions& file_options, VersionSet* versions,
    const std::atomic<bool>* shutting_down,
    const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
    FSDirectory* db_directory, FSDirectory* output_directory,
    FSDirectory* blob_output_directory, Statistics* stats,
    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
    std::vector<SequenceNumber> existing_snapshots,
    SequenceNumber earliest_write_conflict_snapshot,
    const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
    EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
    const std::string& dbname, CompactionJobStats* compaction_job_stats,
    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
    const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
    const std::string& db_session_id, std::string full_history_ts_low)
    : job_id_(job_id),
      compact_(new CompactionState(compaction)),
      compaction_job_stats_(compaction_job_stats),
      compaction_stats_(compaction->compaction_reason(), 1),
      dbname_(dbname),
      db_id_(db_id),
      db_session_id_(db_session_id),
      db_options_(db_options),
      file_options_(file_options),
      env_(db_options.env),
      io_tracer_(io_tracer),
      fs_(db_options.fs, io_tracer),
      file_options_for_read_(
          fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
      versions_(versions),
      shutting_down_(shutting_down),
      manual_compaction_paused_(manual_compaction_paused),
      preserve_deletes_seqnum_(preserve_deletes_seqnum),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
      output_directory_(output_directory),
      blob_output_directory_(blob_output_directory),
      stats_(stats),
      db_mutex_(db_mutex),
      db_error_handler_(db_error_handler),
      existing_snapshots_(std::move(existing_snapshots)),
      earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
      snapshot_checker_(snapshot_checker),
      table_cache_(std::move(table_cache)),
      event_logger_(event_logger),
      bottommost_level_(false),
      paranoid_file_checks_(paranoid_file_checks),
      measure_io_stats_(measure_io_stats),
      write_hint_(Env::WLTH_NOT_SET),
      thread_pri_(thread_pri),
      full_history_ts_low_(std::move(full_history_ts_low)) {
  assert(compaction_job_stats_ != nullptr);
  assert(log_buffer_ != nullptr);
  const auto* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                    db_options_.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
  ReportStartedCompaction(compaction);
}
CompactionJob::~CompactionJob() {
  assert(compact_ == nullptr);
  ThreadStatusUtil::ResetThreadStatus();
}
void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
  const auto* cfd = compact_->compaction->column_family_data();
  ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
                                    db_options_.enable_thread_tracking);

  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
                                               job_id_);

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
      (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
          compact_->compaction->output_level());

  // In the current design, a CompactionJob is always created
  // for non-trivial compaction.
  assert(compaction->IsTrivialMove() == false ||
         compaction->is_manual_compaction() == true);

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_PROP_FLAGS,
      compaction->is_manual_compaction() +
          (compaction->deletion_compaction() << 1));

  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
      compaction->CalculateTotalInputSize());

  IOSTATS_RESET(bytes_written);
  IOSTATS_RESET(bytes_read);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
  ThreadStatusUtil::SetThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, 0);

  // Set the thread operation after operation properties
  // to ensure GetThreadList() can always show them all together.
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);

  compaction_job_stats_->is_manual_compaction =
      compaction->is_manual_compaction();
  compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
}
void CompactionJob::Prepare() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PREPARE);

  // Generate file_levels_ for compaction before making Iterator
  auto* c = compact_->compaction;
  assert(c->column_family_data() != nullptr);
  assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
             compact_->compaction->level()) > 0);

  write_hint_ =
      c->column_family_data()->CalculateSSTWriteHint(c->output_level());
  bottommost_level_ = c->bottommost_level();

  if (c->ShouldFormSubcompactions()) {
    {
      StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
      GenSubcompactionBoundaries();
    }
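
    // boundaries_ now splits the key space into sizes_.size() contiguous
    // ranges: range i runs from boundaries_[i - 1] (nullptr start for the
    // first range) up to boundaries_[i] (nullptr end for the last range),
    // which is exactly how the loop below builds the subcompaction states.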
    assert(sizes_.size() == boundaries_.size() + 1);

    for (size_t i = 0; i <= boundaries_.size(); i++) {
      Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
      Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
      compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
    }
    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
                      compact_->sub_compact_states.size());
  } else {
    constexpr Slice* start = nullptr;
    constexpr Slice* end = nullptr;
    constexpr uint64_t size = 0;

    compact_->sub_compact_states.emplace_back(c, start, end, size);
  }
}
struct RangeWithSize {
  Range range;
  uint64_t size;

  RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
      : range(a, b), size(s) {}
};
void CompactionJob::GenSubcompactionBoundaries() {
  auto* c = compact_->compaction;
  auto* cfd = c->column_family_data();
  const Comparator* cfd_comparator = cfd->user_comparator();
  std::vector<Slice> bounds;
  int start_lvl = c->start_level();
  int out_lvl = c->output_level();

  // Add the starting and/or ending key of certain input files as a potential
  // boundary
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
    int lvl = c->level(lvl_idx);
    if (lvl >= start_lvl && lvl <= out_lvl) {
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
      size_t num_files = flevel->num_files;

      if (num_files == 0) {
        continue;
      }

      if (lvl == 0) {
        // For level 0 add the starting and ending key of each file since the
        // files may have greatly differing key ranges (not range-partitioned)
        for (size_t i = 0; i < num_files; i++) {
          bounds.emplace_back(flevel->files[i].smallest_key);
          bounds.emplace_back(flevel->files[i].largest_key);
        }
      } else {
        // For all other levels add the smallest/largest key in the level to
        // encompass the range covered by that level
        bounds.emplace_back(flevel->files[0].smallest_key);
        bounds.emplace_back(flevel->files[num_files - 1].largest_key);
        if (lvl == out_lvl) {
          // For the last level include the starting keys of all files since
          // the last level is the largest and probably has the widest key
          // range. Since it's range partitioned, the ending key of one file
          // and the starting key of the next are very close (or identical).
          for (size_t i = 1; i < num_files; i++) {
            bounds.emplace_back(flevel->files[i].smallest_key);
          }
        }
      }
    }
  }

  std::sort(bounds.begin(), bounds.end(),
            [cfd_comparator](const Slice& a, const Slice& b) -> bool {
              return cfd_comparator->Compare(ExtractUserKey(a),
                                             ExtractUserKey(b)) < 0;
            });
  // Remove duplicated entries from bounds
  bounds.erase(
      std::unique(bounds.begin(), bounds.end(),
                  [cfd_comparator](const Slice& a, const Slice& b) -> bool {
                    return cfd_comparator->Compare(ExtractUserKey(a),
                                                   ExtractUserKey(b)) == 0;
                  }),
      bounds.end());

  // Combine consecutive pairs of boundaries into ranges with an approximate
  // size of data covered by keys in that range
  uint64_t sum = 0;
  std::vector<RangeWithSize> ranges;
  // Get input version from CompactionState since it's already referenced
  // earlier in Compaction::SetInputVersion and will not change when db_mutex_
  // is released below
  auto* v = compact_->compaction->input_version();
  for (auto it = bounds.begin();;) {
    const Slice a = *it;
    ++it;

    if (it == bounds.end()) {
      break;
    }

    const Slice b = *it;

    // ApproximateSize could potentially create table reader iterator to seek
    // to the index block and may incur I/O cost in the process. Unlock db
    // mutex to reduce contention
    db_mutex_->Unlock();
    uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
                                               b, start_lvl, out_lvl + 1,
                                               TableReaderCaller::kCompaction);
    db_mutex_->Lock();
    ranges.emplace_back(a, b, size);
    sum += size;
  }
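
  // Cap the number of subcompactions three ways: no more than there are
  // ranges, no more than the configured max_subcompactions, and no more than
  // would let each output file reach roughly min_file_fill_percent of the
  // level's maximum file size.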
  // Group the ranges into subcompactions
  const double min_file_fill_percent = 4.0 / 5;
  int base_level = v->storage_info()->base_level();
  uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
      sum / min_file_fill_percent /
      MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
                          c->immutable_cf_options()->compaction_style,
                          base_level,
                          c->immutable_cf_options()
                              ->level_compaction_dynamic_level_bytes)));
  uint64_t subcompactions =
      std::min({static_cast<uint64_t>(ranges.size()),
                static_cast<uint64_t>(c->max_subcompactions()),
                max_output_files});

  if (subcompactions > 1) {
    double mean = sum * 1.0 / subcompactions;
    // Greedily add ranges to the subcompaction until the sum of the ranges'
    // sizes becomes >= the expected mean size of a subcompaction
    sum = 0;
    for (size_t i = 0; i + 1 < ranges.size(); i++) {
      sum += ranges[i].size;
      if (subcompactions == 1) {
        // If there's only one left to schedule then it goes to the end so no
        // need to put an end boundary
        continue;
      }
      if (sum >= mean) {
        boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
        sizes_.emplace_back(sum);
        subcompactions--;
        sum = 0;
      }
    }
    sizes_.emplace_back(sum + ranges.back().size);
  } else {
    // Only one range so its size is the total sum of sizes computed above
    sizes_.emplace_back(sum);
  }
}
Status CompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();

  const size_t num_threads = compact_->sub_compact_states.size();
  assert(num_threads > 0);
  const uint64_t start_micros = env_->NowMicros();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<port::Thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);

  // Wait for all other threads (if there are any) to finish execution
  for (auto& thread : thread_pool) {
    thread.join();
  }

  compaction_stats_.micros = env_->NowMicros() - start_micros;
  compaction_stats_.cpu_micros = 0;
  for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
    compaction_stats_.cpu_micros +=
        compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
  }

  RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        compaction_stats_.cpu_micros);

  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");

  // Check if any thread encountered an error during execution
  Status status;
  IOStatus io_s;
  bool wrote_new_blob_files = false;

  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      status = state.status;
      io_s = state.io_status;
      break;
    }

    if (!state.blob_file_additions.empty()) {
      wrote_new_blob_files = true;
    }
  }

  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    constexpr IODebugContext* dbg = nullptr;

    if (output_directory_) {
      io_s = output_directory_->Fsync(IOOptions(), dbg);
    }

    if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
        blob_output_directory_ != output_directory_) {
      io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
    }
  }
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    status = io_s;
  }
  if (status.ok()) {
    thread_pool.clear();
    std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
    for (const auto& state : compact_->sub_compact_states) {
      for (const auto& output : state.outputs) {
        files_output.emplace_back(&output);
      }
    }
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    auto prefix_extractor =
        compact_->compaction->mutable_cf_options()->prefix_extractor.get();
    std::atomic<size_t> next_file_idx(0);
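    // next_file_idx acts as a shared work-queue cursor: each verification
    // thread below claims the next output file with fetch_add(1), so the
    // files are checked in parallel without any further coordination.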
    auto verify_table = [&](Status& output_status) {
      while (true) {
        size_t file_idx = next_file_idx.fetch_add(1);
        if (file_idx >= files_output.size()) {
          break;
        }
        // Verify that the table is usable
        // We set for_compaction to false and don't OptimizeForCompactionTableRead
        // here because this is a special case after we finish the table building
        // No matter whether use_direct_io_for_flush_and_compaction is true,
        // we will regard this verification as user reads since the goal is
        // to cache it here for further user reads
        ReadOptions read_options;
        InternalIterator* iter = cfd->table_cache()->NewIterator(
            read_options, file_options_, cfd->internal_comparator(),
            files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
            prefix_extractor,
            /*table_reader_ptr=*/nullptr,
            cfd->internal_stats()->GetFileReadHist(
                compact_->compaction->output_level()),
            TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
            /*skip_filters=*/false, compact_->compaction->output_level(),
            MaxFileSizeForL0MetaPin(
                *compact_->compaction->mutable_cf_options()),
            /*smallest_compaction_key=*/nullptr,
            /*largest_compaction_key=*/nullptr,
            /*allow_unprepared_value=*/false);
        auto s = iter->status();

        if (s.ok() && paranoid_file_checks_) {
          OutputValidator validator(cfd->internal_comparator(),
                                    /*_enable_order_check=*/true,
                                    /*_enable_hash=*/true);
          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
            s = validator.Add(iter->key(), iter->value());
            if (!s.ok()) {
              break;
            }
          }
          if (s.ok()) {
            s = iter->status();
          }
          if (s.ok() &&
              !validator.CompareValidator(files_output[file_idx]->validator)) {
            s = Status::Corruption("Paranoid checksums do not match");
          }
        }

        delete iter;

        if (!s.ok()) {
          output_status = s;
          break;
        }
      }
    };
    for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
      thread_pool.emplace_back(verify_table,
                               std::ref(compact_->sub_compact_states[i].status));
    }
    verify_table(compact_->sub_compact_states[0].status);
    for (auto& thread : thread_pool) {
      thread.join();
    }
    for (const auto& state : compact_->sub_compact_states) {
      if (!state.status.ok()) {
        status = state.status;
        break;
      }
    }
  }

  TablePropertiesCollection tp;
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.outputs) {
      auto fn =
          TableFileName(state.compaction->immutable_cf_options()->cf_paths,
                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
      tp[fn] = output.table_properties;
    }
  }
  compact_->compaction->SetOutputTableProperties(std::move(tp));

  // Finish up all book-keeping to unify the subcompaction results
  AggregateStatistics();
  UpdateCompactionStats();

  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");

  compact_->status = status;
  return status;
}
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
  assert(compact_);

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_INSTALL);
  db_mutex_->AssertHeld();
  Status status = compact_->status;

  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  assert(cfd);

  cfd->internal_stats()->AddCompactionStats(
      compact_->compaction->output_level(), thread_pri_, compaction_stats_);

  if (status.ok()) {
    status = InstallCompactionResults(mutable_cf_options);
  }
  if (!versions_->io_status().ok()) {
    io_status_ = versions_->io_status();
  }

  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = compaction_stats_;

  double read_write_amp = 0.0;
  double write_amp = 0.0;
  double bytes_read_per_sec = 0;
  double bytes_written_per_sec = 0;

  if (stats.bytes_read_non_output_levels > 0) {
    read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
                      stats.bytes_read_non_output_levels) /
                     static_cast<double>(stats.bytes_read_non_output_levels);
    write_amp = stats.bytes_written /
                static_cast<double>(stats.bytes_read_non_output_levels);
  }
  if (stats.micros > 0) {
    bytes_read_per_sec =
        (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
        static_cast<double>(stats.micros);
    bytes_written_per_sec =
        stats.bytes_written / static_cast<double>(stats.micros);
  }

  const std::string& column_family_name = cfd->GetName();

  ROCKS_LOG_BUFFER(
      log_buffer_,
      "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
      "files in(%d, %d) out(%d) "
      "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
      "write-amplify(%.1f) %s, records in: %" PRIu64
      ", records dropped: %" PRIu64 " output_compression: %s\n",
      column_family_name.c_str(), vstorage->LevelSummary(&tmp),
      bytes_read_per_sec, bytes_written_per_sec,
      compact_->compaction->output_level(),
      stats.num_input_files_in_non_output_levels,
      stats.num_input_files_in_output_level, stats.num_output_files,
      stats.bytes_read_non_output_levels / 1048576.0,
      stats.bytes_read_output_level / 1048576.0,
      stats.bytes_written / 1048576.0, read_write_amp, write_amp,
      status.ToString().c_str(), stats.num_input_records,
      stats.num_dropped_records,
      CompressionTypeToString(compact_->compaction->output_compression())
          .c_str());

  const auto& blob_files = vstorage->GetBlobFiles();
  if (!blob_files.empty()) {
    ROCKS_LOG_BUFFER(log_buffer_,
                     "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
                     "\n",
                     column_family_name.c_str(), blob_files.begin()->first,
                     blob_files.rbegin()->first);
  }

  UpdateCompactionJobStats(stats);
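
  // Emit a structured "compaction_finished" event carrying the per-job
  // totals; the same numbers also appear in the human-readable summary
  // logged above.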
  auto stream = event_logger_->LogToBuffer(log_buffer_);
  stream << "job" << job_id_ << "event"
         << "compaction_finished"
         << "compaction_time_micros" << stats.micros
         << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
         << compact_->compaction->output_level() << "num_output_files"
         << compact_->num_output_files << "total_output_size"
         << compact_->total_bytes;

  if (compact_->num_blob_output_files > 0) {
    stream << "num_blob_output_files" << compact_->num_blob_output_files
           << "total_blob_output_size" << compact_->total_blob_bytes;
  }

  stream << "num_input_records" << stats.num_input_records
         << "num_output_records" << compact_->num_output_records
         << "num_subcompactions" << compact_->sub_compact_states.size()
         << "output_compression"
         << CompressionTypeToString(compact_->compaction->output_compression());

  stream << "num_single_delete_mismatches"
         << compaction_job_stats_->num_single_del_mismatch;
  stream << "num_single_delete_fallthrough"
         << compaction_job_stats_->num_single_del_fallthru;

  if (measure_io_stats_) {
    stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
    stream << "file_range_sync_nanos"
           << compaction_job_stats_->file_range_sync_nanos;
    stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
    stream << "file_prepare_write_nanos"
           << compaction_job_stats_->file_prepare_write_nanos;
  }

  stream << "lsm_state";
  stream.StartArray();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  if (!blob_files.empty()) {
    stream << "blob_file_head" << blob_files.begin()->first;
    stream << "blob_file_tail" << blob_files.rbegin()->first;
  }

  CleanupCompaction();
  return status;
}
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
  assert(sub_compact->compaction);

  uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

  // Create compaction filter and fail the compaction if
  // IgnoreSnapshots() = false because it is not supported anymore
  const CompactionFilter* compaction_filter =
      cfd->ioptions()->compaction_filter;
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
  if (compaction_filter == nullptr) {
    compaction_filter_from_factory =
        sub_compact->compaction->CreateCompactionFilter();
    compaction_filter = compaction_filter_from_factory.get();
  }
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
    sub_compact->status = Status::NotSupported(
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
        "anymore.");
    return;
  }

  CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
                                             existing_snapshots_);
  ReadOptions read_options;
  read_options.verify_checksums = true;
  read_options.fill_cache = false;
  // Compaction iterators shouldn't be confined to a single prefix.
  // Compactions use Seek() for
  // (a) concurrent compactions,
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
  read_options.total_order_seek = true;

  // Although the v2 aggregator is what the level iterator(s) know about,
  // the AddTombstones calls will be propagated down to the v1 aggregator.
  std::unique_ptr<InternalIterator> input(
      versions_->MakeInputIterator(read_options, sub_compact->compaction,
                                   &range_del_agg, file_options_for_read_));

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);

  // I/O measurement variables
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
  const uint64_t kRecordStatsEvery = 1000;
  uint64_t prev_write_nanos = 0;
  uint64_t prev_fsync_nanos = 0;
  uint64_t prev_range_sync_nanos = 0;
  uint64_t prev_prepare_write_nanos = 0;
  uint64_t prev_cpu_write_nanos = 0;
  uint64_t prev_cpu_read_nanos = 0;
  if (measure_io_stats_) {
    prev_perf_level = GetPerfLevel();
    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
    prev_write_nanos = IOSTATS(write_nanos);
    prev_fsync_nanos = IOSTATS(fsync_nanos);
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
  }

  MergeHelper merge(
      env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
      compaction_filter, db_options_.info_log.get(),
      false /* internal key corruption is expected */,
      existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
      snapshot_checker_, compact_->compaction->level(),
      db_options_.statistics.get());

  const MutableCFOptions* mutable_cf_options =
      sub_compact->compaction->mutable_cf_options();
  assert(mutable_cf_options);

  std::vector<std::string> blob_file_paths;

  std::unique_ptr<BlobFileBuilder> blob_file_builder(
      mutable_cf_options->enable_blob_files
          ? new BlobFileBuilder(
                versions_, env_, fs_.get(),
                sub_compact->compaction->immutable_cf_options(),
                mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
                cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
                &blob_file_paths, &sub_compact->blob_file_additions)
          : nullptr);

  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::Run():PausingManualCompaction:1",
      reinterpret_cast<void*>(
          const_cast<std::atomic<int>*>(manual_compaction_paused_)));

  Slice* start = sub_compact->start;
  Slice* end = sub_compact->end;
  if (start != nullptr) {
    IterKey start_iter;
    start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
    input->Seek(start_iter.GetInternalKey());
  } else {
    input->SeekToFirst();
  }

  Status status;
  const std::string* const full_history_ts_low =
      full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
  sub_compact->c_iter.reset(new CompactionIterator(
      input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
      &existing_snapshots_, earliest_write_conflict_snapshot_,
      snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
      /*expect_valid_internal_key=*/true, &range_del_agg,
      blob_file_builder.get(), db_options_.allow_data_in_errors,
      sub_compact->compaction, compaction_filter, shutting_down_,
      preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
      full_history_ts_low));
  auto c_iter = sub_compact->c_iter.get();
  c_iter->SeekToFirst();
  if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
    // ShouldStopBefore() maintains state based on keys processed so far. The
    // compaction loop always calls it on the "next" key, thus won't tell it the
    // first key. So we do that here.
    sub_compact->ShouldStopBefore(c_iter->key(),
                                  sub_compact->current_output_file_size);
  }
  const auto& c_iter_stats = c_iter->iter_stats();

  std::unique_ptr<SstPartitioner> partitioner =
      sub_compact->compaction->output_level() == 0
          ? nullptr
          : sub_compact->compaction->CreateSstPartitioner();
  std::string last_key_for_partitioner;
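
  // Main loop: drain the compaction iterator, opening an output file on
  // demand and finishing it whenever the size limit, the SST partitioner,
  // or the grandparent-overlap check (ShouldStopBefore) says to cut;
  // dropped-key statistics are flushed every kRecordStatsEvery records.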
  while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
    // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
    // returns true.
    const Slice& key = c_iter->key();
    const Slice& value = c_iter->value();

    // If an end key (exclusive) is specified, check if the current key is
    // >= it and exit if so, because the iterator is out of its range
    if (end != nullptr &&
        cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
      break;
    }
    if (c_iter_stats.num_input_records % kRecordStatsEvery ==
        kRecordStatsEvery - 1) {
      RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
      c_iter->ResetRecordCounts();
      RecordCompactionIOStats();
    }

    // Open output file if necessary
    if (sub_compact->builder == nullptr) {
      status = OpenCompactionOutputFile(sub_compact);
      if (!status.ok()) {
        break;
      }
    }
    status = sub_compact->AddToBuilder(key, value);
    if (!status.ok()) {
      break;
    }

    sub_compact->current_output_file_size =
        sub_compact->builder->EstimatedFileSize();
    const ParsedInternalKey& ikey = c_iter->ikey();
    sub_compact->current_output()->meta.UpdateBoundaries(
        key, value, ikey.sequence, ikey.type);
    sub_compact->num_output_records++;

    // Close output file if it is big enough. Two possibilities determine it's
    // time to close it: (1) the current key should be this file's last key, (2)
    // the next key should not be in this file.
    //
    // TODO(aekmekji): determine if file should be closed earlier than this
    // during subcompactions (i.e. if output size, estimated by input size, is
    // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
    // and 0.6MB instead of 1MB and 0.2MB)
    bool output_file_ended = false;
    if (sub_compact->compaction->output_level() != 0 &&
        sub_compact->current_output_file_size >=
            sub_compact->compaction->max_output_file_size()) {
      // (1) this key terminates the file. For historical reasons, the iterator
      // status before advancing will be given to FinishCompactionOutputFile().
      output_file_ended = true;
    }
    TEST_SYNC_POINT_CALLBACK(
        "CompactionJob::Run():PausingManualCompaction:2",
        reinterpret_cast<void*>(
            const_cast<std::atomic<int>*>(manual_compaction_paused_)));
    if (partitioner.get()) {
      last_key_for_partitioner.assign(c_iter->user_key().data_,
                                      c_iter->user_key().size_);
    }
    c_iter->Next();
    if (c_iter->status().IsManualCompactionPaused()) {
      break;
    }
    if (!output_file_ended && c_iter->Valid()) {
      if (((partitioner.get() &&
            partitioner->ShouldPartition(PartitionerRequest(
                last_key_for_partitioner, c_iter->user_key(),
                sub_compact->current_output_file_size)) == kRequired) ||
           (sub_compact->compaction->output_level() != 0 &&
            sub_compact->ShouldStopBefore(
                c_iter->key(), sub_compact->current_output_file_size))) &&
          sub_compact->builder != nullptr) {
        // (2) this key belongs to the next file. For historical reasons, the
        // iterator status after advancing will be given to
        // FinishCompactionOutputFile().
        output_file_ended = true;
      }
    }
    if (output_file_ended) {
      const Slice* next_key = nullptr;
      if (c_iter->Valid()) {
        next_key = &c_iter->key();
      }
      CompactionIterationStats range_del_out_stats;
      status = FinishCompactionOutputFile(input->status(), sub_compact,
                                          &range_del_agg, &range_del_out_stats,
                                          next_key);
      RecordDroppedKeys(range_del_out_stats,
                        &sub_compact->compaction_job_stats);
    }
  }

  sub_compact->compaction_job_stats.num_input_deletion_records =
      c_iter_stats.num_input_deletion_records;
  sub_compact->compaction_job_stats.num_corrupt_keys =
      c_iter_stats.num_input_corrupt_records;
  sub_compact->compaction_job_stats.num_single_del_fallthru =
      c_iter_stats.num_single_del_fallthru;
  sub_compact->compaction_job_stats.num_single_del_mismatch =
      c_iter_stats.num_single_del_mismatch;
  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
      c_iter_stats.total_input_raw_key_bytes;
  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
      c_iter_stats.total_input_raw_value_bytes;

  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
             c_iter_stats.total_filter_time);
  RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
  RecordCompactionIOStats();

  if (status.ok() && cfd->IsDropped()) {
    status =
        Status::ColumnFamilyDropped("Column family dropped during compaction");
  }
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
      shutting_down_->load(std::memory_order_relaxed)) {
    status = Status::ShutdownInProgress("Database shutdown");
  }
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
      (manual_compaction_paused_ &&
       manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
    status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
  }
  if (status.ok()) {
    status = input->status();
  }
  if (status.ok()) {
    status = c_iter->status();
  }

  if (status.ok() && sub_compact->builder == nullptr &&
      sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
    // handle subcompaction containing only range deletions
    status = OpenCompactionOutputFile(sub_compact);
  }

  // Call FinishCompactionOutputFile() even if status is not ok: it needs to
  // close the output file.
  if (sub_compact->builder != nullptr) {
    CompactionIterationStats range_del_out_stats;
    Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
                                          &range_del_out_stats);
    if (!s.ok() && status.ok()) {
      status = s;
    }
    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
  }

  if (blob_file_builder) {
    if (status.ok()) {
      status = blob_file_builder->Finish();
    } else {
      blob_file_builder->Abandon();
    }
    blob_file_builder.reset();
  }

  sub_compact->compaction_job_stats.cpu_micros =
      env_->NowCPUNanos() / 1000 - prev_cpu_micros;

  if (measure_io_stats_) {
    sub_compact->compaction_job_stats.file_write_nanos +=
        IOSTATS(write_nanos) - prev_write_nanos;
    sub_compact->compaction_job_stats.file_fsync_nanos +=
        IOSTATS(fsync_nanos) - prev_fsync_nanos;
    sub_compact->compaction_job_stats.file_range_sync_nanos +=
        IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
    sub_compact->compaction_job_stats.file_prepare_write_nanos +=
        IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
    sub_compact->compaction_job_stats.cpu_micros -=
        (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
         IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
        1000;
    if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
      SetPerfLevel(prev_perf_level);
    }
  }

#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
  if (!status.ok()) {
    if (sub_compact->c_iter) {
      sub_compact->c_iter->status().PermitUncheckedError();
    }
    if (input) {
      input->status().PermitUncheckedError();
    }
  }
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED

  sub_compact->c_iter.reset();
  input.reset();
  sub_compact->status = status;
}
void CompactionJob::RecordDroppedKeys(
    const CompactionIterationStats& c_iter_stats,
    CompactionJobStats* compaction_job_stats) {
  if (c_iter_stats.num_record_drop_user > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_USER,
               c_iter_stats.num_record_drop_user);
  }
  if (c_iter_stats.num_record_drop_hidden > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
               c_iter_stats.num_record_drop_hidden);
    if (compaction_job_stats) {
      compaction_job_stats->num_records_replaced +=
          c_iter_stats.num_record_drop_hidden;
    }
  }
  if (c_iter_stats.num_record_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
               c_iter_stats.num_record_drop_obsolete);
    if (compaction_job_stats) {
      compaction_job_stats->num_expired_deletion_records +=
          c_iter_stats.num_record_drop_obsolete;
    }
  }
  if (c_iter_stats.num_record_drop_range_del > 0) {
    RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
               c_iter_stats.num_record_drop_range_del);
  }
  if (c_iter_stats.num_range_del_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
               c_iter_stats.num_range_del_drop_obsolete);
  }
  if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
    RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
               c_iter_stats.num_optimized_del_drop_obsolete);
  }
}
Status CompactionJob::FinishCompactionOutputFile(
    const Status& input_status, SubcompactionState* sub_compact,
    CompactionRangeDelAggregator* range_del_agg,
    CompactionIterationStats* range_del_out_stats,
    const Slice* next_table_min_key /* = nullptr */) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
  assert(sub_compact != nullptr);
  assert(sub_compact->outfile);
  assert(sub_compact->builder != nullptr);
  assert(sub_compact->current_output() != nullptr);

  uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
  assert(output_number != 0);

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  const Comparator* ucmp = cfd->user_comparator();
  std::string file_checksum = kUnknownFileChecksum;
  std::string file_checksum_func_name = kUnknownFileChecksumFuncName;

  // Check for iterator errors
  Status s = input_status;
  auto meta = &sub_compact->current_output()->meta;
  assert(meta != nullptr);
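
  // Determine the user-key window [lower_bound, upper_bound) of range
  // tombstones that belong in this file, based on the subcompaction
  // boundaries, this file's smallest key, and the next file's smallest key;
  // matching tombstone fragments are clipped and added below.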
  Slice lower_bound_guard, upper_bound_guard;
  std::string smallest_user_key;
  const Slice *lower_bound, *upper_bound;
  bool lower_bound_from_sub_compact = false;
  if (sub_compact->outputs.size() == 1) {
    // For the first output table, include range tombstones before the min key
    // but after the subcompaction boundary.
    lower_bound = sub_compact->start;
    lower_bound_from_sub_compact = true;
  } else if (meta->smallest.size() > 0) {
    // For subsequent output tables, only include range tombstones from min
    // key onwards since the previous file was extended to contain range
    // tombstones falling before min key.
    smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
    lower_bound_guard = Slice(smallest_user_key);
    lower_bound = &lower_bound_guard;
  } else {
    lower_bound = nullptr;
  }
  if (next_table_min_key != nullptr) {
    // This may be the last file in the subcompaction in some cases, so we
    // need to compare the end key of subcompaction with the next file start
    // key. When the end key is chosen by the subcompaction, we know that
    // it must be the biggest key in output file. Therefore, it is safe to
    // use the smaller key as the upper bound of the output file, to ensure
    // that there is no overlapping between different output files.
    upper_bound_guard = ExtractUserKey(*next_table_min_key);
    if (sub_compact->end != nullptr &&
        ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
      upper_bound = sub_compact->end;
    } else {
      upper_bound = &upper_bound_guard;
    }
  } else {
    // This is the last file in the subcompaction, so extend until the
    // subcompaction ends.
    upper_bound = sub_compact->end;
  }
  auto earliest_snapshot = kMaxSequenceNumber;
  if (existing_snapshots_.size() > 0) {
    earliest_snapshot = existing_snapshots_[0];
  }
  bool has_overlapping_endpoints;
  if (upper_bound != nullptr && meta->largest.size() > 0) {
    has_overlapping_endpoints =
        ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
  } else {
    has_overlapping_endpoints = false;
  }

  // The end key of the subcompaction must be bigger or equal to the upper
  // bound. If the end of subcompaction is null or the upper bound is null,
  // it means that this file is the last file in the compaction. So there
  // will be no overlapping between this file and others.
  assert(sub_compact->end == nullptr || upper_bound == nullptr ||
         ucmp->Compare(*upper_bound, *sub_compact->end) <= 0);
  auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
                                       has_overlapping_endpoints);
  // Position the range tombstone output iterator. There may be tombstone
  // fragments that are entirely out of range, so make sure that we do not
  // include them.
  if (lower_bound != nullptr) {
    it->Seek(*lower_bound);
  } else {
    it->SeekToFirst();
  }
  TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
  for (; it->Valid(); it->Next()) {
    auto tombstone = it->Tombstone();
    if (upper_bound != nullptr) {
      int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
      if ((has_overlapping_endpoints && cmp < 0) ||
          (!has_overlapping_endpoints && cmp <= 0)) {
        // Tombstones starting after upper_bound only need to be included in
        // the next table. If the current SST ends before upper_bound, i.e.,
        // `has_overlapping_endpoints == false`, we can also skip over range
        // tombstones that start exactly at upper_bound. Such range tombstones
        // will be included in the next file and are not relevant to the point
        // keys or endpoints of the current file.
        break;
      }
    }

    if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
      // TODO(andrewkr): tombstones that span multiple output files are
      // counted for each compaction output file, so lots of double counting.
      range_del_out_stats->num_range_del_drop_obsolete++;
      range_del_out_stats->num_record_drop_obsolete++;
      continue;
    }

    auto kv = tombstone.Serialize();
    assert(lower_bound == nullptr ||
           ucmp->Compare(*lower_bound, kv.second) < 0);
    // Range tombstone is not supported by output validator yet.
    sub_compact->builder->Add(kv.first.Encode(), kv.second);
    InternalKey smallest_candidate = std::move(kv.first);
    if (lower_bound != nullptr &&
        ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
      // Pretend the smallest key has the same user key as lower_bound
      // (the max key in the previous table or subcompaction) in order for
      // files to appear key-space partitioned.
      //
      // When lower_bound is chosen by a subcompaction, we know that
      // subcompactions over smaller keys cannot contain any keys at
      // lower_bound. We also know that smaller subcompactions exist, because
      // otherwise the subcompaction would be unbounded on the left. As a
      // result, we know that no other files on the output level will contain
      // actual keys at lower_bound (an output file may have a largest key of
      // lower_bound@kMaxSequenceNumber, but this only indicates a large range
      // tombstone was truncated). Therefore, it is safe to use the
      // tombstone's sequence number, to ensure that keys at lower_bound at
      // lower levels are covered by truncated tombstones.
      //
      // If lower_bound was chosen by the smallest data key in the file,
      // choose lowest seqnum so this file's smallest internal key comes after
      // the previous file's largest. The fake seqnum is OK because the read
      // path's file-picking code only considers user key.
      smallest_candidate = InternalKey(
          *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
          kTypeRangeDeletion);
    }
    InternalKey largest_candidate = tombstone.SerializeEndKey();
    if (upper_bound != nullptr &&
        ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
      // Pretend the largest key has the same user key as upper_bound (the
      // min key in the following table or subcompaction) in order for files
      // to appear key-space partitioned.
      //
      // Choose highest seqnum so this file's largest internal key comes
      // before the next file's/subcompaction's smallest. The fake seqnum is
      // OK because the read path's file-picking code only considers the user
      // key.
      //
      // Note Seek() also creates InternalKey with (user_key,
      // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
      // kTypeRangeDeletion (0xF), so the range tombstone comes before the
      // Seek() key in InternalKey's ordering. So Seek() will look in the
      // next file for the user key.
      largest_candidate =
          InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
    }

    SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
    if (meta->smallest.size() > 0) {
      smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
    }

    meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
                                   tombstone.seq_,
                                   cfd->internal_comparator());

    // The smallest key in a file is used for range tombstone truncation, so
    // it cannot have a seqnum of 0 (unless the smallest data key in a file
    // has a seqnum of 0). Otherwise, the truncated tombstone may expose
    // deleted keys at lower levels.
    assert(smallest_ikey_seqnum == 0 ||
           ExtractInternalKeyFooter(meta->smallest.Encode()) !=
               PackSequenceAndType(0, kTypeRangeDeletion));
  }
  const uint64_t current_entries = sub_compact->builder->NumEntries();
  if (s.ok()) {
    s = sub_compact->builder->Finish();
  } else {
    sub_compact->builder->Abandon();
  }
  IOStatus io_s = sub_compact->builder->io_status();
  if (s.ok()) {
    s = io_s;
  }
  const uint64_t current_bytes = sub_compact->builder->FileSize();
  if (s.ok()) {
    meta->fd.file_size = current_bytes;
    meta->marked_for_compaction = sub_compact->builder->NeedCompact();
  }
  sub_compact->current_output()->finished = true;
  sub_compact->total_bytes += current_bytes;

  // Finish and check for file errors
  if (s.ok()) {
    StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
    io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
  }
  if (s.ok() && io_s.ok()) {
    io_s = sub_compact->outfile->Close();
  }
  if (s.ok() && io_s.ok()) {
    // Add the checksum information to file metadata.
    meta->file_checksum = sub_compact->outfile->GetFileChecksum();
    meta->file_checksum_func_name =
        sub_compact->outfile->GetFileChecksumFuncName();
    file_checksum = meta->file_checksum;
    file_checksum_func_name = meta->file_checksum_func_name;
  }
  if (s.ok()) {
    s = io_s;
  }
  if (sub_compact->io_status.ok()) {
    sub_compact->io_status = io_s;
    // Since this error is really a copy of the
    // "normal" status, it does not also need to be checked
    sub_compact->io_status.PermitUncheckedError();
  }
  sub_compact->outfile.reset();

  TableProperties tp;
  if (s.ok()) {
    tp = sub_compact->builder->GetTableProperties();
  }

  if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
    // If there is nothing to output, there is no need to generate an sst
    // file. This happens when the output level is the bottom level and, at
    // the same time, the subcompaction produced nothing.
    std::string fname =
        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                      meta->fd.GetNumber(), meta->fd.GetPathId());
    env_->DeleteFile(fname);

    // Also need to remove the file from outputs, or it will be added to the
    // VersionEdit.
    assert(!sub_compact->outputs.empty());
    sub_compact->outputs.pop_back();
    meta = nullptr;
  }

  if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
    // Output to event logger and fire events.
    sub_compact->current_output()->table_properties =
        std::make_shared<TableProperties>(tp);
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
                   " keys, %" PRIu64 " bytes%s",
                   cfd->GetName().c_str(), job_id_, output_number,
                   current_entries, current_bytes,
                   meta->marked_for_compaction ? " (need compaction)" : "");
  }
  std::string fname;
  FileDescriptor output_fd;
  uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
  if (meta != nullptr) {
    fname =
        TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                      meta->fd.GetNumber(), meta->fd.GetPathId());
    output_fd = meta->fd;
    oldest_blob_file_number = meta->oldest_blob_file_number;
  } else {
    fname = "(nil)";
  }
  EventHelpers::LogAndNotifyTableFileCreationFinished(
      event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
      job_id_, output_fd, oldest_blob_file_number, tp,
      TableFileCreationReason::kCompaction, s, file_checksum,
      file_checksum_func_name);

#ifndef ROCKSDB_LITE
  // Report new file to SstFileManagerImpl
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
  if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
    Status add_s = sfm->OnAddFile(fname);
    if (!add_s.ok() && s.ok()) {
      s = add_s;
    }
    if (sfm->IsMaxAllowedSpaceReached()) {
      // TODO(ajkr): should we return OK() if max space was reached by the final
      // compaction output file (similarly to how flush works when full)?
      s = Status::SpaceLimit("Max allowed space was reached");
      TEST_SYNC_POINT(
          "CompactionJob::FinishCompactionOutputFile:"
          "MaxAllowedSpaceReached");
      InstrumentedMutexLock l(db_mutex_);
      // Should handle return error?
      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction)
          .PermitUncheckedError();
    }
  }
#endif  // ROCKSDB_LITE

  sub_compact->builder.reset();
  sub_compact->current_output_file_size = 0;
  return s;
}
Status CompactionJob::InstallCompactionResults(
    const MutableCFOptions& mutable_cf_options) {
  db_mutex_->AssertHeld();

  auto* compaction = compact_->compaction;

  // paranoia: verify that the files that we started with
  // still exist in the current version and in the same original level.
  // This ensures that a concurrent compaction did not erroneously
  // pick the same files to compact.
  if (!versions_->VerifyCompactionFileConsistency(compaction)) {
    Compaction::InputLevelSummaryBuffer inputs_summary;

    ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
                    compaction->column_family_data()->GetName().c_str(),
                    job_id_, compaction->InputLevelSummary(&inputs_summary));
    return Status::Corruption("Compaction input files inconsistent");
  }

  {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
                   compaction->column_family_data()->GetName().c_str(), job_id_,
                   compaction->InputLevelSummary(&inputs_summary),
                   compact_->total_bytes + compact_->total_blob_bytes);
  }

  VersionEdit* const edit = compaction->edit();

  // Add compaction inputs
  compaction->AddInputDeletions(edit);

  for (const auto& sub_compact : compact_->sub_compact_states) {
    for (const auto& out : sub_compact.outputs) {
      edit->AddFile(compaction->output_level(), out.meta);
    }

    for (const auto& blob : sub_compact.blob_file_additions) {
      edit->AddBlobFile(blob);
    }
  }

  return versions_->LogAndApply(compaction->column_family_data(),
                                mutable_cf_options, edit, db_mutex_,
                                db_directory_);
}
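
// Folds the thread-local compaction IO counters into the global tickers
// (plus the per-reason tickers for marked/periodic/TTL compactions) and
// into the thread-status operation properties, then resets the counters so
// subsequent work is not double-counted.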
void CompactionJob::RecordCompactionIOStats() {
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
  CompactionReason compaction_reason =
      compact_->compaction->compaction_reason();
  if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
    RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
    RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
  } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
    RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
    RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
  } else if (compaction_reason == CompactionReason::kTtl) {
    RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
    RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
  }
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
  IOSTATS_RESET(bytes_read);
  ThreadStatusUtil::IncreaseThreadOperationProperty(
      ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
  IOSTATS_RESET(bytes_written);
}
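
// Opens the next output table file for a subcompaction: allocates a file
// number, notifies listeners that creation has started, creates the
// writable file, registers a fresh FileMetaData in sub_compact->outputs,
// and wires up the WritableFileWriter and TableBuilder that the compaction
// loop will write into.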
Status CompactionJob::OpenCompactionOutputFile(
    SubcompactionState* sub_compact) {
  assert(sub_compact != nullptr);
  assert(sub_compact->builder == nullptr);
  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();
  std::string fname =
      TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
                    file_number, sub_compact->compaction->output_path_id());

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
  EventHelpers::NotifyTableFileCreationStarted(
      cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
      TableFileCreationReason::kCompaction);
#endif  // !ROCKSDB_LITE
  // Make the output file
  std::unique_ptr<FSWritableFile> writable_file;
#ifndef NDEBUG
  bool syncpoint_arg = file_options_.use_direct_writes;
  TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
                           &syncpoint_arg);
#endif
  Status s;
  IOStatus io_s =
      NewWritableFile(fs_.get(), fname, &writable_file, file_options_);
  s = io_s;
  if (sub_compact->io_status.ok()) {
    sub_compact->io_status = io_s;
    // Since this error is really a copy of the io_s that is checked below as
    // s, it does not also need to be checked.
    sub_compact->io_status.PermitUncheckedError();
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        sub_compact->compaction->column_family_data()->GetName().c_str(),
        job_id_, file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    EventHelpers::LogAndNotifyTableFileCreationFinished(
        event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
        fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
        TableProperties(), TableFileCreationReason::kCompaction, s,
        kUnknownFileChecksum, kUnknownFileChecksumFuncName);
    return s;
  }

  // Try to figure out the output file's oldest ancestor time.
  int64_t temp_current_time = 0;
  auto get_time_status = env_->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
  if (!get_time_status.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time. Status: %s",
                   get_time_status.ToString().c_str());
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
  uint64_t oldest_ancester_time =
      sub_compact->compaction->MinInputFileOldestAncesterTime();
  if (oldest_ancester_time == port::kMaxUint64) {
    oldest_ancester_time = current_time;
  }

  // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
  {
    FileMetaData meta;
    meta.fd = FileDescriptor(file_number,
                             sub_compact->compaction->output_path_id(), 0);
    meta.oldest_ancester_time = oldest_ancester_time;
    meta.file_creation_time = current_time;
    sub_compact->outputs.emplace_back(
        std::move(meta), cfd->internal_comparator(),
        /*enable_order_check=*/
        sub_compact->compaction->mutable_cf_options()
            ->check_flush_compaction_key_order,
        /*enable_hash=*/paranoid_file_checks_);
  }

  writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
  writable_file->SetWriteLifeTimeHint(write_hint_);
  writable_file->SetPreallocationBlockSize(static_cast<size_t>(
      sub_compact->compaction->OutputFilePreallocationSize()));
  const auto& listeners =
      sub_compact->compaction->immutable_cf_options()->listeners;
  sub_compact->outfile.reset(new WritableFileWriter(
      std::move(writable_file), fname, file_options_, env_, io_tracer_,
      db_options_.statistics.get(), listeners,
      db_options_.file_checksum_gen_factory.get()));

  // If the column family's flag is to only optimize filters for hits,
  // we can skip creating filters if this is the bottommost level, where
  // data is going to be found.
  bool skip_filters =
      cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;

  sub_compact->builder.reset(NewTableBuilder(
      *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
      cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
      cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
      sub_compact->compaction->output_compression(),
      0 /*sample_for_compression*/,
      sub_compact->compaction->output_compression_opts(),
      sub_compact->compaction->output_level(), skip_filters,
      oldest_ancester_time, 0 /* oldest_key_time */,
      sub_compact->compaction->max_output_file_size(), current_time, db_id_,
      db_session_id_));
  LogFlush(db_options_.info_log);
  return s;
}
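
// Tears down per-subcompaction state once the job is finished. Any builder
// still open (e.g. after a shutdown request arrived mid-compaction) is
// abandoned, and output files from failed subcompactions are evicted from
// the table cache since they were never committed.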
void CompactionJob::CleanupCompaction() {
  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
    const auto& sub_status = sub_compact.status;

    if (sub_compact.builder != nullptr) {
      // May happen if we get a shutdown call in the middle of compaction
      sub_compact.builder->Abandon();
      sub_compact.builder.reset();
    } else {
      assert(!sub_status.ok() || sub_compact.outfile == nullptr);
    }
    for (const auto& out : sub_compact.outputs) {
      // If this file was inserted into the table cache, remove it here
      // because this compaction was not committed.
      if (!sub_status.ok()) {
        TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
      }
    }
    // TODO: sub_compact.io_status is not checked like status. Not sure if
    // that's intentional, so ignoring the io_status as of now.
    sub_compact.io_status.PermitUncheckedError();
  }
  delete compact_;
  compact_ = nullptr;
}
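
// Helper for UpdateCompactionJobStats() below: user keys can be arbitrarily
// long, so only a bounded prefix (at most prefix_length bytes) of the
// smallest/largest output keys is copied into CompactionJobStats.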
#ifndef ROCKSDB_LITE
namespace {
void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
  assert(prefix_length > 0);
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
  dst->assign(src.data(), length);
}
}  // namespace

#endif  // !ROCKSDB_LITE
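
// Aggregates this job's input statistics (file counts and bytes read, split
// between output-level and non-output-level inputs) and output statistics
// (table plus blob files and bytes) into compaction_stats_. Records that
// went in but not out are counted as dropped.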
void CompactionJob::UpdateCompactionStats() {
  Compaction* compaction = compact_->compaction;
  compaction_stats_.num_input_files_in_non_output_levels = 0;
  compaction_stats_.num_input_files_in_output_level = 0;
  for (int input_level = 0;
       input_level < static_cast<int>(compaction->num_input_levels());
       ++input_level) {
    if (compaction->level(input_level) != compaction->output_level()) {
      UpdateCompactionInputStatsHelper(
          &compaction_stats_.num_input_files_in_non_output_levels,
          &compaction_stats_.bytes_read_non_output_levels, input_level);
    } else {
      UpdateCompactionInputStatsHelper(
          &compaction_stats_.num_input_files_in_output_level,
          &compaction_stats_.bytes_read_output_level, input_level);
    }
  }

  compaction_stats_.num_output_files =
      static_cast<int>(compact_->num_output_files) +
      static_cast<int>(compact_->num_blob_output_files);
  compaction_stats_.bytes_written =
      compact_->total_bytes + compact_->total_blob_bytes;

  if (compaction_stats_.num_input_records > compact_->num_output_records) {
    compaction_stats_.num_dropped_records =
        compaction_stats_.num_input_records - compact_->num_output_records;
  }
}
void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
                                                     uint64_t* bytes_read,
                                                     int input_level) {
  const Compaction* compaction = compact_->compaction;
  auto num_input_files = compaction->num_input_files(input_level);
  *num_files += static_cast<int>(num_input_files);

  for (size_t i = 0; i < num_input_files; ++i) {
    const auto* file_meta = compaction->input(input_level, i);
    *bytes_read += file_meta->fd.GetFileSize();
    compaction_stats_.num_input_records +=
        static_cast<uint64_t>(file_meta->num_entries);
  }
}
void CompactionJob::UpdateCompactionJobStats(
    const InternalStats::CompactionStats& stats) const {
#ifndef ROCKSDB_LITE
  compaction_job_stats_->elapsed_micros = stats.micros;

  // input information
  compaction_job_stats_->total_input_bytes =
      stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
  compaction_job_stats_->num_input_records = stats.num_input_records;
  compaction_job_stats_->num_input_files =
      stats.num_input_files_in_non_output_levels +
      stats.num_input_files_in_output_level;
  compaction_job_stats_->num_input_files_at_output_level =
      stats.num_input_files_in_output_level;

  // output information
  compaction_job_stats_->total_output_bytes = stats.bytes_written;
  compaction_job_stats_->num_output_records = compact_->num_output_records;
  compaction_job_stats_->num_output_files = stats.num_output_files;

  if (stats.num_output_files > 0) {
    CopyPrefix(compact_->SmallestUserKey(),
               CompactionJobStats::kMaxPrefixLength,
               &compaction_job_stats_->smallest_output_key_prefix);
    CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
               &compaction_job_stats_->largest_output_key_prefix);
  }
#else
  (void)stats;
#endif  // !ROCKSDB_LITE
}
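
// Writes the "compaction started" lines to the info log and a structured
// record to the event logger. The resulting LOG entry is JSON-like; an
// illustrative (not verbatim) example of its shape:
//   {"job": 7, "event": "compaction_started",
//    "compaction_reason": "LevelMaxLevelSize",
//    "files_L0": [12, 15], "files_L1": [9],
//    "score": 1.02, "input_data_size": 67108864}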
void CompactionJob::LogCompaction() {
  Compaction* compaction = compact_->compaction;
  ColumnFamilyData* cfd = compaction->column_family_data();

  // Let's check if anything will get logged. Don't prepare all the info if
  // we're not logging.
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    ROCKS_LOG_INFO(
        db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
        cfd->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compaction->score());
    char scratch[2345];
    compaction->Summary(scratch, sizeof(scratch));
    ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
                   cfd->GetName().c_str(), scratch);
    // build event logger report
    auto stream = event_logger_->Log();
    stream << "job" << job_id_ << "event"
           << "compaction_started"
           << "compaction_reason"
           << GetCompactionReasonString(compaction->compaction_reason());
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
      stream << ("files_L" + ToString(compaction->level(i)));
      stream.StartArray();
      for (auto f : *compaction->inputs(i)) {
        stream << f->fd.GetNumber();
      }
      stream.EndArray();
    }
    stream << "score" << compaction->score() << "input_data_size"
           << compaction->CalculateTotalInputSize();
  }
}
}  // namespace ROCKSDB_NAMESPACE