]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/file/writable_file_writer.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / file / writable_file_writer.h
CommitLineData
f67539c2
TL
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#pragma once
#include <algorithm>
#include <atomic>
#include <memory>
#include <string>
#include <vector>

#include "db/version_edit.h"
#include "env/file_system_tracer.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/file_checksum.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "rocksdb/listener.h"
#include "rocksdb/rate_limiter.h"
#include "test_util/sync_point.h"
#include "util/aligned_buffer.h"
25
26namespace ROCKSDB_NAMESPACE {
27class Statistics;
28
29// WritableFileWriter is a wrapper on top of Env::WritableFile. It provides
30// facilities to:
31// - Handle Buffered and Direct writes.
32// - Rate limit writes.
33// - Flush and Sync the data to the underlying filesystem.
34// - Notify any interested listeners on the completion of a write.
35// - Update IO stats.
36class WritableFileWriter {
37 private:
38#ifndef ROCKSDB_LITE
20effc67
TL
39 void NotifyOnFileWriteFinish(
40 uint64_t offset, size_t length,
41 const FileOperationInfo::StartTimePoint& start_ts,
42 const FileOperationInfo::FinishTimePoint& finish_ts,
43 const IOStatus& io_status) {
44 FileOperationInfo info(FileOperationType::kWrite, file_name_, start_ts,
45 finish_ts, io_status);
f67539c2
TL
46 info.offset = offset;
47 info.length = length;
f67539c2
TL
48
49 for (auto& listener : listeners_) {
50 listener->OnFileWriteFinish(info);
51 }
20effc67
TL
52 info.status.PermitUncheckedError();
53 }
54 void NotifyOnFileFlushFinish(
55 const FileOperationInfo::StartTimePoint& start_ts,
56 const FileOperationInfo::FinishTimePoint& finish_ts,
57 const IOStatus& io_status) {
58 FileOperationInfo info(FileOperationType::kFlush, file_name_, start_ts,
59 finish_ts, io_status);
60
61 for (auto& listener : listeners_) {
62 listener->OnFileFlushFinish(info);
63 }
64 info.status.PermitUncheckedError();
65 }
66 void NotifyOnFileSyncFinish(
67 const FileOperationInfo::StartTimePoint& start_ts,
68 const FileOperationInfo::FinishTimePoint& finish_ts,
69 const IOStatus& io_status,
70 FileOperationType type = FileOperationType::kSync) {
71 FileOperationInfo info(type, file_name_, start_ts, finish_ts, io_status);
72
73 for (auto& listener : listeners_) {
74 listener->OnFileSyncFinish(info);
75 }
76 info.status.PermitUncheckedError();
77 }
78 void NotifyOnFileRangeSyncFinish(
79 uint64_t offset, size_t length,
80 const FileOperationInfo::StartTimePoint& start_ts,
81 const FileOperationInfo::FinishTimePoint& finish_ts,
82 const IOStatus& io_status) {
83 FileOperationInfo info(FileOperationType::kRangeSync, file_name_, start_ts,
84 finish_ts, io_status);
85 info.offset = offset;
86 info.length = length;
87
88 for (auto& listener : listeners_) {
89 listener->OnFileRangeSyncFinish(info);
90 }
91 info.status.PermitUncheckedError();
92 }
93 void NotifyOnFileTruncateFinish(
94 const FileOperationInfo::StartTimePoint& start_ts,
95 const FileOperationInfo::FinishTimePoint& finish_ts,
96 const IOStatus& io_status) {
97 FileOperationInfo info(FileOperationType::kTruncate, file_name_, start_ts,
98 finish_ts, io_status);
99
100 for (auto& listener : listeners_) {
101 listener->OnFileTruncateFinish(info);
102 }
103 info.status.PermitUncheckedError();
104 }
105 void NotifyOnFileCloseFinish(
106 const FileOperationInfo::StartTimePoint& start_ts,
107 const FileOperationInfo::FinishTimePoint& finish_ts,
108 const IOStatus& io_status) {
109 FileOperationInfo info(FileOperationType::kClose, file_name_, start_ts,
110 finish_ts, io_status);
111
112 for (auto& listener : listeners_) {
113 listener->OnFileCloseFinish(info);
114 }
115 info.status.PermitUncheckedError();
f67539c2
TL
116 }
117#endif // ROCKSDB_LITE
118
119 bool ShouldNotifyListeners() const { return !listeners_.empty(); }
20effc67 120 void UpdateFileChecksum(const Slice& data);
f67539c2 121
f67539c2 122 std::string file_name_;
20effc67 123 FSWritableFilePtr writable_file_;
f67539c2
TL
124 Env* env_;
125 AlignedBuffer buf_;
126 size_t max_buffer_size_;
127 // Actually written data size can be used for truncate
128 // not counting padding data
129 uint64_t filesize_;
130#ifndef ROCKSDB_LITE
131 // This is necessary when we use unbuffered access
132 // and writes must happen on aligned offsets
133 // so we need to go back and write that page again
134 uint64_t next_write_offset_;
135#endif // ROCKSDB_LITE
136 bool pending_sync_;
137 uint64_t last_sync_size_;
138 uint64_t bytes_per_sync_;
139 RateLimiter* rate_limiter_;
140 Statistics* stats_;
141 std::vector<std::shared_ptr<EventListener>> listeners_;
20effc67
TL
142 std::unique_ptr<FileChecksumGenerator> checksum_generator_;
143 bool checksum_finalized_;
f67539c2
TL
144
145 public:
146 WritableFileWriter(
147 std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
148 const FileOptions& options, Env* env = nullptr,
20effc67 149 const std::shared_ptr<IOTracer>& io_tracer = nullptr,
f67539c2
TL
150 Statistics* stats = nullptr,
151 const std::vector<std::shared_ptr<EventListener>>& listeners = {},
20effc67
TL
152 FileChecksumGenFactory* file_checksum_gen_factory = nullptr)
153 : file_name_(_file_name),
154 writable_file_(std::move(file), io_tracer),
f67539c2
TL
155 env_(env),
156 buf_(),
157 max_buffer_size_(options.writable_file_max_buffer_size),
158 filesize_(0),
159#ifndef ROCKSDB_LITE
160 next_write_offset_(0),
161#endif // ROCKSDB_LITE
162 pending_sync_(false),
163 last_sync_size_(0),
164 bytes_per_sync_(options.bytes_per_sync),
165 rate_limiter_(options.rate_limiter),
166 stats_(stats),
167 listeners_(),
20effc67
TL
168 checksum_generator_(nullptr),
169 checksum_finalized_(false) {
f67539c2
TL
170 TEST_SYNC_POINT_CALLBACK("WritableFileWriter::WritableFileWriter:0",
171 reinterpret_cast<void*>(max_buffer_size_));
172 buf_.Alignment(writable_file_->GetRequiredBufferAlignment());
173 buf_.AllocateNewBuffer(std::min((size_t)65536, max_buffer_size_));
174#ifndef ROCKSDB_LITE
175 std::for_each(listeners.begin(), listeners.end(),
176 [this](const std::shared_ptr<EventListener>& e) {
177 if (e->ShouldBeNotifiedOnFileIO()) {
178 listeners_.emplace_back(e);
179 }
180 });
181#else // !ROCKSDB_LITE
182 (void)listeners;
183#endif
20effc67
TL
184 if (file_checksum_gen_factory != nullptr) {
185 FileChecksumGenContext checksum_gen_context;
186 checksum_gen_context.file_name = _file_name;
187 checksum_generator_ =
188 file_checksum_gen_factory->CreateFileChecksumGenerator(
189 checksum_gen_context);
190 }
f67539c2
TL
191 }
192
193 WritableFileWriter(const WritableFileWriter&) = delete;
194
195 WritableFileWriter& operator=(const WritableFileWriter&) = delete;
196
20effc67
TL
197 ~WritableFileWriter() {
198 auto s = Close();
199 s.PermitUncheckedError();
200 }
f67539c2
TL
201
202 std::string file_name() const { return file_name_; }
203
20effc67 204 IOStatus Append(const Slice& data);
f67539c2 205
20effc67 206 IOStatus Pad(const size_t pad_bytes);
f67539c2 207
20effc67 208 IOStatus Flush();
f67539c2 209
20effc67 210 IOStatus Close();
f67539c2 211
20effc67 212 IOStatus Sync(bool use_fsync);
f67539c2
TL
213
214 // Sync only the data that was already Flush()ed. Safe to call concurrently
215 // with Append() and Flush(). If !writable_file_->IsSyncThreadSafe(),
216 // returns NotSupported status.
20effc67 217 IOStatus SyncWithoutFlush(bool use_fsync);
f67539c2
TL
218
219 uint64_t GetFileSize() const { return filesize_; }
220
20effc67 221 IOStatus InvalidateCache(size_t offset, size_t length) {
f67539c2
TL
222 return writable_file_->InvalidateCache(offset, length);
223 }
224
225 FSWritableFile* writable_file() const { return writable_file_.get(); }
226
227 bool use_direct_io() { return writable_file_->use_direct_io(); }
228
229 bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; }
230
20effc67
TL
231 void TEST_SetFileChecksumGenerator(
232 FileChecksumGenerator* checksum_generator) {
233 checksum_generator_.reset(checksum_generator);
f67539c2
TL
234 }
235
20effc67 236 std::string GetFileChecksum();
f67539c2
TL
237
238 const char* GetFileChecksumFuncName() const;
239
240 private:
241 // Used when os buffering is OFF and we are writing
242 // DMA such as in Direct I/O mode
243#ifndef ROCKSDB_LITE
20effc67 244 IOStatus WriteDirect();
f67539c2
TL
245#endif // !ROCKSDB_LITE
246 // Normal write
20effc67
TL
247 IOStatus WriteBuffered(const char* data, size_t size);
248 IOStatus RangeSync(uint64_t offset, uint64_t nbytes);
249 IOStatus SyncInternal(bool use_fsync);
f67539c2
TL
250};
251} // namespace ROCKSDB_NAMESPACE