/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright 2019 ScyllaDB
 */

#define __user /* empty */  // for xfs includes, below

#include <sys/syscall.h>
#include <dirent.h>
#include <linux/types.h> // for xfs, below
#include <sys/ioctl.h>
#include <xfs/linux.h>
#define min min /* prevent xfs.h from defining min() as a macro */
#include <xfs/xfs.h>
#undef min
#include <boost/range/numeric.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <seastar/core/reactor.hh>
#include <seastar/core/file.hh>
#include <seastar/core/report_exception.hh>
#include <seastar/core/linux-aio.hh>
#include "core/file-impl.hh"
#include "core/syscall_result.hh"
#include "core/thread_pool.hh"
#include "core/uname.hh"

namespace seastar {

using namespace internal;
using namespace internal::linux_abi;

file_handle::file_handle(const file_handle& x)
        : _impl(x._impl ? x._impl->clone() : std::unique_ptr<file_handle_impl>()) {
}

file_handle::file_handle(file_handle&& x) noexcept = default;

file_handle&
file_handle::operator=(const file_handle& x) {
    return operator=(file_handle(x));
}

file_handle&
file_handle::operator=(file_handle&&) noexcept = default;

file
file_handle::to_file() const & {
    return file_handle(*this).to_file();
}

file
file_handle::to_file() && {
    return file(std::move(*_impl).to_file());
}

posix_file_impl::posix_file_impl(int fd, open_flags f, file_open_options options, io_queue* ioq)
        : _io_queue(ioq)
        , _open_flags(f)
        , _fd(fd)
{
    query_dma_alignment();
}

posix_file_impl::~posix_file_impl() {
    if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) != 1) {
        return;
    }
    delete _refcount;
    if (_fd != -1) {
        // Note: close() can be a blocking operation on NFS
        ::close(_fd);
    }
}

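// Query the direct-I/O alignment requirements via the XFS_IOC_DIOINFO ioctl.
// On filesystems that don't support the ioctl, the defaults are left unchanged.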
void
posix_file_impl::query_dma_alignment() {
    dioattr da;
    auto r = ioctl(_fd, XFS_IOC_DIOINFO, &da);
    if (r == 0) {
        _memory_dma_alignment = da.d_mem;
        _disk_read_dma_alignment = da.d_miniosz;
        // xfs wants at least the block size for writes
        // FIXME: really read the block size
        _disk_write_dma_alignment = std::max<unsigned>(da.d_miniosz, 4096);
    }
}

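// Creates a handle that shares this file's fd. The fd becomes reference-counted
// on the first dup(); the last owner to drop its reference closes it.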
std::unique_ptr<seastar::file_handle_impl>
posix_file_impl::dup() {
    if (!_refcount) {
        _refcount = new std::atomic<unsigned>(1u);
    }
    auto ret = std::make_unique<posix_file_handle_impl>(_fd, _open_flags, _refcount, _io_queue);
    _refcount->fetch_add(1, std::memory_order_relaxed);
    return ret;
}

posix_file_impl::posix_file_impl(int fd, open_flags f, std::atomic<unsigned>* refcount, io_queue *ioq)
        : _refcount(refcount), _io_queue(ioq), _open_flags(f), _fd(fd) {
}

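// Files opened with O_DSYNC are durable after every write, so an explicit
// fdatasync() is only needed otherwise.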
future<>
posix_file_impl::flush(void) {
    if ((_open_flags & open_flags::dsync) != open_flags{}) {
        return make_ready_future<>();
    }
    return engine().fdatasync(_fd);
}

future<struct stat>
posix_file_impl::stat(void) {
    return engine()._thread_pool->submit<syscall_result_extra<struct stat>>([this] {
        struct stat st;
        auto ret = ::fstat(_fd, &st);
        return wrap_syscall(ret, st);
    }).then([] (syscall_result_extra<struct stat> ret) {
        ret.throw_if_error();
        return make_ready_future<struct stat>(ret.extra);
    });
}

future<>
posix_file_impl::truncate(uint64_t length) {
    return engine()._thread_pool->submit<syscall_result<int>>([this, length] {
        return wrap_syscall<int>(::ftruncate(_fd, length));
    }).then([] (syscall_result<int> sr) {
        sr.throw_if_error();
        return make_ready_future<>();
    });
}

future<>
posix_file_impl::discard(uint64_t offset, uint64_t length) {
    return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
        return wrap_syscall<int>(::fallocate(_fd, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE,
                offset, length));
    }).then([] (syscall_result<int> sr) {
        sr.throw_if_error();
        return make_ready_future<>();
    });
}

future<>
posix_file_impl::allocate(uint64_t position, uint64_t length) {
#ifdef FALLOC_FL_ZERO_RANGE
    // FALLOC_FL_ZERO_RANGE is fairly new, so don't fail if it's not supported.
    static bool supported = true;
    if (!supported) {
        return make_ready_future<>();
    }
    return engine()._thread_pool->submit<syscall_result<int>>([this, position, length] () mutable {
        auto ret = ::fallocate(_fd, FALLOC_FL_ZERO_RANGE|FALLOC_FL_KEEP_SIZE, position, length);
        if (ret == -1 && errno == EOPNOTSUPP) {
            ret = 0;
            supported = false; // Racy, but harmless. At most we issue an extra call or two.
        }
        return wrap_syscall<int>(ret);
    }).then([] (syscall_result<int> sr) {
        sr.throw_if_error();
        return make_ready_future<>();
    });
#else
    return make_ready_future<>();
#endif
}

future<uint64_t>
posix_file_impl::size() {
    auto r = ::lseek(_fd, 0, SEEK_END);
    if (r == -1) {
        return make_exception_future<uint64_t>(std::system_error(errno, std::system_category()));
    }
    return make_ready_future<uint64_t>(r);
}

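// Closes the file (or drops the last shared reference to its fd). The actual
// ::close() runs on the syscall thread pool, since it may block (e.g. on NFS).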
future<>
posix_file_impl::close() noexcept {
    if (_fd == -1) {
        seastar_logger.warn("double close() detected, contact support");
        return make_ready_future<>();
    }
    auto fd = _fd;
    _fd = -1;  // Prevent a concurrent close (which is illegal) from closing another file's fd
    if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) != 1) {
        _refcount = nullptr;
        return make_ready_future<>();
    }
    delete _refcount;
    _refcount = nullptr;
    auto closed = [fd] () noexcept {
        try {
            return engine()._thread_pool->submit<syscall_result<int>>([fd] {
                return wrap_syscall<int>(::close(fd));
            });
        } catch (...) {
            report_exception("Running ::close() in reactor thread, submission failed with exception", std::current_exception());
            return make_ready_future<syscall_result<int>>(wrap_syscall<int>(::close(fd)));
        }
    }();
    return closed.then([] (syscall_result<int> sr) {
        sr.throw_if_error();
    });
}

future<uint64_t>
blockdev_file_impl::size(void) {
    return engine()._thread_pool->submit<syscall_result_extra<size_t>>([this] {
        uint64_t size;
        int ret = ::ioctl(_fd, BLKGETSIZE64, &size);
        return wrap_syscall(ret, size);
    }).then([] (syscall_result_extra<uint64_t> ret) {
        ret.throw_if_error();
        return make_ready_future<uint64_t>(ret.extra);
    });
}

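// Streams directory entries to the caller through the returned subscription.
// The getdents64() calls run on the syscall thread pool in the background.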
subscription<directory_entry>
posix_file_impl::list_directory(std::function<future<> (directory_entry de)> next) {
    static constexpr size_t buffer_size = 8192;
    struct work {
        stream<directory_entry> s;
        unsigned current = 0;
        unsigned total = 0;
        bool eof = false;
        int error = 0;
        char buffer[buffer_size];
    };

    // While it would be natural to use fdopendir()/readdir(),
    // our syscall thread pool doesn't support malloc(), which is
    // required for this to work. So resort to using getdents()
    // instead.

    // From getdents(2):
    struct linux_dirent64 {
        ino64_t        d_ino;    /* 64-bit inode number */
        off64_t        d_off;    /* 64-bit offset to next structure */
        unsigned short d_reclen; /* Size of this dirent */
        unsigned char  d_type;   /* File type */
        char           d_name[]; /* Filename (null-terminated) */
    };

    auto w = make_lw_shared<work>();
    auto ret = w->s.listen(std::move(next));
    // List the directory asynchronously in the background.
    // Caller synchronizes using the returned subscription.
    (void)w->s.started().then([w, this] {
        auto eofcond = [w] { return w->eof; };
        return do_until(eofcond, [w, this] {
            if (w->current == w->total) {
                return engine()._thread_pool->submit<syscall_result<long>>([w, this] () {
                    auto ret = ::syscall(__NR_getdents64, _fd, reinterpret_cast<linux_dirent64*>(w->buffer), buffer_size);
                    return wrap_syscall(ret);
                }).then([w] (syscall_result<long> ret) {
                    ret.throw_if_error();
                    if (ret.result == 0) {
                        w->eof = true;
                    } else {
                        w->current = 0;
                        w->total = ret.result;
                    }
                });
            }
            auto start = w->buffer + w->current;
            auto de = reinterpret_cast<linux_dirent64*>(start);
            compat::optional<directory_entry_type> type;
            switch (de->d_type) {
            case DT_BLK:
                type = directory_entry_type::block_device;
                break;
            case DT_CHR:
                type = directory_entry_type::char_device;
                break;
            case DT_DIR:
                type = directory_entry_type::directory;
                break;
            case DT_FIFO:
                type = directory_entry_type::fifo;
                break;
            case DT_REG:
                type = directory_entry_type::regular;
                break;
            case DT_LNK:
                type = directory_entry_type::link;
                break;
            case DT_SOCK:
                type = directory_entry_type::socket;
                break;
            default:
                // unknown, ignore
                ;
            }
            w->current += de->d_reclen;
            sstring name = de->d_name;
            if (name == "." || name == "..") {
                return make_ready_future<>();
            }
            return w->s.produce({std::move(name), type});
        });
    }).then([w] {
        w->s.close();
    }).handle_exception([] (std::exception_ptr ignored) {});
    return ret;
}

future<size_t>
posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& io_priority_class) {
    auto req = internal::io_request::make_write(_fd, pos, buffer, len);
    return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req));
}

future<size_t>
posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& io_priority_class) {
    auto len = internal::sanitize_iovecs(iov, _disk_write_dma_alignment);
    auto req = internal::io_request::make_writev(_fd, pos, iov);
    return engine().submit_io_write(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {});
}

future<size_t>
posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& io_priority_class) {
    auto req = internal::io_request::make_read(_fd, pos, buffer, len);
    return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req));
}

future<size_t>
posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& io_priority_class) {
    auto len = internal::sanitize_iovecs(iov, _disk_read_dma_alignment);
    auto req = internal::io_request::make_readv(_fd, pos, iov);
    return engine().submit_io_read(_io_queue, io_priority_class, len, std::move(req)).finally([iov = std::move(iov)] () {});
}

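// Reads a byte range that need not be DMA-aligned: the offset is rounded down
// to the read alignment, the whole range is read in one request, and any short
// read is completed with additional aligned reads before trimming the result.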
future<temporary_buffer<uint8_t>>
posix_file_impl::dma_read_bulk(uint64_t offset, size_t range_size, const io_priority_class& pc) {
    using tmp_buf_type = typename file::read_state<uint8_t>::tmp_buf_type;

    auto front = offset & (_disk_read_dma_alignment - 1);
    offset -= front;
    range_size += front;

    auto rstate = make_lw_shared<file::read_state<uint8_t>>(offset, front,
                                                            range_size,
                                                            _memory_dma_alignment,
                                                            _disk_read_dma_alignment);

    //
    // First, try to read directly into the buffer. Most of the reads will
    // end here.
    //
    auto read = read_dma(offset, rstate->buf.get_write(),
                         rstate->buf.size(), pc);

    return read.then([rstate, this, &pc] (size_t size) mutable {
        rstate->pos = size;

        //
        // If we haven't read all required data at once -
        // start read-copy sequence. We can't continue with direct reads
        // into the previously allocated buffer here since we have to ensure
        // the aligned read length and thus the aligned destination buffer
        // size.
        //
        // The copying will actually take place only if there was a HW glitch.
        // In the EOF case or in case of a persistent I/O error the only
        // overhead is an extra allocation.
        //
        return do_until(
                [rstate] { return rstate->done(); },
                [rstate, this, &pc] () mutable {
            return read_maybe_eof(
                    rstate->cur_offset(), rstate->left_to_read(), pc).then(
                    [rstate] (auto buf1) mutable {
                if (buf1.size()) {
                    rstate->append_new_data(buf1);
                } else {
                    rstate->eof = true;
                }

                return make_ready_future<>();
            });
        }).then([rstate] () mutable {
            //
            // If we are here we are promised to have read some bytes beyond
            // "front" so we may trim straight away.
            //
            rstate->trim_buf_before_ret();
            return make_ready_future<tmp_buf_type>(std::move(rstate->buf));
        });
    });
}

future<temporary_buffer<uint8_t>>
posix_file_impl::read_maybe_eof(uint64_t pos, size_t len, const io_priority_class& pc) {
    //
    // We have to allocate a new aligned buffer to make sure we don't get
    // an EINVAL error due to unaligned destination buffer.
    //
    temporary_buffer<uint8_t> buf = temporary_buffer<uint8_t>::aligned(
            _memory_dma_alignment, align_up(len, size_t(_disk_read_dma_alignment)));

    // try to read a single bulk from the given position
    auto dst = buf.get_write();
    auto buf_size = buf.size();
    return read_dma(pos, dst, buf_size, pc).then_wrapped(
            [buf = std::move(buf)](future<size_t> f) mutable {
        try {
            size_t size = std::get<0>(f.get());

            buf.trim(size);

            return std::move(buf);
        } catch (std::system_error& e) {
            //
            // TODO: implement a non-throwing file_impl::dma_read() interface to
            //       avoid throwing exceptions in the good path entirely.
            //       Otherwise, for users that don't track the underlying file
            //       size and don't prevent reads beyond EOF, there will always
            //       be at least one exception thrown at the end of files with
            //       unaligned length.
            //
            if (e.code().value() == EINVAL) {
                buf.trim(0);
                return std::move(buf);
            } else {
                throw;
            }
        }
    });
}

blockdev_file_impl::blockdev_file_impl(int fd, open_flags f, file_open_options options, io_queue *ioq)
        : posix_file_impl(fd, f, options, ioq) {
}

future<>
blockdev_file_impl::truncate(uint64_t length) {
    return make_ready_future<>();
}

future<>
blockdev_file_impl::discard(uint64_t offset, uint64_t length) {
    return engine()._thread_pool->submit<syscall_result<int>>([this, offset, length] () mutable {
        uint64_t range[2] { offset, length };
        return wrap_syscall<int>(::ioctl(_fd, BLKDISCARD, &range));
    }).then([] (syscall_result<int> sr) {
        sr.throw_if_error();
        return make_ready_future<>();
    });
}

future<>
blockdev_file_impl::allocate(uint64_t position, uint64_t length) {
    // nothing to do for block device
    return make_ready_future<>();
}

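// Implementation for files on "append challenged" filesystems, i.e. filesystems
// that cannot safely extend a file's size concurrently with other I/O. All
// operations go through a queue: size-changing operations run exclusively,
// while a backlog of appending writes may trigger a speculative ftruncate()
// that extends the committed size up front so those writes can run
// concurrently. Any excess committed size is truncated back to the logical
// size on flush or close.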
append_challenged_posix_file_impl::append_challenged_posix_file_impl(int fd, open_flags f, file_open_options options,
        unsigned max_size_changing_ops, bool fsync_is_exclusive, io_queue* ioq)
        : posix_file_impl(fd, f, options, ioq)
        , _max_size_changing_ops(max_size_changing_ops)
        , _fsync_is_exclusive(fsync_is_exclusive) {
    auto r = ::lseek(fd, 0, SEEK_END);
    throw_system_error_on(r == -1);
    _committed_size = _logical_size = r;
    _sloppy_size = options.sloppy_size;
    auto hint = align_up<uint64_t>(options.sloppy_size_hint, _disk_write_dma_alignment);
    if (_sloppy_size && _committed_size < hint) {
        auto r = ::ftruncate(_fd, hint);
        // We can ignore errors, since it's just a hint.
        if (r != -1) {
            _committed_size = hint;
        }
    }
}

append_challenged_posix_file_impl::~append_challenged_posix_file_impl() {
    // If the file has not been closed we risk having running tasks
    // that will try to access freed memory.
    assert(_closing_state == state::closed);
}

bool
append_challenged_posix_file_impl::must_run_alone(const op& candidate) const noexcept {
    // checks if candidate is a non-write, size-changing operation.
    return (candidate.type == opcode::truncate)
            || (candidate.type == opcode::flush && (_fsync_is_exclusive || _sloppy_size));
}

bool
append_challenged_posix_file_impl::size_changing(const op& candidate) const noexcept {
    return (candidate.type == opcode::write && candidate.pos + candidate.len > _committed_size)
            || must_run_alone(candidate);
}

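// A size-changing op may start only when nothing else is in flight; any other
// op only has to wait for in-flight size-changing ops to drain.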
bool
append_challenged_posix_file_impl::may_dispatch(const op& candidate) const noexcept {
    if (size_changing(candidate)) {
        return !_current_size_changing_ops && !_current_non_size_changing_ops;
    } else {
        return !_current_size_changing_ops;
    }
}

void
append_challenged_posix_file_impl::dispatch(op& candidate) noexcept {
    unsigned* op_counter = size_changing(candidate)
            ? &_current_size_changing_ops : &_current_non_size_changing_ops;
    ++*op_counter;
    // FIXME: future is discarded
    (void)candidate.run().then([me = shared_from_this(), op_counter] {
        --*op_counter;
        me->process_queue();
    });
}

// If we have a bunch of size-extending writes in the queue,
// issue an ftruncate() extending the file size, so they can
// be issued concurrently.
void
append_challenged_posix_file_impl::optimize_queue() noexcept {
    if (_current_non_size_changing_ops || _current_size_changing_ops) {
        // Can't issue an ftruncate() if something is going on
        return;
    }
    auto speculative_size = _committed_size;
    unsigned n_appending_writes = 0;
    for (const auto& op : _q) {
        // Stop calculating the speculative size once a non-write, size-changing
        // operation is found, to prevent a useless truncate from being issued.
        if (must_run_alone(op)) {
            break;
        }
        if (op.type == opcode::write && op.pos + op.len > _committed_size) {
            speculative_size = std::max(speculative_size, op.pos + op.len);
            ++n_appending_writes;
        }
    }
    if (n_appending_writes > _max_size_changing_ops
            || (n_appending_writes && _sloppy_size)) {
        if (_sloppy_size && speculative_size < 2 * _committed_size) {
            speculative_size = align_up<uint64_t>(2 * _committed_size, _disk_write_dma_alignment);
        }
        // We're all alone, so issuing the ftruncate() in the reactor
        // thread won't block us.
        //
        // Issuing it in the syscall thread is too slow; this can happen
        // every several ops, and the syscall thread latency can be very
        // high.
        auto r = ::ftruncate(_fd, speculative_size);
        if (r != -1) {
            _committed_size = speculative_size;
            // If we failed, the next write will pick it up.
        }
    }
}

void
append_challenged_posix_file_impl::process_queue() noexcept {
    optimize_queue();
    while (!_q.empty() && may_dispatch(_q.front())) {
        op candidate = std::move(_q.front());
        _q.pop_front();
        dispatch(candidate);
    }
    if (may_quit()) {
        _completed.set_value();
        _closing_state = state::closing; // prevents _completed from being signaled again in case of recursion
    }
}

void
append_challenged_posix_file_impl::enqueue(op&& op) {
    _q.push_back(std::move(op));
    process_queue();
}

bool
append_challenged_posix_file_impl::may_quit() const noexcept {
    return _closing_state == state::draining && _q.empty() && !_current_non_size_changing_ops &&
            !_current_size_changing_ops;
}

void
append_challenged_posix_file_impl::commit_size(uint64_t size) noexcept {
    _committed_size = std::max(size, _committed_size);
    _logical_size = std::max(size, _logical_size);
}

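// Reads starting past the tracked logical size return zero bytes; otherwise the
// read is clamped to the (alignment-rounded) logical size and queued behind any
// size-changing operations.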
future<size_t>
append_challenged_posix_file_impl::read_dma(uint64_t pos, void* buffer, size_t len, const io_priority_class& pc) {
    if (pos >= _logical_size) {
        // later() avoids tail recursion
        return later().then([] {
            return size_t(0);
        });
    }
    len = std::min(pos + len, align_up<uint64_t>(_logical_size, _disk_read_dma_alignment)) - pos;
    auto pr = make_lw_shared(promise<size_t>());
    enqueue({
        opcode::read,
        pos,
        len,
        [this, pr, pos, buffer, len, &pc] {
            return futurize_apply([this, pos, buffer, len, &pc] () mutable {
                return posix_file_impl::read_dma(pos, buffer, len, pc);
            }).then_wrapped([pr] (future<size_t> f) {
                f.forward_to(std::move(*pr));
            });
        }
    });
    return pr->get_future();
}

future<size_t>
append_challenged_posix_file_impl::read_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) {
    if (pos >= _logical_size) {
        // later() avoids tail recursion
        return later().then([] {
            return size_t(0);
        });
    }
    size_t len = 0;
    auto i = iov.begin();
    auto aligned_logical_size = align_up<uint64_t>(_logical_size, _disk_read_dma_alignment);
    while (i != iov.end() && pos + len + i->iov_len <= aligned_logical_size) {
        len += i++->iov_len;
    }
    if (i != iov.end()) {
        // Trim the first iovec that crosses the aligned logical size and drop
        // the rest, so the read ends exactly at aligned_logical_size.
        auto last_len = aligned_logical_size - (pos + len);
        if (last_len) {
            i++->iov_len = last_len;
        }
        iov.erase(i, iov.end());
    }
    auto pr = make_lw_shared(promise<size_t>());
    enqueue({
        opcode::read,
        pos,
        len,
        [this, pr, pos, iov = std::move(iov), &pc] () mutable {
            return futurize_apply([this, pos, iov = std::move(iov), &pc] () mutable {
                return posix_file_impl::read_dma(pos, std::move(iov), pc);
            }).then_wrapped([pr] (future<size_t> f) {
                f.forward_to(std::move(*pr));
            });
        }
    });
    return pr->get_future();
}

future<size_t>
append_challenged_posix_file_impl::write_dma(uint64_t pos, const void* buffer, size_t len, const io_priority_class& pc) {
    auto pr = make_lw_shared(promise<size_t>());
    enqueue({
        opcode::write,
        pos,
        len,
        [this, pr, pos, buffer, len, &pc] {
            return futurize_apply([this, pos, buffer, len, &pc] () mutable {
                return posix_file_impl::write_dma(pos, buffer, len, pc);
            }).then_wrapped([this, pos, pr] (future<size_t> f) {
                if (!f.failed()) {
                    auto ret = f.get0();
                    commit_size(pos + ret);
                    // Can't use forward_to(), because future::get0() invalidates the future.
                    pr->set_value(ret);
                } else {
                    f.forward_to(std::move(*pr));
                }
            });
        }
    });
    return pr->get_future();
}

future<size_t>
append_challenged_posix_file_impl::write_dma(uint64_t pos, std::vector<iovec> iov, const io_priority_class& pc) {
    auto pr = make_lw_shared(promise<size_t>());
    auto len = boost::accumulate(iov | boost::adaptors::transformed(std::mem_fn(&iovec::iov_len)), size_t(0));
    enqueue({
        opcode::write,
        pos,
        len,
        [this, pr, pos, iov = std::move(iov), &pc] () mutable {
            return futurize_apply([this, pos, iov = std::move(iov), &pc] () mutable {
                return posix_file_impl::write_dma(pos, std::move(iov), pc);
            }).then_wrapped([this, pos, pr] (future<size_t> f) {
                if (!f.failed()) {
                    auto ret = f.get0();
                    commit_size(pos + ret);
                    // Can't use forward_to(), because future::get0() invalidates the future.
                    pr->set_value(ret);
                } else {
                    f.forward_to(std::move(*pr));
                }
            });
        }
    });
    return pr->get_future();
}

future<>
append_challenged_posix_file_impl::flush() {
    if ((!_sloppy_size || _logical_size == _committed_size) && !_fsync_is_exclusive) {
        // FIXME: determine if flush can block concurrent reads or writes
        return posix_file_impl::flush();
    } else {
        auto pr = make_lw_shared(promise<>());
        enqueue({
            opcode::flush,
            0,
            0,
            [this, pr] () {
                return futurize_apply([this] {
                    if (_logical_size != _committed_size) {
                        // We're all alone, so can truncate in reactor thread
                        auto r = ::ftruncate(_fd, _logical_size);
                        throw_system_error_on(r == -1);
                        _committed_size = _logical_size;
                    }
                    return posix_file_impl::flush();
                }).then_wrapped([pr] (future<> f) {
                    f.forward_to(std::move(*pr));
                });
            }
        });
        return pr->get_future();
    }
}

future<struct stat>
append_challenged_posix_file_impl::stat() {
    // FIXME: can this conflict with anything?
    return posix_file_impl::stat().then([this] (struct stat stat) {
        stat.st_size = _logical_size;
        return stat;
    });
}

future<>
append_challenged_posix_file_impl::truncate(uint64_t length) {
    auto pr = make_lw_shared(promise<>());
    enqueue({
        opcode::truncate,
        length,
        0,
        [this, pr, length] () mutable {
            return futurize_apply([this, length] {
                return posix_file_impl::truncate(length);
            }).then_wrapped([this, pr, length] (future<> f) {
                if (!f.failed()) {
                    _committed_size = _logical_size = length;
                }
                f.forward_to(std::move(*pr));
            });
        }
    });
    return pr->get_future();
}

future<uint64_t>
append_challenged_posix_file_impl::size() {
    return make_ready_future<size_t>(_logical_size);
}

future<>
append_challenged_posix_file_impl::close() noexcept {
    // Caller should have drained all pending I/O
    _closing_state = state::draining;
    process_queue();
    return _completed.get_future().then([this] {
        if (_logical_size != _committed_size) {
            auto r = ::ftruncate(_fd, _logical_size);
            if (r != -1) {
                _committed_size = _logical_size;
            }
        }
        return posix_file_impl::close().finally([this] { _closing_state = state::closed; });
    });
}

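// Dropping a handle decrements the shared refcount; the last owner closes the fd.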
posix_file_handle_impl::~posix_file_handle_impl() {
    if (_refcount && _refcount->fetch_add(-1, std::memory_order_relaxed) == 1) {
        ::close(_fd);
        delete _refcount;
    }
}

std::unique_ptr<seastar::file_handle_impl>
posix_file_handle_impl::clone() const {
    auto ret = std::make_unique<posix_file_handle_impl>(_fd, _open_flags, _refcount, _io_queue);
    if (_refcount) {
        _refcount->fetch_add(1, std::memory_order_relaxed);
    }
    return ret;
}

shared_ptr<file_impl>
posix_file_handle_impl::to_file() && {
    auto ret = ::seastar::make_shared<posix_file_impl>(_fd, _open_flags, _refcount, _io_queue);
    _fd = -1;
    _refcount = nullptr;
    return ret;
}

// Some kernels can append to xfs filesystems, some cannot; determine
// from kernel version.
static
unsigned
xfs_concurrency_from_kernel_version() {
    // try to see if this is a mainline kernel with xfs append fixed (3.15+)
    // or a RHEL kernel with the backported fix (3.10.0-325.el7+)
    if (kernel_uname().whitelisted({"3.15", "3.10.0-325.el7"})) {
        // Can append, but not concurrently
        return 1;
    }
    // Cannot append at all; need ftruncate().
    return 0;
}

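// Picks the concrete file_impl for an open fd: block devices get
// blockdev_file_impl, read-only files and directories get a plain
// posix_file_impl, and writable regular files get
// append_challenged_posix_file_impl when the underlying filesystem needs it.
// The per-filesystem append behaviour is cached by device id.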
inline
shared_ptr<file_impl>
make_file_impl(int fd, file_open_options options) {
    struct stat st;
    auto r = ::fstat(fd, &st);
    throw_system_error_on(r == -1);

    r = ::ioctl(fd, BLKGETSIZE);
    io_queue& io_queue = engine().get_io_queue(st.st_dev);

    // FIXME: obtain these flags from somewhere else
    auto flags = ::fcntl(fd, F_GETFL);
    throw_system_error_on(flags == -1);

    if (r != -1) {
        return make_shared<blockdev_file_impl>(fd, open_flags(flags), options, &io_queue);
    } else {
        if ((flags & O_ACCMODE) == O_RDONLY) {
            return make_shared<posix_file_impl>(fd, open_flags(flags), options, &io_queue);
        }
        if (S_ISDIR(st.st_mode)) {
            return make_shared<posix_file_impl>(fd, open_flags(flags), options, &io_queue);
        }
        struct append_support {
            bool append_challenged;
            unsigned append_concurrency;
            bool fsync_is_exclusive;
        };
        static thread_local std::unordered_map<decltype(st.st_dev), append_support> s_fstype;
        if (!s_fstype.count(st.st_dev)) {
            struct statfs sfs;
            auto r = ::fstatfs(fd, &sfs);
            throw_system_error_on(r == -1);
            append_support as;
            switch (sfs.f_type) {
            case 0x58465342: /* XFS */
                as.append_challenged = true;
                static auto xc = xfs_concurrency_from_kernel_version();
                as.append_concurrency = xc;
                as.fsync_is_exclusive = true;
                break;
            case 0x6969: /* NFS */
                as.append_challenged = false;
                as.append_concurrency = 0;
                as.fsync_is_exclusive = false;
                break;
            case 0xEF53: /* EXT4 */
                as.append_challenged = true;
                as.append_concurrency = 0;
                as.fsync_is_exclusive = false;
                break;
            default:
                as.append_challenged = true;
                as.append_concurrency = 0;
                as.fsync_is_exclusive = true;
            }
            s_fstype[st.st_dev] = as;
        }
        auto as = s_fstype[st.st_dev];
        if (!as.append_challenged) {
            return make_shared<posix_file_impl>(fd, open_flags(flags), options, &io_queue);
        }
        return make_shared<append_challenged_posix_file_impl>(fd, open_flags(flags), options, as.append_concurrency, as.fsync_is_exclusive, &io_queue);
    }
}

file::file(int fd, file_open_options options)
        : _file_impl(make_file_impl(fd, options)) {
}

file::file(seastar::file_handle&& handle)
        : _file_impl(std::move(std::move(handle).to_file()._file_impl)) {
}

seastar::file_handle
file::dup() {
    return seastar::file_handle(_file_impl->dup());
}

file_impl* file_impl::get_file_impl(file& f) {
    return f._file_impl.get();
}

std::unique_ptr<seastar::file_handle_impl>
file_impl::dup() {
    throw std::runtime_error("this file type cannot be duplicated");
}

}