1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "file/random_access_file_reader.h"
15 #include "file/file_util.h"
16 #include "monitoring/histogram.h"
17 #include "monitoring/iostats_context_imp.h"
18 #include "port/port.h"
19 #include "table/format.h"
20 #include "test_util/sync_point.h"
21 #include "util/random.h"
22 #include "util/rate_limiter.h"
24 namespace ROCKSDB_NAMESPACE
{
26 inline void RecordIOStats(Statistics
* stats
, Temperature file_temperature
,
27 bool is_last_level
, size_t size
) {
28 IOSTATS_ADD(bytes_read
, size
);
29 // record for last/non-last level
31 RecordTick(stats
, LAST_LEVEL_READ_BYTES
, size
);
32 RecordTick(stats
, LAST_LEVEL_READ_COUNT
, 1);
34 RecordTick(stats
, NON_LAST_LEVEL_READ_BYTES
, size
);
35 RecordTick(stats
, NON_LAST_LEVEL_READ_COUNT
, 1);
38 // record for temperature file
39 if (file_temperature
!= Temperature::kUnknown
) {
40 switch (file_temperature
) {
41 case Temperature::kHot
:
42 IOSTATS_ADD(file_io_stats_by_temperature
.hot_file_bytes_read
, size
);
43 IOSTATS_ADD(file_io_stats_by_temperature
.hot_file_read_count
, 1);
44 RecordTick(stats
, HOT_FILE_READ_BYTES
, size
);
45 RecordTick(stats
, HOT_FILE_READ_COUNT
, 1);
47 case Temperature::kWarm
:
48 IOSTATS_ADD(file_io_stats_by_temperature
.warm_file_bytes_read
, size
);
49 IOSTATS_ADD(file_io_stats_by_temperature
.warm_file_read_count
, 1);
50 RecordTick(stats
, WARM_FILE_READ_BYTES
, size
);
51 RecordTick(stats
, WARM_FILE_READ_COUNT
, 1);
53 case Temperature::kCold
:
54 IOSTATS_ADD(file_io_stats_by_temperature
.cold_file_bytes_read
, size
);
55 IOSTATS_ADD(file_io_stats_by_temperature
.cold_file_read_count
, 1);
56 RecordTick(stats
, COLD_FILE_READ_BYTES
, size
);
57 RecordTick(stats
, COLD_FILE_READ_COUNT
, 1);
65 IOStatus
RandomAccessFileReader::Create(
66 const std::shared_ptr
<FileSystem
>& fs
, const std::string
& fname
,
67 const FileOptions
& file_opts
,
68 std::unique_ptr
<RandomAccessFileReader
>* reader
, IODebugContext
* dbg
) {
69 std::unique_ptr
<FSRandomAccessFile
> file
;
70 IOStatus io_s
= fs
->NewRandomAccessFile(fname
, file_opts
, &file
, dbg
);
72 reader
->reset(new RandomAccessFileReader(std::move(file
), fname
));
// Reads up to `n` bytes starting at `offset` and publishes them via `*result`.
//
// What the visible code does:
//  * With direct I/O, the range is widened to the file's required buffer
//    alignment and read into an aligned buffer `buf` in chunks of `allowed`
//    bytes. When `aligned_buf` is null the requested bytes are copied into
//    `scratch`; the code also shows `scratch` being pointed into `buf` and
//    the buffer being released to `*aligned_buf`.
//  * The second loop reads straight into `scratch + pos` and tracks the
//    start of the returned data in `res_scratch`, because the data may live
//    in a buffer other than `scratch` (see the mmap comment below).
//  * In both loops the chunk size comes from `rate_limiter_->RequestToken()`
//    when a rate limiter is set and `rate_limiter_priority` is not
//    Env::IO_TOTAL.
//  * Listeners are notified per chunk when ShouldNotifyListeners() is true.
//  * Finally the read size is passed to RecordIOStats(), the perf level is
//    restored and `elapsed` is added to `file_read_hist_`.
//
// NOTE(review): this view of the function appears to be missing lines. The
// declarations of `io_s`, `elapsed`, `read_size`, `buf`, `allowed`, `tmp`,
// `res_len`, `pos` and `tmp_result`, several `else` branches, the return
// statement and the closing braces are not visible; confirm against the
// full file before relying on the control flow described above.
77 IOStatus
RandomAccessFileReader::Read(
78 const IOOptions
& opts
, uint64_t offset
, size_t n
, Slice
* result
,
79 char* scratch
, AlignedBuf
* aligned_buf
,
80 Env::IOPriority rate_limiter_priority
) const {
83 TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
85 // To be paranoid: modify scratch a little bit, so that if the underlying
86 // FileSystem reports success without filling the buffer and `scratch` still
87 // contains a previous block, the returned value will not pass checksum.
88 if (n
> 0 && scratch
!= nullptr) {
89 // This byte might not change anything for direct I/O case, but it's OK.
96 StopWatch
sw(clock_
, stats_
, hist_type_
,
97 (stats_
!= nullptr) ? &elapsed
: nullptr, true /*overwrite*/,
98 true /*delay_enabled*/);
99 auto prev_perf_level
= GetPerfLevel();
100 IOSTATS_TIMER_GUARD(read_nanos
);
// Direct I/O: read an aligned superset of [offset, offset + n).
101 if (use_direct_io()) {
103 size_t alignment
= file_
->GetRequiredBufferAlignment();
104 size_t aligned_offset
=
105 TruncateToPageBoundary(alignment
, static_cast<size_t>(offset
));
106 size_t offset_advance
= static_cast<size_t>(offset
) - aligned_offset
;
108 Roundup(static_cast<size_t>(offset
+ n
), alignment
) - aligned_offset
;
110 buf
.Alignment(alignment
);
111 buf
.AllocateNewBuffer(read_size
);
112 while (buf
.CurrentSize() < read_size
) {
114 if (rate_limiter_priority
!= Env::IO_TOTAL
&&
115 rate_limiter_
!= nullptr) {
116 allowed
= rate_limiter_
->RequestToken(
117 buf
.Capacity() - buf
.CurrentSize(), buf
.Alignment(),
118 rate_limiter_priority
, stats_
, RateLimiter::OpType::kRead
);
120 assert(buf
.CurrentSize() == 0);
125 FileOperationInfo::StartTimePoint start_ts
;
126 uint64_t orig_offset
= 0;
127 if (ShouldNotifyListeners()) {
128 start_ts
= FileOperationInfo::StartNow();
129 orig_offset
= aligned_offset
+ buf
.CurrentSize();
133 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, clock_
);
134 // Only user reads are expected to specify a timeout. And user reads
135 // are not subjected to rate_limiter and should go through only
136 // one iteration of this loop, so we don't need to check and adjust
137 // the opts.timeout before calling file_->Read
138 assert(!opts
.timeout
.count() || allowed
== read_size
);
139 io_s
= file_
->Read(aligned_offset
+ buf
.CurrentSize(), allowed
, opts
,
140 &tmp
, buf
.Destination(), nullptr);
142 if (ShouldNotifyListeners()) {
143 auto finish_ts
= FileOperationInfo::FinishNow();
144 NotifyOnFileReadFinish(orig_offset
, tmp
.size(), start_ts
, finish_ts
,
147 NotifyOnIOError(io_s
, FileOperationType::kRead
, file_name(),
148 tmp
.size(), orig_offset
);
152 buf
.Size(buf
.CurrentSize() + tmp
.size());
153 if (!io_s
.ok() || tmp
.size() < allowed
) {
// Only bytes past `offset_advance` belong to the caller's requested range.
158 if (io_s
.ok() && offset_advance
< buf
.CurrentSize()) {
159 res_len
= std::min(buf
.CurrentSize() - offset_advance
, n
);
160 if (aligned_buf
== nullptr) {
161 buf
.Read(scratch
, offset_advance
, res_len
);
163 scratch
= buf
.BufferStart() + offset_advance
;
164 aligned_buf
->reset(buf
.Release());
167 *result
= Slice(scratch
, res_len
);
168 #endif // !ROCKSDB_LITE
// NOTE(review): presumably the non-direct-I/O branch starts here; the `else`
// itself is not visible in this view.
171 const char* res_scratch
= nullptr;
174 if (rate_limiter_priority
!= Env::IO_TOTAL
&&
175 rate_limiter_
!= nullptr) {
176 if (rate_limiter_
->IsRateLimited(RateLimiter::OpType::kRead
)) {
179 allowed
= rate_limiter_
->RequestToken(n
- pos
, 0 /* alignment */,
180 rate_limiter_priority
, stats_
,
181 RateLimiter::OpType::kRead
);
182 if (rate_limiter_
->IsRateLimited(RateLimiter::OpType::kRead
)) {
191 FileOperationInfo::StartTimePoint start_ts
;
192 if (ShouldNotifyListeners()) {
193 start_ts
= FileOperationInfo::StartNow();
198 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, clock_
);
199 // Only user reads are expected to specify a timeout. And user reads
200 // are not subjected to rate_limiter and should go through only
201 // one iteration of this loop, so we don't need to check and adjust
202 // the opts.timeout before calling file_->Read
203 assert(!opts
.timeout
.count() || allowed
== n
);
204 io_s
= file_
->Read(offset
+ pos
, allowed
, opts
, &tmp_result
,
205 scratch
+ pos
, nullptr);
208 if (ShouldNotifyListeners()) {
209 auto finish_ts
= FileOperationInfo::FinishNow();
210 NotifyOnFileReadFinish(offset
+ pos
, tmp_result
.size(), start_ts
,
214 NotifyOnIOError(io_s
, FileOperationType::kRead
, file_name(),
215 tmp_result
.size(), offset
+ pos
);
219 if (res_scratch
== nullptr) {
220 // we can't simply use `scratch` because reads of mmap'd files return
221 // data in a different buffer.
222 res_scratch
= tmp_result
.data();
224 // make sure chunks are inserted contiguously into `res_scratch`.
225 assert(tmp_result
.data() == res_scratch
+ pos
);
227 pos
+= tmp_result
.size();
228 if (!io_s
.ok() || tmp_result
.size() < allowed
) {
// On error the result is an empty slice rather than a partial read.
232 *result
= Slice(res_scratch
, io_s
.ok() ? pos
: 0);
234 RecordIOStats(stats_
, file_temperature_
, is_last_level_
, result
->size());
235 SetPerfLevel(prev_perf_level
);
237 if (stats_
!= nullptr && file_read_hist_
!= nullptr) {
238 file_read_hist_
->Add(elapsed
);
244 size_t End(const FSReadRequest
& r
) {
245 return static_cast<size_t>(r
.offset
) + r
.len
;
248 FSReadRequest
Align(const FSReadRequest
& r
, size_t alignment
) {
250 req
.offset
= static_cast<uint64_t>(
251 TruncateToPageBoundary(alignment
, static_cast<size_t>(r
.offset
)));
252 req
.len
= Roundup(End(r
), alignment
) - req
.offset
;
253 req
.scratch
= nullptr;
257 bool TryMerge(FSReadRequest
* dest
, const FSReadRequest
& src
) {
258 size_t dest_offset
= static_cast<size_t>(dest
->offset
);
259 size_t src_offset
= static_cast<size_t>(src
.offset
);
260 size_t dest_end
= End(*dest
);
261 size_t src_end
= End(src
);
262 if (std::max(dest_offset
, src_offset
) > std::min(dest_end
, src_end
)) {
265 dest
->offset
= static_cast<uint64_t>(std::min(dest_offset
, src_offset
));
266 dest
->len
= std::max(dest_end
, src_end
) - dest
->offset
;
// Issues the `num_reqs` requests in `read_reqs` through a single call to the
// underlying file's MultiRead().
//
// What the visible code does:
//  * Asserts `num_reqs > 0` and that the requests are ordered by offset.
//  * With direct I/O, each request is widened with Align(), pushed onto
//    `aligned_reqs` or merged into the previous entry via TryMerge(), and the
//    aligned requests are backed by one aligned buffer that is released to
//    `*aligned_buf`. The file system then sees `aligned_reqs` instead of
//    `read_reqs`.
//  * When a rate limiter is set and `rate_limiter_priority` is not
//    Env::IO_TOTAL, the summed request length is charged to the limiter in a
//    loop before the read is issued.
//  * After the read, with direct I/O, status and a result slice are mapped
//    from the aligned requests back onto the caller's requests.
//  * For every caller request, listeners are notified (when enabled) and
//    RecordIOStats() is called with the result size.
//
// NOTE(review): this view appears to be missing lines (e.g. the declarations
// of `io_s` and `buf`, loop bodies, `#ifndef` openers, the return statement
// and closing braces); confirm details against the full file.
270 IOStatus
RandomAccessFileReader::MultiRead(
271 const IOOptions
& opts
, FSReadRequest
* read_reqs
, size_t num_reqs
,
272 AlignedBuf
* aligned_buf
, Env::IOPriority rate_limiter_priority
) const {
273 (void)aligned_buf
; // suppress warning of unused variable in LITE mode
274 assert(num_reqs
> 0);
// Requests must arrive sorted by offset.
277 for (size_t i
= 0; i
< num_reqs
- 1; ++i
) {
278 assert(read_reqs
[i
].offset
<= read_reqs
[i
+ 1].offset
);
282 // To be paranoid: modify scratch a little bit, so that if the underlying
283 // FileSystem reports success without filling the buffer and `scratch` still
284 // contains a previous block, the returned value will not pass checksum.
285 // This byte might not change anything for direct I/O case, but it's OK.
286 for (size_t i
= 0; i
< num_reqs
; i
++) {
287 FSReadRequest
& r
= read_reqs
[i
];
288 if (r
.len
> 0 && r
.scratch
!= nullptr) {
294 uint64_t elapsed
= 0;
296 StopWatch
sw(clock_
, stats_
, hist_type_
,
297 (stats_
!= nullptr) ? &elapsed
: nullptr, true /*overwrite*/,
298 true /*delay_enabled*/);
299 auto prev_perf_level
= GetPerfLevel();
300 IOSTATS_TIMER_GUARD(read_nanos
);
// By default the file system is handed the caller's requests unchanged.
302 FSReadRequest
* fs_reqs
= read_reqs
;
303 size_t num_fs_reqs
= num_reqs
;
305 std::vector
<FSReadRequest
> aligned_reqs
;
306 if (use_direct_io()) {
307 // num_reqs is the max possible size,
308 // this can reduce std::vector's internal resize operations.
309 aligned_reqs
.reserve(num_reqs
);
310 // Align and merge the read requests.
311 size_t alignment
= file_
->GetRequiredBufferAlignment();
312 for (size_t i
= 0; i
< num_reqs
; i
++) {
313 const auto& r
= Align(read_reqs
[i
], alignment
);
316 aligned_reqs
.push_back(r
);
318 } else if (!TryMerge(&aligned_reqs
.back(), r
)) {
320 aligned_reqs
.push_back(r
);
324 r
.status
.PermitUncheckedError();
327 TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
330 // Allocate aligned buffer and let scratch buffers point to it.
331 size_t total_len
= 0;
332 for (const auto& r
: aligned_reqs
) {
336 buf
.Alignment(alignment
);
337 buf
.AllocateNewBuffer(total_len
);
338 char* scratch
= buf
.BufferStart();
339 for (auto& r
: aligned_reqs
) {
344 aligned_buf
->reset(buf
.Release());
345 fs_reqs
= aligned_reqs
.data();
346 num_fs_reqs
= aligned_reqs
.size();
348 #endif // ROCKSDB_LITE
351 FileOperationInfo::StartTimePoint start_ts
;
352 if (ShouldNotifyListeners()) {
353 start_ts
= FileOperationInfo::StartNow();
355 #endif // ROCKSDB_LITE
358 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, clock_
);
359 if (rate_limiter_priority
!= Env::IO_TOTAL
&& rate_limiter_
!= nullptr) {
360 // TODO: ideally we should call `RateLimiter::RequestToken()` for
361 // allowed bytes to multi-read and then consume those bytes by
362 // satisfying as many requests in `MultiRead()` as possible, instead of
363 // what we do here, which can cause burst when the
364 // `total_multi_read_size` is big.
365 size_t total_multi_read_size
= 0;
366 assert(fs_reqs
!= nullptr);
367 for (size_t i
= 0; i
< num_fs_reqs
; ++i
) {
368 FSReadRequest
& req
= fs_reqs
[i
];
369 total_multi_read_size
+= req
.len
;
371 size_t remaining_bytes
= total_multi_read_size
;
372 size_t request_bytes
= 0;
373 while (remaining_bytes
> 0) {
374 request_bytes
= std::min(
375 static_cast<size_t>(rate_limiter_
->GetSingleBurstBytes()),
377 rate_limiter_
->Request(request_bytes
, rate_limiter_priority
,
379 RateLimiter::OpType::kRead
);
380 remaining_bytes
-= request_bytes
;
383 io_s
= file_
->MultiRead(fs_reqs
, num_fs_reqs
, opts
, nullptr);
384 RecordInHistogram(stats_
, MULTIGET_IO_BATCH_SIZE
, num_fs_reqs
);
388 if (use_direct_io()) {
389 // Populate results in the unaligned read requests.
390 size_t aligned_i
= 0;
391 for (size_t i
= 0; i
< num_reqs
; i
++) {
392 auto& r
= read_reqs
[i
];
393 if (static_cast<size_t>(r
.offset
) > End(aligned_reqs
[aligned_i
])) {
396 const auto& fs_r
= fs_reqs
[aligned_i
];
397 r
.status
= fs_r
.status
;
399 uint64_t offset
= r
.offset
- fs_r
.offset
;
400 if (fs_r
.result
.size() <= offset
) {
401 // No byte in the read range is returned.
404 size_t len
= std::min(
405 r
.len
, static_cast<size_t>(fs_r
.result
.size() - offset
));
406 r
.result
= Slice(fs_r
.scratch
+ offset
, len
);
413 #endif // ROCKSDB_LITE
// Per-request listener notifications and IO statistics.
415 for (size_t i
= 0; i
< num_reqs
; ++i
) {
417 if (ShouldNotifyListeners()) {
418 auto finish_ts
= FileOperationInfo::FinishNow();
419 NotifyOnFileReadFinish(read_reqs
[i
].offset
, read_reqs
[i
].result
.size(),
420 start_ts
, finish_ts
, read_reqs
[i
].status
);
422 if (!read_reqs
[i
].status
.ok()) {
423 NotifyOnIOError(read_reqs
[i
].status
, FileOperationType::kRead
,
424 file_name(), read_reqs
[i
].result
.size(),
425 read_reqs
[i
].offset
);
428 #endif // ROCKSDB_LITE
429 RecordIOStats(stats_
, file_temperature_
, is_last_level_
,
430 read_reqs
[i
].result
.size());
432 SetPerfLevel(prev_perf_level
);
434 if (stats_
!= nullptr && file_read_hist_
!= nullptr) {
435 file_read_hist_
->Add(elapsed
);
// Fills `opts` from the read options `ro` by delegating to
// PrepareIOFromReadOptions(). This reader's `clock_` is used when it is set;
// otherwise SystemClock::Default() is used. Returns the delegate's status.
//
// NOTE(review): the line declaring the `opts` parameter (and the closing
// braces) is not visible in this view of the file.
441 IOStatus
RandomAccessFileReader::PrepareIOOptions(const ReadOptions
& ro
,
443 if (clock_
!= nullptr) {
444 return PrepareIOFromReadOptions(ro
, clock_
, opts
);
// Fall back to the default system clock when no clock was supplied.
446 return PrepareIOFromReadOptions(ro
, SystemClock::Default().get(), opts
);
// Submits `req` as an asynchronous read on the underlying file.
//
// What the visible code does:
//  * Allocates a ReadAsyncInfo holding the user callback `cb`, `cb_arg` and
//    the current clock time, and passes it as the callback argument to
//    file_->ReadAsync() with ReadAsyncCallback bound as the completion
//    callback.
//  * Computes `is_aligned` from `req.offset`, `req.len` and `req.scratch`
//    against the file's required buffer alignment and records it in the
//    info object.
//  * For direct I/O with a request that is not aligned, submits an aligned
//    copy of the request (see Align()) whose scratch is a buffer owned by
//    the info object, and saves the user's scratch, length, offset, result
//    and `aligned_buf` so the callback can translate the result back.
//  * Records the time spent in the submit call under READ_ASYNC_MICROS.
//  * `io_handle` and `del_fn` are forwarded to file_->ReadAsync().
//
// NOTE(review): the declaration of `s`, the `else` branch, the condition
// guarding `delete read_async_info` and the return statement are not visible
// in this view; the comment near the end of the function says the info
// object is deleted only when the submit fails — confirm against the full
// file.
450 IOStatus
RandomAccessFileReader::ReadAsync(
451 FSReadRequest
& req
, const IOOptions
& opts
,
452 std::function
<void(const FSReadRequest
&, void*)> cb
, void* cb_arg
,
453 void** io_handle
, IOHandleDeleter
* del_fn
, AlignedBuf
* aligned_buf
) {
455 // Create a callback and populate info.
456 auto read_async_callback
=
457 std::bind(&RandomAccessFileReader::ReadAsyncCallback
, this,
458 std::placeholders::_1
, std::placeholders::_2
);
459 ReadAsyncInfo
* read_async_info
=
460 new ReadAsyncInfo(cb
, cb_arg
, clock_
->NowMicros());
463 if (ShouldNotifyListeners()) {
464 read_async_info
->fs_start_ts_
= FileOperationInfo::StartNow();
// A request is aligned only if offset, length and scratch address are all
// multiples of `alignment` (the masks assume a power-of-two alignment).
468 size_t alignment
= file_
->GetRequiredBufferAlignment();
469 bool is_aligned
= (req
.offset
& (alignment
- 1)) == 0 &&
470 (req
.len
& (alignment
- 1)) == 0 &&
471 (uintptr_t(req
.scratch
) & (alignment
- 1)) == 0;
472 read_async_info
->is_aligned_
= is_aligned
;
474 uint64_t elapsed
= 0;
475 if (use_direct_io() && is_aligned
== false) {
476 FSReadRequest aligned_req
= Align(req
, alignment
);
477 aligned_req
.status
.PermitUncheckedError();
479 // Allocate aligned buffer.
480 read_async_info
->buf_
.Alignment(alignment
);
481 read_async_info
->buf_
.AllocateNewBuffer(aligned_req
.len
);
483 // Set the remaining fields in the aligned FSReadRequest.
484 aligned_req
.scratch
= read_async_info
->buf_
.BufferStart();
486 // Set user provided fields to populate back in callback.
487 read_async_info
->user_scratch_
= req
.scratch
;
488 read_async_info
->user_aligned_buf_
= aligned_buf
;
489 read_async_info
->user_len_
= req
.len
;
490 read_async_info
->user_offset_
= req
.offset
;
491 read_async_info
->user_result_
= req
.result
;
493 assert(read_async_info
->buf_
.CurrentSize() == 0);
495 StopWatch
sw(clock_
, nullptr /*stats*/, 0 /*hist_type*/, &elapsed
,
496 true /*overwrite*/, true /*delay_enabled*/);
497 s
= file_
->ReadAsync(aligned_req
, opts
, read_async_callback
,
498 read_async_info
, io_handle
, del_fn
, nullptr /*dbg*/);
// NOTE(review): presumably the aligned / non-direct-I/O branch; the `else`
// is not visible in this view.
500 StopWatch
sw(clock_
, nullptr /*stats*/, 0 /*hist_type*/, &elapsed
,
501 true /*overwrite*/, true /*delay_enabled*/);
502 s
= file_
->ReadAsync(req
, opts
, read_async_callback
, read_async_info
,
503 io_handle
, del_fn
, nullptr /*dbg*/);
505 RecordTick(stats_
, READ_ASYNC_MICROS
, elapsed
);
507 // Suppress false positive clang analyzer warnings.
508 // Memory is not released if file_->ReadAsync returns !s.ok(), because
509 // ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
510 // called then ReadAsync should always return IOStatus::OK().
511 #ifndef __clang_analyzer__
513 delete read_async_info
;
515 #endif // __clang_analyzer__
// Completion callback bound by ReadAsync(). `cb_arg` is cast to the
// ReadAsyncInfo allocated there, and that object is deleted at the end of
// this function.
//
// What the visible code does:
//  * For direct I/O with a request that was not aligned, rebuilds the user's
//    request from the saved scratch/offset/length, copies status and result
//    from `req`, and then either copies the bytes into the user's scratch or
//    hands the aligned buffer over through `user_aligned_buf_`. The user
//    callback is invoked with that rebuilt request. If the status is not OK
//    or no requested byte was read, the result is an empty Slice.
//  * The code also shows the user callback being invoked with `req` itself.
//  * Adds the elapsed time since `start_time_` to `file_read_hist_`, records
//    ASYNC_READ_BYTES on success or ASYNC_READ_ERROR_COUNT on a non-aborted
//    error, notifies listeners when enabled, and calls RecordIOStats().
//
// NOTE(review): the second parameter's declaration (`cb_arg`), the
// declaration of `res_len`, the `else` branches and the closing braces are
// not visible in this view; confirm against the full file.
520 void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest
& req
,
522 ReadAsyncInfo
* read_async_info
= static_cast<ReadAsyncInfo
*>(cb_arg
);
523 assert(read_async_info
);
524 assert(read_async_info
->cb_
);
526 if (use_direct_io() && read_async_info
->is_aligned_
== false) {
527 // Create FSReadRequest with user provided fields.
528 FSReadRequest user_req
;
529 user_req
.scratch
= read_async_info
->user_scratch_
;
530 user_req
.offset
= read_async_info
->user_offset_
;
531 user_req
.len
= read_async_info
->user_len_
;
533 // Update results in user_req.
534 user_req
.result
= req
.result
;
535 user_req
.status
= req
.status
;
537 read_async_info
->buf_
.Size(read_async_info
->buf_
.CurrentSize() +
// Distance from the aligned read start to the offset the user asked for.
540 size_t offset_advance_len
= static_cast<size_t>(
541 /*offset_passed_by_user=*/read_async_info
->user_offset_
-
542 /*aligned_offset=*/req
.offset
);
545 if (req
.status
.ok() &&
546 offset_advance_len
< read_async_info
->buf_
.CurrentSize()) {
548 std::min(read_async_info
->buf_
.CurrentSize() - offset_advance_len
,
549 read_async_info
->user_len_
);
550 if (read_async_info
->user_aligned_buf_
== nullptr) {
551 // Copy the data into user's scratch.
552 // Clang analyzer assumes that it will take use_direct_io() == false in
553 // ReadAsync and use_direct_io() == true in Callback which cannot be true.
554 #ifndef __clang_analyzer__
555 read_async_info
->buf_
.Read(user_req
.scratch
, offset_advance_len
,
557 #endif // __clang_analyzer__
559 // Set aligned_buf provided by user without additional copy.
561 read_async_info
->buf_
.BufferStart() + offset_advance_len
;
562 read_async_info
->user_aligned_buf_
->reset(
563 read_async_info
->buf_
.Release());
565 user_req
.result
= Slice(user_req
.scratch
, res_len
);
567 // Either req.status is not ok or data was not read.
568 user_req
.result
= Slice();
570 read_async_info
->cb_(user_req
, read_async_info
->cb_arg_
);
572 read_async_info
->cb_(req
, read_async_info
->cb_arg_
);
575 // Update stats and notify listeners.
576 if (stats_
!= nullptr && file_read_hist_
!= nullptr) {
577 // elapsed doesn't take into account delay and overwrite as StopWatch does
579 uint64_t elapsed
= clock_
->NowMicros() - read_async_info
->start_time_
;
580 file_read_hist_
->Add(elapsed
);
582 if (req
.status
.ok()) {
583 RecordInHistogram(stats_
, ASYNC_READ_BYTES
, req
.result
.size());
584 } else if (!req
.status
.IsAborted()) {
585 RecordTick(stats_
, ASYNC_READ_ERROR_COUNT
, 1);
588 if (ShouldNotifyListeners()) {
589 auto finish_ts
= FileOperationInfo::FinishNow();
590 NotifyOnFileReadFinish(req
.offset
, req
.result
.size(),
591 read_async_info
->fs_start_ts_
, finish_ts
,
594 if (!req
.status
.ok()) {
595 NotifyOnIOError(req
.status
, FileOperationType::kRead
, file_name(),
596 req
.result
.size(), req
.offset
);
599 RecordIOStats(stats_
, file_temperature_
, is_last_level_
, req
.result
.size());
// The info object was allocated in ReadAsync(); this is its final use.
600 delete read_async_info
;
602 } // namespace ROCKSDB_NAMESPACE