// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/sequence_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/read_write_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {
Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
  Status s;
  if (use_direct_io()) {
#ifndef ROCKSDB_LITE
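    // Direct I/O requires the file offset and read size to be aligned to the
    // file system's required buffer alignment, so read an aligned superset of
    // [offset, offset + n) into an aligned buffer, then copy the requested
    // window out of it. The logical offset is claimed atomically with
    // fetch_add. For example, with alignment = 4096, offset = 5000 and
    // n = 100: aligned_offset = 4096, offset_advance = 904, and
    // size = Roundup(5100, 4096) - 4096 = 4096.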
    size_t offset = offset_.fetch_add(n);
    size_t alignment = file_->GetRequiredBufferAlignment();
    size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
    size_t offset_advance = offset - aligned_offset;
    size_t size = Roundup(offset + n, alignment) - aligned_offset;
    size_t r = 0;
    AlignedBuffer buf;
    buf.Alignment(alignment);
    buf.AllocateNewBuffer(size);
    Slice tmp;
    s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
                              buf.BufferStart(), nullptr);
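    // The positioned read may return fewer than `size` bytes at end of file,
    // so only copy the requested window out if it starts within the bytes
    // actually read.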
    if (s.ok() && offset_advance < tmp.size()) {
      buf.Size(tmp.size());
      r = buf.Read(scratch, offset_advance,
                   std::min(tmp.size() - offset_advance, n));
    }
    *result = Slice(scratch, r);
#endif  // !ROCKSDB_LITE
  } else {
    s = file_->Read(n, IOOptions(), result, scratch, nullptr);
  }
  IOSTATS_ADD(bytes_read, result->size());
  return s;
}

Status SequentialFileReader::Skip(uint64_t n) {
#ifndef ROCKSDB_LITE
  if (use_direct_io()) {
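    // Under direct I/O no data is buffered in this reader, so skipping only
    // advances the logical offset; the next Read() re-aligns it as needed.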
    offset_ += static_cast<size_t>(n);
    return Status::OK();
  }
#endif  // !ROCKSDB_LITE
  return file_->Skip(n);
}

namespace {
// This class wraps a SequentialFile, exposing the same API, with the
// difference that it can prefetch up to readahead_size bytes and then serve
// them from memory, avoiding a full round trip to the file if, for example,
// the data for the file is actually remote.
class ReadaheadSequentialFile : public FSSequentialFile {
 public:
  ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
                          size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0),
        read_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;

  ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;

  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
                IODebugContext* dbg) override {
    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it is completely
    // cached, including the end-of-file case when offset + n is greater than
    // EOF, then return.
    if (TryReadFromCache(n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return IOStatus::OK();
    }
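    // At this point the request was served at most partially from the
    // buffer: cached_len bytes (possibly zero) are already in scratch, and
    // only the remainder still has to come from the file.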
    n -= cached_len;

    IOStatus s;
    // Read-ahead only makes sense if we have some slack left after reading:
    // if the remaining request would consume (almost) the whole readahead
    // buffer by itself, read directly into the caller's scratch and drop the
    // cache.
    if (n + alignment_ >= readahead_size_) {
      s = file_->Read(n, opts, result, scratch + cached_len, dbg);
      if (s.ok()) {
        read_offset_ += result->size();
        *result = Slice(scratch, cached_len + result->size());
      }
      buffer_.Clear();
      return s;
    }
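    // Otherwise refill the buffer with up to readahead_size_ bytes starting
    // at the current read offset and serve the remainder of the request from
    // it.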
    s = ReadIntoBuffer(readahead_size_, opts, dbg);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(n, &remaining_len, scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  IOStatus Skip(uint64_t n) override {
    std::unique_lock<std::mutex> lk(lock_);
    IOStatus s = IOStatus::OK();
    // First check if we need to skip already cached data
    if (buffer_.CurrentSize() > 0) {
      // Do we need to skip beyond the cached data?
      if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
        // Yes. Skip whatever is in memory and adjust the offset accordingly
        n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
        read_offset_ = buffer_offset_ + buffer_.CurrentSize();
      } else {
        // No. The section to be skipped is entirely in cache.
        read_offset_ += n;
        n = 0;
      }
    }
    if (n > 0) {
      // We still need to skip more, so call the file API for skipping
      s = file_->Skip(n);
      if (s.ok()) {
        read_offset_ += n;
      }
      buffer_.Clear();
    }
    return s;
  }
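  // Positioned reads bypass the readahead buffer entirely: they are
  // forwarded to the underlying file and neither consult nor update
  // read_offset_.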
  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
                          Slice* result, char* scratch,
                          IODebugContext* dbg) override {
    return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
  }

  IOStatus InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read n bytes from buffer_. If anything was read from the cache,
  // sets *cached_len to the number of bytes actually read, copies them to
  // scratch, and returns true.
  // If nothing was read, sets *cached_len to 0 and returns false.
  bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
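    // The buffer caches the file range
    // [buffer_offset_, buffer_offset_ + buffer_.CurrentSize()); this read can
    // be served from it only if read_offset_ falls inside that window.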
    if (read_offset_ < buffer_offset_ ||
        read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    read_offset_ += *cached_len;
    return true;
  }

  // Reads into buffer_ the next n bytes from file_.
  // Can actually read less if EOF was reached.
  // Returns the status of the read operation on the file.
  IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
                          IODebugContext* dbg) {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
    if (s.ok()) {
      buffer_offset_ = read_offset_;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  const std::unique_ptr<FSSequentialFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

  // Protects buffer_, buffer_offset_ and read_offset_ against concurrent
  // Read/Skip/InvalidateCache calls.
  std::mutex lock_;
  // The buffer storing the prefetched data
  AlignedBuffer buffer_;
  // The offset in file_ corresponding to the data stored in buffer_
  uint64_t buffer_offset_;
  // The offset up to which data was read from file_. In fact, it can be
  // larger than the actual file size, since the file_->Skip(n) call doesn't
  // return the actual number of bytes that were skipped, which can be less
  // than n. This is not a problem, since read_offset_ is monotonically
  // increasing and its only use is to figure out whether the next piece of
  // data should be read from buffer_ or from file_ directly.
  uint64_t read_offset_;
};
}  // namespace

std::unique_ptr<FSSequentialFile>
SequentialFileReader::NewReadaheadSequentialFile(
    std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
  if (file->GetRequiredBufferAlignment() >= readahead_size) {
    // Short-circuit and return the original file if readahead_size is too
    // small to be useful for prefetching.
    return std::move(file);
  }
  std::unique_ptr<FSSequentialFile> result(
      new ReadaheadSequentialFile(std::move(file), readahead_size));
  return result;
}
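// Illustrative usage sketch, not part of this translation unit. It assumes
// the SequentialFileReader constructor overload declared in
// file/sequence_file_reader.h that takes a readahead size, plus a FileSystem
// instance `fs` and a file name `fname`; a nonzero readahead size routes the
// wrapped file through the factory above:
//
//   std::unique_ptr<FSSequentialFile> file;
//   IOStatus io_s =
//       fs->NewSequentialFile(fname, FileOptions(), &file, nullptr);
//   if (io_s.ok()) {
//     SequentialFileReader reader(std::move(file), fname,
//                                 2 << 20 /* readahead_size: 2MB */);
//     // reader.Read(...) now refills its cache in 2MB chunks.
//   }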
}  // namespace ROCKSDB_NAMESPACE