]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/file/random_access_file_reader.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rocksdb / file / random_access_file_reader.cc
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10 #include "file/random_access_file_reader.h"
11
12 #include <algorithm>
13 #include <mutex>
14
15 #include "monitoring/histogram.h"
16 #include "monitoring/iostats_context_imp.h"
17 #include "port/port.h"
18 #include "test_util/sync_point.h"
19 #include "util/random.h"
20 #include "util/rate_limiter.h"
21
22 namespace ROCKSDB_NAMESPACE {
23 Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result,
24 char* scratch, bool for_compaction) const {
25 Status s;
26 uint64_t elapsed = 0;
27 {
28 StopWatch sw(env_, stats_, hist_type_,
29 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
30 true /*delay_enabled*/);
31 auto prev_perf_level = GetPerfLevel();
32 IOSTATS_TIMER_GUARD(read_nanos);
33 if (use_direct_io()) {
34 #ifndef ROCKSDB_LITE
35 size_t alignment = file_->GetRequiredBufferAlignment();
36 size_t aligned_offset =
37 TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
38 size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
39 size_t read_size =
40 Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
41 AlignedBuffer buf;
42 buf.Alignment(alignment);
43 buf.AllocateNewBuffer(read_size);
44 while (buf.CurrentSize() < read_size) {
45 size_t allowed;
46 if (for_compaction && rate_limiter_ != nullptr) {
47 allowed = rate_limiter_->RequestToken(
48 buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
49 Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);
50 } else {
51 assert(buf.CurrentSize() == 0);
52 allowed = read_size;
53 }
54 Slice tmp;
55
56 FileOperationInfo::TimePoint start_ts;
57 uint64_t orig_offset = 0;
58 if (ShouldNotifyListeners()) {
59 start_ts = std::chrono::system_clock::now();
60 orig_offset = aligned_offset + buf.CurrentSize();
61 }
62 {
63 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
64 s = file_->Read(aligned_offset + buf.CurrentSize(), allowed,
65 IOOptions(), &tmp, buf.Destination(), nullptr);
66 }
67 if (ShouldNotifyListeners()) {
68 auto finish_ts = std::chrono::system_clock::now();
69 NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
70 s);
71 }
72
73 buf.Size(buf.CurrentSize() + tmp.size());
74 if (!s.ok() || tmp.size() < allowed) {
75 break;
76 }
77 }
78 size_t res_len = 0;
79 if (s.ok() && offset_advance < buf.CurrentSize()) {
80 res_len = buf.Read(scratch, offset_advance,
81 std::min(buf.CurrentSize() - offset_advance, n));
82 }
83 *result = Slice(scratch, res_len);
84 #endif // !ROCKSDB_LITE
85 } else {
86 size_t pos = 0;
87 const char* res_scratch = nullptr;
88 while (pos < n) {
89 size_t allowed;
90 if (for_compaction && rate_limiter_ != nullptr) {
91 if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
92 sw.DelayStart();
93 }
94 allowed = rate_limiter_->RequestToken(n - pos, 0 /* alignment */,
95 Env::IOPriority::IO_LOW, stats_,
96 RateLimiter::OpType::kRead);
97 if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
98 sw.DelayStop();
99 }
100 } else {
101 allowed = n;
102 }
103 Slice tmp_result;
104
105 #ifndef ROCKSDB_LITE
106 FileOperationInfo::TimePoint start_ts;
107 if (ShouldNotifyListeners()) {
108 start_ts = std::chrono::system_clock::now();
109 }
110 #endif
111 {
112 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
113 s = file_->Read(offset + pos, allowed, IOOptions(), &tmp_result,
114 scratch + pos, nullptr);
115 }
116 #ifndef ROCKSDB_LITE
117 if (ShouldNotifyListeners()) {
118 auto finish_ts = std::chrono::system_clock::now();
119 NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
120 finish_ts, s);
121 }
122 #endif
123
124 if (res_scratch == nullptr) {
125 // we can't simply use `scratch` because reads of mmap'd files return
126 // data in a different buffer.
127 res_scratch = tmp_result.data();
128 } else {
129 // make sure chunks are inserted contiguously into `res_scratch`.
130 assert(tmp_result.data() == res_scratch + pos);
131 }
132 pos += tmp_result.size();
133 if (!s.ok() || tmp_result.size() < allowed) {
134 break;
135 }
136 }
137 *result = Slice(res_scratch, s.ok() ? pos : 0);
138 }
139 IOSTATS_ADD_IF_POSITIVE(bytes_read, result->size());
140 SetPerfLevel(prev_perf_level);
141 }
142 if (stats_ != nullptr && file_read_hist_ != nullptr) {
143 file_read_hist_->Add(elapsed);
144 }
145
146 return s;
147 }
148
149 Status RandomAccessFileReader::MultiRead(FSReadRequest* read_reqs,
150 size_t num_reqs) const {
151 Status s;
152 uint64_t elapsed = 0;
153 assert(!use_direct_io());
154 {
155 StopWatch sw(env_, stats_, hist_type_,
156 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
157 true /*delay_enabled*/);
158 auto prev_perf_level = GetPerfLevel();
159 IOSTATS_TIMER_GUARD(read_nanos);
160
161 #ifndef ROCKSDB_LITE
162 FileOperationInfo::TimePoint start_ts;
163 if (ShouldNotifyListeners()) {
164 start_ts = std::chrono::system_clock::now();
165 }
166 #endif // ROCKSDB_LITE
167 {
168 IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, env_);
169 s = file_->MultiRead(read_reqs, num_reqs, IOOptions(), nullptr);
170 }
171 for (size_t i = 0; i < num_reqs; ++i) {
172 #ifndef ROCKSDB_LITE
173 if (ShouldNotifyListeners()) {
174 auto finish_ts = std::chrono::system_clock::now();
175 NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
176 start_ts, finish_ts, read_reqs[i].status);
177 }
178 #endif // ROCKSDB_LITE
179 IOSTATS_ADD_IF_POSITIVE(bytes_read, read_reqs[i].result.size());
180 }
181 SetPerfLevel(prev_perf_level);
182 }
183 if (stats_ != nullptr && file_read_hist_ != nullptr) {
184 file_read_hist_->Add(elapsed);
185 }
186
187 return s;
188 }
189 } // namespace ROCKSDB_NAMESPACE