// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "table/block_based/block_based_table_reader.h"
-
#include "table/block_based/reader_common.h"
// The file contains some member functions of BlockBasedTable that
TBlockIter* BlockBasedTable::NewDataBlockIterator(
const ReadOptions& ro, const BlockHandle& handle, TBlockIter* input_iter,
BlockType block_type, GetContext* get_context,
- BlockCacheLookupContext* lookup_context, Status s,
- FilePrefetchBuffer* prefetch_buffer, bool for_compaction) const {
+ BlockCacheLookupContext* lookup_context,
+ FilePrefetchBuffer* prefetch_buffer, bool for_compaction, bool async_read,
+ Status& s) const {
PERF_TIMER_GUARD(new_table_block_iter_nanos);
TBlockIter* iter = input_iter != nullptr ? input_iter : new TBlockIter;
return iter;
}
- CachableEntry<UncompressionDict> uncompression_dict;
- if (rep_->uncompression_dict_reader) {
+ CachableEntry<Block> block;
+ if (rep_->uncompression_dict_reader && block_type == BlockType::kData) {
+ CachableEntry<UncompressionDict> uncompression_dict;
const bool no_io = (ro.read_tier == kBlockCacheTier);
s = rep_->uncompression_dict_reader->GetOrReadUncompressionDictionary(
- prefetch_buffer, no_io, get_context, lookup_context,
- &uncompression_dict);
+ prefetch_buffer, no_io, ro.verify_checksums, get_context,
+ lookup_context, &uncompression_dict);
if (!s.ok()) {
iter->Invalidate(s);
return iter;
}
+ const UncompressionDict& dict = uncompression_dict.GetValue()
+ ? *uncompression_dict.GetValue()
+ : UncompressionDict::GetEmptyDict();
+ s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
+ get_context, lookup_context, for_compaction,
+ /* use_cache */ true, /* wait_for_cache */ true,
+ async_read);
+ } else {
+ s = RetrieveBlock(
+ prefetch_buffer, ro, handle, UncompressionDict::GetEmptyDict(), &block,
+ block_type, get_context, lookup_context, for_compaction,
+ /* use_cache */ true, /* wait_for_cache */ true, async_read);
}
- const UncompressionDict& dict = uncompression_dict.GetValue()
- ? *uncompression_dict.GetValue()
- : UncompressionDict::GetEmptyDict();
-
- CachableEntry<Block> block;
- s = RetrieveBlock(prefetch_buffer, ro, handle, dict, &block, block_type,
- get_context, lookup_context, for_compaction,
- /* use_cache */ true);
+ if (s.IsTryAgain() && async_read) {
+ return iter;
+ }
if (!s.ok()) {
assert(block.IsEmpty());
block_contents_pinned);
if (!block.IsCached()) {
- if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
- // insert a dummy record to block cache to track the memory usage
+ if (!ro.fill_cache) {
Cache* const block_cache = rep_->table_options.block_cache.get();
- Cache::Handle* cache_handle = nullptr;
- // There are two other types of cache keys: 1) SST cache key added in
- // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
- // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
- // from SST cache key(31 bytes), and use non-zero prefix to
- // differentiate from `write_buffer_manager`
- const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
- char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
- // Prefix: use rep_->cache_key_prefix padded by 0s
- memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
- assert(rep_->cache_key_prefix_size != 0);
- assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
- memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
- char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
- next_cache_key_id_++);
- assert(end - cache_key <=
- static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
- const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
- s = block_cache->Insert(unique_key, nullptr,
- block.GetValue()->ApproximateMemoryUsage(),
- nullptr, &cache_handle);
-
- if (s.ok()) {
- assert(cache_handle != nullptr);
- iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
- cache_handle);
+ if (block_cache) {
+ // insert a dummy record to block cache to track the memory usage
+ Cache::Handle* cache_handle = nullptr;
+ CacheKey key = CacheKey::CreateUniqueForCacheLifetime(block_cache);
+ s = block_cache->Insert(key.AsSlice(), nullptr,
+ block.GetValue()->ApproximateMemoryUsage(),
+ nullptr, &cache_handle);
+
+ if (s.ok()) {
+ assert(cache_handle != nullptr);
+ iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
+ cache_handle);
+ }
}
}
} else {
iter, block_contents_pinned);
if (!block.IsCached()) {
- if (!ro.fill_cache && rep_->cache_key_prefix_size != 0) {
- // insert a dummy record to block cache to track the memory usage
+ if (!ro.fill_cache) {
Cache* const block_cache = rep_->table_options.block_cache.get();
- Cache::Handle* cache_handle = nullptr;
- // There are two other types of cache keys: 1) SST cache key added in
- // `MaybeReadBlockAndLoadToCache` 2) dummy cache key added in
- // `write_buffer_manager`. Use longer prefix (41 bytes) to differentiate
- // from SST cache key(31 bytes), and use non-zero prefix to
- // differentiate from `write_buffer_manager`
- const size_t kExtraCacheKeyPrefix = kMaxVarint64Length * 4 + 1;
- char cache_key[kExtraCacheKeyPrefix + kMaxVarint64Length];
- // Prefix: use rep_->cache_key_prefix padded by 0s
- memset(cache_key, 0, kExtraCacheKeyPrefix + kMaxVarint64Length);
- assert(rep_->cache_key_prefix_size != 0);
- assert(rep_->cache_key_prefix_size <= kExtraCacheKeyPrefix);
- memcpy(cache_key, rep_->cache_key_prefix, rep_->cache_key_prefix_size);
- char* end = EncodeVarint64(cache_key + kExtraCacheKeyPrefix,
- next_cache_key_id_++);
- assert(end - cache_key <=
- static_cast<int>(kExtraCacheKeyPrefix + kMaxVarint64Length));
- const Slice unique_key(cache_key, static_cast<size_t>(end - cache_key));
- s = block_cache->Insert(unique_key, nullptr,
- block.GetValue()->ApproximateMemoryUsage(),
- nullptr, &cache_handle);
- if (s.ok()) {
- assert(cache_handle != nullptr);
- iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
- cache_handle);
+ if (block_cache) {
+ // insert a dummy record to block cache to track the memory usage
+ Cache::Handle* cache_handle = nullptr;
+ CacheKey key = CacheKey::CreateUniqueForCacheLifetime(block_cache);
+ s = block_cache->Insert(key.AsSlice(), nullptr,
+ block.GetValue()->ApproximateMemoryUsage(),
+ nullptr, &cache_handle);
+
+ if (s.ok()) {
+ assert(cache_handle != nullptr);
+ iter->RegisterCleanup(&ForceReleaseCachedEntry, block_cache,
+ cache_handle);
+ }
}
}
} else {