1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/compaction/compaction_job.h"
11
12 #include <algorithm>
13 #include <cinttypes>
14 #include <memory>
15 #include <optional>
16 #include <set>
17 #include <utility>
18 #include <vector>
19
20 #include "db/blob/blob_counting_iterator.h"
21 #include "db/blob/blob_file_addition.h"
22 #include "db/blob/blob_file_builder.h"
23 #include "db/builder.h"
24 #include "db/compaction/clipping_iterator.h"
25 #include "db/compaction/compaction_state.h"
26 #include "db/db_impl/db_impl.h"
27 #include "db/dbformat.h"
28 #include "db/error_handler.h"
29 #include "db/event_helpers.h"
30 #include "db/history_trimming_iterator.h"
31 #include "db/log_writer.h"
32 #include "db/merge_helper.h"
33 #include "db/range_del_aggregator.h"
34 #include "db/version_edit.h"
35 #include "db/version_set.h"
36 #include "file/filename.h"
37 #include "file/read_write_util.h"
38 #include "file/sst_file_manager_impl.h"
39 #include "file/writable_file_writer.h"
40 #include "logging/log_buffer.h"
41 #include "logging/logging.h"
42 #include "monitoring/iostats_context_imp.h"
43 #include "monitoring/thread_status_util.h"
44 #include "options/configurable_helper.h"
45 #include "options/options_helper.h"
46 #include "port/port.h"
47 #include "rocksdb/db.h"
48 #include "rocksdb/env.h"
49 #include "rocksdb/options.h"
50 #include "rocksdb/statistics.h"
51 #include "rocksdb/status.h"
52 #include "rocksdb/table.h"
53 #include "rocksdb/utilities/options_type.h"
54 #include "table/merging_iterator.h"
55 #include "table/table_builder.h"
56 #include "table/unique_id_impl.h"
57 #include "test_util/sync_point.h"
58 #include "util/stop_watch.h"
59
60 namespace ROCKSDB_NAMESPACE {
61
62 const char* GetCompactionReasonString(CompactionReason compaction_reason) {
63 switch (compaction_reason) {
64 case CompactionReason::kUnknown:
65 return "Unknown";
66 case CompactionReason::kLevelL0FilesNum:
67 return "LevelL0FilesNum";
68 case CompactionReason::kLevelMaxLevelSize:
69 return "LevelMaxLevelSize";
70 case CompactionReason::kUniversalSizeAmplification:
71 return "UniversalSizeAmplification";
72 case CompactionReason::kUniversalSizeRatio:
73 return "UniversalSizeRatio";
74 case CompactionReason::kUniversalSortedRunNum:
75 return "UniversalSortedRunNum";
76 case CompactionReason::kFIFOMaxSize:
77 return "FIFOMaxSize";
78 case CompactionReason::kFIFOReduceNumFiles:
79 return "FIFOReduceNumFiles";
80 case CompactionReason::kFIFOTtl:
81 return "FIFOTtl";
82 case CompactionReason::kManualCompaction:
83 return "ManualCompaction";
84 case CompactionReason::kFilesMarkedForCompaction:
85 return "FilesMarkedForCompaction";
86 case CompactionReason::kBottommostFiles:
87 return "BottommostFiles";
88 case CompactionReason::kTtl:
89 return "Ttl";
90 case CompactionReason::kFlush:
91 return "Flush";
92 case CompactionReason::kExternalSstIngestion:
93 return "ExternalSstIngestion";
94 case CompactionReason::kPeriodicCompaction:
95 return "PeriodicCompaction";
96 case CompactionReason::kChangeTemperature:
97 return "ChangeTemperature";
98 case CompactionReason::kForcedBlobGC:
99 return "ForcedBlobGC";
100 case CompactionReason::kRoundRobinTtl:
101 return "RoundRobinTtl";
102 case CompactionReason::kNumOfReasons:
103 // fall through
104 default:
105 assert(false);
106 return "Invalid";
107 }
108 }
109
110 const char* GetCompactionPenultimateOutputRangeTypeString(
111 Compaction::PenultimateOutputRangeType range_type) {
112 switch (range_type) {
113 case Compaction::PenultimateOutputRangeType::kNotSupported:
114 return "NotSupported";
115 case Compaction::PenultimateOutputRangeType::kFullRange:
116 return "FullRange";
117 case Compaction::PenultimateOutputRangeType::kNonLastRange:
118 return "NonLastRange";
119 case Compaction::PenultimateOutputRangeType::kDisabled:
120 return "Disabled";
121 default:
122 assert(false);
123 return "Invalid";
124 }
125 }
126
127 CompactionJob::CompactionJob(
128 int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
129 const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
130 VersionSet* versions, const std::atomic<bool>* shutting_down,
131 LogBuffer* log_buffer, FSDirectory* db_directory,
132 FSDirectory* output_directory, FSDirectory* blob_output_directory,
133 Statistics* stats, InstrumentedMutex* db_mutex,
134 ErrorHandler* db_error_handler,
135 std::vector<SequenceNumber> existing_snapshots,
136 SequenceNumber earliest_write_conflict_snapshot,
137 const SnapshotChecker* snapshot_checker, JobContext* job_context,
138 std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
139 bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
140 CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
141 const std::shared_ptr<IOTracer>& io_tracer,
142 const std::atomic<bool>& manual_compaction_canceled,
143 const std::string& db_id, const std::string& db_session_id,
144 std::string full_history_ts_low, std::string trim_ts,
145 BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
146 int* bg_bottom_compaction_scheduled)
147 : compact_(new CompactionState(compaction)),
148 compaction_stats_(compaction->compaction_reason(), 1),
149 db_options_(db_options),
150 mutable_db_options_copy_(mutable_db_options),
151 log_buffer_(log_buffer),
152 output_directory_(output_directory),
153 stats_(stats),
154 bottommost_level_(false),
155 write_hint_(Env::WLTH_NOT_SET),
156 compaction_job_stats_(compaction_job_stats),
157 job_id_(job_id),
158 dbname_(dbname),
159 db_id_(db_id),
160 db_session_id_(db_session_id),
161 file_options_(file_options),
162 env_(db_options.env),
163 io_tracer_(io_tracer),
164 fs_(db_options.fs, io_tracer),
165 file_options_for_read_(
166 fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
167 versions_(versions),
168 shutting_down_(shutting_down),
169 manual_compaction_canceled_(manual_compaction_canceled),
170 db_directory_(db_directory),
171 blob_output_directory_(blob_output_directory),
172 db_mutex_(db_mutex),
173 db_error_handler_(db_error_handler),
174 existing_snapshots_(std::move(existing_snapshots)),
175 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
176 snapshot_checker_(snapshot_checker),
177 job_context_(job_context),
178 table_cache_(std::move(table_cache)),
179 event_logger_(event_logger),
180 paranoid_file_checks_(paranoid_file_checks),
181 measure_io_stats_(measure_io_stats),
182 thread_pri_(thread_pri),
183 full_history_ts_low_(std::move(full_history_ts_low)),
184 trim_ts_(std::move(trim_ts)),
185 blob_callback_(blob_callback),
186 extra_num_subcompaction_threads_reserved_(0),
187 bg_compaction_scheduled_(bg_compaction_scheduled),
188 bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
189 assert(compaction_job_stats_ != nullptr);
190 assert(log_buffer_ != nullptr);
191
192 const auto* cfd = compact_->compaction->column_family_data();
193 ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
194 db_options_.enable_thread_tracking);
195 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
196 ReportStartedCompaction(compaction);
197 }
198
199 CompactionJob::~CompactionJob() {
200 assert(compact_ == nullptr);
201 ThreadStatusUtil::ResetThreadStatus();
202 }
203
204 void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
205 const auto* cfd = compact_->compaction->column_family_data();
206 ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
207 db_options_.enable_thread_tracking);
208
209 ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
210 job_id_);
211
212 ThreadStatusUtil::SetThreadOperationProperty(
213 ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
214 (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
215 compact_->compaction->output_level());
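  // Illustrative decoding of the packed value above (hypothetical levels): with
  // start_level = 1 and output_level = 2 the property is
  //   (uint64_t{1} << 32) + 2 == 0x0000000100000002,
  // so a consumer can recover the levels as
  //   int start = static_cast<int>(packed >> 32);
  //   int output = static_cast<int>(packed & 0xffffffff);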
216
217 // In the current design, a CompactionJob is always created
218 // for non-trivial compaction.
219 assert(compaction->IsTrivialMove() == false ||
220 compaction->is_manual_compaction() == true);
221
222 ThreadStatusUtil::SetThreadOperationProperty(
223 ThreadStatus::COMPACTION_PROP_FLAGS,
224 compaction->is_manual_compaction() +
225 (compaction->deletion_compaction() << 1));
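  // Illustrative flag layout (hypothetical values): bit 0 is
  // is_manual_compaction and bit 1 is deletion_compaction, so a manual,
  // non-deletion compaction yields 1 + (0 << 1) == 1 and a manual deletion
  // compaction yields 1 + (1 << 1) == 3.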
226
227 ThreadStatusUtil::SetThreadOperationProperty(
228 ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
229 compaction->CalculateTotalInputSize());
230
231 IOSTATS_RESET(bytes_written);
232 IOSTATS_RESET(bytes_read);
233 ThreadStatusUtil::SetThreadOperationProperty(
234 ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
235 ThreadStatusUtil::SetThreadOperationProperty(
236 ThreadStatus::COMPACTION_BYTES_READ, 0);
237
238 // Set the thread operation after operation properties
239 // to ensure GetThreadList() can always show them all together.
240 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
241
242 compaction_job_stats_->is_manual_compaction =
243 compaction->is_manual_compaction();
244 compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
245 }
246
247 void CompactionJob::Prepare() {
248 AutoThreadOperationStageUpdater stage_updater(
249 ThreadStatus::STAGE_COMPACTION_PREPARE);
250
251 // Generate file_levels_ for compaction before making Iterator
252 auto* c = compact_->compaction;
253 ColumnFamilyData* cfd = c->column_family_data();
254 assert(cfd != nullptr);
255 assert(cfd->current()->storage_info()->NumLevelFiles(
256 compact_->compaction->level()) > 0);
257
258 write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
259 bottommost_level_ = c->bottommost_level();
260
261 if (c->ShouldFormSubcompactions()) {
262 StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
263 GenSubcompactionBoundaries();
264 }
265 if (boundaries_.size() > 1) {
266 for (size_t i = 0; i <= boundaries_.size(); i++) {
267 compact_->sub_compact_states.emplace_back(
268 c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
269 (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
270 : std::nullopt,
271 static_cast<uint32_t>(i));
272         // assert to validate that boundaries don't have the same user keys
273         // (without the timestamp part).
274 assert(i == 0 || i == boundaries_.size() ||
275 cfd->user_comparator()->CompareWithoutTimestamp(
276 boundaries_[i - 1], boundaries_[i]) < 0);
277 }
278 RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
279 compact_->sub_compact_states.size());
280 } else {
281 compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt,
282 /*sub_job_id*/ 0);
283 }
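  // Sketch of the resulting partitioning (illustrative; assumes two boundary
  // keys b1 < b2 were generated above): three subcompactions are created,
  //   sub 0: start = nullopt, end = b1      -> keys before b1
  //   sub 1: start = b1,      end = b2      -> keys in [b1, b2)
  //   sub 2: start = b2,      end = nullopt -> keys from b2 onward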
284
285   // Collect all seqno->time information from the input files, which will be
286   // used to encode seqno->time into the output files.
287 uint64_t preserve_time_duration =
288 std::max(c->immutable_options()->preserve_internal_time_seconds,
289 c->immutable_options()->preclude_last_level_data_seconds);
290
291 if (preserve_time_duration > 0) {
292 // setup seqno_time_mapping_
293 seqno_time_mapping_.SetMaxTimeDuration(preserve_time_duration);
294 for (const auto& each_level : *c->inputs()) {
295 for (const auto& fmd : each_level.files) {
296 std::shared_ptr<const TableProperties> tp;
297 Status s = cfd->current()->GetTableProperties(&tp, fmd, nullptr);
298 if (s.ok()) {
299 seqno_time_mapping_.Add(tp->seqno_to_time_mapping)
300 .PermitUncheckedError();
301 seqno_time_mapping_.Add(fmd->fd.smallest_seqno,
302 fmd->oldest_ancester_time);
303 }
304 }
305 }
306
307 auto status = seqno_time_mapping_.Sort();
308 if (!status.ok()) {
309 ROCKS_LOG_WARN(db_options_.info_log,
310 "Invalid sequence number to time mapping: Status: %s",
311 status.ToString().c_str());
312 }
313 int64_t _current_time = 0;
314 status = db_options_.clock->GetCurrentTime(&_current_time);
315 if (!status.ok()) {
316 ROCKS_LOG_WARN(db_options_.info_log,
317 "Failed to get current time in compaction: Status: %s",
318 status.ToString().c_str());
319 // preserve all time information
320 preserve_time_min_seqno_ = 0;
321 preclude_last_level_min_seqno_ = 0;
322 } else {
323 seqno_time_mapping_.TruncateOldEntries(_current_time);
324 uint64_t preserve_time =
325 static_cast<uint64_t>(_current_time) > preserve_time_duration
326 ? _current_time - preserve_time_duration
327 : 0;
328 preserve_time_min_seqno_ =
329 seqno_time_mapping_.GetOldestSequenceNum(preserve_time);
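      // Worked example (hypothetical numbers): with _current_time = 10000 (s)
      // and preserve_time_duration = 3600 (s), preserve_time = 10000 - 3600 =
      // 6400, and preserve_time_min_seqno_ is looked up from
      // seqno_time_mapping_ as (roughly) the oldest sequence number written at
      // or after that time.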
330 if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
331 uint64_t preclude_last_level_time =
332 static_cast<uint64_t>(_current_time) >
333 c->immutable_options()->preclude_last_level_data_seconds
334 ? _current_time -
335 c->immutable_options()->preclude_last_level_data_seconds
336 : 0;
337 preclude_last_level_min_seqno_ =
338 seqno_time_mapping_.GetOldestSequenceNum(preclude_last_level_time);
339 }
340 }
341 }
342 }
343
344 uint64_t CompactionJob::GetSubcompactionsLimit() {
345 return extra_num_subcompaction_threads_reserved_ +
346 std::max(
347 std::uint64_t(1),
348 static_cast<uint64_t>(compact_->compaction->max_subcompactions()));
349 }
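// Example (hypothetical values): with max_subcompactions = 4 and
// extra_num_subcompaction_threads_reserved_ = 2, the limit returned above is
// 2 + std::max(uint64_t{1}, uint64_t{4}) = 6.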
350
351 void CompactionJob::AcquireSubcompactionResources(
352 int num_extra_required_subcompactions) {
353 TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
354 TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
355 int max_db_compactions =
356 DBImpl::GetBGJobLimits(
357 mutable_db_options_copy_.max_background_flushes,
358 mutable_db_options_copy_.max_background_compactions,
359 mutable_db_options_copy_.max_background_jobs,
360 versions_->GetColumnFamilySet()
361 ->write_controller()
362 ->NeedSpeedupCompaction())
363 .max_compactions;
364 InstrumentedMutexLock l(db_mutex_);
365   // Apply the min function first since we need to compute the number of extra
366   // subcompactions against the compaction limits, and then try to reserve
367   // threads for the extra subcompactions. The actual number of reserved
368   // threads could be less than the desired number.
369 int available_bg_compactions_against_db_limit =
370 std::max(max_db_compactions - *bg_compaction_scheduled_ -
371 *bg_bottom_compaction_scheduled_,
372 0);
373   // Reservation only supports background threads whose priority is between
374   // BOTTOM and HIGH. The priority needs to be degraded to HIGH if the original
375   // thread_pri_ is higher than that. Similar to ReleaseThreads().
376 extra_num_subcompaction_threads_reserved_ =
377 env_->ReserveThreads(std::min(num_extra_required_subcompactions,
378 available_bg_compactions_against_db_limit),
379 std::min(thread_pri_, Env::Priority::HIGH));
380
381   // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
382   // depending on whether this compaction has the bottommost priority
383 if (thread_pri_ == Env::Priority::BOTTOM) {
384 *bg_bottom_compaction_scheduled_ +=
385 extra_num_subcompaction_threads_reserved_;
386 } else {
387 *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_;
388 }
389 }
390
391 void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
392 // Do nothing when we have zero resources to shrink
393 if (num_extra_resources == 0) return;
394 db_mutex_->Lock();
395 // We cannot release threads more than what we reserved before
396 int extra_num_subcompaction_threads_released = env_->ReleaseThreads(
397 (int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH));
398 // Update the number of reserved threads and the number of background
399 // scheduled compactions for this compaction job
400 extra_num_subcompaction_threads_reserved_ -=
401 extra_num_subcompaction_threads_released;
402 // TODO (zichen): design a test case with new subcompaction partitioning
403 // when the number of actual partitions is less than the number of planned
404 // partitions
405 assert(extra_num_subcompaction_threads_released == (int)num_extra_resources);
406   // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
407   // depending on whether this compaction has the bottommost priority
408 if (thread_pri_ == Env::Priority::BOTTOM) {
409 *bg_bottom_compaction_scheduled_ -=
410 extra_num_subcompaction_threads_released;
411 } else {
412 *bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released;
413 }
414 db_mutex_->Unlock();
415 TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
416 }
417
418 void CompactionJob::ReleaseSubcompactionResources() {
419 if (extra_num_subcompaction_threads_reserved_ == 0) {
420 return;
421 }
422 {
423 InstrumentedMutexLock l(db_mutex_);
424     // The number of reserved threads becomes larger than 0 only if the
425     // compaction priority is round-robin and there are not sufficient
426     // subcompactions available
427
428     // The number of scheduled compactions must be no less than 1 + the number
429     // of extra subcompactions using acquired resources, since this compaction
430     // job has not finished yet
431 assert(*bg_bottom_compaction_scheduled_ >=
432 1 + extra_num_subcompaction_threads_reserved_ ||
433 *bg_compaction_scheduled_ >=
434 1 + extra_num_subcompaction_threads_reserved_);
435 }
436 ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
437 }
438
439 struct RangeWithSize {
440 Range range;
441 uint64_t size;
442
443 RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
444 : range(a, b), size(s) {}
445 };
446
447 void CompactionJob::GenSubcompactionBoundaries() {
448 // The goal is to find some boundary keys so that we can evenly partition
449 // the compaction input data into max_subcompactions ranges.
450 // For every input file, we ask TableReader to estimate 128 anchor points
451 // that evenly partition the input file into 128 ranges and the range
452 // sizes. This can be calculated by scanning index blocks of the file.
453 // Once we have the anchor points for all the input files, we merge them
454 // together and try to find keys dividing ranges evenly.
455 // For example, if we have two input files, and each returns following
456 // ranges:
457 // File1: (a1, 1000), (b1, 1200), (c1, 1100)
458 // File2: (a2, 1100), (b2, 1000), (c2, 1000)
459   // We sort all the keys as follows:
460 // (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
461 // We calculate the total size by adding up all ranges' size, which is 6400.
462 // If we would like to partition into 2 subcompactions, the target of the
463 // range size is 3200. Based on the size, we take "b1" as the partition key
464 // since the first three ranges would hit 3200.
465 //
466   // Note that the ranges are actually overlapping. For example, in the example
467   // above, the range ending with "b1" overlaps with the range ending with
468   // "b2". So the size 1000+1100+1200 is an underestimate of the data size up
469   // to "b1". In extreme cases where we only compact N L0 files, a range can
470   // overlap with N-1 other ranges. Since we requested a relatively large
471   // number (128) of ranges from each input file, even N-way range overlap
472   // causes relatively small inaccuracy.
473
474 auto* c = compact_->compaction;
475 if (c->max_subcompactions() <= 1 &&
476 !(c->immutable_options()->compaction_pri == kRoundRobin &&
477 c->immutable_options()->compaction_style == kCompactionStyleLevel)) {
478 return;
479 }
480 auto* cfd = c->column_family_data();
481 const Comparator* cfd_comparator = cfd->user_comparator();
482 const InternalKeyComparator& icomp = cfd->internal_comparator();
483
484 auto* v = compact_->compaction->input_version();
485 int base_level = v->storage_info()->base_level();
486 InstrumentedMutexUnlock unlock_guard(db_mutex_);
487
488 uint64_t total_size = 0;
489 std::vector<TableReader::Anchor> all_anchors;
490 int start_lvl = c->start_level();
491 int out_lvl = c->output_level();
492
493 for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
494 int lvl = c->level(lvl_idx);
495 if (lvl >= start_lvl && lvl <= out_lvl) {
496 const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
497 size_t num_files = flevel->num_files;
498
499 if (num_files == 0) {
500 continue;
501 }
502
503 for (size_t i = 0; i < num_files; i++) {
504 FileMetaData* f = flevel->files[i].file_metadata;
505 std::vector<TableReader::Anchor> my_anchors;
506 Status s = cfd->table_cache()->ApproximateKeyAnchors(
507 ReadOptions(), icomp, *f, my_anchors);
508 if (!s.ok() || my_anchors.empty()) {
509 my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
510 }
511 for (auto& ac : my_anchors) {
512           // Can be optimized to avoid this loop.
513 total_size += ac.range_size;
514 }
515
516 all_anchors.insert(all_anchors.end(), my_anchors.begin(),
517 my_anchors.end());
518 }
519 }
520 }
521   // Here we sort all the anchor points across all files and go through them
522   // in sorted order to find partitioning boundaries.
523   // This is not the most efficient implementation; a much more efficient
524   // algorithm probably exists, but it would be more complex. If performance
525   // turns out to be a problem, we can optimize.
526 std::sort(
527 all_anchors.begin(), all_anchors.end(),
528 [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
529 return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) <
530 0;
531 });
532
533 // Remove duplicated entries from boundaries.
534 all_anchors.erase(
535 std::unique(all_anchors.begin(), all_anchors.end(),
536 [cfd_comparator](TableReader::Anchor& a,
537 TableReader::Anchor& b) -> bool {
538 return cfd_comparator->CompareWithoutTimestamp(
539 a.user_key, b.user_key) == 0;
540 }),
541 all_anchors.end());
542
543   // Get the number of planned subcompactions; this may reserve threads and
544   // update extra_num_subcompaction_threads_reserved_ for round-robin
545 uint64_t num_planned_subcompactions;
546 if (c->immutable_options()->compaction_pri == kRoundRobin &&
547 c->immutable_options()->compaction_style == kCompactionStyleLevel) {
548     // For round-robin compaction priority, we need to employ more
549     // subcompactions (which may exceed the max_subcompactions limit). The extra
550     // subcompactions will be executed using reserved threads and counted
551     // against bg_compaction_scheduled or bg_bottom_compaction_scheduled.
552
553     // Initialized to the number of input files
554 num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0));
555 uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
556 if (max_subcompactions_limit < num_planned_subcompactions) {
557       // Assert that the two pointers are not null so that we can use extra
558       // subcompactions against the db compaction limits
559 assert(bg_bottom_compaction_scheduled_ != nullptr);
560 assert(bg_compaction_scheduled_ != nullptr);
561       // Reserve resources when max_subcompactions is not sufficient
562 AcquireSubcompactionResources(
563 (int)(num_planned_subcompactions - max_subcompactions_limit));
564 // Subcompactions limit changes after acquiring additional resources.
565 // Need to call GetSubcompactionsLimit() again to update the number
566 // of planned subcompactions
567 num_planned_subcompactions =
568 std::min(num_planned_subcompactions, GetSubcompactionsLimit());
569 } else {
570 num_planned_subcompactions = max_subcompactions_limit;
571 }
572 } else {
573 num_planned_subcompactions = GetSubcompactionsLimit();
574 }
575
576 TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
577 &num_planned_subcompactions);
578 if (num_planned_subcompactions == 1) return;
579
580 // Group the ranges into subcompactions
581 uint64_t target_range_size = std::max(
582 total_size / num_planned_subcompactions,
583 MaxFileSizeForLevel(
584 *(c->mutable_cf_options()), out_lvl,
585 c->immutable_options()->compaction_style, base_level,
586 c->immutable_options()->level_compaction_dynamic_level_bytes));
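  // Worked example (hypothetical numbers): continuing the File1/File2 example
  // in the comment at the top of this function, total_size = 6400; with
  // num_planned_subcompactions = 2 and MaxFileSizeForLevel(...) returning 2000,
  // target_range_size = std::max(6400 / 2, 2000) = 3200.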
587
588 if (target_range_size >= total_size) {
589 return;
590 }
591
592 uint64_t next_threshold = target_range_size;
593 uint64_t cumulative_size = 0;
594 uint64_t num_actual_subcompactions = 1U;
595 for (TableReader::Anchor& anchor : all_anchors) {
596 cumulative_size += anchor.range_size;
597 if (cumulative_size > next_threshold) {
598 next_threshold += target_range_size;
599 num_actual_subcompactions++;
600 boundaries_.push_back(anchor.user_key);
601 }
602 if (num_actual_subcompactions == num_planned_subcompactions) {
603 break;
604 }
605 }
606 TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
607 &num_actual_subcompactions);
608   // Shrink extra subcompaction resources when extra resources were acquired
609 ShrinkSubcompactionResources(
610 std::min((int)(num_planned_subcompactions - num_actual_subcompactions),
611 extra_num_subcompaction_threads_reserved_));
612 }
613
614 Status CompactionJob::Run() {
615 AutoThreadOperationStageUpdater stage_updater(
616 ThreadStatus::STAGE_COMPACTION_RUN);
617 TEST_SYNC_POINT("CompactionJob::Run():Start");
618 log_buffer_->FlushBufferToLog();
619 LogCompaction();
620
621 const size_t num_threads = compact_->sub_compact_states.size();
622 assert(num_threads > 0);
623 const uint64_t start_micros = db_options_.clock->NowMicros();
624
625 // Launch a thread for each of subcompactions 1...num_threads-1
626 std::vector<port::Thread> thread_pool;
627 thread_pool.reserve(num_threads - 1);
628 for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
629 thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
630 &compact_->sub_compact_states[i]);
631 }
632
633 // Always schedule the first subcompaction (whether or not there are also
634 // others) in the current thread to be efficient with resources
635 ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
636
637 // Wait for all other threads (if there are any) to finish execution
638 for (auto& thread : thread_pool) {
639 thread.join();
640 }
641
642 compaction_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);
643
644 for (auto& state : compact_->sub_compact_states) {
645 compaction_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
646 state.RemoveLastEmptyOutput();
647 }
648
649 RecordTimeToHistogram(stats_, COMPACTION_TIME,
650 compaction_stats_.stats.micros);
651 RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
652 compaction_stats_.stats.cpu_micros);
653
654 TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
655
656 // Check if any thread encountered an error during execution
657 Status status;
658 IOStatus io_s;
659 bool wrote_new_blob_files = false;
660
661 for (const auto& state : compact_->sub_compact_states) {
662 if (!state.status.ok()) {
663 status = state.status;
664 io_s = state.io_status;
665 break;
666 }
667
668 if (state.Current().HasBlobFileAdditions()) {
669 wrote_new_blob_files = true;
670 }
671 }
672
673 if (io_status_.ok()) {
674 io_status_ = io_s;
675 }
676 if (status.ok()) {
677 constexpr IODebugContext* dbg = nullptr;
678
679 if (output_directory_) {
680 io_s = output_directory_->FsyncWithDirOptions(
681 IOOptions(), dbg,
682 DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
683 }
684
685 if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
686 blob_output_directory_ != output_directory_) {
687 io_s = blob_output_directory_->FsyncWithDirOptions(
688 IOOptions(), dbg,
689 DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
690 }
691 }
692 if (io_status_.ok()) {
693 io_status_ = io_s;
694 }
695 if (status.ok()) {
696 status = io_s;
697 }
698 if (status.ok()) {
699 thread_pool.clear();
700 std::vector<const CompactionOutputs::Output*> files_output;
701 for (const auto& state : compact_->sub_compact_states) {
702 for (const auto& output : state.GetOutputs()) {
703 files_output.emplace_back(&output);
704 }
705 }
706 ColumnFamilyData* cfd = compact_->compaction->column_family_data();
707 auto& prefix_extractor =
708 compact_->compaction->mutable_cf_options()->prefix_extractor;
709 std::atomic<size_t> next_file_idx(0);
710 auto verify_table = [&](Status& output_status) {
711 while (true) {
712 size_t file_idx = next_file_idx.fetch_add(1);
713 if (file_idx >= files_output.size()) {
714 break;
715 }
716 // Verify that the table is usable
717         // We set for_compaction to false and don't
718         // OptimizeForCompactionTableRead here because this is a special case
719         // after we finish the table building. No matter whether
720         // use_direct_io_for_flush_and_compaction is true, we will regard this
721         // verification as user reads since the goal is to cache it here for
722         // further user reads.
723 ReadOptions read_options;
724 InternalIterator* iter = cfd->table_cache()->NewIterator(
725 read_options, file_options_, cfd->internal_comparator(),
726 files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
727 prefix_extractor,
728 /*table_reader_ptr=*/nullptr,
729 cfd->internal_stats()->GetFileReadHist(
730 compact_->compaction->output_level()),
731 TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
732 /*skip_filters=*/false, compact_->compaction->output_level(),
733 MaxFileSizeForL0MetaPin(
734 *compact_->compaction->mutable_cf_options()),
735 /*smallest_compaction_key=*/nullptr,
736 /*largest_compaction_key=*/nullptr,
737 /*allow_unprepared_value=*/false);
738 auto s = iter->status();
739
740 if (s.ok() && paranoid_file_checks_) {
741 OutputValidator validator(cfd->internal_comparator(),
742 /*_enable_order_check=*/true,
743 /*_enable_hash=*/true);
744 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
745 s = validator.Add(iter->key(), iter->value());
746 if (!s.ok()) {
747 break;
748 }
749 }
750 if (s.ok()) {
751 s = iter->status();
752 }
753 if (s.ok() &&
754 !validator.CompareValidator(files_output[file_idx]->validator)) {
755 s = Status::Corruption("Paranoid checksums do not match");
756 }
757 }
758
759 delete iter;
760
761 if (!s.ok()) {
762 output_status = s;
763 break;
764 }
765 }
766 };
767 for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
768 thread_pool.emplace_back(
769 verify_table, std::ref(compact_->sub_compact_states[i].status));
770 }
771 verify_table(compact_->sub_compact_states[0].status);
772 for (auto& thread : thread_pool) {
773 thread.join();
774 }
775
776 for (const auto& state : compact_->sub_compact_states) {
777 if (!state.status.ok()) {
778 status = state.status;
779 break;
780 }
781 }
782 }
783
784 ReleaseSubcompactionResources();
785 TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:0");
786 TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:1");
787
788 TablePropertiesCollection tp;
789 for (const auto& state : compact_->sub_compact_states) {
790 for (const auto& output : state.GetOutputs()) {
791 auto fn =
792 TableFileName(state.compaction->immutable_options()->cf_paths,
793 output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
794 tp[fn] = output.table_properties;
795 }
796 }
797 compact_->compaction->SetOutputTableProperties(std::move(tp));
798
799 // Finish up all book-keeping to unify the subcompaction results
800 compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
801 UpdateCompactionStats();
802
803 RecordCompactionIOStats();
804 LogFlush(db_options_.info_log);
805 TEST_SYNC_POINT("CompactionJob::Run():End");
806
807 compact_->status = status;
808 return status;
809 }
810
811 Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
812 assert(compact_);
813
814 AutoThreadOperationStageUpdater stage_updater(
815 ThreadStatus::STAGE_COMPACTION_INSTALL);
816 db_mutex_->AssertHeld();
817 Status status = compact_->status;
818
819 ColumnFamilyData* cfd = compact_->compaction->column_family_data();
820 assert(cfd);
821
822 int output_level = compact_->compaction->output_level();
823 cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_,
824 compaction_stats_);
825
826 if (status.ok()) {
827 status = InstallCompactionResults(mutable_cf_options);
828 }
829 if (!versions_->io_status().ok()) {
830 io_status_ = versions_->io_status();
831 }
832
833 VersionStorageInfo::LevelSummaryStorage tmp;
834 auto vstorage = cfd->current()->storage_info();
835 const auto& stats = compaction_stats_.stats;
836
837 double read_write_amp = 0.0;
838 double write_amp = 0.0;
839 double bytes_read_per_sec = 0;
840 double bytes_written_per_sec = 0;
841
842 const uint64_t bytes_read_non_output_and_blob =
843 stats.bytes_read_non_output_levels + stats.bytes_read_blob;
844 const uint64_t bytes_read_all =
845 stats.bytes_read_output_level + bytes_read_non_output_and_blob;
846 const uint64_t bytes_written_all =
847 stats.bytes_written + stats.bytes_written_blob;
848
849 if (bytes_read_non_output_and_blob > 0) {
850 read_write_amp = (bytes_written_all + bytes_read_all) /
851 static_cast<double>(bytes_read_non_output_and_blob);
852 write_amp =
853 bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
854 }
855 if (stats.micros > 0) {
856 bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
857 bytes_written_per_sec =
858 bytes_written_all / static_cast<double>(stats.micros);
859 }
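  // Worked example (hypothetical sizes): with 100 MB read from non-output
  // levels and blobs, 20 MB read from the output level and 110 MB written,
  // bytes_read_all = 120 MB and bytes_written_all = 110 MB, so
  // read_write_amp = (110 + 120) / 100 = 2.3 and write_amp = 110 / 100 = 1.1.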
860
861 const std::string& column_family_name = cfd->GetName();
862
863 constexpr double kMB = 1048576.0;
864
865 ROCKS_LOG_BUFFER(
866 log_buffer_,
867 "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
868 "files in(%d, %d) out(%d +%d blob) "
869 "MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
870 "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
871 ", records dropped: %" PRIu64 " output_compression: %s\n",
872 column_family_name.c_str(), vstorage->LevelSummary(&tmp),
873 bytes_read_per_sec, bytes_written_per_sec,
874 compact_->compaction->output_level(),
875 stats.num_input_files_in_non_output_levels,
876 stats.num_input_files_in_output_level, stats.num_output_files,
877 stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
878 stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
879 stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp,
880 write_amp, status.ToString().c_str(), stats.num_input_records,
881 stats.num_dropped_records,
882 CompressionTypeToString(compact_->compaction->output_compression())
883 .c_str());
884
885 const auto& blob_files = vstorage->GetBlobFiles();
886 if (!blob_files.empty()) {
887 assert(blob_files.front());
888 assert(blob_files.back());
889
890 ROCKS_LOG_BUFFER(
891 log_buffer_,
892 "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
893 column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
894 blob_files.back()->GetBlobFileNumber());
895 }
896
897 if (compaction_stats_.has_penultimate_level_output) {
898 ROCKS_LOG_BUFFER(
899 log_buffer_,
900 "[%s] has Penultimate Level output: %" PRIu64
901 ", level %d, number of files: %" PRIu64 ", number of records: %" PRIu64,
902 column_family_name.c_str(),
903 compaction_stats_.penultimate_level_stats.bytes_written,
904 compact_->compaction->GetPenultimateLevel(),
905 compaction_stats_.penultimate_level_stats.num_output_files,
906 compaction_stats_.penultimate_level_stats.num_output_records);
907 }
908
909 UpdateCompactionJobStats(stats);
910
911 auto stream = event_logger_->LogToBuffer(log_buffer_, 8192);
912 stream << "job" << job_id_ << "event"
913 << "compaction_finished"
914 << "compaction_time_micros" << stats.micros
915 << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
916 << compact_->compaction->output_level() << "num_output_files"
917 << stats.num_output_files << "total_output_size"
918 << stats.bytes_written;
919
920 if (stats.num_output_files_blob > 0) {
921 stream << "num_blob_output_files" << stats.num_output_files_blob
922 << "total_blob_output_size" << stats.bytes_written_blob;
923 }
924
925 stream << "num_input_records" << stats.num_input_records
926 << "num_output_records" << stats.num_output_records
927 << "num_subcompactions" << compact_->sub_compact_states.size()
928 << "output_compression"
929 << CompressionTypeToString(compact_->compaction->output_compression());
930
931 stream << "num_single_delete_mismatches"
932 << compaction_job_stats_->num_single_del_mismatch;
933 stream << "num_single_delete_fallthrough"
934 << compaction_job_stats_->num_single_del_fallthru;
935
936 if (measure_io_stats_) {
937 stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
938 stream << "file_range_sync_nanos"
939 << compaction_job_stats_->file_range_sync_nanos;
940 stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
941 stream << "file_prepare_write_nanos"
942 << compaction_job_stats_->file_prepare_write_nanos;
943 }
944
945 stream << "lsm_state";
946 stream.StartArray();
947 for (int level = 0; level < vstorage->num_levels(); ++level) {
948 stream << vstorage->NumLevelFiles(level);
949 }
950 stream.EndArray();
951
952 if (!blob_files.empty()) {
953 assert(blob_files.front());
954 stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();
955
956 assert(blob_files.back());
957 stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
958 }
959
960 if (compaction_stats_.has_penultimate_level_output) {
961 InternalStats::CompactionStats& pl_stats =
962 compaction_stats_.penultimate_level_stats;
963 stream << "penultimate_level_num_output_files" << pl_stats.num_output_files;
964 stream << "penultimate_level_bytes_written" << pl_stats.bytes_written;
965 stream << "penultimate_level_num_output_records"
966 << pl_stats.num_output_records;
967 stream << "penultimate_level_num_output_files_blob"
968 << pl_stats.num_output_files_blob;
969 stream << "penultimate_level_bytes_written_blob"
970 << pl_stats.bytes_written_blob;
971 }
972
973 CleanupCompaction();
974 return status;
975 }
976
977 void CompactionJob::NotifyOnSubcompactionBegin(
978 SubcompactionState* sub_compact) {
979 #ifndef ROCKSDB_LITE
980 Compaction* c = compact_->compaction;
981
982 if (db_options_.listeners.empty()) {
983 return;
984 }
985 if (shutting_down_->load(std::memory_order_acquire)) {
986 return;
987 }
988 if (c->is_manual_compaction() &&
989 manual_compaction_canceled_.load(std::memory_order_acquire)) {
990 return;
991 }
992
993 sub_compact->notify_on_subcompaction_completion = true;
994
995 SubcompactionJobInfo info{};
996 sub_compact->BuildSubcompactionJobInfo(info);
997 info.job_id = static_cast<int>(job_id_);
998 info.thread_id = env_->GetThreadID();
999
1000 for (const auto& listener : db_options_.listeners) {
1001 listener->OnSubcompactionBegin(info);
1002 }
1003 info.status.PermitUncheckedError();
1004
1005 #else
1006 (void)sub_compact;
1007 #endif // ROCKSDB_LITE
1008 }
1009
1010 void CompactionJob::NotifyOnSubcompactionCompleted(
1011 SubcompactionState* sub_compact) {
1012 #ifndef ROCKSDB_LITE
1013
1014 if (db_options_.listeners.empty()) {
1015 return;
1016 }
1017 if (shutting_down_->load(std::memory_order_acquire)) {
1018 return;
1019 }
1020
1021 if (sub_compact->notify_on_subcompaction_completion == false) {
1022 return;
1023 }
1024
1025 SubcompactionJobInfo info{};
1026 sub_compact->BuildSubcompactionJobInfo(info);
1027 info.job_id = static_cast<int>(job_id_);
1028 info.thread_id = env_->GetThreadID();
1029
1030 for (const auto& listener : db_options_.listeners) {
1031 listener->OnSubcompactionCompleted(info);
1032 }
1033 #else
1034 (void)sub_compact;
1035 #endif // ROCKSDB_LITE
1036 }
1037
1038 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
1039 assert(sub_compact);
1040 assert(sub_compact->compaction);
1041
1042 #ifndef ROCKSDB_LITE
1043 if (db_options_.compaction_service) {
1044 CompactionServiceJobStatus comp_status =
1045 ProcessKeyValueCompactionWithCompactionService(sub_compact);
1046 if (comp_status == CompactionServiceJobStatus::kSuccess ||
1047 comp_status == CompactionServiceJobStatus::kFailure) {
1048 return;
1049 }
1050 // fallback to local compaction
1051 assert(comp_status == CompactionServiceJobStatus::kUseLocal);
1052 }
1053 #endif // !ROCKSDB_LITE
1054
1055 uint64_t prev_cpu_micros = db_options_.clock->CPUMicros();
1056
1057 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1058
1059 // Create compaction filter and fail the compaction if
1060 // IgnoreSnapshots() = false because it is not supported anymore
1061 const CompactionFilter* compaction_filter =
1062 cfd->ioptions()->compaction_filter;
1063 std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
1064 if (compaction_filter == nullptr) {
1065 compaction_filter_from_factory =
1066 sub_compact->compaction->CreateCompactionFilter();
1067 compaction_filter = compaction_filter_from_factory.get();
1068 }
1069 if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
1070 sub_compact->status = Status::NotSupported(
1071 "CompactionFilter::IgnoreSnapshots() = false is not supported "
1072 "anymore.");
1073 return;
1074 }
1075
1076 NotifyOnSubcompactionBegin(sub_compact);
1077
1078 auto range_del_agg = std::make_unique<CompactionRangeDelAggregator>(
1079 &cfd->internal_comparator(), existing_snapshots_, &full_history_ts_low_,
1080 &trim_ts_);
1081
1082 // TODO: since we already use C++17, should use
1083 // std::optional<const Slice> instead.
1084 const std::optional<Slice> start = sub_compact->start;
1085 const std::optional<Slice> end = sub_compact->end;
1086
1087 std::optional<Slice> start_without_ts;
1088 std::optional<Slice> end_without_ts;
1089
1090 ReadOptions read_options;
1091 read_options.verify_checksums = true;
1092 read_options.fill_cache = false;
1093 read_options.rate_limiter_priority = GetRateLimiterPriority();
1094 // Compaction iterators shouldn't be confined to a single prefix.
1095 // Compactions use Seek() for
1096 // (a) concurrent compactions,
1097 // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
1098 read_options.total_order_seek = true;
1099
1100   // Remove the timestamps from the boundaries because boundaries created in
1101   // GenSubcompactionBoundaries don't strip away the timestamp.
1102 size_t ts_sz = cfd->user_comparator()->timestamp_size();
1103 if (start.has_value()) {
1104 read_options.iterate_lower_bound = &start.value();
1105 if (ts_sz > 0) {
1106 start_without_ts = StripTimestampFromUserKey(start.value(), ts_sz);
1107 read_options.iterate_lower_bound = &start_without_ts.value();
1108 }
1109 }
1110 if (end.has_value()) {
1111 read_options.iterate_upper_bound = &end.value();
1112 if (ts_sz > 0) {
1113 end_without_ts = StripTimestampFromUserKey(end.value(), ts_sz);
1114 read_options.iterate_upper_bound = &end_without_ts.value();
1115 }
1116 }
1117
1118 // Although the v2 aggregator is what the level iterator(s) know about,
1119 // the AddTombstones calls will be propagated down to the v1 aggregator.
1120 std::unique_ptr<InternalIterator> raw_input(versions_->MakeInputIterator(
1121 read_options, sub_compact->compaction, range_del_agg.get(),
1122 file_options_for_read_, start, end));
1123 InternalIterator* input = raw_input.get();
1124
1125 IterKey start_ikey;
1126 IterKey end_ikey;
1127 Slice start_slice;
1128 Slice end_slice;
1129
1130 static constexpr char kMaxTs[] =
1131 "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
1132 Slice ts_slice;
1133 std::string max_ts;
1134 if (ts_sz > 0) {
1135 if (ts_sz <= strlen(kMaxTs)) {
1136 ts_slice = Slice(kMaxTs, ts_sz);
1137 } else {
1138 max_ts = std::string(ts_sz, '\xff');
1139 ts_slice = Slice(max_ts);
1140 }
1141 }
1142
1143 if (start.has_value()) {
1144 start_ikey.SetInternalKey(start.value(), kMaxSequenceNumber,
1145 kValueTypeForSeek);
1146 if (ts_sz > 0) {
1147 start_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
1148 &ts_slice);
1149 }
1150 start_slice = start_ikey.GetInternalKey();
1151 }
1152 if (end.has_value()) {
1153 end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek);
1154 if (ts_sz > 0) {
1155 end_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
1156 &ts_slice);
1157 }
1158 end_slice = end_ikey.GetInternalKey();
1159 }
1160
1161 std::unique_ptr<InternalIterator> clip;
1162 if (start.has_value() || end.has_value()) {
1163 clip = std::make_unique<ClippingIterator>(
1164 raw_input.get(), start.has_value() ? &start_slice : nullptr,
1165 end.has_value() ? &end_slice : nullptr, &cfd->internal_comparator());
1166 input = clip.get();
1167 }
1168
1169 std::unique_ptr<InternalIterator> blob_counter;
1170
1171 if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
1172 BlobGarbageMeter* meter = sub_compact->Current().CreateBlobGarbageMeter();
1173 blob_counter = std::make_unique<BlobCountingIterator>(input, meter);
1174 input = blob_counter.get();
1175 }
1176
1177 std::unique_ptr<InternalIterator> trim_history_iter;
1178 if (ts_sz > 0 && !trim_ts_.empty()) {
1179 trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
1180 input, cfd->user_comparator(), trim_ts_);
1181 input = trim_history_iter.get();
1182 }
1183
1184 input->SeekToFirst();
1185
1186 AutoThreadOperationStageUpdater stage_updater(
1187 ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
1188
1189 // I/O measurement variables
1190 PerfLevel prev_perf_level = PerfLevel::kEnableTime;
1191 const uint64_t kRecordStatsEvery = 1000;
1192 uint64_t prev_write_nanos = 0;
1193 uint64_t prev_fsync_nanos = 0;
1194 uint64_t prev_range_sync_nanos = 0;
1195 uint64_t prev_prepare_write_nanos = 0;
1196 uint64_t prev_cpu_write_nanos = 0;
1197 uint64_t prev_cpu_read_nanos = 0;
1198 if (measure_io_stats_) {
1199 prev_perf_level = GetPerfLevel();
1200 SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
1201 prev_write_nanos = IOSTATS(write_nanos);
1202 prev_fsync_nanos = IOSTATS(fsync_nanos);
1203 prev_range_sync_nanos = IOSTATS(range_sync_nanos);
1204 prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
1205 prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
1206 prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
1207 }
1208
1209 MergeHelper merge(
1210 env_, cfd->user_comparator(), cfd->ioptions()->merge_operator.get(),
1211 compaction_filter, db_options_.info_log.get(),
1212 false /* internal key corruption is expected */,
1213 existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
1214 snapshot_checker_, compact_->compaction->level(), db_options_.stats);
1215
1216 const MutableCFOptions* mutable_cf_options =
1217 sub_compact->compaction->mutable_cf_options();
1218 assert(mutable_cf_options);
1219
1220 std::vector<std::string> blob_file_paths;
1221
1222 // TODO: BlobDB to support output_to_penultimate_level compaction, which needs
1223 // 2 builders, so may need to move to `CompactionOutputs`
1224 std::unique_ptr<BlobFileBuilder> blob_file_builder(
1225 (mutable_cf_options->enable_blob_files &&
1226 sub_compact->compaction->output_level() >=
1227 mutable_cf_options->blob_file_starting_level)
1228 ? new BlobFileBuilder(
1229 versions_, fs_.get(),
1230 sub_compact->compaction->immutable_options(),
1231 mutable_cf_options, &file_options_, db_id_, db_session_id_,
1232 job_id_, cfd->GetID(), cfd->GetName(), Env::IOPriority::IO_LOW,
1233 write_hint_, io_tracer_, blob_callback_,
1234 BlobFileCreationReason::kCompaction, &blob_file_paths,
1235 sub_compact->Current().GetBlobFileAdditionsPtr())
1236 : nullptr);
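  // For illustration (hypothetical option values): with enable_blob_files =
  // true and blob_file_starting_level = 2, a subcompaction whose output level
  // is 1 gets no blob builder, while one whose output level is 3 writes blob
  // values through the BlobFileBuilder constructed above.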
1237
1238 TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
1239 TEST_SYNC_POINT_CALLBACK(
1240 "CompactionJob::Run():PausingManualCompaction:1",
1241 reinterpret_cast<void*>(
1242 const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
1243
1244 const std::string* const full_history_ts_low =
1245 full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
1246 const SequenceNumber job_snapshot_seq =
1247 job_context_ ? job_context_->GetJobSnapshotSequence()
1248 : kMaxSequenceNumber;
1249
1250 auto c_iter = std::make_unique<CompactionIterator>(
1251 input, cfd->user_comparator(), &merge, versions_->LastSequence(),
1252 &existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq,
1253 snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
1254 /*expect_valid_internal_key=*/true, range_del_agg.get(),
1255 blob_file_builder.get(), db_options_.allow_data_in_errors,
1256 db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
1257 sub_compact->compaction, compaction_filter, shutting_down_,
1258 db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_,
1259 preclude_last_level_min_seqno_);
1260 c_iter->SeekToFirst();
1261
1262   // Assign the range delete aggregator to the target output level, which
1263   // makes sure it only outputs to a single level
1264 sub_compact->AssignRangeDelAggregator(std::move(range_del_agg));
1265
1266 const auto& c_iter_stats = c_iter->iter_stats();
1267
1268   // Define the open and close functions for the compaction output files,
1269   // which will be used to open/close output files when needed.
1270 const CompactionFileOpenFunc open_file_func =
1271 [this, sub_compact](CompactionOutputs& outputs) {
1272 return this->OpenCompactionOutputFile(sub_compact, outputs);
1273 };
1274 const CompactionFileCloseFunc close_file_func =
1275 [this, sub_compact](CompactionOutputs& outputs, const Status& status,
1276 const Slice& next_table_min_key) {
1277 return this->FinishCompactionOutputFile(status, sub_compact, outputs,
1278 next_table_min_key);
1279 };
1280
1281 Status status;
1282 TEST_SYNC_POINT_CALLBACK(
1283 "CompactionJob::ProcessKeyValueCompaction()::Processing",
1284 reinterpret_cast<void*>(
1285 const_cast<Compaction*>(sub_compact->compaction)));
1286 while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
1287 // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
1288 // returns true.
1289
1290 assert(!end.has_value() || cfd->user_comparator()->Compare(
1291 c_iter->user_key(), end.value()) < 0);
1292
1293 if (c_iter_stats.num_input_records % kRecordStatsEvery ==
1294 kRecordStatsEvery - 1) {
1295 RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1296 c_iter->ResetRecordCounts();
1297 RecordCompactionIOStats();
1298 }
1299
1300     // Add the current compaction_iterator key to the target compaction output;
1301     // if an output file needs to be closed or opened, this will call the
1302     // `open_file_func` and `close_file_func`.
1303 // TODO: it would be better to have the compaction file open/close moved
1304 // into `CompactionOutputs` which has the output file information.
1305 status = sub_compact->AddToOutput(*c_iter, open_file_func, close_file_func);
1306 if (!status.ok()) {
1307 break;
1308 }
1309
1310 TEST_SYNC_POINT_CALLBACK(
1311 "CompactionJob::Run():PausingManualCompaction:2",
1312 reinterpret_cast<void*>(
1313 const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
1314 c_iter->Next();
1315 if (c_iter->status().IsManualCompactionPaused()) {
1316 break;
1317 }
1318 }
1319
1320 sub_compact->compaction_job_stats.num_blobs_read =
1321 c_iter_stats.num_blobs_read;
1322 sub_compact->compaction_job_stats.total_blob_bytes_read =
1323 c_iter_stats.total_blob_bytes_read;
1324 sub_compact->compaction_job_stats.num_input_deletion_records =
1325 c_iter_stats.num_input_deletion_records;
1326 sub_compact->compaction_job_stats.num_corrupt_keys =
1327 c_iter_stats.num_input_corrupt_records;
1328 sub_compact->compaction_job_stats.num_single_del_fallthru =
1329 c_iter_stats.num_single_del_fallthru;
1330 sub_compact->compaction_job_stats.num_single_del_mismatch =
1331 c_iter_stats.num_single_del_mismatch;
1332 sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
1333 c_iter_stats.total_input_raw_key_bytes;
1334 sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
1335 c_iter_stats.total_input_raw_value_bytes;
1336
1337 RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
1338 c_iter_stats.total_filter_time);
1339
1340 if (c_iter_stats.num_blobs_relocated > 0) {
1341 RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
1342 c_iter_stats.num_blobs_relocated);
1343 }
1344 if (c_iter_stats.total_blob_bytes_relocated > 0) {
1345 RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
1346 c_iter_stats.total_blob_bytes_relocated);
1347 }
1348
1349 RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1350 RecordCompactionIOStats();
1351
1352 if (status.ok() && cfd->IsDropped()) {
1353 status =
1354 Status::ColumnFamilyDropped("Column family dropped during compaction");
1355 }
1356 if ((status.ok() || status.IsColumnFamilyDropped()) &&
1357 shutting_down_->load(std::memory_order_relaxed)) {
1358 status = Status::ShutdownInProgress("Database shutdown");
1359 }
1360 if ((status.ok() || status.IsColumnFamilyDropped()) &&
1361 (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
1362 status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1363 }
1364 if (status.ok()) {
1365 status = input->status();
1366 }
1367 if (status.ok()) {
1368 status = c_iter->status();
1369 }
1370
1371   // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1372   // close the output files. The open file function is also passed in: if there
1373   // are only range deletions and no file was opened, a new output file needs
1374   // to be created to save the range deletions.
1375 status = sub_compact->CloseCompactionFiles(status, open_file_func,
1376 close_file_func);
1377
1378 if (blob_file_builder) {
1379 if (status.ok()) {
1380 status = blob_file_builder->Finish();
1381 } else {
1382 blob_file_builder->Abandon(status);
1383 }
1384 blob_file_builder.reset();
1385 sub_compact->Current().UpdateBlobStats();
1386 }
1387
1388 sub_compact->compaction_job_stats.cpu_micros =
1389 db_options_.clock->CPUMicros() - prev_cpu_micros;
1390
1391 if (measure_io_stats_) {
1392 sub_compact->compaction_job_stats.file_write_nanos +=
1393 IOSTATS(write_nanos) - prev_write_nanos;
1394 sub_compact->compaction_job_stats.file_fsync_nanos +=
1395 IOSTATS(fsync_nanos) - prev_fsync_nanos;
1396 sub_compact->compaction_job_stats.file_range_sync_nanos +=
1397 IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
1398 sub_compact->compaction_job_stats.file_prepare_write_nanos +=
1399 IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
1400 sub_compact->compaction_job_stats.cpu_micros -=
1401 (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
1402 IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
1403 1000;
1404 if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
1405 SetPerfLevel(prev_perf_level);
1406 }
1407 }
1408 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
1409 if (!status.ok()) {
1410 if (c_iter) {
1411 c_iter->status().PermitUncheckedError();
1412 }
1413 if (input) {
1414 input->status().PermitUncheckedError();
1415 }
1416 }
1417 #endif // ROCKSDB_ASSERT_STATUS_CHECKED
1418
1419 blob_counter.reset();
1420 clip.reset();
1421 raw_input.reset();
1422 sub_compact->status = status;
1423 NotifyOnSubcompactionCompleted(sub_compact);
1424 }
1425
1426 uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const {
1427 return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id;
1428 }
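// Example (hypothetical ids): job_id_ = 5 and sub_job_id = 3 produce
// (uint64_t{5} << 32) | 3 == 0x0000000500000003, i.e. the job id occupies the
// high 32 bits and the sub-job id the low 32 bits.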
1429
1430 void CompactionJob::RecordDroppedKeys(
1431 const CompactionIterationStats& c_iter_stats,
1432 CompactionJobStats* compaction_job_stats) {
1433 if (c_iter_stats.num_record_drop_user > 0) {
1434 RecordTick(stats_, COMPACTION_KEY_DROP_USER,
1435 c_iter_stats.num_record_drop_user);
1436 }
1437 if (c_iter_stats.num_record_drop_hidden > 0) {
1438 RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
1439 c_iter_stats.num_record_drop_hidden);
1440 if (compaction_job_stats) {
1441 compaction_job_stats->num_records_replaced +=
1442 c_iter_stats.num_record_drop_hidden;
1443 }
1444 }
1445 if (c_iter_stats.num_record_drop_obsolete > 0) {
1446 RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
1447 c_iter_stats.num_record_drop_obsolete);
1448 if (compaction_job_stats) {
1449 compaction_job_stats->num_expired_deletion_records +=
1450 c_iter_stats.num_record_drop_obsolete;
1451 }
1452 }
1453 if (c_iter_stats.num_record_drop_range_del > 0) {
1454 RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
1455 c_iter_stats.num_record_drop_range_del);
1456 }
1457 if (c_iter_stats.num_range_del_drop_obsolete > 0) {
1458 RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
1459 c_iter_stats.num_range_del_drop_obsolete);
1460 }
1461 if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
1462 RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
1463 c_iter_stats.num_optimized_del_drop_obsolete);
1464 }
1465 }
1466
1467 Status CompactionJob::FinishCompactionOutputFile(
1468 const Status& input_status, SubcompactionState* sub_compact,
1469 CompactionOutputs& outputs, const Slice& next_table_min_key) {
1470 AutoThreadOperationStageUpdater stage_updater(
1471 ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
1472 assert(sub_compact != nullptr);
1473 assert(outputs.HasBuilder());
1474
1475 FileMetaData* meta = outputs.GetMetaData();
1476 uint64_t output_number = meta->fd.GetNumber();
1477 assert(output_number != 0);
1478
1479 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1480 std::string file_checksum = kUnknownFileChecksum;
1481 std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
1482
1483 // Check for iterator errors
1484 Status s = input_status;
1485
1486 // Add range tombstones
1487 auto earliest_snapshot = kMaxSequenceNumber;
1488 if (existing_snapshots_.size() > 0) {
1489 earliest_snapshot = existing_snapshots_[0];
1490 }
1491 if (s.ok()) {
1492 CompactionIterationStats range_del_out_stats;
1493 // If the compaction supports per_key_placement, only output range dels to
1494 // the penultimate level.
1495 // Note: Use `bottommost_level_ = true` for both bottommost and
1496 // output_to_penultimate_level compaction here, as it's only used to decide
1497 // whether range dels can be dropped.
1498 if (outputs.HasRangeDel()) {
1499 s = outputs.AddRangeDels(
1500 sub_compact->start.has_value() ? &(sub_compact->start.value())
1501 : nullptr,
1502 sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr,
1503 range_del_out_stats, bottommost_level_, cfd->internal_comparator(),
1504 earliest_snapshot, next_table_min_key, full_history_ts_low_);
1505 }
1506 RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
1507 TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
1508 }
1509
1510 const uint64_t current_entries = outputs.NumEntries();
1511
1512 s = outputs.Finish(s, seqno_time_mapping_);
1513
1514 if (s.ok()) {
1515 // With accurate smallest and largest keys, we can get a slightly more
1516 // accurate oldest ancestor time.
1517 // This makes the oldest ancestor time in the manifest more accurate than
1518 // the one in the table properties; it is unclear how to resolve the mismatch.
1519 if (meta->smallest.size() > 0 && meta->largest.size() > 0) {
1520 uint64_t refined_oldest_ancester_time;
1521 Slice new_smallest = meta->smallest.user_key();
1522 Slice new_largest = meta->largest.user_key();
1523 if (!new_largest.empty() && !new_smallest.empty()) {
1524 refined_oldest_ancester_time =
1525 sub_compact->compaction->MinInputFileOldestAncesterTime(
1526 &(meta->smallest), &(meta->largest));
1527 if (refined_oldest_ancester_time !=
1528 std::numeric_limits<uint64_t>::max()) {
1529 meta->oldest_ancester_time = refined_oldest_ancester_time;
1530 }
1531 }
1532 }
1533 }
1534
1535 // Finish and check for file errors
1536 IOStatus io_s = outputs.WriterSyncClose(s, db_options_.clock, stats_,
1537 db_options_.use_fsync);
1538
1539 if (s.ok() && io_s.ok()) {
1540 file_checksum = meta->file_checksum;
1541 file_checksum_func_name = meta->file_checksum_func_name;
1542 }
1543
1544 if (s.ok()) {
1545 s = io_s;
1546 }
1547 if (sub_compact->io_status.ok()) {
1548 sub_compact->io_status = io_s;
1549 // Since this error is really a copy of the
1550 // "normal" status, it does not also need to be checked.
1551 sub_compact->io_status.PermitUncheckedError();
1552 }
1553
1554 TableProperties tp;
1555 if (s.ok()) {
1556 tp = outputs.GetTableProperties();
1557 }
1558
1559 if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
1560 // If there is nothing to output, there is no need to generate an SST file.
1561 // This happens when the output level is the bottom level and, at the same
1562 // time, the sub_compact produced no output.
1563 std::string fname =
1564 TableFileName(sub_compact->compaction->immutable_options()->cf_paths,
1565 meta->fd.GetNumber(), meta->fd.GetPathId());
1566
1567 // TODO(AR) it is not clear if there are any larger implications if
1568 // DeleteFile fails here
1569 Status ds = env_->DeleteFile(fname);
1570 if (!ds.ok()) {
1571 ROCKS_LOG_WARN(
1572 db_options_.info_log,
1573 "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
1574 " at bottom level%s",
1575 cfd->GetName().c_str(), job_id_, output_number,
1576 meta->marked_for_compaction ? " (need compaction)" : "");
1577 }
1578
1579 // Also need to remove the file from outputs, or it will be added to the
1580 // VersionEdit.
1581 outputs.RemoveLastOutput();
1582 meta = nullptr;
1583 }
1584
1585 if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
1586 // Output to event logger and fire events.
1587 outputs.UpdateTableProperties();
1588 ROCKS_LOG_INFO(db_options_.info_log,
1589 "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
1590 " keys, %" PRIu64 " bytes%s, temperature: %s",
1591 cfd->GetName().c_str(), job_id_, output_number,
1592 current_entries, meta->fd.file_size,
1593 meta->marked_for_compaction ? " (need compaction)" : "",
1594 temperature_to_string[meta->temperature].c_str());
1595 }
1596 std::string fname;
1597 FileDescriptor output_fd;
1598 uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
1599 Status status_for_listener = s;
1600 if (meta != nullptr) {
1601 fname = GetTableFileName(meta->fd.GetNumber());
1602 output_fd = meta->fd;
1603 oldest_blob_file_number = meta->oldest_blob_file_number;
1604 } else {
1605 fname = "(nil)";
1606 if (s.ok()) {
1607 status_for_listener = Status::Aborted("Empty SST file not kept");
1608 }
1609 }
1610 EventHelpers::LogAndNotifyTableFileCreationFinished(
1611 event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
1612 job_id_, output_fd, oldest_blob_file_number, tp,
1613 TableFileCreationReason::kCompaction, status_for_listener, file_checksum,
1614 file_checksum_func_name);
1615
1616 #ifndef ROCKSDB_LITE
1617 // Report new file to SstFileManagerImpl
1618 auto sfm =
1619 static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
1620 if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
1621 Status add_s = sfm->OnAddFile(fname);
1622 if (!add_s.ok() && s.ok()) {
1623 s = add_s;
1624 }
1625 if (sfm->IsMaxAllowedSpaceReached()) {
1626 // TODO(ajkr): should we return OK() if max space was reached by the final
1627 // compaction output file (similarly to how flush works when full)?
1628 s = Status::SpaceLimit("Max allowed space was reached");
1629 TEST_SYNC_POINT(
1630 "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
1631 InstrumentedMutexLock l(db_mutex_);
1632 db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
1633 }
1634 }
1635 #endif
1636
1637 outputs.ResetBuilder();
1638 return s;
1639 }
1640
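// InstallCompactionResults turns the compaction result into a VersionEdit
// (input deletions, new output files, blob file additions and blob garbage)
// and applies it to the MANIFEST via LogAndApply while the DB mutex is held.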
1641 Status CompactionJob::InstallCompactionResults(
1642 const MutableCFOptions& mutable_cf_options) {
1643 assert(compact_);
1644
1645 db_mutex_->AssertHeld();
1646
1647 auto* compaction = compact_->compaction;
1648 assert(compaction);
1649
1650 {
1651 Compaction::InputLevelSummaryBuffer inputs_summary;
1652 if (compaction_stats_.has_penultimate_level_output) {
1653 ROCKS_LOG_BUFFER(
1654 log_buffer_,
1655 "[%s] [JOB %d] Compacted %s => output_to_penultimate_level: %" PRIu64
1656 " bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes",
1657 compaction->column_family_data()->GetName().c_str(), job_id_,
1658 compaction->InputLevelSummary(&inputs_summary),
1659 compaction_stats_.penultimate_level_stats.bytes_written,
1660 compaction_stats_.stats.bytes_written,
1661 compaction_stats_.TotalBytesWritten());
1662 } else {
1663 ROCKS_LOG_BUFFER(log_buffer_,
1664 "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
1665 compaction->column_family_data()->GetName().c_str(),
1666 job_id_, compaction->InputLevelSummary(&inputs_summary),
1667 compaction_stats_.TotalBytesWritten());
1668 }
1669 }
1670
1671 VersionEdit* const edit = compaction->edit();
1672 assert(edit);
1673
1674 // Add compaction inputs
1675 compaction->AddInputDeletions(edit);
1676
1677 std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage;
1678
1679 for (const auto& sub_compact : compact_->sub_compact_states) {
1680 sub_compact.AddOutputsEdit(edit);
1681
1682 for (const auto& blob : sub_compact.Current().GetBlobFileAdditions()) {
1683 edit->AddBlobFile(blob);
1684 }
1685
1686 if (sub_compact.Current().GetBlobGarbageMeter()) {
1687 const auto& flows = sub_compact.Current().GetBlobGarbageMeter()->flows();
1688
1689 for (const auto& pair : flows) {
1690 const uint64_t blob_file_number = pair.first;
1691 const BlobGarbageMeter::BlobInOutFlow& flow = pair.second;
1692
1693 assert(flow.IsValid());
1694 if (flow.HasGarbage()) {
1695 blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(),
1696 flow.GetGarbageBytes());
1697 }
1698 }
1699 }
1700 }
1701
1702 for (const auto& pair : blob_total_garbage) {
1703 const uint64_t blob_file_number = pair.first;
1704 const BlobGarbageMeter::BlobStats& stats = pair.second;
1705
1706 edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(),
1707 stats.GetBytes());
1708 }
1709
1710 if ((compaction->compaction_reason() ==
1711 CompactionReason::kLevelMaxLevelSize ||
1712 compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) &&
1713 compaction->immutable_options()->compaction_pri == kRoundRobin) {
1714 int start_level = compaction->start_level();
1715 if (start_level > 0) {
1716 auto vstorage = compaction->input_version()->storage_info();
1717 edit->AddCompactCursor(start_level,
1718 vstorage->GetNextCompactCursor(
1719 start_level, compaction->num_input_files(0)));
1720 }
1721 }
1722
1723 return versions_->LogAndApply(compaction->column_family_data(),
1724 mutable_cf_options, edit, db_mutex_,
1725 db_directory_);
1726 }
1727
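// RecordCompactionIOStats moves the thread-local IOSTATS byte counters into
// the compaction tickers (plus reason-specific tickers) and then resets them
// so the next measurement interval starts from zero.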
1728 void CompactionJob::RecordCompactionIOStats() {
1729 RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
1730 RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
1731 CompactionReason compaction_reason =
1732 compact_->compaction->compaction_reason();
1733 if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
1734 RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
1735 RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
1736 } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
1737 RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
1738 RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
1739 } else if (compaction_reason == CompactionReason::kTtl) {
1740 RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
1741 RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
1742 }
1743 ThreadStatusUtil::IncreaseThreadOperationProperty(
1744 ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
1745 IOSTATS_RESET(bytes_read);
1746 ThreadStatusUtil::IncreaseThreadOperationProperty(
1747 ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
1748 IOSTATS_RESET(bytes_written);
1749 }
1750
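// OpenCompactionOutputFile allocates a new file number, creates the writable
// file with the chosen output temperature, registers a FileMetaData with the
// outputs, and wires up the WritableFileWriter and TableBuilder for this
// subcompaction output.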
1751 Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
1752 CompactionOutputs& outputs) {
1753 assert(sub_compact != nullptr);
1754
1755 // no need to lock because VersionSet::next_file_number_ is atomic
1756 uint64_t file_number = versions_->NewFileNumber();
1757 std::string fname = GetTableFileName(file_number);
1758 // Fire events.
1759 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1760 #ifndef ROCKSDB_LITE
1761 EventHelpers::NotifyTableFileCreationStarted(
1762 cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
1763 TableFileCreationReason::kCompaction);
1764 #endif // !ROCKSDB_LITE
1765 // Make the output file
1766 std::unique_ptr<FSWritableFile> writable_file;
1767 #ifndef NDEBUG
1768 bool syncpoint_arg = file_options_.use_direct_writes;
1769 TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
1770 &syncpoint_arg);
1771 #endif
1772
1773 // Pass temperature of the last level files to FileSystem.
1774 FileOptions fo_copy = file_options_;
1775 Temperature temperature = sub_compact->compaction->output_temperature();
1776 // Only set for last level compaction when the output is not going to the
1777 // penultimate level (relevant when the preclude_last_level feature is enabled).
1778 if (temperature == Temperature::kUnknown &&
1779 sub_compact->compaction->is_last_level() &&
1780 !sub_compact->IsCurrentPenultimateLevel()) {
1781 temperature =
1782 sub_compact->compaction->mutable_cf_options()->last_level_temperature;
1783 }
1784 fo_copy.temperature = temperature;
1785
1786 Status s;
1787 IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
1788 s = io_s;
1789 if (sub_compact->io_status.ok()) {
1790 sub_compact->io_status = io_s;
1791 // Since this error is really a copy of the io_s that is checked below as s,
1792 // it does not also need to be checked.
1793 sub_compact->io_status.PermitUncheckedError();
1794 }
1795 if (!s.ok()) {
1796 ROCKS_LOG_ERROR(
1797 db_options_.info_log,
1798 "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
1799 " fails at NewWritableFile with status %s",
1800 sub_compact->compaction->column_family_data()->GetName().c_str(),
1801 job_id_, file_number, s.ToString().c_str());
1802 LogFlush(db_options_.info_log);
1803 EventHelpers::LogAndNotifyTableFileCreationFinished(
1804 event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
1805 fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
1806 TableProperties(), TableFileCreationReason::kCompaction, s,
1807 kUnknownFileChecksum, kUnknownFileChecksumFuncName);
1808 return s;
1809 }
1810
1811 // Try to figure out the output file's oldest ancestor time.
1812 int64_t temp_current_time = 0;
1813 auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
1814 // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
1815 if (!get_time_status.ok()) {
1816 ROCKS_LOG_WARN(db_options_.info_log,
1817 "Failed to get current time. Status: %s",
1818 get_time_status.ToString().c_str());
1819 }
1820 uint64_t current_time = static_cast<uint64_t>(temp_current_time);
1821 InternalKey tmp_start, tmp_end;
1822 if (sub_compact->start.has_value()) {
1823 tmp_start.SetMinPossibleForUserKey(sub_compact->start.value());
1824 }
1825 if (sub_compact->end.has_value()) {
1826 tmp_end.SetMinPossibleForUserKey(sub_compact->end.value());
1827 }
1828 uint64_t oldest_ancester_time =
1829 sub_compact->compaction->MinInputFileOldestAncesterTime(
1830 sub_compact->start.has_value() ? &tmp_start : nullptr,
1831 sub_compact->end.has_value() ? &tmp_end : nullptr);
1832 if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) {
1833 oldest_ancester_time = current_time;
1834 }
1835
1836 // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
1837 {
1838 FileMetaData meta;
1839 meta.fd = FileDescriptor(file_number,
1840 sub_compact->compaction->output_path_id(), 0);
1841 meta.oldest_ancester_time = oldest_ancester_time;
1842 meta.file_creation_time = current_time;
1843 meta.temperature = temperature;
1844 assert(!db_id_.empty());
1845 assert(!db_session_id_.empty());
1846 s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(),
1847 &meta.unique_id);
1848 if (!s.ok()) {
1849 ROCKS_LOG_ERROR(db_options_.info_log,
1850 "[%s] [JOB %d] file #%" PRIu64
1851 " failed to generate unique id: %s.",
1852 cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
1853 s.ToString().c_str());
1854 return s;
1855 }
1856
1857 outputs.AddOutput(std::move(meta), cfd->internal_comparator(),
1858 sub_compact->compaction->mutable_cf_options()
1859 ->check_flush_compaction_key_order,
1860 paranoid_file_checks_);
1861 }
1862
1863 writable_file->SetIOPriority(GetRateLimiterPriority());
1864 writable_file->SetWriteLifeTimeHint(write_hint_);
1865 FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
1866 writable_file->SetPreallocationBlockSize(static_cast<size_t>(
1867 sub_compact->compaction->OutputFilePreallocationSize()));
1868 const auto& listeners =
1869 sub_compact->compaction->immutable_options()->listeners;
1870 outputs.AssignFileWriter(new WritableFileWriter(
1871 std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_,
1872 db_options_.stats, listeners, db_options_.file_checksum_gen_factory.get(),
1873 tmp_set.Contains(FileType::kTableFile), false));
1874
1875 TableBuilderOptions tboptions(
1876 *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
1877 cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
1878 sub_compact->compaction->output_compression(),
1879 sub_compact->compaction->output_compression_opts(), cfd->GetID(),
1880 cfd->GetName(), sub_compact->compaction->output_level(),
1881 bottommost_level_, TableFileCreationReason::kCompaction,
1882 0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
1883 sub_compact->compaction->max_output_file_size(), file_number);
1884
1885 outputs.NewBuilder(tboptions);
1886
1887 LogFlush(db_options_.info_log);
1888 return s;
1889 }
1890
1891 void CompactionJob::CleanupCompaction() {
1892 for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
1893 sub_compact.Cleanup(table_cache_.get());
1894 }
1895 delete compact_;
1896 compact_ = nullptr;
1897 }
1898
1899 #ifndef ROCKSDB_LITE
1900 namespace {
1901 void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
1902 assert(prefix_length > 0);
1903 size_t length = src.size() > prefix_length ? prefix_length : src.size();
1904 dst->assign(src.data(), length);
1905 }
1906 } // namespace
1907
1908 #endif // !ROCKSDB_LITE
1909
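// UpdateCompactionStats tallies, per input level, the number of files and
// bytes read, split into output-level and non-output-level buckets, and adds
// blob bytes read and the dropped-record count.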
1910 void CompactionJob::UpdateCompactionStats() {
1911 assert(compact_);
1912
1913 Compaction* compaction = compact_->compaction;
1914 compaction_stats_.stats.num_input_files_in_non_output_levels = 0;
1915 compaction_stats_.stats.num_input_files_in_output_level = 0;
1916 for (int input_level = 0;
1917 input_level < static_cast<int>(compaction->num_input_levels());
1918 ++input_level) {
1919 if (compaction->level(input_level) != compaction->output_level()) {
1920 UpdateCompactionInputStatsHelper(
1921 &compaction_stats_.stats.num_input_files_in_non_output_levels,
1922 &compaction_stats_.stats.bytes_read_non_output_levels, input_level);
1923 } else {
1924 UpdateCompactionInputStatsHelper(
1925 &compaction_stats_.stats.num_input_files_in_output_level,
1926 &compaction_stats_.stats.bytes_read_output_level, input_level);
1927 }
1928 }
1929
1930 assert(compaction_job_stats_);
1931 compaction_stats_.stats.bytes_read_blob =
1932 compaction_job_stats_->total_blob_bytes_read;
1933
1934 compaction_stats_.stats.num_dropped_records =
1935 compaction_stats_.DroppedRecords();
1936 }
1937
1938 void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
1939 uint64_t* bytes_read,
1940 int input_level) {
1941 const Compaction* compaction = compact_->compaction;
1942 auto num_input_files = compaction->num_input_files(input_level);
1943 *num_files += static_cast<int>(num_input_files);
1944
1945 for (size_t i = 0; i < num_input_files; ++i) {
1946 const auto* file_meta = compaction->input(input_level, i);
1947 *bytes_read += file_meta->fd.GetFileSize();
1948 compaction_stats_.stats.num_input_records +=
1949 static_cast<uint64_t>(file_meta->num_entries);
1950 }
1951 }
1952
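// UpdateCompactionJobStats copies the per-compaction InternalStats into the
// user-facing CompactionJobStats: input/output byte, record and file counts,
// plus the smallest/largest output key prefixes when any file was written.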
1953 void CompactionJob::UpdateCompactionJobStats(
1954 const InternalStats::CompactionStats& stats) const {
1955 #ifndef ROCKSDB_LITE
1956 compaction_job_stats_->elapsed_micros = stats.micros;
1957
1958 // input information
1959 compaction_job_stats_->total_input_bytes =
1960 stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
1961 compaction_job_stats_->num_input_records = stats.num_input_records;
1962 compaction_job_stats_->num_input_files =
1963 stats.num_input_files_in_non_output_levels +
1964 stats.num_input_files_in_output_level;
1965 compaction_job_stats_->num_input_files_at_output_level =
1966 stats.num_input_files_in_output_level;
1967
1968 // output information
1969 compaction_job_stats_->total_output_bytes = stats.bytes_written;
1970 compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
1971 compaction_job_stats_->num_output_records = stats.num_output_records;
1972 compaction_job_stats_->num_output_files = stats.num_output_files;
1973 compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
1974
1975 if (stats.num_output_files > 0) {
1976 CopyPrefix(compact_->SmallestUserKey(),
1977 CompactionJobStats::kMaxPrefixLength,
1978 &compaction_job_stats_->smallest_output_key_prefix);
1979 CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
1980 &compaction_job_stats_->largest_output_key_prefix);
1981 }
1982 #else
1983 (void)stats;
1984 #endif // !ROCKSDB_LITE
1985 }
1986
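// LogCompaction writes the human-readable "Compacting ..." summary to the
// info log and emits a structured "compaction_started" event listing the
// input files per level, the compaction score and the oldest snapshot.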
1987 void CompactionJob::LogCompaction() {
1988 Compaction* compaction = compact_->compaction;
1989 ColumnFamilyData* cfd = compaction->column_family_data();
1990
1991 // Let's check if anything will get logged. Don't prepare all the info if
1992 // we're not logging
1993 if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
1994 Compaction::InputLevelSummaryBuffer inputs_summary;
1995 ROCKS_LOG_INFO(
1996 db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
1997 cfd->GetName().c_str(), job_id_,
1998 compaction->InputLevelSummary(&inputs_summary), compaction->score());
1999 char scratch[2345];
2000 compaction->Summary(scratch, sizeof(scratch));
2001 ROCKS_LOG_INFO(db_options_.info_log, "[%s]: Compaction start summary: %s\n",
2002 cfd->GetName().c_str(), scratch);
2003 // build event logger report
2004 auto stream = event_logger_->Log();
2005 stream << "job" << job_id_ << "event"
2006 << "compaction_started"
2007 << "compaction_reason"
2008 << GetCompactionReasonString(compaction->compaction_reason());
2009 for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
2010 stream << ("files_L" + std::to_string(compaction->level(i)));
2011 stream.StartArray();
2012 for (auto f : *compaction->inputs(i)) {
2013 stream << f->fd.GetNumber();
2014 }
2015 stream.EndArray();
2016 }
2017 stream << "score" << compaction->score() << "input_data_size"
2018 << compaction->CalculateTotalInputSize() << "oldest_snapshot_seqno"
2019 << (existing_snapshots_.empty()
2020 ? int64_t{-1} // Use -1 for "none"
2021 : static_cast<int64_t>(existing_snapshots_[0]));
2022 if (compaction->SupportsPerKeyPlacement()) {
2023 stream << "preclude_last_level_min_seqno"
2024 << preclude_last_level_min_seqno_;
2025 stream << "penultimate_output_level" << compaction->GetPenultimateLevel();
2026 stream << "penultimate_output_range"
2027 << GetCompactionPenultimateOutputRangeTypeString(
2028 compaction->GetPenultimateOutputRangeType());
2029
2030 if (compaction->GetPenultimateOutputRangeType() ==
2031 Compaction::PenultimateOutputRangeType::kDisabled) {
2032 ROCKS_LOG_WARN(
2033 db_options_.info_log,
2034 "[%s] [JOB %d] Penultimate level output is disabled, likely "
2035 "because of the range conflict in the penultimate level",
2036 cfd->GetName().c_str(), job_id_);
2037 }
2038 }
2039 }
2040 }
2041
2042 std::string CompactionJob::GetTableFileName(uint64_t file_number) {
2043 return TableFileName(compact_->compaction->immutable_options()->cf_paths,
2044 file_number, compact_->compaction->output_path_id());
2045 }
2046
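// Compaction I/O is promoted to Env::IO_USER while the write controller is
// delaying or stopping writes (presumably so compaction can clear the backlog
// sooner); otherwise it runs at the default Env::IO_LOW priority.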
2047 Env::IOPriority CompactionJob::GetRateLimiterPriority() {
2048 if (versions_ && versions_->GetColumnFamilySet() &&
2049 versions_->GetColumnFamilySet()->write_controller()) {
2050 WriteController* write_controller =
2051 versions_->GetColumnFamilySet()->write_controller();
2052 if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
2053 return Env::IO_USER;
2054 }
2055 }
2056
2057 return Env::IO_LOW;
2058 }
2059
2060 } // namespace ROCKSDB_NAMESPACE