// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#include "db/compaction/compaction_job.h"
+
#include <algorithm>
#include <cinttypes>
#include <functional>
#include <utility>
#include <vector>
+#include "db/blob/blob_file_addition.h"
+#include "db/blob/blob_file_builder.h"
#include "db/builder.h"
-#include "db/compaction/compaction_job.h"
#include "db/db_impl/db_impl.h"
#include "db/db_iter.h"
#include "db/dbformat.h"
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
+#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
#include "db/version_set.h"
#include "file/filename.h"
#include "port/port.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
+#include "rocksdb/sst_partitioner.h"
#include "rocksdb/statistics.h"
#include "rocksdb/status.h"
#include "rocksdb/table.h"
#include "table/table_builder.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
+#include "util/hash.h"
#include "util/mutexlock.h"
#include "util/random.h"
#include "util/stop_watch.h"
// The return status of this subcompaction
Status status;
+ // The return IO Status of this subcompaction
+ IOStatus io_status;
+
// Files produced by this subcompaction
struct Output {
+ Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
+ bool _enable_order_check, bool _enable_hash)
+ : meta(std::move(_meta)),
+ validator(_icmp, _enable_order_check, _enable_hash),
+ finished(false) {}
FileMetaData meta;
+ OutputValidator validator;
bool finished;
std::shared_ptr<const TableProperties> table_properties;
};
// State kept for output being generated
std::vector<Output> outputs;
+ std::vector<BlobFileAddition> blob_file_additions;
std::unique_ptr<WritableFileWriter> outfile;
std::unique_ptr<TableBuilder> builder;
+
Output* current_output() {
if (outputs.empty()) {
- // This subcompaction's outptut could be empty if compaction was aborted
+ // This subcompaction's output could be empty if compaction was aborted
// before this subcompaction had a chance to generate any output files.
// When subcompactions are executed sequentially this is more likely and
// will be particularly likely for the later subcompactions to be empty.
}
}
- uint64_t current_output_file_size;
+ uint64_t current_output_file_size = 0;
// State during the subcompaction
- uint64_t total_bytes;
- uint64_t num_output_records;
+ uint64_t total_bytes = 0;
+ uint64_t num_output_records = 0;
CompactionJobStats compaction_job_stats;
- uint64_t approx_size;
+ uint64_t approx_size = 0;
// An index used to speed up ShouldStopBefore().
size_t grandparent_index = 0;
// The number of bytes overlapping between the current output and
// A flag determining whether the key has been seen in ShouldStopBefore()
bool seen_key = false;
- SubcompactionState(Compaction* c, Slice* _start, Slice* _end,
- uint64_t size = 0)
- : compaction(c),
- start(_start),
- end(_end),
- outfile(nullptr),
- builder(nullptr),
- current_output_file_size(0),
- total_bytes(0),
- num_output_records(0),
- approx_size(size),
- grandparent_index(0),
- overlapped_bytes(0),
- seen_key(false) {
+ SubcompactionState(Compaction* c, Slice* _start, Slice* _end, uint64_t size)
+ : compaction(c), start(_start), end(_end), approx_size(size) {
assert(compaction != nullptr);
}
- SubcompactionState(SubcompactionState&& o) { *this = std::move(o); }
-
- SubcompactionState& operator=(SubcompactionState&& o) {
- compaction = std::move(o.compaction);
- start = std::move(o.start);
- end = std::move(o.end);
- status = std::move(o.status);
- outputs = std::move(o.outputs);
- outfile = std::move(o.outfile);
- builder = std::move(o.builder);
- current_output_file_size = std::move(o.current_output_file_size);
- total_bytes = std::move(o.total_bytes);
- num_output_records = std::move(o.num_output_records);
- compaction_job_stats = std::move(o.compaction_job_stats);
- approx_size = std::move(o.approx_size);
- grandparent_index = std::move(o.grandparent_index);
- overlapped_bytes = std::move(o.overlapped_bytes);
- seen_key = std::move(o.seen_key);
- return *this;
+ // Adds the key and value to the builder after passing them through the
+ // output validator, which checks key ordering and, when hashing is
+ // enabled, folds the pair into the paranoid hash.
+ Status AddToBuilder(const Slice& key, const Slice& value) {
+ auto curr = current_output();
+ assert(builder != nullptr);
+ assert(curr != nullptr);
+ Status s = curr->validator.Add(key, value);
+ if (!s.ok()) {
+ return s;
+ }
+ builder->Add(key, value);
+ return Status::OK();
}
- // Because member std::unique_ptrs do not have these.
- SubcompactionState(const SubcompactionState&) = delete;
-
- SubcompactionState& operator=(const SubcompactionState&) = delete;
-
// Returns true iff we should stop building the current output
// before processing "internal_key".
bool ShouldStopBefore(const Slice& internal_key, uint64_t curr_file_size) {
std::vector<CompactionJob::SubcompactionState> sub_compact_states;
Status status;
- uint64_t total_bytes;
- uint64_t num_output_records;
+ size_t num_output_files = 0;
+ uint64_t total_bytes = 0;
+ size_t num_blob_output_files = 0;
+ uint64_t total_blob_bytes = 0;
+ uint64_t num_output_records = 0;
- explicit CompactionState(Compaction* c)
- : compaction(c),
- total_bytes(0),
- num_output_records(0) {}
-
- size_t NumOutputFiles() {
- size_t total = 0;
- for (auto& s : sub_compact_states) {
- total += s.outputs.size();
- }
- return total;
- }
+ explicit CompactionState(Compaction* c) : compaction(c) {}
Slice SmallestUserKey() {
for (const auto& sub_compact_state : sub_compact_states) {
};
void CompactionJob::AggregateStatistics() {
+ assert(compact_);
+
for (SubcompactionState& sc : compact_->sub_compact_states) {
+ auto& outputs = sc.outputs;
+
+ if (!outputs.empty() && !outputs.back().meta.fd.file_size) {
+ // An error occurred, so ignore the last output.
+ outputs.pop_back();
+ }
+
+ compact_->num_output_files += outputs.size();
compact_->total_bytes += sc.total_bytes;
- compact_->num_output_records += sc.num_output_records;
- }
- if (compaction_job_stats_) {
- for (SubcompactionState& sc : compact_->sub_compact_states) {
- compaction_job_stats_->Add(sc.compaction_job_stats);
+
+ const auto& blobs = sc.blob_file_additions;
+
+ compact_->num_blob_output_files += blobs.size();
+
+ for (const auto& blob : blobs) {
+ compact_->total_blob_bytes += blob.GetTotalBlobBytes();
}
+
+ compact_->num_output_records += sc.num_output_records;
+
+ compaction_job_stats_->Add(sc.compaction_job_stats);
}
}
const FileOptions& file_options, VersionSet* versions,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer,
- Directory* db_directory, Directory* output_directory, Statistics* stats,
+ FSDirectory* db_directory, FSDirectory* output_directory,
+ FSDirectory* blob_output_directory, Statistics* stats,
InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
std::vector<SequenceNumber> existing_snapshots,
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, std::shared_ptr<Cache> table_cache,
EventLogger* event_logger, bool paranoid_file_checks, bool measure_io_stats,
const std::string& dbname, CompactionJobStats* compaction_job_stats,
- Env::Priority thread_pri, const std::atomic<bool>* manual_compaction_paused)
+ Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
+ const std::atomic<int>* manual_compaction_paused, const std::string& db_id,
+ const std::string& db_session_id, std::string full_history_ts_low)
: job_id_(job_id),
compact_(new CompactionState(compaction)),
compaction_job_stats_(compaction_job_stats),
compaction_stats_(compaction->compaction_reason(), 1),
dbname_(dbname),
+ db_id_(db_id),
+ db_session_id_(db_session_id),
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
- fs_(db_options.fs.get()),
+ io_tracer_(io_tracer),
+ fs_(db_options.fs, io_tracer),
file_options_for_read_(
fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
versions_(versions),
log_buffer_(log_buffer),
db_directory_(db_directory),
output_directory_(output_directory),
+ blob_output_directory_(blob_output_directory),
stats_(stats),
db_mutex_(db_mutex),
db_error_handler_(db_error_handler),
paranoid_file_checks_(paranoid_file_checks),
measure_io_stats_(measure_io_stats),
write_hint_(Env::WLTH_NOT_SET),
- thread_pri_(thread_pri) {
+ thread_pri_(thread_pri),
+ full_history_ts_low_(std::move(full_history_ts_low)) {
+ assert(compaction_job_stats_ != nullptr);
assert(log_buffer_ != nullptr);
const auto* cfd = compact_->compaction->column_family_data();
ThreadStatusUtil::SetColumnFamily(cfd, cfd->ioptions()->env,
// to ensure GetThreadList() can always show them all together.
ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
- if (compaction_job_stats_) {
- compaction_job_stats_->is_manual_compaction =
- compaction->is_manual_compaction();
- }
+ compaction_job_stats_->is_manual_compaction =
+ compaction->is_manual_compaction();
+ compaction_job_stats_->is_full_compaction = compaction->is_full_compaction();
}
void CompactionJob::Prepare() {
RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
compact_->sub_compact_states.size());
} else {
- compact_->sub_compact_states.emplace_back(c, nullptr, nullptr);
+ constexpr Slice* start = nullptr;
+ constexpr Slice* end = nullptr;
+ constexpr uint64_t size = 0;
+
+ compact_->sub_compact_states.emplace_back(c, start, end, size);
}
}
// Greedily add ranges to the subcompaction until the sum of the ranges'
// sizes becomes >= the expected mean size of a subcompaction
sum = 0;
- for (size_t i = 0; i < ranges.size() - 1; i++) {
+ for (size_t i = 0; i + 1 < ranges.size(); i++) {
sum += ranges[i].size;
if (subcompactions == 1) {
// If there's only one left to schedule then it goes to the end so no
// Check if any thread encountered an error during execution
Status status;
+ IOStatus io_s;
+ bool wrote_new_blob_files = false;
+
for (const auto& state : compact_->sub_compact_states) {
if (!state.status.ok()) {
status = state.status;
+ io_s = state.io_status;
break;
}
+
+ if (!state.blob_file_additions.empty()) {
+ wrote_new_blob_files = true;
+ }
}
- if (status.ok() && output_directory_) {
- status = output_directory_->Fsync();
+ if (io_status_.ok()) {
+ io_status_ = io_s;
}
+ if (status.ok()) {
+ constexpr IODebugContext* dbg = nullptr;
+
+ if (output_directory_) {
+ io_s = output_directory_->Fsync(IOOptions(), dbg);
+ }
+ if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
+ blob_output_directory_ != output_directory_) {
+ io_s = blob_output_directory_->Fsync(IOOptions(), dbg);
+ }
+ }
+ if (io_status_.ok()) {
+ io_status_ = io_s;
+ }
+ if (status.ok()) {
+ status = io_s;
+ }
if (status.ok()) {
thread_pool.clear();
- std::vector<const FileMetaData*> files_meta;
+ std::vector<const CompactionJob::SubcompactionState::Output*> files_output;
for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) {
- files_meta.emplace_back(&output.meta);
+ files_output.emplace_back(&output);
}
}
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
auto prefix_extractor =
compact_->compaction->mutable_cf_options()->prefix_extractor.get();
- std::atomic<size_t> next_file_meta_idx(0);
+ std::atomic<size_t> next_file_idx(0);
auto verify_table = [&](Status& output_status) {
while (true) {
- size_t file_idx = next_file_meta_idx.fetch_add(1);
- if (file_idx >= files_meta.size()) {
+ size_t file_idx = next_file_idx.fetch_add(1);
+ if (file_idx >= files_output.size()) {
break;
}
// Verify that the table is usable
// No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regard this verification as user reads since the goal is
// to cache it here for further user reads
+ ReadOptions read_options;
InternalIterator* iter = cfd->table_cache()->NewIterator(
- ReadOptions(), file_options_, cfd->internal_comparator(),
- *files_meta[file_idx], /*range_del_agg=*/nullptr, prefix_extractor,
+ read_options, file_options_, cfd->internal_comparator(),
+ files_output[file_idx]->meta, /*range_del_agg=*/nullptr,
+ prefix_extractor,
/*table_reader_ptr=*/nullptr,
cfd->internal_stats()->GetFileReadHist(
compact_->compaction->output_level()),
TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
/*skip_filters=*/false, compact_->compaction->output_level(),
+ MaxFileSizeForL0MetaPin(
+ *compact_->compaction->mutable_cf_options()),
/*smallest_compaction_key=*/nullptr,
- /*largest_compaction_key=*/nullptr);
+ /*largest_compaction_key=*/nullptr,
+ /*allow_unprepared_value=*/false);
auto s = iter->status();
if (s.ok() && paranoid_file_checks_) {
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {}
- s = iter->status();
+ OutputValidator validator(cfd->internal_comparator(),
+ /*_enable_order_check=*/true,
+ /*_enable_hash=*/true);
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ s = validator.Add(iter->key(), iter->value());
+ if (!s.ok()) {
+ break;
+ }
+ }
+ if (s.ok()) {
+ s = iter->status();
+ }
+ if (s.ok() &&
+ !validator.CompareValidator(files_output[file_idx]->validator)) {
+ s = Status::Corruption("Paranoid checksums do not match");
+ }
}
delete iter;
// Finish up all book-keeping to unify the subcompaction results
AggregateStatistics();
UpdateCompactionStats();
+
RecordCompactionIOStats();
LogFlush(db_options_.info_log);
TEST_SYNC_POINT("CompactionJob::Run():End");
}
Status CompactionJob::Install(const MutableCFOptions& mutable_cf_options) {
+ assert(compact_);
+
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_INSTALL);
db_mutex_->AssertHeld();
Status status = compact_->status;
+
ColumnFamilyData* cfd = compact_->compaction->column_family_data();
+ assert(cfd);
+
cfd->internal_stats()->AddCompactionStats(
compact_->compaction->output_level(), thread_pri_, compaction_stats_);
if (status.ok()) {
status = InstallCompactionResults(mutable_cf_options);
}
+ if (!versions_->io_status().ok()) {
+ io_status_ = versions_->io_status();
+ }
+
VersionStorageInfo::LevelSummaryStorage tmp;
auto vstorage = cfd->current()->storage_info();
const auto& stats = compaction_stats_;
stats.bytes_written / static_cast<double>(stats.micros);
}
+ const std::string& column_family_name = cfd->GetName();
+
ROCKS_LOG_BUFFER(
log_buffer_,
"[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s, records in: %" PRIu64
", records dropped: %" PRIu64 " output_compression: %s\n",
- cfd->GetName().c_str(), vstorage->LevelSummary(&tmp), bytes_read_per_sec,
- bytes_written_per_sec, compact_->compaction->output_level(),
+ column_family_name.c_str(), vstorage->LevelSummary(&tmp),
+ bytes_read_per_sec, bytes_written_per_sec,
+ compact_->compaction->output_level(),
stats.num_input_files_in_non_output_levels,
stats.num_input_files_in_output_level, stats.num_output_files,
stats.bytes_read_non_output_levels / 1048576.0,
CompressionTypeToString(compact_->compaction->output_compression())
.c_str());
+ const auto& blob_files = vstorage->GetBlobFiles();
+ if (!blob_files.empty()) {
+ ROCKS_LOG_BUFFER(log_buffer_,
+ "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64
+ "\n",
+ column_family_name.c_str(), blob_files.begin()->first,
+ blob_files.rbegin()->first);
+ }
+
UpdateCompactionJobStats(stats);
auto stream = event_logger_->LogToBuffer(log_buffer_);
<< "compaction_time_micros" << stats.micros
<< "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
<< compact_->compaction->output_level() << "num_output_files"
- << compact_->NumOutputFiles() << "total_output_size"
- << compact_->total_bytes << "num_input_records"
- << stats.num_input_records << "num_output_records"
- << compact_->num_output_records << "num_subcompactions"
- << compact_->sub_compact_states.size() << "output_compression"
- << CompressionTypeToString(compact_->compaction->output_compression());
+ << compact_->num_output_files << "total_output_size"
+ << compact_->total_bytes;
- if (compaction_job_stats_ != nullptr) {
- stream << "num_single_delete_mismatches"
- << compaction_job_stats_->num_single_del_mismatch;
- stream << "num_single_delete_fallthrough"
- << compaction_job_stats_->num_single_del_fallthru;
+ if (compact_->num_blob_output_files > 0) {
+ stream << "num_blob_output_files" << compact_->num_blob_output_files
+ << "total_blob_output_size" << compact_->total_blob_bytes;
}
- if (measure_io_stats_ && compaction_job_stats_ != nullptr) {
+ stream << "num_input_records" << stats.num_input_records
+ << "num_output_records" << compact_->num_output_records
+ << "num_subcompactions" << compact_->sub_compact_states.size()
+ << "output_compression"
+ << CompressionTypeToString(compact_->compaction->output_compression());
+
+ stream << "num_single_delete_mismatches"
+ << compaction_job_stats_->num_single_del_mismatch;
+ stream << "num_single_delete_fallthrough"
+ << compaction_job_stats_->num_single_del_fallthru;
+
+ if (measure_io_stats_) {
stream << "file_write_nanos" << compaction_job_stats_->file_write_nanos;
stream << "file_range_sync_nanos"
<< compaction_job_stats_->file_range_sync_nanos;
}
stream.EndArray();
+ if (!blob_files.empty()) {
+ stream << "blob_file_head" << blob_files.begin()->first;
+ stream << "blob_file_tail" << blob_files.rbegin()->first;
+ }
+
CleanupCompaction();
return status;
}
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
- assert(sub_compact != nullptr);
+ assert(sub_compact);
+ assert(sub_compact->compaction);
uint64_t prev_cpu_micros = env_->NowCPUNanos() / 1000;
CompactionRangeDelAggregator range_del_agg(&cfd->internal_comparator(),
existing_snapshots_);
+ ReadOptions read_options;
+ read_options.verify_checksums = true;
+ read_options.fill_cache = false;
+ // Compaction iterators shouldn't be confined to a single prefix.
+ // Compactions use Seek() for
+ // (a) concurrent compactions,
+ // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
+ read_options.total_order_seek = true;
// Although the v2 aggregator is what the level iterator(s) know about,
// the AddTombstones calls will be propagated down to the v1 aggregator.
- std::unique_ptr<InternalIterator> input(versions_->MakeInputIterator(
- sub_compact->compaction, &range_del_agg, file_options_for_read_));
+ std::unique_ptr<InternalIterator> input(
+ versions_->MakeInputIterator(read_options, sub_compact->compaction,
+ &range_del_agg, file_options_for_read_));
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
snapshot_checker_, compact_->compaction->level(),
db_options_.statistics.get());
+ const MutableCFOptions* mutable_cf_options =
+ sub_compact->compaction->mutable_cf_options();
+ assert(mutable_cf_options);
+
+ std::vector<std::string> blob_file_paths;
+
+ std::unique_ptr<BlobFileBuilder> blob_file_builder(
+ mutable_cf_options->enable_blob_files
+ ? new BlobFileBuilder(
+ versions_, env_, fs_.get(),
+ sub_compact->compaction->immutable_cf_options(),
+ mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
+ cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
+ &blob_file_paths, &sub_compact->blob_file_additions)
+ : nullptr);
+
TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:1",
reinterpret_cast<void*>(
- const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
+ const_cast<std::atomic<int>*>(manual_compaction_paused_)));
Slice* start = sub_compact->start;
Slice* end = sub_compact->end;
}
Status status;
+ const std::string* const full_history_ts_low =
+ full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
sub_compact->c_iter.reset(new CompactionIterator(
input.get(), cfd->user_comparator(), &merge, versions_->LastSequence(),
&existing_snapshots_, earliest_write_conflict_snapshot_,
- snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_), false,
- &range_del_agg, sub_compact->compaction, compaction_filter,
- shutting_down_, preserve_deletes_seqnum_, manual_compaction_paused_,
- db_options_.info_log));
+ snapshot_checker_, env_, ShouldReportDetailedTime(env_, stats_),
+ /*expect_valid_internal_key=*/true, &range_del_agg,
+ blob_file_builder.get(), db_options_.allow_data_in_errors,
+ sub_compact->compaction, compaction_filter, shutting_down_,
+ preserve_deletes_seqnum_, manual_compaction_paused_, db_options_.info_log,
+ full_history_ts_low));
auto c_iter = sub_compact->c_iter.get();
c_iter->SeekToFirst();
if (c_iter->Valid() && sub_compact->compaction->output_level() != 0) {
}
const auto& c_iter_stats = c_iter->iter_stats();
+ std::unique_ptr<SstPartitioner> partitioner =
+ sub_compact->compaction->output_level() == 0
+ ? nullptr
+ : sub_compact->compaction->CreateSstPartitioner();
+ std::string last_key_for_partitioner;
+
while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
// Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
// returns true.
break;
}
}
- assert(sub_compact->builder != nullptr);
- assert(sub_compact->current_output() != nullptr);
- sub_compact->builder->Add(key, value);
- sub_compact->current_output_file_size = sub_compact->builder->FileSize();
+ status = sub_compact->AddToBuilder(key, value);
+ if (!status.ok()) {
+ break;
+ }
+
+ sub_compact->current_output_file_size =
+ sub_compact->builder->EstimatedFileSize();
const ParsedInternalKey& ikey = c_iter->ikey();
sub_compact->current_output()->meta.UpdateBoundaries(
key, value, ikey.sequence, ikey.type);
// going to be 1.2MB and max_output_file_size = 1MB, prefer to have 0.6MB
// and 0.6MB instead of 1MB and 0.2MB)
bool output_file_ended = false;
- Status input_status;
if (sub_compact->compaction->output_level() != 0 &&
sub_compact->current_output_file_size >=
sub_compact->compaction->max_output_file_size()) {
// (1) this key terminates the file. For historical reasons, the iterator
// status before advancing will be given to FinishCompactionOutputFile().
- input_status = input->status();
output_file_ended = true;
}
TEST_SYNC_POINT_CALLBACK(
"CompactionJob::Run():PausingManualCompaction:2",
reinterpret_cast<void*>(
- const_cast<std::atomic<bool>*>(manual_compaction_paused_)));
+ const_cast<std::atomic<int>*>(manual_compaction_paused_)));
+ if (partitioner.get()) {
+ last_key_for_partitioner.assign(c_iter->user_key().data_,
+ c_iter->user_key().size_);
+ }
c_iter->Next();
if (c_iter->status().IsManualCompactionPaused()) {
break;
}
- if (!output_file_ended && c_iter->Valid() &&
- sub_compact->compaction->output_level() != 0 &&
- sub_compact->ShouldStopBefore(c_iter->key(),
- sub_compact->current_output_file_size) &&
- sub_compact->builder != nullptr) {
- // (2) this key belongs to the next file. For historical reasons, the
- // iterator status after advancing will be given to
- // FinishCompactionOutputFile().
- input_status = input->status();
- output_file_ended = true;
+ if (!output_file_ended && c_iter->Valid()) {
+ if (((partitioner.get() &&
+ partitioner->ShouldPartition(PartitionerRequest(
+ last_key_for_partitioner, c_iter->user_key(),
+ sub_compact->current_output_file_size)) == kRequired) ||
+ (sub_compact->compaction->output_level() != 0 &&
+ sub_compact->ShouldStopBefore(
+ c_iter->key(), sub_compact->current_output_file_size))) &&
+ sub_compact->builder != nullptr) {
+ // (2) this key belongs to the next file. For historical reasons, the
+ // iterator status after advancing will be given to
+ // FinishCompactionOutputFile().
+ output_file_ended = true;
+ }
}
if (output_file_ended) {
const Slice* next_key = nullptr;
next_key = &c_iter->key();
}
CompactionIterationStats range_del_out_stats;
- status =
- FinishCompactionOutputFile(input_status, sub_compact, &range_del_agg,
- &range_del_out_stats, next_key);
+ status = FinishCompactionOutputFile(input->status(), sub_compact,
+ &range_del_agg, &range_del_out_stats,
+ next_key);
RecordDroppedKeys(range_del_out_stats,
&sub_compact->compaction_job_stats);
}
}
if ((status.ok() || status.IsColumnFamilyDropped()) &&
(manual_compaction_paused_ &&
- manual_compaction_paused_->load(std::memory_order_relaxed))) {
+ manual_compaction_paused_->load(std::memory_order_relaxed) > 0)) {
status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
}
if (status.ok()) {
CompactionIterationStats range_del_out_stats;
Status s = FinishCompactionOutputFile(status, sub_compact, &range_del_agg,
&range_del_out_stats);
- if (status.ok()) {
+ if (!s.ok() && status.ok()) {
status = s;
}
RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
}
+ if (blob_file_builder) {
+ if (status.ok()) {
+ status = blob_file_builder->Finish();
+ }
+
+ blob_file_builder.reset();
+ }
+
sub_compact->compaction_job_stats.cpu_micros =
env_->NowCPUNanos() / 1000 - prev_cpu_micros;
SetPerfLevel(prev_perf_level);
}
}
+#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
+ if (!status.ok()) {
+ if (sub_compact->c_iter) {
+ sub_compact->c_iter->status().PermitUncheckedError();
+ }
+ if (input) {
+ input->status().PermitUncheckedError();
+ }
+ }
+#endif // ROCKSDB_ASSERT_STATUS_CHECKED
sub_compact->c_iter.reset();
input.reset();
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
const Comparator* ucmp = cfd->user_comparator();
+ std::string file_checksum = kUnknownFileChecksum;
+ std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
// Check for iterator errors
Status s = input_status;
} else {
it->SeekToFirst();
}
+ TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
for (; it->Valid(); it->Next()) {
auto tombstone = it->Tombstone();
if (upper_bound != nullptr) {
auto kv = tombstone.Serialize();
assert(lower_bound == nullptr ||
ucmp->Compare(*lower_bound, kv.second) < 0);
+ // Range tombstone is not supported by output validator yet.
sub_compact->builder->Add(kv.first.Encode(), kv.second);
InternalKey smallest_candidate = std::move(kv.first);
if (lower_bound != nullptr &&
ExtractInternalKeyFooter(meta->smallest.Encode()) !=
PackSequenceAndType(0, kTypeRangeDeletion));
}
- meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
const uint64_t current_entries = sub_compact->builder->NumEntries();
if (s.ok()) {
} else {
sub_compact->builder->Abandon();
}
+ IOStatus io_s = sub_compact->builder->io_status();
+ if (s.ok()) {
+ s = io_s;
+ }
const uint64_t current_bytes = sub_compact->builder->FileSize();
if (s.ok()) {
- // Add the checksum information to file metadata.
- meta->file_checksum = sub_compact->builder->GetFileChecksum();
- meta->file_checksum_func_name =
- sub_compact->builder->GetFileChecksumFuncName();
-
meta->fd.file_size = current_bytes;
+ meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
sub_compact->current_output()->finished = true;
sub_compact->total_bytes += current_bytes;
// Finish and check for file errors
if (s.ok()) {
StopWatch sw(env_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
- s = sub_compact->outfile->Sync(db_options_.use_fsync);
+ io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
+ }
+ if (s.ok() && io_s.ok()) {
+ io_s = sub_compact->outfile->Close();
+ }
+ if (s.ok() && io_s.ok()) {
+ // Add the checksum information to file metadata.
+ meta->file_checksum = sub_compact->outfile->GetFileChecksum();
+ meta->file_checksum_func_name =
+ sub_compact->outfile->GetFileChecksumFuncName();
+ file_checksum = meta->file_checksum;
+ file_checksum_func_name = meta->file_checksum_func_name;
}
if (s.ok()) {
- s = sub_compact->outfile->Close();
+ s = io_s;
+ }
+ if (sub_compact->io_status.ok()) {
+ sub_compact->io_status = io_s;
+ // io_s is merely a copy of the "normal" status checked via s above, so
+ // it does not also need to be checked separately here.
+ sub_compact->io_status.PermitUncheckedError();
}
sub_compact->outfile.reset();
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(), fname,
job_id_, output_fd, oldest_blob_file_number, tp,
- TableFileCreationReason::kCompaction, s);
+ TableFileCreationReason::kCompaction, s, file_checksum,
+ file_checksum_func_name);
#ifndef ROCKSDB_LITE
// Report new file to SstFileManagerImpl
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
- sfm->OnAddFile(fname);
+ Status add_s = sfm->OnAddFile(fname);
+ if (!add_s.ok() && s.ok()) {
+ s = add_s;
+ }
if (sfm->IsMaxAllowedSpaceReached()) {
// TODO(ajkr): should we return OK() if max space was reached by the final
// compaction output file (similarly to how flush works when full)?
"CompactionJob::FinishCompactionOutputFile:"
"MaxAllowedSpaceReached");
InstrumentedMutexLock l(db_mutex_);
- db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
+ // TODO: decide whether the Status returned by SetBGError needs handling.
+ db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction)
+ .PermitUncheckedError();
}
}
#endif
Status CompactionJob::InstallCompactionResults(
const MutableCFOptions& mutable_cf_options) {
+ assert(compact_);
+
db_mutex_->AssertHeld();
auto* compaction = compact_->compaction;
+ assert(compaction);
+
// paranoia: verify that the files that we started with
// still exist in the current version and in the same original level.
// This ensures that a concurrent compaction did not erroneously
{
Compaction::InputLevelSummaryBuffer inputs_summary;
- ROCKS_LOG_INFO(
- db_options_.info_log, "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
- compaction->column_family_data()->GetName().c_str(), job_id_,
- compaction->InputLevelSummary(&inputs_summary), compact_->total_bytes);
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
+ compaction->column_family_data()->GetName().c_str(), job_id_,
+ compaction->InputLevelSummary(&inputs_summary),
+ compact_->total_bytes + compact_->total_blob_bytes);
}
+ VersionEdit* const edit = compaction->edit();
+ assert(edit);
+
// Add compaction inputs
- compaction->AddInputDeletions(compact_->compaction->edit());
+ compaction->AddInputDeletions(edit);
for (const auto& sub_compact : compact_->sub_compact_states) {
for (const auto& out : sub_compact.outputs) {
- compaction->edit()->AddFile(compaction->output_level(), out.meta);
+ edit->AddFile(compaction->output_level(), out.meta);
+ }
+
+ for (const auto& blob : sub_compact.blob_file_additions) {
+ edit->AddBlobFile(blob);
}
}
+
return versions_->LogAndApply(compaction->column_family_data(),
- mutable_cf_options, compaction->edit(),
- db_mutex_, db_directory_);
+ mutable_cf_options, edit, db_mutex_,
+ db_directory_);
}
void CompactionJob::RecordCompactionIOStats() {
RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
+ RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
+ CompactionReason compaction_reason =
+ compact_->compaction->compaction_reason();
+ if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
+ RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
+ RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
+ } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
+ RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
+ RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
+ } else if (compaction_reason == CompactionReason::kTtl) {
+ RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
+ RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
+ }
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
IOSTATS_RESET(bytes_read);
- RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
ThreadStatusUtil::IncreaseThreadOperationProperty(
ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
IOSTATS_RESET(bytes_written);
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
&syncpoint_arg);
#endif
- Status s = NewWritableFile(fs_, fname, &writable_file, file_options_);
+ Status s;
+ IOStatus io_s =
+ NewWritableFile(fs_.get(), fname, &writable_file, file_options_);
+ s = io_s;
+ // First-error-wins: only record io_s on the subcompaction if no earlier IO
+ // error has been captured, so the original failure cause is preserved.
+ if (sub_compact->io_status.ok()) {
+ sub_compact->io_status = io_s;
+ // Since this error is really a copy of the io_s that is checked below as s,
+ // it does not also need to be checked.
+ sub_compact->io_status.PermitUncheckedError();
+ }
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,
EventHelpers::LogAndNotifyTableFileCreationFinished(
event_logger_, cfd->ioptions()->listeners, dbname_, cfd->GetName(),
fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
- TableProperties(), TableFileCreationReason::kCompaction, s);
+ TableProperties(), TableFileCreationReason::kCompaction, s,
+ kUnknownFileChecksum, kUnknownFileChecksumFuncName);
return s;
}
// Initialize a SubcompactionState::Output and add it to sub_compact->outputs
{
- SubcompactionState::Output out;
- out.meta.fd = FileDescriptor(file_number,
- sub_compact->compaction->output_path_id(), 0);
- out.meta.oldest_ancester_time = oldest_ancester_time;
- out.meta.file_creation_time = current_time;
- out.finished = false;
- sub_compact->outputs.push_back(out);
+ FileMetaData meta;
+ meta.fd = FileDescriptor(file_number,
+ sub_compact->compaction->output_path_id(), 0);
+ meta.oldest_ancester_time = oldest_ancester_time;
+ meta.file_creation_time = current_time;
+ // emplace_back invokes the Output constructor, which builds the per-file
+ // OutputValidator: key-order checking is gated on the CF option
+ // check_flush_compaction_key_order, and content hashing is enabled only
+ // under paranoid file checks. `finished` is initialized to false there.
+ sub_compact->outputs.emplace_back(
+ std::move(meta), cfd->internal_comparator(),
+ /*enable_order_check=*/
+ sub_compact->compaction->mutable_cf_options()
+ ->check_flush_compaction_key_order,
+ /*enable_hash=*/paranoid_file_checks_);
writable_file->SetIOPriority(Env::IOPriority::IO_LOW);
sub_compact->compaction->OutputFilePreallocationSize()));
const auto& listeners =
sub_compact->compaction->immutable_cf_options()->listeners;
- sub_compact->outfile.reset(
- new WritableFileWriter(std::move(writable_file), fname, file_options_,
- env_, db_options_.statistics.get(), listeners,
- db_options_.sst_file_checksum_func.get()));
+ sub_compact->outfile.reset(new WritableFileWriter(
+ std::move(writable_file), fname, file_options_, env_, io_tracer_,
+ db_options_.statistics.get(), listeners,
+ db_options_.file_checksum_gen_factory.get()));
// If the Column family flag is to only optimize filters for hits,
// we can skip creating filters if this is the bottommost_level where
sub_compact->compaction->output_compression_opts(),
sub_compact->compaction->output_level(), skip_filters,
oldest_ancester_time, 0 /* oldest_key_time */,
- sub_compact->compaction->max_output_file_size(), current_time));
+ sub_compact->compaction->max_output_file_size(), current_time, db_id_,
+ db_session_id_));
LogFlush(db_options_.info_log);
return s;
}
TableCache::Evict(table_cache_.get(), out.meta.fd.GetNumber());
}
}
+ // TODO: sub_compact.io_status is not checked like status; unclear whether
+ // that is intentional, so the io_status is explicitly ignored for now.
+ sub_compact.io_status.PermitUncheckedError();
}
delete compact_;
compact_ = nullptr;
#endif // !ROCKSDB_LITE
// Populates compaction_stats_ with this compaction's input/output totals.
void CompactionJob::UpdateCompactionStats() {
+ // Must not run before Prepare() or after Cleanup(): compact_ is required.
+ assert(compact_);
+
Compaction* compaction = compact_->compaction;
compaction_stats_.num_input_files_in_non_output_levels = 0;
compaction_stats_.num_input_files_in_output_level = 0;
}
}
- uint64_t num_output_records = 0;
-
- for (const auto& sub_compact : compact_->sub_compact_states) {
- size_t num_output_files = sub_compact.outputs.size();
- if (sub_compact.builder != nullptr) {
- // An error occurred so ignore the last output.
- assert(num_output_files > 0);
- --num_output_files;
- }
- compaction_stats_.num_output_files += static_cast<int>(num_output_files);
-
- num_output_records += sub_compact.num_output_records;
+ // Output counts/bytes now include blob files and are read from aggregated
+ // counters on compact_ instead of re-walking sub_compact_states.
+ // NOTE(review): assumes num_output_files/num_blob_output_files/total_bytes/
+ // total_blob_bytes were kept current as each output was finished — verify.
+ compaction_stats_.num_output_files =
+ static_cast<int>(compact_->num_output_files) +
+ static_cast<int>(compact_->num_blob_output_files);
+ compaction_stats_.bytes_written =
+ compact_->total_bytes + compact_->total_blob_bytes;
- for (const auto& out : sub_compact.outputs) {
- compaction_stats_.bytes_written += out.meta.fd.file_size;
- }
- }
-
- if (compaction_stats_.num_input_records > num_output_records) {
+ // Dropped records = inputs not carried to any output; guard keeps the
+ // unsigned subtraction from wrapping if outputs somehow exceed inputs.
+ if (compaction_stats_.num_input_records > compact_->num_output_records) {
compaction_stats_.num_dropped_records =
- compaction_stats_.num_input_records - num_output_records;
+ compaction_stats_.num_input_records - compact_->num_output_records;
}
}
void CompactionJob::UpdateCompactionJobStats(
const InternalStats::CompactionStats& stats) const {
#ifndef ROCKSDB_LITE
- if (compaction_job_stats_) {
- compaction_job_stats_->elapsed_micros = stats.micros;
-
- // input information
- compaction_job_stats_->total_input_bytes =
- stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
- compaction_job_stats_->num_input_records = stats.num_input_records;
- compaction_job_stats_->num_input_files =
- stats.num_input_files_in_non_output_levels +
- stats.num_input_files_in_output_level;
- compaction_job_stats_->num_input_files_at_output_level =
- stats.num_input_files_in_output_level;
-
- // output information
- compaction_job_stats_->total_output_bytes = stats.bytes_written;
- compaction_job_stats_->num_output_records = compact_->num_output_records;
- compaction_job_stats_->num_output_files = stats.num_output_files;
-
- if (compact_->NumOutputFiles() > 0U) {
- CopyPrefix(compact_->SmallestUserKey(),
- CompactionJobStats::kMaxPrefixLength,
- &compaction_job_stats_->smallest_output_key_prefix);
- CopyPrefix(compact_->LargestUserKey(),
- CompactionJobStats::kMaxPrefixLength,
- &compaction_job_stats_->largest_output_key_prefix);
- }
+ compaction_job_stats_->elapsed_micros = stats.micros;
+
+ // input information
+ compaction_job_stats_->total_input_bytes =
+ stats.bytes_read_non_output_levels + stats.bytes_read_output_level;
+ compaction_job_stats_->num_input_records = stats.num_input_records;
+ compaction_job_stats_->num_input_files =
+ stats.num_input_files_in_non_output_levels +
+ stats.num_input_files_in_output_level;
+ compaction_job_stats_->num_input_files_at_output_level =
+ stats.num_input_files_in_output_level;
+
+ // output information
+ compaction_job_stats_->total_output_bytes = stats.bytes_written;
+ compaction_job_stats_->num_output_records = compact_->num_output_records;
+ compaction_job_stats_->num_output_files = stats.num_output_files;
+
+ if (stats.num_output_files > 0) {
+ CopyPrefix(compact_->SmallestUserKey(),
+ CompactionJobStats::kMaxPrefixLength,
+ &compaction_job_stats_->smallest_output_key_prefix);
+ CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
+ &compaction_job_stats_->largest_output_key_prefix);
}
#else
(void)stats;