]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | // This source code is licensed under both the GPLv2 (found in the | |
3 | // COPYING file in the root directory) and Apache 2.0 License | |
4 | // (found in the LICENSE.Apache file in the root directory). | |
5 | // | |
6 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. | |
7 | // Use of this source code is governed by a BSD-style license that can be | |
8 | // found in the LICENSE file. See the AUTHORS file for names of contributors. | |
9 | ||
10 | #pragma once | |
11 | #include <atomic> | |
12 | #include <string> | |
20effc67 | 13 | |
f67539c2 | 14 | #include "db/version_edit.h" |
20effc67 | 15 | #include "env/file_system_tracer.h" |
f67539c2 TL |
16 | #include "port/port.h" |
17 | #include "rocksdb/env.h" | |
18 | #include "rocksdb/file_checksum.h" | |
19 | #include "rocksdb/file_system.h" | |
20effc67 | 20 | #include "rocksdb/io_status.h" |
f67539c2 TL |
21 | #include "rocksdb/listener.h" |
22 | #include "rocksdb/rate_limiter.h" | |
23 | #include "test_util/sync_point.h" | |
24 | #include "util/aligned_buffer.h" | |
25 | ||
26 | namespace ROCKSDB_NAMESPACE { | |
27 | class Statistics; | |
28 | ||
29 | // WritableFileWriter is a wrapper on top of Env::WritableFile. It provides | |
30 | // facilities to: | |
31 | // - Handle Buffered and Direct writes. | |
32 | // - Rate limit writes. | |
33 | // - Flush and Sync the data to the underlying filesystem. | |
34 | // - Notify any interested listeners on the completion of a write. | |
35 | // - Update IO stats. | |
36 | class WritableFileWriter { | |
37 | private: | |
38 | #ifndef ROCKSDB_LITE | |
20effc67 TL |
39 | void NotifyOnFileWriteFinish( |
40 | uint64_t offset, size_t length, | |
41 | const FileOperationInfo::StartTimePoint& start_ts, | |
42 | const FileOperationInfo::FinishTimePoint& finish_ts, | |
43 | const IOStatus& io_status) { | |
44 | FileOperationInfo info(FileOperationType::kWrite, file_name_, start_ts, | |
45 | finish_ts, io_status); | |
f67539c2 TL |
46 | info.offset = offset; |
47 | info.length = length; | |
f67539c2 TL |
48 | |
49 | for (auto& listener : listeners_) { | |
50 | listener->OnFileWriteFinish(info); | |
51 | } | |
20effc67 TL |
52 | info.status.PermitUncheckedError(); |
53 | } | |
54 | void NotifyOnFileFlushFinish( | |
55 | const FileOperationInfo::StartTimePoint& start_ts, | |
56 | const FileOperationInfo::FinishTimePoint& finish_ts, | |
57 | const IOStatus& io_status) { | |
58 | FileOperationInfo info(FileOperationType::kFlush, file_name_, start_ts, | |
59 | finish_ts, io_status); | |
60 | ||
61 | for (auto& listener : listeners_) { | |
62 | listener->OnFileFlushFinish(info); | |
63 | } | |
64 | info.status.PermitUncheckedError(); | |
65 | } | |
66 | void NotifyOnFileSyncFinish( | |
67 | const FileOperationInfo::StartTimePoint& start_ts, | |
68 | const FileOperationInfo::FinishTimePoint& finish_ts, | |
69 | const IOStatus& io_status, | |
70 | FileOperationType type = FileOperationType::kSync) { | |
71 | FileOperationInfo info(type, file_name_, start_ts, finish_ts, io_status); | |
72 | ||
73 | for (auto& listener : listeners_) { | |
74 | listener->OnFileSyncFinish(info); | |
75 | } | |
76 | info.status.PermitUncheckedError(); | |
77 | } | |
78 | void NotifyOnFileRangeSyncFinish( | |
79 | uint64_t offset, size_t length, | |
80 | const FileOperationInfo::StartTimePoint& start_ts, | |
81 | const FileOperationInfo::FinishTimePoint& finish_ts, | |
82 | const IOStatus& io_status) { | |
83 | FileOperationInfo info(FileOperationType::kRangeSync, file_name_, start_ts, | |
84 | finish_ts, io_status); | |
85 | info.offset = offset; | |
86 | info.length = length; | |
87 | ||
88 | for (auto& listener : listeners_) { | |
89 | listener->OnFileRangeSyncFinish(info); | |
90 | } | |
91 | info.status.PermitUncheckedError(); | |
92 | } | |
93 | void NotifyOnFileTruncateFinish( | |
94 | const FileOperationInfo::StartTimePoint& start_ts, | |
95 | const FileOperationInfo::FinishTimePoint& finish_ts, | |
96 | const IOStatus& io_status) { | |
97 | FileOperationInfo info(FileOperationType::kTruncate, file_name_, start_ts, | |
98 | finish_ts, io_status); | |
99 | ||
100 | for (auto& listener : listeners_) { | |
101 | listener->OnFileTruncateFinish(info); | |
102 | } | |
103 | info.status.PermitUncheckedError(); | |
104 | } | |
105 | void NotifyOnFileCloseFinish( | |
106 | const FileOperationInfo::StartTimePoint& start_ts, | |
107 | const FileOperationInfo::FinishTimePoint& finish_ts, | |
108 | const IOStatus& io_status) { | |
109 | FileOperationInfo info(FileOperationType::kClose, file_name_, start_ts, | |
110 | finish_ts, io_status); | |
111 | ||
112 | for (auto& listener : listeners_) { | |
113 | listener->OnFileCloseFinish(info); | |
114 | } | |
115 | info.status.PermitUncheckedError(); | |
f67539c2 TL |
116 | } |
117 | #endif // ROCKSDB_LITE | |
118 | ||
119 | bool ShouldNotifyListeners() const { return !listeners_.empty(); } | |
20effc67 | 120 | void UpdateFileChecksum(const Slice& data); |
f67539c2 | 121 | |
f67539c2 | 122 | std::string file_name_; |
20effc67 | 123 | FSWritableFilePtr writable_file_; |
f67539c2 TL |
124 | Env* env_; |
125 | AlignedBuffer buf_; | |
126 | size_t max_buffer_size_; | |
127 | // Actually written data size can be used for truncate | |
128 | // not counting padding data | |
129 | uint64_t filesize_; | |
130 | #ifndef ROCKSDB_LITE | |
131 | // This is necessary when we use unbuffered access | |
132 | // and writes must happen on aligned offsets | |
133 | // so we need to go back and write that page again | |
134 | uint64_t next_write_offset_; | |
135 | #endif // ROCKSDB_LITE | |
136 | bool pending_sync_; | |
137 | uint64_t last_sync_size_; | |
138 | uint64_t bytes_per_sync_; | |
139 | RateLimiter* rate_limiter_; | |
140 | Statistics* stats_; | |
141 | std::vector<std::shared_ptr<EventListener>> listeners_; | |
20effc67 TL |
142 | std::unique_ptr<FileChecksumGenerator> checksum_generator_; |
143 | bool checksum_finalized_; | |
f67539c2 TL |
144 | |
145 | public: | |
146 | WritableFileWriter( | |
147 | std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name, | |
148 | const FileOptions& options, Env* env = nullptr, | |
20effc67 | 149 | const std::shared_ptr<IOTracer>& io_tracer = nullptr, |
f67539c2 TL |
150 | Statistics* stats = nullptr, |
151 | const std::vector<std::shared_ptr<EventListener>>& listeners = {}, | |
20effc67 TL |
152 | FileChecksumGenFactory* file_checksum_gen_factory = nullptr) |
153 | : file_name_(_file_name), | |
154 | writable_file_(std::move(file), io_tracer), | |
f67539c2 TL |
155 | env_(env), |
156 | buf_(), | |
157 | max_buffer_size_(options.writable_file_max_buffer_size), | |
158 | filesize_(0), | |
159 | #ifndef ROCKSDB_LITE | |
160 | next_write_offset_(0), | |
161 | #endif // ROCKSDB_LITE | |
162 | pending_sync_(false), | |
163 | last_sync_size_(0), | |
164 | bytes_per_sync_(options.bytes_per_sync), | |
165 | rate_limiter_(options.rate_limiter), | |
166 | stats_(stats), | |
167 | listeners_(), | |
20effc67 TL |
168 | checksum_generator_(nullptr), |
169 | checksum_finalized_(false) { | |
f67539c2 TL |
170 | TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0", |
171 | reinterpret_cast<void*>(max_buffer_size_)); | |
172 | buf_.Alignment(writable_file_->GetRequiredBufferAlignment()); | |
173 | buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_)); | |
174 | #ifndef ROCKSDB_LITE | |
175 | std::for_each(listeners.begin(), listeners.end(), | |
176 | [this](const std::shared_ptr<EventListener>& e) { | |
177 | if (e->ShouldBeNotifiedOnFileIO()) { | |
178 | listeners_.emplace_back(e); | |
179 | } | |
180 | }); | |
181 | #else // !ROCKSDB_LITE | |
182 | (void)listeners; | |
183 | #endif | |
20effc67 TL |
184 | if (file_checksum_gen_factory != nullptr) { |
185 | FileChecksumGenContext checksum_gen_context; | |
186 | checksum_gen_context.file_name = _file_name; | |
187 | checksum_generator_ = | |
188 | file_checksum_gen_factory->CreateFileChecksumGenerator( | |
189 | checksum_gen_context); | |
190 | } | |
f67539c2 TL |
191 | } |
192 | ||
193 | WritableFileWriter(const WritableFileWriter&) = delete; | |
194 | ||
195 | WritableFileWriter& operator=(const WritableFileWriter&) = delete; | |
196 | ||
20effc67 TL |
197 | ~WritableFileWriter() { |
198 | auto s = Close(); | |
199 | s.PermitUncheckedError(); | |
200 | } | |
f67539c2 TL |
201 | |
202 | std::string file_name() const { return file_name_; } | |
203 | ||
20effc67 | 204 | IOStatus Append(const Slice& data); |
f67539c2 | 205 | |
20effc67 | 206 | IOStatus Pad(const size_t pad_bytes); |
f67539c2 | 207 | |
20effc67 | 208 | IOStatus Flush(); |
f67539c2 | 209 | |
20effc67 | 210 | IOStatus Close(); |
f67539c2 | 211 | |
20effc67 | 212 | IOStatus Sync(bool use_fsync); |
f67539c2 TL |
213 | |
214 | // Sync only the data that was already Flush()ed. Safe to call concurrently | |
215 | // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(), | |
216 | // returns NotSupported status. | |
20effc67 | 217 | IOStatus SyncWithoutFlush(bool use_fsync); |
f67539c2 TL |
218 | |
219 | uint64_t GetFileSize() const { return filesize_; } | |
220 | ||
20effc67 | 221 | IOStatus InvalidateCache(size_t offset, size_t length) { |
f67539c2 TL |
222 | return writable_file_->InvalidateCache(offset, length); |
223 | } | |
224 | ||
225 | FSWritableFile* writable_file() const { return writable_file_.get(); } | |
226 | ||
227 | bool use_direct_io() { return writable_file_->use_direct_io(); } | |
228 | ||
229 | bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } | |
230 | ||
20effc67 TL |
231 | void TEST_SetFileChecksumGenerator( |
232 | FileChecksumGenerator* checksum_generator) { | |
233 | checksum_generator_.reset(checksum_generator); | |
f67539c2 TL |
234 | } |
235 | ||
20effc67 | 236 | std::string GetFileChecksum(); |
f67539c2 TL |
237 | |
238 | const char* GetFileChecksumFuncName() const; | |
239 | ||
240 | private: | |
241 | // Used when os buffering is OFF and we are writing | |
242 | // DMA such as in Direct I/O mode | |
243 | #ifndef ROCKSDB_LITE | |
20effc67 | 244 | IOStatus WriteDirect(); |
f67539c2 TL |
245 | #endif // !ROCKSDB_LITE |
246 | // Normal write | |
20effc67 TL |
247 | IOStatus WriteBuffered(const char* data, size_t size); |
248 | IOStatus RangeSync(uint64_t offset, uint64_t nbytes); | |
249 | IOStatus SyncInternal(bool use_fsync); | |
f67539c2 TL |
250 | }; |
251 | } // namespace ROCKSDB_NAMESPACE |