]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright 2019 ScyllaDB | |
20 | */ | |
21 | ||
22 | #define __user /* empty */ // for xfs includes, below | |
23 | ||
24 | #include <sys/syscall.h> | |
25 | #include <dirent.h> | |
26 | #include <linux/types.h> // for xfs, below | |
27 | #include <sys/ioctl.h> | |
28 | #include <xfs/linux.h> | |
29 | #define min min /* prevent xfs.h from defining min() as a macro */ | |
30 | #include <xfs/xfs.h> | |
31 | #undef min | |
32 | #include <boost/range/numeric.hpp> | |
33 | #include <boost/range/adaptor/transformed.hpp> | |
34 | #include <seastar/core/reactor.hh> | |
35 | #include <seastar/core/file.hh> | |
36 | #include <seastar/core/report_exception.hh> | |
37 | #include <seastar/core/linux-aio.hh> | |
f67539c2 | 38 | #include <seastar/util/later.hh> |
9f95a23c TL |
39 | #include "core/file-impl.hh" |
40 | #include "core/syscall_result.hh" | |
41 | #include "core/thread_pool.hh" | |
42 | #include "core/uname.hh" | |
43 | ||
44 | namespace seastar { | |
45 | ||
f67539c2 TL |
46 | static_assert(std::is_nothrow_copy_constructible_v<io_priority_class>); |
47 | static_assert(std::is_nothrow_move_constructible_v<io_priority_class>); | |
48 | ||
9f95a23c TL |
49 | using namespace internal; |
50 | using namespace internal::linux_abi; | |
51 | ||
52 | file_handle::file_handle(const file_handle& x) | |
53 | : _impl(x._impl ? x._impl->clone() : std::unique_ptr<file_handle_impl>()) { | |
54 | } | |
55 | ||
56 | file_handle::file_handle(file_handle&& x) noexcept = default; | |
57 | ||
58 | file_handle& | |
59 | file_handle::operator=(const file_handle& x) { | |
60 | return operator=(file_handle(x)); | |
61 | } | |
62 | ||
63 | file_handle& | |
64 | file_handle::operator=(file_handle&&) noexcept = default; | |
65 | ||
66 | file | |
67 | file_handle::to_file() const & { | |
68 | return file_handle(*this).to_file(); | |
69 | } | |
70 | ||
71 | file | |
72 | file_handle::to_file() && { | |
73 | return file(std::move(*_impl).to_file()); | |
74 | } | |
75 | ||
f67539c2 TL |
76 | posix_file_impl::posix_file_impl(int fd, open_flags f, file_open_options options, dev_t device_id) |
77 | : _device_id(device_id) | |
78 | , _io_queue(&(engine().get_io_queue(_device_id))) | |
9f95a23c TL |
79 | , _open_flags(f) |
80 | , _fd(fd) | |
81 | { | |
82 | query_dma_alignment(); | |
83 | } | |
84 | ||
85 | posix_file_impl::~posix_file_impl() { | |
86 | if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) != 1) { | |
87 | return; | |
88 | } | |
89 | delete _refcount; | |
90 | if (_fd != -1) { | |
91 | // Note: close() can be a blocking operation on NFS | |
92 | ::close(_fd); | |
93 | } | |
94 | } | |
95 | ||
96 | void | |
97 | posix_file_impl::query_dma_alignment() { | |
98 | dioattr da; | |
99 | auto r = ioctl(_fd, XFS_IOC_DIOINFO, &da); | |
100 | if (r == 0) { | |
101 | _memory_dma_alignment = da.d_mem; | |
102 | _disk_read_dma_alignment = da.d_miniosz; | |
103 | // xfs wants at least the block size for writes | |
104 | // FIXME: really read the block size | |
105 | _disk_write_dma_alignment = std::max<unsigned>(da.d_miniosz, 4096); | |
106 | } | |
107 | } | |
108 | ||
109 | std::unique_ptr<seastar::file_handle_impl> | |
110 | posix_file_impl::dup() { | |
111 | if (!_refcount) { | |
112 | _refcount = new std::atomic<unsigned>(1u); | |
113 | } | |
f67539c2 | 114 | auto ret = std::make_unique<posix_file_handle_impl>(_fd, _open_flags, _refcount, _device_id); |
9f95a23c TL |
115 | _refcount->fetch_add(1, std::memory_order_relaxed); |
116 | return ret; | |
117 | } | |
118 | ||
f67539c2 TL |
119 | posix_file_impl::posix_file_impl(int fd, open_flags f, std::atomic<unsigned>* refcount, dev_t device_id) |
120 | : _refcount(refcount) | |
121 | , _device_id(device_id) | |
122 | , _io_queue(&(engine().get_io_queue(_device_id))) | |
123 | , _open_flags(f) | |
124 | , _fd(fd) { | |
9f95a23c TL |
125 | } |
126 | ||
127 | future<> | |
f67539c2 | 128 | posix_file_impl::flush(void) noexcept { |
9f95a23c TL |
129 | if ((_open_flags & open_flags::dsync) != open_flags{}) { |
130 | return make_ready_future<>(); | |
131 | } | |
132 | return engine().fdatasync(_fd); | |
133 | } | |
134 | ||
135 | future<struct stat> | |
f67539c2 TL |
136 | posix_file_impl::stat() noexcept { |
137 | return engine().fstat(_fd); | |
9f95a23c TL |
138 | } |
139 | ||
140 | future<> | |
f67539c2 | 141 | posix_file_impl::truncate(uint64_t length) noexcept { |
9f95a23c TL |
142 | return engine()._thread_pool->submit<syscall_result<int>>([this, length] { |
143 | return wrap_syscall<int>(::ftruncate(_fd, length)); | |
144 | }).then([] (syscall_result<int> sr) { | |
145 | sr.throw_if_error(); | |
146 | return make_ready_future<>(); | |
147 | }); | |
148 | } | |
149 | ||
150 | future<> | |
f67539c2 | 151 | posix_file_impl::discard(uint64_t offset, uint64_t length) noexcept { |
9f95a23c TL |
152 | return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable { |
153 | return wrap_syscall<int>(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, | |
154 | offset, length)); | |
155 | }).then([] (syscall_result<int> sr) { | |
156 | sr.throw_if_error(); | |
157 | return make_ready_future<>(); | |
158 | }); | |
159 | } | |
160 | ||
161 | future<> | |
f67539c2 | 162 | posix_file_impl::allocate(uint64_t position, uint64_t length) noexcept { |
9f95a23c TL |
163 | #ifdef FALLOC_FL_ZERO_RANGE |
164 | // FALLOC_FL_ZERO_RANGE is fairly new, so don't fail if it's not supported. | |
165 | static bool supported = true; | |
166 | if (!supported) { | |
167 | return make_ready_future<>(); | |
168 | } | |
169 | return engine()._thread_pool->submit<syscall_result<int>>([this, position, length] () mutable { | |
170 | auto ret = ::fallocate(_fd, FALLOC_FL_ZERO_RANGE|FALLOC_FL_KEEP_SIZE, position, length); | |
171 | if (ret == -1 && errno == EOPNOTSUPP) { | |
172 | ret = 0; | |
173 | supported = false; // Racy, but harmless. At most we issue an extra call or two. | |
174 | } | |
175 | return wrap_syscall<int>(ret); | |
176 | }).then([] (syscall_result<int> sr) { | |
177 | sr.throw_if_error(); | |
178 | return make_ready_future<>(); | |
179 | }); | |
180 | #else | |
181 | return make_ready_future<>(); | |
182 | #endif | |
183 | } | |
184 | ||
185 | future<uint64_t> | |
f67539c2 | 186 | posix_file_impl::size() noexcept { |
9f95a23c TL |
187 | auto r = ::lseek(_fd, 0, SEEK_END); |
188 | if (r == -1) { | |
189 | return make_exception_future<uint64_t>(std::system_error(errno, std::system_category())); | |
190 | } | |
191 | return make_ready_future<uint64_t>(r); | |
192 | } | |
193 | ||
194 | future<> | |
195 | posix_file_impl::close() noexcept { | |
196 | if (_fd == -1) { | |
197 | seastar_logger.warn("double close() detected, contact support"); | |
198 | return make_ready_future<>(); | |
199 | } | |
200 | auto fd = _fd; | |
201 | _fd = -1; // Prevent a concurrent close (which is illegal) from closing another file's fd | |
202 | if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) != 1) { | |
203 | _refcount = nullptr; | |
204 | return make_ready_future<>(); | |
205 | } | |
206 | delete _refcount; | |
207 | _refcount = nullptr; | |
208 | auto closed = [fd] () noexcept { | |
209 | try { | |
210 | return engine()._thread_pool->submit<syscall_result<int>>([fd] { | |
211 | return wrap_syscall<int>(::close(fd)); | |
212 | }); | |
213 | } catch (...) { | |
214 | report_exception("Running ::close() in reactor thread, submission failed with exception", std::current_exception()); | |
215 | return make_ready_future<syscall_result<int>>(wrap_syscall<int>(::close(fd))); | |
216 | } | |
217 | }(); | |
218 | return closed.then([] (syscall_result<int> sr) { | |
219 | sr.throw_if_error(); | |
220 | }); | |
221 | } | |
222 | ||
223 | future<uint64_t> | |
f67539c2 | 224 | blockdev_file_impl::size(void) noexcept { |
9f95a23c TL |
225 | return engine()._thread_pool->submit<syscall_result_extra<size_t>>([this] { |
226 | uint64_t size; | |
227 | int ret = ::ioctl(_fd, BLKGETSIZE64, &size); | |
228 | return wrap_syscall(ret, size); | |
229 | }).then([] (syscall_result_extra<uint64_t> ret) { | |
230 | ret.throw_if_error(); | |
231 | return make_ready_future<uint64_t>(ret.extra); | |
232 | }); | |
233 | } | |
234 | ||
235 | subscription<directory_entry> | |
236 | posix_file_impl::list_directory(std::function<future<> (directory_entry de)> next) { | |
237 | static constexpr size_t buffer_size = 8192; | |
238 | struct work { | |
239 | stream<directory_entry> s; | |
240 | unsigned current = 0; | |
241 | unsigned total = 0; | |
242 | bool eof = false; | |
243 | int error = 0; | |
244 | char buffer[buffer_size]; | |
245 | }; | |
246 | ||
247 | // While it would be natural to use fdopendir()/readdir(), | |
248 | // our syscall thread pool doesn't support malloc(), which is | |
249 | // required for this to work. So resort to using getdents() | |
250 | // instead. | |
251 | ||
252 | // From getdents(2): | |
253 | struct linux_dirent64 { | |
254 | ino64_t d_ino; /* 64-bit inode number */ | |
255 | off64_t d_off; /* 64-bit offset to next structure */ | |
256 | unsigned short d_reclen; /* Size of this dirent */ | |
257 | unsigned char d_type; /* File type */ | |
258 | char d_name[]; /* Filename (null-terminated) */ | |
259 | }; | |
260 | ||
261 | auto w = make_lw_shared<work>(); | |
262 | auto ret = w->s.listen(std::move(next)); | |
263 | // List the directory asynchronously in the background. | |
264 | // Caller synchronizes using the returned subscription. | |
265 | (void)w->s.started().then([w, this] { | |
266 | auto eofcond = [w] { return w->eof; }; | |
267 | return do_until(eofcond, [w, this] { | |
268 | if (w->current == w->total) { | |
269 | return engine()._thread_pool->submit<syscall_result<long>>([w , this] () { | |
270 | auto ret = ::syscall(__NR_getdents64, _fd, reinterpret_cast<linux_dirent64*>(w->buffer), buffer_size); | |
271 | return wrap_syscall(ret); | |
272 | }).then([w] (syscall_result<long> ret) { | |
273 | ret.throw_if_error(); | |
274 | if (ret.result == 0) { | |
275 | w->eof = true; | |
276 | } else { | |
277 | w->current = 0; | |
278 | w->total = ret.result; | |
279 | } | |
280 | }); | |
281 | } | |
282 | auto start = w->buffer + w->current; | |
283 | auto de = reinterpret_cast<linux_dirent64*>(start); | |
f67539c2 | 284 | std::optional<directory_entry_type> type; |
9f95a23c TL |
285 | switch (de->d_type) { |
286 | case DT_BLK: | |
287 | type = directory_entry_type::block_device; | |
288 | break; | |
289 | case DT_CHR: | |
290 | type = directory_entry_type::char_device; | |
291 | break; | |
292 | case DT_DIR: | |
293 | type = directory_entry_type::directory; | |
294 | break; | |
295 | case DT_FIFO: | |
296 | type = directory_entry_type::fifo; | |
297 | break; | |
298 | case DT_REG: | |
299 | type = directory_entry_type::regular; | |
300 | break; | |
301 | case DT_LNK: | |
302 | type = directory_entry_type::link; | |
303 | break; | |
304 | case DT_SOCK: | |
305 | type = directory_entry_type::socket; | |
306 | break; | |
307 | default: | |
308 | // unknown, ignore | |
309 | ; | |
310 | } | |
311 | w->current += de->d_reclen; | |
312 | sstring name = de->d_name; | |
313 | if (name == "." || name == "..") { | |
314 | return make_ready_future<>(); | |
315 | } | |
316 | return w->s.produce({std::move(name), type}); | |
317 | }); | |
318 | }).then([w] { | |
319 | w->s.close(); | |
320 | }).handle_exception([] (std::exception_ptr ignored) {}); | |
321 | return ret; | |
322 | } | |
323 | ||
324 | future<size_t> | |
f67539c2 | 325 | posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& io_priority_class) noexcept { |
9f95a23c TL |
326 | auto req = internal::io_request::make_write(_fd, pos, buffer, len); |
327 | return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)); | |
328 | } | |
329 | ||
330 | future<size_t> | |
f67539c2 | 331 | posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& io_priority_class) noexcept { |
9f95a23c TL |
332 | auto len = internal::sanitize_iovecs(iov, _disk_write_dma_alignment); |
333 | auto req = internal::io_request::make_writev(_fd, pos, iov); | |
334 | return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {}); | |
335 | } | |
336 | ||
337 | future<size_t> | |
f67539c2 | 338 | posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& io_priority_class) noexcept { |
9f95a23c TL |
339 | auto req = internal::io_request::make_read(_fd, pos, buffer, len); |
340 | return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)); | |
341 | } | |
342 | ||
343 | future<size_t> | |
f67539c2 | 344 | posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& io_priority_class) noexcept { |
9f95a23c TL |
345 | auto len = internal::sanitize_iovecs(iov, _disk_read_dma_alignment); |
346 | auto req = internal::io_request::make_readv(_fd, pos, iov); | |
347 | return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {}); | |
348 | } | |
349 | ||
350 | future<temporary_buffer<uint8_t>> | |
f67539c2 | 351 | posix_file_impl::dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) noexcept { |
9f95a23c TL |
352 | using tmp_buf_type = typename file::read_state<uint8_t>::tmp_buf_type; |
353 | ||
f67539c2 | 354 | try { |
9f95a23c TL |
355 | auto front = offset & (_disk_read_dma_alignment - 1); |
356 | offset -= front; | |
357 | range_size += front; | |
358 | ||
359 | auto rstate = make_lw_shared<file::read_state<uint8_t>>(offset, front, | |
360 | range_size, | |
361 | _memory_dma_alignment, | |
362 | _disk_read_dma_alignment); | |
363 | ||
364 | // | |
365 | // First, try to read directly into the buffer. Most of the reads will | |
366 | // end here. | |
367 | // | |
368 | auto read = read_dma(offset, rstate->buf.get_write(), | |
369 | rstate->buf.size(), pc); | |
370 | ||
371 | return read.then([rstate, this, &pc] (size_t size) mutable { | |
372 | rstate->pos = size; | |
373 | ||
374 | // | |
375 | // If we haven't read all required data at once - | |
376 | // start read-copy sequence. We can't continue with direct reads | |
377 | // into the previously allocated buffer here since we have to ensure | |
378 | // the aligned read length and thus the aligned destination buffer | |
379 | // size. | |
380 | // | |
381 | // The copying will actually take place only if there was a HW glitch. | |
382 | // In EOF case or in case of a persistent I/O error the only overhead is | |
383 | // an extra allocation. | |
384 | // | |
385 | return do_until( | |
386 | [rstate] { return rstate->done(); }, | |
387 | [rstate, this, &pc] () mutable { | |
388 | return read_maybe_eof( | |
389 | rstate->cur_offset(), rstate->left_to_read(), pc).then( | |
390 | [rstate] (auto buf1) mutable { | |
391 | if (buf1.size()) { | |
392 | rstate->append_new_data(buf1); | |
393 | } else { | |
394 | rstate->eof = true; | |
395 | } | |
396 | ||
397 | return make_ready_future<>(); | |
398 | }); | |
399 | }).then([rstate] () mutable { | |
400 | // | |
401 | // If we are here we are promised to have read some bytes beyond | |
402 | // "front" so we may trim straight away. | |
403 | // | |
404 | rstate->trim_buf_before_ret(); | |
405 | return make_ready_future<tmp_buf_type>(std::move(rstate->buf)); | |
406 | }); | |
407 | }); | |
f67539c2 TL |
408 | } catch (...) { |
409 | return make_exception_future<tmp_buf_type>(std::current_exception()); | |
410 | } | |
9f95a23c TL |
411 | } |
412 | ||
413 | future<temporary_buffer<uint8_t>> | |
414 | posix_file_impl::read_maybe_eof(uint64_t pos, size_t len, const io_priority_class& pc) { | |
415 | // | |
416 | // We have to allocate a new aligned buffer to make sure we don't get | |
417 | // an EINVAL error due to unaligned destination buffer. | |
418 | // | |
419 | temporary_buffer<uint8_t> buf = temporary_buffer<uint8_t>::aligned( | |
420 | _memory_dma_alignment, align_up(len, size_t(_disk_read_dma_alignment))); | |
421 | ||
422 | // try to read a single bulk from the given position | |
423 | auto dst = buf.get_write(); | |
424 | auto buf_size = buf.size(); | |
425 | return read_dma(pos, dst, buf_size, pc).then_wrapped( | |
426 | [buf = std::move(buf)](future<size_t> f) mutable { | |
427 | try { | |
f67539c2 | 428 | size_t size = f.get0(); |
9f95a23c TL |
429 | |
430 | buf.trim(size); | |
431 | ||
432 | return std::move(buf); | |
433 | } catch (std::system_error& e) { | |
434 | // | |
435 | // TODO: implement a non-trowing file_impl::dma_read() interface to | |
436 | // avoid the exceptions throwing in a good flow completely. | |
437 | // Otherwise for users that don't want to care about the | |
438 | // underlying file size and preventing the attempts to read | |
439 | // bytes beyond EOF there will always be at least one | |
440 | // exception throwing at the file end for files with unaligned | |
441 | // length. | |
442 | // | |
443 | if (e.code().value() == EINVAL) { | |
444 | buf.trim(0); | |
445 | return std::move(buf); | |
446 | } else { | |
447 | throw; | |
448 | } | |
449 | } | |
450 | }); | |
451 | } | |
452 | ||
f67539c2 TL |
453 | blockdev_file_impl::blockdev_file_impl(int fd, open_flags f, file_open_options options, dev_t device_id) |
454 | : posix_file_impl(fd, f, options, device_id) { | |
9f95a23c TL |
455 | } |
456 | ||
457 | future<> | |
f67539c2 | 458 | blockdev_file_impl::truncate(uint64_t length) noexcept { |
9f95a23c TL |
459 | return make_ready_future<>(); |
460 | } | |
461 | ||
462 | future<> | |
f67539c2 | 463 | blockdev_file_impl::discard(uint64_t offset, uint64_t length) noexcept { |
9f95a23c TL |
464 | return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable { |
465 | uint64_t range[2] { offset, length }; | |
466 | return wrap_syscall<int>(::ioctl(_fd, BLKDISCARD, &range)); | |
467 | }).then([] (syscall_result<int> sr) { | |
468 | sr.throw_if_error(); | |
469 | return make_ready_future<>(); | |
470 | }); | |
471 | } | |
472 | ||
473 | future<> | |
f67539c2 | 474 | blockdev_file_impl::allocate(uint64_t position, uint64_t length) noexcept { |
9f95a23c TL |
475 | // nothing to do for block device |
476 | return make_ready_future<>(); | |
477 | } | |
478 | ||
479 | append_challenged_posix_file_impl::append_challenged_posix_file_impl(int fd, open_flags f, file_open_options options, | |
f67539c2 TL |
480 | unsigned max_size_changing_ops, bool fsync_is_exclusive, dev_t device_id) |
481 | : posix_file_impl(fd, f, options, device_id) | |
9f95a23c TL |
482 | , _max_size_changing_ops(max_size_changing_ops) |
483 | , _fsync_is_exclusive(fsync_is_exclusive) { | |
484 | auto r = ::lseek(fd, 0, SEEK_END); | |
485 | throw_system_error_on(r == -1); | |
486 | _committed_size = _logical_size = r; | |
487 | _sloppy_size = options.sloppy_size; | |
488 | auto hint = align_up<uint64_t>(options.sloppy_size_hint, _disk_write_dma_alignment); | |
489 | if (_sloppy_size && _committed_size < hint) { | |
490 | auto r = ::ftruncate(_fd, hint); | |
491 | // We can ignore errors, since it's just a hint. | |
492 | if (r != -1) { | |
493 | _committed_size = hint; | |
494 | } | |
495 | } | |
496 | } | |
497 | ||
498 | append_challenged_posix_file_impl::~append_challenged_posix_file_impl() { | |
499 | // If the file has not been closed we risk having running tasks | |
500 | // that will try to access freed memory. | |
f67539c2 TL |
501 | // |
502 | // It is safe to destory it if nothing is queued. | |
503 | // Note that posix_file_impl::~posix_file_impl auto-closes the file descriptor. | |
504 | assert(_q.empty() && _logical_size == _committed_size); | |
9f95a23c TL |
505 | } |
506 | ||
507 | bool | |
508 | append_challenged_posix_file_impl::must_run_alone(const op& candidate) const noexcept { | |
509 | // checks if candidate is a non-write, size-changing operation. | |
510 | return (candidate.type == opcode::truncate) | |
511 | || (candidate.type == opcode::flush && (_fsync_is_exclusive || _sloppy_size)); | |
512 | } | |
513 | ||
514 | bool | |
515 | append_challenged_posix_file_impl::size_changing(const op& candidate) const noexcept { | |
516 | return (candidate.type == opcode::write && candidate.pos + candidate.len > _committed_size) | |
517 | || must_run_alone(candidate); | |
518 | } | |
519 | ||
520 | bool | |
521 | append_challenged_posix_file_impl::may_dispatch(const op& candidate) const noexcept { | |
522 | if (size_changing(candidate)) { | |
523 | return !_current_size_changing_ops && !_current_non_size_changing_ops; | |
524 | } else { | |
525 | return !_current_size_changing_ops; | |
526 | } | |
527 | } | |
528 | ||
529 | void | |
530 | append_challenged_posix_file_impl::dispatch(op& candidate) noexcept { | |
531 | unsigned* op_counter = size_changing(candidate) | |
532 | ? &_current_size_changing_ops : &_current_non_size_changing_ops; | |
533 | ++*op_counter; | |
534 | // FIXME: future is discarded | |
535 | (void)candidate.run().then([me = shared_from_this(), op_counter] { | |
536 | --*op_counter; | |
537 | me->process_queue(); | |
538 | }); | |
539 | } | |
540 | ||
541 | // If we have a bunch of size-extending writes in the queue, | |
542 | // issue an ftruncate() extending the file size, so they can | |
543 | // be issued concurrently. | |
544 | void | |
545 | append_challenged_posix_file_impl::optimize_queue() noexcept { | |
546 | if (_current_non_size_changing_ops || _current_size_changing_ops) { | |
547 | // Can't issue an ftruncate() if something is going on | |
548 | return; | |
549 | } | |
550 | auto speculative_size = _committed_size; | |
551 | unsigned n_appending_writes = 0; | |
552 | for (const auto& op : _q) { | |
553 | // stop calculating speculative size after a non-write, size-changing | |
554 | // operation is found to prevent an useless truncate from being issued. | |
555 | if (must_run_alone(op)) { | |
556 | break; | |
557 | } | |
558 | if (op.type == opcode::write && op.pos + op.len > _committed_size) { | |
559 | speculative_size = std::max(speculative_size, op.pos + op.len); | |
560 | ++n_appending_writes; | |
561 | } | |
562 | } | |
563 | if (n_appending_writes > _max_size_changing_ops | |
564 | || (n_appending_writes && _sloppy_size)) { | |
565 | if (_sloppy_size && speculative_size < 2 * _committed_size) { | |
566 | speculative_size = align_up<uint64_t>(2 * _committed_size, _disk_write_dma_alignment); | |
567 | } | |
568 | // We're all alone, so issuing the ftruncate() in the reactor | |
569 | // thread won't block us. | |
570 | // | |
571 | // Issuing it in the syscall thread is too slow; this can happen | |
572 | // every several ops, and the syscall thread latency can be very | |
573 | // high. | |
574 | auto r = ::ftruncate(_fd, speculative_size); | |
575 | if (r != -1) { | |
576 | _committed_size = speculative_size; | |
577 | // If we failed, the next write will pick it up. | |
578 | } | |
579 | } | |
580 | } | |
581 | ||
582 | void | |
583 | append_challenged_posix_file_impl::process_queue() noexcept { | |
584 | optimize_queue(); | |
585 | while (!_q.empty() && may_dispatch(_q.front())) { | |
586 | op candidate = std::move(_q.front()); | |
587 | _q.pop_front(); | |
588 | dispatch(candidate); | |
589 | } | |
590 | if (may_quit()) { | |
591 | _completed.set_value(); | |
592 | _closing_state = state::closing; // prevents _completed to be signaled again in case of recursion | |
593 | } | |
594 | } | |
595 | ||
596 | void | |
f67539c2 | 597 | append_challenged_posix_file_impl::enqueue_op(op&& op) { |
9f95a23c TL |
598 | _q.push_back(std::move(op)); |
599 | process_queue(); | |
600 | } | |
601 | ||
602 | bool | |
603 | append_challenged_posix_file_impl::may_quit() const noexcept { | |
604 | return _closing_state == state::draining && _q.empty() && !_current_non_size_changing_ops && | |
605 | !_current_size_changing_ops; | |
606 | } | |
607 | ||
608 | void | |
609 | append_challenged_posix_file_impl::commit_size(uint64_t size) noexcept { | |
610 | _committed_size = std::max(size, _committed_size); | |
611 | _logical_size = std::max(size, _logical_size); | |
612 | } | |
613 | ||
614 | future<size_t> | |
f67539c2 | 615 | append_challenged_posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) noexcept { |
9f95a23c TL |
616 | if (pos >= _logical_size) { |
617 | // later() avoids tail recursion | |
618 | return later().then([] { | |
619 | return size_t(0); | |
620 | }); | |
621 | } | |
622 | len = std::min(pos + len, align_up<uint64_t>(_logical_size, _disk_read_dma_alignment)) - pos; | |
f67539c2 | 623 | return enqueue<size_t>( |
9f95a23c TL |
624 | opcode::read, |
625 | pos, | |
626 | len, | |
f67539c2 TL |
627 | [this, pos, buffer, len, &pc] () mutable { |
628 | return posix_file_impl::read_dma(pos, buffer, len, pc); | |
9f95a23c | 629 | } |
f67539c2 | 630 | ); |
9f95a23c TL |
631 | } |
632 | ||
633 | future<size_t> | |
f67539c2 | 634 | append_challenged_posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept { |
9f95a23c TL |
635 | if (pos >= _logical_size) { |
636 | // later() avoids tail recursion | |
637 | return later().then([] { | |
638 | return size_t(0); | |
639 | }); | |
640 | } | |
641 | size_t len = 0; | |
642 | auto i = iov.begin(); | |
f67539c2 TL |
643 | auto aligned_logical_size = align_up<uint64_t>(_logical_size, _disk_read_dma_alignment); |
644 | while (i != iov.end() && pos + len + i->iov_len <= aligned_logical_size) { | |
9f95a23c TL |
645 | len += i++->iov_len; |
646 | } | |
9f95a23c TL |
647 | if (i != iov.end()) { |
648 | auto last_len = pos + len + i->iov_len - aligned_logical_size; | |
649 | if (last_len) { | |
650 | i++->iov_len = last_len; | |
651 | } | |
652 | iov.erase(i, iov.end()); | |
653 | } | |
f67539c2 | 654 | return enqueue<size_t>( |
9f95a23c TL |
655 | opcode::read, |
656 | pos, | |
657 | len, | |
f67539c2 TL |
658 | [this, pos, iov = std::move(iov), &pc] () mutable { |
659 | return posix_file_impl::read_dma(pos, std::move(iov), pc); | |
9f95a23c | 660 | } |
f67539c2 | 661 | ); |
9f95a23c TL |
662 | } |
663 | ||
664 | future<size_t> | |
f67539c2 TL |
665 | append_challenged_posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) noexcept { |
666 | return enqueue<size_t>( | |
9f95a23c TL |
667 | opcode::write, |
668 | pos, | |
669 | len, | |
f67539c2 TL |
670 | [this, pos, buffer, len, &pc] { |
671 | return posix_file_impl::write_dma(pos, buffer, len, pc).then([this, pos] (size_t ret) { | |
672 | commit_size(pos + ret); | |
673 | return make_ready_future<size_t>(ret); | |
9f95a23c TL |
674 | }); |
675 | } | |
f67539c2 | 676 | ); |
9f95a23c TL |
677 | } |
678 | ||
679 | future<size_t> | |
f67539c2 | 680 | append_challenged_posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept { |
9f95a23c | 681 | auto len = boost::accumulate(iov | boost::adaptors::transformed(std::mem_fn(&iovec::iov_len)), size_t(0)); |
f67539c2 | 682 | return enqueue<size_t>( |
9f95a23c TL |
683 | opcode::write, |
684 | pos, | |
685 | len, | |
f67539c2 TL |
686 | [this, pos, iov = std::move(iov), &pc] () mutable { |
687 | return posix_file_impl::write_dma(pos, std::move(iov), pc).then([this, pos] (size_t ret) { | |
688 | commit_size(pos + ret); | |
689 | return make_ready_future<size_t>(ret); | |
9f95a23c TL |
690 | }); |
691 | } | |
f67539c2 | 692 | ); |
9f95a23c TL |
693 | } |
694 | ||
695 | future<> | |
f67539c2 | 696 | append_challenged_posix_file_impl::flush() noexcept { |
9f95a23c TL |
697 | if ((!_sloppy_size || _logical_size == _committed_size) && !_fsync_is_exclusive) { |
698 | // FIXME: determine if flush can block concurrent reads or writes | |
699 | return posix_file_impl::flush(); | |
700 | } else { | |
f67539c2 | 701 | return enqueue<>( |
9f95a23c TL |
702 | opcode::flush, |
703 | 0, | |
704 | 0, | |
f67539c2 TL |
705 | [this] () { |
706 | if (_logical_size != _committed_size) { | |
707 | // We're all alone, so can truncate in reactor thread | |
708 | auto r = ::ftruncate(_fd, _logical_size); | |
709 | if (r == -1) { | |
710 | return make_exception_future<>(std::system_error(errno, std::system_category(), "flush")); | |
9f95a23c | 711 | } |
f67539c2 TL |
712 | _committed_size = _logical_size; |
713 | } | |
714 | return posix_file_impl::flush(); | |
9f95a23c | 715 | } |
f67539c2 | 716 | ); |
9f95a23c TL |
717 | } |
718 | } | |
719 | ||
720 | future<struct stat> | |
f67539c2 | 721 | append_challenged_posix_file_impl::stat() noexcept { |
9f95a23c TL |
722 | // FIXME: can this conflict with anything? |
723 | return posix_file_impl::stat().then([this] (struct stat stat) { | |
724 | stat.st_size = _logical_size; | |
725 | return stat; | |
726 | }); | |
727 | } | |
728 | ||
729 | future<> | |
f67539c2 TL |
730 | append_challenged_posix_file_impl::truncate(uint64_t length) noexcept { |
731 | return enqueue<>( | |
9f95a23c TL |
732 | opcode::truncate, |
733 | length, | |
734 | 0, | |
f67539c2 TL |
735 | [this, length] () mutable { |
736 | return posix_file_impl::truncate(length).then([this, length] () mutable { | |
737 | _committed_size = _logical_size = length; | |
9f95a23c TL |
738 | }); |
739 | } | |
f67539c2 | 740 | ); |
9f95a23c TL |
741 | } |
742 | ||
743 | future<uint64_t> | |
f67539c2 | 744 | append_challenged_posix_file_impl::size() noexcept { |
9f95a23c TL |
745 | return make_ready_future<size_t>(_logical_size); |
746 | } | |
747 | ||
748 | future<> | |
749 | append_challenged_posix_file_impl::close() noexcept { | |
750 | // Caller should have drained all pending I/O | |
751 | _closing_state = state::draining; | |
752 | process_queue(); | |
753 | return _completed.get_future().then([this] { | |
754 | if (_logical_size != _committed_size) { | |
755 | auto r = ::ftruncate(_fd, _logical_size); | |
756 | if (r != -1) { | |
757 | _committed_size = _logical_size; | |
758 | } | |
759 | } | |
760 | return posix_file_impl::close().finally([this] { _closing_state = state::closed; }); | |
761 | }); | |
762 | } | |
763 | ||
764 | posix_file_handle_impl::~posix_file_handle_impl() { | |
765 | if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) == 1) { | |
766 | ::close(_fd); | |
767 | delete _refcount; | |
768 | } | |
769 | } | |
770 | ||
771 | std::unique_ptr<seastar::file_handle_impl> | |
772 | posix_file_handle_impl::clone() const { | |
f67539c2 | 773 | auto ret = std::make_unique<posix_file_handle_impl>(_fd, _open_flags, _refcount, _device_id); |
9f95a23c TL |
774 | if (_refcount) { |
775 | _refcount->fetch_add(1, std::memory_order_relaxed); | |
776 | } | |
777 | return ret; | |
778 | } | |
779 | ||
780 | shared_ptr<file_impl> | |
781 | posix_file_handle_impl::to_file() && { | |
f67539c2 | 782 | auto ret = ::seastar::make_shared<posix_file_impl>(_fd, _open_flags, _refcount, _device_id); |
9f95a23c TL |
783 | _fd = -1; |
784 | _refcount = nullptr; | |
785 | return ret; | |
786 | } | |
787 | ||
788 | // Some kernels can append to xfs filesystems, some cannot; determine | |
789 | // from kernel version. | |
790 | static | |
791 | unsigned | |
792 | xfs_concurrency_from_kernel_version() { | |
793 | // try to see if this is a mainline kernel with xfs append fixed (3.15+) | |
794 | // or a RHEL kernel with the backported fix (3.10.0-325.el7+) | |
795 | if (kernel_uname().whitelisted({"3.15", "3.10.0-325.el7"})) { | |
796 | // Can append, but not concurrently | |
797 | return 1; | |
798 | } | |
799 | // Cannot append at all; need ftrucnate(). | |
800 | return 0; | |
801 | } | |
802 | ||
f67539c2 TL |
803 | future<shared_ptr<file_impl>> |
804 | make_file_impl(int fd, file_open_options options, int flags) noexcept { | |
805 | return engine().fstat(fd).then([fd, options = std::move(options), flags] (struct stat st) mutable { | |
806 | auto st_dev = st.st_dev; | |
9f95a23c | 807 | |
f67539c2 TL |
808 | if (S_ISBLK(st.st_mode)) { |
809 | return make_ready_future<shared_ptr<file_impl>>(make_shared<blockdev_file_impl>(fd, open_flags(flags), options, st_dev)); | |
810 | } else { | |
811 | if ((flags & O_ACCMODE) == O_RDONLY || S_ISDIR(st.st_mode)) { | |
812 | return make_ready_future<shared_ptr<file_impl>>(make_shared<posix_file_impl>(fd, open_flags(flags), options, st_dev)); | |
9f95a23c | 813 | } |
f67539c2 TL |
814 | struct append_support { |
815 | bool append_challenged; | |
816 | unsigned append_concurrency; | |
817 | bool fsync_is_exclusive; | |
818 | }; | |
819 | static thread_local std::unordered_map<decltype(st_dev), append_support> s_fstype; | |
820 | future<> get_append_support = s_fstype.count(st_dev) ? make_ready_future<>() : | |
821 | engine().fstatfs(fd).then([st_dev] (struct statfs sfs) { | |
822 | append_support as; | |
823 | switch (sfs.f_type) { | |
824 | case 0x58465342: /* XFS */ | |
825 | as.append_challenged = true; | |
826 | static auto xc = xfs_concurrency_from_kernel_version(); | |
827 | as.append_concurrency = xc; | |
828 | as.fsync_is_exclusive = true; | |
829 | break; | |
830 | case 0x6969: /* NFS */ | |
831 | as.append_challenged = false; | |
832 | as.append_concurrency = 0; | |
833 | as.fsync_is_exclusive = false; | |
834 | break; | |
835 | case 0xEF53: /* EXT4 */ | |
836 | as.append_challenged = true; | |
837 | as.append_concurrency = 0; | |
838 | as.fsync_is_exclusive = false; | |
839 | break; | |
840 | default: | |
841 | as.append_challenged = true; | |
842 | as.append_concurrency = 0; | |
843 | as.fsync_is_exclusive = true; | |
844 | } | |
845 | s_fstype[st_dev] = as; | |
846 | }); | |
847 | return get_append_support.then([st_dev, fd, flags, options = std::move(options)] () mutable { | |
848 | auto as = s_fstype[st_dev]; | |
849 | if (!as.append_challenged) { | |
850 | return make_ready_future<shared_ptr<file_impl>>(make_shared<posix_file_impl>(fd, open_flags(flags), std::move(options), st_dev)); | |
851 | } | |
852 | return make_ready_future<shared_ptr<file_impl>>(make_shared<append_challenged_posix_file_impl>(fd, open_flags(flags), std::move(options), as.append_concurrency, as.fsync_is_exclusive, st_dev)); | |
853 | }); | |
9f95a23c | 854 | } |
f67539c2 TL |
855 | }); |
856 | } | |
857 | ||
858 | file::file(seastar::file_handle&& handle) noexcept | |
859 | : _file_impl(std::move(std::move(handle).to_file()._file_impl)) { | |
860 | } | |
861 | ||
862 | future<uint64_t> file::size() const noexcept { | |
863 | try { | |
864 | return _file_impl->size(); | |
865 | } catch (...) { | |
866 | return current_exception_as_future<uint64_t>(); | |
867 | } | |
868 | } | |
869 | ||
870 | future<> file::close() noexcept { | |
871 | return do_with(shared_ptr<file_impl>(_file_impl), [](shared_ptr<file_impl>& f) { | |
872 | return f->close(); | |
873 | }); | |
874 | } | |
875 | ||
876 | subscription<directory_entry> | |
877 | file::list_directory(std::function<future<>(directory_entry de)> next) { | |
878 | return _file_impl->list_directory(std::move(next)); | |
879 | } | |
880 | ||
881 | future<temporary_buffer<uint8_t>> | |
882 | file::dma_read_bulk_impl(uint64_t offset, size_t range_size, const io_priority_class& pc) noexcept { | |
883 | try { | |
884 | return _file_impl->dma_read_bulk(offset, range_size, pc); | |
885 | } catch (...) { | |
886 | return current_exception_as_future<temporary_buffer<uint8_t>>(); | |
887 | } | |
888 | } | |
889 | ||
890 | future<> file::discard(uint64_t offset, uint64_t length) noexcept { | |
891 | try { | |
892 | return _file_impl->discard(offset, length); | |
893 | } catch (...) { | |
894 | return current_exception_as_future(); | |
895 | } | |
896 | } | |
897 | ||
898 | future<> file::allocate(uint64_t position, uint64_t length) noexcept { | |
899 | try { | |
900 | return _file_impl->allocate(position, length); | |
901 | } catch (...) { | |
902 | return current_exception_as_future(); | |
903 | } | |
904 | } | |
905 | ||
906 | future<> file::truncate(uint64_t length) noexcept { | |
907 | try { | |
908 | return _file_impl->truncate(length); | |
909 | } catch (...) { | |
910 | return current_exception_as_future(); | |
911 | } | |
912 | } | |
913 | ||
914 | future<struct stat> file::stat() noexcept { | |
915 | try { | |
916 | return _file_impl->stat(); | |
917 | } catch (...) { | |
918 | return current_exception_as_future<struct stat>(); | |
919 | } | |
920 | } | |
921 | ||
922 | future<> file::flush() noexcept { | |
923 | try { | |
924 | return _file_impl->flush(); | |
925 | } catch (...) { | |
926 | return current_exception_as_future(); | |
927 | } | |
928 | } | |
929 | ||
930 | future<size_t> file::dma_write(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept { | |
931 | try { | |
932 | return _file_impl->write_dma(pos, std::move(iov), pc); | |
933 | } catch (...) { | |
934 | return current_exception_as_future<size_t>(); | |
935 | } | |
936 | } | |
937 | ||
938 | future<size_t> | |
939 | file::dma_write_impl(uint64_t pos, const uint8_t* buffer, size_t len, const io_priority_class& pc) noexcept { | |
940 | try { | |
941 | return _file_impl->write_dma(pos, buffer, len, pc); | |
942 | } catch (...) { | |
943 | return current_exception_as_future<size_t>(); | |
944 | } | |
945 | } | |
946 | ||
947 | future<size_t> file::dma_read(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept { | |
948 | try { | |
949 | return _file_impl->read_dma(pos, std::move(iov), pc); | |
950 | } catch (...) { | |
951 | return current_exception_as_future<size_t>(); | |
952 | } | |
953 | } | |
954 | ||
955 | future<temporary_buffer<uint8_t>> | |
956 | file::dma_read_exactly_impl(uint64_t pos, size_t len, const io_priority_class& pc) noexcept { | |
957 | return dma_read<uint8_t>(pos, len, pc).then([len](auto buf) { | |
958 | if (buf.size() < len) { | |
959 | throw eof_error(); | |
9f95a23c | 960 | } |
f67539c2 TL |
961 | |
962 | return buf; | |
963 | }); | |
9f95a23c TL |
964 | } |
965 | ||
f67539c2 TL |
966 | future<temporary_buffer<uint8_t>> |
967 | file::dma_read_impl(uint64_t pos, size_t len, const io_priority_class& pc) noexcept { | |
968 | return dma_read_bulk<uint8_t>(pos, len, pc).then([len](temporary_buffer<uint8_t> buf) { | |
969 | if (len < buf.size()) { | |
970 | buf.trim(len); | |
971 | } | |
972 | ||
973 | return buf; | |
974 | }); | |
9f95a23c TL |
975 | } |
976 | ||
f67539c2 TL |
977 | future<size_t> |
978 | file::dma_read_impl(uint64_t aligned_pos, uint8_t* aligned_buffer, size_t aligned_len, const io_priority_class& pc) noexcept { | |
979 | try { | |
980 | return _file_impl->read_dma(aligned_pos, aligned_buffer, aligned_len, pc); | |
981 | } catch (...) { | |
982 | return current_exception_as_future<size_t>(); | |
983 | } | |
9f95a23c TL |
984 | } |
985 | ||
986 | seastar::file_handle | |
987 | file::dup() { | |
988 | return seastar::file_handle(_file_impl->dup()); | |
989 | } | |
990 | ||
991 | file_impl* file_impl::get_file_impl(file& f) { | |
992 | return f._file_impl.get(); | |
993 | } | |
994 | ||
995 | std::unique_ptr<seastar::file_handle_impl> | |
996 | file_impl::dup() { | |
997 | throw std::runtime_error("this file type cannot be duplicated"); | |
998 | } | |
999 | ||
f67539c2 TL |
1000 | future<file> open_file_dma(std::string_view name, open_flags flags) noexcept { |
1001 | return engine().open_file_dma(name, flags, file_open_options()); | |
1002 | } | |
1003 | ||
1004 | future<file> open_file_dma(std::string_view name, open_flags flags, file_open_options options) noexcept { | |
1005 | return engine().open_file_dma(name, flags, std::move(options)); | |
1006 | } | |
1007 | ||
1008 | future<file> open_directory(std::string_view name) noexcept { | |
1009 | return engine().open_directory(name); | |
1010 | } | |
1011 | ||
9f95a23c | 1012 | } |