]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/util/file_reader_writer.h
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / rocksdb / util / file_reader_writer.h
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5//
6// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7// Use of this source code is governed by a BSD-style license that can be
8// found in the LICENSE file. See the AUTHORS file for names of contributors.
9#pragma once
10#include <atomic>
11fdf7f2 11#include <sstream>
7c673cae
FG
12#include <string>
13#include "port/port.h"
14#include "rocksdb/env.h"
494da23a 15#include "rocksdb/listener.h"
11fdf7f2 16#include "rocksdb/rate_limiter.h"
7c673cae 17#include "util/aligned_buffer.h"
11fdf7f2 18#include "util/sync_point.h"
7c673cae
FG
19
20namespace rocksdb {
21
22class Statistics;
23class HistogramImpl;
24
25std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
26 std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size);
27
28class SequentialFileReader {
29 private:
30 std::unique_ptr<SequentialFile> file_;
11fdf7f2 31 std::string file_name_;
7c673cae
FG
32 std::atomic<size_t> offset_; // read offset
33
34 public:
11fdf7f2
TL
35 explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
36 const std::string& _file_name)
37 : file_(std::move(_file)), file_name_(_file_name), offset_(0) {}
7c673cae
FG
38
39 SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
40 *this = std::move(o);
41 }
42
43 SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
44 file_ = std::move(o.file_);
45 return *this;
46 }
47
48 SequentialFileReader(const SequentialFileReader&) = delete;
49 SequentialFileReader& operator=(const SequentialFileReader&) = delete;
50
51 Status Read(size_t n, Slice* result, char* scratch);
52
53 Status Skip(uint64_t n);
54
11fdf7f2
TL
55 void Rewind();
56
7c673cae
FG
57 SequentialFile* file() { return file_.get(); }
58
11fdf7f2 59 std::string file_name() { return file_name_; }
7c673cae 60
11fdf7f2 61 bool use_direct_io() const { return file_->use_direct_io(); }
7c673cae
FG
62};
63
64class RandomAccessFileReader {
65 private:
494da23a
TL
66#ifndef ROCKSDB_LITE
67 void NotifyOnFileReadFinish(uint64_t offset, size_t length,
68 const FileOperationInfo::TimePoint& start_ts,
69 const FileOperationInfo::TimePoint& finish_ts,
70 const Status& status) const {
71 FileOperationInfo info(file_name_, start_ts, finish_ts);
72 info.offset = offset;
73 info.length = length;
74 info.status = status;
75
76 for (auto& listener : listeners_) {
77 listener->OnFileReadFinish(info);
78 }
79 }
80#endif // ROCKSDB_LITE
81
82 bool ShouldNotifyListeners() const { return !listeners_.empty(); }
83
7c673cae 84 std::unique_ptr<RandomAccessFile> file_;
11fdf7f2 85 std::string file_name_;
7c673cae
FG
86 Env* env_;
87 Statistics* stats_;
88 uint32_t hist_type_;
89 HistogramImpl* file_read_hist_;
11fdf7f2
TL
90 RateLimiter* rate_limiter_;
91 bool for_compaction_;
494da23a 92 std::vector<std::shared_ptr<EventListener>> listeners_;
7c673cae
FG
93
94 public:
494da23a
TL
95 explicit RandomAccessFileReader(
96 std::unique_ptr<RandomAccessFile>&& raf, std::string _file_name,
97 Env* env = nullptr, Statistics* stats = nullptr, uint32_t hist_type = 0,
98 HistogramImpl* file_read_hist = nullptr,
99 RateLimiter* rate_limiter = nullptr, bool for_compaction = false,
100 const std::vector<std::shared_ptr<EventListener>>& listeners = {})
7c673cae 101 : file_(std::move(raf)),
11fdf7f2 102 file_name_(std::move(_file_name)),
7c673cae
FG
103 env_(env),
104 stats_(stats),
105 hist_type_(hist_type),
11fdf7f2
TL
106 file_read_hist_(file_read_hist),
107 rate_limiter_(rate_limiter),
494da23a
TL
108 for_compaction_(for_compaction),
109 listeners_() {
110#ifndef ROCKSDB_LITE
111 std::for_each(listeners.begin(), listeners.end(),
112 [this](const std::shared_ptr<EventListener>& e) {
113 if (e->ShouldBeNotifiedOnFileIO()) {
114 listeners_.emplace_back(e);
115 }
116 });
117#else // !ROCKSDB_LITE
118 (void)listeners;
119#endif
120 }
7c673cae
FG
121
122 RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
123 *this = std::move(o);
124 }
125
11fdf7f2
TL
126 RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
127 ROCKSDB_NOEXCEPT {
7c673cae
FG
128 file_ = std::move(o.file_);
129 env_ = std::move(o.env_);
130 stats_ = std::move(o.stats_);
131 hist_type_ = std::move(o.hist_type_);
132 file_read_hist_ = std::move(o.file_read_hist_);
11fdf7f2
TL
133 rate_limiter_ = std::move(o.rate_limiter_);
134 for_compaction_ = std::move(o.for_compaction_);
7c673cae
FG
135 return *this;
136 }
137
138 RandomAccessFileReader(const RandomAccessFileReader&) = delete;
139 RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
140
141 Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const;
142
143 Status Prefetch(uint64_t offset, size_t n) const {
144 return file_->Prefetch(offset, n);
145 }
146
147 RandomAccessFile* file() { return file_.get(); }
148
11fdf7f2 149 std::string file_name() const { return file_name_; }
7c673cae 150
11fdf7f2 151 bool use_direct_io() const { return file_->use_direct_io(); }
7c673cae
FG
152};
153
154// Use posix write to write data to a file.
155class WritableFileWriter {
156 private:
494da23a
TL
157#ifndef ROCKSDB_LITE
158 void NotifyOnFileWriteFinish(uint64_t offset, size_t length,
159 const FileOperationInfo::TimePoint& start_ts,
160 const FileOperationInfo::TimePoint& finish_ts,
161 const Status& status) {
162 FileOperationInfo info(file_name_, start_ts, finish_ts);
163 info.offset = offset;
164 info.length = length;
165 info.status = status;
166
167 for (auto& listener : listeners_) {
168 listener->OnFileWriteFinish(info);
169 }
170 }
171#endif // ROCKSDB_LITE
172
173 bool ShouldNotifyListeners() const { return !listeners_.empty(); }
174
7c673cae 175 std::unique_ptr<WritableFile> writable_file_;
11fdf7f2 176 std::string file_name_;
494da23a 177 Env* env_;
7c673cae
FG
178 AlignedBuffer buf_;
179 size_t max_buffer_size_;
180 // Actually written data size can be used for truncate
181 // not counting padding data
182 uint64_t filesize_;
11fdf7f2 183#ifndef ROCKSDB_LITE
7c673cae
FG
184 // This is necessary when we use unbuffered access
185 // and writes must happen on aligned offsets
186 // so we need to go back and write that page again
187 uint64_t next_write_offset_;
11fdf7f2 188#endif // ROCKSDB_LITE
7c673cae
FG
189 bool pending_sync_;
190 uint64_t last_sync_size_;
191 uint64_t bytes_per_sync_;
192 RateLimiter* rate_limiter_;
193 Statistics* stats_;
494da23a 194 std::vector<std::shared_ptr<EventListener>> listeners_;
7c673cae
FG
195
196 public:
494da23a
TL
197 WritableFileWriter(
198 std::unique_ptr<WritableFile>&& file, const std::string& _file_name,
199 const EnvOptions& options, Env* env = nullptr,
200 Statistics* stats = nullptr,
201 const std::vector<std::shared_ptr<EventListener>>& listeners = {})
7c673cae 202 : writable_file_(std::move(file)),
11fdf7f2 203 file_name_(_file_name),
494da23a 204 env_(env),
7c673cae
FG
205 buf_(),
206 max_buffer_size_(options.writable_file_max_buffer_size),
207 filesize_(0),
11fdf7f2 208#ifndef ROCKSDB_LITE
7c673cae 209 next_write_offset_(0),
11fdf7f2 210#endif // ROCKSDB_LITE
7c673cae
FG
211 pending_sync_(false),
212 last_sync_size_(0),
213 bytes_per_sync_(options.bytes_per_sync),
214 rate_limiter_(options.rate_limiter),
494da23a
TL
215 stats_(stats),
216 listeners_() {
11fdf7f2
TL
217 TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
218 reinterpret_cast<void*>(max_buffer_size_));
7c673cae 219 buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
11fdf7f2 220 buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
494da23a
TL
221#ifndef ROCKSDB_LITE
222 std::for_each(listeners.begin(), listeners.end(),
223 [this](const std::shared_ptr<EventListener>& e) {
224 if (e->ShouldBeNotifiedOnFileIO()) {
225 listeners_.emplace_back(e);
226 }
227 });
228#else // !ROCKSDB_LITE
229 (void)listeners;
230#endif
7c673cae
FG
231 }
232
233 WritableFileWriter(const WritableFileWriter&) = delete;
234
235 WritableFileWriter& operator=(const WritableFileWriter&) = delete;
236
237 ~WritableFileWriter() { Close(); }
238
11fdf7f2
TL
239 std::string file_name() const { return file_name_; }
240
7c673cae
FG
241 Status Append(const Slice& data);
242
11fdf7f2
TL
243 Status Pad(const size_t pad_bytes);
244
7c673cae
FG
245 Status Flush();
246
247 Status Close();
248
249 Status Sync(bool use_fsync);
250
251 // Sync only the data that was already Flush()ed. Safe to call concurrently
252 // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
253 // returns NotSupported status.
254 Status SyncWithoutFlush(bool use_fsync);
255
256 uint64_t GetFileSize() { return filesize_; }
257
258 Status InvalidateCache(size_t offset, size_t length) {
259 return writable_file_->InvalidateCache(offset, length);
260 }
261
262 WritableFile* writable_file() const { return writable_file_.get(); }
263
264 bool use_direct_io() { return writable_file_->use_direct_io(); }
265
11fdf7f2
TL
266 bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
267
7c673cae
FG
268 private:
269 // Used when os buffering is OFF and we are writing
270 // DMA such as in Direct I/O mode
271#ifndef ROCKSDB_LITE
272 Status WriteDirect();
273#endif // !ROCKSDB_LITE
274 // Normal write
275 Status WriteBuffered(const char* data, size_t size);
276 Status RangeSync(uint64_t offset, uint64_t nbytes);
7c673cae
FG
277 Status SyncInternal(bool use_fsync);
278};
279
11fdf7f2
TL
280// FilePrefetchBuffer can automatically do the readahead if file_reader,
281// readahead_size, and max_readahead_size are passed in.
282// max_readahead_size should be greater than or equal to readahead_size.
283// readahead_size will be doubled on every IO, until max_readahead_size.
284class FilePrefetchBuffer {
285 public:
286 // If `track_min_offset` is true, track minimum offset ever read.
287 FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
288 size_t readadhead_size = 0, size_t max_readahead_size = 0,
289 bool enable = true, bool track_min_offset = false)
290 : buffer_offset_(0),
291 file_reader_(file_reader),
292 readahead_size_(readadhead_size),
293 max_readahead_size_(max_readahead_size),
294 min_offset_read_(port::kMaxSizet),
295 enable_(enable),
296 track_min_offset_(track_min_offset) {}
297 Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n);
298 bool TryReadFromCache(uint64_t offset, size_t n, Slice* result);
299
300 // The minimum `offset` ever passed to TryReadFromCache(). Only be tracked
301 // if track_min_offset = true.
302 size_t min_offset_read() const { return min_offset_read_; }
303
304 private:
305 AlignedBuffer buffer_;
306 uint64_t buffer_offset_;
307 RandomAccessFileReader* file_reader_;
308 size_t readahead_size_;
309 size_t max_readahead_size_;
310 // The minimum `offset` ever passed to TryReadFromCache().
311 size_t min_offset_read_;
312 // if false, TryReadFromCache() always return false, and we only take stats
313 // for track_min_offset_ if track_min_offset_ = true
314 bool enable_;
315 // If true, track minimum `offset` ever passed to TryReadFromCache(), which
316 // can be fetched from min_offset_read().
317 bool track_min_offset_;
318};
319
7c673cae 320extern Status NewWritableFile(Env* env, const std::string& fname,
494da23a 321 std::unique_ptr<WritableFile>* result,
7c673cae 322 const EnvOptions& options);
11fdf7f2
TL
323bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file,
324 std::string* output, bool* has_data, Status* result);
325
7c673cae 326} // namespace rocksdb