//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include "file/sequence_file_reader.h"

#include <algorithm>
#include <mutex>

#include "file/read_write_util.h"
#include "monitoring/histogram.h"
#include "monitoring/iostats_context_imp.h"
#include "port/port.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
#include "util/random.h"
#include "util/rate_limiter.h"

namespace ROCKSDB_NAMESPACE {

Status SequentialFileReader::Read(size_t n, Slice* result, char* scratch) {
  Status s;
  if (use_direct_io()) {
#ifndef ROCKSDB_LITE
    size_t offset = offset_.fetch_add(n);
    size_t alignment = file_->GetRequiredBufferAlignment();
    size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
    size_t offset_advance = offset - aligned_offset;
    size_t size = Roundup(offset + n, alignment) - aligned_offset;
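    // Illustration with assumed values: if alignment = 4096, offset = 5000 and
    // n = 100, then aligned_offset = 4096, offset_advance = 904 and
    // size = Roundup(5100, 4096) - 4096 = 4096, i.e. a single aligned
    // 4096-byte read at offset 4096 covers the requested range [5000, 5100).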
    size_t r = 0;
    AlignedBuffer buf;
    buf.Alignment(alignment);
    buf.AllocateNewBuffer(size);
    Slice tmp;
    s = file_->PositionedRead(aligned_offset, size, IOOptions(), &tmp,
                              buf.BufferStart(), nullptr);
    if (s.ok() && offset_advance < tmp.size()) {
      buf.Size(tmp.size());
      r = buf.Read(scratch, offset_advance,
                   std::min(tmp.size() - offset_advance, n));
    }
    *result = Slice(scratch, r);
#endif  // !ROCKSDB_LITE
  } else {
    s = file_->Read(n, IOOptions(), result, scratch, nullptr);
  }
  IOSTATS_ADD(bytes_read, result->size());
  return s;
}

Status SequentialFileReader::Skip(uint64_t n) {
#ifndef ROCKSDB_LITE
  if (use_direct_io()) {
    offset_ += static_cast<size_t>(n);
    return Status::OK();
  }
#endif  // !ROCKSDB_LITE
  return file_->Skip(n);
}

namespace {
// This class wraps a SequentialFile, exposing the same API, with the
// difference of being able to prefetch up to readahead_size bytes and then
// serve them from memory, avoiding the entire round-trip if, for example,
// the data for the file is actually remote.
class ReadaheadSequentialFile : public FSSequentialFile {
 public:
  ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
                          size_t readahead_size)
      : file_(std::move(file)),
        alignment_(file_->GetRequiredBufferAlignment()),
        readahead_size_(Roundup(readahead_size, alignment_)),
        buffer_(),
        buffer_offset_(0),
        read_offset_(0) {
    buffer_.Alignment(alignment_);
    buffer_.AllocateNewBuffer(readahead_size_);
  }

  ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;

  ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;

  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
                IODebugContext* dbg) override {
    std::unique_lock<std::mutex> lk(lock_);

    size_t cached_len = 0;
    // Check if there is a cache hit, meaning that [offset, offset + n) is
    // either completely or partially in the buffer. If it's completely cached,
    // including the end-of-file case when offset + n is greater than EOF, then
    // it is served from the buffer and we return early.
    if (TryReadFromCache(n, &cached_len, scratch) &&
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
      // We read exactly what we needed, or we hit end of file - return.
      *result = Slice(scratch, cached_len);
      return IOStatus::OK();
    }
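
    // Illustration with assumed sizes: if readahead_size_ = 8192, the buffer
    // holds file bytes [0, 8192) and the caller asks for n = 100 at
    // read_offset_ = 8150, TryReadFromCache() copies the 42 cached bytes;
    // cached_len != n and the buffer is full, so we fall through and prefetch
    // the next chunk before serving the remaining 58 bytes.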
    n -= cached_len;

    IOStatus s;
    // Read-ahead only makes sense if we have some slack left after reading
    if (n + alignment_ >= readahead_size_) {
      s = file_->Read(n, opts, result, scratch + cached_len, dbg);
      if (s.ok()) {
        read_offset_ += result->size();
        *result = Slice(scratch, cached_len + result->size());
      }
      buffer_.Clear();
      return s;
    }
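
    // (With assumed sizes readahead_size_ = 8192 and alignment_ = 4096, the
    // branch above is taken for any remaining n >= 4096, so large reads bypass
    // the readahead buffer and go straight to the underlying file.)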

    s = ReadIntoBuffer(readahead_size_, opts, dbg);
    if (s.ok()) {
      // The data we need is now in cache, so we can safely read it
      size_t remaining_len;
      TryReadFromCache(n, &remaining_len, scratch + cached_len);
      *result = Slice(scratch, cached_len + remaining_len);
    }
    return s;
  }

  IOStatus Skip(uint64_t n) override {
    std::unique_lock<std::mutex> lk(lock_);
    IOStatus s = IOStatus::OK();
    // First check if we need to skip already cached data
    if (buffer_.CurrentSize() > 0) {
      // Do we need to skip beyond cached data?
      if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
        // Yes. Skip whatever is in memory and adjust the offset accordingly.
        n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
        read_offset_ = buffer_offset_ + buffer_.CurrentSize();
      } else {
        // No. The entire section to be skipped is in cache.
        read_offset_ += n;
        n = 0;
      }
    }
    if (n > 0) {
      // We still need to skip more, so call the file API for skipping.
      s = file_->Skip(n);
      if (s.ok()) {
        read_offset_ += n;
      }
      buffer_.Clear();
    }
    return s;
  }
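
  // Note on Skip() above, with assumed numbers: if the buffer holds file bytes
  // [4096, 8192) and read_offset_ = 5000, Skip(10000) first consumes the 3192
  // cached bytes (read_offset_ becomes 8192) and then asks file_ to skip the
  // remaining 6808 bytes.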

  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
                          Slice* result, char* scratch,
                          IODebugContext* dbg) override {
    return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
  }

  IOStatus InvalidateCache(size_t offset, size_t length) override {
    std::unique_lock<std::mutex> lk(lock_);
    buffer_.Clear();
    return file_->InvalidateCache(offset, length);
  }

  bool use_direct_io() const override { return file_->use_direct_io(); }

 private:
  // Tries to read n bytes from buffer_. If anything was read from the cache,
  // it sets cached_len to the number of bytes actually read, copies those
  // bytes to scratch and returns true.
  // If nothing was read, sets cached_len to 0 and returns false.
  bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
    if (read_offset_ < buffer_offset_ ||
        read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
      *cached_len = 0;
      return false;
    }
    uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
    *cached_len = std::min(
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
    read_offset_ += *cached_len;
    return true;
  }

  // Reads into buffer_ the next n bytes from file_.
  // Can actually read less if EOF was reached.
  // Returns the status of the read operation on the file.
  IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
                          IODebugContext* dbg) {
    if (n > buffer_.Capacity()) {
      n = buffer_.Capacity();
    }
    assert(IsFileSectorAligned(n, alignment_));
    Slice result;
    IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
    if (s.ok()) {
      buffer_offset_ = read_offset_;
      buffer_.Size(result.size());
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
    }
    return s;
  }

  const std::unique_ptr<FSSequentialFile> file_;
  const size_t alignment_;
  const size_t readahead_size_;

  std::mutex lock_;
  // The buffer storing the prefetched data
  AlignedBuffer buffer_;
  // The offset in file_, corresponding to data stored in buffer_
  uint64_t buffer_offset_;
  // The offset up to which data was read from file_. In fact, it can be larger
  // than the actual file size, since the file_->Skip(n) call doesn't return the
  // actual number of bytes that were skipped, which can be less than n.
  // This is not a problem since read_offset_ is monotonically increasing and
  // its only use is to figure out if the next piece of data should be read from
  // buffer_ or file_ directly.
  uint64_t read_offset_;
};
}  // namespace

std::unique_ptr<FSSequentialFile>
SequentialFileReader::NewReadaheadSequentialFile(
    std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
  if (file->GetRequiredBufferAlignment() >= readahead_size) {
    // Short-circuit and return the original file if readahead_size is
    // too small and hence doesn't make sense to be used for prefetching.
    return std::move(file);
  }
  std::unique_ptr<FSSequentialFile> result(
      new ReadaheadSequentialFile(std::move(file), readahead_size));
  return result;
}

}  // namespace ROCKSDB_NAMESPACE
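
// Illustrative usage (a hypothetical caller; `fs`, `fname` and the 2 MB
// readahead value are assumptions for the example):
//
//   std::unique_ptr<FSSequentialFile> raw;
//   fs->NewSequentialFile(fname, FileOptions(), &raw, nullptr);
//   std::unique_ptr<FSSequentialFile> file =
//       SequentialFileReader::NewReadaheadSequentialFile(std::move(raw),
//                                                        2 << 20 /* 2 MB */);
//   // If the file's required alignment is >= the requested readahead size,
//   // the original file is returned unchanged; otherwise reads are served
//   // through the prefetching wrapper defined above.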