#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
+#include "db/range_del_aggregator.h"
#include "db/version_set.h"
#include "monitoring/iostats_context_imp.h"
#include "monitoring/perf_context_imp.h"
uint64_t overlapped_bytes = 0;
// A flag determine whether the key has been seen in ShouldStopBefore()
bool seen_key = false;
- std::string compression_dict;
SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
uint64_t size = 0)
approx_size(size),
grandparent_index(0),
overlapped_bytes(0),
- seen_key(false),
- compression_dict() {
+ seen_key(false) {
assert(compaction != nullptr);
}
grandparent_index = std::move(o.grandparent_index);
overlapped_bytes = std::move(o.overlapped_bytes);
seen_key = std::move(o.seen_key);
- compression_dict = std::move(o.compression_dict);
return *this;
}
- // Because member unique_ptrs do not have these.
+ // Because member std::unique_ptrs do not have these.
SubcompactionState(const SubcompactionState&) = delete;
SubcompactionState& operator=(const SubcompactionState&) = delete;
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
- const std::string& dbname, CompactionJobStats* compaction_job_stats)
+ const std::string& dbname, CompactionJobStats* compaction_job_stats,
+ Env::Priority thread_pri)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
bottommost_level_(false),
paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats),
- write_hint_(Env::WLTH_NOT_SET) {
+ write_hint_(Env::WLTH_NOT_SET),
+ thread_pri_(thread_pri) {
assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
bottommost_level_ = c->bottommost_level();
if (c->ShouldFormSubcompactions()) {
- const uint64_t start_micros = env_->NowMicros();
- GenSubcompactionBoundaries();
- MeasureTime(stats_, SUBCOMPACTION_SETUP_TIME,
- env_->NowMicros() - start_micros);
-
+ {
+ StopWatch sw(env_, stats_, SUBCOMPACTION_SETUP_TIME);
+ GenSubcompactionBoundaries();
+ }
assert(sizes_.size() == boundaries_.size() + 1);
for (size_t i = 0; i <= boundaries_.size(); i++) {
Slice* end = i == boundaries_.size() ? nullptr : &boundaries_[i];
compact_->sub_compact_states.emplace_back(c, start, end, sizes_[i]);
}
- MeasureTime(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
- compact_->sub_compact_states.size());
+ RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
+ compact_->sub_compact_states.size());
} else {
compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
}
// size of data covered by keys in that range
uint64_t sum = 0;
std::vector<RangeWithSize> ranges;
- auto* v = cfd->current();
+ // Get input version from CompactionState since it's already referenced
+ // earlier in SetInputVersioCompaction::SetInputVersion and will not change
+ // when db_mutex_ is released below
+ auto* v = compact_->compaction->input_version();
for (auto it = bounds.begin();;) {
const Slice a = *it;
it++;
}
const Slice b = *it;
+
+ // ApproximateSize could potentially create table reader iterator to seek
+ // to the index block and may incur I/O cost in the process. Unlock db
+ // mutex to reduce contention
+ db_mutex_->Unlock();
uint64_t size = versions_->ApproximateSize(v, a, b, start_lvl, out_lvl + 1);
+ db_mutex_->Lock();
ranges.emplace_back(a, b, size);
sum += size;
}
}
compaction_stats_.micros = env_->NowMicros() - start_micros;
- MeasureTime(stats_, COMPACTION_TIME, compaction_stats_.micros);
+ compaction_stats_.cpu_micros = 0;
+ for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
+ compaction_stats_.cpu_micros +=
+ compact_->sub_compact_states[i].compaction_job_stats.cpu_micros;
+ }
+
+ RecordTimeToHistogram(stats_, COMPACTION_TIME, compaction_stats_.micros);
+ RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
+ compaction_stats_.cpu_micros);
TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
Status status = compact_->status;
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
cfd->internal_stats()->AddCompactionStats(
- compact_->compaction->output_level(), compaction_stats_);
+ compact_->compaction->output_level(), thread_pri_, compaction_stats_);
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
stream << "job" << job_id_ << "event"
<< "compaction_finished"
<< "compaction_time_micros" << compaction_stats_.micros
+ << "compaction_time_cpu_micros" << compaction_stats_.cpu_micros
<< "output_level" << compact_->compaction->output_level()
<< "num_output_files" << compact_->NumOutputFiles()
<< "total_output_size" << compact_->total_bytes << "num_input_records"
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact != nullptr);
+
+ uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
+
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
- std::unique_ptr<RangeDelAggregator> range_del_agg(
- new RangeDelAggregator(cfd->internal_comparator(), existing_snapshots_));
+
+ // Create compaction filter and fail the compaction if
+ // IgnoreSnapshots() = false because it is not supported anymore
+ const CompactionFilter* compaction_filter =
+ cfd->ioptions()->compaction_filter;
+ std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
+ if (compaction_filter == nullptr) {
+ compaction_filter_from_factory =
+ sub_compact->compaction->CreateCompactionFilter();
+ compaction_filter = compaction_filter_from_factory.get();
+ }
+ if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
+ sub_compact->status = Status::NotSupported(
+ "CompactionFilter::IgnoreSnapshots() = false is not supported "
+ "anymore.");
+ return;
+ }
+
+ CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
+ existing_snapshots_);
+
+ // Although the v2 aggregator is what the level iterator(s) know about,
+ // the AddTombstones calls will be propagated down to the v1 aggregator.
std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
- sub_compact->compaction, range_del_agg.get(), env_optiosn_for_read_));
+ sub_compact->compaction, &range_del_agg, env_optiosn_for_read_));
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
uint64_t prev_fsync_nanos = 0;
uint64_t prev_range_sync_nanos = 0;
uint64_t prev_prepare_write_nanos = 0;
+ uint64_t prev_cpu_write_nanos = 0;
+ uint64_t prev_cpu_read_nanos = 0;
if (measure_io_stats_) {
prev_perf_level = GetPerfLevel();
- SetPerfLevel(PerfLevel::kEnableTime);
+ SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
prev_write_nanos = IOSTATS(write_nanos);
prev_fsync_nanos = IOSTATS(fsync_nanos);
prev_range_sync_nanos = IOSTATS(range_sync_nanos);
prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
+ prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
+ prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
}
- const MutableCFOptions* mutable_cf_options =
- sub_compact->compaction->mutable_cf_options();
-
- // To build compression dictionary, we sample the first output file, assuming
- // it'll reach the maximum length. We optionally pass these samples through
- // zstd's dictionary trainer, or just use them directly. Then, the dictionary
- // is used for compressing subsequent output files in the same subcompaction.
- const bool kUseZstdTrainer =
- sub_compact->compaction->output_compression_opts().zstd_max_train_bytes >
- 0;
- const size_t kSampleBytes =
- kUseZstdTrainer
- ? sub_compact->compaction->output_compression_opts()
- .zstd_max_train_bytes
- : sub_compact->compaction->output_compression_opts().max_dict_bytes;
- const int kSampleLenShift = 6; // 2^6 = 64-byte samples
- std::set<size_t> sample_begin_offsets;
- if (bottommost_level_ && kSampleBytes > 0) {
- const size_t kMaxSamples = kSampleBytes >> kSampleLenShift;
- const size_t kOutFileLen =
- static_cast<size_t>(MaxFileSizeForLevel(*mutable_cf_options,
- compact_->compaction->output_level(),
- cfd->ioptions()->compaction_style,
- compact_->compaction->GetInputBaseLevel(),
- cfd->ioptions()->level_compaction_dynamic_level_bytes));
- if (kOutFileLen != port::kMaxSizet) {
- const size_t kOutFileNumSamples = kOutFileLen >> kSampleLenShift;
- Random64 generator{versions_->NewFileNumber()};
- for (size_t i = 0; i < kMaxSamples; ++i) {
- sample_begin_offsets.insert(
- static_cast<size_t>(generator.Uniform(kOutFileNumSamples))
- << kSampleLenShift);
- }
- }
- }
-
- auto compaction_filter = cfd->ioptions()->compaction_filter;
- std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
- if (compaction_filter == nullptr) {
- compaction_filter_from_factory =
- sub_compact->compaction->CreateCompactionFilter();
- compaction_filter = compaction_filter_from_factory.get();
- }
MergeHelper merge(
env_, cfd->user_comparator(), cfd->ioptions()->merge_operator,
compaction_filter, db_options_.info_log.get(),
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
- range_del_agg.get(), sub_compact->compaction, compaction_filter,
+ &range_del_agg, sub_compact->compaction, compaction_filter,
shutting_down_, preserve_deletes_seqnum_));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
sub_compact->current_output_file_size);
}
const auto& c_iter_stats = c_iter->iter_stats();
- auto sample_begin_offset_iter = sample_begin_offsets.cbegin();
- // data_begin_offset and dict_sample_data are only valid while generating
- // dictionary from the first output file.
- size_t data_begin_offset = 0;
- std::string dict_sample_data;
- dict_sample_data.reserve(kSampleBytes);
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
key, c_iter->ikey().sequence);
sub_compact->num_output_records++;
- if (sub_compact->outputs.size() == 1) { // first output file
- // Check if this key/value overlaps any sample intervals; if so, appends
- // overlapping portions to the dictionary.
- for (const auto& data_elmt : {key, value}) {
- size_t data_end_offset = data_begin_offset + data_elmt.size();
- while (sample_begin_offset_iter != sample_begin_offsets.cend() &&
- *sample_begin_offset_iter < data_end_offset) {
- size_t sample_end_offset =
- *sample_begin_offset_iter + (1 << kSampleLenShift);
- // Invariant: Because we advance sample iterator while processing the
- // data_elmt containing the sample's last byte, the current sample
- // cannot end before the current data_elmt.
- assert(data_begin_offset < sample_end_offset);
-
- size_t data_elmt_copy_offset, data_elmt_copy_len;
- if (*sample_begin_offset_iter <= data_begin_offset) {
- // The sample starts before data_elmt starts, so take bytes starting
- // at the beginning of data_elmt.
- data_elmt_copy_offset = 0;
- } else {
- // data_elmt starts before the sample starts, so take bytes starting
- // at the below offset into data_elmt.
- data_elmt_copy_offset =
- *sample_begin_offset_iter - data_begin_offset;
- }
- if (sample_end_offset <= data_end_offset) {
- // The sample ends before data_elmt ends, so take as many bytes as
- // needed.
- data_elmt_copy_len =
- sample_end_offset - (data_begin_offset + data_elmt_copy_offset);
- } else {
- // data_elmt ends before the sample ends, so take all remaining
- // bytes in data_elmt.
- data_elmt_copy_len =
- data_end_offset - (data_begin_offset + data_elmt_copy_offset);
- }
- dict_sample_data.append(&data_elmt.data()[data_elmt_copy_offset],
- data_elmt_copy_len);
- if (sample_end_offset > data_end_offset) {
- // Didn't finish sample. Try to finish it with the next data_elmt.
- break;
- }
- // Next sample may require bytes from same data_elmt.
- sample_begin_offset_iter++;
- }
- data_begin_offset = data_end_offset;
- }
- }
-
// Close output file if it is big enough. Two possibilities determine it's
// time to close it: (1) the current key should be this file's last key, (2)
// the next key should not be in this file.
next_key = &c_iter->key();
}
CompactionIterationStats range_del_out_stats;
- status = FinishCompactionOutputFile(input_status, sub_compact,
- range_del_agg.get(),
- &range_del_out_stats, next_key);
+ status =
+ FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
+ &range_del_out_stats, next_key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
- if (sub_compact->outputs.size() == 1) {
- // Use samples from first output file to create dictionary for
- // compression of subsequent files.
- if (kUseZstdTrainer) {
- sub_compact->compression_dict = ZSTD_TrainDictionary(
- dict_sample_data, kSampleLenShift,
- sub_compact->compaction->output_compression_opts()
- .max_dict_bytes);
- } else {
- sub_compact->compression_dict = std::move(dict_sample_data);
- }
- }
}
}
}
if (status.ok() && sub_compact->builder == nullptr &&
- sub_compact->outputs.size() == 0 &&
- !range_del_agg->IsEmpty()) {
+ sub_compact->outputs.size() == 0 && !range_del_agg.IsEmpty()) {
// handle subcompaction containing only range deletions
status = OpenCompactionOutputFile(sub_compact);
}
// close the output file.
if (sub_compact->builder != nullptr) {
CompactionIterationStats range_del_out_stats;
- Status s = FinishCompactionOutputFile(
- status, sub_compact, range_del_agg.get(), &range_del_out_stats);
+ Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
+ &range_del_out_stats);
if (status.ok()) {
status = s;
}
RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
}
+ sub_compact->compaction_job_stats.cpu_micros =
+ env_->NowCPUNanos() / 1000 - prev_cpu_micros;
+
if (measure_io_stats_) {
sub_compact->compaction_job_stats.file_write_nanos +=
IOSTATS(write_nanos) - prev_write_nanos;
IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
sub_compact->compaction_job_stats.file_prepare_write_nanos +=
IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
- if (prev_perf_level != PerfLevel::kEnableTime) {
+ sub_compact->compaction_job_stats.cpu_micros -=
+ (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
+ IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
+ 1000;
+ if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
SetPerfLevel(prev_perf_level);
}
}
Status CompactionJob::FinishCompactionOutputFile(
const Status& input_status, SubcompactionState* sub_compact,
- RangeDelAggregator* range_del_agg,
+ CompactionRangeDelAggregator* range_del_agg,
CompactionIterationStats* range_del_out_stats,
const Slice* next_table_min_key /* = nullptr */) {
AutoThreadOperationStageUpdater stage_updater(
Slice lower_bound_guard, upper_bound_guard;
std::string smallest_user_key;
const Slice *lower_bound, *upper_bound;
+ bool lower_bound_from_sub_compact = false;
if (sub_compact->outputs.size() == 1) {
// For the first output table, include range tombstones before the min key
// but after the subcompaction boundary.
lower_bound = sub_compact->start;
+ lower_bound_from_sub_compact = true;
} else if (meta->smallest.size() > 0) {
// For subsequent output tables, only include range tombstones from min
// key onwards since the previous file was extended to contain range
lower_bound = nullptr;
}
if (next_table_min_key != nullptr) {
- // This isn't the last file in the subcompaction, so extend until the next
- // file starts.
+ // This may be the last file in the subcompaction in some cases, so we
+ // need to compare the end key of subcompaction with the next file start
+ // key. When the end key is chosen by the subcompaction, we know that
+ // it must be the biggest key in output file. Therefore, it is safe to
+ // use the smaller key as the upper bound of the output file, to ensure
+ // that there is no overlapping between different output files.
upper_bound_guard = ExtractUserKey(*next_table_min_key);
- upper_bound = &upper_bound_guard;
+ if (sub_compact->end != nullptr &&
+ ucmp->Compare(upper_bound_guard, *sub_compact->end) >= 0) {
+ upper_bound = sub_compact->end;
+ } else {
+ upper_bound = &upper_bound_guard;
+ }
} else {
// This is the last file in the subcompaction, so extend until the
// subcompaction ends.
if (existing_snapshots_.size() > 0) {
earliest_snapshot = existing_snapshots_[0];
}
- auto it = range_del_agg->NewIterator();
+ bool has_overlapping_endpoints;
+ if (upper_bound != nullptr && meta->largest.size() > 0) {
+ has_overlapping_endpoints =
+ ucmp->Compare(meta->largest.user_key(), *upper_bound) == 0;
+ } else {
+ has_overlapping_endpoints = false;
+ }
+
+ // The end key of the subcompaction must be bigger or equal to the upper
+ // bound. If the end of subcompaction is null or the upper bound is null,
+ // it means that this file is the last file in the compaction. So there
+ // will be no overlapping between this file and others.
+ assert(sub_compact->end == nullptr ||
+ upper_bound == nullptr ||
+ ucmp->Compare(*upper_bound , *sub_compact->end) <= 0);
+ auto it = range_del_agg->NewIterator(lower_bound, upper_bound,
+ has_overlapping_endpoints);
+ // Position the range tombstone output iterator. There may be tombstone
+ // fragments that are entirely out of range, so make sure that we do not
+ // include those.
if (lower_bound != nullptr) {
it->Seek(*lower_bound);
+ } else {
+ it->SeekToFirst();
}
for (; it->Valid(); it->Next()) {
auto tombstone = it->Tombstone();
- if (upper_bound != nullptr &&
- ucmp->Compare(*upper_bound, tombstone.start_key_) <= 0) {
- // Tombstones starting at upper_bound or later only need to be included
- // in the next table. Break because subsequent tombstones will start
- // even later.
- break;
+ if (upper_bound != nullptr) {
+ int cmp = ucmp->Compare(*upper_bound, tombstone.start_key_);
+ if ((has_overlapping_endpoints && cmp < 0) ||
+ (!has_overlapping_endpoints && cmp <= 0)) {
+ // Tombstones starting after upper_bound only need to be included in
+ // the next table. If the current SST ends before upper_bound, i.e.,
+ // `has_overlapping_endpoints == false`, we can also skip over range
+ // tombstones that start exactly at upper_bound. Such range tombstones
+ // will be included in the next file and are not relevant to the point
+ // keys or endpoints of the current file.
+ break;
+ }
}
if (bottommost_level_ && tombstone.seq_ <= earliest_snapshot) {
}
auto kv = tombstone.Serialize();
+ assert(lower_bound == nullptr ||
+ ucmp->Compare(*lower_bound, kv.second) < 0);
sub_compact->builder->Add(kv.first.Encode(), kv.second);
InternalKey smallest_candidate = std::move(kv.first);
if (lower_bound != nullptr &&
// (the max key in the previous table or subcompaction) in order for
// files to appear key-space partitioned.
//
- // Choose lowest seqnum so this file's smallest internal key comes
- // after the previous file's/subcompaction's largest. The fake seqnum
- // is OK because the read path's file-picking code only considers user
- // key.
- smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion);
+ // When lower_bound is chosen by a subcompaction, we know that
+ // subcompactions over smaller keys cannot contain any keys at
+ // lower_bound. We also know that smaller subcompactions exist, because
+ // otherwise the subcompaction woud be unbounded on the left. As a
+ // result, we know that no other files on the output level will contain
+ // actual keys at lower_bound (an output file may have a largest key of
+ // lower_bound@kMaxSequenceNumber, but this only indicates a large range
+ // tombstone was truncated). Therefore, it is safe to use the
+ // tombstone's sequence number, to ensure that keys at lower_bound at
+ // lower levels are covered by truncated tombstones.
+ //
+ // If lower_bound was chosen by the smallest data key in the file,
+ // choose lowest seqnum so this file's smallest internal key comes after
+ // the previous file's largest. The fake seqnum is OK because the read
+ // path's file-picking code only considers user key.
+ smallest_candidate = InternalKey(
+ *lower_bound, lower_bound_from_sub_compact ? tombstone.seq_ : 0,
+ kTypeRangeDeletion);
}
InternalKey largest_candidate = tombstone.SerializeEndKey();
if (upper_bound != nullptr &&
largest_candidate =
InternalKey(*upper_bound, kMaxSequenceNumber, kTypeRangeDeletion);
}
+#ifndef NDEBUG
+ SequenceNumber smallest_ikey_seqnum = kMaxSequenceNumber;
+ if (meta->smallest.size() > 0) {
+ smallest_ikey_seqnum = GetInternalKeySeqno(meta->smallest.Encode());
+ }
+#endif
meta->UpdateBoundariesForRange(smallest_candidate, largest_candidate,
tombstone.seq_,
cfd->internal_comparator());
+
+ // The smallest key in a file is used for range tombstone truncation, so
+ // it cannot have a seqnum of 0 (unless the smallest data key in a file
+ // has a seqnum of 0). Otherwise, the truncated tombstone may expose
+ // deleted keys at lower levels.
+ assert(smallest_ikey_seqnum == 0 ||
+ ExtractInternalKeyFooter(meta->smallest.Encode()) !=
+ PackSequenceAndType(0, kTypeRangeDeletion));
}
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
- auto fn =
- TableFileName(sub_compact->compaction->immutable_cf_options()->cf_paths,
- meta->fd.GetNumber(), meta->fd.GetPathId());
- sfm->OnAddFile(fn);
+ sfm->OnAddFile(fname);
if (sfm->IsMaxAllowedSpaceReached()) {
// TODO(ajkr): should we return OK() if max space was reached by the final
// compaction output file (similarly to how flush works when full)?
TableFileCreationReason::kCompaction);
#endif // !ROCKSDB_LITE
// Make the output file
- unique_ptr<WritableFile> writable_file;
+ std::unique_ptr<WritableFile> writable_file;
#ifndef NDEBUG
bool syncpoint_arg = env_options_.use_direct_writes;
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
writable_file->SetWriteLifeTimeHint(write_hint_);
writable_file->SetPreallocationBlockSize(static_cast<size_t>(
sub_compact->compaction->OutputFilePreallocationSize()));
+ const auto& listeners =
+ sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(
new WritableFileWriter(std::move(writable_file), fname, env_options_,
- db_options_.statistics.get()));
+ env_, db_options_.statistics.get(), listeners));
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
cfd->GetID(), cfd->GetName(), sub_compact->outfile.get(),
sub_compact->compaction->output_compression(),
+ 0 /*sample_for_compression */,
sub_compact->compaction->output_compression_opts(),
- sub_compact->compaction->output_level(), &sub_compact->compression_dict,
- skip_filters, output_file_creation_time));
+ sub_compact->compaction->output_level(), skip_filters,
+ output_file_creation_time, 0 /* oldest_key_time */,
+ sub_compact->compaction->max_output_file_size()));
LogFlush(db_options_.info_log);
return s;
}