1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "table/block_fetcher.h"
16 #include "logging/logging.h"
17 #include "memory/memory_allocator.h"
18 #include "monitoring/perf_context_imp.h"
19 #include "rocksdb/compression_type.h"
20 #include "rocksdb/env.h"
21 #include "table/block_based/block.h"
22 #include "table/block_based/block_based_table_reader.h"
23 #include "table/block_based/block_type.h"
24 #include "table/block_based/reader_common.h"
25 #include "table/format.h"
26 #include "table/persistent_cache_helper.h"
27 #include "util/compression.h"
28 #include "util/stop_watch.h"
30 namespace ROCKSDB_NAMESPACE
{
// Processes the block trailer for the bytes currently referenced by `slice_`.
// When the footer reports a non-zero trailer size, the checksum is verified
// only if `read_options_.verify_checksums` is set; the result is stored in
// `io_status_` and BLOCK_CHECKSUM_COMPUTE_COUNT is ticked. The block's
// compression type is then queried via GetBlockCompressionType().
// NOTE(review): the embedded line numbering skips 40-41, 43 and 46-48, so the
// closing braces, the target of the GetBlockCompressionType() result, and the
// branch that owns `compression_type_ = kNoCompression` are not visible here.
// The latter is presumably the "no trailer" branch -- confirm in the full file.
32 inline void BlockFetcher::ProcessTrailerIfPresent() {
33 if (footer_
.GetBlockTrailerSize() > 0) {
// A present trailer is expected to have the block-based table trailer size.
34 assert(footer_
.GetBlockTrailerSize() == BlockBasedTable::kBlockTrailerSize
);
// Checksum verification is opt-in through ReadOptions.
35 if (read_options_
.verify_checksums
) {
36 io_status_
= status_to_io_status(VerifyBlockChecksum(
37 footer_
.checksum_type(), slice_
.data(), block_size_
,
38 file_
->file_name(), handle_
.offset()));
39 RecordTick(ioptions_
.stats
, BLOCK_CHECKSUM_COMPUTE_COUNT
);
// Compression type is derived from the block bytes and `block_size_`.
42 BlockBasedTable::GetBlockCompressionType(slice_
.data(), block_size_
);
44 // E.g. plain table or cuckoo table
45 compression_type_
= kNoCompression
;
// Looks up the block identified by `handle_` in the persistent cache, but only
// when a persistent cache is configured and it reports !IsCompressed(). The
// lookup writes its result into `contents_`. A lookup status other than
// NotFound is logged at INFO level when a logger is available.
// NOTE(review): the `return` statements (original lines 54-57, 64-69) are not
// visible in this listing; presumably true is returned on a cache hit and
// false otherwise -- confirm in the full file.
49 inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
50 if (cache_options_
.persistent_cache
&&
51 !cache_options_
.persistent_cache
->IsCompressed()) {
52 Status status
= PersistentCacheHelper::LookupUncompressed(
53 cache_options_
, handle_
, contents_
);
55 // uncompressed page is found for the block handle
58 // uncompressed page is not found
// NotFound is the expected miss case and is deliberately not logged.
59 if (ioptions_
.logger
&& !status
.IsNotFound()) {
61 ROCKS_LOG_INFO(ioptions_
.logger
,
62 "Error reading from persistent cache. %s",
63 status
.ToString().c_str());
// Attempts to satisfy the read from `prefetch_buffer_` (skipped when it is
// null). On a hit, `slice_` points at the prefetched bytes, the trailer is
// processed, and `got_from_prefetch_buffer_` / `used_buf_` are updated.
// The final visible statement returns `got_from_prefetch_buffer_`.
// NOTE(review): the declaration of `opts` (original line 72), the check on
// `io_s` after PrepareIOOptions (line 74), the `else` at line 80 and the
// error-path bodies (lines 88-89, 92-98) are not visible in this listing.
70 inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
71 if (prefetch_buffer_
!= nullptr) {
73 IOStatus io_s
= file_
->PrepareIOOptions(read_options_
, opts
);
75 bool read_from_prefetch_buffer
= false;
// The async variant is used only for non-compaction reads with async_io set.
76 if (read_options_
.async_io
&& !for_compaction_
) {
77 read_from_prefetch_buffer
= prefetch_buffer_
->TryReadFromCacheAsync(
78 opts
, file_
, handle_
.offset(), block_size_with_trailer_
, &slice_
,
79 &io_s
, read_options_
.rate_limiter_priority
);
// Presumably the `else` branch: synchronous lookup, which also receives
// `for_compaction_`.
81 read_from_prefetch_buffer
= prefetch_buffer_
->TryReadFromCache(
82 opts
, file_
, handle_
.offset(), block_size_with_trailer_
, &slice_
,
83 &io_s
, read_options_
.rate_limiter_priority
, for_compaction_
);
85 if (read_from_prefetch_buffer
) {
// ProcessTrailerIfPresent() may set io_status_ (checksum verification).
86 ProcessTrailerIfPresent();
87 if (!io_status_
.ok()) {
90 got_from_prefetch_buffer_
= true;
// The prefetch buffer owns the bytes; used_buf_ only aliases them.
91 used_buf_
= const_cast<char*>(slice_
.data());
99 return got_from_prefetch_buffer_
;
// Looks up the serialized block (block_size_with_trailer_ bytes) in the
// persistent cache, only when a persistent cache is configured and it reports
// IsCompressed(). The lookup status is stored in `io_status_`. On success the
// returned buffer is adopted by `heap_buf_`, `used_buf_` and `slice_` are
// pointed at it, and the trailer is processed. A failure other than NotFound
// is logged at INFO level when a logger is available.
// NOTE(review): the `return` statements (original lines 113, 119-123) are not
// visible in this listing; presumably true on a hit, false otherwise.
102 inline bool BlockFetcher::TryGetSerializedBlockFromPersistentCache() {
103 if (cache_options_
.persistent_cache
&&
104 cache_options_
.persistent_cache
->IsCompressed()) {
105 std::unique_ptr
<char[]> buf
;
106 io_status_
= status_to_io_status(PersistentCacheHelper::LookupSerialized(
107 cache_options_
, handle_
, &buf
, block_size_with_trailer_
));
108 if (io_status_
.ok()) {
// Ownership of the looked-up buffer moves from `buf` into heap_buf_.
109 heap_buf_
= CacheAllocationPtr(buf
.release());
110 used_buf_
= heap_buf_
.get();
// The slice covers block_size_ bytes, i.e. it is built without the trailer
// length even though block_size_with_trailer_ bytes were requested.
111 slice_
= Slice(heap_buf_
.get(), block_size_
);
112 ProcessTrailerIfPresent();
// NotFound is the expected miss case and is deliberately not logged.
114 } else if (!io_status_
.IsNotFound() && ioptions_
.logger
) {
115 assert(!io_status_
.ok());
116 ROCKS_LOG_INFO(ioptions_
.logger
,
117 "Error reading from persistent cache. %s",
118 io_status_
.ToString().c_str());
// Selects the scratch buffer (`used_buf_`) for a read from the file:
//  - `stack_buf_` when the read is smaller than kDefaultStackBufferSize and
//    either `do_uncompress_` or `ioptions_.allow_mmap_reads` is set;
//  - a buffer from `memory_allocator_compressed_` when the block may be
//    compressed and will not be uncompressed;
//  - `heap_buf_` allocated from `memory_allocator_` (presumably the fallback
//    branch; the `else` at original line 153 is not visible in this listing).
// NOTE(review): the target of the AllocateBlock() call at line 151 is on a
// line missing from this listing (150); it is presumably `compressed_buf_`,
// since `used_buf_` is set from `compressed_buf_.get()` right after.
124 inline void BlockFetcher::PrepareBufferForBlockFromFile() {
125 // cache miss read from device
126 if ((do_uncompress_
|| ioptions_
.allow_mmap_reads
) &&
127 block_size_with_trailer_
< kDefaultStackBufferSize
) {
128 // If we've got a small enough chunk of data, read it in to the
129 // trivially allocated stack buffer instead of needing a full malloc()
131 // `GetBlockContents()` cannot return this data as its lifetime is tied to
132 // this `BlockFetcher`'s lifetime. That is fine because this is only used
133 // in cases where we do not expect the `GetBlockContents()` result to be the
134 // same buffer we are assigning here. If we guess incorrectly, there will be
135 // a heap allocation and memcpy in `GetBlockContents()` to obtain the final
136 // result. Considering we are eliding a heap allocation here by using the
137 // stack buffer, the cost of guessing incorrectly here is one extra memcpy.
139 // When `do_uncompress_` is true, we expect the uncompression step will
140 // allocate heap memory for the final result. However this expectation will
141 // be wrong if the block turns out to already be uncompressed, which we
142 // won't know for sure until after reading it.
144 // When `ioptions_.allow_mmap_reads` is true, we do not expect the file
145 // reader to use the scratch buffer at all, but instead return a pointer
146 // into the mapped memory. This expectation will be wrong when using a
147 // file reader that does not implement mmap reads properly.
148 used_buf_
= &stack_buf_
[0];
// Block may be compressed and is returned as-is: use the compressed-block
// allocator.
149 } else if (maybe_compressed_
&& !do_uncompress_
) {
151 AllocateBlock(block_size_with_trailer_
, memory_allocator_compressed_
);
152 used_buf_
= compressed_buf_
.get();
// Remaining case: allocate from the regular (uncompressed) block allocator.
154 heap_buf_
= AllocateBlock(block_size_with_trailer_
, memory_allocator_
);
155 used_buf_
= heap_buf_
.get();
// Inserts the serialized block (the `block_size_with_trailer_` bytes at
// `used_buf_`) into the persistent cache. This happens only when all of the
// following hold: `io_status_` is OK, `read_options_.fill_cache` is set, a
// persistent cache is configured, and that cache reports IsCompressed().
159 inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
160 if (io_status_
.ok() && read_options_
.fill_cache
&&
161 cache_options_
.persistent_cache
&&
162 cache_options_
.persistent_cache
->IsCompressed()) {
163 PersistentCacheHelper::InsertSerialized(cache_options_
, handle_
, used_buf_
,
164 block_size_with_trailer_
);
// Inserts the uncompressed block into the persistent cache. This happens only
// when all of the following hold: `io_status_` is OK, the block did not come
// from the prefetch buffer, `read_options_.fill_cache` is set, a persistent
// cache is configured, and that cache reports !IsCompressed().
// NOTE(review): the remaining arguments of InsertUncompressed() (original
// lines 174-176) are cut off in this listing.
168 inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
169 if (io_status_
.ok() && !got_from_prefetch_buffer_
&&
170 read_options_
.fill_cache
&& cache_options_
.persistent_cache
&&
171 !cache_options_
.persistent_cache
->IsCompressed()) {
172 // insert to uncompressed cache
173 PersistentCacheHelper::InsertUncompressed(cache_options_
, handle_
,
// Copies the `block_size_with_trailer_` bytes at `used_buf_` into a freshly
// allocated `heap_buf_` (from `memory_allocator_`). Must not be called when
// `used_buf_` already is `heap_buf_` (asserted). Also bumps the
// `num_heap_buf_memcpy_` counter.
// NOTE(review): original lines 182 and 184 around the counter increment are
// missing from this listing; it may sit inside a preprocessor conditional.
178 inline void BlockFetcher::CopyBufferToHeapBuf() {
179 assert(used_buf_
!= heap_buf_
.get());
180 heap_buf_
= AllocateBlock(block_size_with_trailer_
, memory_allocator_
);
181 memcpy(heap_buf_
.get(), used_buf_
, block_size_with_trailer_
);
183 num_heap_buf_memcpy_
++;
// Copies the `block_size_with_trailer_` bytes at `used_buf_` into
// `compressed_buf_`, using a block allocated from
// `memory_allocator_compressed_`. Must not be called when `used_buf_` already
// is `compressed_buf_` (asserted). Also bumps `num_compressed_buf_memcpy_`.
// NOTE(review): the target of the AllocateBlock() result (original line 189)
// is missing from this listing -- presumably `compressed_buf_`. Lines 192 and
// 194 around the counter increment are missing as well.
187 inline void BlockFetcher::CopyBufferToCompressedBuf() {
188 assert(used_buf_
!= compressed_buf_
.get());
190 AllocateBlock(block_size_with_trailer_
, memory_allocator_compressed_
);
191 memcpy(compressed_buf_
.get(), used_buf_
, block_size_with_trailer_
);
193 num_compressed_buf_memcpy_
++;
197 // Entering this method means the block is not compressed or does not need to be
198 // uncompressed. The block can be in one of the following buffers:
199 // 1. prefetch buffer if prefetch is enabled and the block is prefetched before
200 // 2. stack_buf_ if block size is smaller than the stack_buf_ size and block
202 // 3. heap_buf_ if the block is not compressed
203 // 4. compressed_buf_ if the block is compressed
204 // 5. direct_io_buf_ if direct IO is enabled
205 // After this method, if the block is compressed, it should be in
206 // compressed_buf_, otherwise should be in heap_buf_.
// NOTE(review): the `else` lines and closing braces (original lines 211, 220,
// 222, 226, 229-230, 232-233) are missing from this listing, so the pairing
// of the statements below with their branches is inferred, not shown.
207 inline void BlockFetcher::GetBlockContents() {
208 if (slice_
.data() != used_buf_
) {
209 // the slice content is not the buffer provided
// The contents are built from a Slice only; no buffer ownership is passed.
210 *contents_
= BlockContents(Slice(slice_
.data(), block_size_
));
212 // page can be either uncompressed or compressed, the buffer either stack
213 // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096
// Prefetch-buffer and stack-buffer data must be copied to the heap.
214 if (got_from_prefetch_buffer_
|| used_buf_
== &stack_buf_
[0]) {
215 CopyBufferToHeapBuf();
216 } else if (used_buf_
== compressed_buf_
.get()) {
// An uncompressed block sitting in the compressed buffer is copied only when
// the two allocators differ.
217 if (compression_type_
== kNoCompression
&&
218 memory_allocator_
!= memory_allocator_compressed_
) {
219 CopyBufferToHeapBuf();
// Presumably the `else` case: hand the compressed buffer over without a copy.
221 heap_buf_
= std::move(compressed_buf_
);
223 } else if (direct_io_buf_
.get() != nullptr) {
224 if (compression_type_
== kNoCompression
) {
225 CopyBufferToHeapBuf();
// Presumably the `else` case: copy with the compressed allocator, then move.
227 CopyBufferToCompressedBuf();
228 heap_buf_
= std::move(compressed_buf_
);
// Ownership of heap_buf_ is transferred into the returned contents.
231 *contents_
= BlockContents(std::move(heap_buf_
), block_size_
);
234 contents_
->has_trailer
= footer_
.GetBlockTrailerSize() > 0;
// Synchronous block read. The visible steps, in order:
//  1. Try the uncompressed persistent cache; on a hit return IOStatus::OK().
//  2. Try the prefetch buffer.
//  3. Otherwise try the serialized (compressed) persistent cache.
//  4. Otherwise read `block_size_with_trailer_` bytes at `handle_.offset()`
//     from the file (direct-IO or scratch-buffer path), update perf counters,
//     reject short reads as Corruption, process the trailer and optionally
//     insert the serialized block into the persistent cache.
//  5. Uncompress when `do_uncompress_` is set and the block is compressed.
//  6. Optionally insert the uncompressed block into the persistent cache.
// NOTE(review): many original lines are missing from this listing (e.g. 241,
// 243, 245, 248-249, 251, 253, 262, 269, 276-280, 303-305, 312-313, 317-321,
// 330, 332, 334-337, 339-342). Early-return bodies, the declaration of `opts`,
// any preprocessor guards, the non-uncompress branch and the final return are
// therefore not visible.
238 IOStatus
BlockFetcher::ReadBlockContents() {
239 if (TryGetUncompressBlockFromPersistentCache()) {
// A hit in the uncompressed cache means there is nothing left to decompress.
240 compression_type_
= kNoCompression
;
242 contents_
->has_trailer
= footer_
.GetBlockTrailerSize() > 0;
244 return IOStatus::OK();
246 if (TryGetFromPrefetchBuffer()) {
247 if (!io_status_
.ok()) {
250 } else if (!TryGetSerializedBlockFromPersistentCache()) {
// All faster sources missed: read from the file.
252 io_status_
= file_
->PrepareIOOptions(read_options_
, opts
);
254 if (io_status_
.ok()) {
255 if (file_
->use_direct_io()) {
256 PERF_TIMER_GUARD(block_read_time
);
// Direct IO: no scratch buffer is passed; `direct_io_buf_` is supplied for
// the reader to fill, and `slice_` is set by the read.
257 io_status_
= file_
->Read(
258 opts
, handle_
.offset(), block_size_with_trailer_
, &slice_
, nullptr,
259 &direct_io_buf_
, read_options_
.rate_limiter_priority
);
260 PERF_COUNTER_ADD(block_read_count
, 1);
261 used_buf_
= const_cast<char*>(slice_
.data());
// Non-direct path: choose a scratch buffer, then read into `used_buf_`.
263 PrepareBufferForBlockFromFile();
264 PERF_TIMER_GUARD(block_read_time
);
265 io_status_
= file_
->Read(opts
, handle_
.offset(),
266 block_size_with_trailer_
, &slice_
, used_buf_
,
267 nullptr, read_options_
.rate_limiter_priority
);
268 PERF_COUNTER_ADD(block_read_count
, 1);
// Bookkeeping: record which buffer the read data ended up in.
270 if (slice_
.data() == &stack_buf_
[0]) {
271 num_stack_buf_memcpy_
++;
272 } else if (slice_
.data() == heap_buf_
.get()) {
273 num_heap_buf_memcpy_
++;
274 } else if (slice_
.data() == compressed_buf_
.get()) {
275 num_compressed_buf_memcpy_
++;
281 // TODO: introduce dedicated perf counter for range tombstones
// Per-block-type read counters.
282 switch (block_type_
) {
283 case BlockType::kFilter
:
284 case BlockType::kFilterPartitionIndex
:
285 PERF_COUNTER_ADD(filter_block_read_count
, 1);
288 case BlockType::kCompressionDictionary
:
289 PERF_COUNTER_ADD(compression_dict_block_read_count
, 1);
292 case BlockType::kIndex
:
293 PERF_COUNTER_ADD(index_block_read_count
, 1);
296 // Nothing to do here as we don't have counters for the other types.
301 PERF_COUNTER_ADD(block_read_byte
, block_size_with_trailer_
);
302 if (!io_status_
.ok()) {
// A short read is reported as corruption, naming the file, offset and sizes.
306 if (slice_
.size() != block_size_with_trailer_
) {
307 return IOStatus::Corruption(
308 "truncated block read from " + file_
->file_name() + " offset " +
309 std::to_string(handle_
.offset()) + ", expected " +
310 std::to_string(block_size_with_trailer_
) + " bytes, got " +
311 std::to_string(slice_
.size()));
314 ProcessTrailerIfPresent();
315 if (io_status_
.ok()) {
316 InsertCompressedBlockToPersistentCacheIfNeeded();
// Decompress only when requested and the block is actually compressed.
322 if (do_uncompress_
&& compression_type_
!= kNoCompression
) {
323 PERF_TIMER_GUARD(block_decompress_time
);
324 // compressed page, uncompress, update cache
325 UncompressionContext
context(compression_type_
);
326 UncompressionInfo
info(context
, uncompression_dict_
, compression_type_
);
327 io_status_
= status_to_io_status(UncompressSerializedBlock(
328 info
, slice_
.data(), block_size_
, contents_
, footer_
.format_version(),
329 ioptions_
, memory_allocator_
));
331 num_heap_buf_memcpy_
++;
// After the uncompress call the block is recorded as uncompressed.
333 compression_type_
= kNoCompression
;
338 InsertUncompressedBlockToPersistentCacheIfNeeded();
// Asynchronous variant of the block read. The visible steps, in order:
//  1. Try the uncompressed persistent cache; on a hit return IOStatus::OK().
//  2. Otherwise try the serialized (compressed) persistent cache.
//  3. Otherwise, for non-compaction reads, call PrefetchAsync() on the
//     (asserted non-null) prefetch buffer. The result is checked with
//     IsTryAgain(); in the path commented "Data Block is already in prefetch"
//     the trailer is processed, the block is uncompressed if required, and
//     the uncompressed block is optionally inserted into the persistent cache.
//  4. Fall back to ReadBlockContents().
// NOTE(review): many original lines are missing from this listing (e.g. 346,
// 348, 353, 355-357, 361-363, 368-369, 371, 377, 381, 383, 385-387, 389-391,
// 395-398). The declaration of `opts`, the bodies of the IsTryAgain() and
// error branches, the rest of the `info(...)` constructor arguments, and the
// returns are therefore not visible.
343 IOStatus
BlockFetcher::ReadAsyncBlockContents() {
344 if (TryGetUncompressBlockFromPersistentCache()) {
// A hit in the uncompressed cache means there is nothing left to decompress.
345 compression_type_
= kNoCompression
;
347 contents_
->has_trailer
= footer_
.GetBlockTrailerSize() > 0;
349 return IOStatus::OK();
350 } else if (!TryGetSerializedBlockFromPersistentCache()) {
351 assert(prefetch_buffer_
!= nullptr);
// The async prefetch path is only taken for non-compaction reads.
352 if (!for_compaction_
) {
354 IOStatus io_s
= file_
->PrepareIOOptions(read_options_
, opts
);
358 io_s
= status_to_io_status(prefetch_buffer_
->PrefetchAsync(
359 opts
, file_
, handle_
.offset(), block_size_with_trailer_
, &slice_
));
360 if (io_s
.IsTryAgain()) {
364 // Data Block is already in prefetch.
365 got_from_prefetch_buffer_
= true;
// ProcessTrailerIfPresent() may set io_status_ (checksum verification).
366 ProcessTrailerIfPresent();
367 if (!io_status_
.ok()) {
// The prefetch buffer owns the bytes; used_buf_ only aliases them.
370 used_buf_
= const_cast<char*>(slice_
.data());
// Decompress only when requested and the block is actually compressed.
372 if (do_uncompress_
&& compression_type_
!= kNoCompression
) {
373 PERF_TIMER_GUARD(block_decompress_time
);
374 // compressed page, uncompress, update cache
375 UncompressionContext
context(compression_type_
);
376 UncompressionInfo
info(context
, uncompression_dict_
,
378 io_status_
= status_to_io_status(UncompressSerializedBlock(
379 info
, slice_
.data(), block_size_
, contents_
,
380 footer_
.format_version(), ioptions_
, memory_allocator_
));
382 num_heap_buf_memcpy_
++;
// After the uncompress call the block is recorded as uncompressed.
384 compression_type_
= kNoCompression
;
388 InsertUncompressedBlockToPersistentCacheIfNeeded();
392 // Fallback to sequential reading of data blocks in case io_s returns an
393 // error or for_compaction_ is true.
394 return ReadBlockContents();
399 } // namespace ROCKSDB_NAMESPACE