#include <algorithm>
#include <cinttypes>
-#include <functional>
-#include <list>
#include <memory>
-#include <random>
+#include <optional>
#include <set>
-#include <thread>
#include <utility>
#include <vector>
+#include "db/blob/blob_counting_iterator.h"
#include "db/blob/blob_file_addition.h"
#include "db/blob/blob_file_builder.h"
#include "db/builder.h"
+#include "db/compaction/clipping_iterator.h"
+#include "db/compaction/compaction_state.h"
#include "db/db_impl/db_impl.h"
-#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/error_handler.h"
#include "db/event_helpers.h"
-#include "db/log_reader.h"
+#include "db/history_trimming_iterator.h"
#include "db/log_writer.h"
-#include "db/memtable.h"
-#include "db/memtable_list.h"
-#include "db/merge_context.h"
#include "db/merge_helper.h"
-#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
+#include "db/version_edit.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "file/read_write_util.h"
#include "logging/log_buffer.h"
#include "logging/logging.h"
#include "monitoring/iostats_context_imp.h"
-#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
+#include "options/configurable_helper.h"
+#include "options/options_helper.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
-#include "rocksdb/sst_partitioner.h"
+#include "rocksdb/options.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
-#include "table/block_based/block.h"
-#include "table/block_based/block_based_table_factory.h"
+#include "rocksdb/utilities/options_type.h"
#include "table/merging_iterator.h"
#include "table/table_builder.h"
+#include "table/unique_id_impl.h"
#include "test_util/sync_point.h"
-#include "util/coding.h"
-#include "util/hash.h"
-#include "util/mutexlock.h"
-#include "util/random.h"
#include "util/stop_watch.h"
-#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
return "ExternalSstIngestion";
case CompactionReason::kPeriodicCompaction:
return "PeriodicCompaction";
+ case CompactionReason::kChangeTemperature:
+ return "ChangeTemperature";
+ case CompactionReason::kForcedBlobGC:
+ return "ForcedBlobGC";
+ case CompactionReason::kRoundRobinTtl:
+ return "RoundRobinTtl";
case CompactionReason::kNumOfReasons:
// fall through
default:
}
}
-// Maintains state for each sub-compaction
-struct CompactionJob::SubcompactionState {
- const Compaction* compaction;
- std::unique_ptr<CompactionIterator> c_iter;
-
- // The boundaries of the key-range this compaction is interested in. No two
- // subcompactions may have overlapping key-ranges.
- // 'start' is inclusive, 'end' is exclusive, and nullptr means unbounded
- Slice *start, *end;
-
- // The return status of this subcompaction
- Status status;
-
- // The return IO Status of this subcompaction
- IOStatus io_status;
-
- // Files produced by this subcompaction
- struct Output {
- Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
- bool _enable_order_check, bool _enable_hash)
- : meta(std::move(_meta)),
- validator(_icmp, _enable_order_check, _enable_hash),
- finished(false) {}
- FileMetaData meta;
- OutputValidator validator;
- bool finished;
- std::shared_ptr<const TableProperties> table_properties;
- };
-
- // State kept for output being generated
- std::vector<Output> outputs;
- std::vector<BlobFileAddition> blob_file_additions;
- std::unique_ptr<WritableFileWriter> outfile;
- std::unique_ptr<TableBuilder> builder;
-
- Output* current_output() {
- if (outputs.empty()) {
- // This subcompaction's output could be empty if compaction was aborted
- // before this subcompaction had a chance to generate any output files.
- // When subcompactions are executed sequentially this is more likely and
- // will be particulalry likely for the later subcompactions to be empty.
- // Once they are run in parallel however it should be much rarer.
- return nullptr;
- } else {
- return &outputs.back();
- }
- }
-
- uint64_t current_output_file_size = 0;
-
- // State during the subcompaction
- uint64_t total_bytes = 0;
- uint64_t num_output_records = 0;
- CompactionJobStats compaction_job_stats;
- uint64_t approx_size = 0;
- // An index that used to speed up ShouldStopBefore().
- size_t grandparent_index = 0;
- // The number of bytes overlapping between the current output and
- // grandparent files used in ShouldStopBefore().
- uint64_t overlapped_bytes = 0;
- // A flag determine whether the key has been seen in ShouldStopBefore()
- bool seen_key = false;
-
- SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size)
- : compaction(c), start(_start), end(_end), approx_size(size) {
- assert(compaction != nullptr);
- }
-
- // Adds the key and value to the builder
- // If paranoid is true, adds the key-value to the paranoid hash
- Status AddToBuilder(const Slice& key, const Slice& value) {
- auto curr = current_output();
- assert(builder != nullptr);
- assert(curr != nullptr);
- Status s = curr->validator.Add(key, value);
- if (!s.ok()) {
- return s;
- }
- builder->Add(key, value);
- return Status::OK();
- }
-
- // Returns true iff we should stop building the current output
- // before processing "internal_key".
- bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
- const InternalKeyComparator* icmp =
- &compaction->column_family_data()->internal_comparator();
- const std::vector<FileMetaData*>& grandparents = compaction->grandparents();
-
- // Scan to find earliest grandparent file that contains key.
- while (grandparent_index < grandparents.size() &&
- icmp->Compare(internal_key,
- grandparents[grandparent_index]->largest.Encode()) >
- 0) {
- if (seen_key) {
- overlapped_bytes += grandparents[grandparent_index]->fd.GetFileSize();
- }
- assert(grandparent_index + 1 >= grandparents.size() ||
- icmp->Compare(
- grandparents[grandparent_index]->largest.Encode(),
- grandparents[grandparent_index + 1]->smallest.Encode()) <= 0);
- grandparent_index++;
- }
- seen_key = true;
-
- if (overlapped_bytes + curr_file_size >
- compaction->max_compaction_bytes()) {
- // Too much overlap for current output; start new output
- overlapped_bytes = 0;
- return true;
- }
-
- return false;
- }
-};
-
-// Maintains state for the entire compaction
-struct CompactionJob::CompactionState {
- Compaction* const compaction;
-
- // REQUIRED: subcompaction states are stored in order of increasing
- // key-range
- std::vector<CompactionJob::SubcompactionState> sub_compact_states;
- Status status;
-
- size_t num_output_files = 0;
- uint64_t total_bytes = 0;
- size_t num_blob_output_files = 0;
- uint64_t total_blob_bytes = 0;
- uint64_t num_output_records = 0;
-
- explicit CompactionState(Compaction* c) : compaction(c) {}
-
- Slice SmallestUserKey() {
- for (const auto& sub_compact_state : sub_compact_states) {
- if (!sub_compact_state.outputs.empty() &&
- sub_compact_state.outputs[0].finished) {
- return sub_compact_state.outputs[0].meta.smallest.user_key();
- }
- }
- // If there is no finished output, return an empty slice.
- return Slice(nullptr, 0);
- }
-
- Slice LargestUserKey() {
- for (auto it = sub_compact_states.rbegin(); it < sub_compact_states.rend();
- ++it) {
- if (!it->outputs.empty() && it->current_output()->finished) {
- assert(it->current_output() != nullptr);
- return it->current_output()->meta.largest.user_key();
- }
- }
- // If there is no finished output, return an empty slice.
- return Slice(nullptr, 0);
- }
-};
-
-void CompactionJob::AggregateStatistics() {
- assert(compact_);
-
- for (SubcompactionState& sc : compact_->sub_compact_states) {
- auto& outputs = sc.outputs;
-
- if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
- // An error occurred, so ignore the last output.
- outputs.pop_back();
- }
-
- compact_->num_output_files += outputs.size();
- compact_->total_bytes += sc.total_bytes;
-
- const auto& blobs = sc.blob_file_additions;
-
- compact_->num_blob_output_files += blobs.size();
-
- for (const auto& blob : blobs) {
- compact_->total_blob_bytes += blob.GetTotalBlobBytes();
- }
-
- compact_->num_output_records += sc.num_output_records;
-
- compaction_job_stats_->Add(sc.compaction_job_stats);
+const char* GetCompactionPenultimateOutputRangeTypeString(
+ Compaction::PenultimateOutputRangeType range_type) {
+ switch (range_type) {
+ case Compaction::PenultimateOutputRangeType::kNotSupported:
+ return "NotSupported";
+ case Compaction::PenultimateOutputRangeType::kFullRange:
+ return "FullRange";
+ case Compaction::PenultimateOutputRangeType::kNonLastRange:
+ return "NonLastRange";
+ case Compaction::PenultimateOutputRangeType::kDisabled:
+ return "Disabled";
+ default:
+ assert(false);
+ return "Invalid";
}
}
CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
- const FileOptions& file_options, VersionSet* versions,
- const std::atomic<bool>* shutting_down,
- const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
- FSDirectory* db_directory, FSDirectory* output_directory,
- FSDirectory* blob_output_directory, Statistics* stats,
- InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
+ const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
+ VersionSet* versions, const std::atomic<bool>* shutting_down,
+ LogBuffer* log_buffer, FSDirectory* db_directory,
+ FSDirectory* output_directory, FSDirectory* blob_output_directory,
+ Statistics* stats, InstrumentedMutex* db_mutex,
+ ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
- const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
- EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
- const std::string& dbname, CompactionJobStats* compaction_job_stats,
- Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
- const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
- const std::string& db_session_id, std::string full_history_ts_low)
- : job_id_(job_id),
- compact_(new CompactionState(compaction)),
- compaction_job_stats_(compaction_job_stats),
+ const SnapshotChecker* snapshot_checker, JobContext* job_context,
+ std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
+ bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
+ CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
+ const std::shared_ptr<IOTracer>& io_tracer,
+ const std::atomic<bool>& manual_compaction_canceled,
+ const std::string& db_id, const std::string& db_session_id,
+ std::string full_history_ts_low, std::string trim_ts,
+ BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
+ int* bg_bottom_compaction_scheduled)
+ : compact_(new CompactionState(compaction)),
compaction_stats_(compaction->compaction_reason(), 1),
+ db_options_(db_options),
+ mutable_db_options_copy_(mutable_db_options),
+ log_buffer_(log_buffer),
+ output_directory_(output_directory),
+ stats_(stats),
+ bottommost_level_(false),
+ write_hint_(Env::WLTH_NOT_SET),
+ compaction_job_stats_(compaction_job_stats),
+ job_id_(job_id),
dbname_(dbname),
db_id_(db_id),
db_session_id_(db_session_id),
- db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
io_tracer_(io_tracer),
fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
versions_(versions),
shutting_down_(shutting_down),
- manual_compaction_paused_(manual_compaction_paused),
- preserve_deletes_seqnum_(preserve_deletes_seqnum),
- log_buffer_(log_buffer),
+ manual_compaction_canceled_(manual_compaction_canceled),
db_directory_(db_directory),
- output_directory_(output_directory),
blob_output_directory_(blob_output_directory),
- stats_(stats),
db_mutex_(db_mutex),
db_error_handler_(db_error_handler),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
+ job_context_(job_context),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
- bottommost_level_(false),
paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats),
- write_hint_(Env::WLTH_NOT_SET),
thread_pri_(thread_pri),
- full_history_ts_low_(std::move(full_history_ts_low)) {
+ full_history_ts_low_(std::move(full_history_ts_low)),
+ trim_ts_(std::move(trim_ts)),
+ blob_callback_(blob_callback),
+ extra_num_subcompaction_threads_reserved_(0),
+ bg_compaction_scheduled_(bg_compaction_scheduled),
+ bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
+
const auto* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
db_options_.enable_thread_tracking);
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PREPARE);
- // Generate file_levels_ for compaction berfore making Iterator
+ // Generate file_levels_ for compaction before making Iterator
auto* c = compact_->compaction;
- assert(c->column_family_data() != nullptr);
- assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
+ ColumnFamilyData* cfd = c->column_family_data();
+ assert(cfd != nullptr);
+ assert(cfd->current()->storage_info()->NumLevelFiles(
compact_->compaction->level()) > 0);
- write_hint_ =
- c->column_family_data()->CalculateSSTWriteHint(c->output_level());
+ write_hint_ = cfd->CalculateSSTWriteHint(c->output_level());
bottommost_level_ = c->bottommost_level();
if (c->ShouldFormSubcompactions()) {
- {
- StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
- GenSubcompactionBoundaries();
- }
- assert(sizes_.size() == boundaries_.size() + 1);
-
+ StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
+ GenSubcompactionBoundaries();
+ }
+ if (boundaries_.size() > 1) {
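+    // N boundary keys define N + 1 subcompaction ranges: e.g. boundaries
+    // {b0, b1} yield (-inf, b0), [b0, b1) and [b1, +inf), with each start
+    // inclusive and each end exclusive.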
for (size_t i = 0; i <= boundaries_.size(); i++) {
- Slice* start = i == 0 ? nullptr : &boundaries_[i - 1];
- Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
- compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
+ compact_->sub_compact_states.emplace_back(
+ c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
+ (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
+ : std::nullopt,
+ static_cast<uint32_t>(i));
+      // Assert that the boundaries don't have the same user key (ignoring
+      // the timestamp part).
+ assert(i == 0 || i == boundaries_.size() ||
+ cfd->user_comparator()->CompareWithoutTimestamp(
+ boundaries_[i - 1], boundaries_[i]) < 0);
}
RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
compact_->sub_compact_states.size());
} else {
- constexpr Slice* start = nullptr;
- constexpr Slice* end = nullptr;
-    constexpr uint64_t size = 0;
-    compact_->sub_compact_states.emplace_back(c, start, end, size);
+ compact_->sub_compact_states.emplace_back(c, std::nullopt, std::nullopt,
+ /*sub_job_id*/ 0);
+ }
+
+  // Collect all seqno->time information from the input files, which will be
+  // used to encode the seqno->time mapping into the output files.
+ uint64_t preserve_time_duration =
+ std::max(c->immutable_options()->preserve_internal_time_seconds,
+ c->immutable_options()->preclude_last_level_data_seconds);
+
+ if (preserve_time_duration > 0) {
+ // setup seqno_time_mapping_
+ seqno_time_mapping_.SetMaxTimeDuration(preserve_time_duration);
+ for (const auto& each_level : *c->inputs()) {
+ for (const auto& fmd : each_level.files) {
+ std::shared_ptr<const TableProperties> tp;
+ Status s = cfd->current()->GetTableProperties(&tp, fmd, nullptr);
+ if (s.ok()) {
+ seqno_time_mapping_.Add(tp->seqno_to_time_mapping)
+ .PermitUncheckedError();
+ seqno_time_mapping_.Add(fmd->fd.smallest_seqno,
+ fmd->oldest_ancester_time);
+ }
+ }
+ }
+ auto status = seqno_time_mapping_.Sort();
+ if (!status.ok()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Invalid sequence number to time mapping: Status: %s",
+ status.ToString().c_str());
+ }
+ int64_t _current_time = 0;
+ status = db_options_.clock->GetCurrentTime(&_current_time);
+ if (!status.ok()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Failed to get current time in compaction: Status: %s",
+ status.ToString().c_str());
+ // preserve all time information
+ preserve_time_min_seqno_ = 0;
+ preclude_last_level_min_seqno_ = 0;
+ } else {
+ seqno_time_mapping_.TruncateOldEntries(_current_time);
+ uint64_t preserve_time =
+ static_cast<uint64_t>(_current_time) > preserve_time_duration
+ ? _current_time - preserve_time_duration
+ : 0;
+ preserve_time_min_seqno_ =
+ seqno_time_mapping_.GetOldestSequenceNum(preserve_time);
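+      // e.g. (hypothetical numbers): _current_time = 10000 (seconds) and
+      // preserve_time_duration = 3600 give preserve_time = 6400, so sequence
+      // numbers written within the last hour keep their time information.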
+ if (c->immutable_options()->preclude_last_level_data_seconds > 0) {
+ uint64_t preclude_last_level_time =
+ static_cast<uint64_t>(_current_time) >
+ c->immutable_options()->preclude_last_level_data_seconds
+ ? _current_time -
+ c->immutable_options()->preclude_last_level_data_seconds
+ : 0;
+ preclude_last_level_min_seqno_ =
+ seqno_time_mapping_.GetOldestSequenceNum(preclude_last_level_time);
+ }
+ }
}
}
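+// Returns the current cap on the number of subcompactions for this job: the
+// configured max_subcompactions plus any extra threads already reserved. A
+// minimal example (hypothetical numbers): with max_subcompactions = 4 and 2
+// extra reserved threads, the limit is 6.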
+uint64_t CompactionJob::GetSubcompactionsLimit() {
+ return extra_num_subcompaction_threads_reserved_ +
+ std::max(
+ std::uint64_t(1),
+ static_cast<uint64_t>(compact_->compaction->max_subcompactions()));
+}
+
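+// A sketch of the extra-resource lifecycle under round-robin compaction
+// priority: GenSubcompactionBoundaries() may call
+// AcquireSubcompactionResources() to reserve extra threads up front, then
+// ShrinkSubcompactionResources() to return the ones the final plan does not
+// need; ReleaseSubcompactionResources() returns the remainder once the job
+// finishes.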
+void CompactionJob::AcquireSubcompactionResources(
+ int num_extra_required_subcompactions) {
+ TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
+ TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
+ int max_db_compactions =
+ DBImpl::GetBGJobLimits(
+ mutable_db_options_copy_.max_background_flushes,
+ mutable_db_options_copy_.max_background_compactions,
+ mutable_db_options_copy_.max_background_jobs,
+ versions_->GetColumnFamilySet()
+ ->write_controller()
+ ->NeedSpeedupCompaction())
+ .max_compactions;
+ InstrumentedMutexLock l(db_mutex_);
+  // Apply the min function first, since we need to compute the number of
+  // extra subcompactions against the compaction limits, and only then try to
+  // reserve threads for the extra subcompactions. The actual number of
+  // reserved threads could be less than the desired number.
+ int available_bg_compactions_against_db_limit =
+ std::max(max_db_compactions - *bg_compaction_scheduled_ -
+ *bg_bottom_compaction_scheduled_,
+ 0);
+  // Reservation only supports background threads whose priority is between
+  // BOTTOM and HIGH. The priority needs to be capped at HIGH if the original
+  // thread_pri_ is higher than that. The same applies to ReleaseThreads().
+ extra_num_subcompaction_threads_reserved_ =
+ env_->ReserveThreads(std::min(num_extra_required_subcompactions,
+ available_bg_compactions_against_db_limit),
+ std::min(thread_pri_, Env::Priority::HIGH));
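+  // Example (hypothetical numbers): if max_db_compactions is 8 and 5
+  // compactions are already scheduled, at most 3 extra subcompaction threads
+  // can be reserved here, no matter how many were requested.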
+
+  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
+  // depending on whether this compaction has the bottommost priority
+ if (thread_pri_ == Env::Priority::BOTTOM) {
+ *bg_bottom_compaction_scheduled_ +=
+ extra_num_subcompaction_threads_reserved_;
+ } else {
+ *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_;
+ }
+}
+
+void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
+ // Do nothing when we have zero resources to shrink
+ if (num_extra_resources == 0) return;
+ db_mutex_->Lock();
+  // We cannot release more threads than we reserved before
+ int extra_num_subcompaction_threads_released = env_->ReleaseThreads(
+ (int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH));
+ // Update the number of reserved threads and the number of background
+ // scheduled compactions for this compaction job
+ extra_num_subcompaction_threads_reserved_ -=
+ extra_num_subcompaction_threads_released;
+ // TODO (zichen): design a test case with new subcompaction partitioning
+ // when the number of actual partitions is less than the number of planned
+ // partitions
+ assert(extra_num_subcompaction_threads_released == (int)num_extra_resources);
+  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
+  // depending on whether this compaction has the bottommost priority
+ if (thread_pri_ == Env::Priority::BOTTOM) {
+ *bg_bottom_compaction_scheduled_ -=
+ extra_num_subcompaction_threads_released;
+ } else {
+ *bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released;
+ }
+ db_mutex_->Unlock();
+ TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
+}
+
+void CompactionJob::ReleaseSubcompactionResources() {
+ if (extra_num_subcompaction_threads_reserved_ == 0) {
+ return;
+ }
+ {
+ InstrumentedMutexLock l(db_mutex_);
+    // The number of reserved threads becomes larger than 0 only if the
+    // compaction priority is round-robin and there are not enough
+    // sub-compactions available.
+
+    // The number of scheduled compactions must be at least 1 + the number of
+    // extra subcompactions using acquired resources, since this compaction
+    // job has not finished yet.
+ assert(*bg_bottom_compaction_scheduled_ >=
+ 1 + extra_num_subcompaction_threads_reserved_ ||
+ *bg_compaction_scheduled_ >=
+ 1 + extra_num_subcompaction_threads_reserved_);
+ }
+ ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
+}
+
struct RangeWithSize {
Range range;
uint64_t size;
};
void CompactionJob::GenSubcompactionBoundaries() {
+ // The goal is to find some boundary keys so that we can evenly partition
+ // the compaction input data into max_subcompactions ranges.
+  // For every input file, we ask the TableReader to estimate 128 anchor
+  // points that evenly partition the input file into 128 ranges, along with
+  // each range's size. This can be calculated by scanning the index blocks
+  // of the file.
+ // Once we have the anchor points for all the input files, we merge them
+ // together and try to find keys dividing ranges evenly.
+ // For example, if we have two input files, and each returns following
+ // ranges:
+ // File1: (a1, 1000), (b1, 1200), (c1, 1100)
+ // File2: (a2, 1100), (b2, 1000), (c2, 1000)
+  // We sort all the anchor keys, giving:
+ // (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
+ // We calculate the total size by adding up all ranges' size, which is 6400.
+  // If we would like to partition into 2 subcompactions, the target range
+  // size is 3200. Based on the size, we take "b1" as the partition key
+ // since the first three ranges would hit 3200.
+ //
+  // Note that the ranges are actually overlapping. For example, in the example
+  // above, the range ending with "b1" overlaps with the range ending with
+  // "b2". So the size 1000+1100+1200 is an underestimate of the data size up
+  // to "b1". In extreme cases where we only compact N L0 files, a range can
+  // overlap with N-1 other ranges. Since we requested a relatively large
+  // number (128) of ranges from each input file, even N-way range overlap
+  // should cause relatively small inaccuracy.
+
auto* c = compact_->compaction;
+ if (c->max_subcompactions() <= 1 &&
+ !(c->immutable_options()->compaction_pri == kRoundRobin &&
+ c->immutable_options()->compaction_style == kCompactionStyleLevel)) {
+ return;
+ }
auto* cfd = c->column_family_data();
const Comparator* cfd_comparator = cfd->user_comparator();
- std::vector<Slice> bounds;
+ const InternalKeyComparator& icomp = cfd->internal_comparator();
+
+ auto* v = compact_->compaction->input_version();
+ int base_level = v->storage_info()->base_level();
+ InstrumentedMutexUnlock unlock_guard(db_mutex_);
+
+ uint64_t total_size = 0;
+ std::vector<TableReader::Anchor> all_anchors;
int start_lvl = c->start_level();
int out_lvl = c->output_level();
- // Add the starting and/or ending key of certain input files as a potential
- // boundary
for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
int lvl = c->level(lvl_idx);
if (lvl >= start_lvl && lvl <= out_lvl) {
continue;
}
- if (lvl == 0) {
- // For level 0 add the starting and ending key of each file since the
- // files may have greatly differing key ranges (not range-partitioned)
- for (size_t i = 0; i < num_files; i++) {
- bounds.emplace_back(flevel->files[i].smallest_key);
- bounds.emplace_back(flevel->files[i].largest_key);
+ for (size_t i = 0; i < num_files; i++) {
+ FileMetaData* f = flevel->files[i].file_metadata;
+ std::vector<TableReader::Anchor> my_anchors;
+ Status s = cfd->table_cache()->ApproximateKeyAnchors(
+ ReadOptions(), icomp, *f, my_anchors);
+ if (!s.ok() || my_anchors.empty()) {
+ my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
}
- } else {
- // For all other levels add the smallest/largest key in the level to
- // encompass the range covered by that level
- bounds.emplace_back(flevel->files[0].smallest_key);
- bounds.emplace_back(flevel->files[num_files - 1].largest_key);
- if (lvl == out_lvl) {
- // For the last level include the starting keys of all files since
- // the last level is the largest and probably has the widest key
- // range. Since it's range partitioned, the ending key of one file
- // and the starting key of the next are very close (or identical).
- for (size_t i = 1; i < num_files; i++) {
- bounds.emplace_back(flevel->files[i].smallest_key);
- }
+ for (auto& ac : my_anchors) {
+        // Could be optimized to avoid this loop.
+ total_size += ac.range_size;
}
+
+ all_anchors.insert(all_anchors.end(), my_anchors.begin(),
+ my_anchors.end());
}
}
}
-
- std::sort(bounds.begin(), bounds.end(),
- [cfd_comparator](const Slice& a, const Slice& b) -> bool {
- return cfd_comparator->Compare(ExtractUserKey(a),
- ExtractUserKey(b)) < 0;
- });
- // Remove duplicated entries from bounds
- bounds.erase(
- std::unique(bounds.begin(), bounds.end(),
- [cfd_comparator](const Slice& a, const Slice& b) -> bool {
- return cfd_comparator->Compare(ExtractUserKey(a),
- ExtractUserKey(b)) == 0;
+  // Here we sort all the anchor points across all files and go through them
+  // in sorted order to find partitioning boundaries.
+  // This is not the most efficient implementation; a much more efficient
+  // algorithm probably exists, but it would be more complex. If performance
+  // turns out to be a problem, we can optimize.
+ std::sort(
+ all_anchors.begin(), all_anchors.end(),
+ [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
+ return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) <
+ 0;
+ });
+
+ // Remove duplicated entries from boundaries.
+ all_anchors.erase(
+ std::unique(all_anchors.begin(), all_anchors.end(),
+ [cfd_comparator](TableReader::Anchor& a,
+ TableReader::Anchor& b) -> bool {
+ return cfd_comparator->CompareWithoutTimestamp(
+ a.user_key, b.user_key) == 0;
}),
- bounds.end());
-
- // Combine consecutive pairs of boundaries into ranges with an approximate
- // size of data covered by keys in that range
- uint64_t sum = 0;
- std::vector<RangeWithSize> ranges;
- // Get input version from CompactionState since it's already referenced
- // earlier in SetInputVersioCompaction::SetInputVersion and will not change
- // when db_mutex_ is released below
- auto* v = compact_->compaction->input_version();
- for (auto it = bounds.begin();;) {
- const Slice a = *it;
- ++it;
-
- if (it == bounds.end()) {
- break;
+ all_anchors.end());
+
+  // Get the number of planned subcompactions; this may reserve extra threads
+  // and update extra_num_subcompaction_threads_reserved_ for round-robin
+ uint64_t num_planned_subcompactions;
+ if (c->immutable_options()->compaction_pri == kRoundRobin &&
+ c->immutable_options()->compaction_style == kCompactionStyleLevel) {
+    // For round-robin compaction priority, we need to employ more
+    // subcompactions (which may exceed the max_subcompactions limit). The
+    // extra subcompactions will be executed using reserved threads and
+    // counted towards bg_compaction_scheduled or
+    // bg_bottom_compaction_scheduled.
+
+ // Initialized by the number of input files
+ num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0));
+ uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
+ if (max_subcompactions_limit < num_planned_subcompactions) {
+      // Assert that the two pointers are not null so that we can charge the
+      // extra subcompactions against the DB compaction limits
+ assert(bg_bottom_compaction_scheduled_ != nullptr);
+ assert(bg_compaction_scheduled_ != nullptr);
+      // Reserve resources when max_subcompactions is not sufficient
+ AcquireSubcompactionResources(
+ (int)(num_planned_subcompactions - max_subcompactions_limit));
+      // The subcompaction limit changes after acquiring additional resources.
+      // GetSubcompactionsLimit() needs to be called again to update the
+      // number of planned subcompactions.
+ num_planned_subcompactions =
+ std::min(num_planned_subcompactions, GetSubcompactionsLimit());
+ } else {
+ num_planned_subcompactions = max_subcompactions_limit;
}
+ } else {
+ num_planned_subcompactions = GetSubcompactionsLimit();
+ }
- const Slice b = *it;
+ TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
+ &num_planned_subcompactions);
+ if (num_planned_subcompactions == 1) return;
- // ApproximateSize could potentially create table reader iterator to seek
- // to the index block and may incur I/O cost in the process. Unlock db
- // mutex to reduce contention
- db_mutex_->Unlock();
- uint64_t size = versions_->ApproximateSize(SizeApproximationOptions(), v, a,
- b, start_lvl, out_lvl + 1,
- TableReaderCaller::kCompaction);
- db_mutex_->Lock();
- ranges.emplace_back(a, b, size);
- sum += size;
+ // Group the ranges into subcompactions
+ uint64_t target_range_size = std::max(
+ total_size / num_planned_subcompactions,
+ MaxFileSizeForLevel(
+ *(c->mutable_cf_options()), out_lvl,
+ c->immutable_options()->compaction_style, base_level,
+ c->immutable_options()->level_compaction_dynamic_level_bytes));
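+  // e.g. continuing the example above (hypothetical numbers): with
+  // total_size = 6400 and num_planned_subcompactions = 2, target_range_size
+  // is 3200, assuming the per-level max file size floor is smaller.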
+
+ if (target_range_size >= total_size) {
+ return;
}
- // Group the ranges into subcompactions
- const double min_file_fill_percent = 4.0 / 5;
- int base_level = v->storage_info()->base_level();
- uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
- sum / min_file_fill_percent /
- MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
- c->immutable_cf_options()->compaction_style, base_level,
- c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
- uint64_t subcompactions =
- std::min({static_cast<uint64_t>(ranges.size()),
- static_cast<uint64_t>(c->max_subcompactions()),
- max_output_files});
-
- if (subcompactions > 1) {
- double mean = sum * 1.0 / subcompactions;
- // Greedily add ranges to the subcompaction until the sum of the ranges'
- // sizes becomes >= the expected mean size of a subcompaction
- sum = 0;
- for (size_t i = 0; i + 1 < ranges.size(); i++) {
- sum += ranges[i].size;
- if (subcompactions == 1) {
- // If there's only one left to schedule then it goes to the end so no
- // need to put an end boundary
- continue;
- }
- if (sum >= mean) {
- boundaries_.emplace_back(ExtractUserKey(ranges[i].range.limit));
- sizes_.emplace_back(sum);
- subcompactions--;
- sum = 0;
- }
+ uint64_t next_threshold = target_range_size;
+ uint64_t cumulative_size = 0;
+ uint64_t num_actual_subcompactions = 1U;
+ for (TableReader::Anchor& anchor : all_anchors) {
+ cumulative_size += anchor.range_size;
+ if (cumulative_size > next_threshold) {
+ next_threshold += target_range_size;
+ num_actual_subcompactions++;
+ boundaries_.push_back(anchor.user_key);
+ }
+ if (num_actual_subcompactions == num_planned_subcompactions) {
+ break;
}
- sizes_.emplace_back(sum + ranges.back().size);
- } else {
- // Only one range so its size is the total sum of sizes computed above
- sizes_.emplace_back(sum);
}
+ TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
+ &num_actual_subcompactions);
+  // Shrink the extra subcompaction resources if extra resources were acquired
+ ShrinkSubcompactionResources(
+ std::min((int)(num_planned_subcompactions - num_actual_subcompactions),
+ extra_num_subcompaction_threads_reserved_));
}
Status CompactionJob::Run() {
const size_t num_threads = compact_->sub_compact_states.size();
assert(num_threads > 0);
- const uint64_t start_micros = env_->NowMicros();
+ const uint64_t start_micros = db_options_.clock->NowMicros();
// Launch a thread for each of subcompactions 1...num_threads-1
std::vector<port::Thread> thread_pool;
thread.join();
}
- compaction_stats_.micros = env_->NowMicros() - start_micros;
- compaction_stats_.cpu_micros = 0;
- for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
- compaction_stats_.cpu_micros +=
- compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
+ compaction_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);
+
+ for (auto& state : compact_->sub_compact_states) {
+ compaction_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
+ state.RemoveLastEmptyOutput();
}
- RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
+ RecordTimeToHistogram(stats_, COMPACTION_TIME,
+ compaction_stats_.stats.micros);
RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
- compaction_stats_.cpu_micros);
+ compaction_stats_.stats.cpu_micros);
TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
break;
}
- if (!state.blob_file_additions.empty()) {
+ if (state.Current().HasBlobFileAdditions()) {
wrote_new_blob_files = true;
}
}
constexpr IODebugContext* dbg = nullptr;
if (output_directory_) {
- io_s = output_directory_->Fsync(IOOptions(), dbg);
+ io_s = output_directory_->FsyncWithDirOptions(
+ IOOptions(), dbg,
+ DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}
if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
blob_output_directory_ != output_directory_) {
- io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
+ io_s = blob_output_directory_->FsyncWithDirOptions(
+ IOOptions(), dbg,
+ DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}
}
if (io_status_.ok()) {
}
if (status.ok()) {
thread_pool.clear();
- std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
+ std::vector<const CompactionOutputs::Output*> files_output;
for (const auto& state : compact_->sub_compact_states) {
- for (const auto& output : state.outputs) {
+ for (const auto& output : state.GetOutputs()) {
files_output.emplace_back(&output);
}
}
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
- auto prefix_extractor =
- compact_->compaction->mutable_cf_options()->prefix_extractor.get();
+ auto& prefix_extractor =
+ compact_->compaction->mutable_cf_options()->prefix_extractor;
std::atomic<size_t> next_file_idx(0);
auto verify_table = [&](Status& output_status) {
while (true) {
break;
}
// Verify that the table is usable
- // We set for_compaction to false and don't OptimizeForCompactionTableRead
- // here because this is a special case after we finish the table building
- // No matter whether use_direct_io_for_flush_and_compaction is true,
- // we will regard this verification as user reads since the goal is
- // to cache it here for further user reads
+      // We set for_compaction to false and don't
+      // OptimizeForCompactionTableRead here because this is a special case
+      // after we finish the table building. No matter whether
+      // use_direct_io_for_flush_and_compaction is true, we will regard this
+      // verification as user reads since the goal is to cache it here for
+      // further user reads.
ReadOptions read_options;
InternalIterator* iter = cfd->table_cache()->NewIterator(
read_options, file_options_, cfd->internal_comparator(),
}
};
for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
- thread_pool.emplace_back(verify_table,
- std::ref(compact_->sub_compact_states[i].status));
+ thread_pool.emplace_back(
+ verify_table, std::ref(compact_->sub_compact_states[i].status));
}
verify_table(compact_->sub_compact_states[0].status);
for (auto& thread : thread_pool) {
thread.join();
}
+
for (const auto& state : compact_->sub_compact_states) {
if (!state.status.ok()) {
status = state.status;
}
}
+ ReleaseSubcompactionResources();
+ TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:0");
+ TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources:1");
+
TablePropertiesCollection tp;
for (const auto& state : compact_->sub_compact_states) {
- for (const auto& output : state.outputs) {
+ for (const auto& output : state.GetOutputs()) {
auto fn =
- TableFileName(state.compaction->immutable_cf_options()->cf_paths,
+ TableFileName(state.compaction->immutable_options()->cf_paths,
output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
tp[fn] = output.table_properties;
}
compact_->compaction->SetOutputTableProperties(std::move(tp));
// Finish up all book-keeping to unify the subcompaction results
- AggregateStatistics();
+ compact_->AggregateCompactionStats(compaction_stats_, *compaction_job_stats_);
UpdateCompactionStats();
RecordCompactionIOStats();
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
assert(cfd);
- cfd->internal_stats()->AddCompactionStats(
- compact_->compaction->output_level(), thread_pri_, compaction_stats_);
+ int output_level = compact_->compaction->output_level();
+ cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_,
+ compaction_stats_);
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
- const auto& stats = compaction_stats_;
+ const auto& stats = compaction_stats_.stats;
double read_write_amp = 0.0;
double write_amp = 0.0;
double bytes_read_per_sec = 0;
double bytes_written_per_sec = 0;
- if (stats.bytes_read_non_output_levels > 0) {
- read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
- stats.bytes_read_non_output_levels) /
- static_cast<double>(stats.bytes_read_non_output_levels);
- write_amp = stats.bytes_written /
- static_cast<double>(stats.bytes_read_non_output_levels);
+ const uint64_t bytes_read_non_output_and_blob =
+ stats.bytes_read_non_output_levels + stats.bytes_read_blob;
+ const uint64_t bytes_read_all =
+ stats.bytes_read_output_level + bytes_read_non_output_and_blob;
+ const uint64_t bytes_written_all =
+ stats.bytes_written + stats.bytes_written_blob;
+
+ if (bytes_read_non_output_and_blob > 0) {
+ read_write_amp = (bytes_written_all + bytes_read_all) /
+ static_cast<double>(bytes_read_non_output_and_blob);
+ write_amp =
+ bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
}
if (stats.micros > 0) {
- bytes_read_per_sec =
- (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
- static_cast<double>(stats.micros);
+ bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
bytes_written_per_sec =
- stats.bytes_written / static_cast<double>(stats.micros);
+ bytes_written_all / static_cast<double>(stats.micros);
}
const std::string& column_family_name = cfd->GetName();
+ constexpr double kMB = 1048576.0;
+
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
- "files in(%d, %d) out(%d) "
- "MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
- "write-amplify(%.1f) %s, records in: %" PRIu64
+ "files in(%d, %d) out(%d +%d blob) "
+ "MB in(%.1f, %.1f +%.1f blob) out(%.1f +%.1f blob), "
+ "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
", records dropped: %" PRIu64 " output_compression: %s\n",
column_family_name.c_str(), vstorage->LevelSummary(&tmp),
bytes_read_per_sec, bytes_written_per_sec,
compact_->compaction->output_level(),
stats.num_input_files_in_non_output_levels,
stats.num_input_files_in_output_level, stats.num_output_files,
- stats.bytes_read_non_output_levels / 1048576.0,
- stats.bytes_read_output_level / 1048576.0,
- stats.bytes_written / 1048576.0, read_write_amp, write_amp,
- status.ToString().c_str(), stats.num_input_records,
+ stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
+ stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
+ stats.bytes_written / kMB, stats.bytes_written_blob / kMB, read_write_amp,
+ write_amp, status.ToString().c_str(), stats.num_input_records,
stats.num_dropped_records,
CompressionTypeToString(compact_->compaction->output_compression())
.c_str());
const auto& blob_files = vstorage->GetBlobFiles();
if (!blob_files.empty()) {
- ROCKS_LOG_BUFFER(log_buffer_,
- "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
- "\n",
- column_family_name.c_str(), blob_files.begin()->first,
- blob_files.rbegin()->first);
+ assert(blob_files.front());
+ assert(blob_files.back());
+
+ ROCKS_LOG_BUFFER(
+ log_buffer_,
+ "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
+ column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
+ blob_files.back()->GetBlobFileNumber());
+ }
+
+ if (compaction_stats_.has_penultimate_level_output) {
+ ROCKS_LOG_BUFFER(
+ log_buffer_,
+ "[%s] has Penultimate Level output: %" PRIu64
+ ", level %d, number of files: %" PRIu64 ", number of records: %" PRIu64,
+ column_family_name.c_str(),
+ compaction_stats_.penultimate_level_stats.bytes_written,
+ compact_->compaction->GetPenultimateLevel(),
+ compaction_stats_.penultimate_level_stats.num_output_files,
+ compaction_stats_.penultimate_level_stats.num_output_records);
}
UpdateCompactionJobStats(stats);
- auto stream = event_logger_->LogToBuffer(log_buffer_);
+ auto stream = event_logger_->LogToBuffer(log_buffer_, 8192);
stream << "job" << job_id_ << "event"
<< "compaction_finished"
<< "compaction_time_micros" << stats.micros
<< "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
<< compact_->compaction->output_level() << "num_output_files"
- << compact_->num_output_files << "total_output_size"
- << compact_->total_bytes;
+ << stats.num_output_files << "total_output_size"
+ << stats.bytes_written;
- if (compact_->num_blob_output_files > 0) {
- stream << "num_blob_output_files" << compact_->num_blob_output_files
- << "total_blob_output_size" << compact_->total_blob_bytes;
+ if (stats.num_output_files_blob > 0) {
+ stream << "num_blob_output_files" << stats.num_output_files_blob
+ << "total_blob_output_size" << stats.bytes_written_blob;
}
stream << "num_input_records" << stats.num_input_records
- << "num_output_records" << compact_->num_output_records
+ << "num_output_records" << stats.num_output_records
<< "num_subcompactions" << compact_->sub_compact_states.size()
<< "output_compression"
<< CompressionTypeToString(compact_->compaction->output_compression());
stream.EndArray();
if (!blob_files.empty()) {
- stream << "blob_file_head" << blob_files.begin()->first;
- stream << "blob_file_tail" << blob_files.rbegin()->first;
+ assert(blob_files.front());
+ stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();
+
+ assert(blob_files.back());
+ stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
+ }
+
+ if (compaction_stats_.has_penultimate_level_output) {
+ InternalStats::CompactionStats& pl_stats =
+ compaction_stats_.penultimate_level_stats;
+ stream << "penultimate_level_num_output_files" << pl_stats.num_output_files;
+ stream << "penultimate_level_bytes_written" << pl_stats.bytes_written;
+ stream << "penultimate_level_num_output_records"
+ << pl_stats.num_output_records;
+ stream << "penultimate_level_num_output_files_blob"
+ << pl_stats.num_output_files_blob;
+ stream << "penultimate_level_bytes_written_blob"
+ << pl_stats.bytes_written_blob;
}
CleanupCompaction();
return status;
}
+void CompactionJob::NotifyOnSubcompactionBegin(
+ SubcompactionState* sub_compact) {
+#ifndef ROCKSDB_LITE
+ Compaction* c = compact_->compaction;
+
+ if (db_options_.listeners.empty()) {
+ return;
+ }
+ if (shutting_down_->load(std::memory_order_acquire)) {
+ return;
+ }
+ if (c->is_manual_compaction() &&
+ manual_compaction_canceled_.load(std::memory_order_acquire)) {
+ return;
+ }
+
+ sub_compact->notify_on_subcompaction_completion = true;
+
+ SubcompactionJobInfo info{};
+ sub_compact->BuildSubcompactionJobInfo(info);
+ info.job_id = static_cast<int>(job_id_);
+ info.thread_id = env_->GetThreadID();
+
+ for (const auto& listener : db_options_.listeners) {
+ listener->OnSubcompactionBegin(info);
+ }
+ info.status.PermitUncheckedError();
+
+#else
+ (void)sub_compact;
+#endif // ROCKSDB_LITE
+}
+
+void CompactionJob::NotifyOnSubcompactionCompleted(
+ SubcompactionState* sub_compact) {
+#ifndef ROCKSDB_LITE
+
+ if (db_options_.listeners.empty()) {
+ return;
+ }
+ if (shutting_down_->load(std::memory_order_acquire)) {
+ return;
+ }
+
+ if (sub_compact->notify_on_subcompaction_completion == false) {
+ return;
+ }
+
+ SubcompactionJobInfo info{};
+ sub_compact->BuildSubcompactionJobInfo(info);
+ info.job_id = static_cast<int>(job_id_);
+ info.thread_id = env_->GetThreadID();
+
+ for (const auto& listener : db_options_.listeners) {
+ listener->OnSubcompactionCompleted(info);
+ }
+#else
+ (void)sub_compact;
+#endif // ROCKSDB_LITE
+}
+
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
- uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
+#ifndef ROCKSDB_LITE
+ if (db_options_.compaction_service) {
+ CompactionServiceJobStatus comp_status =
+ ProcessKeyValueCompactionWithCompactionService(sub_compact);
+ if (comp_status == CompactionServiceJobStatus::kSuccess ||
+ comp_status == CompactionServiceJobStatus::kFailure) {
+ return;
+ }
+ // fallback to local compaction
+ assert(comp_status == CompactionServiceJobStatus::kUseLocal);
+ }
+#endif // !ROCKSDB_LITE
+
+ uint64_t prev_cpu_micros = db_options_.clock->CPUMicros();
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
return;
}
- CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
- existing_snapshots_);
+ NotifyOnSubcompactionBegin(sub_compact);
+
+ auto range_del_agg = std::make_unique<CompactionRangeDelAggregator>(
+ &cfd->internal_comparator(), existing_snapshots_, &full_history_ts_low_,
+ &trim_ts_);
+
+ // TODO: since we already use C++17, should use
+ // std::optional<const Slice> instead.
+ const std::optional<Slice> start = sub_compact->start;
+ const std::optional<Slice> end = sub_compact->end;
+
+ std::optional<Slice> start_without_ts;
+ std::optional<Slice> end_without_ts;
+
ReadOptions read_options;
read_options.verify_checksums = true;
read_options.fill_cache = false;
+ read_options.rate_limiter_priority = GetRateLimiterPriority();
// Compaction iterators shouldn't be confined to a single prefix.
// Compactions use Seek() for
// (a) concurrent compactions,
// (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
read_options.total_order_seek = true;
+  // Remove the timestamps from the boundaries, because the boundaries
+  // created in GenSubcompactionBoundaries don't strip away the timestamp.
+ size_t ts_sz = cfd->user_comparator()->timestamp_size();
+ if (start.has_value()) {
+ read_options.iterate_lower_bound = &start.value();
+ if (ts_sz > 0) {
+ start_without_ts = StripTimestampFromUserKey(start.value(), ts_sz);
+ read_options.iterate_lower_bound = &start_without_ts.value();
+ }
+ }
+ if (end.has_value()) {
+ read_options.iterate_upper_bound = &end.value();
+ if (ts_sz > 0) {
+ end_without_ts = StripTimestampFromUserKey(end.value(), ts_sz);
+ read_options.iterate_upper_bound = &end_without_ts.value();
+ }
+ }
+
// Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator.
- std::unique_ptr<InternalIterator> input(
- versions_->MakeInputIterator(read_options, sub_compact->compaction,
- &range_del_agg, file_options_for_read_));
+ std::unique_ptr<InternalIterator> raw_input(versions_->MakeInputIterator(
+ read_options, sub_compact->compaction, range_del_agg.get(),
+ file_options_for_read_, start, end));
+ InternalIterator* input = raw_input.get();
+
+ IterKey start_ikey;
+ IterKey end_ikey;
+ Slice start_slice;
+ Slice end_slice;
+
+ static constexpr char kMaxTs[] =
+ "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
+ Slice ts_slice;
+ std::string max_ts;
+ if (ts_sz > 0) {
+ if (ts_sz <= strlen(kMaxTs)) {
+ ts_slice = Slice(kMaxTs, ts_sz);
+ } else {
+ max_ts = std::string(ts_sz, '\xff');
+ ts_slice = Slice(max_ts);
+ }
+ }
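+  // Using kMaxSequenceNumber and the maximum timestamp below makes each
+  // boundary internal key sort before every real entry with the same user
+  // key, matching the start-inclusive, end-exclusive convention of
+  // subcompaction ranges.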
+
+ if (start.has_value()) {
+ start_ikey.SetInternalKey(start.value(), kMaxSequenceNumber,
+ kValueTypeForSeek);
+ if (ts_sz > 0) {
+ start_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
+ &ts_slice);
+ }
+ start_slice = start_ikey.GetInternalKey();
+ }
+ if (end.has_value()) {
+ end_ikey.SetInternalKey(end.value(), kMaxSequenceNumber, kValueTypeForSeek);
+ if (ts_sz > 0) {
+ end_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
+ &ts_slice);
+ }
+ end_slice = end_ikey.GetInternalKey();
+ }
+
+ std::unique_ptr<InternalIterator> clip;
+ if (start.has_value() || end.has_value()) {
+ clip = std::make_unique<ClippingIterator>(
+ raw_input.get(), start.has_value() ? &start_slice : nullptr,
+ end.has_value() ? &end_slice : nullptr, &cfd->internal_comparator());
+ input = clip.get();
+ }
+
+ std::unique_ptr<InternalIterator> blob_counter;
+
+ if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
+ BlobGarbageMeter* meter = sub_compact->Current().CreateBlobGarbageMeter();
+ blob_counter = std::make_unique<BlobCountingIterator>(input, meter);
+ input = blob_counter.get();
+ }
+
+ std::unique_ptr<InternalIterator> trim_history_iter;
+ if (ts_sz > 0 && !trim_ts_.empty()) {
+ trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
+ input, cfd->user_comparator(), trim_ts_);
+ input = trim_history_iter.get();
+ }
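+  // At this point `input` is the top of the iterator stack, built bottom-up
+  // as: raw_input -> ClippingIterator (only if the subcompaction is bounded)
+  // -> BlobCountingIterator (only if blob files are referenced) ->
+  // HistoryTrimmingIterator (only if trimming by timestamp).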
+
+ input->SeekToFirst();
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
}
MergeHelper merge(
- env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
+ env_, cfd->user_comparator(), cfd->ioptions()->merge_operator.get(),
compaction_filter, db_options_.info_log.get(),
false /* internal key corruption is expected */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
- snapshot_checker_, compact_->compaction->level(),
- db_options_.statistics.get());
+ snapshot_checker_, compact_->compaction->level(), db_options_.stats);
const MutableCFOptions* mutable_cf_options =
sub_compact->compaction->mutable_cf_options();
std::vector<std::string> blob_file_paths;
+ // TODO: BlobDB to support output_to_penultimate_level compaction, which needs
+ // 2 builders, so may need to move to `CompactionOutputs`
std::unique_ptr<BlobFileBuilder> blob_file_builder(
- mutable_cf_options->enable_blob_files
+ (mutable_cf_options->enable_blob_files &&
+ sub_compact->compaction->output_level() >=
+ mutable_cf_options->blob_file_starting_level)
? new BlobFileBuilder(
- versions_, env_, fs_.get(),
- sub_compact->compaction->immutable_cf_options(),
- mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
- cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
- &blob_file_paths, &sub_compact->blob_file_additions)
+ versions_, fs_.get(),
+ sub_compact->compaction->immutable_options(),
+ mutable_cf_options, &file_options_, db_id_, db_session_id_,
+ job_id_, cfd->GetID(), cfd->GetName(), Env::IOPriority::IO_LOW,
+ write_hint_, io_tracer_, blob_callback_,
+ BlobFileCreationReason::kCompaction, &blob_file_paths,
+ sub_compact->Current().GetBlobFileAdditionsPtr())
: nullptr);
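+  // e.g. (hypothetical): with blob_file_starting_level = 2, a compaction
+  // whose output level is 0 or 1 creates no BlobFileBuilder even when
+  // enable_blob_files is true.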
TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:1",
reinterpret_cast<void*>(
- const_cast<std::atomic<int>*>(manual_compaction_paused_)));
-
- Slice* start = sub_compact->start;
- Slice* end = sub_compact->end;
- if (start != nullptr) {
- IterKey start_iter;
- start_iter.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
- input->Seek(start_iter.GetInternalKey());
- } else {
- input->SeekToFirst();
- }
+ const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
- Status status;
const std::string* const full_history_ts_low =
full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
- sub_compact->c_iter.reset(new CompactionIterator(
- input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
- &existing_snapshots_, earliest_write_conflict_snapshot_,
+ const SequenceNumber job_snapshot_seq =
+ job_context_ ? job_context_->GetJobSnapshotSequence()
+ : kMaxSequenceNumber;
+
+ auto c_iter = std::make_unique<CompactionIterator>(
+ input, cfd->user_comparator(), &merge, versions_->LastSequence(),
+ &existing_snapshots_, earliest_write_conflict_snapshot_, job_snapshot_seq,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
- /*expect_valid_internal_key=*/true, &range_del_agg,
+ /*expect_valid_internal_key=*/true, range_del_agg.get(),
blob_file_builder.get(), db_options_.allow_data_in_errors,
+ db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
sub_compact->compaction, compaction_filter, shutting_down_,
- preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
- full_history_ts_low));
- auto c_iter = sub_compact->c_iter.get();
+ db_options_.info_log, full_history_ts_low, preserve_time_min_seqno_,
+ preclude_last_level_min_seqno_);
c_iter->SeekToFirst();
- if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
- // ShouldStopBefore() maintains state based on keys processed so far. The
- // compaction loop always calls it on the "next" key, thus won't tell it the
- // first key. So we do that here.
- sub_compact->ShouldStopBefore(c_iter->key(),
- sub_compact->current_output_file_size);
- }
+
+  // Assign the range delete aggregator to the target output level, which
+  // makes sure it only outputs to a single level.
+ sub_compact->AssignRangeDelAggregator(std::move(range_del_agg));
+
const auto& c_iter_stats = c_iter->iter_stats();
- std::unique_ptr<SstPartitioner> partitioner =
- sub_compact->compaction->output_level() == 0
- ? nullptr
- : sub_compact->compaction->CreateSstPartitioner();
- std::string last_key_for_partitioner;
+  // Define the open and close functions for the compaction output files;
+  // they will be used to open/close output files when needed.
+ const CompactionFileOpenFunc open_file_func =
+ [this, sub_compact](CompactionOutputs& outputs) {
+ return this->OpenCompactionOutputFile(sub_compact, outputs);
+ };
+ const CompactionFileCloseFunc close_file_func =
+ [this, sub_compact](CompactionOutputs& outputs, const Status& status,
+ const Slice& next_table_min_key) {
+ return this->FinishCompactionOutputFile(status, sub_compact, outputs,
+ next_table_min_key);
+ };
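+  // Both callbacks are handed to SubcompactionState::AddToOutput() in the
+  // loop below, which opens and closes output files on demand.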
+ Status status;
+ TEST_SYNC_POINT_CALLBACK(
+ "CompactionJob::ProcessKeyValueCompaction()::Processing",
+ reinterpret_cast<void*>(
+ const_cast<Compaction*>(sub_compact->compaction)));
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
- const Slice& key = c_iter->key();
- const Slice& value = c_iter->value();
- // If an end key (exclusive) is specified, check if the current key is
- // >= than it and exit if it is because the iterator is out of its range
- if (end != nullptr &&
- cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
- break;
- }
+ assert(!end.has_value() || cfd->user_comparator()->Compare(
+ c_iter->user_key(), end.value()) < 0);
+
if (c_iter_stats.num_input_records % kRecordStatsEvery ==
kRecordStatsEvery - 1) {
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
RecordCompactionIOStats();
}
- // Open output file if necessary
- if (sub_compact->builder == nullptr) {
- status = OpenCompactionOutputFile(sub_compact);
- if (!status.ok()) {
- break;
- }
- }
- status = sub_compact->AddToBuilder(key, value);
+    // Add the current key from the compaction iterator to the target
+    // compaction output. If an output file needs to be closed or opened, this
+    // will call `close_file_func` or `open_file_func` accordingly.
+ // TODO: it would be better to have the compaction file open/close moved
+ // into `CompactionOutputs` which has the output file information.
+ status = sub_compact->AddToOutput(*c_iter, open_file_func, close_file_func);
if (!status.ok()) {
break;
}
- sub_compact->current_output_file_size =
- sub_compact->builder->EstimatedFileSize();
- const ParsedInternalKey& ikey = c_iter->ikey();
- sub_compact->current_output()->meta.UpdateBoundaries(
- key, value, ikey.sequence, ikey.type);
- sub_compact->num_output_records++;
-
- // Close output file if it is big enough. Two possibilities determine it's
- // time to close it: (1) the current key should be this file's last key, (2)
- // the next key should not be in this file.
- //
- // TODO(aekmekji): determine if file should be closed earlier than this
- // during subcompactions (i.e. if output size, estimated by input size, is
- // going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
- // and 0.6MB instead of 1MB and 0.2MB)
- bool output_file_ended = false;
- if (sub_compact->compaction->output_level() != 0 &&
- sub_compact->current_output_file_size >=
- sub_compact->compaction->max_output_file_size()) {
- // (1) this key terminates the file. For historical reasons, the iterator
- // status before advancing will be given to FinishCompactionOutputFile().
- output_file_ended = true;
- }
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void*>(
- const_cast<std::atomic<int>*>(manual_compaction_paused_)));
- if (partitioner.get()) {
- last_key_for_partitioner.assign(c_iter->user_key().data_,
- c_iter->user_key().size_);
- }
+ const_cast<std::atomic<bool>*>(&manual_compaction_canceled_)));
c_iter->Next();
if (c_iter->status().IsManualCompactionPaused()) {
break;
}
- if (!output_file_ended && c_iter->Valid()) {
- if (((partitioner.get() &&
- partitioner->ShouldPartition(PartitionerRequest(
- last_key_for_partitioner, c_iter->user_key(),
- sub_compact->current_output_file_size)) == kRequired) ||
- (sub_compact->compaction->output_level() != 0 &&
- sub_compact->ShouldStopBefore(
- c_iter->key(), sub_compact->current_output_file_size))) &&
- sub_compact->builder != nullptr) {
- // (2) this key belongs to the next file. For historical reasons, the
- // iterator status after advancing will be given to
- // FinishCompactionOutputFile().
- output_file_ended = true;
- }
- }
- if (output_file_ended) {
- const Slice* next_key = nullptr;
- if (c_iter->Valid()) {
- next_key = &c_iter->key();
- }
- CompactionIterationStats range_del_out_stats;
- status = FinishCompactionOutputFile(input->status(), sub_compact,
- &range_del_agg, &range_del_out_stats,
- next_key);
- RecordDroppedKeys(range_del_out_stats,
- &sub_compact->compaction_job_stats);
- }
}
+ sub_compact->compaction_job_stats.num_blobs_read =
+ c_iter_stats.num_blobs_read;
+ sub_compact->compaction_job_stats.total_blob_bytes_read =
+ c_iter_stats.total_blob_bytes_read;
sub_compact->compaction_job_stats.num_input_deletion_records =
c_iter_stats.num_input_deletion_records;
sub_compact->compaction_job_stats.num_corrupt_keys =
RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
c_iter_stats.total_filter_time);
+
+ if (c_iter_stats.num_blobs_relocated > 0) {
+ RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
+ c_iter_stats.num_blobs_relocated);
+ }
+ if (c_iter_stats.total_blob_bytes_relocated > 0) {
+ RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
+ c_iter_stats.total_blob_bytes_relocated);
+ }
+
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
RecordCompactionIOStats();
status = Status::ShutdownInProgress("Database shutdown");
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
- (manual_compaction_paused_ &&
- manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
+ (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (status.ok()) {
status = c_iter->status();
}
- if (status.ok() && sub_compact->builder == nullptr &&
- sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
- // handle subcompaction containing only range deletions
- status = OpenCompactionOutputFile(sub_compact);
- }
-
// Call FinishCompactionOutputFile() even if status is not ok: it needs to
- // close the output file.
- if (sub_compact->builder != nullptr) {
- CompactionIterationStats range_del_out_stats;
- Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
- &range_del_out_stats);
- if (!s.ok() && status.ok()) {
- status = s;
- }
- RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
- }
+  // close the output files. The open-file function is passed in as well: if
+  // the subcompaction contains only range deletions, no output file has been
+  // opened yet, so one must be created to hold them.
+ status = sub_compact->CloseCompactionFiles(status, open_file_func,
+ close_file_func);
if (blob_file_builder) {
if (status.ok()) {
status = blob_file_builder->Finish();
+ } else {
+ blob_file_builder->Abandon(status);
}
-
blob_file_builder.reset();
+ sub_compact->Current().UpdateBlobStats();
}
sub_compact->compaction_job_stats.cpu_micros =
- env_->NowCPUNanos() / 1000 - prev_cpu_micros;
+ db_options_.clock->CPUMicros() - prev_cpu_micros;
if (measure_io_stats_) {
sub_compact->compaction_job_stats.file_write_nanos +=
}
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (!status.ok()) {
- if (sub_compact->c_iter) {
- sub_compact->c_iter->status().PermitUncheckedError();
+ if (c_iter) {
+ c_iter->status().PermitUncheckedError();
}
if (input) {
input->status().PermitUncheckedError();
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
- sub_compact->c_iter.reset();
- input.reset();
+ blob_counter.reset();
+ clip.reset();
+ raw_input.reset();
sub_compact->status = status;
+ NotifyOnSubcompactionCompleted(sub_compact);
+}
+
+uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const {
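+  // Pack the 32-bit job id into the upper half and the sub-compaction id
+  // into the lower half, e.g. job 5, sub-job 2 => 0x0000000500000002.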
+ return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id;
}
void CompactionJob::RecordDroppedKeys(
Status CompactionJob::FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact,
- CompactionRangeDelAggregator* range_del_agg,
- CompactionIterationStats* range_del_out_stats,
- const Slice* next_table_min_key /* = nullptr */) {
+ CompactionOutputs& outputs, const Slice& next_table_min_key) {
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
assert(sub_compact != nullptr);
- assert(sub_compact->outfile);
- assert(sub_compact->builder != nullptr);
- assert(sub_compact->current_output() != nullptr);
+ assert(outputs.HasBuilder());
- uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
+ FileMetaData* meta = outputs.GetMetaData();
+ uint64_t output_number = meta->fd.GetNumber();
assert(output_number != 0);
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
- const Comparator* ucmp = cfd->user_comparator();
std::string file_checksum = kUnknownFileChecksum;
std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
// Check for iterator errors
Status s = input_status;
- auto meta = &sub_compact->current_output()->meta;
- assert(meta != nullptr);
- if (s.ok()) {
- Slice lower_bound_guard, upper_bound_guard;
- std::string smallest_user_key;
- const Slice *lower_bound, *upper_bound;
- bool lower_bound_from_sub_compact = false;
- if (sub_compact->outputs.size() == 1) {
- // For the first output table, include range tombstones before the min key
- // but after the subcompaction boundary.
- lower_bound = sub_compact->start;
- lower_bound_from_sub_compact = true;
- } else if (meta->smallest.size() > 0) {
- // For subsequent output tables, only include range tombstones from min
- // key onwards since the previous file was extended to contain range
- // tombstones falling before min key.
- smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
- lower_bound_guard = Slice(smallest_user_key);
- lower_bound = &lower_bound_guard;
- } else {
- lower_bound = nullptr;
- }
- if (next_table_min_key != nullptr) {
- // This may be the last file in the subcompaction in some cases, so we
- // need to compare the end key of subcompaction with the next file start
- // key. When the end key is chosen by the subcompaction, we know that
- // it must be the biggest key in output file. Therefore, it is safe to
- // use the smaller key as the upper bound of the output file, to ensure
- // that there is no overlapping between different output files.
- upper_bound_guard = ExtractUserKey(*next_table_min_key);
- if (sub_compact->end != nullptr &&
- ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
- upper_bound = sub_compact->end;
- } else {
- upper_bound = &upper_bound_guard;
- }
- } else {
- // This is the last file in the subcompaction, so extend until the
- // subcompaction ends.
- upper_bound = sub_compact->end;
- }
- auto earliest_snapshot = kMaxSequenceNumber;
- if (existing_snapshots_.size() > 0) {
- earliest_snapshot = existing_snapshots_[0];
- }
- bool has_overlapping_endpoints;
- if (upper_bound != nullptr && meta->largest.size() > 0) {
- has_overlapping_endpoints =
- ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
- } else {
- has_overlapping_endpoints = false;
- }
- // The end key of the subcompaction must be bigger or equal to the upper
- // bound. If the end of subcompaction is null or the upper bound is null,
- // it means that this file is the last file in the compaction. So there
- // will be no overlapping between this file and others.
- assert(sub_compact->end == nullptr ||
- upper_bound == nullptr ||
- ucmp->Compare(*upper_bound , *sub_compact->end) <= 0);
- auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
- has_overlapping_endpoints);
- // Position the range tombstone output iterator. There may be tombstone
- // fragments that are entirely out of range, so make sure that we do not
- // include those.
- if (lower_bound != nullptr) {
- it->Seek(*lower_bound);
- } else {
- it->SeekToFirst();
+ // Add range tombstones
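+  // existing_snapshots_ is sorted by sequence number, so the first entry (if
+  // any) is the earliest snapshot still live.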
+ auto earliest_snapshot = kMaxSequenceNumber;
+ if (existing_snapshots_.size() > 0) {
+ earliest_snapshot = existing_snapshots_[0];
+ }
+ if (s.ok()) {
+ CompactionIterationStats range_del_out_stats;
+    // If the compaction supports per_key_placement, only output range dels
+    // to the penultimate level.
+    // Note: `bottommost_level_ = true` is used for both bottommost and
+    // output-to-penultimate-level compactions here, as it is only used to
+    // decide whether range dels can be dropped.
+ if (outputs.HasRangeDel()) {
+ s = outputs.AddRangeDels(
+ sub_compact->start.has_value() ? &(sub_compact->start.value())
+ : nullptr,
+ sub_compact->end.has_value() ? &(sub_compact->end.value()) : nullptr,
+ range_del_out_stats, bottommost_level_, cfd->internal_comparator(),
+ earliest_snapshot, next_table_min_key, full_history_ts_low_);
}
+ RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
- for (; it->Valid(); it->Next()) {
- auto tombstone = it->Tombstone();
- if (upper_bound != nullptr) {
- int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
- if ((has_overlapping_endpoints && cmp < 0) ||
- (!has_overlapping_endpoints && cmp <= 0)) {
- // Tombstones starting after upper_bound only need to be included in
- // the next table. If the current SST ends before upper_bound, i.e.,
- // `has_overlapping_endpoints == false`, we can also skip over range
- // tombstones that start exactly at upper_bound. Such range tombstones
- // will be included in the next file and are not relevant to the point
- // keys or endpoints of the current file.
- break;
- }
- }
+ }
- if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
- // TODO(andrewkr): tombstones that span multiple output files are
- // counted for each compaction output file, so lots of double counting.
- range_del_out_stats->num_range_del_drop_obsolete++;
- range_del_out_stats->num_record_drop_obsolete++;
- continue;
- }
+ const uint64_t current_entries = outputs.NumEntries();
- auto kv = tombstone.Serialize();
- assert(lower_bound == nullptr ||
- ucmp->Compare(*lower_bound, kv.second) < 0);
- // Range tombstone is not supported by output validator yet.
- sub_compact->builder->Add(kv.first.Encode(), kv.second);
- InternalKey smallest_candidate = std::move(kv.first);
- if (lower_bound != nullptr &&
- ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
- // Pretend the smallest key has the same user key as lower_bound
- // (the max key in the previous table or subcompaction) in order for
- // files to appear key-space partitioned.
- //
- // When lower_bound is chosen by a subcompaction, we know that
- // subcompactions over smaller keys cannot contain any keys at
- // lower_bound. We also know that smaller subcompactions exist, because
- // otherwise the subcompaction woud be unbounded on the left. As a
- // result, we know that no other files on the output level will contain
- // actual keys at lower_bound (an output file may have a largest key of
- // lower_bound@kMaxSequenceNumber, but this only indicates a large range
- // tombstone was truncated). Therefore, it is safe to use the
- // tombstone's sequence number, to ensure that keys at lower_bound at
- // lower levels are covered by truncated tombstones.
- //
- // If lower_bound was chosen by the smallest data key in the file,
- // choose lowest seqnum so this file's smallest internal key comes after
- // the previous file's largest. The fake seqnum is OK because the read
- // path's file-picking code only considers user key.
- smallest_candidate = InternalKey(
- *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
- kTypeRangeDeletion);
- }
- InternalKey largest_candidate = tombstone.SerializeEndKey();
- if (upper_bound != nullptr &&
- ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
- // Pretend the largest key has the same user key as upper_bound (the
- // min key in the following table or subcompaction) in order for files
- // to appear key-space partitioned.
- //
- // Choose highest seqnum so this file's largest internal key comes
- // before the next file's/subcompaction's smallest. The fake seqnum is
- // OK because the read path's file-picking code only considers the user
- // key portion.
- //
- // Note Seek() also creates InternalKey with (user_key,
- // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
- // kTypeRangeDeletion (0xF), so the range tombstone comes before the
- // Seek() key in InternalKey's ordering. So Seek() will look in the
- // next file for the user key.
- largest_candidate =
- InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
- }
-#ifndef NDEBUG
- SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
- if (meta->smallest.size() > 0) {
- smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
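+  // Finalize the table file; the seqno-to-time mapping is passed along so it
+  // can be recorded with the output (used later to estimate the age of keys).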
+ s = outputs.Finish(s, seqno_time_mapping_);
+
+ if (s.ok()) {
+    // With accurate smallest and largest keys, we can compute a slightly
+    // more accurate oldest ancester time.
+    // This makes the oldest ancester time in the manifest more accurate than
+    // the one in table properties; it is unclear how to reconcile the two.
+ if (meta->smallest.size() > 0 && meta->largest.size() > 0) {
+ uint64_t refined_oldest_ancester_time;
+ Slice new_smallest = meta->smallest.user_key();
+ Slice new_largest = meta->largest.user_key();
+ if (!new_largest.empty() && !new_smallest.empty()) {
+ refined_oldest_ancester_time =
+ sub_compact->compaction->MinInputFileOldestAncesterTime(
+ &(meta->smallest), &(meta->largest));
+ if (refined_oldest_ancester_time !=
+ std::numeric_limits<uint64_t>::max()) {
+ meta->oldest_ancester_time = refined_oldest_ancester_time;
+ }
}
-#endif
- meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
- tombstone.seq_,
- cfd->internal_comparator());
-
- // The smallest key in a file is used for range tombstone truncation, so
- // it cannot have a seqnum of 0 (unless the smallest data key in a file
- // has a seqnum of 0). Otherwise, the truncated tombstone may expose
- // deleted keys at lower levels.
- assert(smallest_ikey_seqnum == 0 ||
- ExtractInternalKeyFooter(meta->smallest.Encode()) !=
- PackSequenceAndType(0, kTypeRangeDeletion));
}
}
- const uint64_t current_entries = sub_compact->builder->NumEntries();
- if (s.ok()) {
- s = sub_compact->builder->Finish();
- } else {
- sub_compact->builder->Abandon();
- }
- IOStatus io_s = sub_compact->builder->io_status();
- if (s.ok()) {
- s = io_s;
- }
- const uint64_t current_bytes = sub_compact->builder->FileSize();
- if (s.ok()) {
- meta->fd.file_size = current_bytes;
- meta->marked_for_compaction = sub_compact->builder->NeedCompact();
- }
- sub_compact->current_output()->finished = true;
- sub_compact->total_bytes += current_bytes;
// Finish and check for file errors
- if (s.ok()) {
- StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
- io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
- }
- if (s.ok() && io_s.ok()) {
- io_s = sub_compact->outfile->Close();
- }
+ IOStatus io_s = outputs.WriterSyncClose(s, db_options_.clock, stats_,
+ db_options_.use_fsync);
+
if (s.ok() && io_s.ok()) {
- // Add the checksum information to file metadata.
- meta->file_checksum = sub_compact->outfile->GetFileChecksum();
- meta->file_checksum_func_name =
- sub_compact->outfile->GetFileChecksumFuncName();
file_checksum = meta->file_checksum;
file_checksum_func_name = meta->file_checksum_func_name;
}
+
if (s.ok()) {
s = io_s;
}
// "normal" status, it does not also need to be checked
sub_compact->io_status.PermitUncheckedError();
}
- sub_compact->outfile.reset();
TableProperties tp;
if (s.ok()) {
- tp = sub_compact->builder->GetTableProperties();
+ tp = outputs.GetTableProperties();
}
if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
    // This happens when the output level is the bottommost level and the
    // subcompaction produced no output.
std::string fname =
- TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
+ TableFileName(sub_compact->compaction->immutable_options()->cf_paths,
meta->fd.GetNumber(), meta->fd.GetPathId());
- env_->DeleteFile(fname);
+
+ // TODO(AR) it is not clear if there are any larger implications if
+ // DeleteFile fails here
+ Status ds = env_->DeleteFile(fname);
+ if (!ds.ok()) {
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
+ " at bottom level%s",
+ cfd->GetName().c_str(), job_id_, output_number,
+ meta->marked_for_compaction ? " (need compaction)" : "");
+ }
// Also need to remove the file from outputs, or it will be added to the
// VersionEdit.
- assert(!sub_compact->outputs.empty());
- sub_compact->outputs.pop_back();
+ outputs.RemoveLastOutput();
meta = nullptr;
}
if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
// Output to event logger and fire events.
- sub_compact->current_output()->table_properties =
- std::make_shared<TableProperties>(tp);
+ outputs.UpdateTableProperties();
ROCKS_LOG_INFO(db_options_.info_log,
"[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
- " keys, %" PRIu64 " bytes%s",
+ " keys, %" PRIu64 " bytes%s, temperature: %s",
cfd->GetName().c_str(), job_id_, output_number,
- current_entries, current_bytes,
- meta->marked_for_compaction ? " (need compaction)" : "");
+ current_entries, meta->fd.file_size,
+ meta->marked_for_compaction ? " (need compaction)" : "",
+ temperature_to_string[meta->temperature].c_str());
}
std::string fname;
FileDescriptor output_fd;
uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
+ Status status_for_listener = s;
if (meta != nullptr) {
- fname =
- TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
- meta->fd.GetNumber(), meta->fd.GetPathId());
+ fname = GetTableFileName(meta->fd.GetNumber());
output_fd = meta->fd;
oldest_blob_file_number = meta->oldest_blob_file_number;
} else {
fname = "(nil)";
+ if (s.ok()) {
+ status_for_listener = Status::Aborted("Empty SST file not kept");
+ }
}
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
job_id_, output_fd, oldest_blob_file_number, tp,
- TableFileCreationReason::kCompaction, s, file_checksum,
+ TableFileCreationReason::kCompaction, status_for_listener, file_checksum,
file_checksum_func_name);
#ifndef ROCKSDB_LITE
// compaction output file (similarly to how flush works when full)?
s = Status::SpaceLimit("Max allowed space was reached");
TEST_SYNC_POINT(
- "CompactionJob::FinishCompactionOutputFile:"
- "MaxAllowedSpaceReached");
+ "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
InstrumentedMutexLock l(db_mutex_);
- // Should handle return error?
- db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction)
- .PermitUncheckedError();
+ db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
}
}
#endif
- sub_compact->builder.reset();
- sub_compact->current_output_file_size = 0;
+ outputs.ResetBuilder();
return s;
}
auto* compaction = compact_->compaction;
assert(compaction);
- // paranoia: verify that the files that we started with
- // still exist in the current version and in the same original level.
- // This ensures that a concurrent compaction did not erroneously
- // pick the same files to compact_.
- if (!versions_->VerifyCompactionFileConsistency(compaction)) {
- Compaction::InputLevelSummaryBuffer inputs_summary;
-
- ROCKS_LOG_ERROR(db_options_.info_log, "[%s] [JOB %d] Compaction %s aborted",
- compaction->column_family_data()->GetName().c_str(),
- job_id_, compaction->InputLevelSummary(&inputs_summary));
- return Status::Corruption("Compaction input files inconsistent");
- }
-
{
Compaction::InputLevelSummaryBuffer inputs_summary;
- ROCKS_LOG_INFO(db_options_.info_log,
- "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
- compaction->column_family_data()->GetName().c_str(), job_id_,
- compaction->InputLevelSummary(&inputs_summary),
- compact_->total_bytes + compact_->total_blob_bytes);
+ if (compaction_stats_.has_penultimate_level_output) {
+ ROCKS_LOG_BUFFER(
+ log_buffer_,
+ "[%s] [JOB %d] Compacted %s => output_to_penultimate_level: %" PRIu64
+ " bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes",
+ compaction->column_family_data()->GetName().c_str(), job_id_,
+ compaction->InputLevelSummary(&inputs_summary),
+ compaction_stats_.penultimate_level_stats.bytes_written,
+ compaction_stats_.stats.bytes_written,
+ compaction_stats_.TotalBytesWritten());
+ } else {
+ ROCKS_LOG_BUFFER(log_buffer_,
+ "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
+ compaction->column_family_data()->GetName().c_str(),
+ job_id_, compaction->InputLevelSummary(&inputs_summary),
+ compaction_stats_.TotalBytesWritten());
+ }
}
VersionEdit* const edit = compaction->edit();
// Add compaction inputs
compaction->AddInputDeletions(edit);
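+  // Aggregate blob garbage across all sub-compactions, keyed by blob file
+  // number, so each blob file gets at most one garbage record in the version
+  // edit.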
+ std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage;
+
for (const auto& sub_compact : compact_->sub_compact_states) {
- for (const auto& out : sub_compact.outputs) {
- edit->AddFile(compaction->output_level(), out.meta);
- }
+ sub_compact.AddOutputsEdit(edit);
- for (const auto& blob : sub_compact.blob_file_additions) {
+ for (const auto& blob : sub_compact.Current().GetBlobFileAdditions()) {
edit->AddBlobFile(blob);
}
+
+ if (sub_compact.Current().GetBlobGarbageMeter()) {
+ const auto& flows = sub_compact.Current().GetBlobGarbageMeter()->flows();
+
+ for (const auto& pair : flows) {
+ const uint64_t blob_file_number = pair.first;
+ const BlobGarbageMeter::BlobInOutFlow& flow = pair.second;
+
+ assert(flow.IsValid());
+ if (flow.HasGarbage()) {
+ blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(),
+ flow.GetGarbageBytes());
+ }
+ }
+ }
+ }
+
+ for (const auto& pair : blob_total_garbage) {
+ const uint64_t blob_file_number = pair.first;
+ const BlobGarbageMeter::BlobStats& stats = pair.second;
+
+ edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(),
+ stats.GetBytes());
+ }
+
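+  // Under round-robin compaction priority, persist the compaction cursor so
+  // the next compaction picked on this level resumes after the files that
+  // were just compacted.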
+ if ((compaction->compaction_reason() ==
+ CompactionReason::kLevelMaxLevelSize ||
+ compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) &&
+ compaction->immutable_options()->compaction_pri == kRoundRobin) {
+ int start_level = compaction->start_level();
+ if (start_level > 0) {
+ auto vstorage = compaction->input_version()->storage_info();
+ edit->AddCompactCursor(start_level,
+ vstorage->GetNextCompactCursor(
+ start_level, compaction->num_input_files(0)));
+ }
}
return versions_->LogAndApply(compaction->column_family_data(),
IOSTATS_RESET(bytes_written);
}
-Status CompactionJob::OpenCompactionOutputFile(
- SubcompactionState* sub_compact) {
+Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
+ CompactionOutputs& outputs) {
assert(sub_compact != nullptr);
- assert(sub_compact->builder == nullptr);
+
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
- std::string fname =
- TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
- file_number, sub_compact->compaction->output_path_id());
+ std::string fname = GetTableFileName(file_number);
// Fire events.
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
&syncpoint_arg);
#endif
+
+ // Pass temperature of the last level files to FileSystem.
+ FileOptions fo_copy = file_options_;
+ Temperature temperature = sub_compact->compaction->output_temperature();
+  // Only set for last-level compactions whose output does not go to the
+  // penultimate level (relevant when the preclude_last_level feature is
+  // enabled).
+ if (temperature == Temperature::kUnknown &&
+ sub_compact->compaction->is_last_level() &&
+ !sub_compact->IsCurrentPenultimateLevel()) {
+ temperature =
+ sub_compact->compaction->mutable_cf_options()->last_level_temperature;
+ }
+ fo_copy.temperature = temperature;
+
Status s;
- IOStatus io_s =
- NewWritableFile(fs_.get(), fname, &writable_file, file_options_);
+ IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
s = io_s;
if (sub_compact->io_status.ok()) {
sub_compact->io_status = io_s;
// Try to figure out the output file's oldest ancester time.
int64_t temp_current_time = 0;
- auto get_time_status = env_->GetCurrentTime(&temp_current_time);
+ auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails, so just log and continue.
if (!get_time_status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
get_time_status.ToString().c_str());
}
uint64_t current_time = static_cast<uint64_t>(temp_current_time);
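+  // Build internal-key versions of the subcompaction boundaries (if any) so
+  // MinInputFileOldestAncesterTime() can be narrowed to the input files that
+  // overlap this subcompaction's range.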
+ InternalKey tmp_start, tmp_end;
+ if (sub_compact->start.has_value()) {
+ tmp_start.SetMinPossibleForUserKey(sub_compact->start.value());
+ }
+ if (sub_compact->end.has_value()) {
+ tmp_end.SetMinPossibleForUserKey(sub_compact->end.value());
+ }
uint64_t oldest_ancester_time =
- sub_compact->compaction->MinInputFileOldestAncesterTime();
- if (oldest_ancester_time == port::kMaxUint64) {
+ sub_compact->compaction->MinInputFileOldestAncesterTime(
+ sub_compact->start.has_value() ? &tmp_start : nullptr,
+ sub_compact->end.has_value() ? &tmp_end : nullptr);
+ if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) {
oldest_ancester_time = current_time;
}
sub_compact->compaction->output_path_id(), 0);
meta.oldest_ancester_time = oldest_ancester_time;
meta.file_creation_time = current_time;
- sub_compact->outputs.emplace_back(
- std::move(meta), cfd->internal_comparator(),
- /*enable_order_check=*/
- sub_compact->compaction->mutable_cf_options()
- ->check_flush_compaction_key_order,
- /*enable_hash=*/paranoid_file_checks_);
+ meta.temperature = temperature;
+ assert(!db_id_.empty());
+ assert(!db_session_id_.empty());
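+  // Derive a globally unique id for the table file from the db id, session
+  // id and file number; failure to generate one aborts opening this output.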
+ s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(),
+ &meta.unique_id);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "[%s] [JOB %d] file #%" PRIu64
+ " failed to generate unique id: %s.",
+ cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
+ s.ToString().c_str());
+ return s;
+ }
+
+ outputs.AddOutput(std::move(meta), cfd->internal_comparator(),
+ sub_compact->compaction->mutable_cf_options()
+ ->check_flush_compaction_key_order,
+ paranoid_file_checks_);
}
- writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
+ writable_file->SetIOPriority(GetRateLimiterPriority());
writable_file->SetWriteLifeTimeHint(write_hint_);
+ FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
sub_compact->compaction->OutputFilePreallocationSize()));
const auto& listeners =
- sub_compact->compaction->immutable_cf_options()->listeners;
- sub_compact->outfile.reset(new WritableFileWriter(
- std::move(writable_file), fname, file_options_, env_, io_tracer_,
- db_options_.statistics.get(), listeners,
- db_options_.file_checksum_gen_factory.get()));
-
- // If the Column family flag is to only optimize filters for hits,
- // we can skip creating filters if this is the bottommost_level where
- // data is going to be found
- bool skip_filters =
- cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
-
- sub_compact->builder.reset(NewTableBuilder(
+ sub_compact->compaction->immutable_options()->listeners;
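+  // The writer takes ownership of the writable file; checksum handoff is
+  // enabled for it only if kTableFile is in checksum_handoff_file_types.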
+ outputs.AssignFileWriter(new WritableFileWriter(
+ std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_,
+ db_options_.stats, listeners, db_options_.file_checksum_gen_factory.get(),
+ tmp_set.Contains(FileType::kTableFile), false));
+
+ TableBuilderOptions tboptions(
*cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
- cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
sub_compact->compaction->output_compression(),
- 0 /*sample_for_compression */,
- sub_compact->compaction->output_compression_opts(),
- sub_compact->compaction->output_level(), skip_filters,
- oldest_ancester_time, 0 /* oldest_key_time */,
- sub_compact->compaction->max_output_file_size(), current_time, db_id_,
- db_session_id_));
+ sub_compact->compaction->output_compression_opts(), cfd->GetID(),
+ cfd->GetName(), sub_compact->compaction->output_level(),
+ bottommost_level_, TableFileCreationReason::kCompaction,
+ 0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
+ sub_compact->compaction->max_output_file_size(), file_number);
+
+ outputs.NewBuilder(tboptions);
+
LogFlush(db_options_.info_log);
return s;
}
void CompactionJob::CleanupCompaction() {
for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
- const auto& sub_status = sub_compact.status;
-
- if (sub_compact.builder != nullptr) {
- // May happen if we get a shutdown call in the middle of compaction
- sub_compact.builder->Abandon();
- sub_compact.builder.reset();
- } else {
- assert(!sub_status.ok() || sub_compact.outfile == nullptr);
- }
- for (const auto& out : sub_compact.outputs) {
- // If this file was inserted into the table cache then remove
- // them here because this compaction was not committed.
- if (!sub_status.ok()) {
- TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
- }
- }
- // TODO: sub_compact.io_status is not checked like status. Not sure if thats
- // intentional. So ignoring the io_status as of now.
- sub_compact.io_status.PermitUncheckedError();
+ sub_compact.Cleanup(table_cache_.get());
}
delete compact_;
compact_ = nullptr;
assert(compact_);
Compaction* compaction = compact_->compaction;
- compaction_stats_.num_input_files_in_non_output_levels = 0;
- compaction_stats_.num_input_files_in_output_level = 0;
+ compaction_stats_.stats.num_input_files_in_non_output_levels = 0;
+ compaction_stats_.stats.num_input_files_in_output_level = 0;
for (int input_level = 0;
input_level < static_cast<int>(compaction->num_input_levels());
++input_level) {
if (compaction->level(input_level) != compaction->output_level()) {
UpdateCompactionInputStatsHelper(
- &compaction_stats_.num_input_files_in_non_output_levels,
- &compaction_stats_.bytes_read_non_output_levels, input_level);
+ &compaction_stats_.stats.num_input_files_in_non_output_levels,
+ &compaction_stats_.stats.bytes_read_non_output_levels, input_level);
} else {
UpdateCompactionInputStatsHelper(
- &compaction_stats_.num_input_files_in_output_level,
- &compaction_stats_.bytes_read_output_level, input_level);
+ &compaction_stats_.stats.num_input_files_in_output_level,
+ &compaction_stats_.stats.bytes_read_output_level, input_level);
}
}
- compaction_stats_.num_output_files =
- static_cast<int>(compact_->num_output_files) +
- static_cast<int>(compact_->num_blob_output_files);
- compaction_stats_.bytes_written =
- compact_->total_bytes + compact_->total_blob_bytes;
+ assert(compaction_job_stats_);
+ compaction_stats_.stats.bytes_read_blob =
+ compaction_job_stats_->total_blob_bytes_read;
- if (compaction_stats_.num_input_records > compact_->num_output_records) {
- compaction_stats_.num_dropped_records =
- compaction_stats_.num_input_records - compact_->num_output_records;
- }
+ compaction_stats_.stats.num_dropped_records =
+ compaction_stats_.DroppedRecords();
}
void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
for (size_t i = 0; i < num_input_files; ++i) {
const auto* file_meta = compaction->input(input_level, i);
*bytes_read += file_meta->fd.GetFileSize();
- compaction_stats_.num_input_records +=
+ compaction_stats_.stats.num_input_records +=
static_cast<uint64_t>(file_meta->num_entries);
}
}
// output information
compaction_job_stats_->total_output_bytes = stats.bytes_written;
- compaction_job_stats_->num_output_records = compact_->num_output_records;
+ compaction_job_stats_->total_output_bytes_blob = stats.bytes_written_blob;
+ compaction_job_stats_->num_output_records = stats.num_output_records;
compaction_job_stats_->num_output_files = stats.num_output_files;
+ compaction_job_stats_->num_output_files_blob = stats.num_output_files_blob;
if (stats.num_output_files > 0) {
CopyPrefix(compact_->SmallestUserKey(),
compaction->InputLevelSummary(&inputs_summary), compaction->score());
char scratch[2345];
compaction->Summary(scratch, sizeof(scratch));
- ROCKS_LOG_INFO(db_options_.info_log, "[%s] Compaction start summary: %s\n",
+ ROCKS_LOG_INFO(db_options_.info_log, "[%s]: Compaction start summary: %s\n",
cfd->GetName().c_str(), scratch);
// build event logger report
auto stream = event_logger_->Log();
<< "compaction_reason"
<< GetCompactionReasonString(compaction->compaction_reason());
for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
- stream << ("files_L" + ToString(compaction->level(i)));
+ stream << ("files_L" + std::to_string(compaction->level(i)));
stream.StartArray();
for (auto f : *compaction->inputs(i)) {
stream << f->fd.GetNumber();
stream.EndArray();
}
stream << "score" << compaction->score() << "input_data_size"
- << compaction->CalculateTotalInputSize();
+ << compaction->CalculateTotalInputSize() << "oldest_snapshot_seqno"
+ << (existing_snapshots_.empty()
+ ? int64_t{-1} // Use -1 for "none"
+ : static_cast<int64_t>(existing_snapshots_[0]));
+ if (compaction->SupportsPerKeyPlacement()) {
+ stream << "preclude_last_level_min_seqno"
+ << preclude_last_level_min_seqno_;
+ stream << "penultimate_output_level" << compaction->GetPenultimateLevel();
+ stream << "penultimate_output_range"
+ << GetCompactionPenultimateOutputRangeTypeString(
+ compaction->GetPenultimateOutputRangeType());
+
+ if (compaction->GetPenultimateOutputRangeType() ==
+ Compaction::PenultimateOutputRangeType::kDisabled) {
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "[%s] [JOB %d] Penultimate level output is disabled, likely "
+          "because of a range conflict in the penultimate level",
+ cfd->GetName().c_str(), job_id_);
+ }
+ }
+ }
+}
+
+std::string CompactionJob::GetTableFileName(uint64_t file_number) {
+ return TableFileName(compact_->compaction->immutable_options()->cf_paths,
+ file_number, compact_->compaction->output_path_id());
+}
+
+Env::IOPriority CompactionJob::GetRateLimiterPriority() {
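+  // If user writes are already being delayed or stopped, compaction is
+  // needed to relieve the pressure, so escalate its rate-limiter priority to
+  // IO_USER; otherwise stay at the default IO_LOW.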
+ if (versions_ && versions_->GetColumnFamilySet() &&
+ versions_->GetColumnFamilySet()->write_controller()) {
+ WriteController* write_controller =
+ versions_->GetColumnFamilySet()->write_controller();
+ if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
+ return Env::IO_USER;
+ }
}
+
+ return Env::IO_LOW;
}
} // namespace ROCKSDB_NAMESPACE