]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/util/file_reader_writer.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rocksdb / util / file_reader_writer.h
1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
5 //
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
9 #pragma once
10 #include <atomic>
11 #include <sstream>
12 #include <string>
13 #include "port/port.h"
14 #include "rocksdb/env.h"
15 #include "rocksdb/rate_limiter.h"
16 #include "util/aligned_buffer.h"
17 #include "util/sync_point.h"
18
19 namespace rocksdb {
20
21 class Statistics;
22 class HistogramImpl;
23
// Wrap `file` in a RandomAccessFile that performs readahead of
// `readahead_size` bytes on each read (implementation lives in the .cc).
std::unique_ptr<RandomAccessFile> NewReadaheadRandomAccessFile(
    std::unique_ptr<RandomAccessFile>&& file, size_t readahead_size);
26
27 class SequentialFileReader {
28 private:
29 std::unique_ptr<SequentialFile> file_;
30 std::string file_name_;
31 std::atomic<size_t> offset_; // read offset
32
33 public:
34 explicit SequentialFileReader(std::unique_ptr<SequentialFile>&& _file,
35 const std::string& _file_name)
36 : file_(std::move(_file)), file_name_(_file_name), offset_(0) {}
37
38 SequentialFileReader(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
39 *this = std::move(o);
40 }
41
42 SequentialFileReader& operator=(SequentialFileReader&& o) ROCKSDB_NOEXCEPT {
43 file_ = std::move(o.file_);
44 return *this;
45 }
46
47 SequentialFileReader(const SequentialFileReader&) = delete;
48 SequentialFileReader& operator=(const SequentialFileReader&) = delete;
49
50 Status Read(size_t n, Slice* result, char* scratch);
51
52 Status Skip(uint64_t n);
53
54 void Rewind();
55
56 SequentialFile* file() { return file_.get(); }
57
58 std::string file_name() { return file_name_; }
59
60 bool use_direct_io() const { return file_->use_direct_io(); }
61 };
62
63 class RandomAccessFileReader {
64 private:
65 std::unique_ptr<RandomAccessFile> file_;
66 std::string file_name_;
67 Env* env_;
68 Statistics* stats_;
69 uint32_t hist_type_;
70 HistogramImpl* file_read_hist_;
71 RateLimiter* rate_limiter_;
72 bool for_compaction_;
73
74 public:
75 explicit RandomAccessFileReader(std::unique_ptr<RandomAccessFile>&& raf,
76 std::string _file_name,
77 Env* env = nullptr,
78 Statistics* stats = nullptr,
79 uint32_t hist_type = 0,
80 HistogramImpl* file_read_hist = nullptr,
81 RateLimiter* rate_limiter = nullptr,
82 bool for_compaction = false)
83 : file_(std::move(raf)),
84 file_name_(std::move(_file_name)),
85 env_(env),
86 stats_(stats),
87 hist_type_(hist_type),
88 file_read_hist_(file_read_hist),
89 rate_limiter_(rate_limiter),
90 for_compaction_(for_compaction) {}
91
92 RandomAccessFileReader(RandomAccessFileReader&& o) ROCKSDB_NOEXCEPT {
93 *this = std::move(o);
94 }
95
96 RandomAccessFileReader& operator=(RandomAccessFileReader&& o)
97 ROCKSDB_NOEXCEPT {
98 file_ = std::move(o.file_);
99 env_ = std::move(o.env_);
100 stats_ = std::move(o.stats_);
101 hist_type_ = std::move(o.hist_type_);
102 file_read_hist_ = std::move(o.file_read_hist_);
103 rate_limiter_ = std::move(o.rate_limiter_);
104 for_compaction_ = std::move(o.for_compaction_);
105 return *this;
106 }
107
108 RandomAccessFileReader(const RandomAccessFileReader&) = delete;
109 RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
110
111 Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const;
112
113 Status Prefetch(uint64_t offset, size_t n) const {
114 return file_->Prefetch(offset, n);
115 }
116
117 RandomAccessFile* file() { return file_.get(); }
118
119 std::string file_name() const { return file_name_; }
120
121 bool use_direct_io() const { return file_->use_direct_io(); }
122 };
123
// Use posix write to write data to a file.
//
// WritableFileWriter accumulates application data in an AlignedBuffer and
// flushes it to the wrapped WritableFile. Writes may be rate-limited (via
// options.rate_limiter) and range-synced every bytes_per_sync_ bytes.
class WritableFileWriter {
 private:
  std::unique_ptr<WritableFile> writable_file_;
  std::string file_name_;
  // Write buffer; alignment comes from the file's required alignment (set in
  // the constructor), which matters for direct I/O.
  AlignedBuffer buf_;
  size_t max_buffer_size_;
  // Actually written data size can be used for truncate
  // not counting padding data
  uint64_t filesize_;
#ifndef ROCKSDB_LITE
  // This is necessary when we use unbuffered access
  // and writes must happen on aligned offsets
  // so we need to go back and write that page again
  uint64_t next_write_offset_;
#endif  // ROCKSDB_LITE
  bool pending_sync_;      // true when appended data has not been synced yet
  uint64_t last_sync_size_;   // filesize_ at the time of the last range sync
  uint64_t bytes_per_sync_;   // 0 disables incremental range-syncing
  RateLimiter* rate_limiter_; // not owned; may be nullptr
  Statistics* stats_;         // not owned; may be nullptr

 public:
  // Takes ownership of `file`. The buffer starts at min(64KB,
  // writable_file_max_buffer_size) and is aligned to the file's required
  // alignment.
  WritableFileWriter(std::unique_ptr<WritableFile>&& file,
                     const std::string& _file_name, const EnvOptions& options,
                     Statistics* stats = nullptr)
      : writable_file_(std::move(file)),
        file_name_(_file_name),
        buf_(),
        max_buffer_size_(options.writable_file_max_buffer_size),
        filesize_(0),
#ifndef ROCKSDB_LITE
        next_write_offset_(0),
#endif  // ROCKSDB_LITE
        pending_sync_(false),
        last_sync_size_(0),
        bytes_per_sync_(options.bytes_per_sync),
        rate_limiter_(options.rate_limiter),
        stats_(stats) {
    TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
                             reinterpret_cast<void*>(max_buffer_size_));
    buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
    buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
  }

  WritableFileWriter(const WritableFileWriter&) = delete;

  WritableFileWriter& operator=(const WritableFileWriter&) = delete;

  // NOTE(review): the Status returned by Close() is discarded here, so a
  // failed flush/close at destruction is silent. Callers that care must call
  // Close() explicitly and check its result.
  ~WritableFileWriter() { Close(); }

  std::string file_name() const { return file_name_; }

  // Buffer (and possibly flush) `data` at the end of the file.
  Status Append(const Slice& data);

  // Append pad_bytes of padding (implementation in the .cc).
  Status Pad(const size_t pad_bytes);

  // Push buffered data down to the WritableFile.
  Status Flush();

  // Flush remaining data, truncate to filesize_, and close the file.
  Status Close();

  // Flush then sync (fsync if use_fsync, else fdatasync-style sync).
  Status Sync(bool use_fsync);

  // Sync only the data that was already Flush()ed. Safe to call concurrently
  // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
  // returns NotSupported status.
  Status SyncWithoutFlush(bool use_fsync);

  // Logical size of written data (excludes padding); suitable for truncate.
  uint64_t GetFileSize() { return filesize_; }

  Status InvalidateCache(size_t offset, size_t length) {
    return writable_file_->InvalidateCache(offset, length);
  }

  WritableFile* writable_file() const { return writable_file_.get(); }

  bool use_direct_io() { return writable_file_->use_direct_io(); }

  bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }

 private:
  // Used when os buffering is OFF and we are writing
  // DMA such as in Direct I/O mode
#ifndef ROCKSDB_LITE
  Status WriteDirect();
#endif  // !ROCKSDB_LITE
  // Normal write
  Status WriteBuffered(const char* data, size_t size);
  // RangeSync [offset, offset + nbytes) on the underlying file.
  Status RangeSync(uint64_t offset, uint64_t nbytes);
  Status SyncInternal(bool use_fsync);
};
215
216 // FilePrefetchBuffer can automatically do the readahead if file_reader,
217 // readahead_size, and max_readahead_size are passed in.
218 // max_readahead_size should be greater than or equal to readahead_size.
219 // readahead_size will be doubled on every IO, until max_readahead_size.
220 class FilePrefetchBuffer {
221 public:
222 // If `track_min_offset` is true, track minimum offset ever read.
223 FilePrefetchBuffer(RandomAccessFileReader* file_reader = nullptr,
224 size_t readadhead_size = 0, size_t max_readahead_size = 0,
225 bool enable = true, bool track_min_offset = false)
226 : buffer_offset_(0),
227 file_reader_(file_reader),
228 readahead_size_(readadhead_size),
229 max_readahead_size_(max_readahead_size),
230 min_offset_read_(port::kMaxSizet),
231 enable_(enable),
232 track_min_offset_(track_min_offset) {}
233 Status Prefetch(RandomAccessFileReader* reader, uint64_t offset, size_t n);
234 bool TryReadFromCache(uint64_t offset, size_t n, Slice* result);
235
236 // The minimum `offset` ever passed to TryReadFromCache(). Only be tracked
237 // if track_min_offset = true.
238 size_t min_offset_read() const { return min_offset_read_; }
239
240 private:
241 AlignedBuffer buffer_;
242 uint64_t buffer_offset_;
243 RandomAccessFileReader* file_reader_;
244 size_t readahead_size_;
245 size_t max_readahead_size_;
246 // The minimum `offset` ever passed to TryReadFromCache().
247 size_t min_offset_read_;
248 // if false, TryReadFromCache() always return false, and we only take stats
249 // for track_min_offset_ if track_min_offset_ = true
250 bool enable_;
251 // If true, track minimum `offset` ever passed to TryReadFromCache(), which
252 // can be fetched from min_offset_read().
253 bool track_min_offset_;
254 };
255
// Create (or overwrite) `fname` via `env` and return it in *result;
// declaration only, implemented in the .cc.
extern Status NewWritableFile(Env* env, const std::string& fname,
                              unique_ptr<WritableFile>* result,
                              const EnvOptions& options);
// Read one '\n'-terminated line, buffering file data through *iss. *has_data
// is cleared at EOF and *result receives the read Status; returns whether a
// line was produced (declaration only).
bool ReadOneLine(std::istringstream* iss, SequentialFile* seq_file,
                 std::string* output, bool* has_data, Status* result);
261
262 } // namespace rocksdb