1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "db/compaction/compaction_job.h"
20 #include "db/blob/blob_counting_iterator.h"
21 #include "db/blob/blob_file_addition.h"
22 #include "db/blob/blob_file_builder.h"
23 #include "db/builder.h"
24 #include "db/compaction/clipping_iterator.h"
25 #include "db/compaction/compaction_state.h"
26 #include "db/db_impl/db_impl.h"
27 #include "db/dbformat.h"
28 #include "db/error_handler.h"
29 #include "db/event_helpers.h"
30 #include "db/history_trimming_iterator.h"
31 #include "db/log_writer.h"
32 #include "db/merge_helper.h"
33 #include "db/range_del_aggregator.h"
34 #include "db/version_edit.h"
35 #include "db/version_set.h"
36 #include "file/filename.h"
37 #include "file/read_write_util.h"
38 #include "file/sst_file_manager_impl.h"
39 #include "file/writable_file_writer.h"
40 #include "logging/log_buffer.h"
41 #include "logging/logging.h"
42 #include "monitoring/iostats_context_imp.h"
43 #include "monitoring/thread_status_util.h"
44 #include "options/configurable_helper.h"
45 #include "options/options_helper.h"
46 #include "port/port.h"
47 #include "rocksdb/db.h"
48 #include "rocksdb/env.h"
49 #include "rocksdb/options.h"
50 #include "rocksdb/statistics.h"
51 #include "rocksdb/status.h"
52 #include "rocksdb/table.h"
53 #include "rocksdb/utilities/options_type.h"
54 #include "table/merging_iterator.h"
55 #include "table/table_builder.h"
56 #include "table/unique_id_impl.h"
57 #include "test_util/sync_point.h"
58 #include "util/stop_watch.h"
60 namespace ROCKSDB_NAMESPACE
{
62 const char* GetCompactionReasonString(CompactionReason compaction_reason
) {
63 switch (compaction_reason
) {
64 case CompactionReason::kUnknown
:
66 case CompactionReason::kLevelL0FilesNum
:
67 return "LevelL0FilesNum";
68 case CompactionReason::kLevelMaxLevelSize
:
69 return "LevelMaxLevelSize";
70 case CompactionReason::kUniversalSizeAmplification
:
71 return "UniversalSizeAmplification";
72 case CompactionReason::kUniversalSizeRatio
:
73 return "UniversalSizeRatio";
74 case CompactionReason::kUniversalSortedRunNum
:
75 return "UniversalSortedRunNum";
76 case CompactionReason::kFIFOMaxSize
:
78 case CompactionReason::kFIFOReduceNumFiles
:
79 return "FIFOReduceNumFiles";
80 case CompactionReason::kFIFOTtl
:
82 case CompactionReason::kManualCompaction
:
83 return "ManualCompaction";
84 case CompactionReason::kFilesMarkedForCompaction
:
85 return "FilesMarkedForCompaction";
86 case CompactionReason::kBottommostFiles
:
87 return "BottommostFiles";
88 case CompactionReason::kTtl
:
90 case CompactionReason::kFlush
:
92 case CompactionReason::kExternalSstIngestion
:
93 return "ExternalSstIngestion";
94 case CompactionReason::kPeriodicCompaction
:
95 return "PeriodicCompaction";
96 case CompactionReason::kChangeTemperature
:
97 return "ChangeTemperature";
98 case CompactionReason::kForcedBlobGC
:
99 return "ForcedBlobGC";
100 case CompactionReason::kRoundRobinTtl
:
101 return "RoundRobinTtl";
102 case CompactionReason::kNumOfReasons
:
110 const char* GetCompactionPenultimateOutputRangeTypeString(
111 Compaction::PenultimateOutputRangeType range_type
) {
112 switch (range_type
) {
113 case Compaction::PenultimateOutputRangeType::kNotSupported
:
114 return "NotSupported";
115 case Compaction::PenultimateOutputRangeType::kFullRange
:
117 case Compaction::PenultimateOutputRangeType::kNonLastRange
:
118 return "NonLastRange";
119 case Compaction::PenultimateOutputRangeType::kDisabled
:
// Builds a compaction job for `compaction`.
//
// Stores the supplied options, directories, statistics sinks, snapshot list,
// callbacks and the shared bg-compaction counters in members, allocates the
// CompactionState (`compact_`) with `new`, tags the current thread with the
// column family and OP_COMPACTION via ThreadStatusUtil, and finally publishes
// the job's start properties through ReportStartedCompaction().
//
// NOTE(review): this copy of the file appears to be missing lines here. The
// parameters job_id, versions, stats, db_mutex, dbname and db_id have no
// visible initializer below, although job_id_, versions_, stats_ and
// db_mutex_ are used elsewhere in this file, and the constructor body has no
// visible closing brace. Confirm against version control before building.
127 CompactionJob::CompactionJob(
128 int job_id
, Compaction
* compaction
, const ImmutableDBOptions
& db_options
,
129 const MutableDBOptions
& mutable_db_options
, const FileOptions
& file_options
,
130 VersionSet
* versions
, const std::atomic
<bool>* shutting_down
,
131 LogBuffer
* log_buffer
, FSDirectory
* db_directory
,
132 FSDirectory
* output_directory
, FSDirectory
* blob_output_directory
,
133 Statistics
* stats
, InstrumentedMutex
* db_mutex
,
134 ErrorHandler
* db_error_handler
,
135 std::vector
<SequenceNumber
> existing_snapshots
,
136 SequenceNumber earliest_write_conflict_snapshot
,
137 const SnapshotChecker
* snapshot_checker
, JobContext
* job_context
,
138 std::shared_ptr
<Cache
> table_cache
, EventLogger
* event_logger
,
139 bool paranoid_file_checks
, bool measure_io_stats
, const std::string
& dbname
,
140 CompactionJobStats
* compaction_job_stats
, Env::Priority thread_pri
,
141 const std::shared_ptr
<IOTracer
>& io_tracer
,
142 const std::atomic
<bool>& manual_compaction_canceled
,
143 const std::string
& db_id
, const std::string
& db_session_id
,
144 std::string full_history_ts_low
, std::string trim_ts
,
145 BlobFileCompletionCallback
* blob_callback
, int* bg_compaction_scheduled
,
146 int* bg_bottom_compaction_scheduled
)
// compact_ is heap-allocated here; ~CompactionJob() asserts it has been reset
// to nullptr before the job is destroyed.
147 : compact_(new CompactionState(compaction
)),
148 compaction_stats_(compaction
->compaction_reason(), 1),
149 db_options_(db_options
),
150 mutable_db_options_copy_(mutable_db_options
),
151 log_buffer_(log_buffer
),
152 output_directory_(output_directory
),
154 bottommost_level_(false),
155 write_hint_(Env::WLTH_NOT_SET
),
156 compaction_job_stats_(compaction_job_stats
),
160 db_session_id_(db_session_id
),
161 file_options_(file_options
),
162 env_(db_options
.env
),
163 io_tracer_(io_tracer
),
164 fs_(db_options
.fs
, io_tracer
),
// Read-side file options adjusted by the file system for compaction table
// reads.
165 file_options_for_read_(
166 fs_
->OptimizeForCompactionTableRead(file_options
, db_options_
)),
168 shutting_down_(shutting_down
),
169 manual_compaction_canceled_(manual_compaction_canceled
),
170 db_directory_(db_directory
),
171 blob_output_directory_(blob_output_directory
),
173 db_error_handler_(db_error_handler
),
174 existing_snapshots_(std::move(existing_snapshots
)),
175 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot
),
176 snapshot_checker_(snapshot_checker
),
177 job_context_(job_context
),
178 table_cache_(std::move(table_cache
)),
179 event_logger_(event_logger
),
180 paranoid_file_checks_(paranoid_file_checks
),
181 measure_io_stats_(measure_io_stats
),
182 thread_pri_(thread_pri
),
183 full_history_ts_low_(std::move(full_history_ts_low
)),
184 trim_ts_(std::move(trim_ts
)),
185 blob_callback_(blob_callback
),
// No extra subcompaction threads are reserved yet;
// AcquireSubcompactionResources() updates this.
186 extra_num_subcompaction_threads_reserved_(0),
187 bg_compaction_scheduled_(bg_compaction_scheduled
),
188 bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled
) {
// Both pointers are dereferenced unconditionally later (e.g. by
// ReportStartedCompaction() and Run()).
189 assert(compaction_job_stats_
!= nullptr);
190 assert(log_buffer_
!= nullptr);
// Tag this thread with the job's column family and operation.
// ReportStartedCompaction() below repeats both calls after it has set the
// operation properties.
192 const auto* cfd
= compact_
->compaction
->column_family_data();
193 ThreadStatusUtil::SetColumnFamily(cfd
, cfd
->ioptions()->env
,
194 db_options_
.enable_thread_tracking
);
195 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION
);
196 ReportStartedCompaction(compaction
);
// Destroys the job. The CompactionState allocated in the constructor must
// already have been released (compact_ reset to nullptr) by this point; the
// calling thread's status entry is then cleared.
// NOTE(review): the closing brace of this destructor is not visible in this
// copy of the file.
199 CompactionJob::~CompactionJob() {
200 assert(compact_
== nullptr);
201 ThreadStatusUtil::ResetThreadStatus();
// Publishes the start of this compaction to the thread-status subsystem and
// seeds compaction_job_stats_ with the manual/full-compaction flags.
//
// Thread-operation properties set here:
//   COMPACTION_JOB_ID
//   COMPACTION_INPUT_OUTPUT_LEVEL   (start_level << 32) + output_level
//   COMPACTION_PROP_FLAGS           bit 0 = manual, bit 1 = deletion compaction
//   COMPACTION_TOTAL_INPUT_BYTES    compaction->CalculateTotalInputSize()
//   COMPACTION_BYTES_WRITTEN / READ reset to 0, together with the IOSTATS
//                                   bytes_written / bytes_read counters
// OP_COMPACTION itself is set last (see the comment below).
//
// NOTE(review): the column family is read from compact_->compaction rather
// than from the `compaction` argument; presumably they are the same object.
// NOTE(review): the value argument of the COMPACTION_JOB_ID call and the
// function's closing brace are not visible in this copy of the file.
204 void CompactionJob::ReportStartedCompaction(Compaction
* compaction
) {
205 const auto* cfd
= compact_
->compaction
->column_family_data();
206 ThreadStatusUtil::SetColumnFamily(cfd
, cfd
->ioptions()->env
,
207 db_options_
.enable_thread_tracking
);
209 ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID
,
// Pack both levels into one 64-bit property: start level in the high 32 bits,
// output level in the low bits.
212 ThreadStatusUtil::SetThreadOperationProperty(
213 ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL
,
214 (static_cast<uint64_t>(compact_
->compaction
->start_level()) << 32) +
215 compact_
->compaction
->output_level());
217 // In the current design, a CompactionJob is always created
218 // for non-trivial compaction.
219 assert(compaction
->IsTrivialMove() == false ||
220 compaction
->is_manual_compaction() == true);
// Flag bits: bit 0 = manual compaction, bit 1 = deletion compaction.
222 ThreadStatusUtil::SetThreadOperationProperty(
223 ThreadStatus::COMPACTION_PROP_FLAGS
,
224 compaction
->is_manual_compaction() +
225 (compaction
->deletion_compaction() << 1));
227 ThreadStatusUtil::SetThreadOperationProperty(
228 ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES
,
229 compaction
->CalculateTotalInputSize());
// Start the per-thread I/O byte counters and the published byte properties
// from zero for this job.
231 IOSTATS_RESET(bytes_written
);
232 IOSTATS_RESET(bytes_read
);
233 ThreadStatusUtil::SetThreadOperationProperty(
234 ThreadStatus::COMPACTION_BYTES_WRITTEN
, 0);
235 ThreadStatusUtil::SetThreadOperationProperty(
236 ThreadStatus::COMPACTION_BYTES_READ
, 0);
238 // Set the thread operation after operation properties
239 // to ensure GetThreadList() can always show them all together.
240 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION
);
242 compaction_job_stats_
->is_manual_compaction
=
243 compaction
->is_manual_compaction();
244 compaction_job_stats_
->is_full_compaction
= compaction
->is_full_compaction();
// Prepares the job before Run().
//
// - Computes the SST write hint for the output level and the
//   bottommost-level flag.
// - Splits the work into compact_->sub_compact_states. When subcompactions
//   are allowed, it uses the boundaries produced by
//   GenSubcompactionBoundaries().
// - Builds the seqno->time mapping from the input files, which determines
//   preserve_time_min_seqno_ and preclude_last_level_min_seqno_.
//
// NOTE(review): several lines of this function (else keywords, branch
// conditions and closing braces) are not visible in this copy of the file.
247 void CompactionJob::Prepare() {
248 AutoThreadOperationStageUpdater
stage_updater(
249 ThreadStatus::STAGE_COMPACTION_PREPARE
);
251 // Generate file_levels_ for compaction before making Iterator
252 auto* c
= compact_
->compaction
;
253 ColumnFamilyData
* cfd
= c
->column_family_data();
254 assert(cfd
!= nullptr);
255 assert(cfd
->current()->storage_info()->NumLevelFiles(
256 compact_
->compaction
->level()) > 0);
258 write_hint_
= cfd
->CalculateSSTWriteHint(c
->output_level());
259 bottommost_level_
= c
->bottommost_level();
261 if (c
->ShouldFormSubcompactions()) {
262 StopWatch
sw(db_options_
.clock
, stats_
, SUBCOMPACTION_SETUP_TIME
);
263 GenSubcompactionBoundaries();
// With N boundaries, create N + 1 subcompaction states. State i starts at
// boundaries_[i - 1] (no start bound for i == 0) and ends at boundaries_[i].
// The alternative for the last state is not visible in this copy.
265 if (boundaries_
.size() > 1) {
266 for (size_t i
= 0; i
<= boundaries_
.size(); i
++) {
267 compact_
->sub_compact_states
.emplace_back(
268 c
, (i
!= 0) ? std::optional
<Slice
>(boundaries_
[i
- 1]) : std::nullopt
,
269 (i
!= boundaries_
.size()) ? std::optional
<Slice
>(boundaries_
[i
])
271 static_cast<uint32_t>(i
));
272 // assert to validate that boundaries don't have same user keys (without
274 assert(i
== 0 || i
== boundaries_
.size() ||
275 cfd
->user_comparator()->CompareWithoutTimestamp(
276 boundaries_
[i
- 1], boundaries_
[i
]) < 0);
278 RecordInHistogram(stats_
, NUM_SUBCOMPACTIONS_SCHEDULED
,
279 compact_
->sub_compact_states
.size());
// Single subcompaction with neither a start nor an end bound.
281 compact_
->sub_compact_states
.emplace_back(c
, std::nullopt
, std::nullopt
,
285 // collect all seqno->time information from the input files which will be used
286 // to encode seqno->time to the output files.
// Keep time information for the longer of the two configured durations.
287 uint64_t preserve_time_duration
=
288 std::max(c
->immutable_options()->preserve_internal_time_seconds
,
289 c
->immutable_options()->preclude_last_level_data_seconds
);
291 if (preserve_time_duration
> 0) {
292 // setup seqno_time_mapping_
293 seqno_time_mapping_
.SetMaxTimeDuration(preserve_time_duration
);
294 for (const auto& each_level
: *c
->inputs()) {
295 for (const auto& fmd
: each_level
.files
) {
296 std::shared_ptr
<const TableProperties
> tp
;
// NOTE(review): no check of `s` is visible before `tp` is dereferenced below;
// confirm a failed GetTableProperties() cannot leave `tp` null here.
297 Status s
= cfd
->current()->GetTableProperties(&tp
, fmd
, nullptr);
299 seqno_time_mapping_
.Add(tp
->seqno_to_time_mapping
)
300 .PermitUncheckedError();
// Also record the file's smallest seqno against its oldest_ancester_time.
301 seqno_time_mapping_
.Add(fmd
->fd
.smallest_seqno
,
302 fmd
->oldest_ancester_time
);
307 auto status
= seqno_time_mapping_
.Sort();
309 ROCKS_LOG_WARN(db_options_
.info_log
,
310 "Invalid sequence number to time mapping: Status: %s",
311 status
.ToString().c_str());
// Read the current time. If that fails (the branch condition is not visible in
// this copy), a warning is logged and both minimum seqnos are set to 0.
// Otherwise old entries are truncated and the cut-off seqnos are looked up.
313 int64_t _current_time
= 0;
314 status
= db_options_
.clock
->GetCurrentTime(&_current_time
);
316 ROCKS_LOG_WARN(db_options_
.info_log
,
317 "Failed to get current time in compaction: Status: %s",
318 status
.ToString().c_str());
319 // preserve all time information
320 preserve_time_min_seqno_
= 0;
321 preclude_last_level_min_seqno_
= 0;
323 seqno_time_mapping_
.TruncateOldEntries(_current_time
);
// Cut-off time: now - preserve_time_duration when now is larger. The other
// operand of the conditional is not visible in this copy.
324 uint64_t preserve_time
=
325 static_cast<uint64_t>(_current_time
) > preserve_time_duration
326 ? _current_time
- preserve_time_duration
328 preserve_time_min_seqno_
=
329 seqno_time_mapping_
.GetOldestSequenceNum(preserve_time
);
// Same computation for the preclude-last-level cut-off, when configured.
330 if (c
->immutable_options()->preclude_last_level_data_seconds
> 0) {
331 uint64_t preclude_last_level_time
=
332 static_cast<uint64_t>(_current_time
) >
333 c
->immutable_options()->preclude_last_level_data_seconds
335 c
->immutable_options()->preclude_last_level_data_seconds
337 preclude_last_level_min_seqno_
=
338 seqno_time_mapping_
.GetOldestSequenceNum(preclude_last_level_time
);
// Returns how many subcompactions this job may run. The result is the number
// of extra threads reserved via AcquireSubcompactionResources() plus a term
// derived from the compaction's max_subcompactions().
// NOTE(review): the expression below is incomplete in this copy of the file.
// It has two closing parentheses but only one visible opener, and there is no
// closing brace. Restore the missing lines before building.
344 uint64_t CompactionJob::GetSubcompactionsLimit() {
345 return extra_num_subcompaction_threads_reserved_
+
348 static_cast<uint64_t>(compact_
->compaction
->max_subcompactions()));
// Tries to reserve `num_extra_required_subcompactions` additional background
// threads for this job's subcompactions.
//
// - The request is capped by the compaction slots still free under the
//   DB-wide background-job limit. That limit is computed from the copied
//   max_background_* options; the currently scheduled compactions and
//   bottom-priority compactions are subtracted from it.
// - Threads are reserved at priority min(thread_pri_, HIGH).
// - The count actually reserved is stored in
//   extra_num_subcompaction_threads_reserved_. It is added to
//   *bg_bottom_compaction_scheduled_ for BOTTOM priority, otherwise to
//   *bg_compaction_scheduled_.
//
// The shared counters are read and updated under an InstrumentedMutexLock on
// db_mutex_.
// NOTE(review): part of the GetBGJobLimits(...) call, the second argument of
// std::max, the `else` keyword and the closing brace are not visible in this
// copy of the file.
351 void CompactionJob::AcquireSubcompactionResources(
352 int num_extra_required_subcompactions
) {
353 TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
354 TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
355 int max_db_compactions
=
356 DBImpl::GetBGJobLimits(
357 mutable_db_options_copy_
.max_background_flushes
,
358 mutable_db_options_copy_
.max_background_compactions
,
359 mutable_db_options_copy_
.max_background_jobs
,
360 versions_
->GetColumnFamilySet()
362 ->NeedSpeedupCompaction())
364 InstrumentedMutexLock
l(db_mutex_
);
365 // Apply min function first since we need to compute the extra subcompaction
366 // against compaction limits. And then try to reserve threads for extra
367 // subcompactions. The actual number of reserved threads could be less than
368 // the desired number.
369 int available_bg_compactions_against_db_limit
=
370 std::max(max_db_compactions
- *bg_compaction_scheduled_
-
371 *bg_bottom_compaction_scheduled_
,
373 // Reservation only supports background threads of which the priority is
374 // between BOTTOM and HIGH. Need to degrade the priority to HIGH if the
375 // origin thread_pri_ is higher than that. Similar to ReleaseThreads().
376 extra_num_subcompaction_threads_reserved_
=
377 env_
->ReserveThreads(std::min(num_extra_required_subcompactions
,
378 available_bg_compactions_against_db_limit
),
379 std::min(thread_pri_
, Env::Priority::HIGH
));
381 // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
382 // depending on if this compaction has the bottommost priority
383 if (thread_pri_
== Env::Priority::BOTTOM
) {
384 *bg_bottom_compaction_scheduled_
+=
385 extra_num_subcompaction_threads_reserved_
;
387 *bg_compaction_scheduled_
+= extra_num_subcompaction_threads_reserved_
;
// Returns `num_extra_resources` previously reserved subcompaction threads to
// the Env, at priority min(thread_pri_, HIGH). The number actually released is
// subtracted from extra_num_subcompaction_threads_reserved_ and from the
// matching shared counter: *bg_bottom_compaction_scheduled_ for BOTTOM
// priority, *bg_compaction_scheduled_ otherwise. A request of 0 is a no-op.
//
// NOTE(review): AcquireSubcompactionResources() updates the same shared
// counters under db_mutex_, but no locking is visible in this function.
// Lines appear to be missing in this copy, including the `else` keyword and
// the closing braces. Confirm that the counters are modified with db_mutex_
// held.
391 void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources
) {
392 // Do nothing when we have zero resources to shrink
393 if (num_extra_resources
== 0) return;
395 // We cannot release threads more than what we reserved before
396 int extra_num_subcompaction_threads_released
= env_
->ReleaseThreads(
397 (int)num_extra_resources
, std::min(thread_pri_
, Env::Priority::HIGH
));
398 // Update the number of reserved threads and the number of background
399 // scheduled compactions for this compaction job
400 extra_num_subcompaction_threads_reserved_
-=
401 extra_num_subcompaction_threads_released
;
402 // TODO (zichen): design a test case with new subcompaction partitioning
403 // when the number of actual partitions is less than the number of planned
405 assert(extra_num_subcompaction_threads_released
== (int)num_extra_resources
);
406 // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
407 // depending on if this compaction has the bottommost priority
408 if (thread_pri_
== Env::Priority::BOTTOM
) {
409 *bg_bottom_compaction_scheduled_
-=
410 extra_num_subcompaction_threads_released
;
412 *bg_compaction_scheduled_
-= extra_num_subcompaction_threads_released
;
415 TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
// Releases every extra subcompaction thread still reserved by this job.
// When none are reserved, the early-exit branch applies; its body is not
// visible in this copy. Otherwise, under db_mutex_, it asserts that a shared
// scheduled-compaction counter accounts for this job plus its extra threads,
// then hands all of them back through ShrinkSubcompactionResources().
// NOTE(review): braces (and the end of the second comment below) are not
// visible in this copy of the file.
418 void CompactionJob::ReleaseSubcompactionResources() {
419 if (extra_num_subcompaction_threads_reserved_
== 0) {
423 InstrumentedMutexLock
l(db_mutex_
);
424 // The number of reserved threads becomes larger than 0 only if the
425 // compaction priority is round robin and there are not enough
426 // sub-compactions available
428 // The scheduled compaction must be no less than 1 + extra number
429 // subcompactions using acquired resources since this compaction job has not
431 assert(*bg_bottom_compaction_scheduled_
>=
432 1 + extra_num_subcompaction_threads_reserved_
||
433 *bg_compaction_scheduled_
>=
434 1 + extra_num_subcompaction_threads_reserved_
);
436 ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_
);
// Helper record pairing a key range (built from slices `a` and `b`) with a
// size `s` (0 by default).
// NOTE(review): the member declarations and the closing `};` are not visible
// in this copy of the file.
439 struct RangeWithSize
{
443 RangeWithSize(const Slice
& a
, const Slice
& b
, uint64_t s
= 0)
444 : range(a
, b
), size(s
) {}
// Fills boundaries_ with user keys that split the compaction input into
// roughly equal-sized subcompactions (see the explanation below).
//
// For level-style compaction with compaction_pri == kRoundRobin, the planned
// subcompaction count starts from num_input_files(0). It may exceed the
// current limit by reserving extra background threads through
// AcquireSubcompactionResources(). Reserved threads that the final
// partitioning does not need are released before returning.
// NOTE(review): several lines of this function (early-return bodies, else
// keywords and closing braces) are not visible in this copy of the file.
447 void CompactionJob::GenSubcompactionBoundaries() {
448 // The goal is to find some boundary keys so that we can evenly partition
449 // the compaction input data into max_subcompactions ranges.
450 // For every input file, we ask TableReader to estimate 128 anchor points
451 // that evenly partition the input file into 128 ranges and the range
452 // sizes. This can be calculated by scanning index blocks of the file.
453 // Once we have the anchor points for all the input files, we merge them
454 // together and try to find keys dividing ranges evenly.
455 // For example, if we have two input files, and each returns following
457 // File1: (a1, 1000), (b1, 1200), (c1, 1100)
458 // File2: (a2, 1100), (b2, 1000), (c2, 1000)
459 // We total sort the keys to following:
460 // (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
461 // We calculate the total size by adding up all ranges' size, which is 6400.
462 // If we would like to partition into 2 subcompactions, the target of the
463 // range size is 3200. Based on the size, we take "b1" as the partition key
464 // since the first three ranges would hit 3200.
466 // Note that the ranges are actually overlapping. For example, in the example
467 // above, the range ending with "b1" is overlapping with the range ending with
468 // "b2". So the size 1000+1100+1200 is an underestimation of data size up to
469 // "b1". In extreme cases where we only compact N L0 files, a range can
470 // overlap with N-1 other ranges. Since we requested a relatively large number
471 // (128) of ranges from each input file, even N range overlapping would
472 // cause relatively small inaccuracy.
// Nothing to partition unless subcompactions are allowed, or this is a
// level-style round-robin compaction.
474 auto* c
= compact_
->compaction
;
475 if (c
->max_subcompactions() <= 1 &&
476 !(c
->immutable_options()->compaction_pri
== kRoundRobin
&&
477 c
->immutable_options()->compaction_style
== kCompactionStyleLevel
)) {
480 auto* cfd
= c
->column_family_data();
481 const Comparator
* cfd_comparator
= cfd
->user_comparator();
482 const InternalKeyComparator
& icomp
= cfd
->internal_comparator();
484 auto* v
= compact_
->compaction
->input_version();
485 int base_level
= v
->storage_info()->base_level();
// Presumably releases db_mutex_ for the rest of this function while anchors
// are read from the table files. AcquireSubcompactionResources() re-locks it
// when called below.
486 InstrumentedMutexUnlock
unlock_guard(db_mutex_);
488 uint64_t total_size
= 0;
489 std::vector
<TableReader::Anchor
> all_anchors
;
490 int start_lvl
= c
->start_level();
491 int out_lvl
= c
->output_level();
// Gather anchors from every input file on levels in [start_lvl, out_lvl].
493 for (size_t lvl_idx
= 0; lvl_idx
< c
->num_input_levels(); lvl_idx
++) {
494 int lvl
= c
->level(lvl_idx
);
495 if (lvl
>= start_lvl
&& lvl
<= out_lvl
) {
496 const LevelFilesBrief
* flevel
= c
->input_levels(lvl_idx
);
497 size_t num_files
= flevel
->num_files
;
499 if (num_files
== 0) {
503 for (size_t i
= 0; i
< num_files
; i
++) {
504 FileMetaData
* f
= flevel
->files
[i
].file_metadata
;
505 std::vector
<TableReader::Anchor
> my_anchors
;
// If the table reader cannot supply anchors, fall back to one anchor at the
// file's largest user key that covers the whole file size.
506 Status s
= cfd
->table_cache()->ApproximateKeyAnchors(
507 ReadOptions(), icomp
, *f
, my_anchors
);
508 if (!s
.ok() || my_anchors
.empty()) {
509 my_anchors
.emplace_back(f
->largest
.user_key(), f
->fd
.GetFileSize());
511 for (auto& ac
: my_anchors
) {
512 // Could be optimized to avoid this loop.
513 total_size
+= ac
.range_size
;
516 all_anchors
.insert(all_anchors
.end(), my_anchors
.begin(),
521 // Here we total sort all the anchor points across all files and go through
522 // them in the sorted order to find partitioning boundaries.
523 // Not the most efficient implementation. A much more efficient algorithm
524 // probably exists. But they are more complex. If performance turns out to
525 // be a problem, we can optimize.
// NOTE(review): the head of the call these arguments belong to (presumably a
// sort by user key without timestamp) is not visible in this copy.
527 all_anchors
.begin(), all_anchors
.end(),
528 [cfd_comparator
](TableReader::Anchor
& a
, TableReader::Anchor
& b
) -> bool {
529 return cfd_comparator
->CompareWithoutTimestamp(a
.user_key
, b
.user_key
) <
533 // Remove duplicated entries from boundaries.
// NOTE(review): std::unique only moves duplicates toward the end and returns
// the new logical end; the erase that should consume that iterator is not
// visible in this copy.
535 std::unique(all_anchors
.begin(), all_anchors
.end(),
536 [cfd_comparator
](TableReader::Anchor
& a
,
537 TableReader::Anchor
& b
) -> bool {
538 return cfd_comparator
->CompareWithoutTimestamp(
539 a
.user_key
, b
.user_key
) == 0;
543 // Get the number of planned subcompactions, may update reserve threads
544 // and update extra_num_subcompaction_threads_reserved_ for round-robin
545 uint64_t num_planned_subcompactions
;
546 if (c
->immutable_options()->compaction_pri
== kRoundRobin
&&
547 c
->immutable_options()->compaction_style
== kCompactionStyleLevel
) {
548 // For round-robin compaction priority, we need to employ more
549 // subcompactions (may exceed the max_subcompaction limit). The extra
550 // subcompactions will be executed using reserved threads and taken into
551 // account bg_compaction_scheduled or bg_bottom_compaction_scheduled.
553 // Initialized by the number of input files
// (only input level index 0 is counted: c->num_input_files(0))
554 num_planned_subcompactions
= static_cast<uint64_t>(c
->num_input_files(0));
555 uint64_t max_subcompactions_limit
= GetSubcompactionsLimit();
556 if (max_subcompactions_limit
< num_planned_subcompactions
) {
557 // Assert two pointers are not empty so that we can use extra
558 // subcompactions against db compaction limits
559 assert(bg_bottom_compaction_scheduled_
!= nullptr);
560 assert(bg_compaction_scheduled_
!= nullptr);
561 // Reserve resources when max_subcompaction is not sufficient
562 AcquireSubcompactionResources(
563 (int)(num_planned_subcompactions
- max_subcompactions_limit
));
564 // Subcompactions limit changes after acquiring additional resources.
565 // Need to call GetSubcompactionsLimit() again to update the number
566 // of planned subcompactions
567 num_planned_subcompactions
=
568 std::min(num_planned_subcompactions
, GetSubcompactionsLimit());
570 num_planned_subcompactions
= max_subcompactions_limit
;
573 num_planned_subcompactions
= GetSubcompactionsLimit();
576 TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
577 &num_planned_subcompactions
);
578 if (num_planned_subcompactions
== 1) return;
580 // Group the ranges into subcompactions
// Each subcompaction targets at least total_size / num_planned_subcompactions.
// The second std::max operand is computed from the mutable CF options, the
// output level, the compaction style, base_level and
// level_compaction_dynamic_level_bytes; its function name is not visible in
// this copy.
581 uint64_t target_range_size
= std::max(
582 total_size
/ num_planned_subcompactions
,
584 *(c
->mutable_cf_options()), out_lvl
,
585 c
->immutable_options()->compaction_style
, base_level
,
586 c
->immutable_options()->level_compaction_dynamic_level_bytes
));
// One target range already covers all the input, so no boundary is emitted.
// The branch body is not visible in this copy.
588 if (target_range_size
>= total_size
) {
// Walk the sorted anchors and emit a boundary each time the cumulative size
// passes the next multiple of target_range_size, until the planned number of
// subcompactions is reached.
592 uint64_t next_threshold
= target_range_size
;
593 uint64_t cumulative_size
= 0;
594 uint64_t num_actual_subcompactions
= 1U;
595 for (TableReader::Anchor
& anchor
: all_anchors
) {
596 cumulative_size
+= anchor
.range_size
;
597 if (cumulative_size
> next_threshold
) {
598 next_threshold
+= target_range_size
;
599 num_actual_subcompactions
++;
600 boundaries_
.push_back(anchor
.user_key
);
602 if (num_actual_subcompactions
== num_planned_subcompactions
) {
606 TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
607 &num_actual_subcompactions
);
608 // Shrink extra subcompactions resources when extra resources are acquired
609 ShrinkSubcompactionResources(
610 std::min((int)(num_planned_subcompactions
- num_actual_subcompactions
),
611 extra_num_subcompaction_threads_reserved_
));
// Runs the compaction:
//  - executes every subcompaction, the first on the calling thread and the
//    rest on new threads, then records COMPACTION_TIME / COMPACTION_CPU_TIME;
//  - collects the first failing subcompaction's status;
//  - fsyncs the output directory and, when new blob files were written to a
//    different directory, the blob output directory;
//  - verifies each output table by opening an iterator on it (with
//    paranoid_file_checks_, also re-reading and checksumming every entry);
//  - releases extra subcompaction threads and publishes output table
//    properties and aggregated stats;
//  - stores the final status in compact_->status.
// NOTE(review): this copy of the file is missing lines in this function,
// including the declarations of `status` and `io_s`, the join() calls, the
// return statement, and several closing braces. Confirm against version
// control before building.
614 Status
CompactionJob::Run() {
615 AutoThreadOperationStageUpdater
stage_updater(
616 ThreadStatus::STAGE_COMPACTION_RUN
);
617 TEST_SYNC_POINT("CompactionJob::Run():Start");
618 log_buffer_
->FlushBufferToLog();
621 const size_t num_threads
= compact_
->sub_compact_states
.size();
622 assert(num_threads
> 0);
623 const uint64_t start_micros
= db_options_
.clock
->NowMicros();
625 // Launch a thread for each of subcompactions 1...num_threads-1
626 std::vector
<port::Thread
> thread_pool
;
627 thread_pool
.reserve(num_threads
- 1);
628 for (size_t i
= 1; i
< compact_
->sub_compact_states
.size(); i
++) {
629 thread_pool
.emplace_back(&CompactionJob::ProcessKeyValueCompaction
, this,
630 &compact_
->sub_compact_states
[i
]);
633 // Always schedule the first subcompaction (whether or not there are also
634 // others) in the current thread to be efficient with resources
635 ProcessKeyValueCompaction(&compact_
->sub_compact_states
[0]);
637 // Wait for all other threads (if there are any) to finish execution
638 for (auto& thread
: thread_pool
) {
// Wall-clock time for the whole run; CPU time is summed per subcompaction.
642 compaction_stats_
.SetMicros(db_options_
.clock
->NowMicros() - start_micros
);
644 for (auto& state
: compact_
->sub_compact_states
) {
645 compaction_stats_
.AddCpuMicros(state
.compaction_job_stats
.cpu_micros
);
646 state
.RemoveLastEmptyOutput();
649 RecordTimeToHistogram(stats_
, COMPACTION_TIME
,
650 compaction_stats_
.stats
.micros
);
651 RecordTimeToHistogram(stats_
, COMPACTION_CPU_TIME
,
652 compaction_stats_
.stats
.cpu_micros
);
654 TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
656 // Check if any thread encountered an error during execution
659 bool wrote_new_blob_files
= false;
661 for (const auto& state
: compact_
->sub_compact_states
) {
662 if (!state
.status
.ok()) {
663 status
= state
.status
;
664 io_s
= state
.io_status
;
668 if (state
.Current().HasBlobFileAdditions()) {
669 wrote_new_blob_files
= true;
673 if (io_status_
.ok()) {
677 constexpr IODebugContext
* dbg
= nullptr;
// Persist the directory entries of the newly written files.
679 if (output_directory_
) {
680 io_s
= output_directory_
->FsyncWithDirOptions(
682 DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced
));
// Blob files only need a separate fsync when they live in another directory.
685 if (io_s
.ok() && wrote_new_blob_files
&& blob_output_directory_
&&
686 blob_output_directory_
!= output_directory_
) {
687 io_s
= blob_output_directory_
->FsyncWithDirOptions(
689 DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced
));
692 if (io_status_
.ok()) {
// Flatten every subcompaction's outputs into one list to verify.
700 std::vector
<const CompactionOutputs::Output
*> files_output
;
701 for (const auto& state
: compact_
->sub_compact_states
) {
702 for (const auto& output
: state
.GetOutputs()) {
703 files_output
.emplace_back(&output
);
706 ColumnFamilyData
* cfd
= compact_
->compaction
->column_family_data();
707 auto& prefix_extractor
=
708 compact_
->compaction
->mutable_cf_options()->prefix_extractor
;
// verify_table claims output indices with an atomic fetch_add on
// next_file_idx, so concurrent calls never verify the same file. The
// surrounding loop and the write to `output_status` are not visible in this
// copy.
709 std::atomic
<size_t> next_file_idx(0);
710 auto verify_table
= [&](Status
& output_status
) {
712 size_t file_idx
= next_file_idx
.fetch_add(1);
713 if (file_idx
>= files_output
.size()) {
716 // Verify that the table is usable
717 // We set for_compaction to false and don't
718 // OptimizeForCompactionTableRead here because this is a special case
719 // after we finish the table building. No matter whether
720 // use_direct_io_for_flush_and_compaction is true, we will regard this
721 // verification as user reads since the goal is to cache it here for
722 // further user reads
723 ReadOptions read_options
;
724 InternalIterator
* iter
= cfd
->table_cache()->NewIterator(
725 read_options
, file_options_
, cfd
->internal_comparator(),
726 files_output
[file_idx
]->meta
, /*range_del_agg=*/nullptr,
728 /*table_reader_ptr=*/nullptr,
729 cfd
->internal_stats()->GetFileReadHist(
730 compact_
->compaction
->output_level()),
731 TableReaderCaller::kCompactionRefill
, /*arena=*/nullptr,
732 /*skip_filters=*/false, compact_
->compaction
->output_level(),
733 MaxFileSizeForL0MetaPin(
734 *compact_
->compaction
->mutable_cf_options()),
735 /*smallest_compaction_key=*/nullptr,
736 /*largest_compaction_key=*/nullptr,
737 /*allow_unprepared_value=*/false);
738 auto s
= iter
->status();
// With paranoid_file_checks_, re-read every entry through an OutputValidator
// (order check + hash) and compare it with the validator recorded for this
// output; a mismatch becomes a Corruption status.
740 if (s
.ok() && paranoid_file_checks_
) {
741 OutputValidator
validator(cfd
->internal_comparator(),
742 /*_enable_order_check=*/true,
743 /*_enable_hash=*/true);
744 for (iter
->SeekToFirst(); iter
->Valid(); iter
->Next()) {
745 s
= validator
.Add(iter
->key(), iter
->value());
754 !validator
.CompareValidator(files_output
[file_idx
]->validator
)) {
755 s
= Status::Corruption("Paranoid checksums do not match");
// Verification uses the same threading layout as the compaction: one extra
// thread per additional subcompaction, plus the current thread. Each call
// reports into that subcompaction's status.
// NOTE(review): these threads are appended to the same thread_pool used for
// the subcompaction threads above, and no thread_pool.clear() is visible in
// between. Confirm the already-joined threads are removed first. If
// port::Thread is std::thread, joining them again would throw
// std::system_error.
767 for (size_t i
= 1; i
< compact_
->sub_compact_states
.size(); i
++) {
768 thread_pool
.emplace_back(
769 verify_table
, std::ref(compact_
->sub_compact_states
[i
].status
));
771 verify_table(compact_
->sub_compact_states
[0].status
);
772 for (auto& thread
: thread_pool
) {
// Surface the first verification failure.
776 for (const auto& state
: compact_
->sub_compact_states
) {
777 if (!state
.status
.ok()) {
778 status
= state
.status
;
// Give back any extra subcompaction threads reserved while generating
// subcompaction boundaries.
784 ReleaseSubcompactionResources();
785 TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:0");
786 TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:1");
// Collect each output's table properties keyed by its file name. The
// declaration of `fn` is not visible in this copy.
788 TablePropertiesCollection tp
;
789 for (const auto& state
: compact_
->sub_compact_states
) {
790 for (const auto& output
: state
.GetOutputs()) {
792 TableFileName(state
.compaction
->immutable_options()->cf_paths
,
793 output
.meta
.fd
.GetNumber(), output
.meta
.fd
.GetPathId());
794 tp
[fn
] = output
.table_properties
;
797 compact_
->compaction
->SetOutputTableProperties(std::move(tp
));
799 // Finish up all book-keeping to unify the subcompaction results
800 compact_
->AggregateCompactionStats(compaction_stats_
, *compaction_job_stats_
);
801 UpdateCompactionStats();
803 RecordCompactionIOStats();
804 LogFlush(db_options_
.info_log
);
805 TEST_SYNC_POINT("CompactionJob::Run():End");
807 compact_
->status
= status
;
811 Status
CompactionJob::Install(const MutableCFOptions
& mutable_cf_options
) {
814 AutoThreadOperationStageUpdater
stage_updater(
815 ThreadStatus::STAGE_COMPACTION_INSTALL
);
816 db_mutex_
->AssertHeld();
817 Status status
= compact_
->status
;
819 ColumnFamilyData
* cfd
= compact_
->compaction
->column_family_data();
822 int output_level
= compact_
->compaction
->output_level();
823 cfd
->internal_stats()->AddCompactionStats(output_level
, thread_pri_
,
827 status
= InstallCompactionResults(mutable_cf_options
);
829 if (!versions_
->io_status().ok()) {
830 io_status_
= versions_
->io_status();
833 VersionStorageInfo::LevelSummaryStorage tmp
;
834 auto vstorage
= cfd
->current()->storage_info();
835 const auto& stats
= compaction_stats_
.stats
;
837 double read_write_amp
= 0.0;
838 double write_amp
= 0.0;
839 double bytes_read_per_sec
= 0;
840 double bytes_written_per_sec
= 0;
842 const uint64_t bytes_read_non_output_and_blob
=
843 stats
.bytes_read_non_output_levels
+ stats
.bytes_read_blob
;
844 const uint64_t bytes_read_all
=
845 stats
.bytes_read_output_level
+ bytes_read_non_output_and_blob
;
846 const uint64_t bytes_written_all
=
847 stats
.bytes_written
+ stats
.bytes_written_blob
;
849 if (bytes_read_non_output_and_blob
> 0) {
850 read_write_amp
= (bytes_written_all
+ bytes_read_all
) /
851 static_cast<double>(bytes_read_non_output_and_blob
);
853 bytes_written_all
/ static_cast<double>(bytes_read_non_output_and_blob
);
855 if (stats
.micros
> 0) {
856 bytes_read_per_sec
= bytes_read_all
/ static_cast<double>(stats
.micros
);
857 bytes_written_per_sec
=
858 bytes_written_all
/ static_cast<double>(stats
.micros
);
861 const std::string
& column_family_name
= cfd
->GetName();
863 constexpr double kMB
= 1048576.0;
867 "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
868 "files in(%d, %d) out(%d +%d blob) "
869 "MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
870 "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
871 ", records dropped: %" PRIu64
" output_compression: %s\n",
872 column_family_name
.c_str(), vstorage
->LevelSummary(&tmp
),
873 bytes_read_per_sec
, bytes_written_per_sec
,
874 compact_
->compaction
->output_level(),
875 stats
.num_input_files_in_non_output_levels
,
876 stats
.num_input_files_in_output_level
, stats
.num_output_files
,
877 stats
.num_output_files_blob
, stats
.bytes_read_non_output_levels
/ kMB
,
878 stats
.bytes_read_output_level
/ kMB
, stats
.bytes_read_blob
/ kMB
,
879 stats
.bytes_written
/ kMB
, stats
.bytes_written_blob
/ kMB
, read_write_amp
,
880 write_amp
, status
.ToString().c_str(), stats
.num_input_records
,
881 stats
.num_dropped_records
,
882 CompressionTypeToString(compact_
->compaction
->output_compression())
885 const auto& blob_files
= vstorage
->GetBlobFiles();
886 if (!blob_files
.empty()) {
887 assert(blob_files
.front());
888 assert(blob_files
.back());
892 "[%s] Blob file summary: head=%" PRIu64
", tail=%" PRIu64
"\n",
893 column_family_name
.c_str(), blob_files
.front()->GetBlobFileNumber(),
894 blob_files
.back()->GetBlobFileNumber());
897 if (compaction_stats_
.has_penultimate_level_output
) {
900 "[%s] has Penultimate Level output: %" PRIu64
901 ", level %d, number of files: %" PRIu64
", number of records: %" PRIu64
,
902 column_family_name
.c_str(),
903 compaction_stats_
.penultimate_level_stats
.bytes_written
,
904 compact_
->compaction
->GetPenultimateLevel(),
905 compaction_stats_
.penultimate_level_stats
.num_output_files
,
906 compaction_stats_
.penultimate_level_stats
.num_output_records
);
909 UpdateCompactionJobStats(stats
);
911 auto stream
= event_logger_
->LogToBuffer(log_buffer_
, 8192);
912 stream
<< "job" << job_id_
<< "event"
913 << "compaction_finished"
914 << "compaction_time_micros" << stats
.micros
915 << "compaction_time_cpu_micros" << stats
.cpu_micros
<< "output_level"
916 << compact_
->compaction
->output_level() << "num_output_files"
917 << stats
.num_output_files
<< "total_output_size"
918 << stats
.bytes_written
;
920 if (stats
.num_output_files_blob
> 0) {
921 stream
<< "num_blob_output_files" << stats
.num_output_files_blob
922 << "total_blob_output_size" << stats
.bytes_written_blob
;
925 stream
<< "num_input_records" << stats
.num_input_records
926 << "num_output_records" << stats
.num_output_records
927 << "num_subcompactions" << compact_
->sub_compact_states
.size()
928 << "output_compression"
929 << CompressionTypeToString(compact_
->compaction
->output_compression());
931 stream
<< "num_single_delete_mismatches"
932 << compaction_job_stats_
->num_single_del_mismatch
;
933 stream
<< "num_single_delete_fallthrough"
934 << compaction_job_stats_
->num_single_del_fallthru
;
936 if (measure_io_stats_
) {
937 stream
<< "file_write_nanos" << compaction_job_stats_
->file_write_nanos
;
938 stream
<< "file_range_sync_nanos"
939 << compaction_job_stats_
->file_range_sync_nanos
;
940 stream
<< "file_fsync_nanos" << compaction_job_stats_
->file_fsync_nanos
;
941 stream
<< "file_prepare_write_nanos"
942 << compaction_job_stats_
->file_prepare_write_nanos
;
945 stream
<< "lsm_state";
947 for (int level
= 0; level
< vstorage
->num_levels(); ++level
) {
948 stream
<< vstorage
->NumLevelFiles(level
);
952 if (!blob_files
.empty()) {
953 assert(blob_files
.front());
954 stream
<< "blob_file_head" << blob_files
.front()->GetBlobFileNumber();
956 assert(blob_files
.back());
957 stream
<< "blob_file_tail" << blob_files
.back()->GetBlobFileNumber();
960 if (compaction_stats_
.has_penultimate_level_output
) {
961 InternalStats::CompactionStats
& pl_stats
=
962 compaction_stats_
.penultimate_level_stats
;
963 stream
<< "penultimate_level_num_output_files" << pl_stats
.num_output_files
;
964 stream
<< "penultimate_level_bytes_written" << pl_stats
.bytes_written
;
965 stream
<< "penultimate_level_num_output_records"
966 << pl_stats
.num_output_records
;
967 stream
<< "penultimate_level_num_output_files_blob"
968 << pl_stats
.num_output_files_blob
;
969 stream
<< "penultimate_level_bytes_written_blob"
970 << pl_stats
.bytes_written_blob
;
977 void CompactionJob::NotifyOnSubcompactionBegin(
978 SubcompactionState
* sub_compact
) {
980 Compaction
* c
= compact_
->compaction
;
982 if (db_options_
.listeners
.empty()) {
985 if (shutting_down_
->load(std::memory_order_acquire
)) {
988 if (c
->is_manual_compaction() &&
989 manual_compaction_canceled_
.load(std::memory_order_acquire
)) {
993 sub_compact
->notify_on_subcompaction_completion
= true;
995 SubcompactionJobInfo info
{};
996 sub_compact
->BuildSubcompactionJobInfo(info
);
997 info
.job_id
= static_cast<int>(job_id_
);
998 info
.thread_id
= env_
->GetThreadID();
1000 for (const auto& listener
: db_options_
.listeners
) {
1001 listener
->OnSubcompactionBegin(info
);
1003 info
.status
.PermitUncheckedError();
1007 #endif // ROCKSDB_LITE
1010 void CompactionJob::NotifyOnSubcompactionCompleted(
1011 SubcompactionState
* sub_compact
) {
1012 #ifndef ROCKSDB_LITE
1014 if (db_options_
.listeners
.empty()) {
1017 if (shutting_down_
->load(std::memory_order_acquire
)) {
1021 if (sub_compact
->notify_on_subcompaction_completion
== false) {
1025 SubcompactionJobInfo info
{};
1026 sub_compact
->BuildSubcompactionJobInfo(info
);
1027 info
.job_id
= static_cast<int>(job_id_
);
1028 info
.thread_id
= env_
->GetThreadID();
1030 for (const auto& listener
: db_options_
.listeners
) {
1031 listener
->OnSubcompactionCompleted(info
);
1035 #endif // ROCKSDB_LITE
1038 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState
* sub_compact
) {
1039 assert(sub_compact
);
1040 assert(sub_compact
->compaction
);
1042 #ifndef ROCKSDB_LITE
1043 if (db_options_
.compaction_service
) {
1044 CompactionServiceJobStatus comp_status
=
1045 ProcessKeyValueCompactionWithCompactionService(sub_compact
);
1046 if (comp_status
== CompactionServiceJobStatus::kSuccess
||
1047 comp_status
== CompactionServiceJobStatus::kFailure
) {
1050 // fallback to local compaction
1051 assert(comp_status
== CompactionServiceJobStatus::kUseLocal
);
1053 #endif // !ROCKSDB_LITE
1055 uint64_t prev_cpu_micros
= db_options_
.clock
->CPUMicros();
1057 ColumnFamilyData
* cfd
= sub_compact
->compaction
->column_family_data();
1059 // Create compaction filter and fail the compaction if
1060 // IgnoreSnapshots() = false because it is not supported anymore
1061 const CompactionFilter
* compaction_filter
=
1062 cfd
->ioptions()->compaction_filter
;
1063 std::unique_ptr
<CompactionFilter
> compaction_filter_from_factory
= nullptr;
1064 if (compaction_filter
== nullptr) {
1065 compaction_filter_from_factory
=
1066 sub_compact
->compaction
->CreateCompactionFilter();
1067 compaction_filter
= compaction_filter_from_factory
.get();
1069 if (compaction_filter
!= nullptr && !compaction_filter
->IgnoreSnapshots()) {
1070 sub_compact
->status
= Status::NotSupported(
1071 "CompactionFilter::IgnoreSnapshots() = false is not supported "
1076 NotifyOnSubcompactionBegin(sub_compact
);
1078 auto range_del_agg
= std::make_unique
<CompactionRangeDelAggregator
>(
1079 &cfd
->internal_comparator(), existing_snapshots_
, &full_history_ts_low_
,
1082 // TODO: since we already use C++17, should use
1083 // std::optional<const Slice> instead.
1084 const std::optional
<Slice
> start
= sub_compact
->start
;
1085 const std::optional
<Slice
> end
= sub_compact
->end
;
1087 std::optional
<Slice
> start_without_ts
;
1088 std::optional
<Slice
> end_without_ts
;
1090 ReadOptions read_options
;
1091 read_options
.verify_checksums
= true;
1092 read_options
.fill_cache
= false;
1093 read_options
.rate_limiter_priority
= GetRateLimiterPriority();
1094 // Compaction iterators shouldn't be confined to a single prefix.
1095 // Compactions use Seek() for
1096 // (a) concurrent compactions,
1097 // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
1098 read_options
.total_order_seek
= true;
1100 // Remove the timestamps from boundaries because boundaries created in
1101 // GenSubcompactionBoundaries doesn't strip away the timestamp.
1102 size_t ts_sz
= cfd
->user_comparator()->timestamp_size();
1103 if (start
.has_value()) {
1104 read_options
.iterate_lower_bound
= &start
.value();
1106 start_without_ts
= StripTimestampFromUserKey(start
.value(), ts_sz
);
1107 read_options
.iterate_lower_bound
= &start_without_ts
.value();
1110 if (end
.has_value()) {
1111 read_options
.iterate_upper_bound
= &end
.value();
1113 end_without_ts
= StripTimestampFromUserKey(end
.value(), ts_sz
);
1114 read_options
.iterate_upper_bound
= &end_without_ts
.value();
1118 // Although the v2 aggregator is what the level iterator(s) know about,
1119 // the AddTombstones calls will be propagated down to the v1 aggregator.
1120 std::unique_ptr
<InternalIterator
> raw_input(versions_
->MakeInputIterator(
1121 read_options
, sub_compact
->compaction
, range_del_agg
.get(),
1122 file_options_for_read_
, start
, end
));
1123 InternalIterator
* input
= raw_input
.get();
1130 static constexpr char kMaxTs
[] =
1131 "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
1135 if (ts_sz
<= strlen(kMaxTs
)) {
1136 ts_slice
= Slice(kMaxTs
, ts_sz
);
1138 max_ts
= std::string(ts_sz
, '\xff');
1139 ts_slice
= Slice(max_ts
);
1143 if (start
.has_value()) {
1144 start_ikey
.SetInternalKey(start
.value(), kMaxSequenceNumber
,
1147 start_ikey
.UpdateInternalKey(kMaxSequenceNumber
, kValueTypeForSeek
,
1150 start_slice
= start_ikey
.GetInternalKey();
1152 if (end
.has_value()) {
1153 end_ikey
.SetInternalKey(end
.value(), kMaxSequenceNumber
, kValueTypeForSeek
);
1155 end_ikey
.UpdateInternalKey(kMaxSequenceNumber
, kValueTypeForSeek
,
1158 end_slice
= end_ikey
.GetInternalKey();
1161 std::unique_ptr
<InternalIterator
> clip
;
1162 if (start
.has_value() || end
.has_value()) {
1163 clip
= std::make_unique
<ClippingIterator
>(
1164 raw_input
.get(), start
.has_value() ? &start_slice
: nullptr,
1165 end
.has_value() ? &end_slice
: nullptr, &cfd
->internal_comparator());
1169 std::unique_ptr
<InternalIterator
> blob_counter
;
1171 if (sub_compact
->compaction
->DoesInputReferenceBlobFiles()) {
1172 BlobGarbageMeter
* meter
= sub_compact
->Current().CreateBlobGarbageMeter();
1173 blob_counter
= std::make_unique
<BlobCountingIterator
>(input
, meter
);
1174 input
= blob_counter
.get();
1177 std::unique_ptr
<InternalIterator
> trim_history_iter
;
1178 if (ts_sz
> 0 && !trim_ts_
.empty()) {
1179 trim_history_iter
= std::make_unique
<HistoryTrimmingIterator
>(
1180 input
, cfd
->user_comparator(), trim_ts_
);
1181 input
= trim_history_iter
.get();
1184 input
->SeekToFirst();
1186 AutoThreadOperationStageUpdater
stage_updater(
1187 ThreadStatus::STAGE_COMPACTION_PROCESS_KV
);
1189 // I/O measurement variables
1190 PerfLevel prev_perf_level
= PerfLevel::kEnableTime
;
1191 const uint64_t kRecordStatsEvery
= 1000;
1192 uint64_t prev_write_nanos
= 0;
1193 uint64_t prev_fsync_nanos
= 0;
1194 uint64_t prev_range_sync_nanos
= 0;
1195 uint64_t prev_prepare_write_nanos
= 0;
1196 uint64_t prev_cpu_write_nanos
= 0;
1197 uint64_t prev_cpu_read_nanos
= 0;
1198 if (measure_io_stats_
) {
1199 prev_perf_level
= GetPerfLevel();
1200 SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex
);
1201 prev_write_nanos
= IOSTATS(write_nanos
);
1202 prev_fsync_nanos
= IOSTATS(fsync_nanos
);
1203 prev_range_sync_nanos
= IOSTATS(range_sync_nanos
);
1204 prev_prepare_write_nanos
= IOSTATS(prepare_write_nanos
);
1205 prev_cpu_write_nanos
= IOSTATS(cpu_write_nanos
);
1206 prev_cpu_read_nanos
= IOSTATS(cpu_read_nanos
);
1210 env_
, cfd
->user_comparator(), cfd
->ioptions()->merge_operator
.get(),
1211 compaction_filter
, db_options_
.info_log
.get(),
1212 false /* internal key corruption is expected */,
1213 existing_snapshots_
.empty() ? 0 : existing_snapshots_
.back(),
1214 snapshot_checker_
, compact_
->compaction
->level(), db_options_
.stats
);
1216 const MutableCFOptions
* mutable_cf_options
=
1217 sub_compact
->compaction
->mutable_cf_options();
1218 assert(mutable_cf_options
);
1220 std::vector
<std::string
> blob_file_paths
;
1222 // TODO: BlobDB to support output_to_penultimate_level compaction, which needs
1223 // 2 builders, so may need to move to `CompactionOutputs`
1224 std::unique_ptr
<BlobFileBuilder
> blob_file_builder(
1225 (mutable_cf_options
->enable_blob_files
&&
1226 sub_compact
->compaction
->output_level() >=
1227 mutable_cf_options
->blob_file_starting_level
)
1228 ? new BlobFileBuilder(
1229 versions_
, fs_
.get(),
1230 sub_compact
->compaction
->immutable_options(),
1231 mutable_cf_options
, &file_options_
, db_id_
, db_session_id_
,
1232 job_id_
, cfd
->GetID(), cfd
->GetName(), Env::IOPriority::IO_LOW
,
1233 write_hint_
, io_tracer_
, blob_callback_
,
1234 BlobFileCreationReason::kCompaction
, &blob_file_paths
,
1235 sub_compact
->Current().GetBlobFileAdditionsPtr())
1238 TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
1239 TEST_SYNC_POINT_CALLBACK(
1240 "CompactionJob::Run():PausingManualCompaction:1",
1241 reinterpret_cast<void*>(
1242 const_cast<std::atomic
<bool>*>(&manual_compaction_canceled_
)));
1244 const std::string
* const full_history_ts_low
=
1245 full_history_ts_low_
.empty() ? nullptr : &full_history_ts_low_
;
1246 const SequenceNumber job_snapshot_seq
=
1247 job_context_
? job_context_
->GetJobSnapshotSequence()
1248 : kMaxSequenceNumber
;
1250 auto c_iter
= std::make_unique
<CompactionIterator
>(
1251 input
, cfd
->user_comparator(), &merge
, versions_
->LastSequence(),
1252 &existing_snapshots_
, earliest_write_conflict_snapshot_
, job_snapshot_seq
,
1253 snapshot_checker_
, env_
, ShouldReportDetailedTime(env_
, stats_
),
1254 /*expect_valid_internal_key=*/true, range_del_agg
.get(),
1255 blob_file_builder
.get(), db_options_
.allow_data_in_errors
,
1256 db_options_
.enforce_single_del_contracts
, manual_compaction_canceled_
,
1257 sub_compact
->compaction
, compaction_filter
, shutting_down_
,
1258 db_options_
.info_log
, full_history_ts_low
, preserve_time_min_seqno_
,
1259 preclude_last_level_min_seqno_
);
1260 c_iter
->SeekToFirst();
1262 // Assign range delete aggregator to the target output level, which makes sure
1263 // it only output to single level
1264 sub_compact
->AssignRangeDelAggregator(std::move(range_del_agg
));
1266 const auto& c_iter_stats
= c_iter
->iter_stats();
1268 // define the open and close functions for the compaction files, which will be
1269 // used open/close output files when needed.
1270 const CompactionFileOpenFunc open_file_func
=
1271 [this, sub_compact
](CompactionOutputs
& outputs
) {
1272 return this->OpenCompactionOutputFile(sub_compact
, outputs
);
1274 const CompactionFileCloseFunc close_file_func
=
1275 [this, sub_compact
](CompactionOutputs
& outputs
, const Status
& status
,
1276 const Slice
& next_table_min_key
) {
1277 return this->FinishCompactionOutputFile(status
, sub_compact
, outputs
,
1278 next_table_min_key
);
1282 TEST_SYNC_POINT_CALLBACK(
1283 "CompactionJob::ProcessKeyValueCompaction()::Processing",
1284 reinterpret_cast<void*>(
1285 const_cast<Compaction
*>(sub_compact
->compaction
)));
1286 while (status
.ok() && !cfd
->IsDropped() && c_iter
->Valid()) {
1287 // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
1290 assert(!end
.has_value() || cfd
->user_comparator()->Compare(
1291 c_iter
->user_key(), end
.value()) < 0);
1293 if (c_iter_stats
.num_input_records
% kRecordStatsEvery
==
1294 kRecordStatsEvery
- 1) {
1295 RecordDroppedKeys(c_iter_stats
, &sub_compact
->compaction_job_stats
);
1296 c_iter
->ResetRecordCounts();
1297 RecordCompactionIOStats();
1300 // Add current compaction_iterator key to target compaction output, if the
1301 // output file needs to be close or open, it will call the `open_file_func`
1302 // and `close_file_func`.
1303 // TODO: it would be better to have the compaction file open/close moved
1304 // into `CompactionOutputs` which has the output file information.
1305 status
= sub_compact
->AddToOutput(*c_iter
, open_file_func
, close_file_func
);
1310 TEST_SYNC_POINT_CALLBACK(
1311 "CompactionJob::Run():PausingManualCompaction:2",
1312 reinterpret_cast<void*>(
1313 const_cast<std::atomic
<bool>*>(&manual_compaction_canceled_
)));
1315 if (c_iter
->status().IsManualCompactionPaused()) {
1320 sub_compact
->compaction_job_stats
.num_blobs_read
=
1321 c_iter_stats
.num_blobs_read
;
1322 sub_compact
->compaction_job_stats
.total_blob_bytes_read
=
1323 c_iter_stats
.total_blob_bytes_read
;
1324 sub_compact
->compaction_job_stats
.num_input_deletion_records
=
1325 c_iter_stats
.num_input_deletion_records
;
1326 sub_compact
->compaction_job_stats
.num_corrupt_keys
=
1327 c_iter_stats
.num_input_corrupt_records
;
1328 sub_compact
->compaction_job_stats
.num_single_del_fallthru
=
1329 c_iter_stats
.num_single_del_fallthru
;
1330 sub_compact
->compaction_job_stats
.num_single_del_mismatch
=
1331 c_iter_stats
.num_single_del_mismatch
;
1332 sub_compact
->compaction_job_stats
.total_input_raw_key_bytes
+=
1333 c_iter_stats
.total_input_raw_key_bytes
;
1334 sub_compact
->compaction_job_stats
.total_input_raw_value_bytes
+=
1335 c_iter_stats
.total_input_raw_value_bytes
;
1337 RecordTick(stats_
, FILTER_OPERATION_TOTAL_TIME
,
1338 c_iter_stats
.total_filter_time
);
1340 if (c_iter_stats
.num_blobs_relocated
> 0) {
1341 RecordTick(stats_
, BLOB_DB_GC_NUM_KEYS_RELOCATED
,
1342 c_iter_stats
.num_blobs_relocated
);
1344 if (c_iter_stats
.total_blob_bytes_relocated
> 0) {
1345 RecordTick(stats_
, BLOB_DB_GC_BYTES_RELOCATED
,
1346 c_iter_stats
.total_blob_bytes_relocated
);
1349 RecordDroppedKeys(c_iter_stats
, &sub_compact
->compaction_job_stats
);
1350 RecordCompactionIOStats();
1352 if (status
.ok() && cfd
->IsDropped()) {
1354 Status::ColumnFamilyDropped("Column family dropped during compaction");
1356 if ((status
.ok() || status
.IsColumnFamilyDropped()) &&
1357 shutting_down_
->load(std::memory_order_relaxed
)) {
1358 status
= Status::ShutdownInProgress("Database shutdown");
1360 if ((status
.ok() || status
.IsColumnFamilyDropped()) &&
1361 (manual_compaction_canceled_
.load(std::memory_order_relaxed
))) {
1362 status
= Status::Incomplete(Status::SubCode::kManualCompactionPaused
);
1365 status
= input
->status();
1368 status
= c_iter
->status();
1371 // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1372 // close the output files. Open file function is also passed, in case there's
1373 // only range-dels, no file was opened, to save the range-dels, it need to
1374 // create a new output file.
1375 status
= sub_compact
->CloseCompactionFiles(status
, open_file_func
,
1378 if (blob_file_builder
) {
1380 status
= blob_file_builder
->Finish();
1382 blob_file_builder
->Abandon(status
);
1384 blob_file_builder
.reset();
1385 sub_compact
->Current().UpdateBlobStats();
1388 sub_compact
->compaction_job_stats
.cpu_micros
=
1389 db_options_
.clock
->CPUMicros() - prev_cpu_micros
;
1391 if (measure_io_stats_
) {
1392 sub_compact
->compaction_job_stats
.file_write_nanos
+=
1393 IOSTATS(write_nanos
) - prev_write_nanos
;
1394 sub_compact
->compaction_job_stats
.file_fsync_nanos
+=
1395 IOSTATS(fsync_nanos
) - prev_fsync_nanos
;
1396 sub_compact
->compaction_job_stats
.file_range_sync_nanos
+=
1397 IOSTATS(range_sync_nanos
) - prev_range_sync_nanos
;
1398 sub_compact
->compaction_job_stats
.file_prepare_write_nanos
+=
1399 IOSTATS(prepare_write_nanos
) - prev_prepare_write_nanos
;
1400 sub_compact
->compaction_job_stats
.cpu_micros
-=
1401 (IOSTATS(cpu_write_nanos
) - prev_cpu_write_nanos
+
1402 IOSTATS(cpu_read_nanos
) - prev_cpu_read_nanos
) /
1404 if (prev_perf_level
!= PerfLevel::kEnableTimeAndCPUTimeExceptForMutex
) {
1405 SetPerfLevel(prev_perf_level
);
1408 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
1411 c_iter
->status().PermitUncheckedError();
1414 input
->status().PermitUncheckedError();
1417 #endif // ROCKSDB_ASSERT_STATUS_CHECKED
1419 blob_counter
.reset();
1422 sub_compact
->status
= status
;
1423 NotifyOnSubcompactionCompleted(sub_compact
);
1426 uint64_t CompactionJob::GetCompactionId(SubcompactionState
* sub_compact
) const {
1427 return (uint64_t)job_id_
<< 32 | sub_compact
->sub_job_id
;
1430 void CompactionJob::RecordDroppedKeys(
1431 const CompactionIterationStats
& c_iter_stats
,
1432 CompactionJobStats
* compaction_job_stats
) {
1433 if (c_iter_stats
.num_record_drop_user
> 0) {
1434 RecordTick(stats_
, COMPACTION_KEY_DROP_USER
,
1435 c_iter_stats
.num_record_drop_user
);
1437 if (c_iter_stats
.num_record_drop_hidden
> 0) {
1438 RecordTick(stats_
, COMPACTION_KEY_DROP_NEWER_ENTRY
,
1439 c_iter_stats
.num_record_drop_hidden
);
1440 if (compaction_job_stats
) {
1441 compaction_job_stats
->num_records_replaced
+=
1442 c_iter_stats
.num_record_drop_hidden
;
1445 if (c_iter_stats
.num_record_drop_obsolete
> 0) {
1446 RecordTick(stats_
, COMPACTION_KEY_DROP_OBSOLETE
,
1447 c_iter_stats
.num_record_drop_obsolete
);
1448 if (compaction_job_stats
) {
1449 compaction_job_stats
->num_expired_deletion_records
+=
1450 c_iter_stats
.num_record_drop_obsolete
;
1453 if (c_iter_stats
.num_record_drop_range_del
> 0) {
1454 RecordTick(stats_
, COMPACTION_KEY_DROP_RANGE_DEL
,
1455 c_iter_stats
.num_record_drop_range_del
);
1457 if (c_iter_stats
.num_range_del_drop_obsolete
> 0) {
1458 RecordTick(stats_
, COMPACTION_RANGE_DEL_DROP_OBSOLETE
,
1459 c_iter_stats
.num_range_del_drop_obsolete
);
1461 if (c_iter_stats
.num_optimized_del_drop_obsolete
> 0) {
1462 RecordTick(stats_
, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE
,
1463 c_iter_stats
.num_optimized_del_drop_obsolete
);
1467 Status
CompactionJob::FinishCompactionOutputFile(
1468 const Status
& input_status
, SubcompactionState
* sub_compact
,
1469 CompactionOutputs
& outputs
, const Slice
& next_table_min_key
) {
1470 AutoThreadOperationStageUpdater
stage_updater(
1471 ThreadStatus::STAGE_COMPACTION_SYNC_FILE
);
1472 assert(sub_compact
!= nullptr);
1473 assert(outputs
.HasBuilder());
1475 FileMetaData
* meta
= outputs
.GetMetaData();
1476 uint64_t output_number
= meta
->fd
.GetNumber();
1477 assert(output_number
!= 0);
1479 ColumnFamilyData
* cfd
= sub_compact
->compaction
->column_family_data();
1480 std::string file_checksum
= kUnknownFileChecksum
;
1481 std::string file_checksum_func_name
= kUnknownFileChecksumFuncName
;
1483 // Check for iterator errors
1484 Status s
= input_status
;
1486 // Add range tombstones
1487 auto earliest_snapshot
= kMaxSequenceNumber
;
1488 if (existing_snapshots_
.size() > 0) {
1489 earliest_snapshot
= existing_snapshots_
[0];
1492 CompactionIterationStats range_del_out_stats
;
1493 // if the compaction supports per_key_placement, only output range dels to
1494 // the penultimate level.
1495 // Note: Use `bottommost_level_ = true` for both bottommost and
1496 // output_to_penultimate_level compaction here, as it's only used to decide
1497 // if range dels could be dropped.
1498 if (outputs
.HasRangeDel()) {
1499 s
= outputs
.AddRangeDels(
1500 sub_compact
->start
.has_value() ? &(sub_compact
->start
.value())
1502 sub_compact
->end
.has_value() ? &(sub_compact
->end
.value()) : nullptr,
1503 range_del_out_stats
, bottommost_level_
, cfd
->internal_comparator(),
1504 earliest_snapshot
, next_table_min_key
, full_history_ts_low_
);
1506 RecordDroppedKeys(range_del_out_stats
, &sub_compact
->compaction_job_stats
);
1507 TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
1510 const uint64_t current_entries
= outputs
.NumEntries();
1512 s
= outputs
.Finish(s
, seqno_time_mapping_
);
1515 // With accurate smallest and largest key, we can get a slightly more
1516 // accurate oldest ancester time.
1517 // This makes oldest ancester time in manifest more accurate than in
1518 // table properties. Not sure how to resolve it.
1519 if (meta
->smallest
.size() > 0 && meta
->largest
.size() > 0) {
1520 uint64_t refined_oldest_ancester_time
;
1521 Slice new_smallest
= meta
->smallest
.user_key();
1522 Slice new_largest
= meta
->largest
.user_key();
1523 if (!new_largest
.empty() && !new_smallest
.empty()) {
1524 refined_oldest_ancester_time
=
1525 sub_compact
->compaction
->MinInputFileOldestAncesterTime(
1526 &(meta
->smallest
), &(meta
->largest
));
1527 if (refined_oldest_ancester_time
!=
1528 std::numeric_limits
<uint64_t>::max()) {
1529 meta
->oldest_ancester_time
= refined_oldest_ancester_time
;
1535 // Finish and check for file errors
1536 IOStatus io_s
= outputs
.WriterSyncClose(s
, db_options_
.clock
, stats_
,
1537 db_options_
.use_fsync
);
1539 if (s
.ok() && io_s
.ok()) {
1540 file_checksum
= meta
->file_checksum
;
1541 file_checksum_func_name
= meta
->file_checksum_func_name
;
1547 if (sub_compact
->io_status
.ok()) {
1548 sub_compact
->io_status
= io_s
;
1549 // Since this error is really a copy of the
1550 // "normal" status, it does not also need to be checked
1551 sub_compact
->io_status
.PermitUncheckedError();
1556 tp
= outputs
.GetTableProperties();
1559 if (s
.ok() && current_entries
== 0 && tp
.num_range_deletions
== 0) {
1560 // If there is nothing to output, no necessary to generate a sst file.
1561 // This happens when the output level is bottom level, at the same time
1562 // the sub_compact output nothing.
1564 TableFileName(sub_compact
->compaction
->immutable_options()->cf_paths
,
1565 meta
->fd
.GetNumber(), meta
->fd
.GetPathId());
1567 // TODO(AR) it is not clear if there are any larger implications if
1568 // DeleteFile fails here
1569 Status ds
= env_
->DeleteFile(fname
);
1572 db_options_
.info_log
,
1573 "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
1574 " at bottom level%s",
1575 cfd
->GetName().c_str(), job_id_
, output_number
,
1576 meta
->marked_for_compaction
? " (need compaction)" : "");
1579 // Also need to remove the file from outputs, or it will be added to the
1581 outputs
.RemoveLastOutput();
1585 if (s
.ok() && (current_entries
> 0 || tp
.num_range_deletions
> 0)) {
1586 // Output to event logger and fire events.
1587 outputs
.UpdateTableProperties();
1588 ROCKS_LOG_INFO(db_options_
.info_log
,
1589 "[%s] [JOB %d] Generated table #%" PRIu64
": %" PRIu64
1590 " keys, %" PRIu64
" bytes%s, temperature: %s",
1591 cfd
->GetName().c_str(), job_id_
, output_number
,
1592 current_entries
, meta
->fd
.file_size
,
1593 meta
->marked_for_compaction
? " (need compaction)" : "",
1594 temperature_to_string
[meta
->temperature
].c_str());
1597 FileDescriptor output_fd
;
1598 uint64_t oldest_blob_file_number
= kInvalidBlobFileNumber
;
1599 Status status_for_listener
= s
;
1600 if (meta
!= nullptr) {
1601 fname
= GetTableFileName(meta
->fd
.GetNumber());
1602 output_fd
= meta
->fd
;
1603 oldest_blob_file_number
= meta
->oldest_blob_file_number
;
1607 status_for_listener
= Status::Aborted("Empty SST file not kept");
1610 EventHelpers::LogAndNotifyTableFileCreationFinished(
1611 event_logger_
, cfd
->ioptions()->listeners
, dbname_
, cfd
->GetName(), fname
,
1612 job_id_
, output_fd
, oldest_blob_file_number
, tp
,
1613 TableFileCreationReason::kCompaction
, status_for_listener
, file_checksum
,
1614 file_checksum_func_name
);
1616 #ifndef ROCKSDB_LITE
1617 // Report new file to SstFileManagerImpl
1619 static_cast<SstFileManagerImpl
*>(db_options_
.sst_file_manager
.get());
1620 if (sfm
&& meta
!= nullptr && meta
->fd
.GetPathId() == 0) {
1621 Status add_s
= sfm
->OnAddFile(fname
);
1622 if (!add_s
.ok() && s
.ok()) {
1625 if (sfm
->IsMaxAllowedSpaceReached()) {
1626 // TODO(ajkr): should we return OK() if max space was reached by the final
1627 // compaction output file (similarly to how flush works when full)?
1628 s
= Status::SpaceLimit("Max allowed space was reached");
1630 "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
1631 InstrumentedMutexLock
l(db_mutex_
);
1632 db_error_handler_
->SetBGError(s
, BackgroundErrorReason::kCompaction
);
1637 outputs
.ResetBuilder();
1641 Status
CompactionJob::InstallCompactionResults(
1642 const MutableCFOptions
& mutable_cf_options
) {
1645 db_mutex_
->AssertHeld();
1647 auto* compaction
= compact_
->compaction
;
1651 Compaction::InputLevelSummaryBuffer inputs_summary
;
1652 if (compaction_stats_
.has_penultimate_level_output
) {
1655 "[%s] [JOB %d] Compacted %s => output_to_penultimate_level: %" PRIu64
1656 " bytes + last: %" PRIu64
" bytes. Total: %" PRIu64
" bytes",
1657 compaction
->column_family_data()->GetName().c_str(), job_id_
,
1658 compaction
->InputLevelSummary(&inputs_summary
),
1659 compaction_stats_
.penultimate_level_stats
.bytes_written
,
1660 compaction_stats_
.stats
.bytes_written
,
1661 compaction_stats_
.TotalBytesWritten());
1663 ROCKS_LOG_BUFFER(log_buffer_
,
1664 "[%s] [JOB %d] Compacted %s => %" PRIu64
" bytes",
1665 compaction
->column_family_data()->GetName().c_str(),
1666 job_id_
, compaction
->InputLevelSummary(&inputs_summary
),
1667 compaction_stats_
.TotalBytesWritten());
1671 VersionEdit
* const edit
= compaction
->edit();
1674 // Add compaction inputs
1675 compaction
->AddInputDeletions(edit
);
1677 std::unordered_map
<uint64_t, BlobGarbageMeter::BlobStats
> blob_total_garbage
;
1679 for (const auto& sub_compact
: compact_
->sub_compact_states
) {
1680 sub_compact
.AddOutputsEdit(edit
);
1682 for (const auto& blob
: sub_compact
.Current().GetBlobFileAdditions()) {
1683 edit
->AddBlobFile(blob
);
1686 if (sub_compact
.Current().GetBlobGarbageMeter()) {
1687 const auto& flows
= sub_compact
.Current().GetBlobGarbageMeter()->flows();
1689 for (const auto& pair
: flows
) {
1690 const uint64_t blob_file_number
= pair
.first
;
1691 const BlobGarbageMeter::BlobInOutFlow
& flow
= pair
.second
;
1693 assert(flow
.IsValid());
1694 if (flow
.HasGarbage()) {
1695 blob_total_garbage
[blob_file_number
].Add(flow
.GetGarbageCount(),
1696 flow
.GetGarbageBytes());
1702 for (const auto& pair
: blob_total_garbage
) {
1703 const uint64_t blob_file_number
= pair
.first
;
1704 const BlobGarbageMeter::BlobStats
& stats
= pair
.second
;
1706 edit
->AddBlobFileGarbage(blob_file_number
, stats
.GetCount(),
1710 if ((compaction
->compaction_reason() ==
1711 CompactionReason::kLevelMaxLevelSize
||
1712 compaction
->compaction_reason() == CompactionReason::kRoundRobinTtl
) &&
1713 compaction
->immutable_options()->compaction_pri
== kRoundRobin
) {
1714 int start_level
= compaction
->start_level();
1715 if (start_level
> 0) {
1716 auto vstorage
= compaction
->input_version()->storage_info();
1717 edit
->AddCompactCursor(start_level
,
1718 vstorage
->GetNextCompactCursor(
1719 start_level
, compaction
->num_input_files(0)));
1723 return versions_
->LogAndApply(compaction
->column_family_data(),
1724 mutable_cf_options
, edit
, db_mutex_
,
1728 void CompactionJob::RecordCompactionIOStats() {
1729 RecordTick(stats_
, COMPACT_READ_BYTES
, IOSTATS(bytes_read
));
1730 RecordTick(stats_
, COMPACT_WRITE_BYTES
, IOSTATS(bytes_written
));
1731 CompactionReason compaction_reason
=
1732 compact_
->compaction
->compaction_reason();
1733 if (compaction_reason
== CompactionReason::kFilesMarkedForCompaction
) {
1734 RecordTick(stats_
, COMPACT_READ_BYTES_MARKED
, IOSTATS(bytes_read
));
1735 RecordTick(stats_
, COMPACT_WRITE_BYTES_MARKED
, IOSTATS(bytes_written
));
1736 } else if (compaction_reason
== CompactionReason::kPeriodicCompaction
) {
1737 RecordTick(stats_
, COMPACT_READ_BYTES_PERIODIC
, IOSTATS(bytes_read
));
1738 RecordTick(stats_
, COMPACT_WRITE_BYTES_PERIODIC
, IOSTATS(bytes_written
));
1739 } else if (compaction_reason
== CompactionReason::kTtl
) {
1740 RecordTick(stats_
, COMPACT_READ_BYTES_TTL
, IOSTATS(bytes_read
));
1741 RecordTick(stats_
, COMPACT_WRITE_BYTES_TTL
, IOSTATS(bytes_written
));
1743 ThreadStatusUtil::IncreaseThreadOperationProperty(
1744 ThreadStatus::COMPACTION_BYTES_READ
, IOSTATS(bytes_read
));
1745 IOSTATS_RESET(bytes_read
);
1746 ThreadStatusUtil::IncreaseThreadOperationProperty(
1747 ThreadStatus::COMPACTION_BYTES_WRITTEN
, IOSTATS(bytes_written
));
1748 IOSTATS_RESET(bytes_written
);
1751 Status
CompactionJob::OpenCompactionOutputFile(SubcompactionState
* sub_compact
,
1752 CompactionOutputs
& outputs
) {
1753 assert(sub_compact
!= nullptr);
1755 // no need to lock because VersionSet::next_file_number_ is atomic
1756 uint64_t file_number
= versions_
->NewFileNumber();
1757 std::string fname
= GetTableFileName(file_number
);
1759 ColumnFamilyData
* cfd
= sub_compact
->compaction
->column_family_data();
1760 #ifndef ROCKSDB_LITE
1761 EventHelpers::NotifyTableFileCreationStarted(
1762 cfd
->ioptions()->listeners
, dbname_
, cfd
->GetName(), fname
, job_id_
,
1763 TableFileCreationReason::kCompaction
);
1764 #endif // !ROCKSDB_LITE
1765 // Make the output file
1766 std::unique_ptr
<FSWritableFile
> writable_file
;
1768 bool syncpoint_arg
= file_options_
.use_direct_writes
;
1769 TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
1773 // Pass temperature of the last level files to FileSystem.
1774 FileOptions fo_copy
= file_options_
;
1775 Temperature temperature
= sub_compact
->compaction
->output_temperature();
1776 // only set for the last level compaction and also it's not output to
1777 // penultimate level (when preclude_last_level feature is enabled)
1778 if (temperature
== Temperature::kUnknown
&&
1779 sub_compact
->compaction
->is_last_level() &&
1780 !sub_compact
->IsCurrentPenultimateLevel()) {
1782 sub_compact
->compaction
->mutable_cf_options()->last_level_temperature
;
1784 fo_copy
.temperature
= temperature
;
1787 IOStatus io_s
= NewWritableFile(fs_
.get(), fname
, &writable_file
, fo_copy
);
1789 if (sub_compact
->io_status
.ok()) {
1790 sub_compact
->io_status
= io_s
;
1791 // Since this error is really a copy of the io_s that is checked below as s,
1792 // it does not also need to be checked.
1793 sub_compact
->io_status
.PermitUncheckedError();
1797 db_options_
.info_log
,
1798 "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
1799 " fails at NewWritableFile with status %s",
1800 sub_compact
->compaction
->column_family_data()->GetName().c_str(),
1801 job_id_
, file_number
, s
.ToString().c_str());
1802 LogFlush(db_options_
.info_log
);
1803 EventHelpers::LogAndNotifyTableFileCreationFinished(
1804 event_logger_
, cfd
->ioptions()->listeners
, dbname_
, cfd
->GetName(),
1805 fname
, job_id_
, FileDescriptor(), kInvalidBlobFileNumber
,
1806 TableProperties(), TableFileCreationReason::kCompaction
, s
,
1807 kUnknownFileChecksum
, kUnknownFileChecksumFuncName
);
1811 // Try to figure out the output file's oldest ancester time.
1812 int64_t temp_current_time
= 0;
1813 auto get_time_status
= db_options_
.clock
->GetCurrentTime(&temp_current_time
);
1814 // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
1815 if (!get_time_status
.ok()) {
1816 ROCKS_LOG_WARN(db_options_
.info_log
,
1817 "Failed to get current time. Status: %s",
1818 get_time_status
.ToString().c_str());
1820 uint64_t current_time
= static_cast<uint64_t>(temp_current_time
);
1821 InternalKey tmp_start
, tmp_end
;
1822 if (sub_compact
->start
.has_value()) {
1823 tmp_start
.SetMinPossibleForUserKey(sub_compact
->start
.value());
1825 if (sub_compact
->end
.has_value()) {
1826 tmp_end
.SetMinPossibleForUserKey(sub_compact
->end
.value());
1828 uint64_t oldest_ancester_time
=
1829 sub_compact
->compaction
->MinInputFileOldestAncesterTime(
1830 sub_compact
->start
.has_value() ? &tmp_start
: nullptr,
1831 sub_compact
->end
.has_value() ? &tmp_end
: nullptr);
1832 if (oldest_ancester_time
== std::numeric_limits
<uint64_t>::max()) {
1833 oldest_ancester_time
= current_time
;
1836 // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
1839 meta
.fd
= FileDescriptor(file_number
,
1840 sub_compact
->compaction
->output_path_id(), 0);
1841 meta
.oldest_ancester_time
= oldest_ancester_time
;
1842 meta
.file_creation_time
= current_time
;
1843 meta
.temperature
= temperature
;
1844 assert(!db_id_
.empty());
1845 assert(!db_session_id_
.empty());
1846 s
= GetSstInternalUniqueId(db_id_
, db_session_id_
, meta
.fd
.GetNumber(),
1849 ROCKS_LOG_ERROR(db_options_
.info_log
,
1850 "[%s] [JOB %d] file #%" PRIu64
1851 " failed to generate unique id: %s.",
1852 cfd
->GetName().c_str(), job_id_
, meta
.fd
.GetNumber(),
1853 s
.ToString().c_str());
1857 outputs
.AddOutput(std::move(meta
), cfd
->internal_comparator(),
1858 sub_compact
->compaction
->mutable_cf_options()
1859 ->check_flush_compaction_key_order
,
1860 paranoid_file_checks_
);
1863 writable_file
->SetIOPriority(GetRateLimiterPriority());
1864 writable_file
->SetWriteLifeTimeHint(write_hint_
);
1865 FileTypeSet tmp_set
= db_options_
.checksum_handoff_file_types
;
1866 writable_file
->SetPreallocationBlockSize(static_cast<size_t>(
1867 sub_compact
->compaction
->OutputFilePreallocationSize()));
1868 const auto& listeners
=
1869 sub_compact
->compaction
->immutable_options()->listeners
;
1870 outputs
.AssignFileWriter(new WritableFileWriter(
1871 std::move(writable_file
), fname
, fo_copy
, db_options_
.clock
, io_tracer_
,
1872 db_options_
.stats
, listeners
, db_options_
.file_checksum_gen_factory
.get(),
1873 tmp_set
.Contains(FileType::kTableFile
), false));
1875 TableBuilderOptions
tboptions(
1876 *cfd
->ioptions(), *(sub_compact
->compaction
->mutable_cf_options()),
1877 cfd
->internal_comparator(), cfd
->int_tbl_prop_collector_factories(),
1878 sub_compact
->compaction
->output_compression(),
1879 sub_compact
->compaction
->output_compression_opts(), cfd
->GetID(),
1880 cfd
->GetName(), sub_compact
->compaction
->output_level(),
1881 bottommost_level_
, TableFileCreationReason::kCompaction
,
1882 0 /* oldest_key_time */, current_time
, db_id_
, db_session_id_
,
1883 sub_compact
->compaction
->max_output_file_size(), file_number
);
1885 outputs
.NewBuilder(tboptions
);
1887 LogFlush(db_options_
.info_log
);
1891 void CompactionJob::CleanupCompaction() {
1892 for (SubcompactionState
& sub_compact
: compact_
->sub_compact_states
) {
1893 sub_compact
.Cleanup(table_cache_
.get());
1899 #ifndef ROCKSDB_LITE
1901 void CopyPrefix(const Slice
& src
, size_t prefix_length
, std::string
* dst
) {
1902 assert(prefix_length
> 0);
1903 size_t length
= src
.size() > prefix_length
? prefix_length
: src
.size();
1904 dst
->assign(src
.data(), length
);
1908 #endif // !ROCKSDB_LITE
1910 void CompactionJob::UpdateCompactionStats() {
1913 Compaction
* compaction
= compact_
->compaction
;
1914 compaction_stats_
.stats
.num_input_files_in_non_output_levels
= 0;
1915 compaction_stats_
.stats
.num_input_files_in_output_level
= 0;
1916 for (int input_level
= 0;
1917 input_level
< static_cast<int>(compaction
->num_input_levels());
1919 if (compaction
->level(input_level
) != compaction
->output_level()) {
1920 UpdateCompactionInputStatsHelper(
1921 &compaction_stats_
.stats
.num_input_files_in_non_output_levels
,
1922 &compaction_stats_
.stats
.bytes_read_non_output_levels
, input_level
);
1924 UpdateCompactionInputStatsHelper(
1925 &compaction_stats_
.stats
.num_input_files_in_output_level
,
1926 &compaction_stats_
.stats
.bytes_read_output_level
, input_level
);
1930 assert(compaction_job_stats_
);
1931 compaction_stats_
.stats
.bytes_read_blob
=
1932 compaction_job_stats_
->total_blob_bytes_read
;
1934 compaction_stats_
.stats
.num_dropped_records
=
1935 compaction_stats_
.DroppedRecords();
1938 void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files
,
1939 uint64_t* bytes_read
,
1941 const Compaction
* compaction
= compact_
->compaction
;
1942 auto num_input_files
= compaction
->num_input_files(input_level
);
1943 *num_files
+= static_cast<int>(num_input_files
);
1945 for (size_t i
= 0; i
< num_input_files
; ++i
) {
1946 const auto* file_meta
= compaction
->input(input_level
, i
);
1947 *bytes_read
+= file_meta
->fd
.GetFileSize();
1948 compaction_stats_
.stats
.num_input_records
+=
1949 static_cast<uint64_t>(file_meta
->num_entries
);
1953 void CompactionJob::UpdateCompactionJobStats(
1954 const InternalStats::CompactionStats
& stats
) const {
1955 #ifndef ROCKSDB_LITE
1956 compaction_job_stats_
->elapsed_micros
= stats
.micros
;
1958 // input information
1959 compaction_job_stats_
->total_input_bytes
=
1960 stats
.bytes_read_non_output_levels
+ stats
.bytes_read_output_level
;
1961 compaction_job_stats_
->num_input_records
= stats
.num_input_records
;
1962 compaction_job_stats_
->num_input_files
=
1963 stats
.num_input_files_in_non_output_levels
+
1964 stats
.num_input_files_in_output_level
;
1965 compaction_job_stats_
->num_input_files_at_output_level
=
1966 stats
.num_input_files_in_output_level
;
1968 // output information
1969 compaction_job_stats_
->total_output_bytes
= stats
.bytes_written
;
1970 compaction_job_stats_
->total_output_bytes_blob
= stats
.bytes_written_blob
;
1971 compaction_job_stats_
->num_output_records
= stats
.num_output_records
;
1972 compaction_job_stats_
->num_output_files
= stats
.num_output_files
;
1973 compaction_job_stats_
->num_output_files_blob
= stats
.num_output_files_blob
;
1975 if (stats
.num_output_files
> 0) {
1976 CopyPrefix(compact_
->SmallestUserKey(),
1977 CompactionJobStats::kMaxPrefixLength
,
1978 &compaction_job_stats_
->smallest_output_key_prefix
);
1979 CopyPrefix(compact_
->LargestUserKey(), CompactionJobStats::kMaxPrefixLength
,
1980 &compaction_job_stats_
->largest_output_key_prefix
);
1984 #endif // !ROCKSDB_LITE
1987 void CompactionJob::LogCompaction() {
1988 Compaction
* compaction
= compact_
->compaction
;
1989 ColumnFamilyData
* cfd
= compaction
->column_family_data();
1991 // Let's check if anything will get logged. Don't prepare all the info if
1992 // we're not logging
1993 if (db_options_
.info_log_level
<= InfoLogLevel::INFO_LEVEL
) {
1994 Compaction::InputLevelSummaryBuffer inputs_summary
;
1996 db_options_
.info_log
, "[%s] [JOB %d] Compacting %s, score %.2f",
1997 cfd
->GetName().c_str(), job_id_
,
1998 compaction
->InputLevelSummary(&inputs_summary
), compaction
->score());
2000 compaction
->Summary(scratch
, sizeof(scratch
));
2001 ROCKS_LOG_INFO(db_options_
.info_log
, "[%s]: Compaction start summary: %s\n",
2002 cfd
->GetName().c_str(), scratch
);
2003 // build event logger report
2004 auto stream
= event_logger_
->Log();
2005 stream
<< "job" << job_id_
<< "event"
2006 << "compaction_started"
2007 << "compaction_reason"
2008 << GetCompactionReasonString(compaction
->compaction_reason());
2009 for (size_t i
= 0; i
< compaction
->num_input_levels(); ++i
) {
2010 stream
<< ("files_L" + std::to_string(compaction
->level(i
)));
2011 stream
.StartArray();
2012 for (auto f
: *compaction
->inputs(i
)) {
2013 stream
<< f
->fd
.GetNumber();
2017 stream
<< "score" << compaction
->score() << "input_data_size"
2018 << compaction
->CalculateTotalInputSize() << "oldest_snapshot_seqno"
2019 << (existing_snapshots_
.empty()
2020 ? int64_t{-1} // Use -1 for "none"
2021 : static_cast<int64_t>(existing_snapshots_
[0]));
2022 if (compaction
->SupportsPerKeyPlacement()) {
2023 stream
<< "preclude_last_level_min_seqno"
2024 << preclude_last_level_min_seqno_
;
2025 stream
<< "penultimate_output_level" << compaction
->GetPenultimateLevel();
2026 stream
<< "penultimate_output_range"
2027 << GetCompactionPenultimateOutputRangeTypeString(
2028 compaction
->GetPenultimateOutputRangeType());
2030 if (compaction
->GetPenultimateOutputRangeType() ==
2031 Compaction::PenultimateOutputRangeType::kDisabled
) {
2033 db_options_
.info_log
,
2034 "[%s] [JOB %d] Penultimate level output is disabled, likely "
2035 "because of the range conflict in the penultimate level",
2036 cfd
->GetName().c_str(), job_id_
);
2042 std::string
CompactionJob::GetTableFileName(uint64_t file_number
) {
2043 return TableFileName(compact_
->compaction
->immutable_options()->cf_paths
,
2044 file_number
, compact_
->compaction
->output_path_id());
2047 Env::IOPriority
CompactionJob::GetRateLimiterPriority() {
2048 if (versions_
&& versions_
->GetColumnFamilySet() &&
2049 versions_
->GetColumnFamilySet()->write_controller()) {
2050 WriteController
* write_controller
=
2051 versions_
->GetColumnFamilySet()->write_controller();
2052 if (write_controller
->NeedsDelay() || write_controller
->IsStopped()) {
2053 return Env::IO_USER
;
2060 } // namespace ROCKSDB_NAMESPACE