/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License").  See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership.  You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */
22 #define __user /* empty */ // for xfs includes, below
24 #include <sys/syscall.h>
26 #include <linux/types.h> // for xfs, below
27 #include <sys/ioctl.h>
28 #include <xfs/linux.h>
29 #define min min /* prevent xfs.h from defining min() as a macro */
32 #include <boost/range/numeric.hpp>
33 #include <boost/range/adaptor/transformed.hpp>
34 #include <seastar/core/reactor.hh>
35 #include <seastar/core/file.hh>
36 #include <seastar/core/report_exception.hh>
37 #include <seastar/core/linux-aio.hh>
38 #include "core/file-impl.hh"
39 #include "core/syscall_result.hh"
40 #include "core/thread_pool.hh"
41 #include "core/uname.hh"
45 using namespace internal
;
46 using namespace internal::linux_abi
;
48 file_handle::file_handle(const file_handle
& x
)
49 : _impl(x
._impl
? x
._impl
->clone() : std::unique_ptr
<file_handle_impl
>()) {
52 file_handle::file_handle(file_handle
&& x
) noexcept
= default;
55 file_handle::operator=(const file_handle
& x
) {
56 return operator=(file_handle(x
));
60 file_handle::operator=(file_handle
&&) noexcept
= default;
63 file_handle::to_file() const & {
64 return file_handle(*this).to_file();
68 file_handle::to_file() && {
69 return file(std::move(*_impl
).to_file());
72 posix_file_impl::posix_file_impl(int fd
, open_flags f
, file_open_options options
, io_queue
* ioq
)
77 query_dma_alignment();
80 posix_file_impl::~posix_file_impl() {
81 if (_refcount
&& _refcount
->fetch_add(-1, std::memory_order_relaxed
) != 1) {
86 // Note: close() can be a blocking operation on NFS
92 posix_file_impl::query_dma_alignment() {
94 auto r
= ioctl(_fd
, XFS_IOC_DIOINFO
, &da
);
96 _memory_dma_alignment
= da
.d_mem
;
97 _disk_read_dma_alignment
= da
.d_miniosz
;
98 // xfs wants at least the block size for writes
99 // FIXME: really read the block size
100 _disk_write_dma_alignment
= std::max
<unsigned>(da
.d_miniosz
, 4096);
104 std::unique_ptr
<seastar::file_handle_impl
>
105 posix_file_impl::dup() {
107 _refcount
= new std::atomic
<unsigned>(1u);
109 auto ret
= std::make_unique
<posix_file_handle_impl
>(_fd
, _open_flags
, _refcount
, _io_queue
);
110 _refcount
->fetch_add(1, std::memory_order_relaxed
);
114 posix_file_impl::posix_file_impl(int fd
, open_flags f
, std::atomic
<unsigned>* refcount
, io_queue
*ioq
)
115 : _refcount(refcount
), _io_queue(ioq
), _open_flags(f
), _fd(fd
) {
119 posix_file_impl::flush(void) {
120 if ((_open_flags
& open_flags::dsync
) != open_flags
{}) {
121 return make_ready_future
<>();
123 return engine().fdatasync(_fd
);
127 posix_file_impl::stat(void) {
128 return engine()._thread_pool
->submit
<syscall_result_extra
<struct stat
>>([this] {
130 auto ret
= ::fstat(_fd
, &st
);
131 return wrap_syscall(ret
, st
);
132 }).then([] (syscall_result_extra
<struct stat
> ret
) {
133 ret
.throw_if_error();
134 return make_ready_future
<struct stat
>(ret
.extra
);
139 posix_file_impl::truncate(uint64_t length
) {
140 return engine()._thread_pool
->submit
<syscall_result
<int>>([this, length
] {
141 return wrap_syscall
<int>(::ftruncate(_fd
, length
));
142 }).then([] (syscall_result
<int> sr
) {
144 return make_ready_future
<>();
149 posix_file_impl::discard(uint64_t offset
, uint64_t length
) {
150 return engine()._thread_pool
->submit
<syscall_result
<int>>([this, offset
, length
] () mutable {
151 return wrap_syscall
<int>(::fallocate(_fd
, FALLOC_FL_PUNCH_HOLE
|FALLOC_FL_KEEP_SIZE
,
153 }).then([] (syscall_result
<int> sr
) {
155 return make_ready_future
<>();
160 posix_file_impl::allocate(uint64_t position
, uint64_t length
) {
161 #ifdef FALLOC_FL_ZERO_RANGE
162 // FALLOC_FL_ZERO_RANGE is fairly new, so don't fail if it's not supported.
163 static bool supported
= true;
165 return make_ready_future
<>();
167 return engine()._thread_pool
->submit
<syscall_result
<int>>([this, position
, length
] () mutable {
168 auto ret
= ::fallocate(_fd
, FALLOC_FL_ZERO_RANGE
|FALLOC_FL_KEEP_SIZE
, position
, length
);
169 if (ret
== -1 && errno
== EOPNOTSUPP
) {
171 supported
= false; // Racy, but harmless. At most we issue an extra call or two.
173 return wrap_syscall
<int>(ret
);
174 }).then([] (syscall_result
<int> sr
) {
176 return make_ready_future
<>();
179 return make_ready_future
<>();
184 posix_file_impl::size() {
185 auto r
= ::lseek(_fd
, 0, SEEK_END
);
187 return make_exception_future
<uint64_t>(std::system_error(errno
, std::system_category()));
189 return make_ready_future
<uint64_t>(r
);
193 posix_file_impl::close() noexcept
{
195 seastar_logger
.warn("double close() detected, contact support");
196 return make_ready_future
<>();
199 _fd
= -1; // Prevent a concurrent close (which is illegal) from closing another file's fd
200 if (_refcount
&& _refcount
->fetch_add(-1, std::memory_order_relaxed
) != 1) {
202 return make_ready_future
<>();
206 auto closed
= [fd
] () noexcept
{
208 return engine()._thread_pool
->submit
<syscall_result
<int>>([fd
] {
209 return wrap_syscall
<int>(::close(fd
));
212 report_exception("Running ::close() in reactor thread, submission failed with exception", std::current_exception());
213 return make_ready_future
<syscall_result
<int>>(wrap_syscall
<int>(::close(fd
)));
216 return closed
.then([] (syscall_result
<int> sr
) {
222 blockdev_file_impl::size(void) {
223 return engine()._thread_pool
->submit
<syscall_result_extra
<size_t>>([this] {
225 int ret
= ::ioctl(_fd
, BLKGETSIZE64
, &size
);
226 return wrap_syscall(ret
, size
);
227 }).then([] (syscall_result_extra
<uint64_t> ret
) {
228 ret
.throw_if_error();
229 return make_ready_future
<uint64_t>(ret
.extra
);
233 subscription
<directory_entry
>
234 posix_file_impl::list_directory(std::function
<future
<> (directory_entry de
)> next
) {
235 static constexpr size_t buffer_size
= 8192;
237 stream
<directory_entry
> s
;
238 unsigned current
= 0;
242 char buffer
[buffer_size
];
245 // While it would be natural to use fdopendir()/readdir(),
246 // our syscall thread pool doesn't support malloc(), which is
247 // required for this to work. So resort to using getdents()
251 struct linux_dirent64
{
252 ino64_t d_ino
; /* 64-bit inode number */
253 off64_t d_off
; /* 64-bit offset to next structure */
254 unsigned short d_reclen
; /* Size of this dirent */
255 unsigned char d_type
; /* File type */
256 char d_name
[]; /* Filename (null-terminated) */
259 auto w
= make_lw_shared
<work
>();
260 auto ret
= w
->s
.listen(std::move(next
));
261 // List the directory asynchronously in the background.
262 // Caller synchronizes using the returned subscription.
263 (void)w
->s
.started().then([w
, this] {
264 auto eofcond
= [w
] { return w
->eof
; };
265 return do_until(eofcond
, [w
, this] {
266 if (w
->current
== w
->total
) {
267 return engine()._thread_pool
->submit
<syscall_result
<long>>([w
, this] () {
268 auto ret
= ::syscall(__NR_getdents64
, _fd
, reinterpret_cast<linux_dirent64
*>(w
->buffer
), buffer_size
);
269 return wrap_syscall(ret
);
270 }).then([w
] (syscall_result
<long> ret
) {
271 ret
.throw_if_error();
272 if (ret
.result
== 0) {
276 w
->total
= ret
.result
;
280 auto start
= w
->buffer
+ w
->current
;
281 auto de
= reinterpret_cast<linux_dirent64
*>(start
);
282 compat::optional
<directory_entry_type
> type
;
283 switch (de
->d_type
) {
285 type
= directory_entry_type::block_device
;
288 type
= directory_entry_type::char_device
;
291 type
= directory_entry_type::directory
;
294 type
= directory_entry_type::fifo
;
297 type
= directory_entry_type::regular
;
300 type
= directory_entry_type::link
;
303 type
= directory_entry_type::socket
;
309 w
->current
+= de
->d_reclen
;
310 sstring name
= de
->d_name
;
311 if (name
== "." || name
== "..") {
312 return make_ready_future
<>();
314 return w
->s
.produce({std::move(name
), type
});
318 }).handle_exception([] (std::exception_ptr ignored
) {});
323 posix_file_impl::write_dma(uint64_t pos
, const void* buffer
, size_t len
, const io_priority_class
& io_priority_class
) {
324 auto req
= internal::io_request::make_write(_fd
, pos
, buffer
, len
);
325 return engine().submit_io_write(_io_queue
, io_priority_class
, len
, std::move(req
));
329 posix_file_impl::write_dma(uint64_t pos
, std::vector
<iovec
> iov
, const io_priority_class
& io_priority_class
) {
330 auto len
= internal::sanitize_iovecs(iov
, _disk_write_dma_alignment
);
331 auto req
= internal::io_request::make_writev(_fd
, pos
, iov
);
332 return engine().submit_io_write(_io_queue
, io_priority_class
, len
, std::move(req
)).finally([iov
= std::move(iov
)] () {});
336 posix_file_impl::read_dma(uint64_t pos
, void* buffer
, size_t len
, const io_priority_class
& io_priority_class
) {
337 auto req
= internal::io_request::make_read(_fd
, pos
, buffer
, len
);
338 return engine().submit_io_read(_io_queue
, io_priority_class
, len
, std::move(req
));
342 posix_file_impl::read_dma(uint64_t pos
, std::vector
<iovec
> iov
, const io_priority_class
& io_priority_class
) {
343 auto len
= internal::sanitize_iovecs(iov
, _disk_read_dma_alignment
);
344 auto req
= internal::io_request::make_readv(_fd
, pos
, iov
);
345 return engine().submit_io_read(_io_queue
, io_priority_class
, len
, std::move(req
)).finally([iov
= std::move(iov
)] () {});
348 future
<temporary_buffer
<uint8_t>>
349 posix_file_impl::dma_read_bulk(uint64_t offset
, size_t range_size
, const io_priority_class
& pc
) {
350 using tmp_buf_type
= typename
file::read_state
<uint8_t>::tmp_buf_type
;
352 auto front
= offset
& (_disk_read_dma_alignment
- 1);
356 auto rstate
= make_lw_shared
<file::read_state
<uint8_t>>(offset
, front
,
358 _memory_dma_alignment
,
359 _disk_read_dma_alignment
);
362 // First, try to read directly into the buffer. Most of the reads will
365 auto read
= read_dma(offset
, rstate
->buf
.get_write(),
366 rstate
->buf
.size(), pc
);
368 return read
.then([rstate
, this, &pc
] (size_t size
) mutable {
372 // If we haven't read all required data at once -
373 // start read-copy sequence. We can't continue with direct reads
374 // into the previously allocated buffer here since we have to ensure
375 // the aligned read length and thus the aligned destination buffer
378 // The copying will actually take place only if there was a HW glitch.
379 // In EOF case or in case of a persistent I/O error the only overhead is
380 // an extra allocation.
383 [rstate
] { return rstate
->done(); },
384 [rstate
, this, &pc
] () mutable {
385 return read_maybe_eof(
386 rstate
->cur_offset(), rstate
->left_to_read(), pc
).then(
387 [rstate
] (auto buf1
) mutable {
389 rstate
->append_new_data(buf1
);
394 return make_ready_future
<>();
396 }).then([rstate
] () mutable {
398 // If we are here we are promised to have read some bytes beyond
399 // "front" so we may trim straight away.
401 rstate
->trim_buf_before_ret();
402 return make_ready_future
<tmp_buf_type
>(std::move(rstate
->buf
));
407 future
<temporary_buffer
<uint8_t>>
408 posix_file_impl::read_maybe_eof(uint64_t pos
, size_t len
, const io_priority_class
& pc
) {
410 // We have to allocate a new aligned buffer to make sure we don't get
411 // an EINVAL error due to unaligned destination buffer.
413 temporary_buffer
<uint8_t> buf
= temporary_buffer
<uint8_t>::aligned(
414 _memory_dma_alignment
, align_up(len
, size_t(_disk_read_dma_alignment
)));
416 // try to read a single bulk from the given position
417 auto dst
= buf
.get_write();
418 auto buf_size
= buf
.size();
419 return read_dma(pos
, dst
, buf_size
, pc
).then_wrapped(
420 [buf
= std::move(buf
)](future
<size_t> f
) mutable {
422 size_t size
= std::get
<0>(f
.get());
426 return std::move(buf
);
427 } catch (std::system_error
& e
) {
429 // TODO: implement a non-trowing file_impl::dma_read() interface to
430 // avoid the exceptions throwing in a good flow completely.
431 // Otherwise for users that don't want to care about the
432 // underlying file size and preventing the attempts to read
433 // bytes beyond EOF there will always be at least one
434 // exception throwing at the file end for files with unaligned
437 if (e
.code().value() == EINVAL
) {
439 return std::move(buf
);
447 blockdev_file_impl::blockdev_file_impl(int fd
, open_flags f
, file_open_options options
, io_queue
*ioq
)
448 : posix_file_impl(fd
, f
, options
, ioq
) {
452 blockdev_file_impl::truncate(uint64_t length
) {
453 return make_ready_future
<>();
457 blockdev_file_impl::discard(uint64_t offset
, uint64_t length
) {
458 return engine()._thread_pool
->submit
<syscall_result
<int>>([this, offset
, length
] () mutable {
459 uint64_t range
[2] { offset
, length
};
460 return wrap_syscall
<int>(::ioctl(_fd
, BLKDISCARD
, &range
));
461 }).then([] (syscall_result
<int> sr
) {
463 return make_ready_future
<>();
468 blockdev_file_impl::allocate(uint64_t position
, uint64_t length
) {
469 // nothing to do for block device
470 return make_ready_future
<>();
473 append_challenged_posix_file_impl::append_challenged_posix_file_impl(int fd
, open_flags f
, file_open_options options
,
474 unsigned max_size_changing_ops
, bool fsync_is_exclusive
, io_queue
* ioq
)
475 : posix_file_impl(fd
, f
, options
, ioq
)
476 , _max_size_changing_ops(max_size_changing_ops
)
477 , _fsync_is_exclusive(fsync_is_exclusive
) {
478 auto r
= ::lseek(fd
, 0, SEEK_END
);
479 throw_system_error_on(r
== -1);
480 _committed_size
= _logical_size
= r
;
481 _sloppy_size
= options
.sloppy_size
;
482 auto hint
= align_up
<uint64_t>(options
.sloppy_size_hint
, _disk_write_dma_alignment
);
483 if (_sloppy_size
&& _committed_size
< hint
) {
484 auto r
= ::ftruncate(_fd
, hint
);
485 // We can ignore errors, since it's just a hint.
487 _committed_size
= hint
;
492 append_challenged_posix_file_impl::~append_challenged_posix_file_impl() {
493 // If the file has not been closed we risk having running tasks
494 // that will try to access freed memory.
495 assert(_closing_state
== state::closed
);
499 append_challenged_posix_file_impl::must_run_alone(const op
& candidate
) const noexcept
{
500 // checks if candidate is a non-write, size-changing operation.
501 return (candidate
.type
== opcode::truncate
)
502 || (candidate
.type
== opcode::flush
&& (_fsync_is_exclusive
|| _sloppy_size
));
506 append_challenged_posix_file_impl::size_changing(const op
& candidate
) const noexcept
{
507 return (candidate
.type
== opcode::write
&& candidate
.pos
+ candidate
.len
> _committed_size
)
508 || must_run_alone(candidate
);
512 append_challenged_posix_file_impl::may_dispatch(const op
& candidate
) const noexcept
{
513 if (size_changing(candidate
)) {
514 return !_current_size_changing_ops
&& !_current_non_size_changing_ops
;
516 return !_current_size_changing_ops
;
521 append_challenged_posix_file_impl::dispatch(op
& candidate
) noexcept
{
522 unsigned* op_counter
= size_changing(candidate
)
523 ? &_current_size_changing_ops
: &_current_non_size_changing_ops
;
525 // FIXME: future is discarded
526 (void)candidate
.run().then([me
= shared_from_this(), op_counter
] {
532 // If we have a bunch of size-extending writes in the queue,
533 // issue an ftruncate() extending the file size, so they can
534 // be issued concurrently.
536 append_challenged_posix_file_impl::optimize_queue() noexcept
{
537 if (_current_non_size_changing_ops
|| _current_size_changing_ops
) {
538 // Can't issue an ftruncate() if something is going on
541 auto speculative_size
= _committed_size
;
542 unsigned n_appending_writes
= 0;
543 for (const auto& op
: _q
) {
544 // stop calculating speculative size after a non-write, size-changing
545 // operation is found to prevent an useless truncate from being issued.
546 if (must_run_alone(op
)) {
549 if (op
.type
== opcode::write
&& op
.pos
+ op
.len
> _committed_size
) {
550 speculative_size
= std::max(speculative_size
, op
.pos
+ op
.len
);
551 ++n_appending_writes
;
554 if (n_appending_writes
> _max_size_changing_ops
555 || (n_appending_writes
&& _sloppy_size
)) {
556 if (_sloppy_size
&& speculative_size
< 2 * _committed_size
) {
557 speculative_size
= align_up
<uint64_t>(2 * _committed_size
, _disk_write_dma_alignment
);
559 // We're all alone, so issuing the ftruncate() in the reactor
560 // thread won't block us.
562 // Issuing it in the syscall thread is too slow; this can happen
563 // every several ops, and the syscall thread latency can be very
565 auto r
= ::ftruncate(_fd
, speculative_size
);
567 _committed_size
= speculative_size
;
568 // If we failed, the next write will pick it up.
574 append_challenged_posix_file_impl::process_queue() noexcept
{
576 while (!_q
.empty() && may_dispatch(_q
.front())) {
577 op candidate
= std::move(_q
.front());
582 _completed
.set_value();
583 _closing_state
= state::closing
; // prevents _completed to be signaled again in case of recursion
588 append_challenged_posix_file_impl::enqueue(op
&& op
) {
589 _q
.push_back(std::move(op
));
594 append_challenged_posix_file_impl::may_quit() const noexcept
{
595 return _closing_state
== state::draining
&& _q
.empty() && !_current_non_size_changing_ops
&&
596 !_current_size_changing_ops
;
600 append_challenged_posix_file_impl::commit_size(uint64_t size
) noexcept
{
601 _committed_size
= std::max(size
, _committed_size
);
602 _logical_size
= std::max(size
, _logical_size
);
606 append_challenged_posix_file_impl::read_dma(uint64_t pos
, void* buffer
, size_t len
, const io_priority_class
& pc
) {
607 if (pos
>= _logical_size
) {
608 // later() avoids tail recursion
609 return later().then([] {
613 len
= std::min(pos
+ len
, align_up
<uint64_t>(_logical_size
, _disk_read_dma_alignment
)) - pos
;
614 auto pr
= make_lw_shared(promise
<size_t>());
619 [this, pr
, pos
, buffer
, len
, &pc
] {
620 return futurize_apply([this, pos
, buffer
, len
, &pc
] () mutable {
621 return posix_file_impl::read_dma(pos
, buffer
, len
, pc
);
622 }).then_wrapped([pr
] (future
<size_t> f
) {
623 f
.forward_to(std::move(*pr
));
627 return pr
->get_future();
631 append_challenged_posix_file_impl::read_dma(uint64_t pos
, std::vector
<iovec
> iov
, const io_priority_class
& pc
) {
632 if (pos
>= _logical_size
) {
633 // later() avoids tail recursion
634 return later().then([] {
639 auto i
= iov
.begin();
640 while (i
!= iov
.end() && pos
+ len
+ i
->iov_len
<= _logical_size
) {
643 auto aligned_logical_size
= align_up
<uint64_t>(_logical_size
, _disk_read_dma_alignment
);
644 if (i
!= iov
.end()) {
645 auto last_len
= pos
+ len
+ i
->iov_len
- aligned_logical_size
;
647 i
++->iov_len
= last_len
;
649 iov
.erase(i
, iov
.end());
651 auto pr
= make_lw_shared(promise
<size_t>());
656 [this, pr
, pos
, iov
= std::move(iov
), &pc
] () mutable {
657 return futurize_apply([this, pos
, iov
= std::move(iov
), &pc
] () mutable {
658 return posix_file_impl::read_dma(pos
, std::move(iov
), pc
);
659 }).then_wrapped([pr
] (future
<size_t> f
) {
660 f
.forward_to(std::move(*pr
));
664 return pr
->get_future();
668 append_challenged_posix_file_impl::write_dma(uint64_t pos
, const void* buffer
, size_t len
, const io_priority_class
& pc
) {
669 auto pr
= make_lw_shared(promise
<size_t>());
674 [this, pr
, pos
, buffer
, len
, &pc
] {
675 return futurize_apply([this, pos
, buffer
, len
, &pc
] () mutable {
676 return posix_file_impl::write_dma(pos
, buffer
, len
, pc
);
677 }).then_wrapped([this, pos
, pr
] (future
<size_t> f
) {
680 commit_size(pos
+ ret
);
681 // Can't use forward_to(), because future::get0() invalidates the future.
684 f
.forward_to(std::move(*pr
));
689 return pr
->get_future();
693 append_challenged_posix_file_impl::write_dma(uint64_t pos
, std::vector
<iovec
> iov
, const io_priority_class
& pc
) {
694 auto pr
= make_lw_shared(promise
<size_t>());
695 auto len
= boost::accumulate(iov
| boost::adaptors::transformed(std::mem_fn(&iovec::iov_len
)), size_t(0));
700 [this, pr
, pos
, iov
= std::move(iov
), &pc
] () mutable {
701 return futurize_apply([this, pos
, iov
= std::move(iov
), &pc
] () mutable {
702 return posix_file_impl::write_dma(pos
, std::move(iov
), pc
);
703 }).then_wrapped([this, pos
, pr
] (future
<size_t> f
) {
706 commit_size(pos
+ ret
);
707 // Can't use forward_to(), because future::get0() invalidates the future.
710 f
.forward_to(std::move(*pr
));
715 return pr
->get_future();
719 append_challenged_posix_file_impl::flush() {
720 if ((!_sloppy_size
|| _logical_size
== _committed_size
) && !_fsync_is_exclusive
) {
721 // FIXME: determine if flush can block concurrent reads or writes
722 return posix_file_impl::flush();
724 auto pr
= make_lw_shared(promise
<>());
730 return futurize_apply([this] {
731 if (_logical_size
!= _committed_size
) {
732 // We're all alone, so can truncate in reactor thread
733 auto r
= ::ftruncate(_fd
, _logical_size
);
734 throw_system_error_on(r
== -1);
735 _committed_size
= _logical_size
;
737 return posix_file_impl::flush();
738 }).then_wrapped([pr
] (future
<> f
) {
739 f
.forward_to(std::move(*pr
));
743 return pr
->get_future();
748 append_challenged_posix_file_impl::stat() {
749 // FIXME: can this conflict with anything?
750 return posix_file_impl::stat().then([this] (struct stat stat
) {
751 stat
.st_size
= _logical_size
;
757 append_challenged_posix_file_impl::truncate(uint64_t length
) {
758 auto pr
= make_lw_shared(promise
<>());
763 [this, pr
, length
] () mutable {
764 return futurize_apply([this, length
] {
765 return posix_file_impl::truncate(length
);
766 }).then_wrapped([this, pr
, length
] (future
<> f
) {
768 _committed_size
= _logical_size
= length
;
770 f
.forward_to(std::move(*pr
));
774 return pr
->get_future();
778 append_challenged_posix_file_impl::size() {
779 return make_ready_future
<size_t>(_logical_size
);
783 append_challenged_posix_file_impl::close() noexcept
{
784 // Caller should have drained all pending I/O
785 _closing_state
= state::draining
;
787 return _completed
.get_future().then([this] {
788 if (_logical_size
!= _committed_size
) {
789 auto r
= ::ftruncate(_fd
, _logical_size
);
791 _committed_size
= _logical_size
;
794 return posix_file_impl::close().finally([this] { _closing_state
= state::closed
; });
798 posix_file_handle_impl::~posix_file_handle_impl() {
799 if (_refcount
&& _refcount
->fetch_add(-1, std::memory_order_relaxed
) == 1) {
805 std::unique_ptr
<seastar::file_handle_impl
>
806 posix_file_handle_impl::clone() const {
807 auto ret
= std::make_unique
<posix_file_handle_impl
>(_fd
, _open_flags
, _refcount
, _io_queue
);
809 _refcount
->fetch_add(1, std::memory_order_relaxed
);
814 shared_ptr
<file_impl
>
815 posix_file_handle_impl::to_file() && {
816 auto ret
= ::seastar::make_shared
<posix_file_impl
>(_fd
, _open_flags
, _refcount
, _io_queue
);
822 // Some kernels can append to xfs filesystems, some cannot; determine
823 // from kernel version.
826 xfs_concurrency_from_kernel_version() {
827 // try to see if this is a mainline kernel with xfs append fixed (3.15+)
828 // or a RHEL kernel with the backported fix (3.10.0-325.el7+)
829 if (kernel_uname().whitelisted({"3.15", "3.10.0-325.el7"})) {
830 // Can append, but not concurrently
833 // Cannot append at all; need ftrucnate().
838 shared_ptr
<file_impl
>
839 make_file_impl(int fd
, file_open_options options
) {
841 auto r
= ::fstat(fd
, &st
);
842 throw_system_error_on(r
== -1);
844 r
= ::ioctl(fd
, BLKGETSIZE
);
845 io_queue
& io_queue
= engine().get_io_queue(st
.st_dev
);
847 // FIXME: obtain these flags from somewhere else
848 auto flags
= ::fcntl(fd
, F_GETFL
);
849 throw_system_error_on(flags
== -1);
852 return make_shared
<blockdev_file_impl
>(fd
, open_flags(flags
), options
, &io_queue
);
854 if ((flags
& O_ACCMODE
) == O_RDONLY
) {
855 return make_shared
<posix_file_impl
>(fd
, open_flags(flags
), options
, &io_queue
);
857 if (S_ISDIR(st
.st_mode
)) {
858 return make_shared
<posix_file_impl
>(fd
, open_flags(flags
), options
, &io_queue
);
860 struct append_support
{
861 bool append_challenged
;
862 unsigned append_concurrency
;
863 bool fsync_is_exclusive
;
865 static thread_local
std::unordered_map
<decltype(st
.st_dev
), append_support
> s_fstype
;
866 if (!s_fstype
.count(st
.st_dev
)) {
868 auto r
= ::fstatfs(fd
, &sfs
);
869 throw_system_error_on(r
== -1);
871 switch (sfs
.f_type
) {
872 case 0x58465342: /* XFS */
873 as
.append_challenged
= true;
874 static auto xc
= xfs_concurrency_from_kernel_version();
875 as
.append_concurrency
= xc
;
876 as
.fsync_is_exclusive
= true;
878 case 0x6969: /* NFS */
879 as
.append_challenged
= false;
880 as
.append_concurrency
= 0;
881 as
.fsync_is_exclusive
= false;
883 case 0xEF53: /* EXT4 */
884 as
.append_challenged
= true;
885 as
.append_concurrency
= 0;
886 as
.fsync_is_exclusive
= false;
889 as
.append_challenged
= true;
890 as
.append_concurrency
= 0;
891 as
.fsync_is_exclusive
= true;
893 s_fstype
[st
.st_dev
] = as
;
895 auto as
= s_fstype
[st
.st_dev
];
896 if (!as
.append_challenged
) {
897 return make_shared
<posix_file_impl
>(fd
, open_flags(flags
), options
, &io_queue
);
899 return make_shared
<append_challenged_posix_file_impl
>(fd
, open_flags(flags
), options
, as
.append_concurrency
, as
.fsync_is_exclusive
, &io_queue
);
903 file::file(int fd
, file_open_options options
)
904 : _file_impl(make_file_impl(fd
, options
)) {
907 file::file(seastar::file_handle
&& handle
)
908 : _file_impl(std::move(std::move(handle
).to_file()._file_impl
)) {
913 return seastar::file_handle(_file_impl
->dup());
916 file_impl
* file_impl::get_file_impl(file
& f
) {
917 return f
._file_impl
.get();
920 std::unique_ptr
<seastar::file_handle_impl
>
922 throw std::runtime_error("this file type cannot be duplicated");