]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/file/random_access_file_reader.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rocksdb / file / random_access_file_reader.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "file/random_access_file_reader.h"
11
12 #include <algorithm>
13 #include <mutex>
14
15 #include "file/file_util.h"
16 #include "monitoring/histogram.h"
17 #include "monitoring/iostats_context_imp.h"
18 #include "port/port.h"
19 #include "table/format.h"
20 #include "test_util/sync_point.h"
21 #include "util/random.h"
22 #include "util/rate_limiter.h"
23
24 namespace ROCKSDB_NAMESPACE {
25
26 inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
27 bool is_last_level, size_t size) {
28 IOSTATS_ADD(bytes_read, size);
29 // record for last/non-last level
30 if (is_last_level) {
31 RecordTick(stats, LAST_LEVEL_READ_BYTES, size);
32 RecordTick(stats, LAST_LEVEL_READ_COUNT, 1);
33 } else {
34 RecordTick(stats, NON_LAST_LEVEL_READ_BYTES, size);
35 RecordTick(stats, NON_LAST_LEVEL_READ_COUNT, 1);
36 }
37
38 // record for temperature file
39 if (file_temperature != Temperature::kUnknown) {
40 switch (file_temperature) {
41 case Temperature::kHot:
42 IOSTATS_ADD(file_io_stats_by_temperature.hot_file_bytes_read, size);
43 IOSTATS_ADD(file_io_stats_by_temperature.hot_file_read_count, 1);
44 RecordTick(stats, HOT_FILE_READ_BYTES, size);
45 RecordTick(stats, HOT_FILE_READ_COUNT, 1);
46 break;
47 case Temperature::kWarm:
48 IOSTATS_ADD(file_io_stats_by_temperature.warm_file_bytes_read, size);
49 IOSTATS_ADD(file_io_stats_by_temperature.warm_file_read_count, 1);
50 RecordTick(stats, WARM_FILE_READ_BYTES, size);
51 RecordTick(stats, WARM_FILE_READ_COUNT, 1);
52 break;
53 case Temperature::kCold:
54 IOSTATS_ADD(file_io_stats_by_temperature.cold_file_bytes_read, size);
55 IOSTATS_ADD(file_io_stats_by_temperature.cold_file_read_count, 1);
56 RecordTick(stats, COLD_FILE_READ_BYTES, size);
57 RecordTick(stats, COLD_FILE_READ_COUNT, 1);
58 break;
59 default:
60 break;
61 }
62 }
63 }
64
65 IOStatus RandomAccessFileReader::Create(
66 const std::shared_ptr<FileSystem>& fs, const std::string& fname,
67 const FileOptions& file_opts,
68 std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
69 std::unique_ptr<FSRandomAccessFile> file;
70 IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
71 if (io_s.ok()) {
72 reader->reset(new RandomAccessFileReader(std::move(file), fname));
73 }
74 return io_s;
75 }
76
77 IOStatus RandomAccessFileReader::Read(
78 const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
79 char* scratch, AlignedBuf* aligned_buf,
80 Env::IOPriority rate_limiter_priority) const {
81 (void)aligned_buf;
82
83 TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
84
85 // To be paranoid: modify scratch a little bit, so in case underlying
86 // FileSystem doesn't fill the buffer but return success and `scratch` returns
87 // contains a previous block, returned value will not pass checksum.
88 if (n > 0 && scratch != nullptr) {
89 // This byte might not change anything for direct I/O case, but it's OK.
90 scratch[0]++;
91 }
92
93 IOStatus io_s;
94 uint64_t elapsed = 0;
95 {
96 StopWatch sw(clock_, stats_, hist_type_,
97 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
98 true /*delay_enabled*/);
99 auto prev_perf_level = GetPerfLevel();
100 IOSTATS_TIMER_GUARD(read_nanos);
101 if (use_direct_io()) {
102 #ifndef ROCKSDB_LITE
103 size_t alignment = file_->GetRequiredBufferAlignment();
104 size_t aligned_offset =
105 TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
106 size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
107 size_t read_size =
108 Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
109 AlignedBuffer buf;
110 buf.Alignment(alignment);
111 buf.AllocateNewBuffer(read_size);
112 while (buf.CurrentSize() < read_size) {
113 size_t allowed;
114 if (rate_limiter_priority != Env::IO_TOTAL &&
115 rate_limiter_ != nullptr) {
116 allowed = rate_limiter_->RequestToken(
117 buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
118 rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
119 } else {
120 assert(buf.CurrentSize() == 0);
121 allowed = read_size;
122 }
123 Slice tmp;
124
125 FileOperationInfo::StartTimePoint start_ts;
126 uint64_t orig_offset = 0;
127 if (ShouldNotifyListeners()) {
128 start_ts = FileOperationInfo::StartNow();
129 orig_offset = aligned_offset + buf.CurrentSize();
130 }
131
132 {
133 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
134 // Only user reads are expected to specify a timeout. And user reads
135 // are not subjected to rate_limiter and should go through only
136 // one iteration of this loop, so we don't need to check and adjust
137 // the opts.timeout before calling file_->Read
138 assert(!opts.timeout.count() || allowed == read_size);
139 io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
140 &tmp, buf.Destination(), nullptr);
141 }
142 if (ShouldNotifyListeners()) {
143 auto finish_ts = FileOperationInfo::FinishNow();
144 NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
145 io_s);
146 if (!io_s.ok()) {
147 NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
148 tmp.size(), orig_offset);
149 }
150 }
151
152 buf.Size(buf.CurrentSize() + tmp.size());
153 if (!io_s.ok() || tmp.size() < allowed) {
154 break;
155 }
156 }
157 size_t res_len = 0;
158 if (io_s.ok() && offset_advance < buf.CurrentSize()) {
159 res_len = std::min(buf.CurrentSize() - offset_advance, n);
160 if (aligned_buf == nullptr) {
161 buf.Read(scratch, offset_advance, res_len);
162 } else {
163 scratch = buf.BufferStart() + offset_advance;
164 aligned_buf->reset(buf.Release());
165 }
166 }
167 *result = Slice(scratch, res_len);
168 #endif // !ROCKSDB_LITE
169 } else {
170 size_t pos = 0;
171 const char* res_scratch = nullptr;
172 while (pos < n) {
173 size_t allowed;
174 if (rate_limiter_priority != Env::IO_TOTAL &&
175 rate_limiter_ != nullptr) {
176 if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
177 sw.DelayStart();
178 }
179 allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
180 rate_limiter_priority, stats_,
181 RateLimiter::OpType::kRead);
182 if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
183 sw.DelayStop();
184 }
185 } else {
186 allowed = n;
187 }
188 Slice tmp_result;
189
190 #ifndef ROCKSDB_LITE
191 FileOperationInfo::StartTimePoint start_ts;
192 if (ShouldNotifyListeners()) {
193 start_ts = FileOperationInfo::StartNow();
194 }
195 #endif
196
197 {
198 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
199 // Only user reads are expected to specify a timeout. And user reads
200 // are not subjected to rate_limiter and should go through only
201 // one iteration of this loop, so we don't need to check and adjust
202 // the opts.timeout before calling file_->Read
203 assert(!opts.timeout.count() || allowed == n);
204 io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
205 scratch + pos, nullptr);
206 }
207 #ifndef ROCKSDB_LITE
208 if (ShouldNotifyListeners()) {
209 auto finish_ts = FileOperationInfo::FinishNow();
210 NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
211 finish_ts, io_s);
212
213 if (!io_s.ok()) {
214 NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
215 tmp_result.size(), offset + pos);
216 }
217 }
218 #endif
219 if (res_scratch == nullptr) {
220 // we can't simply use `scratch` because reads of mmap'd files return
221 // data in a different buffer.
222 res_scratch = tmp_result.data();
223 } else {
224 // make sure chunks are inserted contiguously into `res_scratch`.
225 assert(tmp_result.data() == res_scratch + pos);
226 }
227 pos += tmp_result.size();
228 if (!io_s.ok() || tmp_result.size() < allowed) {
229 break;
230 }
231 }
232 *result = Slice(res_scratch, io_s.ok() ? pos : 0);
233 }
234 RecordIOStats(stats_, file_temperature_, is_last_level_, result->size());
235 SetPerfLevel(prev_perf_level);
236 }
237 if (stats_ != nullptr && file_read_hist_ != nullptr) {
238 file_read_hist_->Add(elapsed);
239 }
240
241 return io_s;
242 }
243
244 size_t End(const FSReadRequest& r) {
245 return static_cast<size_t>(r.offset) + r.len;
246 }
247
248 FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
249 FSReadRequest req;
250 req.offset = static_cast<uint64_t>(
251 TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
252 req.len = Roundup(End(r), alignment) - req.offset;
253 req.scratch = nullptr;
254 return req;
255 }
256
257 bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
258 size_t dest_offset = static_cast<size_t>(dest->offset);
259 size_t src_offset = static_cast<size_t>(src.offset);
260 size_t dest_end = End(*dest);
261 size_t src_end = End(src);
262 if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
263 return false;
264 }
265 dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
266 dest->len = std::max(dest_end, src_end) - dest->offset;
267 return true;
268 }
269
270 IOStatus RandomAccessFileReader::MultiRead(
271 const IOOptions& opts, FSReadRequest* read_reqs, size_t num_reqs,
272 AlignedBuf* aligned_buf, Env::IOPriority rate_limiter_priority) const {
273 (void)aligned_buf; // suppress warning of unused variable in LITE mode
274 assert(num_reqs > 0);
275
276 #ifndef NDEBUG
277 for (size_t i = 0; i < num_reqs - 1; ++i) {
278 assert(read_reqs[i].offset <= read_reqs[i + 1].offset);
279 }
280 #endif // !NDEBUG
281
282 // To be paranoid modify scratch a little bit, so in case underlying
283 // FileSystem doesn't fill the buffer but return success and `scratch` returns
284 // contains a previous block, returned value will not pass checksum.
285 // This byte might not change anything for direct I/O case, but it's OK.
286 for (size_t i = 0; i < num_reqs; i++) {
287 FSReadRequest& r = read_reqs[i];
288 if (r.len > 0 && r.scratch != nullptr) {
289 r.scratch[0]++;
290 }
291 }
292
293 IOStatus io_s;
294 uint64_t elapsed = 0;
295 {
296 StopWatch sw(clock_, stats_, hist_type_,
297 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
298 true /*delay_enabled*/);
299 auto prev_perf_level = GetPerfLevel();
300 IOSTATS_TIMER_GUARD(read_nanos);
301
302 FSReadRequest* fs_reqs = read_reqs;
303 size_t num_fs_reqs = num_reqs;
304 #ifndef ROCKSDB_LITE
305 std::vector<FSReadRequest> aligned_reqs;
306 if (use_direct_io()) {
307 // num_reqs is the max possible size,
308 // this can reduce std::vecector's internal resize operations.
309 aligned_reqs.reserve(num_reqs);
310 // Align and merge the read requests.
311 size_t alignment = file_->GetRequiredBufferAlignment();
312 for (size_t i = 0; i < num_reqs; i++) {
313 const auto& r = Align(read_reqs[i], alignment);
314 if (i == 0) {
315 // head
316 aligned_reqs.push_back(r);
317
318 } else if (!TryMerge(&aligned_reqs.back(), r)) {
319 // head + n
320 aligned_reqs.push_back(r);
321
322 } else {
323 // unused
324 r.status.PermitUncheckedError();
325 }
326 }
327 TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
328 &aligned_reqs);
329
330 // Allocate aligned buffer and let scratch buffers point to it.
331 size_t total_len = 0;
332 for (const auto& r : aligned_reqs) {
333 total_len += r.len;
334 }
335 AlignedBuffer buf;
336 buf.Alignment(alignment);
337 buf.AllocateNewBuffer(total_len);
338 char* scratch = buf.BufferStart();
339 for (auto& r : aligned_reqs) {
340 r.scratch = scratch;
341 scratch += r.len;
342 }
343
344 aligned_buf->reset(buf.Release());
345 fs_reqs = aligned_reqs.data();
346 num_fs_reqs = aligned_reqs.size();
347 }
348 #endif // ROCKSDB_LITE
349
350 #ifndef ROCKSDB_LITE
351 FileOperationInfo::StartTimePoint start_ts;
352 if (ShouldNotifyListeners()) {
353 start_ts = FileOperationInfo::StartNow();
354 }
355 #endif // ROCKSDB_LITE
356
357 {
358 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
359 if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
360 // TODO: ideally we should call `RateLimiter::RequestToken()` for
361 // allowed bytes to multi-read and then consume those bytes by
362 // satisfying as many requests in `MultiRead()` as possible, instead of
363 // what we do here, which can cause burst when the
364 // `total_multi_read_size` is big.
365 size_t total_multi_read_size = 0;
366 assert(fs_reqs != nullptr);
367 for (size_t i = 0; i < num_fs_reqs; ++i) {
368 FSReadRequest& req = fs_reqs[i];
369 total_multi_read_size += req.len;
370 }
371 size_t remaining_bytes = total_multi_read_size;
372 size_t request_bytes = 0;
373 while (remaining_bytes > 0) {
374 request_bytes = std::min(
375 static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
376 remaining_bytes);
377 rate_limiter_->Request(request_bytes, rate_limiter_priority,
378 nullptr /* stats */,
379 RateLimiter::OpType::kRead);
380 remaining_bytes -= request_bytes;
381 }
382 }
383 io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, nullptr);
384 RecordInHistogram(stats_, MULTIGET_IO_BATCH_SIZE, num_fs_reqs);
385 }
386
387 #ifndef ROCKSDB_LITE
388 if (use_direct_io()) {
389 // Populate results in the unaligned read requests.
390 size_t aligned_i = 0;
391 for (size_t i = 0; i < num_reqs; i++) {
392 auto& r = read_reqs[i];
393 if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
394 aligned_i++;
395 }
396 const auto& fs_r = fs_reqs[aligned_i];
397 r.status = fs_r.status;
398 if (r.status.ok()) {
399 uint64_t offset = r.offset - fs_r.offset;
400 if (fs_r.result.size() <= offset) {
401 // No byte in the read range is returned.
402 r.result = Slice();
403 } else {
404 size_t len = std::min(
405 r.len, static_cast<size_t>(fs_r.result.size() - offset));
406 r.result = Slice(fs_r.scratch + offset, len);
407 }
408 } else {
409 r.result = Slice();
410 }
411 }
412 }
413 #endif // ROCKSDB_LITE
414
415 for (size_t i = 0; i < num_reqs; ++i) {
416 #ifndef ROCKSDB_LITE
417 if (ShouldNotifyListeners()) {
418 auto finish_ts = FileOperationInfo::FinishNow();
419 NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
420 start_ts, finish_ts, read_reqs[i].status);
421 }
422 if (!read_reqs[i].status.ok()) {
423 NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead,
424 file_name(), read_reqs[i].result.size(),
425 read_reqs[i].offset);
426 }
427
428 #endif // ROCKSDB_LITE
429 RecordIOStats(stats_, file_temperature_, is_last_level_,
430 read_reqs[i].result.size());
431 }
432 SetPerfLevel(prev_perf_level);
433 }
434 if (stats_ != nullptr && file_read_hist_ != nullptr) {
435 file_read_hist_->Add(elapsed);
436 }
437
438 return io_s;
439 }
440
441 IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
442 IOOptions& opts) {
443 if (clock_ != nullptr) {
444 return PrepareIOFromReadOptions(ro, clock_, opts);
445 } else {
446 return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts);
447 }
448 }
449
450 IOStatus RandomAccessFileReader::ReadAsync(
451 FSReadRequest& req, const IOOptions& opts,
452 std::function<void(const FSReadRequest&, void*)> cb, void* cb_arg,
453 void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf) {
454 IOStatus s;
455 // Create a callback and populate info.
456 auto read_async_callback =
457 std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
458 std::placeholders::_1, std::placeholders::_2);
459 ReadAsyncInfo* read_async_info =
460 new ReadAsyncInfo(cb, cb_arg, clock_->NowMicros());
461
462 #ifndef ROCKSDB_LITE
463 if (ShouldNotifyListeners()) {
464 read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
465 }
466 #endif
467
468 size_t alignment = file_->GetRequiredBufferAlignment();
469 bool is_aligned = (req.offset & (alignment - 1)) == 0 &&
470 (req.len & (alignment - 1)) == 0 &&
471 (uintptr_t(req.scratch) & (alignment - 1)) == 0;
472 read_async_info->is_aligned_ = is_aligned;
473
474 uint64_t elapsed = 0;
475 if (use_direct_io() && is_aligned == false) {
476 FSReadRequest aligned_req = Align(req, alignment);
477 aligned_req.status.PermitUncheckedError();
478
479 // Allocate aligned buffer.
480 read_async_info->buf_.Alignment(alignment);
481 read_async_info->buf_.AllocateNewBuffer(aligned_req.len);
482
483 // Set rem fields in aligned FSReadRequest.
484 aligned_req.scratch = read_async_info->buf_.BufferStart();
485
486 // Set user provided fields to populate back in callback.
487 read_async_info->user_scratch_ = req.scratch;
488 read_async_info->user_aligned_buf_ = aligned_buf;
489 read_async_info->user_len_ = req.len;
490 read_async_info->user_offset_ = req.offset;
491 read_async_info->user_result_ = req.result;
492
493 assert(read_async_info->buf_.CurrentSize() == 0);
494
495 StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
496 true /*overwrite*/, true /*delay_enabled*/);
497 s = file_->ReadAsync(aligned_req, opts, read_async_callback,
498 read_async_info, io_handle, del_fn, nullptr /*dbg*/);
499 } else {
500 StopWatch sw(clock_, nullptr /*stats*/, 0 /*hist_type*/, &elapsed,
501 true /*overwrite*/, true /*delay_enabled*/);
502 s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
503 io_handle, del_fn, nullptr /*dbg*/);
504 }
505 RecordTick(stats_, READ_ASYNC_MICROS, elapsed);
506
507 // Suppress false positive clang analyzer warnings.
508 // Memory is not released if file_->ReadAsync returns !s.ok(), because
509 // ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
510 // called then ReadAsync should always return IOStatus::OK().
511 #ifndef __clang_analyzer__
512 if (!s.ok()) {
513 delete read_async_info;
514 }
515 #endif // __clang_analyzer__
516
517 return s;
518 }
519
520 void RandomAccessFileReader::ReadAsyncCallback(const FSReadRequest& req,
521 void* cb_arg) {
522 ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
523 assert(read_async_info);
524 assert(read_async_info->cb_);
525
526 if (use_direct_io() && read_async_info->is_aligned_ == false) {
527 // Create FSReadRequest with user provided fields.
528 FSReadRequest user_req;
529 user_req.scratch = read_async_info->user_scratch_;
530 user_req.offset = read_async_info->user_offset_;
531 user_req.len = read_async_info->user_len_;
532
533 // Update results in user_req.
534 user_req.result = req.result;
535 user_req.status = req.status;
536
537 read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() +
538 req.result.size());
539
540 size_t offset_advance_len = static_cast<size_t>(
541 /*offset_passed_by_user=*/read_async_info->user_offset_ -
542 /*aligned_offset=*/req.offset);
543
544 size_t res_len = 0;
545 if (req.status.ok() &&
546 offset_advance_len < read_async_info->buf_.CurrentSize()) {
547 res_len =
548 std::min(read_async_info->buf_.CurrentSize() - offset_advance_len,
549 read_async_info->user_len_);
550 if (read_async_info->user_aligned_buf_ == nullptr) {
551 // Copy the data into user's scratch.
552 // Clang analyzer assumes that it will take use_direct_io() == false in
553 // ReadAsync and use_direct_io() == true in Callback which cannot be true.
554 #ifndef __clang_analyzer__
555 read_async_info->buf_.Read(user_req.scratch, offset_advance_len,
556 res_len);
557 #endif // __clang_analyzer__
558 } else {
559 // Set aligned_buf provided by user without additional copy.
560 user_req.scratch =
561 read_async_info->buf_.BufferStart() + offset_advance_len;
562 read_async_info->user_aligned_buf_->reset(
563 read_async_info->buf_.Release());
564 }
565 user_req.result = Slice(user_req.scratch, res_len);
566 } else {
567 // Either req.status is not ok or data was not read.
568 user_req.result = Slice();
569 }
570 read_async_info->cb_(user_req, read_async_info->cb_arg_);
571 } else {
572 read_async_info->cb_(req, read_async_info->cb_arg_);
573 }
574
575 // Update stats and notify listeners.
576 if (stats_ != nullptr && file_read_hist_ != nullptr) {
577 // elapsed doesn't take into account delay and overwrite as StopWatch does
578 // in Read.
579 uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
580 file_read_hist_->Add(elapsed);
581 }
582 if (req.status.ok()) {
583 RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
584 } else if (!req.status.IsAborted()) {
585 RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
586 }
587 #ifndef ROCKSDB_LITE
588 if (ShouldNotifyListeners()) {
589 auto finish_ts = FileOperationInfo::FinishNow();
590 NotifyOnFileReadFinish(req.offset, req.result.size(),
591 read_async_info->fs_start_ts_, finish_ts,
592 req.status);
593 }
594 if (!req.status.ok()) {
595 NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
596 req.result.size(), req.offset);
597 }
598 #endif
599 RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
600 delete read_async_info;
601 }
602 } // namespace ROCKSDB_NAMESPACE