]>
Commit | Line | Data |
---|---|---|
7c673cae | 1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
11fdf7f2 TL |
2 | // This source code is licensed under both the GPLv2 (found in the |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
7c673cae FG |
5 | // |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | #pragma once | |
10 | #include <atomic> | |
11fdf7f2 | 11 | #include <sstream> |
7c673cae FG |
12 | #include <string> |
13 | #include "port/port.h" | |
14 | #include "rocksdb/env.h" | |
494da23a | 15 | #include "rocksdb/listener.h" |
11fdf7f2 | 16 | #include "rocksdb/rate_limiter.h" |
7c673cae | 17 | #include "util/aligned_buffer.h" |
11fdf7f2 | 18 | #include "util/sync_point.h" |
7c673cae FG |
19 | |
20 | namespace rocksdb { | |
21 | ||
22 | class Statistics; | |
23 | class HistogramImpl; | |
24 | ||
// Factory declared here and defined elsewhere. Receives `file` as an rvalue
// reference to a unique_ptr and returns a unique_ptr<RandomAccessFile>.
// NOTE(review): presumably the returned object wraps `file` and reads ahead
// `readahead_size` bytes -- confirm against the definition.
25 | std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile( | |
26 | std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size); | |
27 | ||
28 | class SequentialFileReader { | |
29 | private: | |
30 | std::unique_ptr<SequentialFile> file_; | |
11fdf7f2 | 31 | std::string file_name_; |
7c673cae FG |
32 | std::atomic<size_t> offset_; // read offset |
33 | ||
34 | public: | |
11fdf7f2 TL |
35 | explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file, |
36 | const std::string& _file_name) | |
37 | : file_(std::move(_file)), file_name_(_file_name), offset_(0) {} | |
7c673cae FG |
38 | |
39 | SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { | |
40 | *this = std::move(o); | |
41 | } | |
42 | ||
43 | SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT { | |
44 | file_ = std::move(o.file_); | |
45 | return *this; | |
46 | } | |
47 | ||
48 | SequentialFileReader(const SequentialFileReader&) = delete; | |
49 | SequentialFileReader& operator=(const SequentialFileReader&) = delete; | |
50 | ||
51 | Status Read(size_t n, Slice* result, char* scratch); | |
52 | ||
53 | Status Skip(uint64_t n); | |
54 | ||
11fdf7f2 TL |
55 | void Rewind(); |
56 | ||
7c673cae FG |
57 | SequentialFile* file() { return file_.get(); } |
58 | ||
11fdf7f2 | 59 | std::string file_name() { return file_name_; } |
7c673cae | 60 | |
11fdf7f2 | 61 | bool use_direct_io() const { return file_->use_direct_io(); } |
7c673cae FG |
62 | }; |
63 | ||
64 | class RandomAccessFileReader { | |
65 | private: | |
494da23a TL |
66 | #ifndef ROCKSDB_LITE |
67 | void NotifyOnFileReadFinish(uint64_t offset, size_t length, | |
68 | const FileOperationInfo::TimePoint& start_ts, | |
69 | const FileOperationInfo::TimePoint& finish_ts, | |
70 | const Status& status) const { | |
71 | FileOperationInfo info(file_name_, start_ts, finish_ts); | |
72 | info.offset = offset; | |
73 | info.length = length; | |
74 | info.status = status; | |
75 | ||
76 | for (auto& listener : listeners_) { | |
77 | listener->OnFileReadFinish(info); | |
78 | } | |
79 | } | |
80 | #endif // ROCKSDB_LITE | |
81 | ||
82 | bool ShouldNotifyListeners() const { return !listeners_.empty(); } | |
83 | ||
7c673cae | 84 | std::unique_ptr<RandomAccessFile> file_; |
11fdf7f2 | 85 | std::string file_name_; |
7c673cae FG |
86 | Env* env_; |
87 | Statistics* stats_; | |
88 | uint32_t hist_type_; | |
89 | HistogramImpl* file_read_hist_; | |
11fdf7f2 TL |
90 | RateLimiter* rate_limiter_; |
91 | bool for_compaction_; | |
494da23a | 92 | std::vector<std::shared_ptr<EventListener>> listeners_; |
7c673cae FG |
93 | |
94 | public: | |
494da23a TL |
95 | explicit RandomAccessFileReader( |
96 | std::unique_ptr<RandomAccessFile>&& raf, std::string _file_name, | |
97 | Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0, | |
98 | HistogramImpl* file_read_hist = nullptr, | |
99 | RateLimiter* rate_limiter = nullptr, bool for_compaction = false, | |
100 | const std::vector<std::shared_ptr<EventListener>>& listeners = {}) | |
7c673cae | 101 | : file_(std::move(raf)), |
11fdf7f2 | 102 | file_name_(std::move(_file_name)), |
7c673cae FG |
103 | env_(env), |
104 | stats_(stats), | |
105 | hist_type_(hist_type), | |
11fdf7f2 TL |
106 | file_read_hist_(file_read_hist), |
107 | rate_limiter_(rate_limiter), | |
494da23a TL |
108 | for_compaction_(for_compaction), |
109 | listeners_() { | |
110 | #ifndef ROCKSDB_LITE | |
111 | std::for_each(listeners.begin(), listeners.end(), | |
112 | [this](const std::shared_ptr<EventListener>& e) { | |
113 | if (e->ShouldBeNotifiedOnFileIO()) { | |
114 | listeners_.emplace_back(e); | |
115 | } | |
116 | }); | |
117 | #else // !ROCKSDB_LITE | |
118 | (void)listeners; | |
119 | #endif | |
120 | } | |
7c673cae FG |
121 | |
122 | RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT { | |
123 | *this = std::move(o); | |
124 | } | |
125 | ||
11fdf7f2 TL |
126 | RandomAccessFileReader& operator=(RandomAccessFileReader&& o) |
127 | ROCKSDB_NOEXCEPT { | |
7c673cae FG |
128 | file_ = std::move(o.file_); |
129 | env_ = std::move(o.env_); | |
130 | stats_ = std::move(o.stats_); | |
131 | hist_type_ = std::move(o.hist_type_); | |
132 | file_read_hist_ = std::move(o.file_read_hist_); | |
11fdf7f2 TL |
133 | rate_limiter_ = std::move(o.rate_limiter_); |
134 | for_compaction_ = std::move(o.for_compaction_); | |
7c673cae FG |
135 | return *this; |
136 | } | |
137 | ||
138 | RandomAccessFileReader(const RandomAccessFileReader&) = delete; | |
139 | RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; | |
140 | ||
141 | Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const; | |
142 | ||
143 | Status Prefetch(uint64_t offset, size_t n) const { | |
144 | return file_->Prefetch(offset, n); | |
145 | } | |
146 | ||
147 | RandomAccessFile* file() { return file_.get(); } | |
148 | ||
11fdf7f2 | 149 | std::string file_name() const { return file_name_; } |
7c673cae | 150 | |
11fdf7f2 | 151 | bool use_direct_io() const { return file_->use_direct_io(); } |
7c673cae FG |
152 | }; |
153 | ||
154 | // Use posix write to write data to a file. | |
155 | class WritableFileWriter { | |
156 | private: | |
494da23a TL |
157 | #ifndef ROCKSDB_LITE |
158 | void NotifyOnFileWriteFinish(uint64_t offset, size_t length, | |
159 | const FileOperationInfo::TimePoint& start_ts, | |
160 | const FileOperationInfo::TimePoint& finish_ts, | |
161 | const Status& status) { | |
162 | FileOperationInfo info(file_name_, start_ts, finish_ts); | |
163 | info.offset = offset; | |
164 | info.length = length; | |
165 | info.status = status; | |
166 | ||
167 | for (auto& listener : listeners_) { | |
168 | listener->OnFileWriteFinish(info); | |
169 | } | |
170 | } | |
171 | #endif // ROCKSDB_LITE | |
172 | ||
173 | bool ShouldNotifyListeners() const { return !listeners_.empty(); } | |
174 | ||
7c673cae | 175 | std::unique_ptr<WritableFile> writable_file_; |
11fdf7f2 | 176 | std::string file_name_; |
494da23a | 177 | Env* env_; |
7c673cae FG |
178 | AlignedBuffer buf_; |
179 | size_t max_buffer_size_; | |
180 | // Actually written data size can be used for truncate | |
181 | // not counting padding data | |
182 | uint64_t filesize_; | |
11fdf7f2 | 183 | #ifndef ROCKSDB_LITE |
7c673cae FG |
184 | // This is necessary when we use unbuffered access |
185 | // and writes must happen on aligned offsets | |
186 | // so we need to go back and write that page again | |
187 | uint64_t next_write_offset_; | |
11fdf7f2 | 188 | #endif // ROCKSDB_LITE |
7c673cae FG |
189 | bool pending_sync_; |
190 | uint64_t last_sync_size_; | |
191 | uint64_t bytes_per_sync_; | |
192 | RateLimiter* rate_limiter_; | |
193 | Statistics* stats_; | |
494da23a | 194 | std::vector<std::shared_ptr<EventListener>> listeners_; |
7c673cae FG |
195 | |
196 | public: | |
494da23a TL |
197 | WritableFileWriter( |
198 | std::unique_ptr<WritableFile>&& file, const std::string& _file_name, | |
199 | const EnvOptions& options, Env* env = nullptr, | |
200 | Statistics* stats = nullptr, | |
201 | const std::vector<std::shared_ptr<EventListener>>& listeners = {}) | |
7c673cae | 202 | : writable_file_(std::move(file)), |
11fdf7f2 | 203 | file_name_(_file_name), |
494da23a | 204 | env_(env), |
7c673cae FG |
205 | buf_(), |
206 | max_buffer_size_(options.writable_file_max_buffer_size), | |
207 | filesize_(0), | |
11fdf7f2 | 208 | #ifndef ROCKSDB_LITE |
7c673cae | 209 | next_write_offset_(0), |
11fdf7f2 | 210 | #endif // ROCKSDB_LITE |
7c673cae FG |
211 | pending_sync_(false), |
212 | last_sync_size_(0), | |
213 | bytes_per_sync_(options.bytes_per_sync), | |
214 | rate_limiter_(options.rate_limiter), | |
494da23a TL |
215 | stats_(stats), |
216 | listeners_() { | |
11fdf7f2 TL |
217 | TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", |
218 | reinterpret_cast<void*>(max_buffer_size_)); | |
7c673cae | 219 | buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); |
11fdf7f2 | 220 | buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); |
494da23a TL |
221 | #ifndef ROCKSDB_LITE |
222 | std::for_each(listeners.begin(), listeners.end(), | |
223 | [this](const std::shared_ptr<EventListener>& e) { | |
224 | if (e->ShouldBeNotifiedOnFileIO()) { | |
225 | listeners_.emplace_back(e); | |
226 | } | |
227 | }); | |
228 | #else // !ROCKSDB_LITE | |
229 | (void)listeners; | |
230 | #endif | |
7c673cae FG |
231 | } |
232 | ||
233 | WritableFileWriter(const WritableFileWriter&) = delete; | |
234 | ||
235 | WritableFileWriter& operator=(const WritableFileWriter&) = delete; | |
236 | ||
237 | ~WritableFileWriter() { Close(); } | |
238 | ||
11fdf7f2 TL |
239 | std::string file_name() const { return file_name_; } |
240 | ||
7c673cae FG |
241 | Status Append(const Slice& data); |
242 | ||
11fdf7f2 TL |
243 | Status Pad(const size_t pad_bytes); |
244 | ||
7c673cae FG |
245 | Status Flush(); |
246 | ||
247 | Status Close(); | |
248 | ||
249 | Status Sync(bool use_fsync); | |
250 | ||
251 | // Sync only the data that was already Flush()ed. Safe to call concurrently | |
252 | // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(), | |
253 | // returns NotSupported status. | |
254 | Status SyncWithoutFlush(bool use_fsync); | |
255 | ||
256 | uint64_t GetFileSize() { return filesize_; } | |
257 | ||
258 | Status InvalidateCache(size_t offset, size_t length) { | |
259 | return writable_file_->InvalidateCache(offset, length); | |
260 | } | |
261 | ||
262 | WritableFile* writable_file() const { return writable_file_.get(); } | |
263 | ||
264 | bool use_direct_io() { return writable_file_->use_direct_io(); } | |
265 | ||
11fdf7f2 TL |
266 | bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } |
267 | ||
7c673cae FG |
268 | private: |
269 | // Used when os buffering is OFF and we are writing | |
270 | // DMA such as in Direct I/O mode | |
271 | #ifndef ROCKSDB_LITE | |
272 | Status WriteDirect(); | |
273 | #endif // !ROCKSDB_LITE | |
274 | // Normal write | |
275 | Status WriteBuffered(const char* data, size_t size); | |
276 | Status RangeSync(uint64_t offset, uint64_t nbytes); | |
7c673cae FG |
277 | Status SyncInternal(bool use_fsync); |
278 | }; | |
279 | ||
11fdf7f2 TL |
280 | // FilePrefetchBuffer can automatically do the readahead if file_reader, |
281 | // readahead_size, and max_readahead_size are passed in. | |
282 | // max_readahead_size should be greater than or equal to readahead_size. | |
283 | // readahead_size will be doubled on every IO, until max_readahead_size. | |
284 | class FilePrefetchBuffer { | |
285 | public: | |
286 | // If `track_min_offset` is true, track minimum offset ever read. | |
287 | FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr, | |
288 | size_t readadhead_size = 0, size_t max_readahead_size = 0, | |
289 | bool enable = true, bool track_min_offset = false) | |
290 | : buffer_offset_(0), | |
291 | file_reader_(file_reader), | |
292 | readahead_size_(readadhead_size), | |
293 | max_readahead_size_(max_readahead_size), | |
294 | min_offset_read_(port::kMaxSizet), | |
295 | enable_(enable), | |
296 | track_min_offset_(track_min_offset) {} | |
297 | Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n); | |
298 | bool TryReadFromCache(uint64_t offset, size_t n, Slice* result); | |
299 | ||
300 | // The minimum `offset` ever passed to TryReadFromCache(). Only be tracked | |
301 | // if track_min_offset = true. | |
302 | size_t min_offset_read() const { return min_offset_read_; } | |
303 | ||
304 | private: | |
305 | AlignedBuffer buffer_; | |
306 | uint64_t buffer_offset_; | |
307 | RandomAccessFileReader* file_reader_; | |
308 | size_t readahead_size_; | |
309 | size_t max_readahead_size_; | |
310 | // The minimum `offset` ever passed to TryReadFromCache(). | |
311 | size_t min_offset_read_; | |
312 | // if false, TryReadFromCache() always return false, and we only take stats | |
313 | // for track_min_offset_ if track_min_offset_ = true | |
314 | bool enable_; | |
315 | // If true, track minimum `offset` ever passed to TryReadFromCache(), which | |
316 | // can be fetched from min_offset_read(). | |
317 | bool track_min_offset_; | |
318 | }; | |
319 | ||
// Declared here and defined elsewhere. Returns a Status; `result` is an
// out-parameter pointing at a unique_ptr<WritableFile>.
// NOTE(review): presumably opens/creates `fname` through `env` using
// `options` and stores the file in `*result` -- confirm against the
// definition.
7c673cae | 320 | extern Status NewWritableFile(Env* env, const std::string& fname, |
494da23a | 321 | std::unique_ptr<WritableFile>* result, |
7c673cae | 322 | const EnvOptions& options); |
11fdf7f2 TL |
// Declared here and defined elsewhere. Takes a string stream, a
// SequentialFile, and three out-parameters (`output`, `has_data`, `result`).
// NOTE(review): presumably extracts the next line into `*output`, refilling
// `iss` from `seq_file` as needed, and reports read errors via `*result` --
// confirm against the definition, including the meaning of the bool return.
323 | bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file, |
324 | std::string* output, bool* has_data, Status* result); | |
325 | ||
7c673cae | 326 | } // namespace rocksdb |