#pragma once
-#include <stdint.h>
-#include <memory>
-#include <set>
-#include <string>
-#include <utility>
-#include <vector>
-
#include "db/range_tombstone_fragmenter.h"
#include "file/filename.h"
-#include "file/random_access_file_reader.h"
-#include "options/cf_options.h"
-#include "rocksdb/options.h"
-#include "rocksdb/persistent_cache.h"
-#include "rocksdb/statistics.h"
-#include "rocksdb/status.h"
-#include "rocksdb/table.h"
-#include "table/block_based/block.h"
#include "table/block_based/block_based_table_factory.h"
#include "table/block_based/block_type.h"
#include "table/block_based/cachable_entry.h"
#include "table/block_based/filter_block.h"
#include "table/block_based/uncompression_dict_reader.h"
-#include "table/format.h"
-#include "table/get_context.h"
-#include "table/multiget_context.h"
-#include "table/persistent_cache_helper.h"
#include "table/table_properties_internal.h"
#include "table/table_reader.h"
#include "table/two_level_iterator.h"
+
#include "trace_replay/block_cache_tracer.h"
-#include "util/coding.h"
-#include "util/user_comparator_wrapper.h"
namespace ROCKSDB_NAMESPACE {
// @param skip_filters Disables loading/accessing the filter block. Overrides
// prefetch_index_and_filter_in_cache, so filter will be skipped if both
// are set.
- static Status Open(const ImmutableCFOptions& ioptions,
+ // @param force_direct_prefetch if true, always prefetch into RocksDB's
+ // buffer rather than calling RandomAccessFile::Prefetch().
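+ // @param max_file_size_for_l0_meta_pin Passed through to
+ // PrefetchIndexAndFilterBlocks(); roughly, only an L0 file whose size is at
+ // most this value has its meta blocks (index/filter) considered for pinning.
+ // (Descriptive note; the precise policy lives in the implementation.)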
+ static Status Open(const ReadOptions& ro, const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_key_comparator,
bool skip_filters = false, int level = -1,
const bool immortal_table = false,
const SequenceNumber largest_seqno = 0,
+ bool force_direct_prefetch = false,
TailPrefetchStats* tail_prefetch_stats = nullptr,
- BlockCacheTracer* const block_cache_tracer = nullptr);
+ BlockCacheTracer* const block_cache_tracer = nullptr,
+ size_t max_file_size_for_l0_meta_pin = 0);
bool PrefixMayMatch(const Slice& internal_key,
const ReadOptions& read_options,
// Returns a new iterator over the table contents.
// The result of NewIterator() is initially invalid (caller must
// call one of the Seek methods on the iterator before using it).
+ // @param read_options Must outlive the returned iterator.
// @param skip_filters Disables loading/accessing the filter block
// compaction_readahead_size: its value will only be used if caller =
// kCompaction.
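+ // @param allow_unprepared_value If true, the returned iterator may defer
+ // loading a block's value; the caller is then expected to call
+ // PrepareValue() on the iterator before accessing value(). (Descriptive
+ // note based on InternalIteratorBase::PrepareValue().)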
const SliceTransform* prefix_extractor,
Arena* arena, bool skip_filters,
TableReaderCaller caller,
- size_t compaction_readahead_size = 0) override;
+ size_t compaction_readahead_size = 0,
+ bool allow_unprepared_value = false) override;
FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
const ReadOptions& read_options) override;
virtual size_t ApproximateMemoryUsage() const = 0;
// Cache the dependencies of the index reader (e.g. the partitions
// of a partitioned index).
- virtual void CacheDependencies(bool /* pin */) {}
+ virtual Status CacheDependencies(const ReadOptions& /*ro*/,
+ bool /* pin */) {
+ return Status::OK();
+ }
};
class IndexReaderCommon;
private:
friend class MockedBlockBasedTable;
+ friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
static std::atomic<uint64_t> next_cache_key_id_;
BlockCacheTracer* const block_cache_tracer_;
void UpdateCacheMissMetrics(BlockType block_type,
GetContext* get_context) const;
void UpdateCacheInsertionMetrics(BlockType block_type,
- GetContext* get_context, size_t usage) const;
+ GetContext* get_context, size_t usage,
+ bool redundant) const;
Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
BlockType block_type,
GetContext* get_context) const;
// Either Block::NewDataIterator() or Block::NewIndexIterator().
template <typename TBlockIter>
static TBlockIter* InitBlockIterator(const Rep* rep, Block* block,
+ BlockType block_type,
TBlockIter* input_iter,
bool block_contents_pinned);
// @param uncompression_dict Data for presetting the compression library's
// dictionary.
template <typename TBlocklike>
- Status PutDataBlockToCache(
- const Slice& block_cache_key, const Slice& compressed_block_cache_key,
- Cache* block_cache, Cache* block_cache_compressed,
- CachableEntry<TBlocklike>* cached_block,
- BlockContents* raw_block_contents, CompressionType raw_block_comp_type,
- const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
- MemoryAllocator* memory_allocator, BlockType block_type,
- GetContext* get_context) const;
+ Status PutDataBlockToCache(const Slice& block_cache_key,
+ const Slice& compressed_block_cache_key,
+ Cache* block_cache, Cache* block_cache_compressed,
+ CachableEntry<TBlocklike>* cached_block,
+ BlockContents* raw_block_contents,
+ CompressionType raw_block_comp_type,
+ const UncompressionDict& uncompression_dict,
+ MemoryAllocator* memory_allocator,
+ BlockType block_type,
+ GetContext* get_context) const;
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
// Optionally, the user can pass a preloaded meta_index_iter for indexes that
// need to access extra meta blocks during construction. This parameter helps
// avoid re-reading the meta index block if the caller has already created one.
- Status CreateIndexReader(FilePrefetchBuffer* prefetch_buffer,
+ Status CreateIndexReader(const ReadOptions& ro,
+ FilePrefetchBuffer* prefetch_buffer,
InternalIterator* preloaded_meta_index_iter,
bool use_cache, bool prefetch, bool pin,
BlockCacheLookupContext* lookup_context,
const SliceTransform* prefix_extractor,
BlockCacheLookupContext* lookup_context) const;
+ // If force_direct_prefetch is true, always prefetch into RocksDB's buffer
+ // rather than calling RandomAccessFile::Prefetch().
static Status PrefetchTail(
- RandomAccessFileReader* file, uint64_t file_size,
- TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all,
- const bool preload_all,
+ const ReadOptions& ro, RandomAccessFileReader* file, uint64_t file_size,
+ bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats,
+ const bool prefetch_all, const bool preload_all,
std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer);
- Status ReadMetaIndexBlock(FilePrefetchBuffer* prefetch_buffer,
+ Status ReadMetaIndexBlock(const ReadOptions& ro,
+ FilePrefetchBuffer* prefetch_buffer,
std::unique_ptr<Block>* metaindex_block,
std::unique_ptr<InternalIterator>* iter);
- Status TryReadPropertiesWithGlobalSeqno(FilePrefetchBuffer* prefetch_buffer,
+ Status TryReadPropertiesWithGlobalSeqno(const ReadOptions& ro,
+ FilePrefetchBuffer* prefetch_buffer,
const Slice& handle_value,
TableProperties** table_properties);
- Status ReadPropertiesBlock(FilePrefetchBuffer* prefetch_buffer,
+ Status ReadPropertiesBlock(const ReadOptions& ro,
+ FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter,
const SequenceNumber largest_seqno);
- Status ReadRangeDelBlock(FilePrefetchBuffer* prefetch_buffer,
+ Status ReadRangeDelBlock(const ReadOptions& ro,
+ FilePrefetchBuffer* prefetch_buffer,
InternalIterator* meta_iter,
const InternalKeyComparator& internal_comparator,
BlockCacheLookupContext* lookup_context);
Status PrefetchIndexAndFilterBlocks(
- FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
- BlockBasedTable* new_table, bool prefetch_all,
- const BlockBasedTableOptions& table_options, const int level,
+ const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
+ InternalIterator* meta_iter, BlockBasedTable* new_table,
+ bool prefetch_all, const BlockBasedTableOptions& table_options,
+ const int level, size_t file_size, size_t max_file_size_for_l0_meta_pin,
BlockCacheLookupContext* lookup_context);
static BlockType GetBlockTypeForMetaBlockByName(const Slice& meta_block_name);
// Create the filter from the filter block.
std::unique_ptr<FilterBlockReader> CreateFilterBlockReader(
- FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
- bool pin, BlockCacheLookupContext* lookup_context);
+ const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
+ bool use_cache, bool prefetch, bool pin,
+ BlockCacheLookupContext* lookup_context);
static void SetupCacheKeyPrefix(Rep* rep);
// Generate a cache key prefix from the file
- static void GenerateCachePrefix(Cache* cc, FSRandomAccessFile* file,
- char* buffer, size_t* size);
- static void GenerateCachePrefix(Cache* cc, FSWritableFile* file, char* buffer,
- size_t* size);
+ template <typename TCache, typename TFile>
+ static void GenerateCachePrefix(TCache* cc, TFile* file, char* buffer,
+ size_t* size) {
+ // generate an id from the file
+ *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
+
+ // If the prefix wasn't generated or was too long,
+ // create one from the cache.
+ if (cc != nullptr && *size == 0) {
+ char* end = EncodeVarint64(buffer, cc->NewId());
+ *size = static_cast<size_t>(end - buffer);
+ }
+ }
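+
+ // Illustrative sketch (not a member of this class): the generated prefix is
+ // later combined with a block handle's offset to form the full per-block
+ // cache key, roughly (field names as in Rep):
+ //   char buf[kMaxCacheKeyPrefixSize + kMaxVarint64Length];
+ //   memcpy(buf, rep->cache_key_prefix, rep->cache_key_prefix_size);
+ //   char* end = EncodeVarint64(buf + rep->cache_key_prefix_size,
+ //                              handle.offset());
+ //   Slice cache_key(buf, static_cast<size_t>(end - buf));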
+
+ // Size of all data blocks, maybe approximate
+ uint64_t GetApproximateDataSize();
- // Given an iterator return its offset in file.
- uint64_t ApproximateOffsetOf(
- const InternalIteratorBase<IndexValue>& index_iter) const;
+ // Given an index iterator, return its offset within the data block section
+ // of the file.
+ uint64_t ApproximateDataOffsetOf(
+ const InternalIteratorBase<IndexValue>& index_iter,
+ uint64_t data_size) const;
// Helper functions for DumpTable()
- Status DumpIndexBlock(WritableFile* out_file);
- Status DumpDataBlocks(WritableFile* out_file);
+ Status DumpIndexBlock(std::ostream& out_stream);
+ Status DumpDataBlocks(std::ostream& out_stream);
void DumpKeyValue(const Slice& key, const Slice& value,
- WritableFile* out_file);
+ std::ostream& out_stream);
// A MultiGet batch whose cumulative data block reads total less than this
// size will use a stack buffer
friend class DBBasicTest_MultiGetIOBufferOverrun_Test;
};
-// Maitaning state of a two-level iteration on a partitioned index structure.
+// Maintaining state of a two-level iteration on a partitioned index structure.
class BlockBasedTable::PartitionedIndexIteratorState
: public TwoLevelIteratorState {
public:
Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options,
const BlockBasedTableOptions& _table_opt,
const InternalKeyComparator& _internal_comparator, bool skip_filters,
- int _level, const bool _immortal_table)
+ uint64_t _file_size, int _level, const bool _immortal_table)
: ioptions(_ioptions),
env_options(_env_options),
table_options(_table_opt),
whole_key_filtering(_table_opt.whole_key_filtering),
prefix_filtering(true),
global_seqno(kDisableGlobalSequenceNumber),
+ file_size(_file_size),
level(_level),
immortal_table(_immortal_table) {}
-
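+ // The destructor marks `status` as checked via PermitUncheckedError() so
+ // that destroying a Rep with an unconsumed status does not trip the
+ // debug-build "status must be checked" assertion.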
+ ~Rep() { status.PermitUncheckedError(); }
const ImmutableCFOptions& ioptions;
const EnvOptions& env_options;
const BlockBasedTableOptions table_options;
// and every key has its own seqno.
SequenceNumber global_seqno;
+ // Size of the table file on disk
+ uint64_t file_size;
+
// the level at which the table was opened; it could potentially change when a
// trivial move is involved
int level;
max_readahead_size,
!ioptions.allow_mmap_reads /* enable */));
}
-};
-
-// Iterates over the contents of BlockBasedTable.
-template <class TBlockIter, typename TValue = Slice>
-class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
- // compaction_readahead_size: its value will only be used if for_compaction =
- // true
- public:
- BlockBasedTableIterator(const BlockBasedTable* table,
- const ReadOptions& read_options,
- const InternalKeyComparator& icomp,
- InternalIteratorBase<IndexValue>* index_iter,
- bool check_filter, bool need_upper_bound_check,
- const SliceTransform* prefix_extractor,
- BlockType block_type, TableReaderCaller caller,
- size_t compaction_readahead_size = 0)
- : table_(table),
- read_options_(read_options),
- icomp_(icomp),
- user_comparator_(icomp.user_comparator()),
- index_iter_(index_iter),
- pinned_iters_mgr_(nullptr),
- block_iter_points_to_real_block_(false),
- check_filter_(check_filter),
- need_upper_bound_check_(need_upper_bound_check),
- prefix_extractor_(prefix_extractor),
- block_type_(block_type),
- lookup_context_(caller),
- compaction_readahead_size_(compaction_readahead_size) {}
-
- ~BlockBasedTableIterator() { delete index_iter_; }
-
- void Seek(const Slice& target) override;
- void SeekForPrev(const Slice& target) override;
- void SeekToFirst() override;
- void SeekToLast() override;
- void Next() final override;
- bool NextAndGetResult(IterateResult* result) override;
- void Prev() override;
- bool Valid() const override {
- return !is_out_of_bound_ &&
- (is_at_first_key_from_index_ ||
- (block_iter_points_to_real_block_ && block_iter_.Valid()));
- }
- Slice key() const override {
- assert(Valid());
- if (is_at_first_key_from_index_) {
- return index_iter_->value().first_internal_key;
- } else {
- return block_iter_.key();
- }
- }
- Slice user_key() const override {
- assert(Valid());
- if (is_at_first_key_from_index_) {
- return ExtractUserKey(index_iter_->value().first_internal_key);
- } else {
- return block_iter_.user_key();
- }
- }
- TValue value() const override {
- assert(Valid());
-
- // Load current block if not loaded.
- if (is_at_first_key_from_index_ &&
- !const_cast<BlockBasedTableIterator*>(this)
- ->MaterializeCurrentBlock()) {
- // Oops, index is not consistent with block contents, but we have
- // no good way to report error at this point. Let's return empty value.
- return TValue();
- }
- return block_iter_.value();
- }
- Status status() const override {
- // Prefix index set status to NotFound when the prefix does not exist
- if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) {
- return index_iter_->status();
- } else if (block_iter_points_to_real_block_) {
- return block_iter_.status();
- } else {
- return Status::OK();
- }
- }
-
- // Whether iterator invalidated for being out of bound.
- bool IsOutOfBound() override { return is_out_of_bound_; }
-
- inline bool MayBeOutOfUpperBound() override {
- assert(Valid());
- return !data_block_within_upper_bound_;
- }
-
- void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
- pinned_iters_mgr_ = pinned_iters_mgr;
- }
- bool IsKeyPinned() const override {
- // Our key comes either from block_iter_'s current key
- // or index_iter_'s current *value*.
- return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
- ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) ||
- (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned()));
- }
- bool IsValuePinned() const override {
- // Load current block if not loaded.
- if (is_at_first_key_from_index_) {
- const_cast<BlockBasedTableIterator*>(this)->MaterializeCurrentBlock();
+ void CreateFilePrefetchBufferIfNotExists(
+ size_t readahead_size, size_t max_readahead_size,
+ std::unique_ptr<FilePrefetchBuffer>* fpb) const {
+ if (!(*fpb)) {
+ CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb);
}
- // BlockIter::IsValuePinned() is always true. No need to check
- return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
- block_iter_points_to_real_block_;
}
+};
- void ResetDataIter() {
- if (block_iter_points_to_real_block_) {
- if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
- block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
- }
- block_iter_.Invalidate(Status::OK());
- block_iter_points_to_real_block_ = false;
+// An adapter that lets a `WritableFile` be used as the target of a
+// `std::ostream`. The adapter wraps a `WritableFile` and can back a
+// `std::ostream` constructed on top of it for storing streaming data.
+// Note:
+// * This adapter doesn't provide any buffering: each write is forwarded to
+//   `WritableFile::Append()` directly.
+// * For a failed write, the user needs to check the stream state via
+//   `ostream.good()`.
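+//
+// Example usage (illustrative sketch; `out_file` stands for whatever
+// WritableFile the caller wants to stream into):
+//   WritableFileStringStreamAdapter out_file_wrapper(out_file);
+//   std::ostream out_stream(&out_file_wrapper);
+//   out_stream << "Data Block Summary:" << std::endl;
+//   if (!out_stream.good()) { /* an Append() failed */ }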
+class WritableFileStringStreamAdapter : public std::stringbuf {
+ public:
+ explicit WritableFileStringStreamAdapter(WritableFile* writable_file)
+ : file_(writable_file) {}
+
+ // This handles `std::endl`, which is written via `os.put()` directly
+ // rather than through `xsputn()`. Since buffering is explicitly disabled,
+ // every write not captured by `xsputn()` arrives here as an overflow.
+ int overflow(int ch = EOF) override {
+ if (ch == '\n') {
+ file_->Append("\n");
+ return ch;
}
+ return EOF;
}
- void SavePrevIndexValue() {
- if (block_iter_points_to_real_block_) {
- // Reseek. If they end up with the same data block, we shouldn't re-fetch
- // the same data block.
- prev_block_offset_ = index_iter_->value().handle.offset();
+ std::streamsize xsputn(char const* p, std::streamsize n) override {
+ Status s = file_->Append(Slice(p, n));
+ if (!s.ok()) {
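// Returning fewer characters than requested (0 here) causes the ostream
// to set badbit, which the caller can then observe via `ostream.good()`.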
+ return 0;
}
+ return n;
}
private:
- enum class IterDirection {
- kForward,
- kBackward,
- };
-
- const BlockBasedTable* table_;
- const ReadOptions read_options_;
- const InternalKeyComparator& icomp_;
- UserComparatorWrapper user_comparator_;
- InternalIteratorBase<IndexValue>* index_iter_;
- PinnedIteratorsManager* pinned_iters_mgr_;
- TBlockIter block_iter_;
-
- // True if block_iter_ is initialized and points to the same block
- // as index iterator.
- bool block_iter_points_to_real_block_;
- // See InternalIteratorBase::IsOutOfBound().
- bool is_out_of_bound_ = false;
- // Whether current data block being fully within iterate upper bound.
- bool data_block_within_upper_bound_ = false;
- // True if we're standing at the first key of a block, and we haven't loaded
- // that block yet. A call to value() will trigger loading the block.
- bool is_at_first_key_from_index_ = false;
- bool check_filter_;
- // TODO(Zhongyi): pick a better name
- bool need_upper_bound_check_;
- const SliceTransform* prefix_extractor_;
- BlockType block_type_;
- uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
- BlockCacheLookupContext lookup_context_;
- // Readahead size used in compaction, its value is used only if
- // lookup_context_.caller = kCompaction.
- size_t compaction_readahead_size_;
-
- size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize;
- size_t readahead_limit_ = 0;
- int64_t num_file_reads_ = 0;
- std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
-
- // If `target` is null, seek to first.
- void SeekImpl(const Slice* target);
-
- void InitDataBlock();
- bool MaterializeCurrentBlock();
- void FindKeyForward();
- void FindBlockForward();
- void FindKeyBackward();
- void CheckOutOfBound();
-
- // Check if data block is fully within iterate_upper_bound.
- //
- // Note MyRocks may update iterate bounds between seek. To workaround it,
- // we need to check and update data_block_within_upper_bound_ accordingly.
- void CheckDataBlockWithinUpperBound();
-
- bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction) {
- if (need_upper_bound_check_ && direction == IterDirection::kBackward) {
- // Upper bound check isn't sufficnet for backward direction to
- // guarantee the same result as total order, so disable prefix
- // check.
- return true;
- }
- if (check_filter_ &&
- !table_->PrefixMayMatch(ikey, read_options_, prefix_extractor_,
- need_upper_bound_check_, &lookup_context_)) {
- // TODO remember the iterator is invalidated because of prefix
- // match. This can avoid the upper level file iterator to falsely
- // believe the position is the end of the SST file and move to
- // the first key of the next file.
- ResetDataIter();
- return false;
- }
- return true;
- }
+ WritableFile* file_;
};
} // namespace ROCKSDB_NAMESPACE