]>
git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/file/random_access_file_reader.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
10 #include "file/random_access_file_reader.h"
15 #include "monitoring/histogram.h"
16 #include "monitoring/iostats_context_imp.h"
17 #include "port/port.h"
18 #include "test_util/sync_point.h"
19 #include "util/random.h"
20 #include "util/rate_limiter.h"
22 namespace ROCKSDB_NAMESPACE
{
// Reads up to `n` bytes starting at `offset` and publishes them via `*result`.
//
// Two paths are visible:
//  * use_direct_io() is true: an alignment-rounded range that covers
//    [offset, offset + n) is read into the aligned buffer `buf`, and the
//    requested part is then copied into `scratch`.
//  * otherwise: the file is read in chunks at `offset + pos`, offering
//    `scratch + pos` as the destination, and `*result` is pointed at the
//    buffer that the first chunk came back in.
//
// When `for_compaction` is true and `rate_limiter_` is non-null, the size of
// each chunk is whatever rate_limiter_->RequestToken() grants.
//
// NOTE(review): this copy of the function appears to be missing lines. The
// embedded original line numbers skip, names such as `s`, `elapsed`, `buf`,
// `allowed`, `tmp`, `pos`, `res_len`, `tmp_result` and `read_size` are used
// without a visible declaration, and several closing braces / else branches /
// the final return are absent. Confirm against the upstream file before
// relying on this text.
23 Status
RandomAccessFileReader::Read(uint64_t offset
, size_t n
, Slice
* result
,
24 char* scratch
, bool for_compaction
) const {
// Stopwatch over the read; `&elapsed` is passed only when stats_ is non-null.
28 StopWatch
sw(env_
, stats_
, hist_type_
,
29 (stats_
!= nullptr) ? &elapsed
: nullptr, true /*overwrite*/,
30 true /*delay_enabled*/);
// Saved so that SetPerfLevel(prev_perf_level) can restore it further down.
31 auto prev_perf_level
= GetPerfLevel();
32 IOSTATS_TIMER_GUARD(read_nanos
);
33 if (use_direct_io()) {
// Direct I/O path: the alignment comes from the file itself.
35 size_t alignment
= file_
->GetRequiredBufferAlignment();
// Start of the aligned read: `offset` truncated down to the alignment.
36 size_t aligned_offset
=
37 TruncateToPageBoundary(alignment
, static_cast<size_t>(offset
));
// Bytes at the front of the aligned read that precede the requested offset.
38 size_t offset_advance
= static_cast<size_t>(offset
) - aligned_offset
;
// Length of the aligned read: end of request rounded up, minus aligned start.
// NOTE(review): the left-hand side of this statement (presumably the
// declaration of `read_size`) is not visible here.
40 Roundup(static_cast<size_t>(offset
+ n
), alignment
) - aligned_offset
;
42 buf
.Alignment(alignment
);
43 buf
.AllocateNewBuffer(read_size
);
// Loop until the buffer holds read_size bytes; an error / short-read check
// appears further down.
44 while (buf
.CurrentSize() < read_size
) {
// Rate-limited compaction read: request tokens for the remaining buffer
// capacity, passing the buffer alignment.
46 if (for_compaction
&& rate_limiter_
!= nullptr) {
47 allowed
= rate_limiter_
->RequestToken(
48 buf
.Capacity() - buf
.CurrentSize(), buf
.Alignment(),
49 Env::IOPriority::IO_LOW
, stats_
, RateLimiter::OpType::kRead
);
// NOTE(review): presumably the non-rate-limited branch; its opening line and
// the assignment to `allowed` are not visible here.
51 assert(buf
.CurrentSize() == 0);
// The start time and starting offset are captured only when listeners are to
// be notified.
56 FileOperationInfo::TimePoint start_ts
;
57 uint64_t orig_offset
= 0;
58 if (ShouldNotifyListeners()) {
59 start_ts
= std::chrono::system_clock::now();
60 orig_offset
= aligned_offset
+ buf
.CurrentSize();
63 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, env_
);
// Read `allowed` bytes at the next unread aligned position into the buffer's
// destination pointer.
64 s
= file_
->Read(aligned_offset
+ buf
.CurrentSize(), allowed
,
65 IOOptions(), &tmp
, buf
.Destination(), nullptr);
67 if (ShouldNotifyListeners()) {
68 auto finish_ts
= std::chrono::system_clock::now();
// NOTE(review): the final argument(s) of this call are cut off in this copy.
69 NotifyOnFileReadFinish(orig_offset
, tmp
.size(), start_ts
, finish_ts
,
// Grow the buffer's recorded size by the number of bytes this read returned.
73 buf
.Size(buf
.CurrentSize() + tmp
.size());
// Error, or fewer bytes returned than were requested.
// NOTE(review): the body of this `if` is not visible; presumably it leaves
// the loop.
74 if (!s
.ok() || tmp
.size() < allowed
) {
// On success, and only if the buffer holds data past offset_advance, copy at
// most `n` bytes starting at offset_advance from the buffer into `scratch`.
79 if (s
.ok() && offset_advance
< buf
.CurrentSize()) {
80 res_len
= buf
.Read(scratch
, offset_advance
,
81 std::min(buf
.CurrentSize() - offset_advance
, n
));
// The direct I/O result refers to `scratch` with res_len bytes.
83 *result
= Slice(scratch
, res_len
);
84 #endif // !ROCKSDB_LITE
// Non-direct path. res_scratch is set from the first chunk's returned data
// pointer (see below) and stays null until then.
87 const char* res_scratch
= nullptr;
90 if (for_compaction
&& rate_limiter_
!= nullptr) {
// NOTE(review): the bodies of the two IsRateLimited() checks around the
// RequestToken call are not visible in this copy.
91 if (rate_limiter_
->IsRateLimited(RateLimiter::OpType::kRead
)) {
// Request tokens for the bytes not yet read (n - pos), with alignment 0.
94 allowed
= rate_limiter_
->RequestToken(n
- pos
, 0 /* alignment */,
95 Env::IOPriority::IO_LOW
, stats_
,
96 RateLimiter::OpType::kRead
);
97 if (rate_limiter_
->IsRateLimited(RateLimiter::OpType::kRead
)) {
106 FileOperationInfo::TimePoint start_ts
;
107 if (ShouldNotifyListeners()) {
108 start_ts
= std::chrono::system_clock::now();
112 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, env_
);
// Read the next chunk at offset + pos, offering scratch + pos as the buffer.
113 s
= file_
->Read(offset
+ pos
, allowed
, IOOptions(), &tmp_result
,
114 scratch
+ pos
, nullptr);
117 if (ShouldNotifyListeners()) {
118 auto finish_ts
= std::chrono::system_clock::now();
// NOTE(review): the final argument(s) of this call are cut off in this copy.
119 NotifyOnFileReadFinish(offset
+ pos
, tmp_result
.size(), start_ts
,
124 if (res_scratch
== nullptr) {
125 // we can't simply use `scratch` because reads of mmap'd files return
126 // data in a different buffer.
127 res_scratch
= tmp_result
.data();
129 // make sure chunks are inserted contiguously into `res_scratch`.
130 assert(tmp_result
.data() == res_scratch
+ pos
);
// Advance by the number of bytes this chunk returned.
132 pos
+= tmp_result
.size();
// Error, or fewer bytes returned than were requested.
// NOTE(review): the body of this `if` is not visible; presumably it leaves
// the loop.
133 if (!s
.ok() || tmp_result
.size() < allowed
) {
// Expose `pos` bytes on success and zero bytes on failure.
137 *result
= Slice(res_scratch
, s
.ok() ? pos
: 0);
// Add the result size to the bytes_read IO stat, then restore the perf level
// saved at the top.
139 IOSTATS_ADD_IF_POSITIVE(bytes_read
, result
->size());
140 SetPerfLevel(prev_perf_level
);
// Add `elapsed` to the file read histogram only when both stats_ and
// file_read_hist_ are set.
142 if (stats_
!= nullptr && file_read_hist_
!= nullptr) {
143 file_read_hist_
->Add(elapsed
);
// Submits the `num_reqs` requests in `read_reqs` to the underlying file with
// a single file_->MultiRead() call, then does per-request bookkeeping:
// listener notification (when enabled) and bytes_read IO-stat accounting.
// Asserts that direct I/O is not in use.
//
// NOTE(review): this copy of the function appears to be missing lines. The
// embedded original line numbers skip, `s` is used without a visible
// declaration, `#endif` lines have no visible opening directive, and closing
// braces / the final return are absent. Confirm against the upstream file.
149 Status
RandomAccessFileReader::MultiRead(FSReadRequest
* read_reqs
,
150 size_t num_reqs
) const {
// Passed by address to the StopWatch below only when stats_ is non-null, and
// later added to file_read_hist_.
152 uint64_t elapsed
= 0;
153 assert(!use_direct_io());
155 StopWatch
sw(env_
, stats_
, hist_type_
,
156 (stats_
!= nullptr) ? &elapsed
: nullptr, true /*overwrite*/,
157 true /*delay_enabled*/);
// Saved so that SetPerfLevel(prev_perf_level) can restore it further down.
158 auto prev_perf_level
= GetPerfLevel();
159 IOSTATS_TIMER_GUARD(read_nanos
);
// One start timestamp is taken for the whole batch, and only when listeners
// are to be notified.
162 FileOperationInfo::TimePoint start_ts
;
163 if (ShouldNotifyListeners()) {
164 start_ts
= std::chrono::system_clock::now();
166 #endif // ROCKSDB_LITE
168 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos
, env_
);
// Hand the entire batch to the underlying file in one call.
169 s
= file_
->MultiRead(read_reqs
, num_reqs
, IOOptions(), nullptr);
// Per-request bookkeeping.
171 for (size_t i
= 0; i
< num_reqs
; ++i
) {
// The finish timestamp is sampled inside the loop, once per request, and is
// reported with that request's offset, result size and status.
173 if (ShouldNotifyListeners()) {
174 auto finish_ts
= std::chrono::system_clock::now();
175 NotifyOnFileReadFinish(read_reqs
[i
].offset
, read_reqs
[i
].result
.size(),
176 start_ts
, finish_ts
, read_reqs
[i
].status
);
178 #endif // ROCKSDB_LITE
// Add this request's result size to the bytes_read IO stat.
179 IOSTATS_ADD_IF_POSITIVE(bytes_read
, read_reqs
[i
].result
.size());
// Restore the perf level saved at the top.
181 SetPerfLevel(prev_perf_level
);
// Add `elapsed` to the file read histogram only when both stats_ and
// file_read_hist_ are set.
183 if (stats_
!= nullptr && file_read_hist_
!= nullptr) {
184 file_read_hist_
->Add(elapsed
);
189 } // namespace ROCKSDB_NAMESPACE