// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/random_access_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/file_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "table/format.h"
#include "test_util/sync_point.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {

inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
                          bool is_last_level, size_t size) {
  IOSTATS_ADD(bytes_read, size);
  // record for last/non-last level
  if (is_last_level) {
    RecordTick(stats, LAST_LEVEL_READ_BYTES, size);
    RecordTick(stats, LAST_LEVEL_READ_COUNT, 1);
  } else {
    RecordTick(stats, NON_LAST_LEVEL_READ_BYTES, size);
    RecordTick(stats, NON_LAST_LEVEL_READ_COUNT, 1);
  }

  // record stats by file temperature
  if (file_temperature != Temperature::kUnknown) {
    switch (file_temperature) {
      case Temperature::kHot:
        IOSTATS_ADD(file_io_stats_by_temperature.hot_file_bytes_read, size);
        IOSTATS_ADD(file_io_stats_by_temperature.hot_file_read_count, 1);
        RecordTick(stats, HOT_FILE_READ_BYTES, size);
        RecordTick(stats, HOT_FILE_READ_COUNT, 1);
        break;
      case Temperature::kWarm:
        IOSTATS_ADD(file_io_stats_by_temperature.warm_file_bytes_read, size);
        IOSTATS_ADD(file_io_stats_by_temperature.warm_file_read_count, 1);
        RecordTick(stats, WARM_FILE_READ_BYTES, size);
        RecordTick(stats, WARM_FILE_READ_COUNT, 1);
        break;
      case Temperature::kCold:
        IOSTATS_ADD(file_io_stats_by_temperature.cold_file_bytes_read, size);
        IOSTATS_ADD(file_io_stats_by_temperature.cold_file_read_count, 1);
        RecordTick(stats, COLD_FILE_READ_BYTES, size);
        RecordTick(stats, COLD_FILE_READ_COUNT, 1);
        break;
      default:
        break;
    }
  }
}

IOStatus RandomAccessFileReader::Create(
    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
    const FileOptions& file_opts,
    std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
  std::unique_ptr<FSRandomAccessFile> file;
  IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
  if (io_s.ok()) {
    reader->reset(new RandomAccessFileReader(std::move(file), fname));
  }
  return io_s;
}

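// Illustrative usage sketch (not part of this file; FileSystem::Default(),
// the literal path, and the buffer size are assumptions for the example):
// a reader is typically obtained through Create() and then used for
// positional reads.
//
//   std::unique_ptr<RandomAccessFileReader> reader;
//   IOStatus io_s = RandomAccessFileReader::Create(
//       FileSystem::Default(), "/tmp/example.sst", FileOptions(), &reader,
//       /*dbg=*/nullptr);
//   if (io_s.ok()) {
//     char scratch[4096];
//     Slice result;
//     io_s = reader->Read(IOOptions(), /*offset=*/0, /*n=*/sizeof(scratch),
//                         &result, scratch, /*aligned_buf=*/nullptr,
//                         Env::IO_TOTAL);
//   }
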
IOStatus RandomAccessFileReader::Read(
    const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
    char* scratch, AlignedBuf* aligned_buf,
    Env::IOPriority rate_limiter_priority) const {
  (void)aligned_buf;

  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);

  // To be paranoid: modify scratch a little bit, so that if the underlying
  // FileSystem doesn't fill the buffer but returns success and `scratch`
  // still contains a previous block, the returned value will not pass the
  // checksum.
  if (n > 0 && scratch != nullptr) {
    // This byte might not change anything for the direct I/O case, but it's
    // OK.
    scratch[0]++;
  }

  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(clock_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io()) {
#ifndef ROCKSDB_LITE
      size_t alignment = file_->GetRequiredBufferAlignment();
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
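      // Worked example (illustrative values): with alignment = 4096,
      // offset = 5000 and n = 100, aligned_offset = 4096, offset_advance =
      // 904 and read_size = Roundup(5100, 4096) - 4096 = 4096, i.e. one
      // aligned block is read and the caller's bytes start 904 bytes into it.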
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (rate_limiter_priority != Env::IO_TOTAL &&
            rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
        } else {
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::StartTimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
          orig_offset = aligned_offset + buf.CurrentSize();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subject to the rate limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // opts.timeout before calling file_->Read().
          assert(!opts.timeout.count() || allowed == read_size);
          io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
                             &tmp, buf.Destination(), nullptr);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 io_s);
          if (!io_s.ok()) {
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
                            tmp.size(), orig_offset);
          }
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        if (!io_s.ok() || tmp.size() < allowed) {
          break;
        }
      }
      size_t res_len = 0;
      if (io_s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
        if (aligned_buf == nullptr) {
          buf.Read(scratch, offset_advance, res_len);
        } else {
          scratch = buf.BufferStart() + offset_advance;
          aligned_buf->reset(buf.Release());
        }
      }
      *result = Slice(scratch, res_len);
#endif  // !ROCKSDB_LITE
    } else {
      size_t pos = 0;
      const char* res_scratch = nullptr;
      while (pos < n) {
        size_t allowed;
        if (rate_limiter_priority != Env::IO_TOTAL &&
            rate_limiter_ != nullptr) {
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
                                                rate_limiter_priority, stats_,
                                                RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

#ifndef ROCKSDB_LITE
        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }
#endif

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subject to the rate limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // opts.timeout before calling file_->Read().
          assert(!opts.timeout.count() || allowed == n);
          io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
                             scratch + pos, nullptr);
        }
#ifndef ROCKSDB_LITE
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, io_s);

          if (!io_s.ok()) {
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
                            tmp_result.size(), offset + pos);
          }
        }
#endif
        if (res_scratch == nullptr) {
          // We can't simply use `scratch` because reads of mmap'd files
          // return data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // Make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        if (!io_s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      *result = Slice(res_scratch, io_s.ok() ? pos : 0);
    }
    RecordIOStats(stats_, file_temperature_, is_last_level_, result->size());
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}

size_t End(const FSReadRequest& r) {
  return static_cast<size_t>(r.offset) + r.len;
}

FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
  FSReadRequest req;
  req.offset = static_cast<uint64_t>(
      TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
  req.len = Roundup(End(r), alignment) - req.offset;
  req.scratch = nullptr;
  return req;
}

bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
  size_t dest_offset = static_cast<size_t>(dest->offset);
  size_t src_offset = static_cast<size_t>(src.offset);
  size_t dest_end = End(*dest);
  size_t src_end = End(src);
  if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
    return false;
  }
  dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
  dest->len = std::max(dest_end, src_end) - dest->offset;
  return true;
}

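// Worked example for Align() and TryMerge() (illustrative values): with
// alignment = 4096, a request {offset = 100, len = 200} aligns to
// {offset = 0, len = 4096} and a request {offset = 4000, len = 500} aligns to
// {offset = 0, len = 8192}; TryMerge() then folds the second into the first,
// leaving a single aligned request covering [0, 8192).
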
IOStatus RandomAccessFileReader::MultiRead(
    const IOOptions& opts, FSReadRequest* read_reqs, size_t num_reqs,
    AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);

#ifndef NDEBUG
  for (size_t i = 0; i < num_reqs - 1; ++i) {
    assert(read_reqs[i].offset <= read_reqs[i + 1].offset);
  }
#endif  // !NDEBUG

  // To be paranoid: modify scratch a little bit, so that if the underlying
  // FileSystem doesn't fill the buffer but returns success and `scratch`
  // still contains a previous block, the returned value will not pass the
  // checksum.
  // This byte might not change anything for the direct I/O case, but it's OK.
  for (size_t i = 0; i < num_reqs; i++) {
    FSReadRequest& r = read_reqs[i];
    if (r.len > 0 && r.scratch != nullptr) {
      r.scratch[0]++;
    }
  }

  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    StopWatch sw(clock_, stats_, hist_type_,
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
#ifndef ROCKSDB_LITE
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
      // num_reqs is the max possible size;
      // reserving it avoids std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
      for (size_t i = 0; i < num_reqs; i++) {
        const auto& r = Align(read_reqs[i], alignment);
        if (i == 0) {
          // head
          aligned_reqs.push_back(r);

        } else if (!TryMerge(&aligned_reqs.back(), r)) {
          // head + n
          aligned_reqs.push_back(r);

        } else {
          // unused
          r.status.PermitUncheckedError();
        }
      }
      TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
                               &aligned_reqs);

      // Allocate an aligned buffer and let the scratch buffers point into it.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }

      aligned_buf->reset(buf.Release());
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }
#endif  // ROCKSDB_LITE

#ifndef ROCKSDB_LITE
    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }
#endif  // ROCKSDB_LITE

    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
        // TODO: ideally we should call `RateLimiter::RequestToken()` for
        // allowed bytes to multi-read and then consume those bytes by
        // satisfying as many requests in `MultiRead()` as possible, instead of
        // what we do here, which can cause burst when the
        // `total_multi_read_size` is big.
        size_t total_multi_read_size = 0;
        assert(fs_reqs != nullptr);
        for (size_t i = 0; i < num_fs_reqs; ++i) {
          FSReadRequest& req = fs_reqs[i];
          total_multi_read_size += req.len;
        }
        size_t remaining_bytes = total_multi_read_size;
        size_t request_bytes = 0;
        while (remaining_bytes > 0) {
          request_bytes = std::min(
              static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
              remaining_bytes);
          rate_limiter_->Request(request_bytes, rate_limiter_priority,
                                 nullptr /* stats */,
                                 RateLimiter::OpType::kRead);
          remaining_bytes -= request_bytes;
        }
      }
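      // Illustrative numbers: with GetSingleBurstBytes() == 1 MiB and
      // total_multi_read_size == 2.5 MiB, the loop above issues three
      // Request() calls of 1 MiB, 1 MiB and 0.5 MiB before the file system
      // MultiRead below.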
      io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
      RecordInHistogram(stats_, MULTIGET_IO_BATCH_SIZE, num_fs_reqs);
    }

#ifndef ROCKSDB_LITE
    if (use_direct_io()) {
      // Populate results in the unaligned read requests.
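      // Illustrative example: if an original request {offset = 5000, len =
      // 100} was folded into an aligned request {offset = 4096, len = 8192},
      // then `offset` below is 904 and the result points 904 bytes into
      // fs_r.scratch.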
      size_t aligned_i = 0;
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          uint64_t offset = r.offset - fs_r.offset;
          if (fs_r.result.size() <= offset) {
            // No byte in the read range is returned.
            r.result = Slice();
          } else {
            size_t len = std::min(
                r.len, static_cast<size_t>(fs_r.result.size() - offset));
            r.result = Slice(fs_r.scratch + offset, len);
          }
        } else {
          r.result = Slice();
        }
      }
    }
#endif  // ROCKSDB_LITE

    for (size_t i = 0; i < num_reqs; ++i) {
#ifndef ROCKSDB_LITE
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
      if (!read_reqs[i].status.ok()) {
        NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead,
                        file_name(), read_reqs[i].result.size(),
                        read_reqs[i].offset);
      }

#endif  // ROCKSDB_LITE
      RecordIOStats(stats_, file_temperature_, is_last_level_,
                    read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}

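// Illustrative MultiRead() usage sketch (assumptions for the example:
// requests are pre-sorted by offset and `reader` was obtained via Create()):
//
//   FSReadRequest reqs[2];
//   char scratch0[512], scratch1[512];
//   reqs[0].offset = 0;     reqs[0].len = 512;  reqs[0].scratch = scratch0;
//   reqs[1].offset = 8192;  reqs[1].len = 512;  reqs[1].scratch = scratch1;
//   AlignedBuf aligned_buf;  // owns the merged buffer when direct I/O is on
//   IOStatus io_s = reader->MultiRead(IOOptions(), reqs, /*num_reqs=*/2,
//                                     &aligned_buf, Env::IO_TOTAL);
//   // On return, reqs[i].status and reqs[i].result hold per-request results.
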
IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
                                                  IOOptions& opts) {
  if (clock_ != nullptr) {
    return PrepareIOFromReadOptions(ro, clock_, opts);
  } else {
    return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts);
  }
}

IOStatus RandomAccessFileReader::ReadAsync(
    FSReadRequest& req, const IOOptions& opts,
    std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
    void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf) {
  IOStatus s;
  // Create a callback and populate info.
  auto read_async_callback =
      std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
                std::placeholders::_1, std::placeholders::_2);
  ReadAsyncInfo* read_async_info =
      new ReadAsyncInfo(cb, cb_arg, clock_->NowMicros());

#ifndef ROCKSDB_LITE
  if (ShouldNotifyListeners()) {
    read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
  }
#endif

  size_t alignment = file_->GetRequiredBufferAlignment();
  bool is_aligned = (req.offset & (alignment - 1)) == 0 &&
                    (req.len & (alignment - 1)) == 0 &&
                    (uintptr_t(req.scratch) & (alignment - 1)) == 0;
  read_async_info->is_aligned_ = is_aligned;

  uint64_t elapsed = 0;
  if (use_direct_io() && is_aligned == false) {
    FSReadRequest aligned_req = Align(req, alignment);
    aligned_req.status.PermitUncheckedError();

    // Allocate aligned buffer.
    read_async_info->buf_.Alignment(alignment);
    read_async_info->buf_.AllocateNewBuffer(aligned_req.len);

    // Set the remaining fields in the aligned FSReadRequest.
    aligned_req.scratch = read_async_info->buf_.BufferStart();

    // Set user provided fields to populate back in callback.
    read_async_info->user_scratch_ = req.scratch;
    read_async_info->user_aligned_buf_ = aligned_buf;
    read_async_info->user_len_ = req.len;
    read_async_info->user_offset_ = req.offset;
    read_async_info->user_result_ = req.result;

    assert(read_async_info->buf_.CurrentSize() == 0);

    StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
                 true /*overwrite*/, true /*delay_enabled*/);
    s = file_->ReadAsync(aligned_req, opts, read_async_callback,
                         read_async_info, io_handle, del_fn, nullptr /*dbg*/);
  } else {
    StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
                 true /*overwrite*/, true /*delay_enabled*/);
    s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
                         io_handle, del_fn, nullptr /*dbg*/);
  }
  RecordTick(stats_, READ_ASYNC_MICROS, elapsed);

// Suppress false positive clang analyzer warnings.
// Memory is not released if file_->ReadAsync returns !s.ok(), because
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
// called then ReadAsync should always return IOStatus::OK().
#ifndef __clang_analyzer__
  if (!s.ok()) {
    delete read_async_info;
  }
#endif  // __clang_analyzer__

  return s;
}

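// Illustrative ReadAsync() usage sketch (assumptions for the example: the
// scratch buffer and sizes are arbitrary, and completion is driven by the
// underlying FileSystem, e.g. via FileSystem::Poll() where supported):
//
//   char scratch[4096];
//   FSReadRequest req;
//   req.offset = 0;
//   req.len = sizeof(scratch);
//   req.scratch = scratch;
//   void* io_handle = nullptr;
//   IOHandleDeleter del_fn;
//   AlignedBuf aligned_buf;
//   IOStatus s = reader->ReadAsync(
//       req, IOOptions(),
//       [](const FSReadRequest& r, void* /*cb_arg*/) { /* use r.result */ },
//       /*cb_arg=*/nullptr, &io_handle, &del_fn, &aligned_buf);
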
void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
                                               void* cb_arg) {
  ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
  assert(read_async_info);
  assert(read_async_info->cb_);

  if (use_direct_io() && read_async_info->is_aligned_ == false) {
    // Create FSReadRequest with user provided fields.
    FSReadRequest user_req;
    user_req.scratch = read_async_info->user_scratch_;
    user_req.offset = read_async_info->user_offset_;
    user_req.len = read_async_info->user_len_;

    // Update results in user_req.
    user_req.result = req.result;
    user_req.status = req.status;

    read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() +
                               req.result.size());

    size_t offset_advance_len = static_cast<size_t>(
        /*offset_passed_by_user=*/read_async_info->user_offset_ -
        /*aligned_offset=*/req.offset);

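    // Illustrative: if the user asked for offset 5000 but the aligned request
    // started at offset 4096, offset_advance_len is 904 and the user's bytes
    // begin 904 bytes into buf_.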
    size_t res_len = 0;
    if (req.status.ok() &&
        offset_advance_len < read_async_info->buf_.CurrentSize()) {
      res_len =
          std::min(read_async_info->buf_.CurrentSize() - offset_advance_len,
                   read_async_info->user_len_);
      if (read_async_info->user_aligned_buf_ == nullptr) {
        // Copy the data into the user's scratch.
// The clang analyzer assumes use_direct_io() == false in ReadAsync and
// use_direct_io() == true in this callback, which cannot both be true.
#ifndef __clang_analyzer__
        read_async_info->buf_.Read(user_req.scratch, offset_advance_len,
                                   res_len);
#endif  // __clang_analyzer__
      } else {
        // Set aligned_buf provided by user without additional copy.
        user_req.scratch =
            read_async_info->buf_.BufferStart() + offset_advance_len;
        read_async_info->user_aligned_buf_->reset(
            read_async_info->buf_.Release());
      }
      user_req.result = Slice(user_req.scratch, res_len);
    } else {
      // Either req.status is not ok or no data was read.
      user_req.result = Slice();
    }
    read_async_info->cb_(user_req, read_async_info->cb_arg_);
  } else {
    read_async_info->cb_(req, read_async_info->cb_arg_);
  }

  // Update stats and notify listeners.
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    // Unlike the StopWatch in Read(), elapsed here doesn't take delay and
    // overwrite into account.
    uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
    file_read_hist_->Add(elapsed);
  }
  if (req.status.ok()) {
    RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
  } else if (!req.status.IsAborted()) {
    RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
  }
#ifndef ROCKSDB_LITE
  if (ShouldNotifyListeners()) {
    auto finish_ts = FileOperationInfo::FinishNow();
    NotifyOnFileReadFinish(req.offset, req.result.size(),
                           read_async_info->fs_start_ts_, finish_ts,
                           req.status);
  }
  if (!req.status.ok()) {
    NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
                    req.result.size(), req.offset);
  }
#endif
  RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
  delete read_async_info;
}
}  // namespace ROCKSDB_NAMESPACE