1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include <sys/types.h>
23 #include "KernelDevice.h"
24 #include "include/intarith.h"
25 #include "include/types.h"
26 #include "include/compat.h"
27 #include "include/stringify.h"
28 #include "common/blkdev.h"
29 #include "common/errno.h"
30 #if defined(__FreeBSD__)
31 #include "bsm/audit_errno.h"
33 #include "common/debug.h"
34 #include "common/numa.h"
36 #include "global/global_context.h"
39 #define dout_context cct
40 #define dout_subsys ceph_subsys_bdev
42 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
49 using ceph::bufferlist
;
50 using ceph::bufferptr
;
51 using ceph::make_timespan
;
52 using ceph::mono_clock
;
53 using ceph::operator <<;
55 KernelDevice::KernelDevice(CephContext
* cct
, aio_callback_t cb
, void *cbpriv
, aio_callback_t d_cb
, void *d_cbpriv
)
56 : BlockDevice(cct
, cb
, cbpriv
),
57 aio(false), dio(false),
58 discard_callback(d_cb
),
59 discard_callback_priv(d_cbpriv
),
61 discard_started(false),
67 fd_directs
.resize(WRITE_LIFE_MAX
, -1);
68 fd_buffereds
.resize(WRITE_LIFE_MAX
, -1);
70 bool use_ioring
= cct
->_conf
.get_val
<bool>("bdev_ioring");
71 unsigned int iodepth
= cct
->_conf
->bdev_aio_max_queue_depth
;
73 if (use_ioring
&& ioring_queue_t::supported()) {
74 bool use_ioring_hipri
= cct
->_conf
.get_val
<bool>("bdev_ioring_hipri");
75 bool use_ioring_sqthread_poll
= cct
->_conf
.get_val
<bool>("bdev_ioring_sqthread_poll");
76 io_queue
= std::make_unique
<ioring_queue_t
>(iodepth
, use_ioring_hipri
, use_ioring_sqthread_poll
);
79 if (use_ioring
&& !once
) {
80 derr
<< "WARNING: io_uring API is not supported! Fallback to libaio!"
84 io_queue
= std::make_unique
<aio_queue_t
>(iodepth
);
88 int KernelDevice::_lock()
90 dout(10) << __func__
<< " " << fd_directs
[WRITE_LIFE_NOT_SET
] << dendl
;
91 // When the block changes, systemd-udevd will open the block,
92 // read some information and close it. Then a failure occurs here.
93 // So we need to try again here.
94 int fd
= fd_directs
[WRITE_LIFE_NOT_SET
];
95 uint64_t nr_tries
= 0;
97 struct flock fl
= { F_WRLCK
,
99 int r
= ::fcntl(fd
, F_OFD_SETLK
, &fl
);
101 if (errno
== EINVAL
) {
102 r
= ::flock(fd
, LOCK_EX
| LOCK_NB
);
108 if (errno
!= EAGAIN
) {
111 dout(1) << __func__
<< " flock busy on " << path
<< dendl
;
112 if (const uint64_t max_retry
=
113 cct
->_conf
.get_val
<uint64_t>("bdev_flock_retry");
114 max_retry
> 0 && nr_tries
++ == max_retry
) {
117 double retry_interval
=
118 cct
->_conf
.get_val
<double>("bdev_flock_retry_interval");
119 std::this_thread::sleep_for(ceph::make_timespan(retry_interval
));
123 int KernelDevice::open(const string
& p
)
127 dout(1) << __func__
<< " path " << path
<< dendl
;
129 for (i
= 0; i
< WRITE_LIFE_MAX
; i
++) {
130 int fd
= ::open(path
.c_str(), O_RDWR
| O_DIRECT
);
137 fd
= ::open(path
.c_str(), O_RDWR
| O_CLOEXEC
);
142 fd_buffereds
[i
] = fd
;
145 if (i
!= WRITE_LIFE_MAX
) {
146 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
150 #if defined(F_SET_FILE_RW_HINT)
151 for (i
= WRITE_LIFE_NONE
; i
< WRITE_LIFE_MAX
; i
++) {
152 if (fcntl(fd_directs
[i
], F_SET_FILE_RW_HINT
, &i
) < 0) {
156 if (fcntl(fd_buffereds
[i
], F_SET_FILE_RW_HINT
, &i
) < 0) {
161 if (i
!= WRITE_LIFE_MAX
) {
163 dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path
<< " failed: " << cpp_strerror(r
) << dendl
;
168 aio
= cct
->_conf
->bdev_aio
;
170 ceph_abort_msg("non-aio not supported");
173 // disable readahead as it will wreak havoc on our mix of
174 // directio/aio and buffered io.
175 r
= posix_fadvise(fd_buffereds
[WRITE_LIFE_NOT_SET
], 0, 0, POSIX_FADV_RANDOM
);
178 derr
<< __func__
<< " posix_fadvise got: " << cpp_strerror(r
) << dendl
;
182 if (lock_exclusive
) {
185 derr
<< __func__
<< " failed to lock " << path
<< ": " << cpp_strerror(r
)
192 r
= ::fstat(fd_directs
[WRITE_LIFE_NOT_SET
], &st
);
195 derr
<< __func__
<< " fstat got " << cpp_strerror(r
) << dendl
;
199 // Operate as though the block size is 4 KB. The backing file
200 // blksize doesn't strictly matter except that some file systems may
201 // require a read/modify/write if we write something smaller than
203 block_size
= cct
->_conf
->bdev_block_size
;
204 if (block_size
!= (unsigned)st
.st_blksize
) {
205 dout(1) << __func__
<< " backing device/file reports st_blksize "
206 << st
.st_blksize
<< ", using bdev_block_size "
207 << block_size
<< " anyway" << dendl
;
212 BlkDev
blkdev_direct(fd_directs
[WRITE_LIFE_NOT_SET
]);
213 BlkDev
blkdev_buffered(fd_buffereds
[WRITE_LIFE_NOT_SET
]);
215 if (S_ISBLK(st
.st_mode
)) {
217 r
= blkdev_direct
.get_size(&s
);
226 char partition
[PATH_MAX
], devname
[PATH_MAX
];
227 if ((r
= blkdev_buffered
.partition(partition
, PATH_MAX
)) ||
228 (r
= blkdev_buffered
.wholedisk(devname
, PATH_MAX
))) {
229 derr
<< "unable to get device name for " << path
<< ": "
230 << cpp_strerror(r
) << dendl
;
233 dout(20) << __func__
<< " devname " << devname
<< dendl
;
234 rotational
= blkdev_buffered
.is_rotational();
235 support_discard
= blkdev_buffered
.support_discard();
236 this->devname
= devname
;
247 // round size down to an even block
248 size
&= ~(block_size
- 1);
252 << " (0x" << std::hex
<< size
<< std::dec
<< ", "
253 << byte_u_t(size
) << ")"
254 << " block_size " << block_size
255 << " (" << byte_u_t(block_size
) << ")"
256 << " " << (rotational
? "rotational" : "non-rotational")
257 << " discard " << (support_discard
? "supported" : "not supported")
262 for (i
= 0; i
< WRITE_LIFE_MAX
; i
++) {
263 if (fd_directs
[i
] >= 0) {
264 VOID_TEMP_FAILURE_RETRY(::close(fd_directs
[i
]));
269 if (fd_buffereds
[i
] >= 0) {
270 VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds
[i
]));
271 fd_buffereds
[i
] = -1;
279 int KernelDevice::get_devices(std::set
<std::string
> *ls
) const
281 if (devname
.empty()) {
284 get_raw_devices(devname
, ls
);
288 void KernelDevice::close()
290 dout(1) << __func__
<< dendl
;
295 VOID_TEMP_FAILURE_RETRY(::close(vdo_fd
));
299 for (int i
= 0; i
< WRITE_LIFE_MAX
; i
++) {
300 assert(fd_directs
[i
] >= 0);
301 VOID_TEMP_FAILURE_RETRY(::close(fd_directs
[i
]));
304 assert(fd_buffereds
[i
] >= 0);
305 VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds
[i
]));
306 fd_buffereds
[i
] = -1;
311 int KernelDevice::collect_metadata(const string
& prefix
, map
<string
,string
> *pm
) const
313 (*pm
)[prefix
+ "support_discard"] = stringify((int)(bool)support_discard
);
314 (*pm
)[prefix
+ "rotational"] = stringify((int)(bool)rotational
);
315 (*pm
)[prefix
+ "size"] = stringify(get_size());
316 (*pm
)[prefix
+ "block_size"] = stringify(get_block_size());
317 (*pm
)[prefix
+ "driver"] = "KernelDevice";
319 (*pm
)[prefix
+ "type"] = "hdd";
321 (*pm
)[prefix
+ "type"] = "ssd";
324 (*pm
)[prefix
+ "vdo"] = "true";
325 uint64_t total
, avail
;
326 get_vdo_utilization(vdo_fd
, &total
, &avail
);
327 (*pm
)[prefix
+ "vdo_physical_size"] = stringify(total
);
332 std::set
<std::string
> devnames
;
333 if (get_devices(&devnames
) == 0) {
334 for (auto& dev
: devnames
) {
335 if (!res_names
.empty()) {
340 if (res_names
.size()) {
341 (*pm
)[prefix
+ "devices"] = res_names
;
347 int r
= ::fstat(fd_buffereds
[WRITE_LIFE_NOT_SET
], &st
);
350 if (S_ISBLK(st
.st_mode
)) {
351 (*pm
)[prefix
+ "access_mode"] = "blk";
353 char buffer
[1024] = {0};
354 BlkDev blkdev
{fd_buffereds
[WRITE_LIFE_NOT_SET
]};
355 if (r
= blkdev
.partition(buffer
, sizeof(buffer
)); r
) {
356 (*pm
)[prefix
+ "partition_path"] = "unknown";
358 (*pm
)[prefix
+ "partition_path"] = buffer
;
361 if (r
= blkdev
.partition(buffer
, sizeof(buffer
)); r
) {
362 (*pm
)[prefix
+ "dev_node"] = "unknown";
364 (*pm
)[prefix
+ "dev_node"] = buffer
;
370 blkdev
.model(buffer
, sizeof(buffer
));
371 (*pm
)[prefix
+ "model"] = buffer
;
374 blkdev
.dev(buffer
, sizeof(buffer
));
375 (*pm
)[prefix
+ "dev"] = buffer
;
377 // nvme exposes a serial number
379 blkdev
.serial(buffer
, sizeof(buffer
));
380 (*pm
)[prefix
+ "serial"] = buffer
;
384 r
= blkdev
.get_numa_node(&node
);
386 (*pm
)[prefix
+ "numa_node"] = stringify(node
);
389 (*pm
)[prefix
+ "access_mode"] = "file";
390 (*pm
)[prefix
+ "path"] = path
;
395 void KernelDevice::_detect_vdo()
397 vdo_fd
= get_vdo_stats_handle(devname
.c_str(), &vdo_name
);
399 dout(1) << __func__
<< " VDO volume " << vdo_name
400 << " maps to " << devname
<< dendl
;
402 dout(20) << __func__
<< " no VDO volume maps to " << devname
<< dendl
;
407 bool KernelDevice::get_thin_utilization(uint64_t *total
, uint64_t *avail
) const
412 return get_vdo_utilization(vdo_fd
, total
, avail
);
415 int KernelDevice::choose_fd(bool buffered
, int write_hint
) const
417 assert(write_hint
>= WRITE_LIFE_NOT_SET
&& write_hint
< WRITE_LIFE_MAX
);
419 write_hint
= WRITE_LIFE_NOT_SET
;
420 return buffered
? fd_buffereds
[write_hint
] : fd_directs
[write_hint
];
423 int KernelDevice::flush()
425 // protect flush with a mutex. note that we are not really protecting
426 // data here. instead, we're ensuring that if any flush() caller
427 // sees that io_since_flush is true, they block any racing callers
428 // until the flush is observed. that allows racing threads to be
429 // calling flush while still ensuring that *any* of them that got an
430 // aio completion notification will not return before that aio is
431 // stable on disk: whichever thread sees the flag first will block
432 // followers until the aio is stable.
433 std::lock_guard
l(flush_mutex
);
436 if (!io_since_flush
.compare_exchange_strong(expect
, false)) {
437 dout(10) << __func__
<< " no-op (no ios since last flush), flag is "
438 << (int)io_since_flush
.load() << dendl
;
442 dout(10) << __func__
<< " start" << dendl
;
443 if (cct
->_conf
->bdev_inject_crash
) {
445 // sleep for a moment to give other threads a chance to submit or
446 // wait on io that races with a flush.
447 derr
<< __func__
<< " injecting crash. first we sleep..." << dendl
;
448 sleep(cct
->_conf
->bdev_inject_crash_flush_delay
);
449 derr
<< __func__
<< " and now we die" << dendl
;
453 utime_t start
= ceph_clock_now();
454 int r
= ::fdatasync(fd_directs
[WRITE_LIFE_NOT_SET
]);
455 utime_t end
= ceph_clock_now();
456 utime_t dur
= end
- start
;
459 derr
<< __func__
<< " fdatasync got: " << cpp_strerror(r
) << dendl
;
462 dout(5) << __func__
<< " in " << dur
<< dendl
;;
466 int KernelDevice::_aio_start()
469 dout(10) << __func__
<< dendl
;
470 int r
= io_queue
->init(fd_directs
);
473 derr
<< __func__
<< " io_setup(2) failed with EAGAIN; "
474 << "try increasing /proc/sys/fs/aio-max-nr" << dendl
;
476 derr
<< __func__
<< " io_setup(2) failed: " << cpp_strerror(r
) << dendl
;
480 aio_thread
.create("bstore_aio");
485 void KernelDevice::_aio_stop()
488 dout(10) << __func__
<< dendl
;
492 io_queue
->shutdown();
496 int KernelDevice::_discard_start()
498 discard_thread
.create("bstore_discard");
502 void KernelDevice::_discard_stop()
504 dout(10) << __func__
<< dendl
;
506 std::unique_lock
l(discard_lock
);
507 while (!discard_started
) {
508 discard_cond
.wait(l
);
511 discard_cond
.notify_all();
513 discard_thread
.join();
515 std::lock_guard
l(discard_lock
);
516 discard_stop
= false;
518 dout(10) << __func__
<< " stopped" << dendl
;
521 void KernelDevice::discard_drain()
523 dout(10) << __func__
<< dendl
;
524 std::unique_lock
l(discard_lock
);
525 while (!discard_queued
.empty() || discard_running
) {
526 discard_cond
.wait(l
);
// Return true when `r` is one of the negative errno values this file treats
// as an "expected" I/O error, false for every other value (including 0 and
// positive numbers).
//
// The list mirrors the errno table of the Linux block layer, see
// https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
//
// @param r  return code of an I/O operation, as a negative errno
// @return   true if r matches one of the listed error codes
static bool is_expected_ioerr(const int r)
{
  // error codes checked on every platform
  const bool generic_match =
    r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
    r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
    r == -ENODATA || r == -EILSEQ || r == -ENOMEM;

  // EREMCHG / EBADE are spelled differently per platform; where neither
  // spelling is available there is simply nothing extra to match, so the
  // expression never ends in a dangling `||`.
#if defined(__linux__)
  const bool platform_match = (r == -EREMCHG || r == -EBADE);
#elif defined(__FreeBSD__)
  const bool platform_match =
    (r == - BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE);
#else
  const bool platform_match = false;
#endif

  return generic_match || platform_match;
}
544 void KernelDevice::_aio_thread()
546 dout(10) << __func__
<< " start" << dendl
;
547 int inject_crash_count
= 0;
549 dout(40) << __func__
<< " polling" << dendl
;
550 int max
= cct
->_conf
->bdev_aio_reap_max
;
552 int r
= io_queue
->get_next_completed(cct
->_conf
->bdev_aio_poll_ms
,
555 derr
<< __func__
<< " got " << cpp_strerror(r
) << dendl
;
556 ceph_abort_msg("got unexpected error from io_getevents");
559 dout(30) << __func__
<< " got " << r
<< " completed aios" << dendl
;
560 for (int i
= 0; i
< r
; ++i
) {
561 IOContext
*ioc
= static_cast<IOContext
*>(aio
[i
]->priv
);
562 _aio_log_finish(ioc
, aio
[i
]->offset
, aio
[i
]->length
);
563 if (aio
[i
]->queue_item
.is_linked()) {
564 std::lock_guard
l(debug_queue_lock
);
565 debug_aio_unlink(*aio
[i
]);
568 // set flag indicating new ios have completed. we do this *before*
569 // any completion or notifications so that any user flush() that
570 // follows the observed io completion will include this io. Note
571 // that an earlier, racing flush() could observe and clear this
572 // flag, but that also ensures that the IO will be stable before the
573 // later flush() occurs.
574 io_since_flush
.store(true);
576 long r
= aio
[i
]->get_return_value();
578 derr
<< __func__
<< " got r=" << r
<< " (" << cpp_strerror(r
) << ")"
580 if (ioc
->allow_eio
&& is_expected_ioerr(r
)) {
581 derr
<< __func__
<< " translating the error to EIO for upper layer"
583 ioc
->set_return_value(-EIO
);
585 if (is_expected_ioerr(r
)) {
590 #if defined(HAVE_POSIXAIO)
591 aio
[i
]->aio
.aiocb
.aio_lio_opcode
,
593 aio
[i
]->iocb
.aio_lio_opcode
,
598 "Unexpected IO error. "
599 "This may suggest a hardware issue. "
600 "Please check your kernel log!");
603 "Unexpected IO error. "
604 "This may suggest HW issue. Please check your dmesg!");
606 } else if (aio
[i
]->length
!= (uint64_t)r
) {
607 derr
<< "aio to 0x" << std::hex
<< aio
[i
]->offset
608 << "~" << aio
[i
]->length
<< std::dec
609 << " but returned: " << r
<< dendl
;
610 ceph_abort_msg("unexpected aio return value: does not match length");
613 dout(10) << __func__
<< " finished aio " << aio
[i
] << " r " << r
615 << " with " << (ioc
->num_running
.load() - 1)
616 << " aios left" << dendl
;
618 // NOTE: once num_running and we either call the callback or
619 // call aio_wake we cannot touch ioc or aio[] as the caller
622 if (--ioc
->num_running
== 0) {
623 aio_callback(aio_callback_priv
, ioc
->priv
);
630 if (cct
->_conf
->bdev_debug_aio
) {
631 utime_t now
= ceph_clock_now();
632 std::lock_guard
l(debug_queue_lock
);
634 if (debug_stall_since
== utime_t()) {
635 debug_stall_since
= now
;
637 if (cct
->_conf
->bdev_debug_aio_suicide_timeout
) {
638 utime_t cutoff
= now
;
639 cutoff
-= cct
->_conf
->bdev_debug_aio_suicide_timeout
;
640 if (debug_stall_since
< cutoff
) {
641 derr
<< __func__
<< " stalled aio " << debug_oldest
642 << " since " << debug_stall_since
<< ", timeout is "
643 << cct
->_conf
->bdev_debug_aio_suicide_timeout
644 << "s, suicide" << dendl
;
645 ceph_abort_msg("stalled aio... buggy kernel or bad device?");
652 if (cct
->_conf
->bdev_inject_crash
) {
653 ++inject_crash_count
;
654 if (inject_crash_count
* cct
->_conf
->bdev_aio_poll_ms
/ 1000 >
655 cct
->_conf
->bdev_inject_crash
+ cct
->_conf
->bdev_inject_crash_flush_delay
) {
656 derr
<< __func__
<< " bdev_inject_crash trigger from aio thread"
664 dout(10) << __func__
<< " end" << dendl
;
667 void KernelDevice::_discard_thread()
669 std::unique_lock
l(discard_lock
);
670 ceph_assert(!discard_started
);
671 discard_started
= true;
672 discard_cond
.notify_all();
674 ceph_assert(discard_finishing
.empty());
675 if (discard_queued
.empty()) {
678 dout(20) << __func__
<< " sleep" << dendl
;
679 discard_cond
.notify_all(); // for the thread trying to drain...
680 discard_cond
.wait(l
);
681 dout(20) << __func__
<< " wake" << dendl
;
683 discard_finishing
.swap(discard_queued
);
684 discard_running
= true;
686 dout(20) << __func__
<< " finishing" << dendl
;
687 for (auto p
= discard_finishing
.begin();p
!= discard_finishing
.end(); ++p
) {
688 discard(p
.get_start(), p
.get_len());
691 discard_callback(discard_callback_priv
, static_cast<void*>(&discard_finishing
));
692 discard_finishing
.clear();
694 discard_running
= false;
697 dout(10) << __func__
<< " finish" << dendl
;
698 discard_started
= false;
701 int KernelDevice::queue_discard(interval_set
<uint64_t> &to_release
)
703 if (!support_discard
)
706 if (to_release
.empty())
709 std::lock_guard
l(discard_lock
);
710 discard_queued
.insert(to_release
);
711 discard_cond
.notify_all();
715 void KernelDevice::_aio_log_start(
720 dout(20) << __func__
<< " 0x" << std::hex
<< offset
<< "~" << length
721 << std::dec
<< dendl
;
722 if (cct
->_conf
->bdev_debug_inflight_ios
) {
723 std::lock_guard
l(debug_lock
);
724 if (debug_inflight
.intersects(offset
, length
)) {
725 derr
<< __func__
<< " inflight overlap of 0x"
727 << offset
<< "~" << length
<< std::dec
728 << " with " << debug_inflight
<< dendl
;
731 debug_inflight
.insert(offset
, length
);
735 void KernelDevice::debug_aio_link(aio_t
& aio
)
737 if (debug_queue
.empty()) {
740 debug_queue
.push_back(aio
);
743 void KernelDevice::debug_aio_unlink(aio_t
& aio
)
745 if (aio
.queue_item
.is_linked()) {
746 debug_queue
.erase(debug_queue
.iterator_to(aio
));
747 if (debug_oldest
== &aio
) {
748 auto age
= cct
->_conf
->bdev_debug_aio_log_age
;
749 if (age
&& debug_stall_since
!= utime_t()) {
750 utime_t cutoff
= ceph_clock_now();
752 if (debug_stall_since
< cutoff
) {
753 derr
<< __func__
<< " stalled aio " << debug_oldest
754 << " since " << debug_stall_since
<< ", timeout is "
760 if (debug_queue
.empty()) {
761 debug_oldest
= nullptr;
763 debug_oldest
= &debug_queue
.front();
765 debug_stall_since
= utime_t();
770 void KernelDevice::_aio_log_finish(
775 dout(20) << __func__
<< " " << aio
<< " 0x"
776 << std::hex
<< offset
<< "~" << length
<< std::dec
<< dendl
;
777 if (cct
->_conf
->bdev_debug_inflight_ios
) {
778 std::lock_guard
l(debug_lock
);
779 debug_inflight
.erase(offset
, length
);
783 void KernelDevice::aio_submit(IOContext
*ioc
)
785 dout(20) << __func__
<< " ioc " << ioc
786 << " pending " << ioc
->num_pending
.load()
787 << " running " << ioc
->num_running
.load()
790 if (ioc
->num_pending
.load() == 0) {
794 // move these aside, and get our end iterator position now, as the
795 // aios might complete as soon as they are submitted and queue more
797 list
<aio_t
>::iterator e
= ioc
->running_aios
.begin();
798 ioc
->running_aios
.splice(e
, ioc
->pending_aios
);
800 int pending
= ioc
->num_pending
.load();
801 ioc
->num_running
+= pending
;
802 ioc
->num_pending
-= pending
;
803 ceph_assert(ioc
->num_pending
.load() == 0); // we should be only thread doing this
804 ceph_assert(ioc
->pending_aios
.size() == 0);
806 if (cct
->_conf
->bdev_debug_aio
) {
807 list
<aio_t
>::iterator p
= ioc
->running_aios
.begin();
809 dout(30) << __func__
<< " " << *p
<< dendl
;
810 std::lock_guard
l(debug_queue_lock
);
811 debug_aio_link(*p
++);
815 void *priv
= static_cast<void*>(ioc
);
817 // num of pending aios should not overflow when passed to submit_batch()
818 assert(pending
<= std::numeric_limits
<uint16_t>::max());
819 r
= io_queue
->submit_batch(ioc
->running_aios
.begin(), e
,
820 pending
, priv
, &retries
);
823 derr
<< __func__
<< " retries " << retries
<< dendl
;
825 derr
<< " aio submit got " << cpp_strerror(r
) << dendl
;
830 int KernelDevice::_sync_write(uint64_t off
, bufferlist
&bl
, bool buffered
, int write_hint
)
832 uint64_t len
= bl
.length();
833 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
834 << std::dec
<< (buffered
? " (buffered)" : " (direct)") << dendl
;
835 if (cct
->_conf
->bdev_inject_crash
&&
836 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
837 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
838 << off
<< "~" << len
<< std::dec
<< dendl
;
843 bl
.prepare_iov(&iov
);
849 auto r
= ::pwritev(choose_fd(buffered
, write_hint
),
850 &iov
[idx
], iov
.size() - idx
, o
);
854 derr
<< __func__
<< " pwritev error: " << cpp_strerror(r
) << dendl
;
860 // skip fully processed IOVs
861 while (idx
< iov
.size() && (size_t)r
>= iov
[idx
].iov_len
) {
862 r
-= iov
[idx
++].iov_len
;
864 // update partially processed one if any
866 ceph_assert(idx
< iov
.size());
867 ceph_assert((size_t)r
< iov
[idx
].iov_len
);
868 iov
[idx
].iov_base
= static_cast<char*>(iov
[idx
].iov_base
) + r
;
869 iov
[idx
].iov_len
-= r
;
876 #ifdef HAVE_SYNC_FILE_RANGE
878 // initiate IO and wait till it completes
879 auto r
= ::sync_file_range(fd_buffereds
[WRITE_LIFE_NOT_SET
], off
, len
, SYNC_FILE_RANGE_WRITE
|SYNC_FILE_RANGE_WAIT_AFTER
|SYNC_FILE_RANGE_WAIT_BEFORE
);
882 derr
<< __func__
<< " sync_file_range error: " << cpp_strerror(r
) << dendl
;
888 io_since_flush
.store(true);
893 int KernelDevice::write(
899 uint64_t len
= bl
.length();
900 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
901 << (buffered
? " (buffered)" : " (direct)")
903 ceph_assert(is_valid_io(off
, len
));
904 if (cct
->_conf
->objectstore_blackhole
) {
905 lderr(cct
) << __func__
<< " objectstore_blackhole=true, throwing out IO"
910 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
911 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
, IOV_MAX
)) {
912 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
914 dout(40) << "data: ";
918 return _sync_write(off
, bl
, buffered
, write_hint
);
921 int KernelDevice::aio_write(
928 uint64_t len
= bl
.length();
929 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
930 << (buffered
? " (buffered)" : " (direct)")
932 ceph_assert(is_valid_io(off
, len
));
933 if (cct
->_conf
->objectstore_blackhole
) {
934 lderr(cct
) << __func__
<< " objectstore_blackhole=true, throwing out IO"
939 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
940 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
, IOV_MAX
)) {
941 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
943 dout(40) << "data: ";
947 _aio_log_start(ioc
, off
, len
);
950 if (aio
&& dio
&& !buffered
) {
951 if (cct
->_conf
->bdev_inject_crash
&&
952 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
953 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
954 << off
<< "~" << len
<< std::dec
956 // generate a real io so that aio_wait behaves properly, but make it
957 // a read instead of write, and toss the result.
958 ioc
->pending_aios
.push_back(aio_t(ioc
, choose_fd(false, write_hint
)));
960 auto& aio
= ioc
->pending_aios
.back();
961 bufferptr p
= ceph::buffer::create_small_page_aligned(len
);
962 aio
.bl
.append(std::move(p
));
963 aio
.bl
.prepare_iov(&aio
.iov
);
964 aio
.preadv(off
, len
);
967 if (bl
.length() <= RW_IO_MAX
) {
968 // fast path (non-huge write)
969 ioc
->pending_aios
.push_back(aio_t(ioc
, choose_fd(false, write_hint
)));
971 auto& aio
= ioc
->pending_aios
.back();
972 bl
.prepare_iov(&aio
.iov
);
973 aio
.bl
.claim_append(bl
);
974 aio
.pwritev(off
, len
);
975 dout(30) << aio
<< dendl
;
976 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
977 << std::dec
<< " aio " << &aio
<< dendl
;
979 // write in RW_IO_MAX-sized chunks
980 uint64_t prev_len
= 0;
981 while (prev_len
< bl
.length()) {
983 if (prev_len
+ RW_IO_MAX
< bl
.length()) {
984 tmp
.substr_of(bl
, prev_len
, RW_IO_MAX
);
986 tmp
.substr_of(bl
, prev_len
, bl
.length() - prev_len
);
988 auto len
= tmp
.length();
989 ioc
->pending_aios
.push_back(aio_t(ioc
, choose_fd(false, write_hint
)));
991 auto& aio
= ioc
->pending_aios
.back();
992 tmp
.prepare_iov(&aio
.iov
);
993 aio
.bl
.claim_append(tmp
);
994 aio
.pwritev(off
+ prev_len
, len
);
995 dout(30) << aio
<< dendl
;
996 dout(5) << __func__
<< " 0x" << std::hex
<< off
+ prev_len
998 << std::dec
<< " aio " << &aio
<< " (piece)" << dendl
;
1006 int r
= _sync_write(off
, bl
, buffered
, write_hint
);
1007 _aio_log_finish(ioc
, off
, len
);
1014 int KernelDevice::discard(uint64_t offset
, uint64_t len
)
1017 if (cct
->_conf
->objectstore_blackhole
) {
1018 lderr(cct
) << __func__
<< " objectstore_blackhole=true, throwing out IO"
1022 if (support_discard
) {
1023 dout(10) << __func__
1024 << " 0x" << std::hex
<< offset
<< "~" << len
<< std::dec
1027 r
= BlkDev
{fd_directs
[WRITE_LIFE_NOT_SET
]}.discard((int64_t)offset
, (int64_t)len
);
1032 int KernelDevice::read(uint64_t off
, uint64_t len
, bufferlist
*pbl
,
1036 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1037 << (buffered
? " (buffered)" : " (direct)")
1039 ceph_assert(is_valid_io(off
, len
));
1041 _aio_log_start(ioc
, off
, len
);
1043 auto start1
= mono_clock::now();
1045 auto p
= ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len
));
1046 int r
= ::pread(buffered
? fd_buffereds
[WRITE_LIFE_NOT_SET
] : fd_directs
[WRITE_LIFE_NOT_SET
],
1047 p
->c_str(), len
, off
);
1048 auto age
= cct
->_conf
->bdev_debug_aio_log_age
;
1049 if (mono_clock::now() - start1
>= make_timespan(age
)) {
1050 derr
<< __func__
<< " stalled read "
1051 << " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1052 << (buffered
? " (buffered)" : " (direct)")
1053 << " since " << start1
<< ", timeout is "
1059 if (ioc
->allow_eio
&& is_expected_ioerr(r
)) {
1066 ceph_assert((uint64_t)r
== len
);
1067 pbl
->push_back(std::move(p
));
1069 dout(40) << "data: ";
1070 pbl
->hexdump(*_dout
);
1074 _aio_log_finish(ioc
, off
, len
);
1075 return r
< 0 ? r
: 0;
1078 int KernelDevice::aio_read(
1084 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1090 ceph_assert(is_valid_io(off
, len
));
1091 _aio_log_start(ioc
, off
, len
);
1092 ioc
->pending_aios
.push_back(aio_t(ioc
, fd_directs
[WRITE_LIFE_NOT_SET
]));
1094 aio_t
& aio
= ioc
->pending_aios
.back();
1095 bufferptr p
= ceph::buffer::create_small_page_aligned(len
);
1096 aio
.bl
.append(std::move(p
));
1097 aio
.bl
.prepare_iov(&aio
.iov
);
1098 aio
.preadv(off
, len
);
1099 dout(30) << aio
<< dendl
;
1100 pbl
->append(aio
.bl
);
1101 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
1102 << std::dec
<< " aio " << &aio
<< dendl
;
1106 r
= read(off
, len
, pbl
, ioc
, false);
1112 int KernelDevice::direct_read_unaligned(uint64_t off
, uint64_t len
, char *buf
)
1114 uint64_t aligned_off
= p2align(off
, block_size
);
1115 uint64_t aligned_len
= p2roundup(off
+len
, block_size
) - aligned_off
;
1116 bufferptr p
= ceph::buffer::create_small_page_aligned(aligned_len
);
1119 auto start1
= mono_clock::now();
1120 r
= ::pread(fd_directs
[WRITE_LIFE_NOT_SET
], p
.c_str(), aligned_len
, aligned_off
);
1121 auto age
= cct
->_conf
->bdev_debug_aio_log_age
;
1122 if (mono_clock::now() - start1
>= make_timespan(age
)) {
1123 derr
<< __func__
<< " stalled read "
1124 << " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1125 << " since " << start1
<< ", timeout is "
1132 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1133 << " error: " << cpp_strerror(r
) << dendl
;
1136 ceph_assert((uint64_t)r
== aligned_len
);
1137 memcpy(buf
, p
.c_str() + (off
- aligned_off
), len
);
1139 dout(40) << __func__
<< " data: ";
1141 bl
.append(buf
, len
);
1146 return r
< 0 ? r
: 0;
1149 int KernelDevice::read_random(uint64_t off
, uint64_t len
, char *buf
,
1152 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1153 << "buffered " << buffered
1155 ceph_assert(len
> 0);
1156 ceph_assert(off
< size
);
1157 ceph_assert(off
+ len
<= size
);
1159 auto age
= cct
->_conf
->bdev_debug_aio_log_age
;
1161 //if it's direct io and unaligned, we have to use a internal buffer
1162 if (!buffered
&& ((off
% block_size
!= 0)
1163 || (len
% block_size
!= 0)
1164 || (uintptr_t(buf
) % CEPH_PAGE_SIZE
!= 0)))
1165 return direct_read_unaligned(off
, len
, buf
);
1167 auto start1
= mono_clock::now();
1172 uint64_t left
= len
;
1174 r
= ::pread(fd_buffereds
[WRITE_LIFE_NOT_SET
], t
, left
, off
);
1177 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << left
1178 << std::dec
<< " error: " << cpp_strerror(r
) << dendl
;
1185 if (mono_clock::now() - start1
>= make_timespan(age
)) {
1186 derr
<< __func__
<< " stalled read "
1187 << " 0x" << std::hex
<< off0
<< "~" << len
<< std::dec
1188 << " (buffered) since " << start1
<< ", timeout is "
1193 //direct and aligned read
1194 r
= ::pread(fd_directs
[WRITE_LIFE_NOT_SET
], buf
, len
, off
);
1195 if (mono_clock::now() - start1
>= make_timespan(age
)) {
1196 derr
<< __func__
<< " stalled read "
1197 << " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1198 << " (direct) since " << start1
<< ", timeout is "
1204 derr
<< __func__
<< " direct_aligned_read" << " 0x" << std::hex
1205 << off
<< "~" << std::left
<< std::dec
<< " error: " << cpp_strerror(r
)
1209 ceph_assert((uint64_t)r
== len
);
1212 dout(40) << __func__
<< " data: ";
1214 bl
.append(buf
, len
);
1219 return r
< 0 ? r
: 0;
1222 int KernelDevice::invalidate_cache(uint64_t off
, uint64_t len
)
1224 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1226 ceph_assert(off
% block_size
== 0);
1227 ceph_assert(len
% block_size
== 0);
1228 int r
= posix_fadvise(fd_buffereds
[WRITE_LIFE_NOT_SET
], off
, len
, POSIX_FADV_DONTNEED
);
1231 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1232 << " error: " << cpp_strerror(r
) << dendl
;