// ceph/src/rocksdb/file/random_access_file_reader.cc
//
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "file/random_access_file_reader.h"
15 #include "monitoring/histogram.h"
16 #include "monitoring/iostats_context_imp.h"
17 #include "port/port.h"
18 #include "table/format.h"
19 #include "test_util/sync_point.h"
20 #include "util/random.h"
21 #include "util/rate_limiter.h"
23 namespace ROCKSDB_NAMESPACE
{
25 Status
RandomAccessFileReader::Read(const IOOptions
& opts
, uint64_t offset
,
26 size_t n
, Slice
* result
, char* scratch
,
27 AlignedBuf
* aligned_buf
,
28 bool for_compaction
) const {
31 TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
35 StopWatch
sw(env_
, stats_
, hist_type_
,
36 (stats_
!= nullptr) ? &elapsed
: nullptr, true /*overwrite*/,
37 true /*delay_enabled*/);
38 auto prev_perf_level
= GetPerfLevel();
39 IOSTATS_TIMER_GUARD(read_nanos
);
40 if (use_direct_io()) {
42 size_t alignment
= file_
->GetRequiredBufferAlignment();
43 size_t aligned_offset
=
44 TruncateToPageBoundary(alignment
, static_cast<size_t>(offset
));
45 size_t offset_advance
= static_cast<size_t>(offset
) - aligned_offset
;
47 Roundup(static_cast<size_t>(offset
+ n
), alignment
) - aligned_offset
;
49 buf
.Alignment(alignment
);
50 buf
.AllocateNewBuffer(read_size
);
51 while (buf
.CurrentSize() < read_size
) {
53 if (for_compaction
&& rate_limiter_
!= nullptr) {
54 allowed
= rate_limiter_
->RequestToken(
55 buf
.Capacity() - buf
.CurrentSize(), buf
.Alignment(),
56 Env::IOPriority::IO_LOW
, stats_
, RateLimiter::OpType::kRead
);
58 assert(buf
.CurrentSize() == 0);
63 FileOperationInfo::StartTimePoint start_ts
;
64 uint64_t orig_offset
= 0;
65 if (ShouldNotifyListeners()) {
66 start_ts
= FileOperationInfo::StartNow();
67 orig_offset
= aligned_offset
+ buf
.CurrentSize();
71 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, env_
);
72 // Only user reads are expected to specify a timeout. And user reads
73 // are not subjected to rate_limiter and should go through only
74 // one iteration of this loop, so we don't need to check and adjust
75 // the opts.timeout before calling file_->Read
76 assert(!opts
.timeout
.count() || allowed
== read_size
);
77 s
= file_
->Read(aligned_offset
+ buf
.CurrentSize(), allowed
, opts
,
78 &tmp
, buf
.Destination(), nullptr);
80 if (ShouldNotifyListeners()) {
81 auto finish_ts
= FileOperationInfo::FinishNow();
82 NotifyOnFileReadFinish(orig_offset
, tmp
.size(), start_ts
, finish_ts
,
86 buf
.Size(buf
.CurrentSize() + tmp
.size());
87 if (!s
.ok() || tmp
.size() < allowed
) {
92 if (s
.ok() && offset_advance
< buf
.CurrentSize()) {
93 res_len
= std::min(buf
.CurrentSize() - offset_advance
, n
);
94 if (aligned_buf
== nullptr) {
95 buf
.Read(scratch
, offset_advance
, res_len
);
97 scratch
= buf
.BufferStart() + offset_advance
;
98 aligned_buf
->reset(buf
.Release());
101 *result
= Slice(scratch
, res_len
);
102 #endif // !ROCKSDB_LITE
105 const char* res_scratch
= nullptr;
108 if (for_compaction
&& rate_limiter_
!= nullptr) {
109 if (rate_limiter_
->IsRateLimited(RateLimiter::OpType::kRead
)) {
112 allowed
= rate_limiter_
->RequestToken(n
- pos
, 0 /* alignment */,
113 Env::IOPriority::IO_LOW
, stats_
,
114 RateLimiter::OpType::kRead
);
115 if (rate_limiter_
->IsRateLimited(RateLimiter::OpType::kRead
)) {
124 FileOperationInfo::StartTimePoint start_ts
;
125 if (ShouldNotifyListeners()) {
126 start_ts
= FileOperationInfo::StartNow();
131 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, env_
);
132 // Only user reads are expected to specify a timeout. And user reads
133 // are not subjected to rate_limiter and should go through only
134 // one iteration of this loop, so we don't need to check and adjust
135 // the opts.timeout before calling file_->Read
136 assert(!opts
.timeout
.count() || allowed
== n
);
137 s
= file_
->Read(offset
+ pos
, allowed
, opts
, &tmp_result
,
138 scratch
+ pos
, nullptr);
141 if (ShouldNotifyListeners()) {
142 auto finish_ts
= FileOperationInfo::FinishNow();
143 NotifyOnFileReadFinish(offset
+ pos
, tmp_result
.size(), start_ts
,
148 if (res_scratch
== nullptr) {
149 // we can't simply use `scratch` because reads of mmap'd files return
150 // data in a different buffer.
151 res_scratch
= tmp_result
.data();
153 // make sure chunks are inserted contiguously into `res_scratch`.
154 assert(tmp_result
.data() == res_scratch
+ pos
);
156 pos
+= tmp_result
.size();
157 if (!s
.ok() || tmp_result
.size() < allowed
) {
161 *result
= Slice(res_scratch
, s
.ok() ? pos
: 0);
163 IOSTATS_ADD_IF_POSITIVE(bytes_read
, result
->size());
164 SetPerfLevel(prev_perf_level
);
166 if (stats_
!= nullptr && file_read_hist_
!= nullptr) {
167 file_read_hist_
->Add(elapsed
);
173 size_t End(const FSReadRequest
& r
) {
174 return static_cast<size_t>(r
.offset
) + r
.len
;
177 FSReadRequest
Align(const FSReadRequest
& r
, size_t alignment
) {
179 req
.offset
= static_cast<uint64_t>(
180 TruncateToPageBoundary(alignment
, static_cast<size_t>(r
.offset
)));
181 req
.len
= Roundup(End(r
), alignment
) - req
.offset
;
182 req
.scratch
= nullptr;
186 bool TryMerge(FSReadRequest
* dest
, const FSReadRequest
& src
) {
187 size_t dest_offset
= static_cast<size_t>(dest
->offset
);
188 size_t src_offset
= static_cast<size_t>(src
.offset
);
189 size_t dest_end
= End(*dest
);
190 size_t src_end
= End(src
);
191 if (std::max(dest_offset
, src_offset
) > std::min(dest_end
, src_end
)) {
194 dest
->offset
= static_cast<uint64_t>(std::min(dest_offset
, src_offset
));
195 dest
->len
= std::max(dest_end
, src_end
) - dest
->offset
;
199 Status
RandomAccessFileReader::MultiRead(const IOOptions
& opts
,
200 FSReadRequest
* read_reqs
,
202 AlignedBuf
* aligned_buf
) const {
203 (void)aligned_buf
; // suppress warning of unused variable in LITE mode
204 assert(num_reqs
> 0);
206 uint64_t elapsed
= 0;
208 StopWatch
sw(env_
, stats_
, hist_type_
,
209 (stats_
!= nullptr) ? &elapsed
: nullptr, true /*overwrite*/,
210 true /*delay_enabled*/);
211 auto prev_perf_level
= GetPerfLevel();
212 IOSTATS_TIMER_GUARD(read_nanos
);
214 FSReadRequest
* fs_reqs
= read_reqs
;
215 size_t num_fs_reqs
= num_reqs
;
217 std::vector
<FSReadRequest
> aligned_reqs
;
218 if (use_direct_io()) {
219 // num_reqs is the max possible size,
220 // this can reduce std::vecector's internal resize operations.
221 aligned_reqs
.reserve(num_reqs
);
222 // Align and merge the read requests.
223 size_t alignment
= file_
->GetRequiredBufferAlignment();
224 aligned_reqs
.push_back(Align(read_reqs
[0], alignment
));
225 for (size_t i
= 1; i
< num_reqs
; i
++) {
226 const auto& r
= Align(read_reqs
[i
], alignment
);
227 if (!TryMerge(&aligned_reqs
.back(), r
)) {
228 aligned_reqs
.push_back(r
);
231 TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
234 // Allocate aligned buffer and let scratch buffers point to it.
235 size_t total_len
= 0;
236 for (const auto& r
: aligned_reqs
) {
240 buf
.Alignment(alignment
);
241 buf
.AllocateNewBuffer(total_len
);
242 char* scratch
= buf
.BufferStart();
243 for (auto& r
: aligned_reqs
) {
248 aligned_buf
->reset(buf
.Release());
249 fs_reqs
= aligned_reqs
.data();
250 num_fs_reqs
= aligned_reqs
.size();
252 #endif // ROCKSDB_LITE
255 FileOperationInfo::StartTimePoint start_ts
;
256 if (ShouldNotifyListeners()) {
257 start_ts
= FileOperationInfo::StartNow();
259 #endif // ROCKSDB_LITE
262 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, env_
);
263 s
= file_
->MultiRead(fs_reqs
, num_fs_reqs
, opts
, nullptr);
267 if (use_direct_io()) {
268 // Populate results in the unaligned read requests.
269 size_t aligned_i
= 0;
270 for (size_t i
= 0; i
< num_reqs
; i
++) {
271 auto& r
= read_reqs
[i
];
272 if (static_cast<size_t>(r
.offset
) > End(aligned_reqs
[aligned_i
])) {
275 const auto& fs_r
= fs_reqs
[aligned_i
];
276 r
.status
= fs_r
.status
;
278 uint64_t offset
= r
.offset
- fs_r
.offset
;
279 size_t len
= std::min(r
.len
, static_cast<size_t>(fs_r
.len
- offset
));
280 r
.result
= Slice(fs_r
.scratch
+ offset
, len
);
286 #endif // ROCKSDB_LITE
288 for (size_t i
= 0; i
< num_reqs
; ++i
) {
290 if (ShouldNotifyListeners()) {
291 auto finish_ts
= FileOperationInfo::FinishNow();
292 NotifyOnFileReadFinish(read_reqs
[i
].offset
, read_reqs
[i
].result
.size(),
293 start_ts
, finish_ts
, read_reqs
[i
].status
);
295 #endif // ROCKSDB_LITE
296 IOSTATS_ADD_IF_POSITIVE(bytes_read
, read_reqs
[i
].result
.size());
298 SetPerfLevel(prev_perf_level
);
300 if (stats_
!= nullptr && file_read_hist_
!= nullptr) {
301 file_read_hist_
->Add(elapsed
);
307 } // namespace ROCKSDB_NAMESPACE