]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #include "table/block_fetcher.h" | |
11 | ||
f67539c2 | 12 | #include <cinttypes> |
494da23a | 13 | #include <string> |
11fdf7f2 | 14 | |
20effc67 | 15 | #include "file/file_util.h" |
f67539c2 TL |
16 | #include "logging/logging.h" |
17 | #include "memory/memory_allocator.h" | |
11fdf7f2 | 18 | #include "monitoring/perf_context_imp.h" |
11fdf7f2 | 19 | #include "rocksdb/env.h" |
f67539c2 TL |
20 | #include "table/block_based/block.h" |
21 | #include "table/block_based/block_based_table_reader.h" | |
20effc67 | 22 | #include "table/block_based/reader_common.h" |
11fdf7f2 | 23 | #include "table/format.h" |
494da23a | 24 | #include "table/persistent_cache_helper.h" |
11fdf7f2 | 25 | #include "util/compression.h" |
11fdf7f2 | 26 | #include "util/stop_watch.h" |
11fdf7f2 | 27 | |
f67539c2 | 28 | namespace ROCKSDB_NAMESPACE { |
11fdf7f2 | 29 | |
494da23a | 30 | inline void BlockFetcher::CheckBlockChecksum() { |
11fdf7f2 TL |
31 | // Check the crc of the type and the block contents |
32 | if (read_options_.verify_checksums) { | |
20effc67 TL |
33 | status_ = ROCKSDB_NAMESPACE::VerifyBlockChecksum( |
34 | footer_.checksum(), slice_.data(), block_size_, file_->file_name(), | |
35 | handle_.offset()); | |
11fdf7f2 TL |
36 | } |
37 | } | |
38 | ||
494da23a | 39 | inline bool BlockFetcher::TryGetUncompressBlockFromPersistentCache() { |
11fdf7f2 TL |
40 | if (cache_options_.persistent_cache && |
41 | !cache_options_.persistent_cache->IsCompressed()) { | |
42 | Status status = PersistentCacheHelper::LookupUncompressedPage( | |
43 | cache_options_, handle_, contents_); | |
44 | if (status.ok()) { | |
45 | // uncompressed page is found for the block handle | |
46 | return true; | |
47 | } else { | |
48 | // uncompressed page is not found | |
49 | if (ioptions_.info_log && !status.IsNotFound()) { | |
50 | assert(!status.ok()); | |
51 | ROCKS_LOG_INFO(ioptions_.info_log, | |
52 | "Error reading from persistent cache. %s", | |
53 | status.ToString().c_str()); | |
54 | } | |
55 | } | |
56 | } | |
57 | return false; | |
58 | } | |
59 | ||
494da23a | 60 | inline bool BlockFetcher::TryGetFromPrefetchBuffer() { |
20effc67 TL |
61 | if (prefetch_buffer_ != nullptr) { |
62 | IOOptions opts; | |
63 | Status s = PrepareIOFromReadOptions(read_options_, file_->env(), opts); | |
64 | if (s.ok() && prefetch_buffer_->TryReadFromCache( | |
65 | opts, handle_.offset(), block_size_with_trailer_, &slice_, | |
66 | for_compaction_)) { | |
67 | CheckBlockChecksum(); | |
68 | if (!status_.ok()) { | |
69 | return true; | |
70 | } | |
71 | got_from_prefetch_buffer_ = true; | |
72 | used_buf_ = const_cast<char*>(slice_.data()); | |
11fdf7f2 | 73 | } |
11fdf7f2 TL |
74 | } |
75 | return got_from_prefetch_buffer_; | |
76 | } | |
77 | ||
494da23a | 78 | inline bool BlockFetcher::TryGetCompressedBlockFromPersistentCache() { |
11fdf7f2 TL |
79 | if (cache_options_.persistent_cache && |
80 | cache_options_.persistent_cache->IsCompressed()) { | |
81 | // lookup uncompressed cache mode p-cache | |
494da23a | 82 | std::unique_ptr<char[]> raw_data; |
11fdf7f2 | 83 | status_ = PersistentCacheHelper::LookupRawPage( |
20effc67 | 84 | cache_options_, handle_, &raw_data, block_size_with_trailer_); |
11fdf7f2 | 85 | if (status_.ok()) { |
494da23a | 86 | heap_buf_ = CacheAllocationPtr(raw_data.release()); |
11fdf7f2 TL |
87 | used_buf_ = heap_buf_.get(); |
88 | slice_ = Slice(heap_buf_.get(), block_size_); | |
89 | return true; | |
90 | } else if (!status_.IsNotFound() && ioptions_.info_log) { | |
91 | assert(!status_.ok()); | |
92 | ROCKS_LOG_INFO(ioptions_.info_log, | |
93 | "Error reading from persistent cache. %s", | |
94 | status_.ToString().c_str()); | |
95 | } | |
96 | } | |
97 | return false; | |
98 | } | |
99 | ||
494da23a | 100 | inline void BlockFetcher::PrepareBufferForBlockFromFile() { |
11fdf7f2 | 101 | // cache miss read from device |
20effc67 TL |
102 | if ((do_uncompress_ || ioptions_.allow_mmap_reads) && |
103 | block_size_with_trailer_ < kDefaultStackBufferSize) { | |
11fdf7f2 TL |
104 | // If we've got a small enough hunk of data, read it in to the |
105 | // trivially allocated stack buffer instead of needing a full malloc() | |
20effc67 TL |
106 | // |
107 | // `GetBlockContents()` cannot return this data as its lifetime is tied to | |
108 | // this `BlockFetcher`'s lifetime. That is fine because this is only used | |
109 | // in cases where we do not expect the `GetBlockContents()` result to be the | |
110 | // same buffer we are assigning here. If we guess incorrectly, there will be | |
111 | // a heap allocation and memcpy in `GetBlockContents()` to obtain the final | |
112 | // result. Considering we are eliding a heap allocation here by using the | |
113 | // stack buffer, the cost of guessing incorrectly here is one extra memcpy. | |
114 | // | |
115 | // When `do_uncompress_` is true, we expect the uncompression step will | |
116 | // allocate heap memory for the final result. However this expectation will | |
117 | // be wrong if the block turns out to already be uncompressed, which we | |
118 | // won't know for sure until after reading it. | |
119 | // | |
120 | // When `ioptions_.allow_mmap_reads` is true, we do not expect the file | |
121 | // reader to use the scratch buffer at all, but instead return a pointer | |
122 | // into the mapped memory. This expectation will be wrong when using a | |
123 | // file reader that does not implement mmap reads properly. | |
11fdf7f2 | 124 | used_buf_ = &stack_buf_[0]; |
494da23a | 125 | } else if (maybe_compressed_ && !do_uncompress_) { |
20effc67 | 126 | compressed_buf_ = AllocateBlock(block_size_with_trailer_, |
494da23a TL |
127 | memory_allocator_compressed_); |
128 | used_buf_ = compressed_buf_.get(); | |
11fdf7f2 | 129 | } else { |
494da23a | 130 | heap_buf_ = |
20effc67 | 131 | AllocateBlock(block_size_with_trailer_, memory_allocator_); |
11fdf7f2 TL |
132 | used_buf_ = heap_buf_.get(); |
133 | } | |
134 | } | |
135 | ||
494da23a | 136 | inline void BlockFetcher::InsertCompressedBlockToPersistentCacheIfNeeded() { |
11fdf7f2 TL |
137 | if (status_.ok() && read_options_.fill_cache && |
138 | cache_options_.persistent_cache && | |
139 | cache_options_.persistent_cache->IsCompressed()) { | |
140 | // insert to raw cache | |
141 | PersistentCacheHelper::InsertRawPage(cache_options_, handle_, used_buf_, | |
20effc67 | 142 | block_size_with_trailer_); |
11fdf7f2 TL |
143 | } |
144 | } | |
145 | ||
494da23a | 146 | inline void BlockFetcher::InsertUncompressedBlockToPersistentCacheIfNeeded() { |
11fdf7f2 TL |
147 | if (status_.ok() && !got_from_prefetch_buffer_ && read_options_.fill_cache && |
148 | cache_options_.persistent_cache && | |
149 | !cache_options_.persistent_cache->IsCompressed()) { | |
150 | // insert to uncompressed cache | |
151 | PersistentCacheHelper::InsertUncompressedPage(cache_options_, handle_, | |
152 | *contents_); | |
153 | } | |
154 | } | |
155 | ||
20effc67 | 156 | inline void BlockFetcher::CopyBufferToHeapBuf() { |
494da23a | 157 | assert(used_buf_ != heap_buf_.get()); |
20effc67 TL |
158 | heap_buf_ = AllocateBlock(block_size_with_trailer_, memory_allocator_); |
159 | memcpy(heap_buf_.get(), used_buf_, block_size_with_trailer_); | |
160 | #ifndef NDEBUG | |
161 | num_heap_buf_memcpy_++; | |
162 | #endif | |
163 | } | |
164 | ||
165 | inline void BlockFetcher::CopyBufferToCompressedBuf() { | |
166 | assert(used_buf_ != compressed_buf_.get()); | |
167 | compressed_buf_ = AllocateBlock(block_size_with_trailer_, | |
168 | memory_allocator_compressed_); | |
169 | memcpy(compressed_buf_.get(), used_buf_, block_size_with_trailer_); | |
170 | #ifndef NDEBUG | |
171 | num_compressed_buf_memcpy_++; | |
172 | #endif | |
494da23a TL |
173 | } |
174 | ||
20effc67 TL |
175 | // Entering this method means the block is not compressed or do not need to be |
176 | // uncompressed. The block can be in one of the following buffers: | |
177 | // 1. prefetch buffer if prefetch is enabled and the block is prefetched before | |
178 | // 2. stack_buf_ if block size is smaller than the stack_buf_ size and block | |
179 | // is not compressed | |
180 | // 3. heap_buf_ if the block is not compressed | |
181 | // 4. compressed_buf_ if the block is compressed | |
182 | // 5. direct_io_buf_ if direct IO is enabled | |
183 | // After this method, if the block is compressed, it should be in | |
184 | // compressed_buf_, otherwise should be in heap_buf_. | |
494da23a | 185 | inline void BlockFetcher::GetBlockContents() { |
11fdf7f2 TL |
186 | if (slice_.data() != used_buf_) { |
187 | // the slice content is not the buffer provided | |
494da23a | 188 | *contents_ = BlockContents(Slice(slice_.data(), block_size_)); |
11fdf7f2 TL |
189 | } else { |
190 | // page can be either uncompressed or compressed, the buffer either stack | |
191 | // or heap provided. Refer to https://github.com/facebook/rocksdb/pull/4096 | |
192 | if (got_from_prefetch_buffer_ || used_buf_ == &stack_buf_[0]) { | |
20effc67 | 193 | CopyBufferToHeapBuf(); |
494da23a TL |
194 | } else if (used_buf_ == compressed_buf_.get()) { |
195 | if (compression_type_ == kNoCompression && | |
196 | memory_allocator_ != memory_allocator_compressed_) { | |
20effc67 TL |
197 | CopyBufferToHeapBuf(); |
198 | } else { | |
199 | heap_buf_ = std::move(compressed_buf_); | |
200 | } | |
201 | } else if (direct_io_buf_.get() != nullptr) { | |
202 | if (compression_type_ == kNoCompression) { | |
203 | CopyBufferToHeapBuf(); | |
494da23a | 204 | } else { |
20effc67 | 205 | CopyBufferToCompressedBuf(); |
494da23a TL |
206 | heap_buf_ = std::move(compressed_buf_); |
207 | } | |
11fdf7f2 | 208 | } |
494da23a | 209 | *contents_ = BlockContents(std::move(heap_buf_), block_size_); |
11fdf7f2 | 210 | } |
494da23a TL |
211 | #ifndef NDEBUG |
212 | contents_->is_raw_block = true; | |
213 | #endif | |
11fdf7f2 TL |
214 | } |
215 | ||
216 | Status BlockFetcher::ReadBlockContents() { | |
11fdf7f2 | 217 | if (TryGetUncompressBlockFromPersistentCache()) { |
494da23a TL |
218 | compression_type_ = kNoCompression; |
219 | #ifndef NDEBUG | |
220 | contents_->is_raw_block = true; | |
221 | #endif // NDEBUG | |
11fdf7f2 TL |
222 | return Status::OK(); |
223 | } | |
224 | if (TryGetFromPrefetchBuffer()) { | |
225 | if (!status_.ok()) { | |
226 | return status_; | |
227 | } | |
228 | } else if (!TryGetCompressedBlockFromPersistentCache()) { | |
20effc67 TL |
229 | IOOptions opts; |
230 | status_ = PrepareIOFromReadOptions(read_options_, file_->env(), opts); | |
231 | // Actual file read | |
232 | if (status_.ok()) { | |
233 | if (file_->use_direct_io()) { | |
234 | PERF_TIMER_GUARD(block_read_time); | |
235 | status_ = | |
236 | file_->Read(opts, handle_.offset(), block_size_with_trailer_, | |
237 | &slice_, nullptr, &direct_io_buf_, for_compaction_); | |
238 | PERF_COUNTER_ADD(block_read_count, 1); | |
239 | used_buf_ = const_cast<char*>(slice_.data()); | |
240 | } else { | |
241 | PrepareBufferForBlockFromFile(); | |
242 | PERF_TIMER_GUARD(block_read_time); | |
243 | status_ = file_->Read(opts, handle_.offset(), block_size_with_trailer_, | |
244 | &slice_, used_buf_, nullptr, for_compaction_); | |
245 | PERF_COUNTER_ADD(block_read_count, 1); | |
246 | #ifndef NDEBUG | |
247 | if (slice_.data() == &stack_buf_[0]) { | |
248 | num_stack_buf_memcpy_++; | |
249 | } else if (slice_.data() == heap_buf_.get()) { | |
250 | num_heap_buf_memcpy_++; | |
251 | } else if (slice_.data() == compressed_buf_.get()) { | |
252 | num_compressed_buf_memcpy_++; | |
253 | } | |
254 | #endif | |
255 | } | |
11fdf7f2 | 256 | } |
f67539c2 TL |
257 | |
258 | // TODO: introduce dedicated perf counter for range tombstones | |
259 | switch (block_type_) { | |
260 | case BlockType::kFilter: | |
261 | PERF_COUNTER_ADD(filter_block_read_count, 1); | |
262 | break; | |
263 | ||
264 | case BlockType::kCompressionDictionary: | |
265 | PERF_COUNTER_ADD(compression_dict_block_read_count, 1); | |
266 | break; | |
267 | ||
268 | case BlockType::kIndex: | |
269 | PERF_COUNTER_ADD(index_block_read_count, 1); | |
270 | break; | |
271 | ||
272 | // Nothing to do here as we don't have counters for the other types. | |
273 | default: | |
274 | break; | |
275 | } | |
276 | ||
20effc67 | 277 | PERF_COUNTER_ADD(block_read_byte, block_size_with_trailer_); |
11fdf7f2 TL |
278 | if (!status_.ok()) { |
279 | return status_; | |
280 | } | |
281 | ||
20effc67 | 282 | if (slice_.size() != block_size_with_trailer_) { |
11fdf7f2 TL |
283 | return Status::Corruption("truncated block read from " + |
284 | file_->file_name() + " offset " + | |
285 | ToString(handle_.offset()) + ", expected " + | |
20effc67 | 286 | ToString(block_size_with_trailer_) + |
11fdf7f2 TL |
287 | " bytes, got " + ToString(slice_.size())); |
288 | } | |
289 | ||
290 | CheckBlockChecksum(); | |
291 | if (status_.ok()) { | |
292 | InsertCompressedBlockToPersistentCacheIfNeeded(); | |
293 | } else { | |
294 | return status_; | |
295 | } | |
296 | } | |
297 | ||
494da23a | 298 | compression_type_ = get_block_compression_type(slice_.data(), block_size_); |
11fdf7f2 | 299 | |
494da23a | 300 | if (do_uncompress_ && compression_type_ != kNoCompression) { |
20effc67 | 301 | PERF_TIMER_GUARD(block_decompress_time); |
11fdf7f2 | 302 | // compressed page, uncompress, update cache |
494da23a TL |
303 | UncompressionContext context(compression_type_); |
304 | UncompressionInfo info(context, uncompression_dict_, compression_type_); | |
305 | status_ = UncompressBlockContents(info, slice_.data(), block_size_, | |
306 | contents_, footer_.version(), ioptions_, | |
307 | memory_allocator_); | |
20effc67 TL |
308 | #ifndef NDEBUG |
309 | num_heap_buf_memcpy_++; | |
310 | #endif | |
494da23a | 311 | compression_type_ = kNoCompression; |
11fdf7f2 TL |
312 | } else { |
313 | GetBlockContents(); | |
314 | } | |
315 | ||
316 | InsertUncompressedBlockToPersistentCacheIfNeeded(); | |
317 | ||
318 | return status_; | |
319 | } | |
320 | ||
f67539c2 | 321 | } // namespace ROCKSDB_NAMESPACE |