1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "db/compaction/compaction_job.h"
11
12 #include <algorithm>
13 #include <cinttypes>
14 #include <functional>
15 #include <list>
16 #include <memory>
17 #include <random>
18 #include <set>
19 #include <thread>
20 #include <utility>
21 #include <vector>
22
23 #include "db/blob/blob_file_addition.h"
24 #include "db/blob/blob_file_builder.h"
25 #include "db/builder.h"
26 #include "db/db_impl/db_impl.h"
27 #include "db/db_iter.h"
28 #include "db/dbformat.h"
29 #include "db/error_handler.h"
30 #include "db/event_helpers.h"
31 #include "db/log_reader.h"
32 #include "db/log_writer.h"
33 #include "db/memtable.h"
34 #include "db/memtable_list.h"
35 #include "db/merge_context.h"
36 #include "db/merge_helper.h"
37 #include "db/output_validator.h"
38 #include "db/range_del_aggregator.h"
39 #include "db/version_set.h"
40 #include "file/filename.h"
41 #include "file/read_write_util.h"
42 #include "file/sst_file_manager_impl.h"
43 #include "file/writable_file_writer.h"
44 #include "logging/log_buffer.h"
45 #include "logging/logging.h"
46 #include "monitoring/iostats_context_imp.h"
47 #include "monitoring/perf_context_imp.h"
48 #include "monitoring/thread_status_util.h"
49 #include "port/port.h"
50 #include "rocksdb/db.h"
51 #include "rocksdb/env.h"
52 #include "rocksdb/sst_partitioner.h"
53 #include "rocksdb/statistics.h"
54 #include "rocksdb/status.h"
55 #include "rocksdb/table.h"
56 #include "table/block_based/block.h"
57 #include "table/block_based/block_based_table_factory.h"
58 #include "table/merging_iterator.h"
59 #include "table/table_builder.h"
60 #include "test_util/sync_point.h"
61 #include "util/coding.h"
62 #include "util/hash.h"
63 #include "util/mutexlock.h"
64 #include "util/random.h"
65 #include "util/stop_watch.h"
66 #include "util/string_util.h"
67
68 namespace ROCKSDB_NAMESPACE {
69
70 const char* GetCompactionReasonString(CompactionReason compaction_reason) {
71 switch (compaction_reason) {
72 case CompactionReason::kUnknown:
73 return "Unknown";
74 case CompactionReason::kLevelL0FilesNum:
75 return "LevelL0FilesNum";
76 case CompactionReason::kLevelMaxLevelSize:
77 return "LevelMaxLevelSize";
78 case CompactionReason::kUniversalSizeAmplification:
79 return "UniversalSizeAmplification";
80 case CompactionReason::kUniversalSizeRatio:
81 return "UniversalSizeRatio";
82 case CompactionReason::kUniversalSortedRunNum:
83 return "UniversalSortedRunNum";
84 case CompactionReason::kFIFOMaxSize:
85 return "FIFOMaxSize";
86 case CompactionReason::kFIFOReduceNumFiles:
87 return "FIFOReduceNumFiles";
88 case CompactionReason::kFIFOTtl:
89 return "FIFOTtl";
90 case CompactionReason::kManualCompaction:
91 return "ManualCompaction";
92 case CompactionReason::kFilesMarkedForCompaction:
93 return "FilesMarkedForCompaction";
94 case CompactionReason::kBottommostFiles:
95 return "BottommostFiles";
96 case CompactionReason::kTtl:
97 return "Ttl";
98 case CompactionReason::kFlush:
99 return "Flush";
100 case CompactionReason::kExternalSstIngestion:
101 return "ExternalSstIngestion";
102 case CompactionReason::kPeriodicCompaction:
103 return "PeriodicCompaction";
104 case CompactionReason::kNumOfReasons:
105 // fall through
106 default:
107 assert(false);
108 return "Invalid";
109 }
110 }
111
112 // Maintains state for each sub-compaction
113 struct CompactionJob::SubcompactionState {
114 const Compaction* compaction;
115 std::unique_ptr<CompactionIterator> c_iter;
116
117 // The boundaries of the key-range this compaction is interested in. No two
118 // subcompactions may have overlapping key-ranges.
119 // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
120 Slice *start, *end;
121
122 // The return status of this subcompaction
123 Status status;
124
125 // The return IO Status of this subcompaction
126 IOStatus io_status;
127
128 // Files produced by this subcompaction
129 struct Output {
130 Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
131 bool _enable_order_check, bool _enable_hash)
132 : meta(std::move(_meta)),
133 validator(_icmp, _enable_order_check, _enable_hash),
134 finished(false) {}
135 FileMetaData meta;
136 OutputValidator validator;
137 bool finished;
138 std::shared_ptr<const TableProperties> table_properties;
139 };
140
141 // State kept for output being generated
142 std::vector<Output> outputs;
143 std::vector<BlobFileAddition> blob_file_additions;
144 std::unique_ptr<WritableFileWriter> outfile;
145 std::unique_ptr<TableBuilder> builder;
146
147 Output* current_output() {
148 if (outputs.empty()) {
149 // This subcompaction's output could be empty if compaction was aborted
150 // before this subcompaction had a chance to generate any output files.
151 // When subcompactions are executed sequentially this is more likely and
152 // will be particularly likely for the later subcompactions to be empty.
153 // Once they are run in parallel, however, it should be much rarer.
154 return nullptr;
155 } else {
156 return &outputs.back();
157 }
158 }
159
160 uint64_t current_output_file_size = 0;
161
162 // State during the subcompaction
163 uint64_t total_bytes = 0;
164 uint64_t num_output_records = 0;
165 CompactionJobStats compaction_job_stats;
166 uint64_t approx_size = 0;
167 // An index used to speed up ShouldStopBefore().
168 size_t grandparent_index = 0;
169 // The number of bytes overlapping between the current output and
170 // grandparent files used in ShouldStopBefore().
171 uint64_t overlapped_bytes = 0;
172 // A flag indicating whether any key has been seen in ShouldStopBefore()
173 bool seen_key = false;
174
175 SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size)
176 : compaction(c), start(_start), end(_end), approx_size(size) {
177 assert(compaction != nullptr);
178 }
179
180 // Adds the key and value to the builder.
181 // The key-value pair is also added to the output validator for checking.
182 Status AddToBuilder(const Slice& key, const Slice& value) {
183 auto curr = current_output();
184 assert(builder != nullptr);
185 assert(curr != nullptr);
186 Status s = curr->validator.Add(key, value);
187 if (!s.ok()) {
188 return s;
189 }
190 builder->Add(key, value);
191 return Status::OK();
192 }
193
194 // Returns true iff we should stop building the current output
195 // before processing "internal_key".
196 bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
197 const InternalKeyComparator* icmp =
198 &compaction->column_family_data()->internal_comparator();
199 const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
200
201 // Scan to find earliest grandparent file that contains key.
202 while (grandparent_index < grandparents.size() &&
203 icmp->Compare(internal_key,
204 grandparents[grandparent_index]->largest.Encode()) >
205 0) {
206 if (seen_key) {
207 overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
208 }
209 assert(grandparent_index + 1 >= grandparents.size() ||
210 icmp->Compare(
211 grandparents[grandparent_index]->largest.Encode(),
212 grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
213 grandparent_index++;
214 }
215 seen_key = true;
216
217 if (overlapped_bytes + curr_file_size >
218 compaction->max_compaction_bytes()) {
219 // Too much overlap for current output; start new output
220 overlapped_bytes = 0;
221 return true;
222 }
223
224 return false;
225 }
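  // Example (illustrative): with max_compaction_bytes() == 100 MB and 40 MB
  // grandparent files, once the keys processed so far have passed beyond two
  // grandparent files, overlapped_bytes reaches 80 MB; with a 30 MB output in
  // progress, 80 + 30 > 100 makes ShouldStopBefore() return true, so the
  // caller cuts the current output file and starts a new one.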
226 };
227
228 // Maintains state for the entire compaction
229 struct CompactionJob::CompactionState {
230 Compaction* const compaction;
231
232 // REQUIRED: subcompaction states are stored in order of increasing
233 // key-range
234 std::vector<CompactionJob::SubcompactionState> sub_compact_states;
235 Status status;
236
237 size_t num_output_files = 0;
238 uint64_t total_bytes = 0;
239 size_t num_blob_output_files = 0;
240 uint64_t total_blob_bytes = 0;
241 uint64_t num_output_records = 0;
242
243 explicit CompactionState(Compaction* c) : compaction(c) {}
244
245 Slice SmallestUserKey() {
246 for (const auto& sub_compact_state : sub_compact_states) {
247 if (!sub_compact_state.outputs.empty() &&
248 sub_compact_state.outputs[0].finished) {
249 return sub_compact_state.outputs[0].meta.smallest.user_key();
250 }
251 }
252 // If there is no finished output, return an empty slice.
253 return Slice(nullptr, 0);
254 }
255
256 Slice LargestUserKey() {
257 for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
258 ++it) {
259 if (!it->outputs.empty() && it->current_output()->finished) {
260 assert(it->current_output() != nullptr);
261 return it->current_output()->meta.largest.user_key();
262 }
263 }
264 // If there is no finished output, return an empty slice.
265 return Slice(nullptr, 0);
266 }
267 };
268
269 void CompactionJob::AggregateStatistics() {
270 assert(compact_);
271
272 for (SubcompactionState& sc : compact_->sub_compact_states) {
273 auto& outputs = sc.outputs;
274
275 if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
276 // An error occurred, so ignore the last output.
277 outputs.pop_back();
278 }
279
280 compact_->num_output_files += outputs.size();
281 compact_->total_bytes += sc.total_bytes;
282
283 const auto& blobs = sc.blob_file_additions;
284
285 compact_->num_blob_output_files += blobs.size();
286
287 for (const auto& blob : blobs) {
288 compact_->total_blob_bytes += blob.GetTotalBlobBytes();
289 }
290
291 compact_->num_output_records += sc.num_output_records;
292
293 compaction_job_stats_->Add(sc.compaction_job_stats);
294 }
295 }
296
297 CompactionJob::CompactionJob(
298 int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
299 const FileOptions& file_options, VersionSet* versions,
300 const std::atomic<bool>* shutting_down,
301 const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
302 FSDirectory* db_directory, FSDirectory* output_directory,
303 FSDirectory* blob_output_directory, Statistics* stats,
304 InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
305 std::vector<SequenceNumber> existing_snapshots,
306 SequenceNumber earliest_write_conflict_snapshot,
307 const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
308 EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
309 const std::string& dbname, CompactionJobStats* compaction_job_stats,
310 Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
311 const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
312 const std::string& db_session_id, std::string full_history_ts_low)
313 : job_id_(job_id),
314 compact_(new CompactionState(compaction)),
315 compaction_job_stats_(compaction_job_stats),
316 compaction_stats_(compaction->compaction_reason(), 1),
317 dbname_(dbname),
318 db_id_(db_id),
319 db_session_id_(db_session_id),
320 db_options_(db_options),
321 file_options_(file_options),
322 env_(db_options.env),
323 io_tracer_(io_tracer),
324 fs_(db_options.fs, io_tracer),
325 file_options_for_read_(
326 fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
327 versions_(versions),
328 shutting_down_(shutting_down),
329 manual_compaction_paused_(manual_compaction_paused),
330 preserve_deletes_seqnum_(preserve_deletes_seqnum),
331 log_buffer_(log_buffer),
332 db_directory_(db_directory),
333 output_directory_(output_directory),
334 blob_output_directory_(blob_output_directory),
335 stats_(stats),
336 db_mutex_(db_mutex),
337 db_error_handler_(db_error_handler),
338 existing_snapshots_(std::move(existing_snapshots)),
339 earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
340 snapshot_checker_(snapshot_checker),
341 table_cache_(std::move(table_cache)),
342 event_logger_(event_logger),
343 bottommost_level_(false),
344 paranoid_file_checks_(paranoid_file_checks),
345 measure_io_stats_(measure_io_stats),
346 write_hint_(Env::WLTH_NOT_SET),
347 thread_pri_(thread_pri),
348 full_history_ts_low_(std::move(full_history_ts_low)) {
349 assert(compaction_job_stats_ != nullptr);
350 assert(log_buffer_ != nullptr);
351 const auto* cfd = compact_->compaction->column_family_data();
352 ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
353 db_options_.enable_thread_tracking);
354 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
355 ReportStartedCompaction(compaction);
356 }
357
358 CompactionJob::~CompactionJob() {
359 assert(compact_ == nullptr);
360 ThreadStatusUtil::ResetThreadStatus();
361 }
362
363 void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
364 const auto* cfd = compact_->compaction->column_family_data();
365 ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
366 db_options_.enable_thread_tracking);
367
368 ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
369 job_id_);
370
371 ThreadStatusUtil::SetThreadOperationProperty(
372 ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
373 (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
374 compact_->compaction->output_level());
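  // The start level is packed into the upper 32 bits of this property and the
  // output level into the lower 32 bits; e.g., a compaction from level 1 into
  // level 2 is reported as (1ULL << 32) + 2.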
375
376 // In the current design, a CompactionJob is always created
377 // for non-trivial compaction.
378 assert(compaction->IsTrivialMove() == false ||
379 compaction->is_manual_compaction() == true);
380
381 ThreadStatusUtil::SetThreadOperationProperty(
382 ThreadStatus::COMPACTION_PROP_FLAGS,
383 compaction->is_manual_compaction() +
384 (compaction->deletion_compaction() << 1));
385
386 ThreadStatusUtil::SetThreadOperationProperty(
387 ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES,
388 compaction->CalculateTotalInputSize());
389
390 IOSTATS_RESET(bytes_written);
391 IOSTATS_RESET(bytes_read);
392 ThreadStatusUtil::SetThreadOperationProperty(
393 ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
394 ThreadStatusUtil::SetThreadOperationProperty(
395 ThreadStatus::COMPACTION_BYTES_READ, 0);
396
397 // Set the thread operation after operation properties
398 // to ensure GetThreadList() can always show them all together.
399 ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
400
401 compaction_job_stats_->is_manual_compaction =
402 compaction->is_manual_compaction();
403 compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
404 }
405
406 void CompactionJob::Prepare() {
407 AutoThreadOperationStageUpdater stage_updater(
408 ThreadStatus::STAGE_COMPACTION_PREPARE);
409
410 // Generate file_levels_ for compaction before making the iterator
411 auto* c = compact_->compaction;
412 assert(c->column_family_data() != nullptr);
413 assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
414 compact_->compaction->level()) > 0);
415
416 write_hint_ =
417 c->column_family_data()->CalculateSSTWriteHint(c->output_level());
418 bottommost_level_ = c->bottommost_level();
419
420 if (c->ShouldFormSubcompactions()) {
421 {
422 StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
423 GenSubcompactionBoundaries();
424 }
425 assert(sizes_.size() == boundaries_.size() + 1);
426
427 for (size_t i = 0; i <= boundaries_.size(); i++) {
428 Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
429 Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
430 compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
431 }
432 RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
433 compact_->sub_compact_states.size());
434 } else {
435 constexpr Slice* start = nullptr;
436 constexpr Slice* end = nullptr;
437 constexpr uint64_t size = 0;
438
439 compact_->sub_compact_states.emplace_back(c, start, end, size);
440 }
441 }
442
443 struct RangeWithSize {
444 Range range;
445 uint64_t size;
446
447 RangeWithSize(const Slice& a, const Slice& b, uint64_t s = 0)
448 : range(a, b), size(s) {}
449 };
450
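// Picks boundary keys for the subcompactions. For illustration: N boundary
// keys produce N + 1 subcompactions in Prepare(); with boundaries {b0, b1}
// the sub-compactions cover (-inf, b0), [b0, b1) and [b1, +inf), and
// sizes_[i] holds the approximate amount of data in range i.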
451 void CompactionJob::GenSubcompactionBoundaries() {
452 auto* c = compact_->compaction;
453 auto* cfd = c->column_family_data();
454 const Comparator* cfd_comparator = cfd->user_comparator();
455 std::vector<Slice> bounds;
456 int start_lvl = c->start_level();
457 int out_lvl = c->output_level();
458
459 // Add the starting and/or ending key of certain input files as a potential
460 // boundary
461 for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
462 int lvl = c->level(lvl_idx);
463 if (lvl >= start_lvl && lvl <= out_lvl) {
464 const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
465 size_t num_files = flevel->num_files;
466
467 if (num_files == 0) {
468 continue;
469 }
470
471 if (lvl == 0) {
472 // For level 0 add the starting and ending key of each file since the
473 // files may have greatly differing key ranges (not range-partitioned)
474 for (size_t i = 0; i < num_files; i++) {
475 bounds.emplace_back(flevel->files[i].smallest_key);
476 bounds.emplace_back(flevel->files[i].largest_key);
477 }
478 } else {
479 // For all other levels add the smallest/largest key in the level to
480 // encompass the range covered by that level
481 bounds.emplace_back(flevel->files[0].smallest_key);
482 bounds.emplace_back(flevel->files[num_files - 1].largest_key);
483 if (lvl == out_lvl) {
484 // For the last level include the starting keys of all files since
485 // the last level is the largest and probably has the widest key
486 // range. Since it's range partitioned, the ending key of one file
487 // and the starting key of the next are very close (or identical).
488 for (size_t i = 1; i < num_files; i++) {
489 bounds.emplace_back(flevel->files[i].smallest_key);
490 }
491 }
492 }
493 }
494 }
495
496 std::sort(bounds.begin(), bounds.end(),
497 [cfd_comparator](const Slice& a, const Slice& b) -> bool {
498 return cfd_comparator->Compare(ExtractUserKey(a),
499 ExtractUserKey(b)) < 0;
500 });
501 // Remove duplicated entries from bounds
502 bounds.erase(
503 std::unique(bounds.begin(), bounds.end(),
504 [cfd_comparator](const Slice& a, const Slice& b) -> bool {
505 return cfd_comparator->Compare(ExtractUserKey(a),
506 ExtractUserKey(b)) == 0;
507 }),
508 bounds.end());
509
510 // Combine consecutive pairs of boundaries into ranges with an approximate
511 // size of data covered by keys in that range
512 uint64_t sum = 0;
513 std::vector<RangeWithSize> ranges;
514 // Get input version from CompactionState since it's already referenced
515 // earlier in Compaction::SetInputVersion() and will not change
516 // when db_mutex_ is released below
517 auto* v = compact_->compaction->input_version();
518 for (auto it = bounds.begin();;) {
519 const Slice a = *it;
520 ++it;
521
522 if (it == bounds.end()) {
523 break;
524 }
525
526 const Slice b = *it;
527
528 // ApproximateSize could potentially create a table reader iterator to seek
529 // to the index block and may incur I/O cost in the process. Unlock the db
530 // mutex to reduce contention.
531 db_mutex_->Unlock();
532 uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
533 b, start_lvl, out_lvl + 1,
534 TableReaderCaller::kCompaction);
535 db_mutex_->Lock();
536 ranges.emplace_back(a, b, size);
537 sum += size;
538 }
539
540 // Group the ranges into subcompactions
541 const double min_file_fill_percent = 4.0 / 5;
542 int base_level = v->storage_info()->base_level();
543 uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
544 sum / min_file_fill_percent /
545 MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
546 c->immutable_cf_options()->compaction_style, base_level,
547 c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
548 uint64_t subcompactions =
549 std::min({static_cast<uint64_t>(ranges.size()),
550 static_cast<uint64_t>(c->max_subcompactions()),
551 max_output_files});
552
553 if (subcompactions > 1) {
554 double mean = sum * 1.0 / subcompactions;
555 // Greedily add ranges to the subcompaction until the sum of the ranges'
556 // sizes becomes >= the expected mean size of a subcompaction
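    // Example (illustrative): three ranges of 60, 50 and 70 MB with
    // subcompactions == 2 give mean == 90 MB; the first boundary is placed
    // after the second range (60 + 50 = 110 >= 90), yielding subcompactions
    // of roughly 110 MB and 70 MB.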
557 sum = 0;
558 for (size_t i = 0; i + 1 < ranges.size(); i++) {
559 sum += ranges[i].size;
560 if (subcompactions == 1) {
561 // If there's only one left to schedule then it goes to the end so no
562 // need to put an end boundary
563 continue;
564 }
565 if (sum >= mean) {
566 boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
567 sizes_.emplace_back(sum);
568 subcompactions--;
569 sum = 0;
570 }
571 }
572 sizes_.emplace_back(sum + ranges.back().size);
573 } else {
574 // Only one range so its size is the total sum of sizes computed above
575 sizes_.emplace_back(sum);
576 }
577 }
578
579 Status CompactionJob::Run() {
580 AutoThreadOperationStageUpdater stage_updater(
581 ThreadStatus::STAGE_COMPACTION_RUN);
582 TEST_SYNC_POINT("CompactionJob::Run():Start");
583 log_buffer_->FlushBufferToLog();
584 LogCompaction();
585
586 const size_t num_threads = compact_->sub_compact_states.size();
587 assert(num_threads > 0);
588 const uint64_t start_micros = env_->NowMicros();
589
590 // Launch a thread for each of subcompactions 1...num_threads-1
591 std::vector<port::Thread> thread_pool;
592 thread_pool.reserve(num_threads - 1);
593 for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
594 thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
595 &compact_->sub_compact_states[i]);
596 }
597
598 // Always schedule the first subcompaction (whether or not there are also
599 // others) in the current thread to be efficient with resources
600 ProcessKeyValueCompaction(&compact_->sub_compact_states[0]);
601
602 // Wait for all other threads (if there are any) to finish execution
603 for (auto& thread : thread_pool) {
604 thread.join();
605 }
606
607 compaction_stats_.micros = env_->NowMicros() - start_micros;
608 compaction_stats_.cpu_micros = 0;
609 for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
610 compaction_stats_.cpu_micros +=
611 compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
612 }
613
614 RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
615 RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
616 compaction_stats_.cpu_micros);
617
618 TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
619
620 // Check if any thread encountered an error during execution
621 Status status;
622 IOStatus io_s;
623 bool wrote_new_blob_files = false;
624
625 for (const auto& state : compact_->sub_compact_states) {
626 if (!state.status.ok()) {
627 status = state.status;
628 io_s = state.io_status;
629 break;
630 }
631
632 if (!state.blob_file_additions.empty()) {
633 wrote_new_blob_files = true;
634 }
635 }
636
637 if (io_status_.ok()) {
638 io_status_ = io_s;
639 }
640 if (status.ok()) {
641 constexpr IODebugContext* dbg = nullptr;
642
643 if (output_directory_) {
644 io_s = output_directory_->Fsync(IOOptions(), dbg);
645 }
646
647 if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
648 blob_output_directory_ != output_directory_) {
649 io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
650 }
651 }
652 if (io_status_.ok()) {
653 io_status_ = io_s;
654 }
655 if (status.ok()) {
656 status = io_s;
657 }
658 if (status.ok()) {
659 thread_pool.clear();
660 std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
661 for (const auto& state : compact_->sub_compact_states) {
662 for (const auto& output : state.outputs) {
663 files_output.emplace_back(&output);
664 }
665 }
666 ColumnFamilyData* cfd = compact_->compaction->column_family_data();
667 auto prefix_extractor =
668 compact_->compaction->mutable_cf_options()->prefix_extractor.get();
669 std::atomic<size_t> next_file_idx(0);
670 auto verify_table = [&](Status& output_status) {
671 while (true) {
672 size_t file_idx = next_file_idx.fetch_add(1);
673 if (file_idx >= files_output.size()) {
674 break;
675 }
676 // Verify that the table is usable.
677 // We set for_compaction to false and don't use OptimizeForCompactionTableRead
678 // here because this is a special case after we finish building the table.
679 // No matter whether use_direct_io_for_flush_and_compaction is true,
680 // we regard this verification as user reads, since the goal is
681 // to cache the table here for further user reads.
682 ReadOptions read_options;
683 InternalIterator* iter = cfd->table_cache()->NewIterator(
684 read_options, file_options_, cfd->internal_comparator(),
685 files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
686 prefix_extractor,
687 /*table_reader_ptr=*/nullptr,
688 cfd->internal_stats()->GetFileReadHist(
689 compact_->compaction->output_level()),
690 TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
691 /*skip_filters=*/false, compact_->compaction->output_level(),
692 MaxFileSizeForL0MetaPin(
693 *compact_->compaction->mutable_cf_options()),
694 /*smallest_compaction_key=*/nullptr,
695 /*largest_compaction_key=*/nullptr,
696 /*allow_unprepared_value=*/false);
697 auto s = iter->status();
698
699 if (s.ok() && paranoid_file_checks_) {
700 OutputValidator validator(cfd->internal_comparator(),
701 /*_enable_order_check=*/true,
702 /*_enable_hash=*/true);
703 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
704 s = validator.Add(iter->key(), iter->value());
705 if (!s.ok()) {
706 break;
707 }
708 }
709 if (s.ok()) {
710 s = iter->status();
711 }
712 if (s.ok() &&
713 !validator.CompareValidator(files_output[file_idx]->validator)) {
714 s = Status::Corruption("Paranoid checksums do not match");
715 }
716 }
717
718 delete iter;
719
720 if (!s.ok()) {
721 output_status = s;
722 break;
723 }
724 }
725 };
726 for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
727 thread_pool.emplace_back(verify_table,
728 std::ref(compact_->sub_compact_states[i].status));
729 }
730 verify_table(compact_->sub_compact_states[0].status);
731 for (auto& thread : thread_pool) {
732 thread.join();
733 }
734 for (const auto& state : compact_->sub_compact_states) {
735 if (!state.status.ok()) {
736 status = state.status;
737 break;
738 }
739 }
740 }
741
742 TablePropertiesCollection tp;
743 for (const auto& state : compact_->sub_compact_states) {
744 for (const auto& output : state.outputs) {
745 auto fn =
746 TableFileName(state.compaction->immutable_cf_options()->cf_paths,
747 output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
748 tp[fn] = output.table_properties;
749 }
750 }
751 compact_->compaction->SetOutputTableProperties(std::move(tp));
752
753 // Finish up all book-keeping to unify the subcompaction results
754 AggregateStatistics();
755 UpdateCompactionStats();
756
757 RecordCompactionIOStats();
758 LogFlush(db_options_.info_log);
759 TEST_SYNC_POINT("CompactionJob::Run():End");
760
761 compact_->status = status;
762 return status;
763 }
764
765 Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
766 assert(compact_);
767
768 AutoThreadOperationStageUpdater stage_updater(
769 ThreadStatus::STAGE_COMPACTION_INSTALL);
770 db_mutex_->AssertHeld();
771 Status status = compact_->status;
772
773 ColumnFamilyData* cfd = compact_->compaction->column_family_data();
774 assert(cfd);
775
776 cfd->internal_stats()->AddCompactionStats(
777 compact_->compaction->output_level(), thread_pri_, compaction_stats_);
778
779 if (status.ok()) {
780 status = InstallCompactionResults(mutable_cf_options);
781 }
782 if (!versions_->io_status().ok()) {
783 io_status_ = versions_->io_status();
784 }
785
786 VersionStorageInfo::LevelSummaryStorage tmp;
787 auto vstorage = cfd->current()->storage_info();
788 const auto& stats = compaction_stats_;
789
790 double read_write_amp = 0.0;
791 double write_amp = 0.0;
792 double bytes_read_per_sec = 0;
793 double bytes_written_per_sec = 0;
794
795 if (stats.bytes_read_non_output_levels > 0) {
796 read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
797 stats.bytes_read_non_output_levels) /
798 static_cast<double>(stats.bytes_read_non_output_levels);
799 write_amp = stats.bytes_written /
800 static_cast<double>(stats.bytes_read_non_output_levels);
801 }
802 if (stats.micros > 0) {
803 bytes_read_per_sec =
804 (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
805 static_cast<double>(stats.micros);
806 bytes_written_per_sec =
807 stats.bytes_written / static_cast<double>(stats.micros);
808 }
809
810 const std::string& column_family_name = cfd->GetName();
811
812 ROCKS_LOG_BUFFER(
813 log_buffer_,
814 "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
815 "files in(%d, %d) out(%d) "
816 "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
817 "write-amplify(%.1f) %s, records in: %" PRIu64
818 ", records dropped: %" PRIu64 " output_compression: %s\n",
819 column_family_name.c_str(), vstorage->LevelSummary(&tmp),
820 bytes_read_per_sec, bytes_written_per_sec,
821 compact_->compaction->output_level(),
822 stats.num_input_files_in_non_output_levels,
823 stats.num_input_files_in_output_level, stats.num_output_files,
824 stats.bytes_read_non_output_levels / 1048576.0,
825 stats.bytes_read_output_level / 1048576.0,
826 stats.bytes_written / 1048576.0, read_write_amp, write_amp,
827 status.ToString().c_str(), stats.num_input_records,
828 stats.num_dropped_records,
829 CompressionTypeToString(compact_->compaction->output_compression())
830 .c_str());
831
832 const auto& blob_files = vstorage->GetBlobFiles();
833 if (!blob_files.empty()) {
834 ROCKS_LOG_BUFFER(log_buffer_,
835 "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
836 "\n",
837 column_family_name.c_str(), blob_files.begin()->first,
838 blob_files.rbegin()->first);
839 }
840
841 UpdateCompactionJobStats(stats);
842
843 auto stream = event_logger_->LogToBuffer(log_buffer_);
844 stream << "job" << job_id_ << "event"
845 << "compaction_finished"
846 << "compaction_time_micros" << stats.micros
847 << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
848 << compact_->compaction->output_level() << "num_output_files"
849 << compact_->num_output_files << "total_output_size"
850 << compact_->total_bytes;
851
852 if (compact_->num_blob_output_files > 0) {
853 stream << "num_blob_output_files" << compact_->num_blob_output_files
854 << "total_blob_output_size" << compact_->total_blob_bytes;
855 }
856
857 stream << "num_input_records" << stats.num_input_records
858 << "num_output_records" << compact_->num_output_records
859 << "num_subcompactions" << compact_->sub_compact_states.size()
860 << "output_compression"
861 << CompressionTypeToString(compact_->compaction->output_compression());
862
863 stream << "num_single_delete_mismatches"
864 << compaction_job_stats_->num_single_del_mismatch;
865 stream << "num_single_delete_fallthrough"
866 << compaction_job_stats_->num_single_del_fallthru;
867
868 if (measure_io_stats_) {
869 stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
870 stream << "file_range_sync_nanos"
871 << compaction_job_stats_->file_range_sync_nanos;
872 stream << "file_fsync_nanos" << compaction_job_stats_->file_fsync_nanos;
873 stream << "file_prepare_write_nanos"
874 << compaction_job_stats_->file_prepare_write_nanos;
875 }
876
877 stream << "lsm_state";
878 stream.StartArray();
879 for (int level = 0; level < vstorage->num_levels(); ++level) {
880 stream << vstorage->NumLevelFiles(level);
881 }
882 stream.EndArray();
883
884 if (!blob_files.empty()) {
885 stream << "blob_file_head" << blob_files.begin()->first;
886 stream << "blob_file_tail" << blob_files.rbegin()->first;
887 }
888
889 CleanupCompaction();
890 return status;
891 }
892
893 void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
894 assert(sub_compact);
895 assert(sub_compact->compaction);
896
897 uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
898
899 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
900
901 // Create compaction filter and fail the compaction if
902 // IgnoreSnapshots() = false because it is not supported anymore
903 const CompactionFilter* compaction_filter =
904 cfd->ioptions()->compaction_filter;
905 std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
906 if (compaction_filter == nullptr) {
907 compaction_filter_from_factory =
908 sub_compact->compaction->CreateCompactionFilter();
909 compaction_filter = compaction_filter_from_factory.get();
910 }
911 if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
912 sub_compact->status = Status::NotSupported(
913 "CompactionFilter::IgnoreSnapshots() = false is not supported "
914 "anymore.");
915 return;
916 }
917
918 CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
919 existing_snapshots_);
920 ReadOptions read_options;
921 read_options.verify_checksums = true;
922 read_options.fill_cache = false;
923 // Compaction iterators shouldn't be confined to a single prefix.
924 // Compactions use Seek() for
925 // (a) concurrent compactions,
926 // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
927 read_options.total_order_seek = true;
928
929 // Although the v2 aggregator is what the level iterator(s) know about,
930 // the AddTombstones calls will be propagated down to the v1 aggregator.
931 std::unique_ptr<InternalIterator> input(
932 versions_->MakeInputIterator(read_options, sub_compact->compaction,
933 &range_del_agg, file_options_for_read_));
934
935 AutoThreadOperationStageUpdater stage_updater(
936 ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
937
938 // I/O measurement variables
939 PerfLevel prev_perf_level = PerfLevel::kEnableTime;
940 const uint64_t kRecordStatsEvery = 1000;
941 uint64_t prev_write_nanos = 0;
942 uint64_t prev_fsync_nanos = 0;
943 uint64_t prev_range_sync_nanos = 0;
944 uint64_t prev_prepare_write_nanos = 0;
945 uint64_t prev_cpu_write_nanos = 0;
946 uint64_t prev_cpu_read_nanos = 0;
947 if (measure_io_stats_) {
948 prev_perf_level = GetPerfLevel();
949 SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
950 prev_write_nanos = IOSTATS(write_nanos);
951 prev_fsync_nanos = IOSTATS(fsync_nanos);
952 prev_range_sync_nanos = IOSTATS(range_sync_nanos);
953 prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
954 prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
955 prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
956 }
957
958 MergeHelper merge(
959 env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
960 compaction_filter, db_options_.info_log.get(),
961 false /* internal key corruption is expected */,
962 existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
963 snapshot_checker_, compact_->compaction->level(),
964 db_options_.statistics.get());
965
966 const MutableCFOptions* mutable_cf_options =
967 sub_compact->compaction->mutable_cf_options();
968 assert(mutable_cf_options);
969
970 std::vector<std::string> blob_file_paths;
971
972 std::unique_ptr<BlobFileBuilder> blob_file_builder(
973 mutable_cf_options->enable_blob_files
974 ? new BlobFileBuilder(
975 versions_, env_, fs_.get(),
976 sub_compact->compaction->immutable_cf_options(),
977 mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
978 cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
979 &blob_file_paths, &sub_compact->blob_file_additions)
980 : nullptr);
981
982 TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
983 TEST_SYNC_POINT_CALLBACK(
984 "CompactionJob::Run():PausingManualCompaction:1",
985 reinterpret_cast<void*>(
986 const_cast<std::atomic<int>*>(manual_compaction_paused_)));
987
988 Slice* start = sub_compact->start;
989 Slice* end = sub_compact->end;
990 if (start != nullptr) {
991 IterKey start_iter;
992 start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
993 input->Seek(start_iter.GetInternalKey());
994 } else {
995 input->SeekToFirst();
996 }
997
998 Status status;
999 const std::string* const full_history_ts_low =
1000 full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
1001 sub_compact->c_iter.reset(new CompactionIterator(
1002 input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
1003 &existing_snapshots_, earliest_write_conflict_snapshot_,
1004 snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
1005 /*expect_valid_internal_key=*/true, &range_del_agg,
1006 blob_file_builder.get(), db_options_.allow_data_in_errors,
1007 sub_compact->compaction, compaction_filter, shutting_down_,
1008 preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
1009 full_history_ts_low));
1010 auto c_iter = sub_compact->c_iter.get();
1011 c_iter->SeekToFirst();
1012 if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
1013 // ShouldStopBefore() maintains state based on keys processed so far. The
1014 // compaction loop always calls it on the "next" key, so it never sees the
1015 // first key. So we do that here.
1016 sub_compact->ShouldStopBefore(c_iter->key(),
1017 sub_compact->current_output_file_size);
1018 }
1019 const auto& c_iter_stats = c_iter->iter_stats();
1020
1021 std::unique_ptr<SstPartitioner> partitioner =
1022 sub_compact->compaction->output_level() == 0
1023 ? nullptr
1024 : sub_compact->compaction->CreateSstPartitioner();
1025 std::string last_key_for_partitioner;
1026
1027 while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
1028 // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
1029 // returns true.
1030 const Slice& key = c_iter->key();
1031 const Slice& value = c_iter->value();
1032
1033 // If an end key (exclusive) is specified, check if the current key is
1034 // >= it and exit if so, because the iterator is then out of its range.
1035 if (end != nullptr &&
1036 cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
1037 break;
1038 }
1039 if (c_iter_stats.num_input_records % kRecordStatsEvery ==
1040 kRecordStatsEvery - 1) {
1041 RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1042 c_iter->ResetRecordCounts();
1043 RecordCompactionIOStats();
1044 }
1045
1046 // Open output file if necessary
1047 if (sub_compact->builder == nullptr) {
1048 status = OpenCompactionOutputFile(sub_compact);
1049 if (!status.ok()) {
1050 break;
1051 }
1052 }
1053 status = sub_compact->AddToBuilder(key, value);
1054 if (!status.ok()) {
1055 break;
1056 }
1057
1058 sub_compact->current_output_file_size =
1059 sub_compact->builder->EstimatedFileSize();
1060 const ParsedInternalKey& ikey = c_iter->ikey();
1061 sub_compact->current_output()->meta.UpdateBoundaries(
1062 key, value, ikey.sequence, ikey.type);
1063 sub_compact->num_output_records++;
1064
1065 // Close the output file if it is big enough. Two conditions determine that
1066 // it is time to close it: (1) the current key should be this file's last
1067 // key, or (2) the next key should not be in this file.
1068 //
1069 // TODO(aekmekji): determine if file should be closed earlier than this
1070 // during subcompactions (i.e. if output size, estimated by input size, is
1071 // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
1072 // and 0.6MB instead of 1MB and 0.2MB)
1073 bool output_file_ended = false;
1074 if (sub_compact->compaction->output_level() != 0 &&
1075 sub_compact->current_output_file_size >=
1076 sub_compact->compaction->max_output_file_size()) {
1077 // (1) this key terminates the file. For historical reasons, the iterator
1078 // status before advancing will be given to FinishCompactionOutputFile().
1079 output_file_ended = true;
1080 }
1081 TEST_SYNC_POINT_CALLBACK(
1082 "CompactionJob::Run():PausingManualCompaction:2",
1083 reinterpret_cast<void*>(
1084 const_cast<std::atomic<int>*>(manual_compaction_paused_)));
1085 if (partitioner.get()) {
1086 last_key_for_partitioner.assign(c_iter->user_key().data_,
1087 c_iter->user_key().size_);
1088 }
1089 c_iter->Next();
1090 if (c_iter->status().IsManualCompactionPaused()) {
1091 break;
1092 }
1093 if (!output_file_ended && c_iter->Valid()) {
1094 if (((partitioner.get() &&
1095 partitioner->ShouldPartition(PartitionerRequest(
1096 last_key_for_partitioner, c_iter->user_key(),
1097 sub_compact->current_output_file_size)) == kRequired) ||
1098 (sub_compact->compaction->output_level() != 0 &&
1099 sub_compact->ShouldStopBefore(
1100 c_iter->key(), sub_compact->current_output_file_size))) &&
1101 sub_compact->builder != nullptr) {
1102 // (2) this key belongs to the next file. For historical reasons, the
1103 // iterator status after advancing will be given to
1104 // FinishCompactionOutputFile().
1105 output_file_ended = true;
1106 }
1107 }
1108 if (output_file_ended) {
1109 const Slice* next_key = nullptr;
1110 if (c_iter->Valid()) {
1111 next_key = &c_iter->key();
1112 }
1113 CompactionIterationStats range_del_out_stats;
1114 status = FinishCompactionOutputFile(input->status(), sub_compact,
1115 &range_del_agg, &range_del_out_stats,
1116 next_key);
1117 RecordDroppedKeys(range_del_out_stats,
1118 &sub_compact->compaction_job_stats);
1119 }
1120 }
1121
1122 sub_compact->compaction_job_stats.num_input_deletion_records =
1123 c_iter_stats.num_input_deletion_records;
1124 sub_compact->compaction_job_stats.num_corrupt_keys =
1125 c_iter_stats.num_input_corrupt_records;
1126 sub_compact->compaction_job_stats.num_single_del_fallthru =
1127 c_iter_stats.num_single_del_fallthru;
1128 sub_compact->compaction_job_stats.num_single_del_mismatch =
1129 c_iter_stats.num_single_del_mismatch;
1130 sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
1131 c_iter_stats.total_input_raw_key_bytes;
1132 sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
1133 c_iter_stats.total_input_raw_value_bytes;
1134
1135 RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
1136 c_iter_stats.total_filter_time);
1137 RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1138 RecordCompactionIOStats();
1139
1140 if (status.ok() && cfd->IsDropped()) {
1141 status =
1142 Status::ColumnFamilyDropped("Column family dropped during compaction");
1143 }
1144 if ((status.ok() || status.IsColumnFamilyDropped()) &&
1145 shutting_down_->load(std::memory_order_relaxed)) {
1146 status = Status::ShutdownInProgress("Database shutdown");
1147 }
1148 if ((status.ok() || status.IsColumnFamilyDropped()) &&
1149 (manual_compaction_paused_ &&
1150 manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
1151 status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1152 }
1153 if (status.ok()) {
1154 status = input->status();
1155 }
1156 if (status.ok()) {
1157 status = c_iter->status();
1158 }
1159
1160 if (status.ok() && sub_compact->builder == nullptr &&
1161 sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
1162 // handle subcompaction containing only range deletions
1163 status = OpenCompactionOutputFile(sub_compact);
1164 }
1165
1166 // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1167 // close the output file.
1168 if (sub_compact->builder != nullptr) {
1169 CompactionIterationStats range_del_out_stats;
1170 Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
1171 &range_del_out_stats);
1172 if (!s.ok() && status.ok()) {
1173 status = s;
1174 }
1175 RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
1176 }
1177
1178 if (blob_file_builder) {
1179 if (status.ok()) {
1180 status = blob_file_builder->Finish();
1181 }
1182
1183 blob_file_builder.reset();
1184 }
1185
1186 sub_compact->compaction_job_stats.cpu_micros =
1187 env_->NowCPUNanos() / 1000 - prev_cpu_micros;
1188
1189 if (measure_io_stats_) {
1190 sub_compact->compaction_job_stats.file_write_nanos +=
1191 IOSTATS(write_nanos) - prev_write_nanos;
1192 sub_compact->compaction_job_stats.file_fsync_nanos +=
1193 IOSTATS(fsync_nanos) - prev_fsync_nanos;
1194 sub_compact->compaction_job_stats.file_range_sync_nanos +=
1195 IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
1196 sub_compact->compaction_job_stats.file_prepare_write_nanos +=
1197 IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
1198 sub_compact->compaction_job_stats.cpu_micros -=
1199 (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
1200 IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
1201 1000;
1202 if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
1203 SetPerfLevel(prev_perf_level);
1204 }
1205 }
1206 #ifdef ROCKSDB_ASSERT_STATUS_CHECKED
1207 if (!status.ok()) {
1208 if (sub_compact->c_iter) {
1209 sub_compact->c_iter->status().PermitUncheckedError();
1210 }
1211 if (input) {
1212 input->status().PermitUncheckedError();
1213 }
1214 }
1215 #endif // ROCKSDB_ASSERT_STATUS_CHECKED
1216
1217 sub_compact->c_iter.reset();
1218 input.reset();
1219 sub_compact->status = status;
1220 }
1221
1222 void CompactionJob::RecordDroppedKeys(
1223 const CompactionIterationStats& c_iter_stats,
1224 CompactionJobStats* compaction_job_stats) {
1225 if (c_iter_stats.num_record_drop_user > 0) {
1226 RecordTick(stats_, COMPACTION_KEY_DROP_USER,
1227 c_iter_stats.num_record_drop_user);
1228 }
1229 if (c_iter_stats.num_record_drop_hidden > 0) {
1230 RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
1231 c_iter_stats.num_record_drop_hidden);
1232 if (compaction_job_stats) {
1233 compaction_job_stats->num_records_replaced +=
1234 c_iter_stats.num_record_drop_hidden;
1235 }
1236 }
1237 if (c_iter_stats.num_record_drop_obsolete > 0) {
1238 RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
1239 c_iter_stats.num_record_drop_obsolete);
1240 if (compaction_job_stats) {
1241 compaction_job_stats->num_expired_deletion_records +=
1242 c_iter_stats.num_record_drop_obsolete;
1243 }
1244 }
1245 if (c_iter_stats.num_record_drop_range_del > 0) {
1246 RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
1247 c_iter_stats.num_record_drop_range_del);
1248 }
1249 if (c_iter_stats.num_range_del_drop_obsolete > 0) {
1250 RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
1251 c_iter_stats.num_range_del_drop_obsolete);
1252 }
1253 if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
1254 RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
1255 c_iter_stats.num_optimized_del_drop_obsolete);
1256 }
1257 }
1258
1259 Status CompactionJob::FinishCompactionOutputFile(
1260 const Status& input_status, SubcompactionState* sub_compact,
1261 CompactionRangeDelAggregator* range_del_agg,
1262 CompactionIterationStats* range_del_out_stats,
1263 const Slice* next_table_min_key /* = nullptr */) {
1264 AutoThreadOperationStageUpdater stage_updater(
1265 ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
1266 assert(sub_compact != nullptr);
1267 assert(sub_compact->outfile);
1268 assert(sub_compact->builder != nullptr);
1269 assert(sub_compact->current_output() != nullptr);
1270
1271 uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
1272 assert(output_number != 0);
1273
1274 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1275 const Comparator* ucmp = cfd->user_comparator();
1276 std::string file_checksum = kUnknownFileChecksum;
1277 std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
1278
1279 // Check for iterator errors
1280 Status s = input_status;
1281 auto meta = &sub_compact->current_output()->meta;
1282 assert(meta != nullptr);
1283 if (s.ok()) {
1284 Slice lower_bound_guard, upper_bound_guard;
1285 std::string smallest_user_key;
1286 const Slice *lower_bound, *upper_bound;
1287 bool lower_bound_from_sub_compact = false;
1288 if (sub_compact->outputs.size() == 1) {
1289 // For the first output table, include range tombstones before the min key
1290 // but after the subcompaction boundary.
1291 lower_bound = sub_compact->start;
1292 lower_bound_from_sub_compact = true;
1293 } else if (meta->smallest.size() > 0) {
1294 // For subsequent output tables, only include range tombstones from min
1295 // key onwards since the previous file was extended to contain range
1296 // tombstones falling before min key.
1297 smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
1298 lower_bound_guard = Slice(smallest_user_key);
1299 lower_bound = &lower_bound_guard;
1300 } else {
1301 lower_bound = nullptr;
1302 }
1303 if (next_table_min_key != nullptr) {
1304 // This may be the last file in the subcompaction in some cases, so we
1305 // need to compare the end key of the subcompaction with the next file's
1306 // start key. When the end key is chosen by the subcompaction, we know
1307 // that it must be the biggest key in the output file. Therefore, it is
1308 // safe to use the smaller key as the upper bound of the output file, to
1309 // ensure that there is no overlap between different output files.
1310 upper_bound_guard = ExtractUserKey(*next_table_min_key);
1311 if (sub_compact->end != nullptr &&
1312 ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
1313 upper_bound = sub_compact->end;
1314 } else {
1315 upper_bound = &upper_bound_guard;
1316 }
1317 } else {
1318 // This is the last file in the subcompaction, so extend until the
1319 // subcompaction ends.
1320 upper_bound = sub_compact->end;
1321 }
1322 auto earliest_snapshot = kMaxSequenceNumber;
1323 if (existing_snapshots_.size() > 0) {
1324 earliest_snapshot = existing_snapshots_[0];
1325 }
1326 bool has_overlapping_endpoints;
1327 if (upper_bound != nullptr && meta->largest.size() > 0) {
1328 has_overlapping_endpoints =
1329 ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
1330 } else {
1331 has_overlapping_endpoints = false;
1332 }
1333
1334 // The end key of the subcompaction must be greater than or equal to the
1335 // upper bound. If the end of the subcompaction is null or the upper bound
1336 // is null, it means that this file is the last file in the compaction. So
1337 // there will be no overlap between this file and others.
1338 assert(sub_compact->end == nullptr ||
1339 upper_bound == nullptr ||
1340 ucmp->Compare(*upper_bound, *sub_compact->end) <= 0);
1341 auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
1342 has_overlapping_endpoints);
1343 // Position the range tombstone output iterator. There may be tombstone
1344 // fragments that are entirely out of range, so make sure that we do not
1345 // include those.
1346 if (lower_bound != nullptr) {
1347 it->Seek(*lower_bound);
1348 } else {
1349 it->SeekToFirst();
1350 }
1351 TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
1352 for (; it->Valid(); it->Next()) {
1353 auto tombstone = it->Tombstone();
1354 if (upper_bound != nullptr) {
1355 int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
1356 if ((has_overlapping_endpoints && cmp < 0) ||
1357 (!has_overlapping_endpoints && cmp <= 0)) {
1358 // Tombstones starting after upper_bound only need to be included in
1359 // the next table. If the current SST ends before upper_bound, i.e.,
1360 // `has_overlapping_endpoints == false`, we can also skip over range
1361 // tombstones that start exactly at upper_bound. Such range tombstones
1362 // will be included in the next file and are not relevant to the point
1363 // keys or endpoints of the current file.
1364 break;
1365 }
1366 }
1367
1368 if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
1369 // TODO(andrewkr): tombstones that span multiple output files are
1370 // counted for each compaction output file, so lots of double counting.
1371 range_del_out_stats->num_range_del_drop_obsolete++;
1372 range_del_out_stats->num_record_drop_obsolete++;
1373 continue;
1374 }
1375
1376 auto kv = tombstone.Serialize();
1377 assert(lower_bound == nullptr ||
1378 ucmp->Compare(*lower_bound, kv.second) < 0);
1379 // Range tombstone is not supported by output validator yet.
1380 sub_compact->builder->Add(kv.first.Encode(), kv.second);
1381 InternalKey smallest_candidate = std::move(kv.first);
1382 if (lower_bound != nullptr &&
1383 ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
1384 // Pretend the smallest key has the same user key as lower_bound
1385 // (the max key in the previous table or subcompaction) in order for
1386 // files to appear key-space partitioned.
1387 //
1388 // When lower_bound is chosen by a subcompaction, we know that
1389 // subcompactions over smaller keys cannot contain any keys at
1390 // lower_bound. We also know that smaller subcompactions exist, because
1391 // otherwise the subcompaction would be unbounded on the left. As a
1392 // result, we know that no other files on the output level will contain
1393 // actual keys at lower_bound (an output file may have a largest key of
1394 // lower_bound@kMaxSequenceNumber, but this only indicates a large range
1395 // tombstone was truncated). Therefore, it is safe to use the
1396 // tombstone's sequence number, to ensure that keys at lower_bound at
1397 // lower levels are covered by truncated tombstones.
1398 //
1399 // If lower_bound was chosen by the smallest data key in the file,
1400 // choose lowest seqnum so this file's smallest internal key comes after
1401 // the previous file's largest. The fake seqnum is OK because the read
1402 // path's file-picking code only considers user key.
1403 smallest_candidate = InternalKey(
1404 *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
1405 kTypeRangeDeletion);
1406 }
1407 InternalKey largest_candidate = tombstone.SerializeEndKey();
1408 if (upper_bound != nullptr &&
1409 ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
1410 // Pretend the largest key has the same user key as upper_bound (the
1411 // min key in the following table or subcompaction) in order for files
1412 // to appear key-space partitioned.
1413 //
1414 // Choose highest seqnum so this file's largest internal key comes
1415 // before the next file's/subcompaction's smallest. The fake seqnum is
1416 // OK because the read path's file-picking code only considers the user
1417 // key portion.
1418 //
1419 // Note Seek() also creates InternalKey with (user_key,
1420 // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
1421 // kTypeRangeDeletion (0xF), so the range tombstone comes before the
1422 // Seek() key in InternalKey's ordering. So Seek() will look in the
1423 // next file for the user key.
1424 largest_candidate =
1425 InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
1426 }
1427 #ifndef NDEBUG
1428 SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
1429 if (meta->smallest.size() > 0) {
1430 smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
1431 }
1432 #endif
1433 meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
1434 tombstone.seq_,
1435 cfd->internal_comparator());
1436
1437 // The smallest key in a file is used for range tombstone truncation, so
1438 // it cannot have a seqnum of 0 (unless the smallest data key in a file
1439 // has a seqnum of 0). Otherwise, the truncated tombstone may expose
1440 // deleted keys at lower levels.
1441 assert(smallest_ikey_seqnum == 0 ||
1442 ExtractInternalKeyFooter(meta->smallest.Encode()) !=
1443 PackSequenceAndType(0, kTypeRangeDeletion));
1444 }
1445 }
1446 const uint64_t current_entries = sub_compact->builder->NumEntries();
1447 if (s.ok()) {
1448 s = sub_compact->builder->Finish();
1449 } else {
1450 sub_compact->builder->Abandon();
1451 }
1452 IOStatus io_s = sub_compact->builder->io_status();
1453 if (s.ok()) {
1454 s = io_s;
1455 }
1456 const uint64_t current_bytes = sub_compact->builder->FileSize();
1457 if (s.ok()) {
1458 meta->fd.file_size = current_bytes;
1459 meta->marked_for_compaction = sub_compact->builder->NeedCompact();
1460 }
1461 sub_compact->current_output()->finished = true;
1462 sub_compact->total_bytes += current_bytes;
1463
1464 // Finish and check for file errors
1465 if (s.ok()) {
1466 StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
1467 io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
1468 }
1469 if (s.ok() && io_s.ok()) {
1470 io_s = sub_compact->outfile->Close();
1471 }
1472 if (s.ok() && io_s.ok()) {
1473 // Add the checksum information to file metadata.
1474 meta->file_checksum = sub_compact->outfile->GetFileChecksum();
1475 meta->file_checksum_func_name =
1476 sub_compact->outfile->GetFileChecksumFuncName();
1477 file_checksum = meta->file_checksum;
1478 file_checksum_func_name = meta->file_checksum_func_name;
1479 }
1480 if (s.ok()) {
1481 s = io_s;
1482 }
1483 if (sub_compact->io_status.ok()) {
1484 sub_compact->io_status = io_s;
1485 // Since this error is really a copy of the
1486 // "normal" status, it does not also need to be checked
1487 sub_compact->io_status.PermitUncheckedError();
1488 }
1489 sub_compact->outfile.reset();
1490
1491 TableProperties tp;
1492 if (s.ok()) {
1493 tp = sub_compact->builder->GetTableProperties();
1494 }
1495
1496 if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
1497 // If there is nothing to output, it is not necessary to generate an SST
1498 // file. This happens when the output level is the bottom level and, at the
1499 // same time, the sub_compact outputs nothing.
1500 std::string fname =
1501 TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1502 meta->fd.GetNumber(), meta->fd.GetPathId());
1503 env_->DeleteFile(fname);
1504
1505 // Also need to remove the file from outputs, or it will be added to the
1506 // VersionEdit.
1507 assert(!sub_compact->outputs.empty());
1508 sub_compact->outputs.pop_back();
1509 meta = nullptr;
1510 }
1511
1512 if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
1513 // Output to event logger and fire events.
1514 sub_compact->current_output()->table_properties =
1515 std::make_shared<TableProperties>(tp);
1516 ROCKS_LOG_INFO(db_options_.info_log,
1517 "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
1518 " keys, %" PRIu64 " bytes%s",
1519 cfd->GetName().c_str(), job_id_, output_number,
1520 current_entries, current_bytes,
1521 meta->marked_for_compaction ? " (need compaction)" : "");
1522 }
1523 std::string fname;
1524 FileDescriptor output_fd;
1525 uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
1526 if (meta != nullptr) {
1527 fname =
1528 TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1529 meta->fd.GetNumber(), meta->fd.GetPathId());
1530 output_fd = meta->fd;
1531 oldest_blob_file_number = meta->oldest_blob_file_number;
1532 } else {
1533 fname = "(nil)";
1534 }
1535 EventHelpers::LogAndNotifyTableFileCreationFinished(
1536 event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
1537 job_id_, output_fd, oldest_blob_file_number, tp,
1538 TableFileCreationReason::kCompaction, s, file_checksum,
1539 file_checksum_func_name);
1540
1541 #ifndef ROCKSDB_LITE
1542 // Report new file to SstFileManagerImpl
1543 auto sfm =
1544 static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
1545 if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
1546 Status add_s = sfm->OnAddFile(fname);
1547 if (!add_s.ok() && s.ok()) {
1548 s = add_s;
1549 }
1550 if (sfm->IsMaxAllowedSpaceReached()) {
1551 // TODO(ajkr): should we return OK() if max space was reached by the final
1552 // compaction output file (similarly to how flush works when full)?
1553 s = Status::SpaceLimit("Max allowed space was reached");
1554 TEST_SYNC_POINT(
1555 "CompactionJob::FinishCompactionOutputFile:"
1556 "MaxAllowedSpaceReached");
1557 InstrumentedMutexLock l(db_mutex_);
1558 // Should we handle the returned error?
1559 db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction)
1560 .PermitUncheckedError();
1561 }
1562 }
1563 #endif
1564
1565 sub_compact->builder.reset();
1566 sub_compact->current_output_file_size = 0;
1567 return s;
1568 }
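// The function above follows a "first error wins" pattern when finalizing an
// output file: finish (or abandon) the builder, then sync and close the file,
// where each step runs only if everything before it succeeded and any
// IOStatus is promoted into the returned Status. A minimal sketch of the same
// idea (FinalizeOutput is a made-up name; the member calls mirror the ones
// used above):
//
//   Status FinalizeOutput(TableBuilder* builder, WritableFileWriter* file,
//                         bool use_fsync) {
//     Status s = builder->Finish();
//     IOStatus io_s = builder->io_status();
//     if (s.ok()) s = io_s;
//     if (s.ok()) io_s = file->Sync(use_fsync);
//     if (s.ok() && io_s.ok()) io_s = file->Close();
//     if (s.ok()) s = io_s;  // surface the first IO error as the status
//     return s;
//   }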
1569
1570 Status CompactionJob::InstallCompactionResults(
1571 const MutableCFOptions& mutable_cf_options) {
1572 assert(compact_);
1573
1574 db_mutex_->AssertHeld();
1575
1576 auto* compaction = compact_->compaction;
1577 assert(compaction);
1578
1579 // paranoia: verify that the files that we started with
1580 // still exist in the current version and in the same original level.
1581 // This ensures that a concurrent compaction did not erroneously
1582 // pick the same files to compact.
1583 if (!versions_->VerifyCompactionFileConsistency(compaction)) {
1584 Compaction::InputLevelSummaryBuffer inputs_summary;
1585
1586 ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
1587 compaction->column_family_data()->GetName().c_str(),
1588 job_id_, compaction->InputLevelSummary(&inputs_summary));
1589 return Status::Corruption("Compaction input files inconsistent");
1590 }
1591
1592 {
1593 Compaction::InputLevelSummaryBuffer inputs_summary;
1594 ROCKS_LOG_INFO(db_options_.info_log,
1595 "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
1596 compaction->column_family_data()->GetName().c_str(), job_id_,
1597 compaction->InputLevelSummary(&inputs_summary),
1598 compact_->total_bytes + compact_->total_blob_bytes);
1599 }
1600
1601 VersionEdit* const edit = compaction->edit();
1602 assert(edit);
1603
1604 // Add compaction inputs
1605 compaction->AddInputDeletions(edit);
1606
1607 for (const auto& sub_compact : compact_->sub_compact_states) {
1608 for (const auto& out : sub_compact.outputs) {
1609 edit->AddFile(compaction->output_level(), out.meta);
1610 }
1611
1612 for (const auto& blob : sub_compact.blob_file_additions) {
1613 edit->AddBlobFile(blob);
1614 }
1615 }
1616
1617 return versions_->LogAndApply(compaction->column_family_data(),
1618 mutable_cf_options, edit, db_mutex_,
1619 db_directory_);
1620 }
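// Conceptually, the VersionEdit assembled above records a deletion for every
// compaction input file and an addition for every output file (plus any new
// blob files), and LogAndApply() persists that edit to the MANIFEST while
// installing the resulting version. A sketch with made-up file numbers, for a
// compaction of L1 files 101-102 and L2 files 201-203 into new L2 outputs:
//
//   delete: L1/101, L1/102, L2/201, L2/202, L2/203
//   add:    L2/301, L2/302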
1621
1622 void CompactionJob::RecordCompactionIOStats() {
1623 RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
1624 RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
1625 CompactionReason compaction_reason =
1626 compact_->compaction->compaction_reason();
1627 if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
1628 RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
1629 RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
1630 } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
1631 RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
1632 RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
1633 } else if (compaction_reason == CompactionReason::kTtl) {
1634 RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
1635 RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
1636 }
1637 ThreadStatusUtil::IncreaseThreadOperationProperty(
1638 ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
1639 IOSTATS_RESET(bytes_read);
1640 ThreadStatusUtil::IncreaseThreadOperationProperty(
1641 ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
1642 IOSTATS_RESET(bytes_written);
1643 }
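// Note that IOSTATS(...) reads this thread's thread-local IOStatsContext
// counters, so the function above both publishes the accumulated deltas (to
// the statistics object and the thread status) and then clears the local
// counters. A minimal usage sketch of that pattern:
//
//   IOSTATS_RESET(bytes_read);
//   // ... perform some reads on this thread ...
//   uint64_t read_since_reset = IOSTATS(bytes_read);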
1644
1645 Status CompactionJob::OpenCompactionOutputFile(
1646 SubcompactionState* sub_compact) {
1647 assert(sub_compact != nullptr);
1648 assert(sub_compact->builder == nullptr);
1649 // no need to lock because VersionSet::next_file_number_ is atomic
1650 uint64_t file_number = versions_->NewFileNumber();
1651 std::string fname =
1652 TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
1653 file_number, sub_compact->compaction->output_path_id());
1654 // Fire events.
1655 ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1656 #ifndef ROCKSDB_LITE
1657 EventHelpers::NotifyTableFileCreationStarted(
1658 cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname, job_id_,
1659 TableFileCreationReason::kCompaction);
1660 #endif // !ROCKSDB_LITE
1661 // Make the output file
1662 std::unique_ptr<FSWritableFile> writable_file;
1663 #ifndef NDEBUG
1664 bool syncpoint_arg = file_options_.use_direct_writes;
1665 TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
1666 &syncpoint_arg);
1667 #endif
1668 Status s;
1669 IOStatus io_s =
1670 NewWritableFile(fs_.get(), fname, &writable_file, file_options_);
1671 s = io_s;
1672 if (sub_compact->io_status.ok()) {
1673 sub_compact->io_status = io_s;
1674 // Since this error is really a copy of the io_s that is checked below as s,
1675 // it does not also need to be checked.
1676 sub_compact->io_status.PermitUncheckedError();
1677 }
1678 if (!s.ok()) {
1679 ROCKS_LOG_ERROR(
1680 db_options_.info_log,
1681 "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
1682 " fails at NewWritableFile with status %s",
1683 sub_compact->compaction->column_family_data()->GetName().c_str(),
1684 job_id_, file_number, s.ToString().c_str());
1685 LogFlush(db_options_.info_log);
1686 EventHelpers::LogAndNotifyTableFileCreationFinished(
1687 event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
1688 fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
1689 TableProperties(), TableFileCreationReason::kCompaction, s,
1690 kUnknownFileChecksum, kUnknownFileChecksumFuncName);
1691 return s;
1692 }
1693
1694 // Try to figure out the output file's oldest ancester time.
1695 int64_t temp_current_time = 0;
1696 auto get_time_status = env_->GetCurrentTime(&temp_current_time);
1697 // It is safe to proceed even if GetCurrentTime fails, so log a warning and move on.
1698 if (!get_time_status.ok()) {
1699 ROCKS_LOG_WARN(db_options_.info_log,
1700 "Failed to get current time. Status: %s",
1701 get_time_status.ToString().c_str());
1702 }
1703 uint64_t current_time = static_cast<uint64_t>(temp_current_time);
1704 uint64_t oldest_ancester_time =
1705 sub_compact->compaction->MinInputFileOldestAncesterTime();
1706 if (oldest_ancester_time == port::kMaxUint64) {
1707 oldest_ancester_time = current_time;
1708 }
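// For example (values illustrative): if the input files carry oldest ancester
// times {1600000000, 1600000500}, the output inherits 1600000000; if none of
// the inputs has one recorded (the port::kMaxUint64 sentinel), the output
// falls back to the wall-clock time captured above.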
1709
1710 // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
1711 {
1712 FileMetaData meta;
1713 meta.fd = FileDescriptor(file_number,
1714 sub_compact->compaction->output_path_id(), 0);
1715 meta.oldest_ancester_time = oldest_ancester_time;
1716 meta.file_creation_time = current_time;
1717 sub_compact->outputs.emplace_back(
1718 std::move(meta), cfd->internal_comparator(),
1719 /*enable_order_check=*/
1720 sub_compact->compaction->mutable_cf_options()
1721 ->check_flush_compaction_key_order,
1722 /*enable_hash=*/paranoid_file_checks_);
1723 }
1724
1725 writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
1726 writable_file->SetWriteLifeTimeHint(write_hint_);
1727 writable_file->SetPreallocationBlockSize(static_cast<size_t>(
1728 sub_compact->compaction->OutputFilePreallocationSize()));
1729 const auto& listeners =
1730 sub_compact->compaction->immutable_cf_options()->listeners;
1731 sub_compact->outfile.reset(new WritableFileWriter(
1732 std::move(writable_file), fname, file_options_, env_, io_tracer_,
1733 db_options_.statistics.get(), listeners,
1734 db_options_.file_checksum_gen_factory.get()));
1735
1736 // If the column family is configured to optimize filters for hits only,
1737 // we can skip creating filters for this output when it is on the
1738 // bottommost level, where the data is going to be found.
1739 bool skip_filters =
1740 cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
1741
1742 sub_compact->builder.reset(NewTableBuilder(
1743 *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
1744 cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
1745 cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
1746 sub_compact->compaction->output_compression(),
1747 0 /*sample_for_compression */,
1748 sub_compact->compaction->output_compression_opts(),
1749 sub_compact->compaction->output_level(), skip_filters,
1750 oldest_ancester_time, 0 /* oldest_key_time */,
1751 sub_compact->compaction->max_output_file_size(), current_time, db_id_,
1752 db_session_id_));
1753 LogFlush(db_options_.info_log);
1754 return s;
1755 }
1756
1757 void CompactionJob::CleanupCompaction() {
1758 for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
1759 const auto& sub_status = sub_compact.status;
1760
1761 if (sub_compact.builder != nullptr) {
1762 // May happen if we get a shutdown call in the middle of compaction
1763 sub_compact.builder->Abandon();
1764 sub_compact.builder.reset();
1765 } else {
1766 assert(!sub_status.ok() || sub_compact.outfile == nullptr);
1767 }
1768 for (const auto& out : sub_compact.outputs) {
1769 // If this file was inserted into the table cache then remove
1770 // it here because this compaction was not committed.
1771 if (!sub_status.ok()) {
1772 TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
1773 }
1774 }
1775 // TODO: sub_compact.io_status is not checked like status. Not sure if that
1776 // is intentional, so ignore the io_status for now.
1777 sub_compact.io_status.PermitUncheckedError();
1778 }
1779 delete compact_;
1780 compact_ = nullptr;
1781 }
1782
1783 #ifndef ROCKSDB_LITE
1784 namespace {
1785 void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
1786 assert(prefix_length > 0);
1787 size_t length = src.size() > prefix_length ? prefix_length : src.size();
1788 dst->assign(src.data(), length);
1789 }
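// For example, with prefix_length == 8 (values illustrative):
//   CopyPrefix("0123456789", 8, &dst)  => dst == "01234567"
//   CopyPrefix("012",        8, &dst)  => dst == "012" (source shorter than prefix)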
1790 } // namespace
1791
1792 #endif // !ROCKSDB_LITE
1793
1794 void CompactionJob::UpdateCompactionStats() {
1795 assert(compact_);
1796
1797 Compaction* compaction = compact_->compaction;
1798 compaction_stats_.num_input_files_in_non_output_levels = 0;
1799 compaction_stats_.num_input_files_in_output_level = 0;
1800 for (int input_level = 0;
1801 input_level < static_cast<int>(compaction->num_input_levels());
1802 ++input_level) {
1803 if (compaction->level(input_level) != compaction->output_level()) {
1804 UpdateCompactionInputStatsHelper(
1805 &compaction_stats_.num_input_files_in_non_output_levels,
1806 &compaction_stats_.bytes_read_non_output_levels, input_level);
1807 } else {
1808 UpdateCompactionInputStatsHelper(
1809 &compaction_stats_.num_input_files_in_output_level,
1810 &compaction_stats_.bytes_read_output_level, input_level);
1811 }
1812 }
1813
1814 compaction_stats_.num_output_files =
1815 static_cast<int>(compact_->num_output_files) +
1816 static_cast<int>(compact_->num_blob_output_files);
1817 compaction_stats_.bytes_written =
1818 compact_->total_bytes + compact_->total_blob_bytes;
1819
1820 if (compaction_stats_.num_input_records > compact_->num_output_records) {
1821 compaction_stats_.num_dropped_records =
1822 compaction_stats_.num_input_records - compact_->num_output_records;
1823 }
1824 }
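// Worked example with made-up numbers: a compaction that reads 1,000,000
// input records and writes 950,000 output records reports
// num_dropped_records == 50,000 (entries eliminated by deletes, merges, or
// newer versions of the same key); if outputs do not fall below inputs, the
// field is simply left untouched.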
1825
1826 void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
1827 uint64_t* bytes_read,
1828 int input_level) {
1829 const Compaction* compaction = compact_->compaction;
1830 auto num_input_files = compaction->num_input_files(input_level);
1831 *num_files += static_cast<int>(num_input_files);
1832
1833 for (size_t i = 0; i < num_input_files; ++i) {
1834 const auto* file_meta = compaction->input(input_level, i);
1835 *bytes_read += file_meta->fd.GetFileSize();
1836 compaction_stats_.num_input_records +=
1837 static_cast<uint64_t>(file_meta->num_entries);
1838 }
1839 }
1840
1841 void CompactionJob::UpdateCompactionJobStats(
1842 const InternalStats::CompactionStats& stats) const {
1843 #ifndef ROCKSDB_LITE
1844 compaction_job_stats_->elapsed_micros = stats.micros;
1845
1846 // input information
1847 compaction_job_stats_->total_input_bytes =
1848 stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
1849 compaction_job_stats_->num_input_records = stats.num_input_records;
1850 compaction_job_stats_->num_input_files =
1851 stats.num_input_files_in_non_output_levels +
1852 stats.num_input_files_in_output_level;
1853 compaction_job_stats_->num_input_files_at_output_level =
1854 stats.num_input_files_in_output_level;
1855
1856 // output information
1857 compaction_job_stats_->total_output_bytes = stats.bytes_written;
1858 compaction_job_stats_->num_output_records = compact_->num_output_records;
1859 compaction_job_stats_->num_output_files = stats.num_output_files;
1860
1861 if (stats.num_output_files > 0) {
1862 CopyPrefix(compact_->SmallestUserKey(),
1863 CompactionJobStats::kMaxPrefixLength,
1864 &compaction_job_stats_->smallest_output_key_prefix);
1865 CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
1866 &compaction_job_stats_->largest_output_key_prefix);
1867 }
1868 #else
1869 (void)stats;
1870 #endif // !ROCKSDB_LITE
1871 }
1872
1873 void CompactionJob::LogCompaction() {
1874 Compaction* compaction = compact_->compaction;
1875 ColumnFamilyData* cfd = compaction->column_family_data();
1876
1877 // Let's check if anything will get logged. Don't prepare all the info if
1878 // we're not logging
1879 if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
1880 Compaction::InputLevelSummaryBuffer inputs_summary;
1881 ROCKS_LOG_INFO(
1882 db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
1883 cfd->GetName().c_str(), job_id_,
1884 compaction->InputLevelSummary(&inputs_summary), compaction->score());
1885 char scratch[2345];
1886 compaction->Summary(scratch, sizeof(scratch));
1887 ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
1888 cfd->GetName().c_str(), scratch);
1889 // build event logger report
1890 auto stream = event_logger_->Log();
1891 stream << "job" << job_id_ << "event"
1892 << "compaction_started"
1893 << "compaction_reason"
1894 << GetCompactionReasonString(compaction->compaction_reason());
1895 for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
1896 stream << ("files_L" + ToString(compaction->level(i)));
1897 stream.StartArray();
1898 for (auto f : *compaction->inputs(i)) {
1899 stream << f->fd.GetNumber();
1900 }
1901 stream.EndArray();
1902 }
1903 stream << "score" << compaction->score() << "input_data_size"
1904 << compaction->CalculateTotalInputSize();
1905 }
1906 }
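// The event-logger stream above results in a single JSON-style log line,
// roughly of the following shape (field values made up for illustration; the
// exact prefix and extra fields such as time_micros come from EventLogger):
//
//   EVENT_LOG_v1 {"time_micros": 1617000000000000, "job": 7,
//     "event": "compaction_started", "compaction_reason": "LevelMaxLevelSize",
//     "files_L0": [13, 11], "files_L1": [5], "score": 1.35,
//     "input_data_size": 209715200}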
1907
1908 } // namespace ROCKSDB_NAMESPACE