#include <utility>
-#include "file/file_util.h"
+#include "block_type.h"
+#include "file/random_access_file_reader.h"
+#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
#include "port/malloc.h"
#include "port/port.h"
true /*use_delta_encoding*/,
use_value_delta_encoding),
p_index_builder_(p_index_builder),
- keys_added_to_partition_(0) {
- keys_per_partition_ =
- filter_bits_builder_->CalculateNumEntry(partition_size);
+ keys_added_to_partition_(0),
+ total_added_in_built_(0) {
+ keys_per_partition_ = static_cast<uint32_t>(
+ filter_bits_builder_->ApproximateNumEntries(partition_size));
if (keys_per_partition_ < 1) {
// partition_size (minus buffer, ~10%) might be smaller than minimum
// filter size, sometimes based on cache line size. Try to find that
// minimum size without CalculateSpace (not necessarily available).
uint32_t larger = std::max(partition_size + 4, uint32_t{16});
for (;;) {
- keys_per_partition_ = filter_bits_builder_->CalculateNumEntry(larger);
+ keys_per_partition_ = static_cast<uint32_t>(
+ filter_bits_builder_->ApproximateNumEntries(larger));
if (keys_per_partition_ >= 1) {
break;
}
}
}
-PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() {}
+PartitionedFilterBlockBuilder::~PartitionedFilterBlockBuilder() {
+ partitioned_filters_construction_status_.PermitUncheckedError();
+}
void PartitionedFilterBlockBuilder::MaybeCutAFilterBlock(
const Slice* next_key) {
if (!p_index_builder_->ShouldCutFilterBlock()) {
return;
}
- filter_gc.push_back(std::unique_ptr<const char[]>(nullptr));
- // Add the prefix of the next key before finishing the partition. This hack,
- // fixes a bug with format_verison=3 where seeking for the prefix would lead
- // us to the previous partition.
- const bool add_prefix =
+ // Add the prefix of the next key before finishing the partition without
+ // updating last_prefix_str_. This hack, fixes a bug with format_verison=3
+ // where seeking for the prefix would lead us to the previous partition.
+ const bool maybe_add_prefix =
next_key && prefix_extractor() && prefix_extractor()->InDomain(*next_key);
- if (add_prefix) {
- FullFilterBlockBuilder::AddPrefix(*next_key);
+ if (maybe_add_prefix) {
+ const Slice next_key_prefix = prefix_extractor()->Transform(*next_key);
+ if (next_key_prefix.compare(last_prefix_str()) != 0) {
+ AddKey(next_key_prefix);
+ }
}
- Slice filter = filter_bits_builder_->Finish(&filter_gc.back());
+ total_added_in_built_ += filter_bits_builder_->EstimateEntriesAdded();
+ std::unique_ptr<const char[]> filter_data;
+ Status filter_construction_status = Status::OK();
+ Slice filter =
+ filter_bits_builder_->Finish(&filter_data, &filter_construction_status);
+ if (filter_construction_status.ok()) {
+ filter_construction_status = filter_bits_builder_->MaybePostVerify(filter);
+ }
std::string& index_key = p_index_builder_->GetPartitionKey();
- filters.push_back({index_key, filter});
+ filters.push_back({index_key, std::move(filter_data), filter});
+ if (!filter_construction_status.ok() &&
+ partitioned_filters_construction_status_.ok()) {
+ partitioned_filters_construction_status_ = filter_construction_status;
+ }
keys_added_to_partition_ = 0;
Reset();
}
keys_added_to_partition_++;
}
+size_t PartitionedFilterBlockBuilder::EstimateEntriesAdded() {
+ return total_added_in_built_ + filter_bits_builder_->EstimateEntriesAdded();
+}
+
Slice PartitionedFilterBlockBuilder::Finish(
- const BlockHandle& last_partition_block_handle, Status* status) {
+ const BlockHandle& last_partition_block_handle, Status* status,
+ std::unique_ptr<const char[]>* filter_data) {
if (finishing_filters == true) {
// Record the handle of the last written filter block in the index
- FilterEntry& last_entry = filters.front();
std::string handle_encoding;
last_partition_block_handle.EncodeTo(&handle_encoding);
std::string handle_delta_encoding;
last_partition_block_handle.size() - last_encoded_handle_.size());
last_encoded_handle_ = last_partition_block_handle;
const Slice handle_delta_encoding_slice(handle_delta_encoding);
- index_on_filter_block_builder_.Add(last_entry.key, handle_encoding,
+ index_on_filter_block_builder_.Add(last_filter_entry_key, handle_encoding,
&handle_delta_encoding_slice);
if (!p_index_builder_->seperator_is_key_plus_seq()) {
index_on_filter_block_builder_without_seq_.Add(
- ExtractUserKey(last_entry.key), handle_encoding,
+ ExtractUserKey(last_filter_entry_key), handle_encoding,
&handle_delta_encoding_slice);
}
- filters.pop_front();
} else {
MaybeCutAFilterBlock(nullptr);
}
+
+ if (!partitioned_filters_construction_status_.ok()) {
+ *status = partitioned_filters_construction_status_;
+ return Slice();
+ }
+
// If there is no filter partition left, then return the index on filter
// partitions
if (UNLIKELY(filters.empty())) {
*status = Status::OK();
+ last_filter_data.reset();
if (finishing_filters) {
+ // Simplest to just add them all at the end
+ total_added_in_built_ = 0;
if (p_index_builder_->seperator_is_key_plus_seq()) {
return index_on_filter_block_builder_.Finish();
} else {
// indicate we expect more calls to Finish
*status = Status::Incomplete();
finishing_filters = true;
- return filters.front().filter;
+
+ last_filter_entry_key = filters.front().key;
+ Slice filter = filters.front().filter;
+ last_filter_data = std::move(filters.front().filter_data);
+ if (filter_data != nullptr) {
+ *filter_data = std::move(last_filter_data);
+ }
+ filters.pop_front();
+ return filter;
}
}
CachableEntry<Block> filter_block;
if (prefetch || !use_cache) {
- const Status s = ReadFilterBlock(table, prefetch_buffer, ro, use_cache,
- nullptr /* get_context */, lookup_context,
- &filter_block);
+ const Status s = ReadFilterBlock(
+ table, prefetch_buffer, ro, use_cache, nullptr /* get_context */,
+ lookup_context, &filter_block, BlockType::kFilterPartitionIndex);
if (!s.ok()) {
IGNORE_STATUS_IF_ERROR(s);
return std::unique_ptr<FilterBlockReader>();
}
bool PartitionedFilterBlockReader::KeyMayMatch(
- const Slice& key, const SliceTransform* prefix_extractor,
- uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr,
- GetContext* get_context, BlockCacheLookupContext* lookup_context) {
+ const Slice& key, const bool no_io, const Slice* const const_ikey_ptr,
+ GetContext* get_context, BlockCacheLookupContext* lookup_context,
+ Env::IOPriority rate_limiter_priority) {
assert(const_ikey_ptr != nullptr);
- assert(block_offset == kNotValid);
if (!whole_key_filtering()) {
return true;
}
- return MayMatch(key, prefix_extractor, block_offset, no_io, const_ikey_ptr,
- get_context, lookup_context,
- &FullFilterBlockReader::KeyMayMatch);
+ return MayMatch(key, no_io, const_ikey_ptr, get_context, lookup_context,
+ rate_limiter_priority, &FullFilterBlockReader::KeyMayMatch);
}
void PartitionedFilterBlockReader::KeysMayMatch(
- MultiGetRange* range, const SliceTransform* prefix_extractor,
- uint64_t block_offset, const bool no_io,
- BlockCacheLookupContext* lookup_context) {
- assert(block_offset == kNotValid);
+ MultiGetRange* range, const bool no_io,
+ BlockCacheLookupContext* lookup_context,
+ Env::IOPriority rate_limiter_priority) {
if (!whole_key_filtering()) {
return; // Any/all may match
}
- MayMatch(range, prefix_extractor, block_offset, no_io, lookup_context,
- &FullFilterBlockReader::KeysMayMatch);
+ MayMatch(range, nullptr, no_io, lookup_context, rate_limiter_priority,
+ &FullFilterBlockReader::KeysMayMatch2);
}
bool PartitionedFilterBlockReader::PrefixMayMatch(
- const Slice& prefix, const SliceTransform* prefix_extractor,
- uint64_t block_offset, const bool no_io, const Slice* const const_ikey_ptr,
- GetContext* get_context, BlockCacheLookupContext* lookup_context) {
+ const Slice& prefix, const bool no_io, const Slice* const const_ikey_ptr,
+ GetContext* get_context, BlockCacheLookupContext* lookup_context,
+ Env::IOPriority rate_limiter_priority) {
assert(const_ikey_ptr != nullptr);
- assert(block_offset == kNotValid);
- if (!table_prefix_extractor() && !prefix_extractor) {
- return true;
- }
-
- return MayMatch(prefix, prefix_extractor, block_offset, no_io, const_ikey_ptr,
- get_context, lookup_context,
+ return MayMatch(prefix, no_io, const_ikey_ptr, get_context, lookup_context,
+ rate_limiter_priority,
&FullFilterBlockReader::PrefixMayMatch);
}
void PartitionedFilterBlockReader::PrefixesMayMatch(
MultiGetRange* range, const SliceTransform* prefix_extractor,
- uint64_t block_offset, const bool no_io,
- BlockCacheLookupContext* lookup_context) {
- assert(block_offset == kNotValid);
- if (!table_prefix_extractor() && !prefix_extractor) {
- return; // Any/all may match
- }
-
- MayMatch(range, prefix_extractor, block_offset, no_io, lookup_context,
- &FullFilterBlockReader::PrefixesMayMatch);
+ const bool no_io, BlockCacheLookupContext* lookup_context,
+ Env::IOPriority rate_limiter_priority) {
+ assert(prefix_extractor);
+ MayMatch(range, prefix_extractor, no_io, lookup_context,
+ rate_limiter_priority, &FullFilterBlockReader::PrefixesMayMatch);
}
BlockHandle PartitionedFilterBlockReader::GetFilterPartitionHandle(
Statistics* kNullStats = nullptr;
filter_block.GetValue()->NewIndexIterator(
comparator->user_comparator(),
- table()->get_rep()->get_global_seqno(BlockType::kFilter), &iter,
- kNullStats, true /* total_order_seek */, false /* have_first_key */,
- index_key_includes_seq(), index_value_is_full());
+ table()->get_rep()->get_global_seqno(BlockType::kFilterPartitionIndex),
+ &iter, kNullStats, true /* total_order_seek */,
+ false /* have_first_key */, index_key_includes_seq(),
+ index_value_is_full());
iter.Seek(entry);
if (UNLIKELY(!iter.Valid())) {
// entry is larger than all the keys. However its prefix might still be
FilePrefetchBuffer* prefetch_buffer, const BlockHandle& fltr_blk_handle,
bool no_io, GetContext* get_context,
BlockCacheLookupContext* lookup_context,
+ Env::IOPriority rate_limiter_priority,
CachableEntry<ParsedFullFilterBlock>* filter_block) const {
assert(table());
assert(filter_block);
}
ReadOptions read_options;
+ read_options.rate_limiter_priority = rate_limiter_priority;
if (no_io) {
read_options.read_tier = kBlockCacheTier;
}
table()->RetrieveBlock(prefetch_buffer, read_options, fltr_blk_handle,
UncompressionDict::GetEmptyDict(), filter_block,
BlockType::kFilter, get_context, lookup_context,
- /* for_compaction */ false, /* use_cache */ true);
+ /* for_compaction */ false, /* use_cache */ true,
+ /* wait_for_cache */ true, /* async_read */ false);
return s;
}
bool PartitionedFilterBlockReader::MayMatch(
- const Slice& slice, const SliceTransform* prefix_extractor,
- uint64_t block_offset, bool no_io, const Slice* const_ikey_ptr,
+ const Slice& slice, bool no_io, const Slice* const_ikey_ptr,
GetContext* get_context, BlockCacheLookupContext* lookup_context,
+ Env::IOPriority rate_limiter_priority,
FilterFunction filter_function) const {
CachableEntry<Block> filter_block;
- Status s =
- GetOrReadFilterBlock(no_io, get_context, lookup_context, &filter_block);
+ Status s = GetOrReadFilterBlock(
+ no_io, get_context, lookup_context, &filter_block,
+ BlockType::kFilterPartitionIndex, rate_limiter_priority);
if (UNLIKELY(!s.ok())) {
IGNORE_STATUS_IF_ERROR(s);
return true;
CachableEntry<ParsedFullFilterBlock> filter_partition_block;
s = GetFilterPartitionBlock(nullptr /* prefetch_buffer */, filter_handle,
no_io, get_context, lookup_context,
- &filter_partition_block);
+ rate_limiter_priority, &filter_partition_block);
if (UNLIKELY(!s.ok())) {
IGNORE_STATUS_IF_ERROR(s);
return true;
FullFilterBlockReader filter_partition(table(),
std::move(filter_partition_block));
- return (filter_partition.*filter_function)(
- slice, prefix_extractor, block_offset, no_io, const_ikey_ptr, get_context,
- lookup_context);
+ return (filter_partition.*filter_function)(slice, no_io, const_ikey_ptr,
+ get_context, lookup_context,
+ rate_limiter_priority);
}
void PartitionedFilterBlockReader::MayMatch(
- MultiGetRange* range, const SliceTransform* prefix_extractor,
- uint64_t block_offset, bool no_io, BlockCacheLookupContext* lookup_context,
+ MultiGetRange* range, const SliceTransform* prefix_extractor, bool no_io,
+ BlockCacheLookupContext* lookup_context,
+ Env::IOPriority rate_limiter_priority,
FilterManyFunction filter_function) const {
CachableEntry<Block> filter_block;
- Status s = GetOrReadFilterBlock(no_io, range->begin()->get_context,
- lookup_context, &filter_block);
+ Status s = GetOrReadFilterBlock(
+ no_io, range->begin()->get_context, lookup_context, &filter_block,
+ BlockType::kFilterPartitionIndex, rate_limiter_priority);
if (UNLIKELY(!s.ok())) {
IGNORE_STATUS_IF_ERROR(s);
return; // Any/all may match
if (!prev_filter_handle.IsNull() &&
this_filter_handle != prev_filter_handle) {
MultiGetRange subrange(*range, start_iter_same_handle, iter);
- MayMatchPartition(&subrange, prefix_extractor, block_offset,
- prev_filter_handle, no_io, lookup_context,
- filter_function);
+ MayMatchPartition(&subrange, prefix_extractor, prev_filter_handle, no_io,
+ lookup_context, rate_limiter_priority, filter_function);
range->AddSkipsFrom(subrange);
start_iter_same_handle = iter;
}
}
if (!prev_filter_handle.IsNull()) {
MultiGetRange subrange(*range, start_iter_same_handle, range->end());
- MayMatchPartition(&subrange, prefix_extractor, block_offset,
- prev_filter_handle, no_io, lookup_context,
- filter_function);
+ MayMatchPartition(&subrange, prefix_extractor, prev_filter_handle, no_io,
+ lookup_context, rate_limiter_priority, filter_function);
range->AddSkipsFrom(subrange);
}
}
void PartitionedFilterBlockReader::MayMatchPartition(
MultiGetRange* range, const SliceTransform* prefix_extractor,
- uint64_t block_offset, BlockHandle filter_handle, bool no_io,
+ BlockHandle filter_handle, bool no_io,
BlockCacheLookupContext* lookup_context,
+ Env::IOPriority rate_limiter_priority,
FilterManyFunction filter_function) const {
CachableEntry<ParsedFullFilterBlock> filter_partition_block;
Status s = GetFilterPartitionBlock(
nullptr /* prefetch_buffer */, filter_handle, no_io,
- range->begin()->get_context, lookup_context, &filter_partition_block);
+ range->begin()->get_context, lookup_context, rate_limiter_priority,
+ &filter_partition_block);
if (UNLIKELY(!s.ok())) {
IGNORE_STATUS_IF_ERROR(s);
return; // Any/all may match
FullFilterBlockReader filter_partition(table(),
std::move(filter_partition_block));
- (filter_partition.*filter_function)(range, prefix_extractor, block_offset,
- no_io, lookup_context);
+ (filter_partition.*filter_function)(range, prefix_extractor, no_io,
+ lookup_context, rate_limiter_priority);
}
size_t PartitionedFilterBlockReader::ApproximateMemoryUsage() const {
CachableEntry<Block> filter_block;
Status s = GetOrReadFilterBlock(false /* no_io */, nullptr /* get_context */,
- &lookup_context, &filter_block);
+ &lookup_context, &filter_block,
+ BlockType::kFilterPartitionIndex,
+ ro.rate_limiter_priority);
if (!s.ok()) {
- ROCKS_LOG_ERROR(rep->ioptions.info_log,
+ ROCKS_LOG_ERROR(rep->ioptions.logger,
"Error retrieving top-level filter block while trying to "
"cache filter partitions: %s",
s.ToString().c_str());
const InternalKeyComparator* const comparator = internal_comparator();
Statistics* kNullStats = nullptr;
filter_block.GetValue()->NewIndexIterator(
- comparator->user_comparator(), rep->get_global_seqno(BlockType::kFilter),
- &biter, kNullStats, true /* total_order_seek */,
- false /* have_first_key */, index_key_includes_seq(),
- index_value_is_full());
+ comparator->user_comparator(),
+ rep->get_global_seqno(BlockType::kFilterPartitionIndex), &biter,
+ kNullStats, true /* total_order_seek */, false /* have_first_key */,
+ index_key_includes_seq(), index_value_is_full());
// Index partitions are assumed to be consecuitive. Prefetch them all.
// Read the first block offset
biter.SeekToFirst();
// Read the last block's offset
biter.SeekToLast();
handle = biter.value().handle;
- uint64_t last_off = handle.offset() + handle.size() + kBlockTrailerSize;
+ uint64_t last_off =
+ handle.offset() + handle.size() + BlockBasedTable::kBlockTrailerSize;
uint64_t prefetch_len = last_off - prefetch_off;
std::unique_ptr<FilePrefetchBuffer> prefetch_buffer;
- rep->CreateFilePrefetchBuffer(0, 0, &prefetch_buffer);
+ rep->CreateFilePrefetchBuffer(
+ 0, 0, &prefetch_buffer, false /* Implicit autoreadahead */,
+ 0 /*num_reads_*/, 0 /*num_file_reads_for_auto_readahead*/);
IOOptions opts;
- s = PrepareIOFromReadOptions(ro, rep->file->env(), opts);
+ s = rep->file->PrepareIOOptions(ro, opts);
if (s.ok()) {
s = prefetch_buffer->Prefetch(opts, rep->file.get(), prefetch_off,
- static_cast<size_t>(prefetch_len));
+ static_cast<size_t>(prefetch_len),
+ ro.rate_limiter_priority);
}
if (!s.ok()) {
return s;
// filter blocks
s = table()->MaybeReadBlockAndLoadToCache(
prefetch_buffer.get(), ro, handle, UncompressionDict::GetEmptyDict(),
- &block, BlockType::kFilter, nullptr /* get_context */, &lookup_context,
- nullptr /* contents */);
+ /* wait */ true, /* for_compaction */ false, &block, BlockType::kFilter,
+ nullptr /* get_context */, &lookup_context, nullptr /* contents */,
+ false);
if (!s.ok()) {
return s;
}