1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
13 #include "port/port.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/rate_limiter.h"
16 #include "util/aligned_buffer.h"
17 #include "util/sync_point.h"
// Declared here; defined elsewhere. Takes ownership of `file` (passed as an
// rvalue std::unique_ptr) and returns a new owning RandomAccessFile.
// Judging by the name, the result presumably wraps `file` and reads ahead
// `readahead_size` bytes per access -- confirm against the definition.
24 std::unique_ptr
<RandomAccessFile
> NewReadaheadRandomAccessFile(
25 std::unique_ptr
<RandomAccessFile
>&& file
, size_t readahead_size
);
// SequentialFileReader owns a SequentialFile (through std::unique_ptr) and
// keeps the file's name and a read offset alongside it. Instances are
// movable but not copyable (the copy operations are deleted below).
27 class SequentialFileReader
{
// Owned underlying file.
29 std::unique_ptr
<SequentialFile
> file_
;
30 std::string file_name_
;
// Initialized to 0 by the constructor below.
31 std::atomic
<size_t> offset_
; // read offset
// Takes ownership of `_file` and copies `_file_name`; the read offset starts
// at 0.
34 explicit SequentialFileReader(std::unique_ptr
<SequentialFile
>&& _file
,
35 const std::string
& _file_name
)
36 : file_(std::move(_file
)), file_name_(_file_name
), offset_(0) {}
// Move constructor.
// NOTE(review): its body is not visible here. Confirm that file_name_ and
// offset_ receive values when constructing by move.
38 SequentialFileReader(SequentialFileReader
&& o
) ROCKSDB_NOEXCEPT
{
// Move assignment: transfers ownership of the underlying file from `o`.
// NOTE(review): in the lines visible here only file_ is moved. file_name_
// and offset_ are not shown being transferred -- confirm this is intended.
42 SequentialFileReader
& operator=(SequentialFileReader
&& o
) ROCKSDB_NOEXCEPT
{
43 file_
= std::move(o
.file_
);
// Copying is disabled: the reader uniquely owns its file.
47 SequentialFileReader(const SequentialFileReader
&) = delete;
48 SequentialFileReader
& operator=(const SequentialFileReader
&) = delete;
// Declared here; defined elsewhere. Presumably reads up to `n` bytes using
// `scratch` as the caller-provided buffer and sets `*result` to the data
// read -- confirm against the definition.
50 Status
Read(size_t n
, Slice
* result
, char* scratch
);
// Declared here; defined elsewhere. Presumably skips `n` bytes of input.
52 Status
Skip(uint64_t n
);
// Returns the underlying file without giving up ownership.
56 SequentialFile
* file() { return file_
.get(); }
// Returns a copy of the file name.
// NOTE(review): unlike RandomAccessFileReader::file_name(), not const.
58 std::string
file_name() { return file_name_
; }
// Forwards to the underlying file's use_direct_io().
60 bool use_direct_io() const { return file_
->use_direct_io(); }
// RandomAccessFileReader owns a RandomAccessFile and carries optional
// statistics, histogram and rate-limiter pointers plus a for_compaction flag.
// Instances are movable but not copyable (copy operations are deleted below).
63 class RandomAccessFileReader
{
// Owned underlying file.
65 std::unique_ptr
<RandomAccessFile
> file_
;
66 std::string file_name_
;
// Raw pointer; may be nullptr (the constructor default).
// NOTE(review): presumably not owned by this reader -- confirm.
70 HistogramImpl
* file_read_hist_
;
// Raw pointer; may be nullptr (the constructor default).
// NOTE(review): presumably not owned by this reader -- confirm.
71 RateLimiter
* rate_limiter_
;
// Takes ownership of `raf` and moves `_file_name` in. Defaults:
// stats = nullptr, hist_type = 0, file_read_hist = nullptr,
// rate_limiter = nullptr, for_compaction = false.
75 explicit RandomAccessFileReader(std::unique_ptr
<RandomAccessFile
>&& raf
,
76 std::string _file_name
,
78 Statistics
* stats
= nullptr,
79 uint32_t hist_type
= 0,
80 HistogramImpl
* file_read_hist
= nullptr,
81 RateLimiter
* rate_limiter
= nullptr,
82 bool for_compaction
= false)
83 : file_(std::move(raf
)),
84 file_name_(std::move(_file_name
)),
87 hist_type_(hist_type
),
88 file_read_hist_(file_read_hist
),
89 rate_limiter_(rate_limiter
),
90 for_compaction_(for_compaction
) {}
// Move constructor; its body is not visible here.
92 RandomAccessFileReader(RandomAccessFileReader
&& o
) ROCKSDB_NOEXCEPT
{
// Move assignment: moves file_, env_, stats_, hist_type_, file_read_hist_,
// rate_limiter_ and for_compaction_ from `o`.
// NOTE(review): file_name_ is not among the members moved here. Confirm
// whether it should be transferred too.
96 RandomAccessFileReader
& operator=(RandomAccessFileReader
&& o
)
98 file_
= std::move(o
.file_
);
99 env_
= std::move(o
.env_
);
100 stats_
= std::move(o
.stats_
);
101 hist_type_
= std::move(o
.hist_type_
);
102 file_read_hist_
= std::move(o
.file_read_hist_
);
103 rate_limiter_
= std::move(o
.rate_limiter_
);
104 for_compaction_
= std::move(o
.for_compaction_
);
// Copying is disabled: the reader uniquely owns its file.
108 RandomAccessFileReader(const RandomAccessFileReader
&) = delete;
109 RandomAccessFileReader
& operator=(const RandomAccessFileReader
&) = delete;
// Declared here; defined elsewhere. Presumably reads up to `n` bytes at
// `offset`, using `scratch` as the caller-provided buffer and setting
// `*result` to the data read -- confirm against the definition.
111 Status
Read(uint64_t offset
, size_t n
, Slice
* result
, char* scratch
) const;
// Forwards directly to the underlying file's Prefetch(offset, n).
113 Status
Prefetch(uint64_t offset
, size_t n
) const {
114 return file_
->Prefetch(offset
, n
);
// Returns the underlying file without giving up ownership.
117 RandomAccessFile
* file() { return file_
.get(); }
// Returns a copy of the file name.
119 std::string
file_name() const { return file_name_
; }
// Forwards to the underlying file's use_direct_io().
121 bool use_direct_io() const { return file_
->use_direct_io(); }
124 // Use posix write to write data to a file.
// WritableFileWriter owns a WritableFile and stages appended data in an
// internal buffer (buf_). At construction the buffer is aligned to the file's
// required buffer alignment and sized to
// min(65536, options.writable_file_max_buffer_size).
// Copying is disabled.
125 class WritableFileWriter
{
// Owned underlying file.
127 std::unique_ptr
<WritableFile
> writable_file_
;
128 std::string file_name_
;
// Taken from options.writable_file_max_buffer_size in the constructor.
130 size_t max_buffer_size_
;
131 // Actually written data size can be used for truncate
132 // not counting padding data
135 // This is necessary when we use unbuffered access
136 // and writes must happen on aligned offsets
137 // so we need to go back and write that page again
138 uint64_t next_write_offset_
;
139 #endif // ROCKSDB_LITE
141 uint64_t last_sync_size_
;
// Taken from options.bytes_per_sync in the constructor.
142 uint64_t bytes_per_sync_
;
// Taken from options.rate_limiter in the constructor (raw pointer).
143 RateLimiter
* rate_limiter_
;
// Takes ownership of `file` and copies `_file_name`. The buffer size limit,
// bytes_per_sync and rate limiter come from `options`; `stats` defaults to
// nullptr. The sync point lets tests observe max_buffer_size_.
147 WritableFileWriter(std::unique_ptr
<WritableFile
>&& file
,
148 const std::string
& _file_name
, const EnvOptions
& options
,
149 Statistics
* stats
= nullptr)
150 : writable_file_(std::move(file
)),
151 file_name_(_file_name
),
153 max_buffer_size_(options
.writable_file_max_buffer_size
),
156 next_write_offset_(0),
157 #endif // ROCKSDB_LITE
158 pending_sync_(false),
160 bytes_per_sync_(options
.bytes_per_sync
),
161 rate_limiter_(options
.rate_limiter
),
163 TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
164 reinterpret_cast<void*>(max_buffer_size_
));
// The initial allocation is capped at 64 KiB, even when
// max_buffer_size_ is larger.
165 buf_
.Alignment(writable_file_
->GetRequiredBufferAlignment());
166 buf_
.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_
));
169 WritableFileWriter(const WritableFileWriter
&) = delete;
171 WritableFileWriter
& operator=(const WritableFileWriter
&) = delete;
// Calls Close() on destruction.
// NOTE(review): any result returned by Close() is discarded here. Callers
// that need to observe close errors should call Close() themselves first.
173 ~WritableFileWriter() { Close(); }
// Returns a copy of the file name.
175 std::string
file_name() const { return file_name_
; }
// Declared here; defined elsewhere.
177 Status
Append(const Slice
& data
);
// Declared here; defined elsewhere. Presumably appends `pad_bytes` bytes of
// padding -- confirm.
179 Status
Pad(const size_t pad_bytes
);
// Declared here; defined elsewhere. `use_fsync` presumably selects fsync
// over a lighter-weight sync -- confirm.
185 Status
Sync(bool use_fsync
);
187 // Sync only the data that was already Flush()ed. Safe to call concurrently
188 // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
189 // returns NotSupported status.
190 Status
SyncWithoutFlush(bool use_fsync
);
// Returns filesize_.
// NOTE(review): not const, although it only reads a member.
192 uint64_t GetFileSize() { return filesize_
; }
// Forwards directly to the underlying file's InvalidateCache(offset, length).
194 Status
InvalidateCache(size_t offset
, size_t length
) {
195 return writable_file_
->InvalidateCache(offset
, length
);
// Returns the underlying file without giving up ownership.
198 WritableFile
* writable_file() const { return writable_file_
.get(); }
// Forwards to the underlying file's use_direct_io().
// NOTE(review): not const, unlike the reader classes' use_direct_io().
200 bool use_direct_io() { return writable_file_
->use_direct_io(); }
// Test helper: true when the internal buffer currently holds no data.
202 bool TEST_BufferIsEmpty() { return buf_
.CurrentSize() == 0; }
205 // Used when os buffering is OFF and we are writing
206 // DMA such as in Direct I/O mode
208 Status
WriteDirect();
209 #endif // !ROCKSDB_LITE
// Internal helpers; declared here, defined elsewhere.
211 Status
WriteBuffered(const char* data
, size_t size
);
212 Status
RangeSync(uint64_t offset
, uint64_t nbytes
);
213 Status
SyncInternal(bool use_fsync
);
216 // FilePrefetchBuffer can automatically do the readahead if file_reader,
217 // readahead_size, and max_readahead_size are passed in.
218 // max_readahead_size should be greater than or equal to readahead_size.
219 // readahead_size will be doubled on every IO, until max_readahead_size.
220 class FilePrefetchBuffer
{
222 // If `track_min_offset` is true, track minimum offset ever read.
// All parameters are optional. min_offset_read_ starts at port::kMaxSizet.
// NOTE(review): the second parameter is spelled `readadhead_size` (sic); it
// initializes readahead_size_.
// NOTE(review): no initializer visible here consumes `enable`. Confirm it is
// stored in the flag that controls whether TryReadFromCache() can hit.
223 FilePrefetchBuffer(RandomAccessFileReader
* file_reader
= nullptr,
224 size_t readadhead_size
= 0, size_t max_readahead_size
= 0,
225 bool enable
= true, bool track_min_offset
= false)
227 file_reader_(file_reader
),
228 readahead_size_(readadhead_size
),
229 max_readahead_size_(max_readahead_size
),
230 min_offset_read_(port::kMaxSizet
),
232 track_min_offset_(track_min_offset
) {}
// Declared here; defined elsewhere. Presumably loads `n` bytes starting at
// `offset` from `reader` into the buffer -- confirm.
233 Status
Prefetch(RandomAccessFileReader
* reader
, uint64_t offset
, size_t n
);
// Declared here; defined elsewhere. Presumably returns true and sets
// `*result` when [offset, offset + n) can be served from the buffer --
// confirm.
234 bool TryReadFromCache(uint64_t offset
, size_t n
, Slice
* result
);
236 // The minimum `offset` ever passed to TryReadFromCache(). Only tracked
237 // if track_min_offset = true.
// Returns port::kMaxSizet (the constructor's initial value) until an offset
// has been recorded.
238 size_t min_offset_read() const { return min_offset_read_
; }
241 AlignedBuffer buffer_
;
242 uint64_t buffer_offset_
;
// Raw pointer; may be nullptr (the constructor default).
243 RandomAccessFileReader
* file_reader_
;
244 size_t readahead_size_
;
245 size_t max_readahead_size_
;
246 // The minimum `offset` ever passed to TryReadFromCache().
247 size_t min_offset_read_
;
248 // If false, TryReadFromCache() always returns false, and we only take stats
249 // for track_min_offset_ if track_min_offset_ = true
251 // If true, track minimum `offset` ever passed to TryReadFromCache(), which
252 // can be fetched from min_offset_read().
253 bool track_min_offset_
;
// Declared here; defined elsewhere. Judging by the parameters, presumably
// opens `fname` through `env` with `options` and stores the new file in
// `*result` -- confirm against the definition.
// NOTE(review): `unique_ptr` is unqualified here, unlike `std::unique_ptr`
// elsewhere in this header. It relies on a using-declaration not visible in
// this view.
256 extern Status
NewWritableFile(Env
* env
, const std::string
& fname
,
257 unique_ptr
<WritableFile
>* result
,
258 const EnvOptions
& options
);
// Declared here; defined elsewhere. Judging by the parameters, presumably
// reads the next line from `seq_file` (buffered through `iss`) into
// `*output`. It would report whether data was available via `*has_data` and
// any error via `*result` -- confirm against the definition.
259 bool ReadOneLine(std::istringstream
* iss
, SequentialFile
* seq_file
,
260 std::string
* output
, bool* has_data
, Status
* result
);
262 } // namespace rocksdb