import quincy beta 17.1.0

diff --git a/ceph/src/rocksdb/table/block_based/block_based_table_reader.h b/ceph/src/rocksdb/table/block_based/block_based_table_reader.h
index 28a378988ba5def5741ee6dccf47041d50d44f33..2923b482d3443c67cb13ed3f69a520ee628d3d63 100644
--- a/ceph/src/rocksdb/table/block_based/block_based_table_reader.h
+++ b/ceph/src/rocksdb/table/block_based/block_based_table_reader.h
@@ -9,38 +9,18 @@
 
 #pragma once
 
-#include <stdint.h>
-#include <memory>
-#include <set>
-#include <string>
-#include <utility>
-#include <vector>
-
 #include "db/range_tombstone_fragmenter.h"
 #include "file/filename.h"
-#include "file/random_access_file_reader.h"
-#include "options/cf_options.h"
-#include "rocksdb/options.h"
-#include "rocksdb/persistent_cache.h"
-#include "rocksdb/statistics.h"
-#include "rocksdb/status.h"
-#include "rocksdb/table.h"
-#include "table/block_based/block.h"
 #include "table/block_based/block_based_table_factory.h"
 #include "table/block_based/block_type.h"
 #include "table/block_based/cachable_entry.h"
 #include "table/block_based/filter_block.h"
 #include "table/block_based/uncompression_dict_reader.h"
-#include "table/format.h"
-#include "table/get_context.h"
-#include "table/multiget_context.h"
-#include "table/persistent_cache_helper.h"
 #include "table/table_properties_internal.h"
 #include "table/table_reader.h"
 #include "table/two_level_iterator.h"
+
 #include "trace_replay/block_cache_tracer.h"
-#include "util/coding.h"
-#include "util/user_comparator_wrapper.h"
 
 namespace ROCKSDB_NAMESPACE {
 
@@ -105,7 +85,9 @@ class BlockBasedTable : public TableReader {
   // @param skip_filters Disables loading/accessing the filter block. Overrides
   //    prefetch_index_and_filter_in_cache, so filter will be skipped if both
   //    are set.
-  static Status Open(const ImmutableCFOptions& ioptions,
+  // @param force_direct_prefetch if true, always prefetch into a RocksDB
+  //    buffer, rather than calling RandomAccessFile::Prefetch().
+  static Status Open(const ReadOptions& ro, const ImmutableCFOptions& ioptions,
                      const EnvOptions& env_options,
                      const BlockBasedTableOptions& table_options,
                      const InternalKeyComparator& internal_key_comparator,
@@ -117,8 +99,10 @@ class BlockBasedTable : public TableReader {
                      bool skip_filters = false, int level = -1,
                      const bool immortal_table = false,
                      const SequenceNumber largest_seqno = 0,
+                     bool force_direct_prefetch = false,
                      TailPrefetchStats* tail_prefetch_stats = nullptr,
-                     BlockCacheTracer* const block_cache_tracer = nullptr);
+                     BlockCacheTracer* const block_cache_tracer = nullptr,
+                     size_t max_file_size_for_l0_meta_pin = 0);
 
   bool PrefixMayMatch(const Slice& internal_key,
                       const ReadOptions& read_options,
@@ -129,6 +113,7 @@ class BlockBasedTable : public TableReader {
   // Returns a new iterator over the table contents.
   // The result of NewIterator() is initially invalid (caller must
   // call one of the Seek methods on the iterator before using it).
+  // @param read_options Must outlive the returned iterator.
   // @param skip_filters Disables loading/accessing the filter block
   // compaction_readahead_size: its value will only be used if caller =
   // kCompaction.
@@ -136,7 +121,8 @@ class BlockBasedTable : public TableReader {
                                 const SliceTransform* prefix_extractor,
                                 Arena* arena, bool skip_filters,
                                 TableReaderCaller caller,
-                                size_t compaction_readahead_size = 0) override;
+                                size_t compaction_readahead_size = 0,
+                                bool allow_unprepared_value = false) override;
 
   FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
       const ReadOptions& read_options) override;
@@ -220,7 +206,10 @@ class BlockBasedTable : public TableReader {
     virtual size_t ApproximateMemoryUsage() const = 0;
     // Cache the dependencies of the index reader (e.g. the partitions
     // of a partitioned index).
-    virtual void CacheDependencies(bool /* pin */) {}
+    virtual Status CacheDependencies(const ReadOptions& /*ro*/,
+                                     bool /* pin */) {
+      return Status::OK();
+    }
   };
 
   class IndexReaderCommon;
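
The hunk above turns IndexReader::CacheDependencies() from a void method into one that returns Status with a default success implementation: only index readers that actually have dependencies (e.g. the partitions of a partitioned index) override it, and a failed warm-up can now propagate an error to the caller instead of being dropped. Below is a minimal self-contained sketch of that pattern; FakeStatus and the *Sketch classes are illustrative stand-ins, not RocksDB types, and the new ReadOptions parameter is omitted for brevity.

    #include <iostream>
    #include <string>
    #include <vector>

    // Illustrative stand-in for rocksdb::Status.
    struct FakeStatus {
      std::string msg;  // empty means OK
      static FakeStatus OK() { return FakeStatus{}; }
      bool ok() const { return msg.empty(); }
    };

    class IndexReaderSketch {
     public:
      virtual ~IndexReaderSketch() = default;
      // Default: nothing to cache; report success (mirrors the new signature).
      virtual FakeStatus CacheDependencies(bool /*pin*/) {
        return FakeStatus::OK();
      }
    };

    // Only the partitioned variant has dependencies (its partition blocks);
    // returning Status lets it surface a read error instead of swallowing it.
    class PartitionedIndexReaderSketch : public IndexReaderSketch {
     public:
      FakeStatus CacheDependencies(bool pin) override {
        for (int part = 0; part < 3; ++part) {
          bool read_ok = true;  // pretend to read partition block `part` here
          if (!read_ok) {
            return FakeStatus{"failed reading partition " + std::to_string(part)};
          }
          if (pin) {
            pinned_.push_back(part);
          }
        }
        return FakeStatus::OK();
      }

     private:
      std::vector<int> pinned_;
    };

    int main() {
      PartitionedIndexReaderSketch reader;
      FakeStatus s = reader.CacheDependencies(/*pin=*/true);
      std::cout << (s.ok() ? std::string("warm-up ok") : s.msg) << std::endl;
    }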
@@ -271,6 +260,7 @@ class BlockBasedTable : public TableReader {
 
  private:
   friend class MockedBlockBasedTable;
+  friend class BlockBasedTableReaderTestVerifyChecksum_ChecksumMismatch_Test;
   static std::atomic<uint64_t> next_cache_key_id_;
   BlockCacheTracer* const block_cache_tracer_;
 
@@ -279,7 +269,8 @@ class BlockBasedTable : public TableReader {
   void UpdateCacheMissMetrics(BlockType block_type,
                               GetContext* get_context) const;
   void UpdateCacheInsertionMetrics(BlockType block_type,
-                                   GetContext* get_context, size_t usage) const;
+                                   GetContext* get_context, size_t usage,
+                                   bool redundant) const;
   Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
                                    BlockType block_type,
                                    GetContext* get_context) const;
@@ -287,6 +278,7 @@ class BlockBasedTable : public TableReader {
   // Either Block::NewDataIterator() or Block::NewIndexIterator().
   template <typename TBlockIter>
   static TBlockIter* InitBlockIterator(const Rep* rep, Block* block,
+                                       BlockType block_type,
                                        TBlockIter* input_iter,
                                        bool block_contents_pinned);
 
@@ -370,14 +362,16 @@ class BlockBasedTable : public TableReader {
   // @param uncompression_dict Data for presetting the compression library's
   //    dictionary.
   template <typename TBlocklike>
-  Status PutDataBlockToCache(
-      const Slice& block_cache_key, const Slice& compressed_block_cache_key,
-      Cache* block_cache, Cache* block_cache_compressed,
-      CachableEntry<TBlocklike>* cached_block,
-      BlockContents* raw_block_contents, CompressionType raw_block_comp_type,
-      const UncompressionDict& uncompression_dict, SequenceNumber seq_no,
-      MemoryAllocator* memory_allocator, BlockType block_type,
-      GetContext* get_context) const;
+  Status PutDataBlockToCache(const Slice& block_cache_key,
+                             const Slice& compressed_block_cache_key,
+                             Cache* block_cache, Cache* block_cache_compressed,
+                             CachableEntry<TBlocklike>* cached_block,
+                             BlockContents* raw_block_contents,
+                             CompressionType raw_block_comp_type,
+                             const UncompressionDict& uncompression_dict,
+                             MemoryAllocator* memory_allocator,
+                             BlockType block_type,
+                             GetContext* get_context) const;
 
   // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
   // after a call to Seek(key), until handle_result returns false.
@@ -389,7 +383,8 @@ class BlockBasedTable : public TableReader {
   // Optionally, user can pass a preloaded meta_index_iter for the index that
   // need to access extra meta blocks for index construction. This parameter
   // helps avoid re-reading meta index block if caller already created one.
-  Status CreateIndexReader(FilePrefetchBuffer* prefetch_buffer,
+  Status CreateIndexReader(const ReadOptions& ro,
+                           FilePrefetchBuffer* prefetch_buffer,
                            InternalIterator* preloaded_meta_index_iter,
                            bool use_cache, bool prefetch, bool pin,
                            BlockCacheLookupContext* lookup_context,
@@ -408,28 +403,35 @@ class BlockBasedTable : public TableReader {
                               const SliceTransform* prefix_extractor,
                               BlockCacheLookupContext* lookup_context) const;
 
+  // If force_direct_prefetch is true, always prefetch into a RocksDB
+  //    buffer, rather than calling RandomAccessFile::Prefetch().
   static Status PrefetchTail(
-      RandomAccessFileReader* file, uint64_t file_size,
-      TailPrefetchStats* tail_prefetch_stats, const bool prefetch_all,
-      const bool preload_all,
+      const ReadOptions& ro, RandomAccessFileReader* file, uint64_t file_size,
+      bool force_direct_prefetch, TailPrefetchStats* tail_prefetch_stats,
+      const bool prefetch_all, const bool preload_all,
       std::unique_ptr<FilePrefetchBuffer>* prefetch_buffer);
-  Status ReadMetaIndexBlock(FilePrefetchBuffer* prefetch_buffer,
+  Status ReadMetaIndexBlock(const ReadOptions& ro,
+                            FilePrefetchBuffer* prefetch_buffer,
                             std::unique_ptr<Block>* metaindex_block,
                             std::unique_ptr<InternalIterator>* iter);
-  Status TryReadPropertiesWithGlobalSeqno(FilePrefetchBuffer* prefetch_buffer,
+  Status TryReadPropertiesWithGlobalSeqno(const ReadOptions& ro,
+                                          FilePrefetchBuffer* prefetch_buffer,
                                           const Slice& handle_value,
                                           TableProperties** table_properties);
-  Status ReadPropertiesBlock(FilePrefetchBuffer* prefetch_buffer,
+  Status ReadPropertiesBlock(const ReadOptions& ro,
+                             FilePrefetchBuffer* prefetch_buffer,
                              InternalIterator* meta_iter,
                              const SequenceNumber largest_seqno);
-  Status ReadRangeDelBlock(FilePrefetchBuffer* prefetch_buffer,
+  Status ReadRangeDelBlock(const ReadOptions& ro,
+                           FilePrefetchBuffer* prefetch_buffer,
                            InternalIterator* meta_iter,
                            const InternalKeyComparator& internal_comparator,
                            BlockCacheLookupContext* lookup_context);
   Status PrefetchIndexAndFilterBlocks(
-      FilePrefetchBuffer* prefetch_buffer, InternalIterator* meta_iter,
-      BlockBasedTable* new_table, bool prefetch_all,
-      const BlockBasedTableOptions& table_options, const int level,
+      const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
+      InternalIterator* meta_iter, BlockBasedTable* new_table,
+      bool prefetch_all, const BlockBasedTableOptions& table_options,
+      const int level, size_t file_size, size_t max_file_size_for_l0_meta_pin,
       BlockCacheLookupContext* lookup_context);
 
   static BlockType GetBlockTypeForMetaBlockByName(const Slice& meta_block_name);
@@ -440,26 +442,40 @@ class BlockBasedTable : public TableReader {
 
   // Create the filter from the filter block.
   std::unique_ptr<FilterBlockReader> CreateFilterBlockReader(
-      FilePrefetchBuffer* prefetch_buffer, bool use_cache, bool prefetch,
-      bool pin, BlockCacheLookupContext* lookup_context);
+      const ReadOptions& ro, FilePrefetchBuffer* prefetch_buffer,
+      bool use_cache, bool prefetch, bool pin,
+      BlockCacheLookupContext* lookup_context);
 
   static void SetupCacheKeyPrefix(Rep* rep);
 
   // Generate a cache key prefix from the file
-  static void GenerateCachePrefix(Cache* cc, FSRandomAccessFile* file,
-                                  char* buffer, size_t* size);
-  static void GenerateCachePrefix(Cache* cc, FSWritableFile* file, char* buffer,
-                                  size_t* size);
+  template <typename TCache, typename TFile>
+  static void GenerateCachePrefix(TCache* cc, TFile* file, char* buffer,
+                                  size_t* size) {
+    // generate an id from the file
+    *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
+
+    // If the prefix wasn't generated or was too long,
+    // create one from the cache.
+    if (cc != nullptr && *size == 0) {
+      char* end = EncodeVarint64(buffer, cc->NewId());
+      *size = static_cast<size_t>(end - buffer);
+    }
+  }
+
+  // Size of all data blocks, possibly approximate
+  uint64_t GetApproximateDataSize();
 
-  // Given an iterator return its offset in file.
-  uint64_t ApproximateOffsetOf(
-      const InternalIteratorBase<IndexValue>& index_iter) const;
+  // Given an iterator, return its offset in the file's data block section.
+  uint64_t ApproximateDataOffsetOf(
+      const InternalIteratorBase<IndexValue>& index_iter,
+      uint64_t data_size) const;
 
   // Helper functions for DumpTable()
-  Status DumpIndexBlock(WritableFile* out_file);
-  Status DumpDataBlocks(WritableFile* out_file);
+  Status DumpIndexBlock(std::ostream& out_stream);
+  Status DumpDataBlocks(std::ostream& out_stream);
   void DumpKeyValue(const Slice& key, const Slice& value,
-                    WritableFile* out_file);
+                    std::ostream& out_stream);
 
   // A cumulative data block file read in MultiGet lower than this size will
   // use a stack buffer
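
The templated GenerateCachePrefix() above first asks the file for a unique ID and, when the file cannot provide one, falls back to a fresh ID from the cache encoded as a varint. The sketch below exercises that fallback path; the encoder follows the 7-bits-per-byte varint scheme of RocksDB's util/coding.h, while FakeCache, FakeFile, and the kMaxCacheKeyPrefixSize value are assumptions made for the example.

    #include <cstddef>
    #include <cstdint>
    #include <iostream>

    // Same little-endian base-128 scheme as EncodeVarint64 in util/coding.h:
    // 7 payload bits per byte, high bit set on every byte but the last.
    char* EncodeVarint64(char* dst, uint64_t v) {
      unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
      while (v >= 128) {
        *(ptr++) = static_cast<unsigned char>((v & 127) | 128);
        v >>= 7;
      }
      *(ptr++) = static_cast<unsigned char>(v);
      return reinterpret_cast<char*>(ptr);
    }

    // Illustrative stand-ins for the TCache/TFile template parameters.
    struct FakeCache {
      uint64_t next_id = 1;
      uint64_t NewId() { return next_id++; }
    };
    struct FakeFile {
      // Pretend the filesystem cannot produce a unique ID (length 0), which
      // is exactly what triggers the cache-ID fallback.
      size_t GetUniqueId(char* /*buffer*/, size_t /*max_size*/) { return 0; }
    };

    // Enough room for a varint64 (10 bytes); an assumption for this sketch.
    constexpr size_t kMaxCacheKeyPrefixSize = 10;

    template <typename TCache, typename TFile>
    void GenerateCachePrefix(TCache* cc, TFile* file, char* buffer,
                             size_t* size) {
      *size = file->GetUniqueId(buffer, kMaxCacheKeyPrefixSize);
      if (cc != nullptr && *size == 0) {
        char* end = EncodeVarint64(buffer, cc->NewId());
        *size = static_cast<size_t>(end - buffer);
      }
    }

    int main() {
      FakeCache cache;
      FakeFile file;
      char buffer[kMaxCacheKeyPrefixSize];
      size_t size = 0;
      GenerateCachePrefix(&cache, &file, buffer, &size);
      std::cout << "prefix size: " << size << std::endl;  // 1 (varint of id 1)
    }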
@@ -470,7 +486,7 @@ class BlockBasedTable : public TableReader {
   friend class DBBasicTest_MultiGetIOBufferOverrun_Test;
 };
 
-// Maitaning state of a two-level iteration on a partitioned index structure.
+// Maintaining state of a two-level iteration on a partitioned index structure.
 class BlockBasedTable::PartitionedIndexIteratorState
     : public TwoLevelIteratorState {
  public:
@@ -492,7 +508,7 @@ struct BlockBasedTable::Rep {
   Rep(const ImmutableCFOptions& _ioptions, const EnvOptions& _env_options,
       const BlockBasedTableOptions& _table_opt,
       const InternalKeyComparator& _internal_comparator, bool skip_filters,
-      int _level, const bool _immortal_table)
+      uint64_t _file_size, int _level, const bool _immortal_table)
       : ioptions(_ioptions),
         env_options(_env_options),
         table_options(_table_opt),
@@ -504,9 +520,10 @@ struct BlockBasedTable::Rep {
         whole_key_filtering(_table_opt.whole_key_filtering),
         prefix_filtering(true),
         global_seqno(kDisableGlobalSequenceNumber),
+        file_size(_file_size),
         level(_level),
         immortal_table(_immortal_table) {}
-
+  ~Rep() { status.PermitUncheckedError(); }
   const ImmutableCFOptions& ioptions;
   const EnvOptions& env_options;
   const BlockBasedTableOptions table_options;
@@ -561,6 +578,9 @@ struct BlockBasedTable::Rep {
   // and every key have it's own seqno.
   SequenceNumber global_seqno;
 
+  // Size of the table file on disk
+  uint64_t file_size;
+
   // the level when the table is opened, could potentially change when trivial
   // move is involved
   int level;
@@ -613,212 +633,49 @@ struct BlockBasedTable::Rep {
                                       max_readahead_size,
                                       !ioptions.allow_mmap_reads /* enable */));
   }
-};
-
-// Iterates over the contents of BlockBasedTable.
-template <class TBlockIter, typename TValue = Slice>
-class BlockBasedTableIterator : public InternalIteratorBase<TValue> {
-  // compaction_readahead_size: its value will only be used if for_compaction =
-  // true
- public:
-  BlockBasedTableIterator(const BlockBasedTable* table,
-                          const ReadOptions& read_options,
-                          const InternalKeyComparator& icomp,
-                          InternalIteratorBase<IndexValue>* index_iter,
-                          bool check_filter, bool need_upper_bound_check,
-                          const SliceTransform* prefix_extractor,
-                          BlockType block_type, TableReaderCaller caller,
-                          size_t compaction_readahead_size = 0)
-      : table_(table),
-        read_options_(read_options),
-        icomp_(icomp),
-        user_comparator_(icomp.user_comparator()),
-        index_iter_(index_iter),
-        pinned_iters_mgr_(nullptr),
-        block_iter_points_to_real_block_(false),
-        check_filter_(check_filter),
-        need_upper_bound_check_(need_upper_bound_check),
-        prefix_extractor_(prefix_extractor),
-        block_type_(block_type),
-        lookup_context_(caller),
-        compaction_readahead_size_(compaction_readahead_size) {}
-
-  ~BlockBasedTableIterator() { delete index_iter_; }
-
-  void Seek(const Slice& target) override;
-  void SeekForPrev(const Slice& target) override;
-  void SeekToFirst() override;
-  void SeekToLast() override;
-  void Next() final override;
-  bool NextAndGetResult(IterateResult* result) override;
-  void Prev() override;
-  bool Valid() const override {
-    return !is_out_of_bound_ &&
-           (is_at_first_key_from_index_ ||
-            (block_iter_points_to_real_block_ && block_iter_.Valid()));
-  }
-  Slice key() const override {
-    assert(Valid());
-    if (is_at_first_key_from_index_) {
-      return index_iter_->value().first_internal_key;
-    } else {
-      return block_iter_.key();
-    }
-  }
-  Slice user_key() const override {
-    assert(Valid());
-    if (is_at_first_key_from_index_) {
-      return ExtractUserKey(index_iter_->value().first_internal_key);
-    } else {
-      return block_iter_.user_key();
-    }
-  }
-  TValue value() const override {
-    assert(Valid());
-
-    // Load current block if not loaded.
-    if (is_at_first_key_from_index_ &&
-        !const_cast<BlockBasedTableIterator*>(this)
-             ->MaterializeCurrentBlock()) {
-      // Oops, index is not consistent with block contents, but we have
-      // no good way to report error at this point. Let's return empty value.
-      return TValue();
-    }
 
-    return block_iter_.value();
-  }
-  Status status() const override {
-    // Prefix index set status to NotFound when the prefix does not exist
-    if (!index_iter_->status().ok() && !index_iter_->status().IsNotFound()) {
-      return index_iter_->status();
-    } else if (block_iter_points_to_real_block_) {
-      return block_iter_.status();
-    } else {
-      return Status::OK();
-    }
-  }
-
-  // Whether iterator invalidated for being out of bound.
-  bool IsOutOfBound() override { return is_out_of_bound_; }
-
-  inline bool MayBeOutOfUpperBound() override {
-    assert(Valid());
-    return !data_block_within_upper_bound_;
-  }
-
-  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
-    pinned_iters_mgr_ = pinned_iters_mgr;
-  }
-  bool IsKeyPinned() const override {
-    // Our key comes either from block_iter_'s current key
-    // or index_iter_'s current *value*.
-    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
-           ((is_at_first_key_from_index_ && index_iter_->IsValuePinned()) ||
-            (block_iter_points_to_real_block_ && block_iter_.IsKeyPinned()));
-  }
-  bool IsValuePinned() const override {
-    // Load current block if not loaded.
-    if (is_at_first_key_from_index_) {
-      const_cast<BlockBasedTableIterator*>(this)->MaterializeCurrentBlock();
+  void CreateFilePrefetchBufferIfNotExists(
+      size_t readahead_size, size_t max_readahead_size,
+      std::unique_ptr<FilePrefetchBuffer>* fpb) const {
+    if (!(*fpb)) {
+      CreateFilePrefetchBuffer(readahead_size, max_readahead_size, fpb);
     }
-    // BlockIter::IsValuePinned() is always true. No need to check
-    return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() &&
-           block_iter_points_to_real_block_;
   }
+};
 
-  void ResetDataIter() {
-    if (block_iter_points_to_real_block_) {
-      if (pinned_iters_mgr_ != nullptr && pinned_iters_mgr_->PinningEnabled()) {
-        block_iter_.DelegateCleanupsTo(pinned_iters_mgr_);
-      }
-      block_iter_.Invalidate(Status::OK());
-      block_iter_points_to_real_block_ = false;
+// This is an adapter class for `WritableFile` to be used with `std::ostream`.
+// The adapter wraps a `WritableFile` and can itself be passed to a
+// `std::ostream` constructor for writing streamed data.
+// Note:
+//  * This adapter doesn't provide any buffering; each write is forwarded to
+//    `WritableFile->Append()` directly.
+//  * After a failed write, the user can check the status via `ostream.good()`.
+class WritableFileStringStreamAdapter : public std::stringbuf {
+ public:
+  explicit WritableFileStringStreamAdapter(WritableFile* writable_file)
+      : file_(writable_file) {}
+
+  // This is to handle `std::endl`; `endl` is written by `os.put()` directly
+  // without going through `xsputn()`. As we explicitly disabled buffering,
+  // every write not captured by `xsputn()` is an overflow.
+  int overflow(int ch = EOF) override {
+    if (ch == '\n') {
+      file_->Append("\n");
+      return ch;
     }
+    return EOF;
   }
 
-  void SavePrevIndexValue() {
-    if (block_iter_points_to_real_block_) {
-      // Reseek. If they end up with the same data block, we shouldn't re-fetch
-      // the same data block.
-      prev_block_offset_ = index_iter_->value().handle.offset();
+  std::streamsize xsputn(char const* p, std::streamsize n) override {
+    Status s = file_->Append(Slice(p, n));
+    if (!s.ok()) {
+      return 0;
     }
+    return n;
   }
 
  private:
-  enum class IterDirection {
-    kForward,
-    kBackward,
-  };
-
-  const BlockBasedTable* table_;
-  const ReadOptions read_options_;
-  const InternalKeyComparator& icomp_;
-  UserComparatorWrapper user_comparator_;
-  InternalIteratorBase<IndexValue>* index_iter_;
-  PinnedIteratorsManager* pinned_iters_mgr_;
-  TBlockIter block_iter_;
-
-  // True if block_iter_ is initialized and points to the same block
-  // as index iterator.
-  bool block_iter_points_to_real_block_;
-  // See InternalIteratorBase::IsOutOfBound().
-  bool is_out_of_bound_ = false;
-  // Whether current data block being fully within iterate upper bound.
-  bool data_block_within_upper_bound_ = false;
-  // True if we're standing at the first key of a block, and we haven't loaded
-  // that block yet. A call to value() will trigger loading the block.
-  bool is_at_first_key_from_index_ = false;
-  bool check_filter_;
-  // TODO(Zhongyi): pick a better name
-  bool need_upper_bound_check_;
-  const SliceTransform* prefix_extractor_;
-  BlockType block_type_;
-  uint64_t prev_block_offset_ = std::numeric_limits<uint64_t>::max();
-  BlockCacheLookupContext lookup_context_;
-  // Readahead size used in compaction, its value is used only if
-  // lookup_context_.caller = kCompaction.
-  size_t compaction_readahead_size_;
-
-  size_t readahead_size_ = BlockBasedTable::kInitAutoReadaheadSize;
-  size_t readahead_limit_ = 0;
-  int64_t num_file_reads_ = 0;
-  std::unique_ptr<FilePrefetchBuffer> prefetch_buffer_;
-
-  // If `target` is null, seek to first.
-  void SeekImpl(const Slice* target);
-
-  void InitDataBlock();
-  bool MaterializeCurrentBlock();
-  void FindKeyForward();
-  void FindBlockForward();
-  void FindKeyBackward();
-  void CheckOutOfBound();
-
-  // Check if data block is fully within iterate_upper_bound.
-  //
-  // Note MyRocks may update iterate bounds between seek. To workaround it,
-  // we need to check and update data_block_within_upper_bound_ accordingly.
-  void CheckDataBlockWithinUpperBound();
-
-  bool CheckPrefixMayMatch(const Slice& ikey, IterDirection direction) {
-    if (need_upper_bound_check_ && direction == IterDirection::kBackward) {
-      // Upper bound check isn't sufficnet for backward direction to
-      // guarantee the same result as total order, so disable prefix
-      // check.
-      return true;
-    }
-    if (check_filter_ &&
-        !table_->PrefixMayMatch(ikey, read_options_, prefix_extractor_,
-                                need_upper_bound_check_, &lookup_context_)) {
-      // TODO remember the iterator is invalidated because of prefix
-      // match. This can avoid the upper level file iterator to falsely
-      // believe the position is the end of the SST file and move to
-      // the first key of the next file.
-      ResetDataIter();
-      return false;
-    }
-    return true;
-  }
+  WritableFile* file_;
 };
 
 }  // namespace ROCKSDB_NAMESPACE
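
Because the adapter derives from std::stringbuf, it can back a std::ostream directly, which is what lets the DumpIndexBlock()/DumpDataBlocks() signatures above switch from WritableFile* to std::ostream&. A self-contained sketch of the round trip, with FakeWritableFile as an illustrative stand-in for rocksdb::WritableFile:

    #include <cstdio>
    #include <iostream>
    #include <sstream>
    #include <string>

    // Illustrative stand-in for rocksdb::WritableFile.
    struct FakeWritableFile {
      std::string data;
      bool Append(const std::string& s) {
        data += s;
        return true;
      }
    };

    // Same shape as WritableFileStringStreamAdapter above: an unbuffered
    // stringbuf whose writes are forwarded straight to Append().
    class FileStreamAdapterSketch : public std::stringbuf {
     public:
      explicit FileStreamAdapterSketch(FakeWritableFile* f) : file_(f) {}

      int overflow(int ch = EOF) override {
        if (ch == '\n') {  // std::endl arrives here via os.put()
          file_->Append("\n");
          return ch;
        }
        return EOF;
      }

      std::streamsize xsputn(const char* p, std::streamsize n) override {
        return file_->Append(std::string(p, static_cast<size_t>(n))) ? n : 0;
      }

     private:
      FakeWritableFile* file_;
    };

    int main() {
      FakeWritableFile file;
      FileStreamAdapterSketch buf(&file);
      std::ostream out(&buf);
      out << "index block count: " << 3 << std::endl;  // forwarded to Append()
      std::cout << (out.good() ? file.data : std::string("write failed"));
    }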