// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "table/block_fetcher.h"

#include <cassert>
#include <cinttypes>
#include <string>

#include "logging/logging.h"
#include "memory/memory_allocator.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/compression_type.h"
#include "rocksdb/env.h"
#include "table/block_based/block.h"
#include "table/block_based/block_based_table_reader.h"
#include "table/block_based/block_type.h"
#include "table/block_based/reader_common.h"
#include "table/format.h"
#include "table/persistent_cache_helper.h"
#include "util/compression.h"
#include "util/stop_watch.h"

namespace ROCKSDB_NAMESPACE {

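// A block on disk may be followed by a fixed-size trailer
// (BlockBasedTable::kBlockTrailerSize bytes: a one-byte compression type
// followed by a 32-bit checksum). When a trailer is present, verify the
// checksum if requested and pick up the compression type from it; formats
// without trailers are treated as uncompressed.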
inline void BlockFetcher::ProcessTrailerIfPresent() {
  if (footer_.GetBlockTrailerSize() > 0) {
    assert(footer_.GetBlockTrailerSize() == BlockBasedTable::kBlockTrailerSize);
    if (read_options_.verify_checksums) {
      io_status_ = status_to_io_status(VerifyBlockChecksum(
          footer_.checksum_type(), slice_.data(), block_size_,
          file_->file_name(), handle_.offset()));
      RecordTick(ioptions_.stats, BLOCK_CHECKSUM_COMPUTE_COUNT);
    }
    compression_type_ =
        BlockBasedTable::GetBlockCompressionType(slice_.data(), block_size_);
  } else {
    // E.g. plain table or cuckoo table
    compression_type_ = kNoCompression;
  }
}

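// The persistent cache operates in one of two modes: storing serialized
// (possibly compressed) blocks, or storing uncompressed block contents,
// distinguished by persistent_cache->IsCompressed(). The lookup helpers
// below each handle one of those modes.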
inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() {
  if (cache_options_.persistent_cache &&
      !cache_options_.persistent_cache->IsCompressed()) {
    Status status = PersistentCacheHelper::LookupUncompressed(
        cache_options_, handle_, contents_);
    if (status.ok()) {
      // uncompressed page is found for the block handle
      return true;
    } else {
      // uncompressed page is not found
      if (ioptions_.logger && !status.IsNotFound()) {
        assert(!status.ok());
        ROCKS_LOG_INFO(ioptions_.logger,
                       "Error reading from persistent cache. %s",
                       status.ToString().c_str());
      }
    }
  }
  return false;
}

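// Returns true if the request was fully handled here: either the block was
// served from the prefetch buffer, or an IO error occurred (recorded in
// io_status_ for the caller to inspect). Returns false if the caller still
// needs to read the block itself.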
inline bool BlockFetcher::TryGetFromPrefetchBuffer() {
  if (prefetch_buffer_ != nullptr) {
    IOOptions opts;
    IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
    if (io_s.ok()) {
      bool read_from_prefetch_buffer = false;
      if (read_options_.async_io && !for_compaction_) {
        read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCacheAsync(
            opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
            &io_s, read_options_.rate_limiter_priority);
      } else {
        read_from_prefetch_buffer = prefetch_buffer_->TryReadFromCache(
            opts, file_, handle_.offset(), block_size_with_trailer_, &slice_,
            &io_s, read_options_.rate_limiter_priority, for_compaction_);
      }
      if (read_from_prefetch_buffer) {
        ProcessTrailerIfPresent();
        if (!io_status_.ok()) {
          return true;
        }
        got_from_prefetch_buffer_ = true;
        used_buf_ = const_cast<char*>(slice_.data());
      }
    }
    if (!io_s.ok()) {
      io_status_ = io_s;
      return true;
    }
  }
  return got_from_prefetch_buffer_;
}

inline bool BlockFetcher::TryGetSerializedBlockFromPersistentCache() {
  if (cache_options_.persistent_cache &&
      cache_options_.persistent_cache->IsCompressed()) {
    std::unique_ptr<char[]> buf;
    io_status_ = status_to_io_status(PersistentCacheHelper::LookupSerialized(
        cache_options_, handle_, &buf, block_size_with_trailer_));
    if (io_status_.ok()) {
      heap_buf_ = CacheAllocationPtr(buf.release());
      used_buf_ = heap_buf_.get();
      slice_ = Slice(heap_buf_.get(), block_size_);
      ProcessTrailerIfPresent();
      return true;
    } else if (!io_status_.IsNotFound() && ioptions_.logger) {
      assert(!io_status_.ok());
      ROCKS_LOG_INFO(ioptions_.logger,
                     "Error reading from persistent cache. %s",
                     io_status_.ToString().c_str());
    }
  }
  return false;
}

inline void BlockFetcher::PrepareBufferForBlockFromFile() {
  // cache miss; read from device
  if ((do_uncompress_ || ioptions_.allow_mmap_reads) &&
      block_size_with_trailer_ < kDefaultStackBufferSize) {
    // If we've got a small enough chunk of data, read it in to the
    // trivially allocated stack buffer instead of needing a full malloc()
    //
    // `GetBlockContents()` cannot return this data as its lifetime is tied to
    // this `BlockFetcher`'s lifetime. That is fine because this is only used
    // in cases where we do not expect the `GetBlockContents()` result to be
    // the same buffer we are assigning here. If we guess incorrectly, there
    // will be a heap allocation and memcpy in `GetBlockContents()` to obtain
    // the final result. Considering we are eliding a heap allocation here by
    // using the stack buffer, the cost of guessing incorrectly here is one
    // extra memcpy.
    //
    // When `do_uncompress_` is true, we expect the uncompression step will
    // allocate heap memory for the final result. However this expectation
    // will be wrong if the block turns out to already be uncompressed, which
    // we won't know for sure until after reading it.
    //
    // When `ioptions_.allow_mmap_reads` is true, we do not expect the file
    // reader to use the scratch buffer at all, but instead return a pointer
    // into the mapped memory. This expectation will be wrong when using a
    // file reader that does not implement mmap reads properly.
    used_buf_ = &stack_buf_[0];
  } else if (maybe_compressed_ && !do_uncompress_) {
    compressed_buf_ =
        AllocateBlock(block_size_with_trailer_, memory_allocator_compressed_);
    used_buf_ = compressed_buf_.get();
  } else {
    heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
    used_buf_ = heap_buf_.get();
  }
}

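// Write-through helpers: after a successful read, optionally insert the
// block into the persistent cache, gated on read_options_.fill_cache and
// on which mode the persistent cache is configured for.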
inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() {
  if (io_status_.ok() && read_options_.fill_cache &&
      cache_options_.persistent_cache &&
      cache_options_.persistent_cache->IsCompressed()) {
    PersistentCacheHelper::InsertSerialized(cache_options_, handle_, used_buf_,
                                            block_size_with_trailer_);
  }
}

inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() {
  if (io_status_.ok() && !got_from_prefetch_buffer_ &&
      read_options_.fill_cache && cache_options_.persistent_cache &&
      !cache_options_.persistent_cache->IsCompressed()) {
    // insert to uncompressed cache
    PersistentCacheHelper::InsertUncompressed(cache_options_, handle_,
                                              *contents_);
  }
}

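// The num_*_memcpy_ counters below are debug-build-only instrumentation
// (compiled out under NDEBUG) that tracks which copy path was taken.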
inline void BlockFetcher::CopyBufferToHeapBuf() {
  assert(used_buf_ != heap_buf_.get());
  heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_);
  memcpy(heap_buf_.get(), used_buf_, block_size_with_trailer_);
#ifndef NDEBUG
  num_heap_buf_memcpy_++;
#endif
}

inline void BlockFetcher::CopyBufferToCompressedBuf() {
  assert(used_buf_ != compressed_buf_.get());
  compressed_buf_ =
      AllocateBlock(block_size_with_trailer_, memory_allocator_compressed_);
  memcpy(compressed_buf_.get(), used_buf_, block_size_with_trailer_);
#ifndef NDEBUG
  num_compressed_buf_memcpy_++;
#endif
}

// Entering this method means the block either is not compressed or does not
// need to be uncompressed. The block can be in one of the following buffers:
// 1. prefetch buffer if prefetch is enabled and the block was prefetched
//    earlier
// 2. stack_buf_ if the block size is smaller than the stack buffer size and
//    the block is not compressed
// 3. heap_buf_ if the block is not compressed
// 4. compressed_buf_ if the block is compressed
// 5. direct_io_buf_ if direct IO is enabled
// After this method, if the block is compressed, it should be in
// compressed_buf_; otherwise it should be in heap_buf_.
inline void BlockFetcher::GetBlockContents() {
  if (slice_.data() != used_buf_) {
    // the slice content is not the buffer we provided
    *contents_ = BlockContents(Slice(slice_.data(), block_size_));
  } else {
    // page can be either uncompressed or compressed, and the buffer either
    // stack- or heap-provided. Refer to
    // https://github.com/facebook/rocksdb/pull/4096
    if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) {
      CopyBufferToHeapBuf();
    } else if (used_buf_ == compressed_buf_.get()) {
      if (compression_type_ == kNoCompression &&
          memory_allocator_ != memory_allocator_compressed_) {
        CopyBufferToHeapBuf();
      } else {
        heap_buf_ = std::move(compressed_buf_);
      }
    } else if (direct_io_buf_.get() != nullptr) {
      if (compression_type_ == kNoCompression) {
        CopyBufferToHeapBuf();
      } else {
        CopyBufferToCompressedBuf();
        heap_buf_ = std::move(compressed_buf_);
      }
    }
    *contents_ = BlockContents(std::move(heap_buf_), block_size_);
  }
#ifndef NDEBUG
  contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif
}

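// ReadBlockContents drives the full synchronous fetch: persistent cache
// lookup, then the prefetch buffer, then an actual file read (direct or
// buffered), followed by checksum verification and optional uncompression.
//
// A typical call site looks roughly like the sketch below (argument list
// abbreviated; see block_fetcher.h for the exact constructor signature):
//
//   BlockFetcher fetcher(file, prefetch_buffer, footer, read_options, handle,
//                        &contents, ioptions, /*do_uncompress=*/true, ...);
//   IOStatus s = fetcher.ReadBlockContents();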
IOStatus BlockFetcher::ReadBlockContents() {
  if (TryGetUncompressBlockFromPersistentCache()) {
    compression_type_ = kNoCompression;
#ifndef NDEBUG
    contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif  // NDEBUG
    return IOStatus::OK();
  }
  if (TryGetFromPrefetchBuffer()) {
    if (!io_status_.ok()) {
      return io_status_;
    }
  } else if (!TryGetSerializedBlockFromPersistentCache()) {
    IOOptions opts;
    io_status_ = file_->PrepareIOOptions(read_options_, opts);
    // Actual file read
    if (io_status_.ok()) {
      if (file_->use_direct_io()) {
        PERF_TIMER_GUARD(block_read_time);
        io_status_ = file_->Read(
            opts, handle_.offset(), block_size_with_trailer_, &slice_, nullptr,
            &direct_io_buf_, read_options_.rate_limiter_priority);
        PERF_COUNTER_ADD(block_read_count, 1);
        used_buf_ = const_cast<char*>(slice_.data());
      } else {
        PrepareBufferForBlockFromFile();
        PERF_TIMER_GUARD(block_read_time);
        io_status_ = file_->Read(opts, handle_.offset(),
                                 block_size_with_trailer_, &slice_, used_buf_,
                                 nullptr, read_options_.rate_limiter_priority);
        PERF_COUNTER_ADD(block_read_count, 1);
#ifndef NDEBUG
        if (slice_.data() == &stack_buf_[0]) {
          num_stack_buf_memcpy_++;
        } else if (slice_.data() == heap_buf_.get()) {
          num_heap_buf_memcpy_++;
        } else if (slice_.data() == compressed_buf_.get()) {
          num_compressed_buf_memcpy_++;
        }
#endif
      }
    }

    // TODO: introduce dedicated perf counter for range tombstones
    switch (block_type_) {
      case BlockType::kFilter:
      case BlockType::kFilterPartitionIndex:
        PERF_COUNTER_ADD(filter_block_read_count, 1);
        break;

      case BlockType::kCompressionDictionary:
        PERF_COUNTER_ADD(compression_dict_block_read_count, 1);
        break;

      case BlockType::kIndex:
        PERF_COUNTER_ADD(index_block_read_count, 1);
        break;

      // Nothing to do here as we don't have counters for the other types.
      default:
        break;
    }

    PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_);
    if (!io_status_.ok()) {
      return io_status_;
    }

    if (slice_.size() != block_size_with_trailer_) {
      return IOStatus::Corruption(
          "truncated block read from " + file_->file_name() + " offset " +
          std::to_string(handle_.offset()) + ", expected " +
          std::to_string(block_size_with_trailer_) + " bytes, got " +
          std::to_string(slice_.size()));
    }

    ProcessTrailerIfPresent();
    if (io_status_.ok()) {
      InsertCompressedBlockToPersistentCacheIfNeeded();
    } else {
      return io_status_;
    }
  }

  if (do_uncompress_ && compression_type_ != kNoCompression) {
    PERF_TIMER_GUARD(block_decompress_time);
    // compressed page, uncompress, update cache
    UncompressionContext context(compression_type_);
    UncompressionInfo info(context, uncompression_dict_, compression_type_);
    io_status_ = status_to_io_status(UncompressSerializedBlock(
        info, slice_.data(), block_size_, contents_, footer_.format_version(),
        ioptions_, memory_allocator_));
#ifndef NDEBUG
    num_heap_buf_memcpy_++;
#endif
    compression_type_ = kNoCompression;
  } else {
    GetBlockContents();
  }

  InsertUncompressedBlockToPersistentCacheIfNeeded();

  return io_status_;
}

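// Async variant: instead of issuing a blocking read, hand the read off to the
// prefetch buffer. PrefetchAsync may return TryAgain, in which case the
// caller is expected to retry once the outstanding IO completes; on other
// errors, or for compaction reads, this falls back to the synchronous
// ReadBlockContents() path above.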
IOStatus BlockFetcher::ReadAsyncBlockContents() {
  if (TryGetUncompressBlockFromPersistentCache()) {
    compression_type_ = kNoCompression;
#ifndef NDEBUG
    contents_->has_trailer = footer_.GetBlockTrailerSize() > 0;
#endif  // NDEBUG
    return IOStatus::OK();
  } else if (!TryGetSerializedBlockFromPersistentCache()) {
    assert(prefetch_buffer_ != nullptr);
    if (!for_compaction_) {
      IOOptions opts;
      IOStatus io_s = file_->PrepareIOOptions(read_options_, opts);
      if (!io_s.ok()) {
        return io_s;
      }
      io_s = status_to_io_status(prefetch_buffer_->PrefetchAsync(
          opts, file_, handle_.offset(), block_size_with_trailer_, &slice_));
      if (io_s.IsTryAgain()) {
        return io_s;
      }
      if (io_s.ok()) {
        // Data block is already in the prefetch buffer.
        got_from_prefetch_buffer_ = true;
        ProcessTrailerIfPresent();
        if (!io_status_.ok()) {
          return io_status_;
        }
        used_buf_ = const_cast<char*>(slice_.data());

        if (do_uncompress_ && compression_type_ != kNoCompression) {
          PERF_TIMER_GUARD(block_decompress_time);
          // compressed page, uncompress, update cache
          UncompressionContext context(compression_type_);
          UncompressionInfo info(context, uncompression_dict_,
                                 compression_type_);
          io_status_ = status_to_io_status(UncompressSerializedBlock(
              info, slice_.data(), block_size_, contents_,
              footer_.format_version(), ioptions_, memory_allocator_));
#ifndef NDEBUG
          num_heap_buf_memcpy_++;
#endif
          compression_type_ = kNoCompression;
        } else {
          GetBlockContents();
        }
        InsertUncompressedBlockToPersistentCacheIfNeeded();
        return io_status_;
      }
    }
    // Fall back to synchronous reading of the block if io_s returned an
    // error or for_compaction_ is true.
    return ReadBlockContents();
  }
  return io_status_;
}

}  // namespace ROCKSDB_NAMESPACE