#include <memory>

#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_tombstone_fragmenter.h"
#include "db/read_callback.h"
#include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h"
#include "util/autovector.h"
#include "util/coding.h"
#include "util/memory_usage.h"
#include "util/murmurhash.h"
#include "util/mutexlock.h"
#include "util/util.h"
mutable_cf_options.memtable_prefix_bloom_size_ratio) *
8u),
memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
+ memtable_whole_key_filtering(
+ mutable_cf_options.memtable_whole_key_filtering),
inplace_update_support(ioptions.inplace_update_support),
inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
inplace_callback(ioptions.inplace_callback),
refs_(0),
kArenaBlockSize(OptimizeBlockSize(moptions_.arena_block_size)),
mem_tracker_(write_buffer_manager),
- arena_(
- moptions_.arena_block_size,
- (write_buffer_manager != nullptr && write_buffer_manager->enabled())
- ? &mem_tracker_
- : nullptr,
- mutable_cf_options.memtable_huge_page_size),
+ arena_(moptions_.arena_block_size,
+ (write_buffer_manager != nullptr &&
+ (write_buffer_manager->enabled() ||
+ write_buffer_manager->cost_to_cache()))
+ ? &mem_tracker_
+ : nullptr,
+ mutable_cf_options.memtable_huge_page_size),
table_(ioptions.memtable_factory->CreateMemTableRep(
comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
ioptions.info_log, column_family_id)),
env_(ioptions.env),
insert_with_hint_prefix_extractor_(
ioptions.memtable_insert_with_hint_prefix_extractor),
- oldest_key_time_(std::numeric_limits<uint64_t>::max()) {
+ oldest_key_time_(std::numeric_limits<uint64_t>::max()),
+ atomic_flush_seqno_(kMaxSequenceNumber) {
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
- if (prefix_extractor_ && moptions_.memtable_prefix_bloom_bits > 0) {
- prefix_bloom_.reset(new DynamicBloom(
- &arena_, moptions_.memtable_prefix_bloom_bits, ioptions.bloom_locality,
- 6 /* hard coded 6 probes */, nullptr, moptions_.memtable_huge_page_size,
- ioptions.info_log));
+ // use bloom_filter_ for both whole key and prefix bloom filter
+ if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
+ moptions_.memtable_prefix_bloom_bits > 0) {
+ bloom_filter_.reset(
+ new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
+ ioptions.bloom_locality, 6 /* hard coded 6 probes */,
+ moptions_.memtable_huge_page_size, ioptions.info_log));
}
}
if (use_range_del_table) {
iter_ = mem.range_del_table_->GetIterator(arena);
} else if (prefix_extractor_ != nullptr && !read_options.total_order_seek) {
- bloom_ = mem.prefix_bloom_.get();
+ bloom_ = mem.bloom_filter_.get();
iter_ = mem.table_->GetDynamicPrefixIterator(arena);
} else {
iter_ = mem.table_->GetIterator(arena);
}
}
- ~MemTableIterator() {
+ ~MemTableIterator() override {
#ifndef NDEBUG
// Assert that the MemTableIterator is never deleted while
// Pinning is Enabled.
}
#ifndef NDEBUG
- virtual void SetPinnedItersMgr(
- PinnedIteratorsManager* pinned_iters_mgr) override {
+ void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
pinned_iters_mgr_ = pinned_iters_mgr;
}
PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
#endif
- virtual bool Valid() const override { return valid_; }
- virtual void Seek(const Slice& k) override {
+ bool Valid() const override { return valid_; }
+ void Seek(const Slice& k) override {
PERF_TIMER_GUARD(seek_on_memtable_time);
PERF_COUNTER_ADD(seek_on_memtable_count, 1);
- if (bloom_ != nullptr) {
+ if (bloom_) {
+ // iterator should only use prefix bloom filter
if (!bloom_->MayContain(
prefix_extractor_->Transform(ExtractUserKey(k)))) {
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
iter_->Seek(k, nullptr);
valid_ = iter_->Valid();
}
- virtual void SeekForPrev(const Slice& k) override {
+ void SeekForPrev(const Slice& k) override {
PERF_TIMER_GUARD(seek_on_memtable_time);
PERF_COUNTER_ADD(seek_on_memtable_count, 1);
- if (bloom_ != nullptr) {
+ if (bloom_) {
if (!bloom_->MayContain(
prefix_extractor_->Transform(ExtractUserKey(k)))) {
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
Prev();
}
}
- virtual void SeekToFirst() override {
+ void SeekToFirst() override {
iter_->SeekToFirst();
valid_ = iter_->Valid();
}
- virtual void SeekToLast() override {
+ void SeekToLast() override {
iter_->SeekToLast();
valid_ = iter_->Valid();
}
- virtual void Next() override {
+ void Next() override {
PERF_COUNTER_ADD(next_on_memtable_count, 1);
assert(Valid());
iter_->Next();
valid_ = iter_->Valid();
}
- virtual void Prev() override {
+ void Prev() override {
PERF_COUNTER_ADD(prev_on_memtable_count, 1);
assert(Valid());
iter_->Prev();
valid_ = iter_->Valid();
}
- virtual Slice key() const override {
+ Slice key() const override {
assert(Valid());
return GetLengthPrefixedSlice(iter_->key());
}
- virtual Slice value() const override {
+ Slice value() const override {
assert(Valid());
Slice key_slice = GetLengthPrefixedSlice(iter_->key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
- virtual Status status() const override { return Status::OK(); }
+ Status status() const override { return Status::OK(); }
- virtual bool IsKeyPinned() const override {
+ bool IsKeyPinned() const override {
// memtable data is always pinned
return true;
}
- virtual bool IsValuePinned() const override {
+ bool IsValuePinned() const override {
// memtable value is always pinned, except if we allow inplace update.
return value_pinned_;
}
return new (mem) MemTableIterator(*this, read_options, arena);
}
-InternalIterator* MemTable::NewRangeTombstoneIterator(
- const ReadOptions& read_options) {
- if (read_options.ignore_range_deletions || is_range_del_table_empty_) {
+FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
+ const ReadOptions& read_options, SequenceNumber read_seq) {
+ if (read_options.ignore_range_deletions ||
+ is_range_del_table_empty_.load(std::memory_order_relaxed)) {
return nullptr;
}
- return new MemTableIterator(*this, read_options, nullptr /* arena */,
- true /* use_range_del_table */);
+ auto* unfragmented_iter = new MemTableIterator(
+ *this, read_options, nullptr /* arena */, true /* use_range_del_table */);
+ if (unfragmented_iter == nullptr) {
+ return nullptr;
+ }
+ auto fragmented_tombstone_list =
+ std::make_shared<FragmentedRangeTombstoneList>(
+ std::unique_ptr<InternalIterator>(unfragmented_iter),
+ comparator_.comparator);
+
+ auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
+ fragmented_tombstone_list, comparator_.comparator, read_seq);
+ return fragmented_iter;
}
// Return the striped reader-writer lock guarding the given user key.
// Keys are distributed across locks_ by NPHash64 (this replaced the earlier
// murmur hash so a single hash function is used across the codebase).
port::RWMutex* MemTable::GetLock(const Slice& key) {
  return &locks_[static_cast<size_t>(GetSliceNPHash64(key)) % locks_.size()];
}
MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
std::memory_order_relaxed);
}
- if (prefix_bloom_) {
- assert(prefix_extractor_);
- prefix_bloom_->Add(prefix_extractor_->Transform(key));
+ if (bloom_filter_ && prefix_extractor_) {
+ bloom_filter_->Add(prefix_extractor_->Transform(key));
+ }
+ if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
+ bloom_filter_->Add(key);
}
// The first sequence number inserted into the memtable
post_process_info->num_deletes++;
}
- if (prefix_bloom_) {
- assert(prefix_extractor_);
- prefix_bloom_->AddConcurrently(prefix_extractor_->Transform(key));
+ if (bloom_filter_ && prefix_extractor_) {
+ bloom_filter_->AddConcurrently(prefix_extractor_->Transform(key));
+ }
+ if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
+ bloom_filter_->AddConcurrently(key);
}
// atomically update first_seqno_ and earliest_seqno_.
!first_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
}
}
- if (is_range_del_table_empty_ && type == kTypeRangeDeletion) {
- is_range_del_table_empty_ = false;
+ if (type == kTypeRangeDeletion) {
+ is_range_del_table_empty_.store(false, std::memory_order_relaxed);
}
UpdateOldestKeyTime();
return true;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
- RangeDelAggregator* range_del_agg;
+ SequenceNumber max_covering_tombstone_seq;
MemTable* mem;
Logger* logger;
Statistics* statistics;
Saver* s = reinterpret_cast<Saver*>(arg);
assert(s != nullptr);
MergeContext* merge_context = s->merge_context;
- RangeDelAggregator* range_del_agg = s->range_del_agg;
+ SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
const MergeOperator* merge_operator = s->merge_operator;
- assert(merge_context != nullptr && range_del_agg != nullptr);
+ assert(merge_context != nullptr);
// entry format is:
// klength varint32
s->seq = seq;
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) &&
- range_del_agg->ShouldDelete(Slice(key_ptr, key_length))) {
+ max_covering_tombstone_seq > seq) {
type = kTypeRangeDeletion;
}
switch (type) {
*(s->found_final_value) = true;
return false;
}
- FALLTHROUGH_INTENDED;
+ FALLTHROUGH_INTENDED;
case kTypeValue: {
if (s->inplace_update_support) {
s->mem->GetLock(s->key->user_key())->ReadLock();
bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
MergeContext* merge_context,
- RangeDelAggregator* range_del_agg, SequenceNumber* seq,
- const ReadOptions& read_opts, ReadCallback* callback,
- bool* is_blob_index) {
+ SequenceNumber* max_covering_tombstone_seq,
+ SequenceNumber* seq, const ReadOptions& read_opts,
+ ReadCallback* callback, bool* is_blob_index) {
// The sequence number is updated synchronously in version_set.h
if (IsEmpty()) {
// Avoiding recording stats for speed.
}
PERF_TIMER_GUARD(get_from_memtable_time);
- std::unique_ptr<InternalIterator> range_del_iter(
- NewRangeTombstoneIterator(read_opts));
- Status status = range_del_agg->AddTombstones(std::move(range_del_iter));
- if (!status.ok()) {
- *s = status;
- return false;
+ std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
+ NewRangeTombstoneIterator(read_opts,
+ GetInternalKeySeqno(key.internal_key())));
+ if (range_del_iter != nullptr) {
+ *max_covering_tombstone_seq =
+ std::max(*max_covering_tombstone_seq,
+ range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()));
}
Slice user_key = key.user_key();
bool found_final_value = false;
bool merge_in_progress = s->IsMergeInProgress();
- bool const may_contain =
- nullptr == prefix_bloom_
- ? false
- : prefix_bloom_->MayContain(prefix_extractor_->Transform(user_key));
- if (prefix_bloom_ && !may_contain) {
+ bool may_contain = true;
+ if (bloom_filter_) {
+ // when both memtable_whole_key_filtering and prefix_extractor_ are set,
+ // only do whole key filtering for Get() to save CPU
+ if (moptions_.memtable_whole_key_filtering) {
+ may_contain = bloom_filter_->MayContain(user_key);
+ } else {
+ assert(prefix_extractor_);
+ may_contain =
+ bloom_filter_->MayContain(prefix_extractor_->Transform(user_key));
+ }
+ }
+ if (bloom_filter_ && !may_contain) {
// iter is null if prefix bloom says the key does not exist
PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
*seq = kMaxSequenceNumber;
} else {
- if (prefix_bloom_) {
+ if (bloom_filter_) {
PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
}
Saver saver;
saver.seq = kMaxSequenceNumber;
saver.mem = this;
saver.merge_context = merge_context;
- saver.range_del_agg = range_del_agg;
+ saver.max_covering_tombstone_seq = *max_covering_tombstone_seq;
saver.merge_operator = moptions_.merge_operator;
saver.logger = moptions_.info_log;
saver.inplace_update_support = moptions_.inplace_update_support;