]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/file-impl.hh
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / core / file-impl.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright 2016 ScyllaDB
20 */
21
22 #pragma once
23
#include <seastar/core/file.hh>
#include <seastar/core/shared_ptr.hh>

#include <atomic>
#include <deque>
#include <utility>
29
30 namespace seastar {
31 class io_queue;
32
namespace internal {

// Makes a properly aligned vector of iovecs respect the IOV_MAX limit,
// trimming it if necessary. The vector is modified in place and still
// satisfies the alignment requirements afterwards.
//
// @param iov             properly aligned iovecs; may be trimmed
// @param disk_alignment  NOTE(review): presumably the alignment the trimmed
//                        vector has to keep -- confirm against the definition
//
// @return the final total length of all iovecs.
size_t sanitize_iovecs(std::vector<iovec>& iov, size_t disk_alignment) noexcept;

} // namespace internal
42
43 class posix_file_handle_impl : public seastar::file_handle_impl {
44 int _fd;
45 std::atomic<unsigned>* _refcount;
46 dev_t _device_id;
47 open_flags _open_flags;
48 public:
49 posix_file_handle_impl(int fd, open_flags f, std::atomic<unsigned>* refcount, dev_t device_id)
50 : _fd(fd), _refcount(refcount), _device_id(device_id), _open_flags(f) {
51 }
52 virtual ~posix_file_handle_impl();
53 posix_file_handle_impl(const posix_file_handle_impl&) = delete;
54 posix_file_handle_impl(posix_file_handle_impl&&) = delete;
55 virtual shared_ptr<file_impl> to_file() && override;
56 virtual std::unique_ptr<seastar::file_handle_impl> clone() const override;
57 };
58
59 class posix_file_impl : public file_impl {
60 std::atomic<unsigned>* _refcount = nullptr;
61 dev_t _device_id;
62 io_queue* _io_queue;
63 open_flags _open_flags;
64 public:
65 int _fd;
66 posix_file_impl(int fd, open_flags, file_open_options options, dev_t device_id);
67 posix_file_impl(int fd, open_flags, std::atomic<unsigned>* refcount, dev_t device_id);
68 virtual ~posix_file_impl() override;
69 future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) noexcept override;
70 future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept override;
71 future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) noexcept override;
72 future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept override;
73 future<> flush(void) noexcept override;
74 future<struct stat> stat(void) noexcept override;
75 future<> truncate(uint64_t length) noexcept override;
76 future<> discard(uint64_t offset, uint64_t length) noexcept override;
77 virtual future<> allocate(uint64_t position, uint64_t length) noexcept override;
78 future<uint64_t> size() noexcept override;
79 virtual future<> close() noexcept override;
80 virtual std::unique_ptr<seastar::file_handle_impl> dup() override;
81 virtual subscription<directory_entry> list_directory(std::function<future<> (directory_entry de)> next) override;
82 virtual future<temporary_buffer<uint8_t>> dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) noexcept override;
83
84 open_flags flags() const {
85 return _open_flags;
86 }
87 private:
88 void query_dma_alignment();
89
90 /**
91 * Try to read from the given position where the previous short read has
92 * stopped. Check the EOF condition.
93 *
94 * The below code assumes the following: short reads due to I/O errors
95 * always end at address aligned to HW block boundary. Therefore if we issue
96 * a new read operation from the next position we are promised to get an
97 * error (different from EINVAL). If we've got a short read because we have
98 * reached EOF then the above read would either return a zero-length success
99 * (if the file size is aligned to HW block size) or an EINVAL error (if
100 * file length is not aligned to HW block size).
101 *
102 * @param pos offset to read from
103 * @param len number of bytes to read
104 * @param pc the IO priority class under which to queue this operation
105 *
106 * @return temporary buffer with read data or zero-sized temporary buffer if
107 * pos is at or beyond EOF.
108 * @throw appropriate exception in case of I/O error.
109 */
110 future<temporary_buffer<uint8_t>>
111 read_maybe_eof(uint64_t pos, size_t len, const io_priority_class& pc);
112 };
113
114 // The Linux XFS implementation is challenged wrt. append: a write that changes
115 // eof will be blocked by any other concurrent AIO operation to the same file, whether
116 // it changes file size or not. Furthermore, ftruncate() will also block and be blocked
117 // by AIO, so attempts to game the system and call ftruncate() have to be done very carefully.
118 //
119 // Other Linux filesystems may have different locking rules, so this may need to be
120 // adjusted for them.
121 class append_challenged_posix_file_impl : public posix_file_impl, public enable_shared_from_this<append_challenged_posix_file_impl> {
122 // File size as a result of completed kernel operations (writes and truncates)
123 uint64_t _committed_size;
124 // File size as a result of seastar API calls
125 uint64_t _logical_size;
126 // Pending operations
127 enum class opcode {
128 invalid,
129 read,
130 write,
131 truncate,
132 flush,
133 };
134 struct op {
135 opcode type;
136 uint64_t pos;
137 size_t len;
138 std::function<future<> ()> run;
139 };
140 // Queue of pending operations; processed from front to end to avoid
141 // starvation, but can issue concurrent operations.
142 std::deque<op> _q;
143 unsigned _max_size_changing_ops = 0;
144 unsigned _current_non_size_changing_ops = 0;
145 unsigned _current_size_changing_ops = 0;
146 bool _fsync_is_exclusive = true;
147
148 // Set when the user is closing the file
149 enum class state { open, draining, closing, closed };
150 state _closing_state = state::open;
151
152 bool _sloppy_size = false;
153 // Fulfiled when _done and I/O is complete
154 promise<> _completed;
155 private:
156 void commit_size(uint64_t size) noexcept;
157 bool must_run_alone(const op& candidate) const noexcept;
158 bool size_changing(const op& candidate) const noexcept;
159 bool may_dispatch(const op& candidate) const noexcept;
160 void dispatch(op& candidate) noexcept;
161 void optimize_queue() noexcept;
162 void process_queue() noexcept;
163 bool may_quit() const noexcept;
164 void enqueue_op(op&& op);
165 template <typename... T, typename Func>
166 future<T...> enqueue(opcode type, uint64_t pos, size_t len, Func&& func) noexcept {
167 try {
168 auto pr = make_lw_shared(promise<T...>());
169 auto fut = pr->get_future();
170 auto op_func = [func = std::move(func), pr = std::move(pr)] () mutable {
171 return futurize_invoke(std::move(func)).then_wrapped([pr = std::move(pr)] (future<T...> f) mutable {
172 f.forward_to(std::move(*pr));
173 });
174 };
175 enqueue_op({type, pos, len, op_func});
176 return fut;
177 } catch (...) {
178 return make_exception_future<T...>(std::current_exception());
179 }
180 }
181 public:
182 append_challenged_posix_file_impl(int fd, open_flags, file_open_options options, unsigned max_size_changing_ops, bool fsync_is_exclusive, dev_t device_id);
183 ~append_challenged_posix_file_impl() override;
184 future<size_t> read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) noexcept override;
185 future<size_t> read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept override;
186 future<size_t> write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) noexcept override;
187 future<size_t> write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept override;
188 future<> flush() noexcept override;
189 future<struct stat> stat() noexcept override;
190 future<> truncate(uint64_t length) noexcept override;
191 future<uint64_t> size() noexcept override;
192 future<> close() noexcept override;
193 };
194
195 class blockdev_file_impl : public posix_file_impl {
196 public:
197 blockdev_file_impl(int fd, open_flags, file_open_options options, dev_t device_id);
198 future<> truncate(uint64_t length) noexcept override;
199 future<> discard(uint64_t offset, uint64_t length) noexcept override;
200 future<uint64_t> size() noexcept override;
201 virtual future<> allocate(uint64_t position, uint64_t length) noexcept override;
202 };
203
204 }