#include "db/range_tombstone_fragmenter.h"
#include <algorithm>
+#include <cinttypes>
+#include <cstdio>
#include <functional>
#include <set>
-#include <stdio.h>
-#include <cinttypes>
-
#include "util/autovector.h"
#include "util/kv_map.h"
#include "util/vector_iterator.h"
return;
}
bool is_sorted = true;
- int num_tombstones = 0;
InternalKey pinned_last_start_key;
Slice last_start_key;
+ num_unfragmented_tombstones_ = 0;
+ total_tombstone_payload_bytes_ = 0;
for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
- unfragmented_tombstones->Next(), num_tombstones++) {
- if (num_tombstones > 0 &&
+ unfragmented_tombstones->Next(), num_unfragmented_tombstones_++) {
+ total_tombstone_payload_bytes_ += unfragmented_tombstones->key().size() +
+ unfragmented_tombstones->value().size();
+ if (num_unfragmented_tombstones_ > 0 &&
icmp.Compare(last_start_key, unfragmented_tombstones->key()) > 0) {
is_sorted = false;
break;
// Sort the tombstones before fragmenting them.
std::vector<std::string> keys, values;
- keys.reserve(num_tombstones);
- values.reserve(num_tombstones);
+ keys.reserve(num_unfragmented_tombstones_);
+ values.reserve(num_unfragmented_tombstones_);
+ // Reset the counter to zero for the next iteration over keys.
+ total_tombstone_payload_bytes_ = 0;
for (unfragmented_tombstones->SeekToFirst(); unfragmented_tombstones->Valid();
unfragmented_tombstones->Next()) {
+ total_tombstone_payload_bytes_ += unfragmented_tombstones->key().size() +
+ unfragmented_tombstones->value().size();
keys.emplace_back(unfragmented_tombstones->key().data(),
unfragmented_tombstones->key().size());
values.emplace_back(unfragmented_tombstones->value().data(),
unfragmented_tombstones->value().size());
}
// VectorIterator implicitly sorts by key during construction.
- auto iter = std::unique_ptr<VectorIterator>(
- new VectorIterator(std::move(keys), std::move(values), &icmp));
+ auto iter = std::make_unique<VectorIterator>(std::move(keys),
+ std::move(values), &icmp);
FragmentTombstones(std::move(iter), icmp, for_compaction, snapshots);
}
// for use in flush_current_tombstones.
std::set<ParsedInternalKey, ParsedInternalKeyComparator> cur_end_keys(cmp);
+ size_t ts_sz = icmp.user_comparator()->timestamp_size();
// Given the next start key in unfragmented_tombstones,
// flush_current_tombstones writes every tombstone fragment that starts
// and ends with a key before next_start_key, and starts with a key greater
bool reached_next_start_key = false;
for (; it != cur_end_keys.end() && !reached_next_start_key; ++it) {
Slice cur_end_key = it->user_key;
- if (icmp.user_comparator()->Compare(cur_start_key, cur_end_key) == 0) {
+ if (icmp.user_comparator()->CompareWithoutTimestamp(cur_start_key,
+ cur_end_key) == 0) {
// Empty tombstone.
continue;
}
- if (icmp.user_comparator()->Compare(next_start_key, cur_end_key) <= 0) {
- // All of the end keys in [it, cur_end_keys.end()) are after
+ if (icmp.user_comparator()->CompareWithoutTimestamp(next_start_key,
+ cur_end_key) <= 0) {
+ // All the end keys in [it, cur_end_keys.end()) are after
// next_start_key, so the tombstones they represent can be used in
// fragments that start with keys greater than or equal to
// next_start_key. However, the end keys we already passed will not be
// Flush a range tombstone fragment [cur_start_key, cur_end_key), which
// should not overlap with the last-flushed tombstone fragment.
assert(tombstones_.empty() ||
- icmp.user_comparator()->Compare(tombstones_.back().end_key,
- cur_start_key) <= 0);
+ icmp.user_comparator()->CompareWithoutTimestamp(
+ tombstones_.back().end_key, cur_start_key) <= 0);
// Sort the sequence numbers of the tombstones being fragmented in
// descending order, and then flush them in that order.
autovector<SequenceNumber> seqnums_to_flush;
+ autovector<Slice> timestamps_to_flush;
for (auto flush_it = it; flush_it != cur_end_keys.end(); ++flush_it) {
seqnums_to_flush.push_back(flush_it->sequence);
+ if (ts_sz) {
+ timestamps_to_flush.push_back(
+ ExtractTimestampFromUserKey(flush_it->user_key, ts_sz));
+ }
}
+ // TODO: bind the two sorting together to be more efficient
std::sort(seqnums_to_flush.begin(), seqnums_to_flush.end(),
std::greater<SequenceNumber>());
+ if (ts_sz) {
+ std::sort(timestamps_to_flush.begin(), timestamps_to_flush.end(),
+ [icmp](const Slice& ts1, const Slice& ts2) {
+ return icmp.user_comparator()->CompareTimestamp(ts1, ts2) >
+ 0;
+ });
+ }
size_t start_idx = tombstone_seqs_.size();
size_t end_idx = start_idx + seqnums_to_flush.size();
- if (for_compaction) {
+ // If user-defined timestamp is enabled, we should not drop tombstones
+ // from any snapshot stripe. Garbage collection of range tombstones
+ // happens in CompactionOutputs::AddRangeDels().
+ if (for_compaction && ts_sz == 0) {
// Drop all tombstone seqnums that are not preserved by a snapshot.
SequenceNumber next_snapshot = kMaxSequenceNumber;
for (auto seq : seqnums_to_flush) {
if (seq <= next_snapshot) {
// This seqnum is visible by a lower snapshot.
tombstone_seqs_.push_back(seq);
- seq_set_.insert(seq);
auto upper_bound_it =
std::lower_bound(snapshots.begin(), snapshots.end(), seq);
if (upper_bound_it == snapshots.begin()) {
// The fragmentation is being done for reads, so preserve all seqnums.
tombstone_seqs_.insert(tombstone_seqs_.end(), seqnums_to_flush.begin(),
seqnums_to_flush.end());
- seq_set_.insert(seqnums_to_flush.begin(), seqnums_to_flush.end());
+ if (ts_sz) {
+ tombstone_timestamps_.insert(tombstone_timestamps_.end(),
+ timestamps_to_flush.begin(),
+ timestamps_to_flush.end());
+ }
}
assert(start_idx < end_idx);
- tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx, end_idx);
+ if (ts_sz) {
+ std::string start_key_with_max_ts;
+ AppendUserKeyWithMaxTimestamp(&start_key_with_max_ts, cur_start_key,
+ ts_sz);
+ pinned_slices_.emplace_back(std::move(start_key_with_max_ts));
+ Slice start_key = pinned_slices_.back();
+
+ std::string end_key_with_max_ts;
+ AppendUserKeyWithMaxTimestamp(&end_key_with_max_ts, cur_end_key, ts_sz);
+ pinned_slices_.emplace_back(std::move(end_key_with_max_ts));
+ Slice end_key = pinned_slices_.back();
+
+ // RangeTombstoneStack expects start_key and end_key to have max
+ // timestamp.
+ tombstones_.emplace_back(start_key, end_key, start_idx, end_idx);
+ } else {
+ tombstones_.emplace_back(cur_start_key, cur_end_key, start_idx,
+ end_idx);
+ }
cur_start_key = cur_end_key;
}
tombstone_end_key.size());
tombstone_end_key = pinned_slices_.back();
}
- if (!cur_end_keys.empty() && icmp.user_comparator()->Compare(
- cur_start_key, tombstone_start_key) != 0) {
+ if (!cur_end_keys.empty() &&
+ icmp.user_comparator()->CompareWithoutTimestamp(
+ cur_start_key, tombstone_start_key) != 0) {
// The start key has changed. Flush all tombstones that start before
// this new start key.
flush_current_tombstones(tombstone_start_key);
}
bool FragmentedRangeTombstoneList::ContainsRange(SequenceNumber lower,
- SequenceNumber upper) const {
+ SequenceNumber upper) {
+ std::call_once(seq_set_init_once_flag_, [this]() {
+ for (auto s : tombstone_seqs_) {
+ seq_set_.insert(s);
+ }
+ });
auto seq_it = seq_set_.lower_bound(lower);
return seq_it != seq_set_.end() && *seq_it <= upper;
}
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
- const FragmentedRangeTombstoneList* tombstones,
- const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
+ FragmentedRangeTombstoneList* tombstones, const InternalKeyComparator& icmp,
+ SequenceNumber _upper_bound, const Slice* ts_upper_bound,
SequenceNumber _lower_bound)
: tombstone_start_cmp_(icmp.user_comparator()),
tombstone_end_cmp_(icmp.user_comparator()),
ucmp_(icmp.user_comparator()),
tombstones_(tombstones),
upper_bound_(_upper_bound),
- lower_bound_(_lower_bound) {
+ lower_bound_(_lower_bound),
+ ts_upper_bound_(ts_upper_bound) {
assert(tombstones_ != nullptr);
Invalidate();
}
FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
- const std::shared_ptr<const FragmentedRangeTombstoneList>& tombstones,
+ const std::shared_ptr<FragmentedRangeTombstoneList>& tombstones,
const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
- SequenceNumber _lower_bound)
+ const Slice* ts_upper_bound, SequenceNumber _lower_bound)
: tombstone_start_cmp_(icmp.user_comparator()),
tombstone_end_cmp_(icmp.user_comparator()),
icmp_(&icmp),
tombstones_ref_(tombstones),
tombstones_(tombstones_ref_.get()),
upper_bound_(_upper_bound),
+ lower_bound_(_lower_bound),
+ ts_upper_bound_(ts_upper_bound) {
+ assert(tombstones_ != nullptr);
+ Invalidate();
+}
+
+FragmentedRangeTombstoneIterator::FragmentedRangeTombstoneIterator(
+ const std::shared_ptr<FragmentedRangeTombstoneListCache>& tombstones_cache,
+ const InternalKeyComparator& icmp, SequenceNumber _upper_bound,
+ const Slice* ts_upper_bound, SequenceNumber _lower_bound)
+ : tombstone_start_cmp_(icmp.user_comparator()),
+ tombstone_end_cmp_(icmp.user_comparator()),
+ icmp_(&icmp),
+ ucmp_(icmp.user_comparator()),
+ tombstones_cache_ref_(tombstones_cache),
+ tombstones_(tombstones_cache_ref_->tombstones.get()),
+ upper_bound_(_upper_bound),
lower_bound_(_lower_bound) {
assert(tombstones_ != nullptr);
+ if (!ts_upper_bound || ts_upper_bound->empty()) {
+ ts_upper_bound_ = nullptr;
+ } else {
+ ts_upper_bound_ = ts_upper_bound;
+ }
Invalidate();
}
return;
}
pos_ = tombstones_->begin();
- seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
- tombstones_->seq_iter(pos_->seq_end_idx),
- upper_bound_, std::greater<SequenceNumber>());
+ SetMaxVisibleSeqAndTimestamp();
ScanForwardToVisibleTombstone();
}
return;
}
pos_ = std::prev(tombstones_->end());
- seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
- tombstones_->seq_iter(pos_->seq_end_idx),
- upper_bound_, std::greater<SequenceNumber>());
+ SetMaxVisibleSeqAndTimestamp();
ScanBackwardToVisibleTombstone();
}
+// @param `target` is a user key, with timestamp if user-defined timestamp is
+// enabled.
void FragmentedRangeTombstoneIterator::Seek(const Slice& target) {
if (tombstones_->empty()) {
Invalidate();
seq_pos_ = tombstones_->seq_end();
return;
}
- seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
- tombstones_->seq_iter(pos_->seq_end_idx),
- upper_bound_, std::greater<SequenceNumber>());
+ SetMaxVisibleSeqAndTimestamp();
}
void FragmentedRangeTombstoneIterator::SeekForPrevToCoveringTombstone(
return;
}
--pos_;
- seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
- tombstones_->seq_iter(pos_->seq_end_idx),
- upper_bound_, std::greater<SequenceNumber>());
+ SetMaxVisibleSeqAndTimestamp();
}
void FragmentedRangeTombstoneIterator::ScanForwardToVisibleTombstone() {
Invalidate();
return;
}
- seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
- tombstones_->seq_iter(pos_->seq_end_idx),
- upper_bound_, std::greater<SequenceNumber>());
+ SetMaxVisibleSeqAndTimestamp();
}
}
return;
}
--pos_;
- seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
- tombstones_->seq_iter(pos_->seq_end_idx),
- upper_bound_, std::greater<SequenceNumber>());
+ SetMaxVisibleSeqAndTimestamp();
}
}
if (pos_ == tombstones_->end()) {
return;
}
- seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
- tombstones_->seq_iter(pos_->seq_end_idx),
- upper_bound_, std::greater<SequenceNumber>());
+ SetMaxVisibleSeqAndTimestamp();
ScanForwardToVisibleTombstone();
}
return;
}
--pos_;
- seq_pos_ = std::lower_bound(tombstones_->seq_iter(pos_->seq_start_idx),
- tombstones_->seq_iter(pos_->seq_end_idx),
- upper_bound_, std::greater<SequenceNumber>());
+ SetMaxVisibleSeqAndTimestamp();
ScanBackwardToVisibleTombstone();
}
SequenceNumber FragmentedRangeTombstoneIterator::MaxCoveringTombstoneSeqnum(
const Slice& target_user_key) {
SeekToCoveringTombstone(target_user_key);
- return ValidPos() && ucmp_->Compare(start_key(), target_user_key) <= 0 ? seq()
- : 0;
+ return ValidPos() && ucmp_->CompareWithoutTimestamp(start_key(),
+ target_user_key) <= 0
+ ? seq()
+ : 0;
}
std::map<SequenceNumber, std::unique_ptr<FragmentedRangeTombstoneIterator>>
upper = snapshots[i];
}
if (tombstones_->ContainsRange(lower, upper)) {
- splits.emplace(upper, std::unique_ptr<FragmentedRangeTombstoneIterator>(
- new FragmentedRangeTombstoneIterator(
- tombstones_, *icmp_, upper, lower)));
+ splits.emplace(upper,
+ std::make_unique<FragmentedRangeTombstoneIterator>(
+ tombstones_, *icmp_, upper, ts_upper_bound_, lower));
}
lower = upper + 1;
}