// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/random_access_file_reader.h"

#include <algorithm>
#include <mutex>

#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "table/format.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {

Status RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
                                    size_t n, Slice* result, char* scratch,
                                    AlignedBuf* aligned_buf,
                                    bool for_compaction) const {
  (void)aligned_buf;

  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
  Status s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(env_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
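      // Direct I/O requires the read offset, length and destination buffer to
      // be aligned to the file system's required alignment, so widen the
      // requested range [offset, offset + n) to aligned boundaries and read
      // into an aligned scratch buffer.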
      size_t alignment = file_->GetRequiredBufferAlignment();
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
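      // Read the aligned range in chunks: compaction reads are throttled by
      // the rate limiter, while other reads fetch the whole range in a single
      // iteration. Stop early on an error or a short read.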
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
        } else {
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::StartTimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
          orig_offset = aligned_offset + buf.CurrentSize();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subjected to rate_limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // the opts.timeout before calling file_->Read
          assert(!opts.timeout.count() || allowed == read_size);
          s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
                          &tmp, buf.Destination(), nullptr);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 s);
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        if (!s.ok() || tmp.size() < allowed) {
          break;
        }
      }
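      // Return the originally requested sub-range: either copy it into the
      // caller's `scratch`, or, when `aligned_buf` is provided, hand over
      // ownership of the aligned buffer and point `scratch` into it so no
      // copy is needed.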
      size_t res_len = 0;
      if (s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
        if (aligned_buf == nullptr) {
          buf.Read(scratch, offset_advance, res_len);
        } else {
          scratch = buf.BufferStart() + offset_advance;
          aligned_buf->reset(buf.Release());
        }
      }
      *result = Slice(scratch, res_len);
#endif  // !ROCKSDB_LITE
    } else {
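      // Buffered (non-direct) path: read straight into the caller's
      // `scratch`, in rate-limited chunks for compaction reads.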
      size_t pos = 0;
      const char* res_scratch = nullptr;
      while (pos < n) {
        size_t allowed;
        if (for_compaction && rate_limiter_ != nullptr) {
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
                                                Env::IOPriority::IO_LOW, stats_,
                                                RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

#ifndef ROCKSDB_LITE
        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }
#endif

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subjected to rate_limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // the opts.timeout before calling file_->Read
          assert(!opts.timeout.count() || allowed == n);
          s = file_->Read(offset + pos, allowed, opts, &tmp_result,
                          scratch + pos, nullptr);
        }
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, s);
        }
#endif

        if (res_scratch == nullptr) {
          // we can't simply use `scratch` because reads of mmap'd files return
          // data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      *result = Slice(res_scratch, s.ok() ? pos : 0);
    }
    IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return s;
}

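// Returns the offset one past the end of the request's byte range.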
size_t End(const FSReadRequest& r) {
  return static_cast<size_t>(r.offset) + r.len;
}

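// Expands a request so that its offset and length fall on `alignment`
// boundaries, as required for direct I/O. The returned request carries no
// scratch buffer.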
FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
  FSReadRequest req;
  req.offset = static_cast<uint64_t>(
      TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
  req.len = Roundup(End(r), alignment) - req.offset;
  req.scratch = nullptr;
  return req;
}

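// Merges `src` into `dest` when their byte ranges overlap or touch; returns
// true if the merge happened, false otherwise.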
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
  size_t dest_offset = static_cast<size_t>(dest->offset);
  size_t src_offset = static_cast<size_t>(src.offset);
  size_t dest_end = End(*dest);
  size_t src_end = End(src);
  if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
    return false;
  }
  dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
  dest->len = std::max(dest_end, src_end) - dest->offset;
  return true;
}

Status RandomAccessFileReader::MultiRead(const IOOptions& opts,
                                         FSReadRequest* read_reqs,
                                         size_t num_reqs,
                                         AlignedBuf* aligned_buf) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);
  Status s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(env_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
#ifndef ROCKSDB_LITE
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
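      // With direct I/O every request must itself be aligned, so align each
      // request, merge the ones whose aligned ranges overlap or touch, issue
      // the merged requests against one contiguous aligned buffer, and
      // afterwards slice the per-request results back out of that buffer.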
      // num_reqs is the maximum possible size, so reserving it up front
      // avoids std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
      aligned_reqs.push_back(Align(read_reqs[0], alignment));
      for (size_t i = 1; i < num_reqs; i++) {
        const auto& r = Align(read_reqs[i], alignment);
        if (!TryMerge(&aligned_reqs.back(), r)) {
          aligned_reqs.push_back(r);
        }
      }
      TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
                               &aligned_reqs);

      // Allocate aligned buffer and let scratch buffers point to it.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }

      aligned_buf->reset(buf.Release());
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }
#endif  // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }
#endif  // ROCKSDB_LITE

    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
      s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
    }

#ifndef ROCKSDB_LITE
    if (use_direct_io()) {
      // Populate results in the unaligned read requests.
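      // Map each original request back to the merged, aligned request that
      // covers it: advance `aligned_i` once the original offset moves past the
      // end of the current aligned request, then slice the result out of that
      // request's aligned scratch buffer.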
      size_t aligned_i = 0;
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          uint64_t offset = r.offset - fs_r.offset;
          size_t len = std::min(r.len, static_cast<size_t>(fs_r.len - offset));
          r.result = Slice(fs_r.scratch + offset, len);
        } else {
          r.result = Slice();
        }
      }
    }
#endif  // ROCKSDB_LITE

    for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
#endif  // ROCKSDB_LITE
      IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return s;
}

}  // namespace ROCKSDB_NAMESPACE