// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <vector>
#include "db/builder.h"
+#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
+#include "db/error_handler.h"
#include "db/event_helpers.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
+#include "db/range_del_aggregator.h"
#include "db/version_set.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/thread_status_util.h"
-#include "port/likely.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
namespace rocksdb {
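+
+// Maps a CompactionReason to a human-readable name; used below when logging
+// the "compaction_started" event.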
+const char* GetCompactionReasonString(CompactionReason compaction_reason) {
+ switch (compaction_reason) {
+ case CompactionReason::kUnknown:
+ return "Unknown";
+ case CompactionReason::kLevelL0FilesNum:
+ return "LevelL0FilesNum";
+ case CompactionReason::kLevelMaxLevelSize:
+ return "LevelMaxLevelSize";
+ case CompactionReason::kUniversalSizeAmplification:
+ return "UniversalSizeAmplification";
+ case CompactionReason::kUniversalSizeRatio:
+ return "UniversalSizeRatio";
+ case CompactionReason::kUniversalSortedRunNum:
+ return "UniversalSortedRunNum";
+ case CompactionReason::kFIFOMaxSize:
+ return "FIFOMaxSize";
+ case CompactionReason::kFIFOReduceNumFiles:
+ return "FIFOReduceNumFiles";
+ case CompactionReason::kFIFOTtl:
+ return "FIFOTtl";
+ case CompactionReason::kManualCompaction:
+ return "ManualCompaction";
+ case CompactionReason::kFilesMarkedForCompaction:
+ return "FilesMarkedForCompaction";
+ case CompactionReason::kBottommostFiles:
+ return "BottommostFiles";
+ case CompactionReason::kTtl:
+ return "Ttl";
+ case CompactionReason::kFlush:
+ return "Flush";
+ case CompactionReason::kExternalSstIngestion:
+ return "ExternalSstIngestion";
+ case CompactionReason::kNumOfReasons:
+ // fall through
+ default:
+ assert(false);
+ return "Invalid";
+ }
+}
+
// Maintains state for each sub-compaction
struct CompactionJob::SubcompactionState {
const Compaction* compaction;
uint64_t overlapped_bytes = 0;
// A flag that determines whether the key has been seen in ShouldStopBefore()
bool seen_key = false;
- std::string compression_dict;
SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint64_t size = 0)
approx_size(size),
grandparent_index(0),
overlapped_bytes(0),
- seen_key(false),
- compression_dict() {
+ seen_key(false) {
assert(compaction != nullptr);
}
grandparent_index = std::move(o.grandparent_index);
overlapped_bytes = std::move(o.overlapped_bytes);
seen_key = std::move(o.seen_key);
- compression_dict = std::move(o.compression_dict);
return *this;
}
- // Because member unique_ptrs do not have these.
+ // Because member std::unique_ptrs do not have these.
SubcompactionState(const SubcompactionState&) = delete;
SubcompactionState& operator=(const SubcompactionState&) = delete;
CompactionJob::CompactionJob(
int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
- const EnvOptions& env_options, VersionSet* versions,
- const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
+ const EnvOptions env_options, VersionSet* versions,
+ const std::atomic<bool>* shutting_down,
+ const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
Directory* db_directory, Directory* output_directory, Statistics* stats,
- InstrumentedMutex* db_mutex, Status* db_bg_error,
+ InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
- std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
- bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
- CompactionJobStats* compaction_job_stats)
+ const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
+ EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
+ const std::string& dbname, CompactionJobStats* compaction_job_stats,
+ Env::Priority thread_pri)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
- compaction_stats_(1),
+ compaction_stats_(compaction->compaction_reason(), 1),
dbname_(dbname),
db_options_(db_options),
env_options_(env_options),
env_(db_options.env),
+      env_options_for_read_(
+ env_->OptimizeForCompactionTableRead(env_options, db_options_)),
versions_(versions),
shutting_down_(shutting_down),
+ preserve_deletes_seqnum_(preserve_deletes_seqnum),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_directory_(output_directory),
stats_(stats),
db_mutex_(db_mutex),
- db_bg_error_(db_bg_error),
+ db_error_handler_(db_error_handler),
existing_snapshots_(std::move(existing_snapshots)),
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+ snapshot_checker_(snapshot_checker),
table_cache_(std::move(table_cache)),
event_logger_(event_logger),
+ bottommost_level_(false),
paranoid_file_checks_(paranoid_file_checks),
- measure_io_stats_(measure_io_stats) {
+ measure_io_stats_(measure_io_stats),
+ write_hint_(Env::WLTH_NOT_SET),
+ thread_pri_(thread_pri) {
assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
ThreadStatusUtil::ResetThreadStatus();
}
-void CompactionJob::ReportStartedCompaction(
- Compaction* compaction) {
+void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
const auto* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
db_options_.enable_thread_tracking);
- ThreadStatusUtil::SetThreadOperationProperty(
- ThreadStatus::COMPACTION_JOB_ID,
- job_id_);
+ ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
+ job_id_);
ThreadStatusUtil::SetThreadOperationProperty(
ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
// Set the thread operation after operation properties
// to ensure GetThreadList() can always show them all together.
- ThreadStatusUtil::SetThreadOperation(
- ThreadStatus::OP_COMPACTION);
+ ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
if (compaction_job_stats_) {
compaction_job_stats_->is_manual_compaction =
// Generate file_levels_ for compaction before making Iterator
auto* c = compact_->compaction;
assert(c->column_family_data() != nullptr);
- assert(c->column_family_data()->current()->storage_info()
- ->NumLevelFiles(compact_->compaction->level()) > 0);
+ assert(c->column_family_data()->current()->storage_info()->NumLevelFiles(
+ compact_->compaction->level()) > 0);
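+  // The write hint is derived from the output level and is later passed to
+  // each output file via SetWriteLifeTimeHint() so the env can place
+  // long-lived SSTs appropriately.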
+ write_hint_ =
+ c->column_family_data()->CalculateSSTWriteHint(c->output_level());
// Is this compaction producing files at the bottommost level?
bottommost_level_ = c->bottommost_level();
if (c->ShouldFormSubcompactions()) {
- const uint64_t start_micros = env_->NowMicros();
- GenSubcompactionBoundaries();
- MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME,
- env_->NowMicros() - start_micros);
-
+ {
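+    // StopWatch reports the elapsed time to the SUBCOMPACTION_SETUP_TIME
+    // histogram when it goes out of scope at the end of this block.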
+ StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
+ GenSubcompactionBoundaries();
+ }
assert(sizes_.size() == boundaries_.size() + 1);
for (size_t i = 0; i <= boundaries_.size(); i++) {
Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
}
- MeasureTime(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
- compact_->sub_compact_states.size());
+ RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
+ compact_->sub_compact_states.size());
} else {
compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
}
}
std::sort(bounds.begin(), bounds.end(),
- [cfd_comparator] (const Slice& a, const Slice& b) -> bool {
- return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) < 0;
- });
+ [cfd_comparator](const Slice& a, const Slice& b) -> bool {
+ return cfd_comparator->Compare(ExtractUserKey(a),
+ ExtractUserKey(b)) < 0;
+ });
// Remove duplicated entries from bounds
- bounds.erase(std::unique(bounds.begin(), bounds.end(),
- [cfd_comparator] (const Slice& a, const Slice& b) -> bool {
- return cfd_comparator->Compare(ExtractUserKey(a), ExtractUserKey(b)) == 0;
- }), bounds.end());
+ bounds.erase(
+ std::unique(bounds.begin(), bounds.end(),
+ [cfd_comparator](const Slice& a, const Slice& b) -> bool {
+ return cfd_comparator->Compare(ExtractUserKey(a),
+ ExtractUserKey(b)) == 0;
+ }),
+ bounds.end());
// Combine consecutive pairs of boundaries into ranges with an approximate
// size of data covered by keys in that range
uint64_t sum = 0;
std::vector<RangeWithSize> ranges;
- auto* v = cfd->current();
+  // Get the input version from CompactionState since it was already
+  // referenced earlier in Compaction::SetInputVersion and will not change
+  // while db_mutex_ is released below.
+ auto* v = compact_->compaction->input_version();
for (auto it = bounds.begin();;) {
const Slice a = *it;
it++;
}
const Slice b = *it;
+
+    // ApproximateSize could potentially create a table reader iterator to
+    // seek to the index block and may incur I/O cost in the process. Unlock
+    // the db mutex to reduce contention.
+ db_mutex_->Unlock();
uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1);
+ db_mutex_->Lock();
ranges.emplace_back(a, b, size);
sum += size;
}
// Group the ranges into subcompactions
const double min_file_fill_percent = 4.0 / 5;
- uint64_t max_output_files = static_cast<uint64_t>(
- std::ceil(sum / min_file_fill_percent /
- c->mutable_cf_options()->MaxFileSizeForLevel(out_lvl)));
+ int base_level = v->storage_info()->base_level();
+ uint64_t max_output_files = static_cast<uint64_t>(std::ceil(
+ sum / min_file_fill_percent /
+ MaxFileSizeForLevel(*(c->mutable_cf_options()), out_lvl,
+ c->immutable_cf_options()->compaction_style, base_level,
+ c->immutable_cf_options()->level_compaction_dynamic_level_bytes)));
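+  // Illustrative arithmetic (assumed numbers): with sum = 400 MB of covered
+  // data, a 100 MB max output file size, and min_file_fill_percent = 0.8,
+  // max_output_files = ceil(400 / 0.8 / 100) = 5.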
uint64_t subcompactions =
std::min({static_cast<uint64_t>(ranges.size()),
- static_cast<uint64_t>(db_options_.max_subcompactions),
+ static_cast<uint64_t>(c->max_subcompactions()),
max_output_files});
if (subcompactions > 1) {
thread.join();
}
- if (output_directory_) {
- output_directory_->Fsync();
+ compaction_stats_.micros = env_->NowMicros() - start_micros;
+ compaction_stats_.cpu_micros = 0;
+ for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
+ compaction_stats_.cpu_micros +=
+ compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
}
- compaction_stats_.micros = env_->NowMicros() - start_micros;
- MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
+ RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
+ RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
+ compaction_stats_.cpu_micros);
+
+ TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
// Check if any thread encountered an error during execution
Status status;
}
}
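+  // Fsync the output directory so the directory entries for the newly
+  // written output files are durable before they are verified and installed.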
+ if (status.ok() && output_directory_) {
+ status = output_directory_->Fsync();
+ }
+
+ if (status.ok()) {
+ thread_pool.clear();
+ std::vector<const FileMetaData*> files_meta;
+ for (const auto& state : compact_->sub_compact_states) {
+ for (const auto& output : state.outputs) {
+ files_meta.emplace_back(&output.meta);
+ }
+ }
+ ColumnFamilyData* cfd = compact_->compaction->column_family_data();
+ auto prefix_extractor =
+ compact_->compaction->mutable_cf_options()->prefix_extractor.get();
+ std::atomic<size_t> next_file_meta_idx(0);
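+    // Verification is parallelized: each worker claims the next file index
+    // from this shared atomic counter until all outputs have been checked.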
+ auto verify_table = [&](Status& output_status) {
+ while (true) {
+ size_t file_idx = next_file_meta_idx.fetch_add(1);
+ if (file_idx >= files_meta.size()) {
+ break;
+ }
+      // Verify that the table is usable.
+      // We set for_compaction to false and don't
+      // OptimizeForCompactionTableRead here because this is a special case
+      // after we finish building the table. No matter whether
+      // use_direct_io_for_flush_and_compaction is true, we regard this
+      // verification as user reads, since the goal is to cache the table
+      // for further user reads.
+ InternalIterator* iter = cfd->table_cache()->NewIterator(
+ ReadOptions(), env_options_, cfd->internal_comparator(),
+ *files_meta[file_idx], nullptr /* range_del_agg */,
+ prefix_extractor, nullptr,
+ cfd->internal_stats()->GetFileReadHist(
+ compact_->compaction->output_level()),
+ false, nullptr /* arena */, false /* skip_filters */,
+ compact_->compaction->output_level());
+ auto s = iter->status();
+
+ if (s.ok() && paranoid_file_checks_) {
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
+ s = iter->status();
+ }
+
+ delete iter;
+
+ if (!s.ok()) {
+ output_status = s;
+ break;
+ }
+ }
+ };
+ for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
+ thread_pool.emplace_back(verify_table,
+ std::ref(compact_->sub_compact_states[i].status));
+ }
+ verify_table(compact_->sub_compact_states[0].status);
+ for (auto& thread : thread_pool) {
+ thread.join();
+ }
+ for (const auto& state : compact_->sub_compact_states) {
+ if (!state.status.ok()) {
+ status = state.status;
+ break;
+ }
+ }
+ }
+
TablePropertiesCollection tp;
for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) {
- auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
- output.meta.fd.GetPathId());
+ auto fn =
+ TableFileName(state.compaction->immutable_cf_options()->cf_paths,
+ output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
tp[fn] = output.table_properties;
}
}
Status status = compact_->status;
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
cfd->internal_stats()->AddCompactionStats(
- compact_->compaction->output_level(), compaction_stats_);
+ compact_->compaction->output_level(), thread_pri_, compaction_stats_);
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
double read_write_amp = 0.0;
double write_amp = 0.0;
+ double bytes_read_per_sec = 0;
+ double bytes_written_per_sec = 0;
+
if (stats.bytes_read_non_output_levels > 0) {
read_write_amp = (stats.bytes_written + stats.bytes_read_output_level +
stats.bytes_read_non_output_levels) /
write_amp = stats.bytes_written /
static_cast<double>(stats.bytes_read_non_output_levels);
}
+ if (stats.micros > 0) {
+ bytes_read_per_sec =
+ (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
+ static_cast<double>(stats.micros);
+ bytes_written_per_sec =
+ stats.bytes_written / static_cast<double>(stats.micros);
+ }
+
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
- "write-amplify(%.1f) %s, records in: %d, records dropped: %d\n",
- cfd->GetName().c_str(), vstorage->LevelSummary(&tmp),
- (stats.bytes_read_non_output_levels + stats.bytes_read_output_level) /
- static_cast<double>(stats.micros),
- stats.bytes_written / static_cast<double>(stats.micros),
- compact_->compaction->output_level(),
+ "write-amplify(%.1f) %s, records in: %" PRIu64
+ ", records dropped: %" PRIu64 " output_compression: %s\n",
+ cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec,
+ bytes_written_per_sec, compact_->compaction->output_level(),
stats.num_input_files_in_non_output_levels,
stats.num_input_files_in_output_level, stats.num_output_files,
stats.bytes_read_non_output_levels / 1048576.0,
stats.bytes_read_output_level / 1048576.0,
stats.bytes_written / 1048576.0, read_write_amp, write_amp,
status.ToString().c_str(), stats.num_input_records,
- stats.num_dropped_records);
+ stats.num_dropped_records,
+ CompressionTypeToString(compact_->compaction->output_compression())
+ .c_str());
UpdateCompactionJobStats(stats);
auto stream = event_logger_->LogToBuffer(log_buffer_);
- stream << "job" << job_id_
- << "event" << "compaction_finished"
+ stream << "job" << job_id_ << "event"
+ << "compaction_finished"
<< "compaction_time_micros" << compaction_stats_.micros
+ << "compaction_time_cpu_micros" << compaction_stats_.cpu_micros
<< "output_level" << compact_->compaction->output_level()
<< "num_output_files" << compact_->NumOutputFiles()
- << "total_output_size" << compact_->total_bytes
- << "num_input_records" << compact_->num_input_records
- << "num_output_records" << compact_->num_output_records
- << "num_subcompactions" << compact_->sub_compact_states.size();
+ << "total_output_size" << compact_->total_bytes << "num_input_records"
+ << compact_->num_input_records << "num_output_records"
+ << compact_->num_output_records << "num_subcompactions"
+ << compact_->sub_compact_states.size() << "output_compression"
+ << CompressionTypeToString(compact_->compaction->output_compression());
if (compaction_job_stats_ != nullptr) {
stream << "num_single_delete_mismatches"
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
+
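+  // Snapshot the thread's CPU time so the delta at the end of this function
+  // can be charged to this subcompaction's stats.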
+ uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
+
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
- std::unique_ptr<RangeDelAggregator> range_del_agg(
- new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_));
+
+  // Create the compaction filter and fail the compaction if
+  // IgnoreSnapshots() returns false, since that is no longer supported.
+ const CompactionFilter* compaction_filter =
+ cfd->ioptions()->compaction_filter;
+ std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
+ if (compaction_filter == nullptr) {
+ compaction_filter_from_factory =
+ sub_compact->compaction->CreateCompactionFilter();
+ compaction_filter = compaction_filter_from_factory.get();
+ }
+ if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
+ sub_compact->status = Status::NotSupported(
+ "CompactionFilter::IgnoreSnapshots() = false is not supported "
+ "anymore.");
+ return;
+ }
+
+ CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
+ existing_snapshots_);
+
+ // Although the v2 aggregator is what the level iterator(s) know about,
+ // the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
- sub_compact->compaction, range_del_agg.get()));
+      sub_compact->compaction, &range_del_agg, env_options_for_read_));
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0;
uint64_t prev_prepare_write_nanos = 0;
+ uint64_t prev_cpu_write_nanos = 0;
+ uint64_t prev_cpu_read_nanos = 0;
if (measure_io_stats_) {
prev_perf_level = GetPerfLevel();
- SetPerfLevel(PerfLevel::kEnableTime);
+ SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
prev_write_nanos = IOSTATS(write_nanos);
prev_fsync_nanos = IOSTATS(fsync_nanos);
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
+ prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
+ prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
- const MutableCFOptions* mutable_cf_options =
- sub_compact->compaction->mutable_cf_options();
-
- // To build compression dictionary, we sample the first output file, assuming
- // it'll reach the maximum length, and then use the dictionary for compressing
- // subsequent output files. The dictionary may be less than max_dict_bytes if
- // the first output file's length is less than the maximum.
- const int kSampleLenShift = 6; // 2^6 = 64-byte samples
- std::set<size_t> sample_begin_offsets;
- if (bottommost_level_ &&
- cfd->ioptions()->compression_opts.max_dict_bytes > 0) {
- const size_t kMaxSamples =
- cfd->ioptions()->compression_opts.max_dict_bytes >> kSampleLenShift;
- const size_t kOutFileLen = mutable_cf_options->MaxFileSizeForLevel(
- compact_->compaction->output_level());
- if (kOutFileLen != port::kMaxSizet) {
- const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift;
- Random64 generator{versions_->NewFileNumber()};
- for (size_t i = 0; i < kMaxSamples; ++i) {
- sample_begin_offsets.insert(generator.Uniform(kOutFileNumSamples)
- << kSampleLenShift);
- }
- }
- }
-
- auto compaction_filter = cfd->ioptions()->compaction_filter;
- std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
- if (compaction_filter == nullptr) {
- compaction_filter_from_factory =
- sub_compact->compaction->CreateCompactionFilter();
- compaction_filter = compaction_filter_from_factory.get();
- }
MergeHelper merge(
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
compaction_filter, db_options_.info_log.get(),
false /* internal key corruption is expected */,
existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
- compact_->compaction->level(), db_options_.statistics.get(),
- shutting_down_);
+ snapshot_checker_, compact_->compaction->level(),
+ db_options_.statistics.get(), shutting_down_);
TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
input->SeekToFirst();
}
- // we allow only 1 compaction event listener. Used by blob storage
- CompactionEventListener* comp_event_listener = nullptr;
-#ifndef ROCKSDB_LITE
- for (auto& celitr : cfd->ioptions()->listeners) {
- comp_event_listener = celitr->GetCompactionEventListener();
- if (comp_event_listener != nullptr) {
- break;
- }
- }
-#endif // ROCKSDB_LITE
-
Status status;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
- &existing_snapshots_, earliest_write_conflict_snapshot_, env_, false,
- range_del_agg.get(), sub_compact->compaction, compaction_filter,
- comp_event_listener, shutting_down_));
+ &existing_snapshots_, earliest_write_conflict_snapshot_,
+ snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
+ &range_del_agg, sub_compact->compaction, compaction_filter,
+ shutting_down_, preserve_deletes_seqnum_));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
- if (c_iter->Valid() &&
- sub_compact->compaction->output_level() != 0) {
+ if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
// ShouldStopBefore() maintains state based on keys processed so far. The
// compaction loop always calls it on the "next" key, thus won't tell it the
// first key. So we do that here.
- sub_compact->ShouldStopBefore(
- c_iter->key(), sub_compact->current_output_file_size);
+ sub_compact->ShouldStopBefore(c_iter->key(),
+ sub_compact->current_output_file_size);
}
const auto& c_iter_stats = c_iter->iter_stats();
- auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
- // data_begin_offset and compression_dict are only valid while generating
- // dictionary from the first output file.
- size_t data_begin_offset = 0;
- std::string compression_dict;
- compression_dict.reserve(cfd->ioptions()->compression_opts.max_dict_bytes);
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
key, c_iter->ikey().sequence);
sub_compact->num_output_records++;
- if (sub_compact->outputs.size() == 1) { // first output file
- // Check if this key/value overlaps any sample intervals; if so, appends
- // overlapping portions to the dictionary.
- for (const auto& data_elmt : {key, value}) {
- size_t data_end_offset = data_begin_offset + data_elmt.size();
- while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
- *sample_begin_offset_iter < data_end_offset) {
- size_t sample_end_offset =
- *sample_begin_offset_iter + (1 << kSampleLenShift);
- // Invariant: Because we advance sample iterator while processing the
- // data_elmt containing the sample's last byte, the current sample
- // cannot end before the current data_elmt.
- assert(data_begin_offset < sample_end_offset);
-
- size_t data_elmt_copy_offset, data_elmt_copy_len;
- if (*sample_begin_offset_iter <= data_begin_offset) {
- // The sample starts before data_elmt starts, so take bytes starting
- // at the beginning of data_elmt.
- data_elmt_copy_offset = 0;
- } else {
- // data_elmt starts before the sample starts, so take bytes starting
- // at the below offset into data_elmt.
- data_elmt_copy_offset =
- *sample_begin_offset_iter - data_begin_offset;
- }
- if (sample_end_offset <= data_end_offset) {
- // The sample ends before data_elmt ends, so take as many bytes as
- // needed.
- data_elmt_copy_len =
- sample_end_offset - (data_begin_offset + data_elmt_copy_offset);
- } else {
- // data_elmt ends before the sample ends, so take all remaining
- // bytes in data_elmt.
- data_elmt_copy_len =
- data_end_offset - (data_begin_offset + data_elmt_copy_offset);
- }
- compression_dict.append(&data_elmt.data()[data_elmt_copy_offset],
- data_elmt_copy_len);
- if (sample_end_offset > data_end_offset) {
- // Didn't finish sample. Try to finish it with the next data_elmt.
- break;
- }
- // Next sample may require bytes from same data_elmt.
- sample_begin_offset_iter++;
- }
- data_begin_offset = data_end_offset;
- }
- }
-
// Close the output file if it is big enough. Two possibilities determine that
// it's time to close it: (1) the current key should be this file's last key,
// (2) the next key should not be in this file.
c_iter->Next();
if (!output_file_ended && c_iter->Valid() &&
sub_compact->compaction->output_level() != 0 &&
- sub_compact->ShouldStopBefore(
- c_iter->key(), sub_compact->current_output_file_size) &&
+ sub_compact->ShouldStopBefore(c_iter->key(),
+ sub_compact->current_output_file_size) &&
sub_compact->builder != nullptr) {
// (2) this key belongs to the next file. For historical reasons, the
// iterator status after advancing will be given to
next_key = &c_iter->key();
}
CompactionIterationStats range_del_out_stats;
- status = FinishCompactionOutputFile(input_status, sub_compact,
- range_del_agg.get(),
- &range_del_out_stats, next_key);
+ status =
+ FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
+ &range_del_out_stats, next_key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
- if (sub_compact->outputs.size() == 1) {
- // Use dictionary from first output file for compression of subsequent
- // files.
- sub_compact->compression_dict = std::move(compression_dict);
- }
}
}
RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
RecordCompactionIOStats();
- if (status.ok() && (shutting_down_->load(std::memory_order_relaxed) ||
- cfd->IsDropped())) {
+ if (status.ok() &&
+ (shutting_down_->load(std::memory_order_relaxed) || cfd->IsDropped())) {
status = Status::ShutdownInProgress(
"Database shutdown or Column family drop during compaction");
}
}
if (status.ok() && sub_compact->builder == nullptr &&
- sub_compact->outputs.size() == 0 &&
- range_del_agg->ShouldAddTombstones(bottommost_level_)) {
+ sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
// handle subcompaction containing only range deletions
status = OpenCompactionOutputFile(sub_compact);
}
// close the output file.
if (sub_compact->builder != nullptr) {
CompactionIterationStats range_del_out_stats;
- Status s = FinishCompactionOutputFile(
- status, sub_compact, range_del_agg.get(), &range_del_out_stats);
+ Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
+ &range_del_out_stats);
if (status.ok()) {
status = s;
}
RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
}
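+  // Total CPU time consumed by this subcompaction, measured on this thread.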
+ sub_compact->compaction_job_stats.cpu_micros =
+ env_->NowCPUNanos() / 1000 - prev_cpu_micros;
+
if (measure_io_stats_) {
sub_compact->compaction_job_stats.file_write_nanos +=
IOSTATS(write_nanos) - prev_write_nanos;
IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
sub_compact->compaction_job_stats.file_prepare_write_nanos +=
IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
- if (prev_perf_level != PerfLevel::kEnableTime) {
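+    // Subtract the CPU time already attributed to file reads and writes
+    // (tracked in IOSTATS) so cpu_micros approximates compute-only time.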
+ sub_compact->compaction_job_stats.cpu_micros -=
+ (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
+ IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
+ 1000;
+ if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
SetPerfLevel(prev_perf_level);
}
}
RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
c_iter_stats.num_range_del_drop_obsolete);
}
+ if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
+ RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
+ c_iter_stats.num_optimized_del_drop_obsolete);
+ }
}
Status CompactionJob::FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact,
- RangeDelAggregator* range_del_agg,
+ CompactionRangeDelAggregator* range_del_agg,
CompactionIterationStats* range_del_out_stats,
const Slice* next_table_min_key /* = nullptr */) {
AutoThreadOperationStageUpdater stage_updater(
uint64_t output_number = sub_compact->current_output()->meta.fd.GetNumber();
assert(output_number != 0);
- TableProperties table_properties;
+ ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
+ const Comparator* ucmp = cfd->user_comparator();
+
// Check for iterator errors
Status s = input_status;
auto meta = &sub_compact->current_output()->meta;
+ assert(meta != nullptr);
if (s.ok()) {
Slice lower_bound_guard, upper_bound_guard;
+ std::string smallest_user_key;
const Slice *lower_bound, *upper_bound;
+ bool lower_bound_from_sub_compact = false;
if (sub_compact->outputs.size() == 1) {
// For the first output table, include range tombstones before the min key
// but after the subcompaction boundary.
lower_bound = sub_compact->start;
+ lower_bound_from_sub_compact = true;
} else if (meta->smallest.size() > 0) {
// For subsequent output tables, only include range tombstones from min
// key onwards since the previous file was extended to contain range
// tombstones falling before min key.
- lower_bound_guard = meta->smallest.user_key();
+ smallest_user_key = meta->smallest.user_key().ToString(false /*hex*/);
+ lower_bound_guard = Slice(smallest_user_key);
lower_bound = &lower_bound_guard;
} else {
lower_bound = nullptr;
}
if (next_table_min_key != nullptr) {
- // This isn't the last file in the subcompaction, so extend until the next
- // file starts.
+    // This may be the last file in the subcompaction in some cases, so we
+    // need to compare the end key of the subcompaction with the next file's
+    // start key. When the end key is chosen by the subcompaction, we know
+    // that it must be the biggest key in the output file. Therefore, it is
+    // safe to use the smaller of the two keys as the upper bound of the
+    // output file, to ensure that there is no overlap between different
+    // output files.
upper_bound_guard = ExtractUserKey(*next_table_min_key);
- upper_bound = &upper_bound_guard;
+ if (sub_compact->end != nullptr &&
+ ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
+ upper_bound = sub_compact->end;
+ } else {
+ upper_bound = &upper_bound_guard;
+ }
} else {
// This is the last file in the subcompaction, so extend until the
// subcompaction ends.
upper_bound = sub_compact->end;
}
- range_del_agg->AddToBuilder(sub_compact->builder.get(), lower_bound,
- upper_bound, meta, range_del_out_stats,
- bottommost_level_);
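+    // Find the earliest snapshot. At the bottommost level, a range tombstone
+    // with seq_ <= earliest_snapshot is obsolete (there is no older data
+    // beneath it left to delete) and is dropped in the loop below.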
+ auto earliest_snapshot = kMaxSequenceNumber;
+ if (existing_snapshots_.size() > 0) {
+ earliest_snapshot = existing_snapshots_[0];
+ }
+ bool has_overlapping_endpoints;
+ if (upper_bound != nullptr && meta->largest.size() > 0) {
+ has_overlapping_endpoints =
+ ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
+ } else {
+ has_overlapping_endpoints = false;
+ }
+
+    // The end key of the subcompaction must be bigger than or equal to the
+    // upper bound. If the end of the subcompaction or the upper bound is
+    // null, this file is the last file in the compaction, so there will be
+    // no overlap between this file and others.
+    assert(sub_compact->end == nullptr || upper_bound == nullptr ||
+           ucmp->Compare(*upper_bound, *sub_compact->end) <= 0);
+ auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
+ has_overlapping_endpoints);
+ // Position the range tombstone output iterator. There may be tombstone
+ // fragments that are entirely out of range, so make sure that we do not
+ // include those.
+ if (lower_bound != nullptr) {
+ it->Seek(*lower_bound);
+ } else {
+ it->SeekToFirst();
+ }
+ for (; it->Valid(); it->Next()) {
+ auto tombstone = it->Tombstone();
+ if (upper_bound != nullptr) {
+ int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
+ if ((has_overlapping_endpoints && cmp < 0) ||
+ (!has_overlapping_endpoints && cmp <= 0)) {
+ // Tombstones starting after upper_bound only need to be included in
+ // the next table. If the current SST ends before upper_bound, i.e.,
+ // `has_overlapping_endpoints == false`, we can also skip over range
+ // tombstones that start exactly at upper_bound. Such range tombstones
+ // will be included in the next file and are not relevant to the point
+ // keys or endpoints of the current file.
+ break;
+ }
+ }
+
+ if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
+ // TODO(andrewkr): tombstones that span multiple output files are
+ // counted for each compaction output file, so lots of double counting.
+ range_del_out_stats->num_range_del_drop_obsolete++;
+ range_del_out_stats->num_record_drop_obsolete++;
+ continue;
+ }
+
+ auto kv = tombstone.Serialize();
+ assert(lower_bound == nullptr ||
+ ucmp->Compare(*lower_bound, kv.second) < 0);
+ sub_compact->builder->Add(kv.first.Encode(), kv.second);
+ InternalKey smallest_candidate = std::move(kv.first);
+ if (lower_bound != nullptr &&
+ ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
+ // Pretend the smallest key has the same user key as lower_bound
+ // (the max key in the previous table or subcompaction) in order for
+ // files to appear key-space partitioned.
+ //
+ // When lower_bound is chosen by a subcompaction, we know that
+ // subcompactions over smaller keys cannot contain any keys at
+ // lower_bound. We also know that smaller subcompactions exist, because
+      // otherwise the subcompaction would be unbounded on the left. As a
+ // result, we know that no other files on the output level will contain
+ // actual keys at lower_bound (an output file may have a largest key of
+ // lower_bound@kMaxSequenceNumber, but this only indicates a large range
+ // tombstone was truncated). Therefore, it is safe to use the
+ // tombstone's sequence number, to ensure that keys at lower_bound at
+ // lower levels are covered by truncated tombstones.
+ //
+ // If lower_bound was chosen by the smallest data key in the file,
+ // choose lowest seqnum so this file's smallest internal key comes after
+ // the previous file's largest. The fake seqnum is OK because the read
+ // path's file-picking code only considers user key.
+ smallest_candidate = InternalKey(
+ *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
+ kTypeRangeDeletion);
+ }
+ InternalKey largest_candidate = tombstone.SerializeEndKey();
+ if (upper_bound != nullptr &&
+ ucmp->Compare(*upper_bound, largest_candidate.user_key()) <= 0) {
+ // Pretend the largest key has the same user key as upper_bound (the
+ // min key in the following table or subcompaction) in order for files
+ // to appear key-space partitioned.
+ //
+ // Choose highest seqnum so this file's largest internal key comes
+ // before the next file's/subcompaction's smallest. The fake seqnum is
+ // OK because the read path's file-picking code only considers the user
+ // key portion.
+ //
+ // Note Seek() also creates InternalKey with (user_key,
+ // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of
+ // kTypeRangeDeletion (0xF), so the range tombstone comes before the
+ // Seek() key in InternalKey's ordering. So Seek() will look in the
+ // next file for the user key.
+ largest_candidate =
+ InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
+ }
+#ifndef NDEBUG
+ SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
+ if (meta->smallest.size() > 0) {
+ smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
+ }
+#endif
+ meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
+ tombstone.seq_,
+ cfd->internal_comparator());
+
+ // The smallest key in a file is used for range tombstone truncation, so
+ // it cannot have a seqnum of 0 (unless the smallest data key in a file
+ // has a seqnum of 0). Otherwise, the truncated tombstone may expose
+ // deleted keys at lower levels.
+ assert(smallest_ikey_seqnum == 0 ||
+ ExtractInternalKeyFooter(meta->smallest.Encode()) !=
+ PackSequenceAndType(0, kTypeRangeDeletion));
+ }
+ meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
const uint64_t current_entries = sub_compact->builder->NumEntries();
- meta->marked_for_compaction = sub_compact->builder->NeedCompact();
if (s.ok()) {
s = sub_compact->builder->Finish();
} else {
sub_compact->builder->Abandon();
}
const uint64_t current_bytes = sub_compact->builder->FileSize();
- meta->fd.file_size = current_bytes;
+ if (s.ok()) {
+ meta->fd.file_size = current_bytes;
+ }
sub_compact->current_output()->finished = true;
sub_compact->total_bytes += current_bytes;
}
sub_compact->outfile.reset();
- ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
TableProperties tp;
- if (s.ok() && current_entries > 0) {
- // Verify that the table is usable
- // We set for_compaction to false and don't OptimizeForCompactionTableRead
- // here because this is a special case after we finish the table building
- // No matter whether use_direct_io_for_flush_and_compaction is true,
- // we will regrad this verification as user reads since the goal is
- // to cache it here for further user reads
- InternalIterator* iter = cfd->table_cache()->NewIterator(
- ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
- nullptr /* range_del_agg */, nullptr,
- cfd->internal_stats()->GetFileReadHist(
- compact_->compaction->output_level()),
- false);
- s = iter->status();
-
- if (s.ok() && paranoid_file_checks_) {
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
- s = iter->status();
- }
+ if (s.ok()) {
+ tp = sub_compact->builder->GetTableProperties();
+ }
- delete iter;
+ if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
+    // If there is nothing to output, there is no need to generate an SST
+    // file. This happens when the output level is the bottom level and the
+    // subcompaction outputs nothing.
+ std::string fname =
+ TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
+ meta->fd.GetNumber(), meta->fd.GetPathId());
+ env_->DeleteFile(fname);
+
+ // Also need to remove the file from outputs, or it will be added to the
+ // VersionEdit.
+ assert(!sub_compact->outputs.empty());
+ sub_compact->outputs.pop_back();
+ meta = nullptr;
+ }
+ if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
// Output to event logger and fire events.
- if (s.ok()) {
- tp = sub_compact->builder->GetTableProperties();
- sub_compact->current_output()->table_properties =
- std::make_shared<TableProperties>(tp);
- ROCKS_LOG_INFO(db_options_.info_log,
- "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
- " keys, %" PRIu64 " bytes%s",
- cfd->GetName().c_str(), job_id_, output_number,
- current_entries, current_bytes,
- meta->marked_for_compaction ? " (need compaction)" : "");
- }
+ sub_compact->current_output()->table_properties =
+ std::make_shared<TableProperties>(tp);
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
+ " keys, %" PRIu64 " bytes%s",
+ cfd->GetName().c_str(), job_id_, output_number,
+ current_entries, current_bytes,
+ meta->marked_for_compaction ? " (need compaction)" : "");
+ }
+ std::string fname;
+ FileDescriptor output_fd;
+ if (meta != nullptr) {
+ fname =
+ TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
+ meta->fd.GetNumber(), meta->fd.GetPathId());
+ output_fd = meta->fd;
+ } else {
+ fname = "(nil)";
}
- std::string fname = TableFileName(db_options_.db_paths, meta->fd.GetNumber(),
- meta->fd.GetPathId());
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
- job_id_, meta->fd, tp, TableFileCreationReason::kCompaction, s);
+ job_id_, output_fd, tp, TableFileCreationReason::kCompaction, s);
#ifndef ROCKSDB_LITE
// Report new file to SstFileManagerImpl
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
- if (sfm && meta->fd.GetPathId() == 0) {
- auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
- meta->fd.GetPathId());
- sfm->OnAddFile(fn);
+ if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
+ sfm->OnAddFile(fname);
if (sfm->IsMaxAllowedSpaceReached()) {
+ // TODO(ajkr): should we return OK() if max space was reached by the final
+ // compaction output file (similarly to how flush works when full)?
+ s = Status::SpaceLimit("Max allowed space was reached");
+ TEST_SYNC_POINT(
+ "CompactionJob::FinishCompactionOutputFile:"
+ "MaxAllowedSpaceReached");
InstrumentedMutexLock l(db_mutex_);
- if (db_bg_error_->ok()) {
- s = Status::IOError("Max allowed space was reached");
- *db_bg_error_ = s;
- TEST_SYNC_POINT(
- "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
- }
+ db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
}
}
#endif
compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
}
- // Add compaction outputs
+ // Add compaction inputs
compaction->AddInputDeletions(compact_->compaction->edit());
for (const auto& sub_compact : compact_->sub_compact_states) {
assert(sub_compact->builder == nullptr);
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
- std::string fname = TableFileName(db_options_.db_paths, file_number,
- sub_compact->compaction->output_path_id());
+ std::string fname =
+ TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
+ file_number, sub_compact->compaction->output_path_id());
// Fire events.
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
#ifndef ROCKSDB_LITE
TableFileCreationReason::kCompaction);
#endif // !ROCKSDB_LITE
// Make the output file
- unique_ptr<WritableFile> writable_file;
- EnvOptions opt_env_opts =
- env_->OptimizeForCompactionTableWrite(env_options_, db_options_);
+ std::unique_ptr<WritableFile> writable_file;
+#ifndef NDEBUG
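+  // In debug builds, let tests observe the effective direct-write setting
+  // through this sync point.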
+ bool syncpoint_arg = env_options_.use_direct_writes;
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
- &opt_env_opts.use_direct_writes);
- Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts);
+ &syncpoint_arg);
+#endif
+ Status s = NewWritableFile(env_, fname, &writable_file, env_options_);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
sub_compact->outputs.push_back(out);
writable_file->SetIOPriority(Env::IO_LOW);
+ writable_file->SetWriteLifeTimeHint(write_hint_);
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
sub_compact->compaction->OutputFilePreallocationSize()));
- sub_compact->outfile.reset(new WritableFileWriter(
- std::move(writable_file), env_options_, db_options_.statistics.get()));
+ const auto& listeners =
+ sub_compact->compaction->immutable_cf_options()->listeners;
+ sub_compact->outfile.reset(
+ new WritableFileWriter(std::move(writable_file), fname, env_options_,
+ env_, db_options_.statistics.get(), listeners));
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where
// data is going to be found
bool skip_filters =
cfd->ioptions()->optimize_filters_for_hits && bottommost_level_;
+
+ uint64_t output_file_creation_time =
+ sub_compact->compaction->MaxInputFileCreationTime();
+ if (output_file_creation_time == 0) {
+ int64_t _current_time = 0;
+ auto status = db_options_.env->GetCurrentTime(&_current_time);
+    // It is safe to proceed even if GetCurrentTime fails, so just log the
+    // failure and continue.
+ if (!status.ok()) {
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Failed to get current time to populate creation_time property. "
+ "Status: %s",
+ status.ToString().c_str());
+ }
+ output_file_creation_time = static_cast<uint64_t>(_current_time);
+ }
+
sub_compact->builder.reset(NewTableBuilder(
- *cfd->ioptions(), cfd->internal_comparator(),
- cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
- sub_compact->outfile.get(), sub_compact->compaction->output_compression(),
- cfd->ioptions()->compression_opts,
- sub_compact->compaction->output_level(),
- &sub_compact->compression_dict,
- skip_filters));
+ *cfd->ioptions(), *(sub_compact->compaction->mutable_cf_options()),
+ cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
+ cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
+ sub_compact->compaction->output_compression(),
+ 0 /*sample_for_compression */,
+ sub_compact->compaction->output_compression_opts(),
+ sub_compact->compaction->output_level(), skip_filters,
+ output_file_creation_time, 0 /* oldest_key_time */,
+ sub_compact->compaction->max_output_file_size()));
LogFlush(db_options_.info_log);
return s;
}
#ifndef ROCKSDB_LITE
namespace {
-void CopyPrefix(
- const Slice& src, size_t prefix_length, std::string* dst) {
+void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
assert(prefix_length > 0);
size_t length = src.size() > prefix_length ? prefix_length : src.size();
dst->assign(src.data(), length);
if (compaction->level(input_level) != compaction->output_level()) {
UpdateCompactionInputStatsHelper(
&compaction_stats_.num_input_files_in_non_output_levels,
- &compaction_stats_.bytes_read_non_output_levels,
- input_level);
+ &compaction_stats_.bytes_read_non_output_levels, input_level);
} else {
UpdateCompactionInputStatsHelper(
&compaction_stats_.num_input_files_in_output_level,
- &compaction_stats_.bytes_read_output_level,
- input_level);
+ &compaction_stats_.bytes_read_output_level, input_level);
}
}
}
}
-void CompactionJob::UpdateCompactionInputStatsHelper(
- int* num_files, uint64_t* bytes_read, int input_level) {
+void CompactionJob::UpdateCompactionInputStatsHelper(int* num_files,
+ uint64_t* bytes_read,
+ int input_level) {
const Compaction* compaction = compact_->compaction;
auto num_input_files = compaction->num_input_files(input_level);
*num_files += static_cast<int>(num_input_files);
// input information
compaction_job_stats_->total_input_bytes =
- stats.bytes_read_non_output_levels +
- stats.bytes_read_output_level;
- compaction_job_stats_->num_input_records =
- compact_->num_input_records;
+ stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
+ compaction_job_stats_->num_input_records = compact_->num_input_records;
compaction_job_stats_->num_input_files =
stats.num_input_files_in_non_output_levels +
stats.num_input_files_in_output_level;
// output information
compaction_job_stats_->total_output_bytes = stats.bytes_written;
- compaction_job_stats_->num_output_records =
- compact_->num_output_records;
+ compaction_job_stats_->num_output_records = compact_->num_output_records;
compaction_job_stats_->num_output_files = stats.num_output_files;
if (compact_->NumOutputFiles() > 0U) {
- CopyPrefix(
- compact_->SmallestUserKey(),
- CompactionJobStats::kMaxPrefixLength,
- &compaction_job_stats_->smallest_output_key_prefix);
- CopyPrefix(
- compact_->LargestUserKey(),
- CompactionJobStats::kMaxPrefixLength,
- &compaction_job_stats_->largest_output_key_prefix);
+ CopyPrefix(compact_->SmallestUserKey(),
+ CompactionJobStats::kMaxPrefixLength,
+ &compaction_job_stats_->smallest_output_key_prefix);
+ CopyPrefix(compact_->LargestUserKey(),
+ CompactionJobStats::kMaxPrefixLength,
+ &compaction_job_stats_->largest_output_key_prefix);
}
}
+#else
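+  // Silence the unused-parameter warning when stats reporting is compiled
+  // out of ROCKSDB_LITE builds.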
+ (void)stats;
#endif // !ROCKSDB_LITE
}
// build event logger report
auto stream = event_logger_->Log();
stream << "job" << job_id_ << "event"
- << "compaction_started";
+ << "compaction_started"
+ << "compaction_reason"
+ << GetCompactionReasonString(compaction->compaction_reason());
for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
stream << ("files_L" + ToString(compaction->level(i)));
stream.StartArray();