// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
+#include "db/compaction/compaction_iterator.h"
+
#include <cinttypes>
-#include "db/compaction/compaction_iterator.h"
+#include "db/blob/blob_file_builder.h"
#include "db/snapshot_checker.h"
#include "port/likely.h"
#include "rocksdb/listener.h"
SequenceNumber earliest_write_conflict_snapshot,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
- CompactionRangeDelAggregator* range_del_agg, const Compaction* compaction,
- const CompactionFilter* compaction_filter,
+ CompactionRangeDelAggregator* range_del_agg,
+ BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
+ const Compaction* compaction, const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
- const std::atomic<bool>* manual_compaction_paused,
- const std::shared_ptr<Logger> info_log)
+ const std::atomic<int>* manual_compaction_paused,
+ const std::shared_ptr<Logger> info_log,
+ const std::string* full_history_ts_low)
: CompactionIterator(
input, cmp, merge_helper, last_sequence, snapshots,
earliest_write_conflict_snapshot, snapshot_checker, env,
report_detailed_time, expect_valid_internal_key, range_del_agg,
+ blob_file_builder, allow_data_in_errors,
std::unique_ptr<CompactionProxy>(
- compaction ? new CompactionProxy(compaction) : nullptr),
+ compaction ? new RealCompaction(compaction) : nullptr),
compaction_filter, shutting_down, preserve_deletes_seqnum,
- manual_compaction_paused, info_log) {}
+ manual_compaction_paused, info_log, full_history_ts_low) {}
CompactionIterator::CompactionIterator(
InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
const SnapshotChecker* snapshot_checker, Env* env,
bool report_detailed_time, bool expect_valid_internal_key,
CompactionRangeDelAggregator* range_del_agg,
+ BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
std::unique_ptr<CompactionProxy> compaction,
const CompactionFilter* compaction_filter,
const std::atomic<bool>* shutting_down,
const SequenceNumber preserve_deletes_seqnum,
- const std::atomic<bool>* manual_compaction_paused,
- const std::shared_ptr<Logger> info_log)
+ const std::atomic<int>* manual_compaction_paused,
+ const std::shared_ptr<Logger> info_log,
+ const std::string* full_history_ts_low)
: input_(input),
cmp_(cmp),
merge_helper_(merge_helper),
report_detailed_time_(report_detailed_time),
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),
+ blob_file_builder_(blob_file_builder),
compaction_(std::move(compaction)),
compaction_filter_(compaction_filter),
shutting_down_(shutting_down),
manual_compaction_paused_(manual_compaction_paused),
preserve_deletes_seqnum_(preserve_deletes_seqnum),
+ info_log_(info_log),
+ allow_data_in_errors_(allow_data_in_errors),
+ timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0),
+ full_history_ts_low_(full_history_ts_low),
current_user_key_sequence_(0),
current_user_key_snapshot_(0),
merge_out_iter_(merge_helper_),
current_key_committed_(false),
- info_log_(info_log) {
+ cmp_with_history_ts_low_(0) {
assert(compaction_filter_ == nullptr || compaction_ != nullptr);
assert(snapshots_ != nullptr);
- bottommost_level_ =
- compaction_ == nullptr ? false : compaction_->bottommost_level();
+ bottommost_level_ = compaction_ == nullptr
+ ? false
+ : compaction_->bottommost_level() &&
+ !compaction_->allow_ingest_behind();
if (compaction_ != nullptr) {
level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
}
for (size_t i = 1; i < snapshots_->size(); ++i) {
assert(snapshots_->at(i - 1) < snapshots_->at(i));
}
+ assert(timestamp_size_ == 0 || !full_history_ts_low_ ||
+ timestamp_size_ == full_history_ts_low_->size());
#endif
input_->SetPinnedItersMgr(&pinned_iters_mgr_);
TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get());
if (merge_out_iter_.Valid()) {
key_ = merge_out_iter_.key();
value_ = merge_out_iter_.value();
- bool valid_key __attribute__((__unused__));
- valid_key = ParseInternalKey(key_, &ikey_);
+ Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
- assert(valid_key);
- if (!valid_key) {
- ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
- key_.ToString(true).c_str());
+ assert(s.ok());
+ if (!s.ok()) {
+ ROCKS_LOG_FATAL(info_log_, "Invalid key in compaction. %s",
+ s.getState());
}
// Keep current_key_ in sync.
PrepareOutput();
}
-void CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
+bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) {
if (compaction_filter_ != nullptr &&
(ikey_.type == kTypeValue || ikey_.type == kTypeBlobIndex)) {
value_.clear();
iter_stats_.num_record_drop_user++;
} else if (filter == CompactionFilter::Decision::kChangeValue) {
+ if (ikey_.type == kTypeBlobIndex) {
+ // value transfer from blob file to inlined data
+ ikey_.type = kTypeValue;
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ }
value_ = compaction_filter_value_;
} else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) {
*need_skip = true;
compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber,
kValueTypeForSeek);
*skip_until = compaction_filter_skip_until_.Encode();
+ } else if (filter == CompactionFilter::Decision::kChangeBlobIndex) {
+ if (ikey_.type == kTypeValue) {
+ // value transfer from inlined data to blob file
+ ikey_.type = kTypeBlobIndex;
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ }
+ value_ = compaction_filter_value_;
+ } else if (filter == CompactionFilter::Decision::kIOError) {
+ status_ =
+ Status::IOError("Failed to access blob during compaction filter");
+ return false;
}
}
+ return true;
}
void CompactionIterator::NextFromInput() {
value_ = input_->value();
iter_stats_.num_input_records++;
- if (!ParseInternalKey(key_, &ikey_)) {
+ Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
+ if (!pik_status.ok()) {
+ iter_stats_.num_input_corrupt_records++;
+
// If `expect_valid_internal_key_` is false, return the corrupted key
// and let the caller decide what to do with it.
- // TODO(noetzli): We should have a more elegant solution for this.
if (expect_valid_internal_key_) {
- assert(!"Corrupted internal key not expected.");
- status_ = Status::Corruption("Corrupted internal key not expected.");
- break;
+ status_ = pik_status;
+ return;
}
key_ = current_key_.SetInternalKey(key_);
has_current_user_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
- iter_stats_.num_input_corrupt_records++;
valid_ = true;
break;
}
TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_);
// Update input statistics
- if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
+ if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion ||
+ ikey_.type == kTypeDeletionWithTimestamp) {
iter_stats_.num_input_deletion_records++;
}
iter_stats_.total_input_raw_key_bytes += key_.size();
// merge_helper_->compaction_filter_skip_until_.
Slice skip_until;
+ int cmp_user_key_without_ts = 0;
+ int cmp_ts = 0;
+ if (has_current_user_key_) {
+ cmp_user_key_without_ts =
+ timestamp_size_
+ ? cmp_->CompareWithoutTimestamp(ikey_.user_key, current_user_key_)
+ : cmp_->Compare(ikey_.user_key, current_user_key_);
+ // if timestamp_size_ > 0, then curr_ts_ has been initialized by a
+ // previous key.
+ cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp(
+ ExtractTimestampFromUserKey(
+ ikey_.user_key, timestamp_size_),
+ curr_ts_)
+ : 0;
+ }
+
// Check whether the user key changed. After this if statement current_key_
// is a copy of the current input key (maybe converted to a delete by the
// compaction filter). ikey_.user_key is pointing to the copy.
- if (!has_current_user_key_ ||
- !cmp_->Equal(ikey_.user_key, current_user_key_)) {
+ if (!has_current_user_key_ || cmp_user_key_without_ts != 0 || cmp_ts != 0) {
// First occurrence of this user key
// Copy key for output
key_ = current_key_.SetInternalKey(key_, &ikey_);
+
+ // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use
+ // in next iteration to compare with the timestamp of next key.
+ UpdateTimestampAndCompareWithFullHistoryLow();
+
+ // If
+ // (1) !has_current_user_key_, OR
+ // (2) timestamp is disabled, OR
+ // (3) all history will be preserved, OR
+ // (4) user key (excluding timestamp) is different from previous key, OR
+ // (5) timestamp is NO older than *full_history_ts_low_
+ // then current_user_key_ must be treated as a different user key.
+ // This means, if a user key (excluding ts) is the same as the previous
+ // user key, and its ts is older than *full_history_ts_low_, then we
+ // consider this key for GC, e.g. it may be dropped if certain conditions
+ // match.
+ if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ ||
+ 0 != cmp_user_key_without_ts || cmp_with_history_ts_low_ >= 0) {
+ // Initialize for future comparison for rule (A) and etc.
+ current_user_key_sequence_ = kMaxSequenceNumber;
+ current_user_key_snapshot_ = 0;
+ has_current_user_key_ = true;
+ }
current_user_key_ = ikey_.user_key;
- has_current_user_key_ = true;
+
has_outputted_key_ = false;
- current_user_key_sequence_ = kMaxSequenceNumber;
- current_user_key_snapshot_ = 0;
+
current_key_committed_ = KeyCommitted(ikey_.sequence);
// Apply the compaction filter to the first committed version of the user
// key.
- if (current_key_committed_) {
- InvokeFilterIfNeeded(&need_skip, &skip_until);
+ if (current_key_committed_ &&
+ !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
+ break;
}
} else {
// Update the current key to reflect the new sequence number/type without
current_key_committed_ = KeyCommitted(ikey_.sequence);
// Apply the compaction filter to the first committed version of the
// user key.
- if (current_key_committed_) {
- InvokeFilterIfNeeded(&need_skip, &skip_until);
+ if (current_key_committed_ &&
+ !InvokeFilterIfNeeded(&need_skip, &skip_until)) {
+ break;
}
}
}
// If there are no snapshots, then this kv affect visibility at tip.
// Otherwise, search though all existing snapshots to find the earliest
// snapshot that is affected by this kv.
- SequenceNumber last_sequence __attribute__((__unused__));
- last_sequence = current_user_key_sequence_;
+ SequenceNumber last_sequence = current_user_key_sequence_;
current_user_key_sequence_ = ikey_.sequence;
SequenceNumber last_snapshot = current_user_key_snapshot_;
SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot
// Check whether the next key exists, is not corrupt, and is the same key
// as the single delete.
- if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
+ if (input_->Valid() &&
+ ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_)
+ .ok() &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
// Check whether the next key belongs to the same snapshot as the
// SingleDelete.
last_sequence, current_user_key_sequence_);
}
- ++iter_stats_.num_record_drop_hidden; // (A)
+ ++iter_stats_.num_record_drop_hidden; // rule (A)
input_->Next();
- } else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
+ } else if (compaction_ != nullptr &&
+ (ikey_.type == kTypeDeletion ||
+ (ikey_.type == kTypeDeletionWithTimestamp &&
+ cmp_with_history_ts_low_ < 0)) &&
IN_EARLIEST_SNAPSHOT(ikey_.sequence) &&
ikeyNotNeededForIncrementalSnapshot() &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
// given that:
// (1) The deletion is earlier than earliest_write_conflict_snapshot, and
// (2) No value exist earlier than the deletion.
+ //
+ // Note also that a deletion marker of type kTypeDeletionWithTimestamp
+ // will be treated as a different user key unless the timestamp is older
+ // than *full_history_ts_low_.
++iter_stats_.num_record_drop_obsolete;
if (!bottommost_level_) {
++iter_stats_.num_optimized_del_drop_obsolete;
}
input_->Next();
- } else if ((ikey_.type == kTypeDeletion) && bottommost_level_ &&
- ikeyNotNeededForIncrementalSnapshot()) {
+ } else if ((ikey_.type == kTypeDeletion ||
+ (ikey_.type == kTypeDeletionWithTimestamp &&
+ cmp_with_history_ts_low_ < 0)) &&
+ bottommost_level_ && ikeyNotNeededForIncrementalSnapshot()) {
// Handle the case where we have a delete key at the bottom most level
// We can skip outputting the key iff there are no subsequent puts for this
// key
+ assert(!compaction_ || compaction_->KeyNotExistsBeyondOutputLevel(
+ ikey_.user_key, &level_ptrs_));
ParsedInternalKey next_ikey;
input_->Next();
- // Skip over all versions of this key that happen to occur in the same snapshot
- // range as the delete
- while (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
- cmp_->Equal(ikey_.user_key, next_ikey.user_key) &&
+ // Skip over all versions of this key that happen to occur in the same
+ // snapshot range as the delete.
+ //
+ // Note that a deletion marker of type kTypeDeletionWithTimestamp will be
+ // considered to have a different user key unless the timestamp is older
+ // than *full_history_ts_low_.
+ while (!IsPausingManualCompaction() && !IsShuttingDown() &&
+ input_->Valid() &&
+ (ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_)
+ .ok()) &&
+ 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key,
+ next_ikey.user_key) &&
(prev_snapshot == 0 ||
DEFINITELY_NOT_IN_SNAPSHOT(next_ikey.sequence, prev_snapshot))) {
input_->Next();
}
// If you find you still need to output a row with this key, we need to output the
// delete too
- if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
- cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
+ if (input_->Valid() &&
+ (ParseInternalKey(input_->key(), &next_ikey, allow_data_in_errors_)
+ .ok()) &&
+ 0 == cmp_->CompareWithoutTimestamp(ikey_.user_key,
+ next_ikey.user_key)) {
valid_ = true;
at_next_ = true;
}
// have hit (A)
// We encapsulate the merge related state machine in a different
// object to minimize change to the existing flow.
- Status s = merge_helper_->MergeUntil(input_, range_del_agg_,
- prev_snapshot, bottommost_level_);
+ Status s =
+ merge_helper_->MergeUntil(input_, range_del_agg_, prev_snapshot,
+ bottommost_level_, allow_data_in_errors_);
merge_out_iter_.SeekToFirst();
if (!s.ok() && !s.IsMergeInProgress()) {
// These will be correctly set below.
key_ = merge_out_iter_.key();
value_ = merge_out_iter_.value();
- bool valid_key __attribute__((__unused__));
- valid_key = ParseInternalKey(key_, &ikey_);
+ pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_);
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to valid.
- assert(valid_key);
- if (!valid_key) {
- ROCKS_LOG_FATAL(info_log_, "Invalid key (%s) in compaction",
- key_.ToString(true).c_str());
+ assert(pik_status.ok());
+ if (!pik_status.ok()) {
+ ROCKS_LOG_FATAL(info_log_, "Invalid key in compaction. %s",
+ pik_status.getState());
}
// Keep current_key_ in sync.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
void CompactionIterator::PrepareOutput() {
if (valid_) {
- if (compaction_filter_ && ikey_.type == kTypeBlobIndex) {
- const auto blob_decision = compaction_filter_->PrepareBlobOutput(
- user_key(), value_, &compaction_filter_value_);
-
- if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
- status_ = Status::Corruption(
- "Corrupted blob reference encountered during GC");
- valid_ = false;
- } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
- status_ = Status::IOError("Could not relocate blob during GC");
- valid_ = false;
- } else if (blob_decision ==
- CompactionFilter::BlobDecision::kChangeValue) {
- value_ = compaction_filter_value_;
+ if (ikey_.type == kTypeValue) {
+ if (blob_file_builder_) {
+ blob_index_.clear();
+ const Status s =
+ blob_file_builder_->Add(user_key(), value_, &blob_index_);
+
+ if (!s.ok()) {
+ status_ = s;
+ valid_ = false;
+ } else if (!blob_index_.empty()) {
+ value_ = blob_index_;
+ ikey_.type = kTypeBlobIndex;
+ current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
+ }
+ }
+ } else if (ikey_.type == kTypeBlobIndex) {
+ if (compaction_filter_) {
+ const auto blob_decision = compaction_filter_->PrepareBlobOutput(
+ user_key(), value_, &compaction_filter_value_);
+
+ if (blob_decision == CompactionFilter::BlobDecision::kCorruption) {
+ status_ = Status::Corruption(
+ "Corrupted blob reference encountered during GC");
+ valid_ = false;
+ } else if (blob_decision == CompactionFilter::BlobDecision::kIOError) {
+ status_ = Status::IOError("Could not relocate blob during GC");
+ valid_ = false;
+ } else if (blob_decision ==
+ CompactionFilter::BlobDecision::kChangeValue) {
+ value_ = compaction_filter_value_;
+ }
}
}
ikey_.type);
}
ikey_.sequence = 0;
- current_key_.UpdateInternalKey(0, ikey_.type);
+ if (!timestamp_size_) {
+ current_key_.UpdateInternalKey(0, ikey_.type);
+ } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) {
+ // We can also zero out timestamp for better compression.
+ // For the same user key (excluding timestamp), the timestamp-based
+ // history can be collapsed to save some space if the timestamp is
+ // older than *full_history_ts_low_.
+ const std::string kTsMin(timestamp_size_, static_cast<char>(0));
+ const Slice ts_slice = kTsMin;
+ ikey_.SetTimestamp(ts_slice);
+ current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice);
+ }
}
}
}