]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/src/core/file.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / core / file.cc
CommitLineData
9f95a23c
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright 2019 ScyllaDB
20 */
21
22#define __user /* empty */ // for xfs includes, below
23
24#include <sys/syscall.h>
25#include <dirent.h>
26#include <linux/types.h> // for xfs, below
27#include <sys/ioctl.h>
28#include <xfs/linux.h>
29#define min min /* prevent xfs.h from defining min() as a macro */
30#include <xfs/xfs.h>
31#undef min
32#include <boost/range/numeric.hpp>
33#include <boost/range/adaptor/transformed.hpp>
34#include <seastar/core/reactor.hh>
35#include <seastar/core/file.hh>
36#include <seastar/core/report_exception.hh>
37#include <seastar/core/linux-aio.hh>
f67539c2 38#include <seastar/util/later.hh>
9f95a23c
TL
39#include "core/file-impl.hh"
40#include "core/syscall_result.hh"
41#include "core/thread_pool.hh"
42#include "core/uname.hh"
43
44namespace seastar {
45
f67539c2
TL
46static_assert(std::is_nothrow_copy_constructible_v<io_priority_class>);
47static_assert(std::is_nothrow_move_constructible_v<io_priority_class>);
48
9f95a23c
TL
49using namespace internal;
50using namespace internal::linux_abi;
51
52file_handle::file_handle(const file_handle& x)
53 : _impl(x._impl ? x._impl->clone() : std::unique_ptr<file_handle_impl>()) {
54}
55
56file_handle::file_handle(file_handle&& x) noexcept = default;
57
58file_handle&
59file_handle::operator=(const file_handle& x) {
60 return operator=(file_handle(x));
61}
62
63file_handle&
64file_handle::operator=(file_handle&&) noexcept = default;
65
66file
67file_handle::to_file() const & {
68 return file_handle(*this).to_file();
69}
70
71file
72file_handle::to_file() && {
73 return file(std::move(*_impl).to_file());
74}
75
f67539c2
TL
76posix_file_impl::posix_file_impl(int fd, open_flags f, file_open_options options, dev_t device_id)
77 : _device_id(device_id)
78 , _io_queue(&(engine().get_io_queue(_device_id)))
9f95a23c
TL
79 , _open_flags(f)
80 , _fd(fd)
81{
82 query_dma_alignment();
83}
84
85posix_file_impl::~posix_file_impl() {
86 if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) != 1) {
87 return;
88 }
89 delete _refcount;
90 if (_fd != -1) {
91 // Note: close() can be a blocking operation on NFS
92 ::close(_fd);
93 }
94}
95
96void
97posix_file_impl::query_dma_alignment() {
98 dioattr da;
99 auto r = ioctl(_fd, XFS_IOC_DIOINFO, &da);
100 if (r == 0) {
101 _memory_dma_alignment = da.d_mem;
102 _disk_read_dma_alignment = da.d_miniosz;
103 // xfs wants at least the block size for writes
104 // FIXME: really read the block size
105 _disk_write_dma_alignment = std::max<unsigned>(da.d_miniosz, 4096);
106 }
107}
108
109std::unique_ptr<seastar::file_handle_impl>
110posix_file_impl::dup() {
111 if (!_refcount) {
112 _refcount = new std::atomic<unsigned>(1u);
113 }
f67539c2 114 auto ret = std::make_unique<posix_file_handle_impl>(_fd, _open_flags, _refcount, _device_id);
9f95a23c
TL
115 _refcount->fetch_add(1, std::memory_order_relaxed);
116 return ret;
117}
118
f67539c2
TL
119posix_file_impl::posix_file_impl(int fd, open_flags f, std::atomic<unsigned>* refcount, dev_t device_id)
120 : _refcount(refcount)
121 , _device_id(device_id)
122 , _io_queue(&(engine().get_io_queue(_device_id)))
123 , _open_flags(f)
124 , _fd(fd) {
9f95a23c
TL
125}
126
127future<>
f67539c2 128posix_file_impl::flush(void) noexcept {
9f95a23c
TL
129 if ((_open_flags & open_flags::dsync) != open_flags{}) {
130 return make_ready_future<>();
131 }
132 return engine().fdatasync(_fd);
133}
134
135future<struct stat>
f67539c2
TL
136posix_file_impl::stat() noexcept {
137 return engine().fstat(_fd);
9f95a23c
TL
138}
139
140future<>
f67539c2 141posix_file_impl::truncate(uint64_t length) noexcept {
9f95a23c
TL
142 return engine()._thread_pool->submit<syscall_result<int>>([this, length] {
143 return wrap_syscall<int>(::ftruncate(_fd, length));
144 }).then([] (syscall_result<int> sr) {
145 sr.throw_if_error();
146 return make_ready_future<>();
147 });
148}
149
150future<>
f67539c2 151posix_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
9f95a23c
TL
152 return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
153 return wrap_syscall<int>(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
154 offset, length));
155 }).then([] (syscall_result<int> sr) {
156 sr.throw_if_error();
157 return make_ready_future<>();
158 });
159}
160
161future<>
f67539c2 162posix_file_impl::allocate(uint64_t position, uint64_t length) noexcept {
9f95a23c
TL
163#ifdef FALLOC_FL_ZERO_RANGE
164 // FALLOC_FL_ZERO_RANGE is fairly new, so don't fail if it's not supported.
165 static bool supported = true;
166 if (!supported) {
167 return make_ready_future<>();
168 }
169 return engine()._thread_pool->submit<syscall_result<int>>([this, position, length] () mutable {
170 auto ret = ::fallocate(_fd, FALLOC_FL_ZERO_RANGE|FALLOC_FL_KEEP_SIZE, position, length);
171 if (ret == -1 && errno == EOPNOTSUPP) {
172 ret = 0;
173 supported = false; // Racy, but harmless. At most we issue an extra call or two.
174 }
175 return wrap_syscall<int>(ret);
176 }).then([] (syscall_result<int> sr) {
177 sr.throw_if_error();
178 return make_ready_future<>();
179 });
180#else
181 return make_ready_future<>();
182#endif
183}
184
185future<uint64_t>
f67539c2 186posix_file_impl::size() noexcept {
9f95a23c
TL
187 auto r = ::lseek(_fd, 0, SEEK_END);
188 if (r == -1) {
189 return make_exception_future<uint64_t>(std::system_error(errno, std::system_category()));
190 }
191 return make_ready_future<uint64_t>(r);
192}
193
194future<>
195posix_file_impl::close() noexcept {
196 if (_fd == -1) {
197 seastar_logger.warn("double close() detected, contact support");
198 return make_ready_future<>();
199 }
200 auto fd = _fd;
201 _fd = -1; // Prevent a concurrent close (which is illegal) from closing another file's fd
202 if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) != 1) {
203 _refcount = nullptr;
204 return make_ready_future<>();
205 }
206 delete _refcount;
207 _refcount = nullptr;
208 auto closed = [fd] () noexcept {
209 try {
210 return engine()._thread_pool->submit<syscall_result<int>>([fd] {
211 return wrap_syscall<int>(::close(fd));
212 });
213 } catch (...) {
214 report_exception("Running ::close() in reactor thread, submission failed with exception", std::current_exception());
215 return make_ready_future<syscall_result<int>>(wrap_syscall<int>(::close(fd)));
216 }
217 }();
218 return closed.then([] (syscall_result<int> sr) {
219 sr.throw_if_error();
220 });
221}
222
223future<uint64_t>
f67539c2 224blockdev_file_impl::size(void) noexcept {
9f95a23c
TL
225 return engine()._thread_pool->submit<syscall_result_extra<size_t>>([this] {
226 uint64_t size;
227 int ret = ::ioctl(_fd, BLKGETSIZE64, &size);
228 return wrap_syscall(ret, size);
229 }).then([] (syscall_result_extra<uint64_t> ret) {
230 ret.throw_if_error();
231 return make_ready_future<uint64_t>(ret.extra);
232 });
233}
234
235subscription<directory_entry>
236posix_file_impl::list_directory(std::function<future<> (directory_entry de)> next) {
237 static constexpr size_t buffer_size = 8192;
238 struct work {
239 stream<directory_entry> s;
240 unsigned current = 0;
241 unsigned total = 0;
242 bool eof = false;
243 int error = 0;
244 char buffer[buffer_size];
245 };
246
247 // While it would be natural to use fdopendir()/readdir(),
248 // our syscall thread pool doesn't support malloc(), which is
249 // required for this to work. So resort to using getdents()
250 // instead.
251
252 // From getdents(2):
253 struct linux_dirent64 {
254 ino64_t d_ino; /* 64-bit inode number */
255 off64_t d_off; /* 64-bit offset to next structure */
256 unsigned short d_reclen; /* Size of this dirent */
257 unsigned char d_type; /* File type */
258 char d_name[]; /* Filename (null-terminated) */
259 };
260
261 auto w = make_lw_shared<work>();
262 auto ret = w->s.listen(std::move(next));
263 // List the directory asynchronously in the background.
264 // Caller synchronizes using the returned subscription.
265 (void)w->s.started().then([w, this] {
266 auto eofcond = [w] { return w->eof; };
267 return do_until(eofcond, [w, this] {
268 if (w->current == w->total) {
269 return engine()._thread_pool->submit<syscall_result<long>>([w , this] () {
270 auto ret = ::syscall(__NR_getdents64, _fd, reinterpret_cast<linux_dirent64*>(w->buffer), buffer_size);
271 return wrap_syscall(ret);
272 }).then([w] (syscall_result<long> ret) {
273 ret.throw_if_error();
274 if (ret.result == 0) {
275 w->eof = true;
276 } else {
277 w->current = 0;
278 w->total = ret.result;
279 }
280 });
281 }
282 auto start = w->buffer + w->current;
283 auto de = reinterpret_cast<linux_dirent64*>(start);
f67539c2 284 std::optional<directory_entry_type> type;
9f95a23c
TL
285 switch (de->d_type) {
286 case DT_BLK:
287 type = directory_entry_type::block_device;
288 break;
289 case DT_CHR:
290 type = directory_entry_type::char_device;
291 break;
292 case DT_DIR:
293 type = directory_entry_type::directory;
294 break;
295 case DT_FIFO:
296 type = directory_entry_type::fifo;
297 break;
298 case DT_REG:
299 type = directory_entry_type::regular;
300 break;
301 case DT_LNK:
302 type = directory_entry_type::link;
303 break;
304 case DT_SOCK:
305 type = directory_entry_type::socket;
306 break;
307 default:
308 // unknown, ignore
309 ;
310 }
311 w->current += de->d_reclen;
312 sstring name = de->d_name;
313 if (name == "." || name == "..") {
314 return make_ready_future<>();
315 }
316 return w->s.produce({std::move(name), type});
317 });
318 }).then([w] {
319 w->s.close();
320 }).handle_exception([] (std::exception_ptr ignored) {});
321 return ret;
322}
323
324future<size_t>
f67539c2 325posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& io_priority_class) noexcept {
9f95a23c
TL
326 auto req = internal::io_request::make_write(_fd, pos, buffer, len);
327 return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req));
328}
329
330future<size_t>
f67539c2 331posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& io_priority_class) noexcept {
9f95a23c
TL
332 auto len = internal::sanitize_iovecs(iov, _disk_write_dma_alignment);
333 auto req = internal::io_request::make_writev(_fd, pos, iov);
334 return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {});
335}
336
337future<size_t>
f67539c2 338posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& io_priority_class) noexcept {
9f95a23c
TL
339 auto req = internal::io_request::make_read(_fd, pos, buffer, len);
340 return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req));
341}
342
343future<size_t>
f67539c2 344posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& io_priority_class) noexcept {
9f95a23c
TL
345 auto len = internal::sanitize_iovecs(iov, _disk_read_dma_alignment);
346 auto req = internal::io_request::make_readv(_fd, pos, iov);
347 return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {});
348}
349
350future<temporary_buffer<uint8_t>>
f67539c2 351posix_file_impl::dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) noexcept {
9f95a23c
TL
352 using tmp_buf_type = typename file::read_state<uint8_t>::tmp_buf_type;
353
f67539c2 354 try {
9f95a23c
TL
355 auto front = offset & (_disk_read_dma_alignment - 1);
356 offset -= front;
357 range_size += front;
358
359 auto rstate = make_lw_shared<file::read_state<uint8_t>>(offset, front,
360 range_size,
361 _memory_dma_alignment,
362 _disk_read_dma_alignment);
363
364 //
365 // First, try to read directly into the buffer. Most of the reads will
366 // end here.
367 //
368 auto read = read_dma(offset, rstate->buf.get_write(),
369 rstate->buf.size(), pc);
370
371 return read.then([rstate, this, &pc] (size_t size) mutable {
372 rstate->pos = size;
373
374 //
375 // If we haven't read all required data at once -
376 // start read-copy sequence. We can't continue with direct reads
377 // into the previously allocated buffer here since we have to ensure
378 // the aligned read length and thus the aligned destination buffer
379 // size.
380 //
381 // The copying will actually take place only if there was a HW glitch.
382 // In EOF case or in case of a persistent I/O error the only overhead is
383 // an extra allocation.
384 //
385 return do_until(
386 [rstate] { return rstate->done(); },
387 [rstate, this, &pc] () mutable {
388 return read_maybe_eof(
389 rstate->cur_offset(), rstate->left_to_read(), pc).then(
390 [rstate] (auto buf1) mutable {
391 if (buf1.size()) {
392 rstate->append_new_data(buf1);
393 } else {
394 rstate->eof = true;
395 }
396
397 return make_ready_future<>();
398 });
399 }).then([rstate] () mutable {
400 //
401 // If we are here we are promised to have read some bytes beyond
402 // "front" so we may trim straight away.
403 //
404 rstate->trim_buf_before_ret();
405 return make_ready_future<tmp_buf_type>(std::move(rstate->buf));
406 });
407 });
f67539c2
TL
408 } catch (...) {
409 return make_exception_future<tmp_buf_type>(std::current_exception());
410 }
9f95a23c
TL
411}
412
413future<temporary_buffer<uint8_t>>
414posix_file_impl::read_maybe_eof(uint64_t pos, size_t len, const io_priority_class& pc) {
415 //
416 // We have to allocate a new aligned buffer to make sure we don't get
417 // an EINVAL error due to unaligned destination buffer.
418 //
419 temporary_buffer<uint8_t> buf = temporary_buffer<uint8_t>::aligned(
420 _memory_dma_alignment, align_up(len, size_t(_disk_read_dma_alignment)));
421
422 // try to read a single bulk from the given position
423 auto dst = buf.get_write();
424 auto buf_size = buf.size();
425 return read_dma(pos, dst, buf_size, pc).then_wrapped(
426 [buf = std::move(buf)](future<size_t> f) mutable {
427 try {
f67539c2 428 size_t size = f.get0();
9f95a23c
TL
429
430 buf.trim(size);
431
432 return std::move(buf);
433 } catch (std::system_error& e) {
434 //
435 // TODO: implement a non-trowing file_impl::dma_read() interface to
436 // avoid the exceptions throwing in a good flow completely.
437 // Otherwise for users that don't want to care about the
438 // underlying file size and preventing the attempts to read
439 // bytes beyond EOF there will always be at least one
440 // exception throwing at the file end for files with unaligned
441 // length.
442 //
443 if (e.code().value() == EINVAL) {
444 buf.trim(0);
445 return std::move(buf);
446 } else {
447 throw;
448 }
449 }
450 });
451}
452
f67539c2
TL
453blockdev_file_impl::blockdev_file_impl(int fd, open_flags f, file_open_options options, dev_t device_id)
454 : posix_file_impl(fd, f, options, device_id) {
9f95a23c
TL
455}
456
457future<>
f67539c2 458blockdev_file_impl::truncate(uint64_t length) noexcept {
9f95a23c
TL
459 return make_ready_future<>();
460}
461
462future<>
f67539c2 463blockdev_file_impl::discard(uint64_t offset, uint64_t length) noexcept {
9f95a23c
TL
464 return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
465 uint64_t range[2] { offset, length };
466 return wrap_syscall<int>(::ioctl(_fd, BLKDISCARD, &range));
467 }).then([] (syscall_result<int> sr) {
468 sr.throw_if_error();
469 return make_ready_future<>();
470 });
471}
472
473future<>
f67539c2 474blockdev_file_impl::allocate(uint64_t position, uint64_t length) noexcept {
9f95a23c
TL
475 // nothing to do for block device
476 return make_ready_future<>();
477}
478
479append_challenged_posix_file_impl::append_challenged_posix_file_impl(int fd, open_flags f, file_open_options options,
f67539c2
TL
480 unsigned max_size_changing_ops, bool fsync_is_exclusive, dev_t device_id)
481 : posix_file_impl(fd, f, options, device_id)
9f95a23c
TL
482 , _max_size_changing_ops(max_size_changing_ops)
483 , _fsync_is_exclusive(fsync_is_exclusive) {
484 auto r = ::lseek(fd, 0, SEEK_END);
485 throw_system_error_on(r == -1);
486 _committed_size = _logical_size = r;
487 _sloppy_size = options.sloppy_size;
488 auto hint = align_up<uint64_t>(options.sloppy_size_hint, _disk_write_dma_alignment);
489 if (_sloppy_size && _committed_size < hint) {
490 auto r = ::ftruncate(_fd, hint);
491 // We can ignore errors, since it's just a hint.
492 if (r != -1) {
493 _committed_size = hint;
494 }
495 }
496}
497
498append_challenged_posix_file_impl::~append_challenged_posix_file_impl() {
499 // If the file has not been closed we risk having running tasks
500 // that will try to access freed memory.
f67539c2
TL
501 //
502 // It is safe to destory it if nothing is queued.
503 // Note that posix_file_impl::~posix_file_impl auto-closes the file descriptor.
504 assert(_q.empty() && _logical_size == _committed_size);
9f95a23c
TL
505}
506
507bool
508append_challenged_posix_file_impl::must_run_alone(const op& candidate) const noexcept {
509 // checks if candidate is a non-write, size-changing operation.
510 return (candidate.type == opcode::truncate)
511 || (candidate.type == opcode::flush && (_fsync_is_exclusive || _sloppy_size));
512}
513
514bool
515append_challenged_posix_file_impl::size_changing(const op& candidate) const noexcept {
516 return (candidate.type == opcode::write && candidate.pos + candidate.len > _committed_size)
517 || must_run_alone(candidate);
518}
519
520bool
521append_challenged_posix_file_impl::may_dispatch(const op& candidate) const noexcept {
522 if (size_changing(candidate)) {
523 return !_current_size_changing_ops && !_current_non_size_changing_ops;
524 } else {
525 return !_current_size_changing_ops;
526 }
527}
528
529void
530append_challenged_posix_file_impl::dispatch(op& candidate) noexcept {
531 unsigned* op_counter = size_changing(candidate)
532 ? &_current_size_changing_ops : &_current_non_size_changing_ops;
533 ++*op_counter;
534 // FIXME: future is discarded
535 (void)candidate.run().then([me = shared_from_this(), op_counter] {
536 --*op_counter;
537 me->process_queue();
538 });
539}
540
541// If we have a bunch of size-extending writes in the queue,
542// issue an ftruncate() extending the file size, so they can
543// be issued concurrently.
544void
545append_challenged_posix_file_impl::optimize_queue() noexcept {
546 if (_current_non_size_changing_ops || _current_size_changing_ops) {
547 // Can't issue an ftruncate() if something is going on
548 return;
549 }
550 auto speculative_size = _committed_size;
551 unsigned n_appending_writes = 0;
552 for (const auto& op : _q) {
553 // stop calculating speculative size after a non-write, size-changing
554 // operation is found to prevent an useless truncate from being issued.
555 if (must_run_alone(op)) {
556 break;
557 }
558 if (op.type == opcode::write && op.pos + op.len > _committed_size) {
559 speculative_size = std::max(speculative_size, op.pos + op.len);
560 ++n_appending_writes;
561 }
562 }
563 if (n_appending_writes > _max_size_changing_ops
564 || (n_appending_writes && _sloppy_size)) {
565 if (_sloppy_size && speculative_size < 2 * _committed_size) {
566 speculative_size = align_up<uint64_t>(2 * _committed_size, _disk_write_dma_alignment);
567 }
568 // We're all alone, so issuing the ftruncate() in the reactor
569 // thread won't block us.
570 //
571 // Issuing it in the syscall thread is too slow; this can happen
572 // every several ops, and the syscall thread latency can be very
573 // high.
574 auto r = ::ftruncate(_fd, speculative_size);
575 if (r != -1) {
576 _committed_size = speculative_size;
577 // If we failed, the next write will pick it up.
578 }
579 }
580}
581
582void
583append_challenged_posix_file_impl::process_queue() noexcept {
584 optimize_queue();
585 while (!_q.empty() && may_dispatch(_q.front())) {
586 op candidate = std::move(_q.front());
587 _q.pop_front();
588 dispatch(candidate);
589 }
590 if (may_quit()) {
591 _completed.set_value();
592 _closing_state = state::closing; // prevents _completed to be signaled again in case of recursion
593 }
594}
595
596void
f67539c2 597append_challenged_posix_file_impl::enqueue_op(op&& op) {
9f95a23c
TL
598 _q.push_back(std::move(op));
599 process_queue();
600}
601
602bool
603append_challenged_posix_file_impl::may_quit() const noexcept {
604 return _closing_state == state::draining && _q.empty() && !_current_non_size_changing_ops &&
605 !_current_size_changing_ops;
606}
607
608void
609append_challenged_posix_file_impl::commit_size(uint64_t size) noexcept {
610 _committed_size = std::max(size, _committed_size);
611 _logical_size = std::max(size, _logical_size);
612}
613
614future<size_t>
f67539c2 615append_challenged_posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) noexcept {
9f95a23c
TL
616 if (pos >= _logical_size) {
617 // later() avoids tail recursion
618 return later().then([] {
619 return size_t(0);
620 });
621 }
622 len = std::min(pos + len, align_up<uint64_t>(_logical_size, _disk_read_dma_alignment)) - pos;
f67539c2 623 return enqueue<size_t>(
9f95a23c
TL
624 opcode::read,
625 pos,
626 len,
f67539c2
TL
627 [this, pos, buffer, len, &pc] () mutable {
628 return posix_file_impl::read_dma(pos, buffer, len, pc);
9f95a23c 629 }
f67539c2 630 );
9f95a23c
TL
631}
632
633future<size_t>
f67539c2 634append_challenged_posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept {
9f95a23c
TL
635 if (pos >= _logical_size) {
636 // later() avoids tail recursion
637 return later().then([] {
638 return size_t(0);
639 });
640 }
641 size_t len = 0;
642 auto i = iov.begin();
f67539c2
TL
643 auto aligned_logical_size = align_up<uint64_t>(_logical_size, _disk_read_dma_alignment);
644 while (i != iov.end() && pos + len + i->iov_len <= aligned_logical_size) {
9f95a23c
TL
645 len += i++->iov_len;
646 }
9f95a23c
TL
647 if (i != iov.end()) {
648 auto last_len = pos + len + i->iov_len - aligned_logical_size;
649 if (last_len) {
650 i++->iov_len = last_len;
651 }
652 iov.erase(i, iov.end());
653 }
f67539c2 654 return enqueue<size_t>(
9f95a23c
TL
655 opcode::read,
656 pos,
657 len,
f67539c2
TL
658 [this, pos, iov = std::move(iov), &pc] () mutable {
659 return posix_file_impl::read_dma(pos, std::move(iov), pc);
9f95a23c 660 }
f67539c2 661 );
9f95a23c
TL
662}
663
664future<size_t>
f67539c2
TL
665append_challenged_posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) noexcept {
666 return enqueue<size_t>(
9f95a23c
TL
667 opcode::write,
668 pos,
669 len,
f67539c2
TL
670 [this, pos, buffer, len, &pc] {
671 return posix_file_impl::write_dma(pos, buffer, len, pc).then([this, pos] (size_t ret) {
672 commit_size(pos + ret);
673 return make_ready_future<size_t>(ret);
9f95a23c
TL
674 });
675 }
f67539c2 676 );
9f95a23c
TL
677}
678
679future<size_t>
f67539c2 680append_challenged_posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept {
9f95a23c 681 auto len = boost::accumulate(iov | boost::adaptors::transformed(std::mem_fn(&iovec::iov_len)), size_t(0));
f67539c2 682 return enqueue<size_t>(
9f95a23c
TL
683 opcode::write,
684 pos,
685 len,
f67539c2
TL
686 [this, pos, iov = std::move(iov), &pc] () mutable {
687 return posix_file_impl::write_dma(pos, std::move(iov), pc).then([this, pos] (size_t ret) {
688 commit_size(pos + ret);
689 return make_ready_future<size_t>(ret);
9f95a23c
TL
690 });
691 }
f67539c2 692 );
9f95a23c
TL
693}
694
695future<>
f67539c2 696append_challenged_posix_file_impl::flush() noexcept {
9f95a23c
TL
697 if ((!_sloppy_size || _logical_size == _committed_size) && !_fsync_is_exclusive) {
698 // FIXME: determine if flush can block concurrent reads or writes
699 return posix_file_impl::flush();
700 } else {
f67539c2 701 return enqueue<>(
9f95a23c
TL
702 opcode::flush,
703 0,
704 0,
f67539c2
TL
705 [this] () {
706 if (_logical_size != _committed_size) {
707 // We're all alone, so can truncate in reactor thread
708 auto r = ::ftruncate(_fd, _logical_size);
709 if (r == -1) {
710 return make_exception_future<>(std::system_error(errno, std::system_category(), "flush"));
9f95a23c 711 }
f67539c2
TL
712 _committed_size = _logical_size;
713 }
714 return posix_file_impl::flush();
9f95a23c 715 }
f67539c2 716 );
9f95a23c
TL
717 }
718}
719
720future<struct stat>
f67539c2 721append_challenged_posix_file_impl::stat() noexcept {
9f95a23c
TL
722 // FIXME: can this conflict with anything?
723 return posix_file_impl::stat().then([this] (struct stat stat) {
724 stat.st_size = _logical_size;
725 return stat;
726 });
727}
728
729future<>
f67539c2
TL
730append_challenged_posix_file_impl::truncate(uint64_t length) noexcept {
731 return enqueue<>(
9f95a23c
TL
732 opcode::truncate,
733 length,
734 0,
f67539c2
TL
735 [this, length] () mutable {
736 return posix_file_impl::truncate(length).then([this, length] () mutable {
737 _committed_size = _logical_size = length;
9f95a23c
TL
738 });
739 }
f67539c2 740 );
9f95a23c
TL
741}
742
743future<uint64_t>
f67539c2 744append_challenged_posix_file_impl::size() noexcept {
9f95a23c
TL
745 return make_ready_future<size_t>(_logical_size);
746}
747
748future<>
749append_challenged_posix_file_impl::close() noexcept {
750 // Caller should have drained all pending I/O
751 _closing_state = state::draining;
752 process_queue();
753 return _completed.get_future().then([this] {
754 if (_logical_size != _committed_size) {
755 auto r = ::ftruncate(_fd, _logical_size);
756 if (r != -1) {
757 _committed_size = _logical_size;
758 }
759 }
760 return posix_file_impl::close().finally([this] { _closing_state = state::closed; });
761 });
762}
763
764posix_file_handle_impl::~posix_file_handle_impl() {
765 if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) == 1) {
766 ::close(_fd);
767 delete _refcount;
768 }
769}
770
771std::unique_ptr<seastar::file_handle_impl>
772posix_file_handle_impl::clone() const {
f67539c2 773 auto ret = std::make_unique<posix_file_handle_impl>(_fd, _open_flags, _refcount, _device_id);
9f95a23c
TL
774 if (_refcount) {
775 _refcount->fetch_add(1, std::memory_order_relaxed);
776 }
777 return ret;
778}
779
780shared_ptr<file_impl>
781posix_file_handle_impl::to_file() && {
f67539c2 782 auto ret = ::seastar::make_shared<posix_file_impl>(_fd, _open_flags, _refcount, _device_id);
9f95a23c
TL
783 _fd = -1;
784 _refcount = nullptr;
785 return ret;
786}
787
788// Some kernels can append to xfs filesystems, some cannot; determine
789// from kernel version.
790static
791unsigned
792xfs_concurrency_from_kernel_version() {
793 // try to see if this is a mainline kernel with xfs append fixed (3.15+)
794 // or a RHEL kernel with the backported fix (3.10.0-325.el7+)
795 if (kernel_uname().whitelisted({"3.15", "3.10.0-325.el7"})) {
796 // Can append, but not concurrently
797 return 1;
798 }
799 // Cannot append at all; need ftrucnate().
800 return 0;
801}
802
f67539c2
TL
803future<shared_ptr<file_impl>>
804make_file_impl(int fd, file_open_options options, int flags) noexcept {
805 return engine().fstat(fd).then([fd, options = std::move(options), flags] (struct stat st) mutable {
806 auto st_dev = st.st_dev;
9f95a23c 807
f67539c2
TL
808 if (S_ISBLK(st.st_mode)) {
809 return make_ready_future<shared_ptr<file_impl>>(make_shared<blockdev_file_impl>(fd, open_flags(flags), options, st_dev));
810 } else {
811 if ((flags & O_ACCMODE) == O_RDONLY || S_ISDIR(st.st_mode)) {
812 return make_ready_future<shared_ptr<file_impl>>(make_shared<posix_file_impl>(fd, open_flags(flags), options, st_dev));
9f95a23c 813 }
f67539c2
TL
814 struct append_support {
815 bool append_challenged;
816 unsigned append_concurrency;
817 bool fsync_is_exclusive;
818 };
819 static thread_local std::unordered_map<decltype(st_dev), append_support> s_fstype;
820 future<> get_append_support = s_fstype.count(st_dev) ? make_ready_future<>() :
821 engine().fstatfs(fd).then([st_dev] (struct statfs sfs) {
822 append_support as;
823 switch (sfs.f_type) {
824 case 0x58465342: /* XFS */
825 as.append_challenged = true;
826 static auto xc = xfs_concurrency_from_kernel_version();
827 as.append_concurrency = xc;
828 as.fsync_is_exclusive = true;
829 break;
830 case 0x6969: /* NFS */
831 as.append_challenged = false;
832 as.append_concurrency = 0;
833 as.fsync_is_exclusive = false;
834 break;
835 case 0xEF53: /* EXT4 */
836 as.append_challenged = true;
837 as.append_concurrency = 0;
838 as.fsync_is_exclusive = false;
839 break;
840 default:
841 as.append_challenged = true;
842 as.append_concurrency = 0;
843 as.fsync_is_exclusive = true;
844 }
845 s_fstype[st_dev] = as;
846 });
847 return get_append_support.then([st_dev, fd, flags, options = std::move(options)] () mutable {
848 auto as = s_fstype[st_dev];
849 if (!as.append_challenged) {
850 return make_ready_future<shared_ptr<file_impl>>(make_shared<posix_file_impl>(fd, open_flags(flags), std::move(options), st_dev));
851 }
852 return make_ready_future<shared_ptr<file_impl>>(make_shared<append_challenged_posix_file_impl>(fd, open_flags(flags), std::move(options), as.append_concurrency, as.fsync_is_exclusive, st_dev));
853 });
9f95a23c 854 }
f67539c2
TL
855 });
856}
857
858file::file(seastar::file_handle&& handle) noexcept
859 : _file_impl(std::move(std::move(handle).to_file()._file_impl)) {
860}
861
862future<uint64_t> file::size() const noexcept {
863 try {
864 return _file_impl->size();
865 } catch (...) {
866 return current_exception_as_future<uint64_t>();
867 }
868}
869
870future<> file::close() noexcept {
871 return do_with(shared_ptr<file_impl>(_file_impl), [](shared_ptr<file_impl>& f) {
872 return f->close();
873 });
874}
875
876subscription<directory_entry>
877file::list_directory(std::function<future<>(directory_entry de)> next) {
878 return _file_impl->list_directory(std::move(next));
879}
880
881future<temporary_buffer<uint8_t>>
882file::dma_read_bulk_impl(uint64_t offset, size_t range_size, const io_priority_class& pc) noexcept {
883 try {
884 return _file_impl->dma_read_bulk(offset, range_size, pc);
885 } catch (...) {
886 return current_exception_as_future<temporary_buffer<uint8_t>>();
887 }
888}
889
890future<> file::discard(uint64_t offset, uint64_t length) noexcept {
891 try {
892 return _file_impl->discard(offset, length);
893 } catch (...) {
894 return current_exception_as_future();
895 }
896}
897
898future<> file::allocate(uint64_t position, uint64_t length) noexcept {
899 try {
900 return _file_impl->allocate(position, length);
901 } catch (...) {
902 return current_exception_as_future();
903 }
904}
905
906future<> file::truncate(uint64_t length) noexcept {
907 try {
908 return _file_impl->truncate(length);
909 } catch (...) {
910 return current_exception_as_future();
911 }
912}
913
914future<struct stat> file::stat() noexcept {
915 try {
916 return _file_impl->stat();
917 } catch (...) {
918 return current_exception_as_future<struct stat>();
919 }
920}
921
922future<> file::flush() noexcept {
923 try {
924 return _file_impl->flush();
925 } catch (...) {
926 return current_exception_as_future();
927 }
928}
929
930future<size_t> file::dma_write(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept {
931 try {
932 return _file_impl->write_dma(pos, std::move(iov), pc);
933 } catch (...) {
934 return current_exception_as_future<size_t>();
935 }
936}
937
938future<size_t>
939file::dma_write_impl(uint64_t pos, const uint8_t* buffer, size_t len, const io_priority_class& pc) noexcept {
940 try {
941 return _file_impl->write_dma(pos, buffer, len, pc);
942 } catch (...) {
943 return current_exception_as_future<size_t>();
944 }
945}
946
947future<size_t> file::dma_read(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) noexcept {
948 try {
949 return _file_impl->read_dma(pos, std::move(iov), pc);
950 } catch (...) {
951 return current_exception_as_future<size_t>();
952 }
953}
954
955future<temporary_buffer<uint8_t>>
956file::dma_read_exactly_impl(uint64_t pos, size_t len, const io_priority_class& pc) noexcept {
957 return dma_read<uint8_t>(pos, len, pc).then([len](auto buf) {
958 if (buf.size() < len) {
959 throw eof_error();
9f95a23c 960 }
f67539c2
TL
961
962 return buf;
963 });
9f95a23c
TL
964}
965
f67539c2
TL
966future<temporary_buffer<uint8_t>>
967file::dma_read_impl(uint64_t pos, size_t len, const io_priority_class& pc) noexcept {
968 return dma_read_bulk<uint8_t>(pos, len, pc).then([len](temporary_buffer<uint8_t> buf) {
969 if (len < buf.size()) {
970 buf.trim(len);
971 }
972
973 return buf;
974 });
9f95a23c
TL
975}
976
f67539c2
TL
977future<size_t>
978file::dma_read_impl(uint64_t aligned_pos, uint8_t* aligned_buffer, size_t aligned_len, const io_priority_class& pc) noexcept {
979 try {
980 return _file_impl->read_dma(aligned_pos, aligned_buffer, aligned_len, pc);
981 } catch (...) {
982 return current_exception_as_future<size_t>();
983 }
9f95a23c
TL
984}
985
986seastar::file_handle
987file::dup() {
988 return seastar::file_handle(_file_impl->dup());
989}
990
991file_impl* file_impl::get_file_impl(file& f) {
992 return f._file_impl.get();
993}
994
995std::unique_ptr<seastar::file_handle_impl>
996file_impl::dup() {
997 throw std::runtime_error("this file type cannot be duplicated");
998}
999
f67539c2
TL
1000future<file> open_file_dma(std::string_view name, open_flags flags) noexcept {
1001 return engine().open_file_dma(name, flags, file_open_options());
1002}
1003
1004future<file> open_file_dma(std::string_view name, open_flags flags, file_open_options options) noexcept {
1005 return engine().open_file_dma(name, flags, std::move(options));
1006}
1007
1008future<file> open_directory(std::string_view name) noexcept {
1009 return engine().open_directory(name);
1010}
1011
9f95a23c 1012}