1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include <sys/types.h>
24 #include <boost/container/flat_map.hpp>
25 #include <boost/lockfree/queue.hpp>
27 #include "KernelDevice.h"
28 #include "include/buffer_raw.h"
29 #include "include/intarith.h"
30 #include "include/types.h"
31 #include "include/compat.h"
32 #include "include/stringify.h"
33 #include "include/str_map.h"
34 #include "common/blkdev.h"
35 #include "common/buffer_instrumentation.h"
36 #include "common/errno.h"
37 #if defined(__FreeBSD__)
38 #include "bsm/audit_errno.h"
40 #include "common/debug.h"
41 #include "common/numa.h"
43 #include "global/global_context.h"
46 #define dout_context cct
47 #define dout_subsys ceph_subsys_bdev
49 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
56 using ceph::bufferlist
;
57 using ceph::bufferptr
;
58 using ceph::make_timespan
;
59 using ceph::mono_clock
;
60 using ceph::operator <<;
62 KernelDevice::KernelDevice(CephContext
* cct
, aio_callback_t cb
, void *cbpriv
, aio_callback_t d_cb
, void *d_cbpriv
)
63 : BlockDevice(cct
, cb
, cbpriv
),
64 aio(false), dio(false),
65 discard_callback(d_cb
),
66 discard_callback_priv(d_cbpriv
),
68 discard_started(false),
74 fd_directs
.resize(WRITE_LIFE_MAX
, -1);
75 fd_buffereds
.resize(WRITE_LIFE_MAX
, -1);
77 bool use_ioring
= cct
->_conf
.get_val
<bool>("bdev_ioring");
78 unsigned int iodepth
= cct
->_conf
->bdev_aio_max_queue_depth
;
80 if (use_ioring
&& ioring_queue_t::supported()) {
81 bool use_ioring_hipri
= cct
->_conf
.get_val
<bool>("bdev_ioring_hipri");
82 bool use_ioring_sqthread_poll
= cct
->_conf
.get_val
<bool>("bdev_ioring_sqthread_poll");
83 io_queue
= std::make_unique
<ioring_queue_t
>(iodepth
, use_ioring_hipri
, use_ioring_sqthread_poll
);
86 if (use_ioring
&& !once
) {
87 derr
<< "WARNING: io_uring API is not supported! Fallback to libaio!"
91 io_queue
= std::make_unique
<aio_queue_t
>(iodepth
);
95 int KernelDevice::_lock()
97 // When the block changes, systemd-udevd will open the block,
98 // read some information and close it. Then a failure occurs here.
99 // So we need to try again here.
100 int fd
= fd_directs
[WRITE_LIFE_NOT_SET
];
101 dout(10) << __func__
<< " fd=" << fd
<< dendl
;
102 uint64_t nr_tries
= 0;
104 struct flock fl
= { .l_type
= F_WRLCK
,
105 .l_whence
= SEEK_SET
};
106 int r
= ::fcntl(fd
, F_OFD_SETLK
, &fl
);
108 if (errno
== EINVAL
) {
109 r
= ::flock(fd
, LOCK_EX
| LOCK_NB
);
115 if (errno
!= EAGAIN
) {
118 dout(1) << __func__
<< " flock busy on " << path
<< dendl
;
119 if (const uint64_t max_retry
=
120 cct
->_conf
.get_val
<uint64_t>("bdev_flock_retry");
121 max_retry
> 0 && nr_tries
++ == max_retry
) {
124 double retry_interval
=
125 cct
->_conf
.get_val
<double>("bdev_flock_retry_interval");
126 std::this_thread::sleep_for(ceph::make_timespan(retry_interval
));
130 int KernelDevice::open(const string
& p
)
134 dout(1) << __func__
<< " path " << path
<< dendl
;
138 r
= stat(path
.c_str(), &statbuf
);
140 derr
<< __func__
<< " stat got: " << cpp_strerror(r
) << dendl
;
143 is_block
= (statbuf
.st_mode
& S_IFMT
) == S_IFBLK
;
144 for (i
= 0; i
< WRITE_LIFE_MAX
; i
++) {
146 if (lock_exclusive
&& is_block
&& (i
== 0)) {
147 // If opening block device use O_EXCL flag. It gives us best protection,
148 // as no other process can overwrite the data for as long as we are running.
149 // For block devices ::flock is not enough,
150 // since 2 different inodes with same major/minor can be locked.
151 // Exclusion by O_EXCL works in containers too.
154 int fd
= ::open(path
.c_str(), O_RDWR
| O_DIRECT
| flags
);
161 fd
= ::open(path
.c_str(), O_RDWR
| O_CLOEXEC
);
166 fd_buffereds
[i
] = fd
;
169 if (i
!= WRITE_LIFE_MAX
) {
170 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
174 #if defined(F_SET_FILE_RW_HINT)
175 for (i
= WRITE_LIFE_NONE
; i
< WRITE_LIFE_MAX
; i
++) {
176 if (fcntl(fd_directs
[i
], F_SET_FILE_RW_HINT
, &i
) < 0) {
180 if (fcntl(fd_buffereds
[i
], F_SET_FILE_RW_HINT
, &i
) < 0) {
185 if (i
!= WRITE_LIFE_MAX
) {
187 dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path
<< " failed: " << cpp_strerror(r
) << dendl
;
192 aio
= cct
->_conf
->bdev_aio
;
194 ceph_abort_msg("non-aio not supported");
197 // disable readahead as it will wreak havoc on our mix of
198 // directio/aio and buffered io.
199 r
= posix_fadvise(fd_buffereds
[WRITE_LIFE_NOT_SET
], 0, 0, POSIX_FADV_RANDOM
);
202 derr
<< __func__
<< " posix_fadvise got: " << cpp_strerror(r
) << dendl
;
206 if (lock_exclusive
) {
207 // We need to keep soft locking (via flock()) because O_EXCL does not work for regular files.
208 // This is as good as we can get. Other processes can still overwrite the data,
209 // but at least we are protected from mounting same device twice in ceph processes.
210 // We also apply soft locking for block devices, as it populates /proc/locks. (see lslocks)
213 derr
<< __func__
<< " failed to lock " << path
<< ": " << cpp_strerror(r
)
220 r
= ::fstat(fd_directs
[WRITE_LIFE_NOT_SET
], &st
);
223 derr
<< __func__
<< " fstat got " << cpp_strerror(r
) << dendl
;
227 // Operate as though the block size is 4 KB. The backing file
228 // blksize doesn't strictly matter except that some file systems may
229 // require a read/modify/write if we write something smaller than
231 block_size
= cct
->_conf
->bdev_block_size
;
232 if (block_size
!= (unsigned)st
.st_blksize
) {
233 dout(1) << __func__
<< " backing device/file reports st_blksize "
234 << st
.st_blksize
<< ", using bdev_block_size "
235 << block_size
<< " anyway" << dendl
;
240 BlkDev
blkdev_direct(fd_directs
[WRITE_LIFE_NOT_SET
]);
241 BlkDev
blkdev_buffered(fd_buffereds
[WRITE_LIFE_NOT_SET
]);
243 if (S_ISBLK(st
.st_mode
)) {
245 r
= blkdev_direct
.get_size(&s
);
254 char partition
[PATH_MAX
], devname
[PATH_MAX
];
255 if ((r
= blkdev_buffered
.partition(partition
, PATH_MAX
)) ||
256 (r
= blkdev_buffered
.wholedisk(devname
, PATH_MAX
))) {
257 derr
<< "unable to get device name for " << path
<< ": "
258 << cpp_strerror(r
) << dendl
;
261 dout(20) << __func__
<< " devname " << devname
<< dendl
;
262 rotational
= blkdev_buffered
.is_rotational();
263 support_discard
= blkdev_buffered
.support_discard();
264 optimal_io_size
= blkdev_buffered
.get_optimal_io_size();
265 this->devname
= devname
;
266 // check if any extended block device plugin recognizes this device
267 // detect_vdo has moved into the VDO plugin
268 int rc
= extblkdev::detect_device(cct
, devname
, ebd_impl
);
270 dout(20) << __func__
<< " no plugin volume maps to " << devname
<< dendl
;
284 if (support_discard
&& cct
->_conf
->bdev_enable_discard
&& cct
->_conf
->bdev_async_discard
) {
288 // round size down to an even block
289 size
&= ~(block_size
- 1);
293 << " (0x" << std::hex
<< size
<< std::dec
<< ", "
294 << byte_u_t(size
) << ")"
295 << " block_size " << block_size
296 << " (" << byte_u_t(block_size
) << ")"
297 << " " << (rotational
? "rotational device," : "non-rotational device,")
298 << " discard " << (support_discard
? "supported" : "not supported")
303 for (i
= 0; i
< WRITE_LIFE_MAX
; i
++) {
304 if (fd_directs
[i
] >= 0) {
305 VOID_TEMP_FAILURE_RETRY(::close(fd_directs
[i
]));
310 if (fd_buffereds
[i
] >= 0) {
311 VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds
[i
]));
312 fd_buffereds
[i
] = -1;
320 int KernelDevice::get_devices(std::set
<std::string
> *ls
) const
322 if (devname
.empty()) {
325 get_raw_devices(devname
, ls
);
329 void KernelDevice::close()
331 dout(1) << __func__
<< dendl
;
333 if (discard_thread
.is_started()) {
338 extblkdev::release_device(ebd_impl
);
340 for (int i
= 0; i
< WRITE_LIFE_MAX
; i
++) {
341 assert(fd_directs
[i
] >= 0);
342 VOID_TEMP_FAILURE_RETRY(::close(fd_directs
[i
]));
345 assert(fd_buffereds
[i
] >= 0);
346 VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds
[i
]));
347 fd_buffereds
[i
] = -1;
352 int KernelDevice::collect_metadata(const string
& prefix
, map
<string
,string
> *pm
) const
354 (*pm
)[prefix
+ "support_discard"] = stringify((int)(bool)support_discard
);
355 (*pm
)[prefix
+ "rotational"] = stringify((int)(bool)rotational
);
356 (*pm
)[prefix
+ "size"] = stringify(get_size());
357 (*pm
)[prefix
+ "block_size"] = stringify(get_block_size());
358 (*pm
)[prefix
+ "optimal_io_size"] = stringify(get_optimal_io_size());
359 (*pm
)[prefix
+ "driver"] = "KernelDevice";
361 (*pm
)[prefix
+ "type"] = "hdd";
363 (*pm
)[prefix
+ "type"] = "ssd";
365 // if compression device detected, collect meta data for device
366 // VDO specific meta data has moved into VDO plugin
368 ebd_impl
->collect_metadata(prefix
, pm
);
373 std::set
<std::string
> devnames
;
374 if (get_devices(&devnames
) == 0) {
375 for (auto& dev
: devnames
) {
376 if (!res_names
.empty()) {
381 if (res_names
.size()) {
382 (*pm
)[prefix
+ "devices"] = res_names
;
388 int r
= ::fstat(fd_buffereds
[WRITE_LIFE_NOT_SET
], &st
);
391 if (S_ISBLK(st
.st_mode
)) {
392 (*pm
)[prefix
+ "access_mode"] = "blk";
394 char buffer
[1024] = {0};
395 BlkDev blkdev
{fd_buffereds
[WRITE_LIFE_NOT_SET
]};
396 if (r
= blkdev
.partition(buffer
, sizeof(buffer
)); r
) {
397 (*pm
)[prefix
+ "partition_path"] = "unknown";
399 (*pm
)[prefix
+ "partition_path"] = buffer
;
402 if (r
= blkdev
.partition(buffer
, sizeof(buffer
)); r
) {
403 (*pm
)[prefix
+ "dev_node"] = "unknown";
405 (*pm
)[prefix
+ "dev_node"] = buffer
;
411 blkdev
.model(buffer
, sizeof(buffer
));
412 (*pm
)[prefix
+ "model"] = buffer
;
415 blkdev
.dev(buffer
, sizeof(buffer
));
416 (*pm
)[prefix
+ "dev"] = buffer
;
418 // nvme exposes a serial number
420 blkdev
.serial(buffer
, sizeof(buffer
));
421 (*pm
)[prefix
+ "serial"] = buffer
;
425 r
= blkdev
.get_numa_node(&node
);
427 (*pm
)[prefix
+ "numa_node"] = stringify(node
);
430 (*pm
)[prefix
+ "access_mode"] = "file";
431 (*pm
)[prefix
+ "path"] = path
;
436 int KernelDevice::get_ebd_state(ExtBlkDevState
&state
) const
438 // use compression driver plugin to determine physical size and availability
439 // VDO specific get_thin_utilization has moved into VDO plugin
441 return ebd_impl
->get_state(state
);
446 int KernelDevice::choose_fd(bool buffered
, int write_hint
) const
448 #if defined(F_SET_FILE_RW_HINT)
450 write_hint
= WRITE_LIFE_NOT_SET
;
452 // Without WRITE_LIFE capabilities, only one file is used.
453 // And rocksdb sets this value also to > 0, so we need to catch this here
454 // instead of trusting rocksdb to set write_hint.
455 write_hint
= WRITE_LIFE_NOT_SET
;
457 return buffered
? fd_buffereds
[write_hint
] : fd_directs
[write_hint
];
460 int KernelDevice::flush()
462 // protect flush with a mutex. note that we are not really protecting
463 // data here. instead, we're ensuring that if any flush() caller
464 // sees that io_since_flush is true, they block any racing callers
465 // until the flush is observed. that allows racing threads to be
466 // calling flush while still ensuring that *any* of them that got an
467 // aio completion notification will not return before that aio is
468 // stable on disk: whichever thread sees the flag first will block
469 // followers until the aio is stable.
470 std::lock_guard
l(flush_mutex
);
473 if (!io_since_flush
.compare_exchange_strong(expect
, false)) {
474 dout(10) << __func__
<< " no-op (no ios since last flush), flag is "
475 << (int)io_since_flush
.load() << dendl
;
479 dout(10) << __func__
<< " start" << dendl
;
480 if (cct
->_conf
->bdev_inject_crash
) {
482 // sleep for a moment to give other threads a chance to submit or
483 // wait on io that races with a flush.
484 derr
<< __func__
<< " injecting crash. first we sleep..." << dendl
;
485 sleep(cct
->_conf
->bdev_inject_crash_flush_delay
);
486 derr
<< __func__
<< " and now we die" << dendl
;
490 utime_t start
= ceph_clock_now();
491 int r
= ::fdatasync(fd_directs
[WRITE_LIFE_NOT_SET
]);
492 utime_t end
= ceph_clock_now();
493 utime_t dur
= end
- start
;
496 derr
<< __func__
<< " fdatasync got: " << cpp_strerror(r
) << dendl
;
499 dout(5) << __func__
<< " in " << dur
<< dendl
;;
503 int KernelDevice::_aio_start()
506 dout(10) << __func__
<< dendl
;
507 int r
= io_queue
->init(fd_directs
);
510 derr
<< __func__
<< " io_setup(2) failed with EAGAIN; "
511 << "try increasing /proc/sys/fs/aio-max-nr" << dendl
;
513 derr
<< __func__
<< " io_setup(2) failed: " << cpp_strerror(r
) << dendl
;
517 aio_thread
.create("bstore_aio");
522 void KernelDevice::_aio_stop()
525 dout(10) << __func__
<< dendl
;
529 io_queue
->shutdown();
533 void KernelDevice::_discard_start()
535 discard_thread
.create("bstore_discard");
538 void KernelDevice::_discard_stop()
540 dout(10) << __func__
<< dendl
;
542 std::unique_lock
l(discard_lock
);
543 while (!discard_started
) {
544 discard_cond
.wait(l
);
547 discard_cond
.notify_all();
549 discard_thread
.join();
551 std::lock_guard
l(discard_lock
);
552 discard_stop
= false;
554 dout(10) << __func__
<< " stopped" << dendl
;
557 void KernelDevice::discard_drain()
559 dout(10) << __func__
<< dendl
;
560 std::unique_lock
l(discard_lock
);
561 while (!discard_queued
.empty() || discard_running
) {
562 discard_cond
.wait(l
);
// Whether a negative-errno aio return code is one of the I/O errors the
// kernel block layer can legitimately hand back (and which we may
// translate to EIO for the upper layer), as opposed to a logic error.
// The list mirrors the kernel's blk_errors[] table.
static bool is_expected_ioerr(const int r)
{
  // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
  return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
	  r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
	  r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
#if defined(__linux__)
	  r == -EREMCHG || r == -EBADE
#elif defined(__FreeBSD__)
	  r == -BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE
#endif
	  );
}
580 void KernelDevice::_aio_thread()
582 dout(10) << __func__
<< " start" << dendl
;
583 int inject_crash_count
= 0;
585 dout(40) << __func__
<< " polling" << dendl
;
586 int max
= cct
->_conf
->bdev_aio_reap_max
;
588 int r
= io_queue
->get_next_completed(cct
->_conf
->bdev_aio_poll_ms
,
591 derr
<< __func__
<< " got " << cpp_strerror(r
) << dendl
;
592 ceph_abort_msg("got unexpected error from io_getevents");
595 dout(30) << __func__
<< " got " << r
<< " completed aios" << dendl
;
596 for (int i
= 0; i
< r
; ++i
) {
597 IOContext
*ioc
= static_cast<IOContext
*>(aio
[i
]->priv
);
598 _aio_log_finish(ioc
, aio
[i
]->offset
, aio
[i
]->length
);
599 if (aio
[i
]->queue_item
.is_linked()) {
600 std::lock_guard
l(debug_queue_lock
);
601 debug_aio_unlink(*aio
[i
]);
604 // set flag indicating new ios have completed. we do this *before*
605 // any completion or notifications so that any user flush() that
606 // follows the observed io completion will include this io. Note
607 // that an earlier, racing flush() could observe and clear this
608 // flag, but that also ensures that the IO will be stable before the
609 // later flush() occurs.
610 io_since_flush
.store(true);
612 long r
= aio
[i
]->get_return_value();
614 derr
<< __func__
<< " got r=" << r
<< " (" << cpp_strerror(r
) << ")"
616 if (ioc
->allow_eio
&& is_expected_ioerr(r
)) {
617 derr
<< __func__
<< " translating the error to EIO for upper layer"
619 ioc
->set_return_value(-EIO
);
621 if (is_expected_ioerr(r
)) {
626 #if defined(HAVE_POSIXAIO)
627 aio
[i
]->aio
.aiocb
.aio_lio_opcode
,
629 aio
[i
]->iocb
.aio_lio_opcode
,
634 "Unexpected IO error. "
635 "This may suggest a hardware issue. "
636 "Please check your kernel log!");
639 "Unexpected IO error. "
640 "This may suggest HW issue. Please check your dmesg!");
642 } else if (aio
[i
]->length
!= (uint64_t)r
) {
643 derr
<< "aio to 0x" << std::hex
<< aio
[i
]->offset
644 << "~" << aio
[i
]->length
<< std::dec
645 << " but returned: " << r
<< dendl
;
646 ceph_abort_msg("unexpected aio return value: does not match length");
649 dout(10) << __func__
<< " finished aio " << aio
[i
] << " r " << r
651 << " with " << (ioc
->num_running
.load() - 1)
652 << " aios left" << dendl
;
654 // NOTE: once num_running and we either call the callback or
655 // call aio_wake we cannot touch ioc or aio[] as the caller
658 if (--ioc
->num_running
== 0) {
659 aio_callback(aio_callback_priv
, ioc
->priv
);
666 if (cct
->_conf
->bdev_debug_aio
) {
667 utime_t now
= ceph_clock_now();
668 std::lock_guard
l(debug_queue_lock
);
670 if (debug_stall_since
== utime_t()) {
671 debug_stall_since
= now
;
673 if (cct
->_conf
->bdev_debug_aio_suicide_timeout
) {
674 utime_t cutoff
= now
;
675 cutoff
-= cct
->_conf
->bdev_debug_aio_suicide_timeout
;
676 if (debug_stall_since
< cutoff
) {
677 derr
<< __func__
<< " stalled aio " << debug_oldest
678 << " since " << debug_stall_since
<< ", timeout is "
679 << cct
->_conf
->bdev_debug_aio_suicide_timeout
680 << "s, suicide" << dendl
;
681 ceph_abort_msg("stalled aio... buggy kernel or bad device?");
687 if (cct
->_conf
->bdev_inject_crash
) {
688 ++inject_crash_count
;
689 if (inject_crash_count
* cct
->_conf
->bdev_aio_poll_ms
/ 1000 >
690 cct
->_conf
->bdev_inject_crash
+ cct
->_conf
->bdev_inject_crash_flush_delay
) {
691 derr
<< __func__
<< " bdev_inject_crash trigger from aio thread"
698 dout(10) << __func__
<< " end" << dendl
;
701 void KernelDevice::_discard_thread()
703 std::unique_lock
l(discard_lock
);
704 ceph_assert(!discard_started
);
705 discard_started
= true;
706 discard_cond
.notify_all();
708 ceph_assert(discard_finishing
.empty());
709 if (discard_queued
.empty()) {
712 dout(20) << __func__
<< " sleep" << dendl
;
713 discard_cond
.notify_all(); // for the thread trying to drain...
714 discard_cond
.wait(l
);
715 dout(20) << __func__
<< " wake" << dendl
;
717 discard_finishing
.swap(discard_queued
);
718 discard_running
= true;
720 dout(20) << __func__
<< " finishing" << dendl
;
721 for (auto p
= discard_finishing
.begin();p
!= discard_finishing
.end(); ++p
) {
722 _discard(p
.get_start(), p
.get_len());
725 discard_callback(discard_callback_priv
, static_cast<void*>(&discard_finishing
));
726 discard_finishing
.clear();
728 discard_running
= false;
731 dout(10) << __func__
<< " finish" << dendl
;
732 discard_started
= false;
735 int KernelDevice::_queue_discard(interval_set
<uint64_t> &to_release
)
737 // if bdev_async_discard enabled on the fly, discard_thread is not started here, fallback to sync discard
738 if (!discard_thread
.is_started())
741 if (to_release
.empty())
744 std::lock_guard
l(discard_lock
);
745 discard_queued
.insert(to_release
);
746 discard_cond
.notify_all();
750 // return true only if _queue_discard succeeded, so caller won't have to do alloc->release
752 bool KernelDevice::try_discard(interval_set
<uint64_t> &to_release
, bool async
)
754 if (!support_discard
|| !cct
->_conf
->bdev_enable_discard
)
757 if (async
&& discard_thread
.is_started()) {
758 return 0 == _queue_discard(to_release
);
760 for (auto p
= to_release
.begin(); p
!= to_release
.end(); ++p
) {
761 _discard(p
.get_start(), p
.get_len());
767 void KernelDevice::_aio_log_start(
772 dout(20) << __func__
<< " 0x" << std::hex
<< offset
<< "~" << length
773 << std::dec
<< dendl
;
774 if (cct
->_conf
->bdev_debug_inflight_ios
) {
775 std::lock_guard
l(debug_lock
);
776 if (debug_inflight
.intersects(offset
, length
)) {
777 derr
<< __func__
<< " inflight overlap of 0x"
779 << offset
<< "~" << length
<< std::dec
780 << " with " << debug_inflight
<< dendl
;
783 debug_inflight
.insert(offset
, length
);
787 void KernelDevice::debug_aio_link(aio_t
& aio
)
789 if (debug_queue
.empty()) {
792 debug_queue
.push_back(aio
);
795 void KernelDevice::debug_aio_unlink(aio_t
& aio
)
797 if (aio
.queue_item
.is_linked()) {
798 debug_queue
.erase(debug_queue
.iterator_to(aio
));
799 if (debug_oldest
== &aio
) {
800 auto age
= cct
->_conf
->bdev_debug_aio_log_age
;
801 if (age
&& debug_stall_since
!= utime_t()) {
802 utime_t cutoff
= ceph_clock_now();
804 if (debug_stall_since
< cutoff
) {
805 derr
<< __func__
<< " stalled aio " << debug_oldest
806 << " since " << debug_stall_since
<< ", timeout is "
812 if (debug_queue
.empty()) {
813 debug_oldest
= nullptr;
815 debug_oldest
= &debug_queue
.front();
817 debug_stall_since
= utime_t();
822 void KernelDevice::_aio_log_finish(
827 dout(20) << __func__
<< " " << aio
<< " 0x"
828 << std::hex
<< offset
<< "~" << length
<< std::dec
<< dendl
;
829 if (cct
->_conf
->bdev_debug_inflight_ios
) {
830 std::lock_guard
l(debug_lock
);
831 debug_inflight
.erase(offset
, length
);
835 void KernelDevice::aio_submit(IOContext
*ioc
)
837 dout(20) << __func__
<< " ioc " << ioc
838 << " pending " << ioc
->num_pending
.load()
839 << " running " << ioc
->num_running
.load()
842 if (ioc
->num_pending
.load() == 0) {
846 // move these aside, and get our end iterator position now, as the
847 // aios might complete as soon as they are submitted and queue more
849 list
<aio_t
>::iterator e
= ioc
->running_aios
.begin();
850 ioc
->running_aios
.splice(e
, ioc
->pending_aios
);
852 int pending
= ioc
->num_pending
.load();
853 ioc
->num_running
+= pending
;
854 ioc
->num_pending
-= pending
;
855 ceph_assert(ioc
->num_pending
.load() == 0); // we should be only thread doing this
856 ceph_assert(ioc
->pending_aios
.size() == 0);
858 if (cct
->_conf
->bdev_debug_aio
) {
859 list
<aio_t
>::iterator p
= ioc
->running_aios
.begin();
861 dout(30) << __func__
<< " " << *p
<< dendl
;
862 std::lock_guard
l(debug_queue_lock
);
863 debug_aio_link(*p
++);
867 void *priv
= static_cast<void*>(ioc
);
869 // num of pending aios should not overflow when passed to submit_batch()
870 assert(pending
<= std::numeric_limits
<uint16_t>::max());
871 r
= io_queue
->submit_batch(ioc
->running_aios
.begin(), e
,
872 pending
, priv
, &retries
);
875 derr
<< __func__
<< " retries " << retries
<< dendl
;
877 derr
<< " aio submit got " << cpp_strerror(r
) << dendl
;
882 int KernelDevice::_sync_write(uint64_t off
, bufferlist
&bl
, bool buffered
, int write_hint
)
884 uint64_t len
= bl
.length();
885 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
886 << std::dec
<< " " << buffermode(buffered
) << dendl
;
887 if (cct
->_conf
->bdev_inject_crash
&&
888 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
889 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
890 << off
<< "~" << len
<< std::dec
<< dendl
;
895 bl
.prepare_iov(&iov
);
901 auto r
= ::pwritev(choose_fd(buffered
, write_hint
),
902 &iov
[idx
], iov
.size() - idx
, o
);
906 derr
<< __func__
<< " pwritev error: " << cpp_strerror(r
) << dendl
;
912 // skip fully processed IOVs
913 while (idx
< iov
.size() && (size_t)r
>= iov
[idx
].iov_len
) {
914 r
-= iov
[idx
++].iov_len
;
916 // update partially processed one if any
918 ceph_assert(idx
< iov
.size());
919 ceph_assert((size_t)r
< iov
[idx
].iov_len
);
920 iov
[idx
].iov_base
= static_cast<char*>(iov
[idx
].iov_base
) + r
;
921 iov
[idx
].iov_len
-= r
;
928 #ifdef HAVE_SYNC_FILE_RANGE
930 // initiate IO and wait till it completes
931 auto r
= ::sync_file_range(fd_buffereds
[WRITE_LIFE_NOT_SET
], off
, len
, SYNC_FILE_RANGE_WRITE
|SYNC_FILE_RANGE_WAIT_AFTER
|SYNC_FILE_RANGE_WAIT_BEFORE
);
934 derr
<< __func__
<< " sync_file_range error: " << cpp_strerror(r
) << dendl
;
940 io_since_flush
.store(true);
945 int KernelDevice::write(
951 uint64_t len
= bl
.length();
952 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
953 << " " << buffermode(buffered
)
955 ceph_assert(is_valid_io(off
, len
));
956 if (cct
->_conf
->objectstore_blackhole
) {
957 lderr(cct
) << __func__
<< " objectstore_blackhole=true, throwing out IO"
962 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
963 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
, IOV_MAX
)) {
964 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
966 dout(40) << "data:\n";
970 return _sync_write(off
, bl
, buffered
, write_hint
);
973 int KernelDevice::aio_write(
980 uint64_t len
= bl
.length();
981 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
982 << " " << buffermode(buffered
)
984 ceph_assert(is_valid_io(off
, len
));
985 if (cct
->_conf
->objectstore_blackhole
) {
986 lderr(cct
) << __func__
<< " objectstore_blackhole=true, throwing out IO"
991 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
992 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
, IOV_MAX
)) {
993 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
995 dout(40) << "data:\n";
999 _aio_log_start(ioc
, off
, len
);
1002 if (aio
&& dio
&& !buffered
) {
1003 if (cct
->_conf
->bdev_inject_crash
&&
1004 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
1005 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
1006 << off
<< "~" << len
<< std::dec
1008 // generate a real io so that aio_wait behaves properly, but make it
1009 // a read instead of write, and toss the result.
1010 ioc
->pending_aios
.push_back(aio_t(ioc
, choose_fd(false, write_hint
)));
1012 auto& aio
= ioc
->pending_aios
.back();
1014 ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len
)));
1015 aio
.bl
.prepare_iov(&aio
.iov
);
1016 aio
.preadv(off
, len
);
1019 if (bl
.length() <= RW_IO_MAX
) {
1020 // fast path (non-huge write)
1021 ioc
->pending_aios
.push_back(aio_t(ioc
, choose_fd(false, write_hint
)));
1023 auto& aio
= ioc
->pending_aios
.back();
1024 bl
.prepare_iov(&aio
.iov
);
1025 aio
.bl
.claim_append(bl
);
1026 aio
.pwritev(off
, len
);
1027 dout(30) << aio
<< dendl
;
1028 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
1029 << std::dec
<< " aio " << &aio
<< dendl
;
1031 // write in RW_IO_MAX-sized chunks
1032 uint64_t prev_len
= 0;
1033 while (prev_len
< bl
.length()) {
1035 if (prev_len
+ RW_IO_MAX
< bl
.length()) {
1036 tmp
.substr_of(bl
, prev_len
, RW_IO_MAX
);
1038 tmp
.substr_of(bl
, prev_len
, bl
.length() - prev_len
);
1040 auto len
= tmp
.length();
1041 ioc
->pending_aios
.push_back(aio_t(ioc
, choose_fd(false, write_hint
)));
1043 auto& aio
= ioc
->pending_aios
.back();
1044 tmp
.prepare_iov(&aio
.iov
);
1045 aio
.bl
.claim_append(tmp
);
1046 aio
.pwritev(off
+ prev_len
, len
);
1047 dout(30) << aio
<< dendl
;
1048 dout(5) << __func__
<< " 0x" << std::hex
<< off
+ prev_len
1050 << std::dec
<< " aio " << &aio
<< " (piece)" << dendl
;
1058 int r
= _sync_write(off
, bl
, buffered
, write_hint
);
1059 _aio_log_finish(ioc
, off
, len
);
1066 int KernelDevice::_discard(uint64_t offset
, uint64_t len
)
1069 if (cct
->_conf
->objectstore_blackhole
) {
1070 lderr(cct
) << __func__
<< " objectstore_blackhole=true, throwing out IO"
1074 dout(10) << __func__
1075 << " 0x" << std::hex
<< offset
<< "~" << len
<< std::dec
1077 r
= BlkDev
{fd_directs
[WRITE_LIFE_NOT_SET
]}.discard((int64_t)offset
, (int64_t)len
);
1081 struct ExplicitHugePagePool
{
1082 using region_queue_t
= boost::lockfree::queue
<void*>;
1083 using instrumented_raw
= ceph::buffer_instrumentation::instrumented_raw
<
1084 BlockDevice::hugepaged_raw_marker_t
>;
1086 struct mmaped_buffer_raw
: public instrumented_raw
{
1087 region_queue_t
& region_q
; // for recycling
1089 mmaped_buffer_raw(void* mmaped_region
, ExplicitHugePagePool
& parent
)
1090 : instrumented_raw(static_cast<char*>(mmaped_region
), parent
.buffer_size
),
1091 region_q(parent
.region_q
) {
1092 // the `mmaped_region` has been passed to `raw` as the buffer's `data`
1094 ~mmaped_buffer_raw() override
{
1095 // don't delete nor unmmap; recycle the region instead
1096 region_q
.push(data
);
1100 ExplicitHugePagePool(const size_t buffer_size
, size_t buffers_in_pool
)
1101 : buffer_size(buffer_size
), region_q(buffers_in_pool
) {
1102 while (buffers_in_pool
--) {
1103 void* const mmaped_region
= ::mmap(
1106 PROT_READ
| PROT_WRITE
,
1107 #if defined(__FreeBSD__)
1108 // FreeBSD doesn't have MAP_HUGETLB nor MAP_POPULATE but it has
1109 // a different, more automated / implicit mechanisms. However,
1110 // we want to mimic the Linux behavior as closely as possible
1111 // also in the matter of error handling which is the reason
1112 // behind MAP_ALIGNED_SUPER.
1113 // See: https://lists.freebsd.org/pipermail/freebsd-questions/2014-August/260578.html
1114 MAP_PRIVATE
| MAP_ANONYMOUS
| MAP_PREFAULT_READ
| MAP_ALIGNED_SUPER
,
1116 MAP_PRIVATE
| MAP_ANONYMOUS
| MAP_POPULATE
| MAP_HUGETLB
,
1117 #endif // __FreeBSD__
1120 if (mmaped_region
== MAP_FAILED
) {
1121 ceph_abort("can't allocate huge buffer;"
1122 " /proc/sys/vm/nr_hugepages misconfigured?");
1124 region_q
.push(mmaped_region
);
1128 ~ExplicitHugePagePool() {
1129 void* mmaped_region
;
1130 while (region_q
.pop(mmaped_region
)) {
1131 ::munmap(mmaped_region
, buffer_size
);
1135 ceph::unique_leakable_ptr
<buffer::raw
> try_create() {
1136 if (void* mmaped_region
; region_q
.pop(mmaped_region
)) {
1137 return ceph::unique_leakable_ptr
<buffer::raw
> {
1138 new mmaped_buffer_raw(mmaped_region
, *this)
1141 // oops, empty queue.
1146 size_t get_buffer_size() const {
1151 const size_t buffer_size
;
1152 region_queue_t region_q
;
1155 struct HugePagePoolOfPools
{
1156 HugePagePoolOfPools(const std::map
<size_t, size_t> conf
)
1157 : pools(conf
.size(), [conf
] (size_t index
, auto emplacer
) {
1158 ceph_assert(index
< conf
.size());
1159 // it could be replaced with a state-mutating lambda and
1160 // `conf::erase()` but performance is not a concern here.
1161 const auto [buffer_size
, buffers_in_pool
] =
1162 *std::next(std::begin(conf
), index
);
1163 emplacer
.emplace(buffer_size
, buffers_in_pool
);
1167 ceph::unique_leakable_ptr
<buffer::raw
> try_create(const size_t size
) {
1168 // thankfully to `conf` being a `std::map` we store the pools
1169 // sorted by buffer sizes. this would allow to clamp to log(n)
1170 // but I doubt admins want to have dozens of accelerated buffer
1171 // size. let's keep this simple for now.
1172 if (auto iter
= std::find_if(std::begin(pools
), std::end(pools
),
1173 [size
] (const auto& pool
) {
1174 return size
== pool
.get_buffer_size();
1176 iter
!= std::end(pools
)) {
1177 return iter
->try_create();
1182 static HugePagePoolOfPools
from_desc(const std::string
& conf
);
1185 // let's have some space inside (for 2 MB and 4 MB perhaps?)
1186 // NOTE: we need tiny_vector as the boost::lockfree queue inside
1187 // pool is not-movable.
1188 ceph::containers::tiny_vector
<ExplicitHugePagePool
, 2> pools
;
1192 HugePagePoolOfPools
HugePagePoolOfPools::from_desc(const std::string
& desc
) {
1193 std::map
<size_t, size_t> conf
; // buffer_size -> buffers_in_pool
1194 std::map
<std::string
, std::string
> exploded_str_conf
;
1195 get_str_map(desc
, &exploded_str_conf
);
1196 for (const auto& [buffer_size_s
, buffers_in_pool_s
] : exploded_str_conf
) {
1197 size_t buffer_size
, buffers_in_pool
;
1198 if (sscanf(buffer_size_s
.c_str(), "%zu", &buffer_size
) != 1) {
1199 ceph_abort("can't parse a key in the configuration");
1201 if (sscanf(buffers_in_pool_s
.c_str(), "%zu", &buffers_in_pool
) != 1) {
1202 ceph_abort("can't parse a value in the configuration");
1204 conf
[buffer_size
] = buffers_in_pool
;
1206 return HugePagePoolOfPools
{std::move(conf
)};
// create a buffer basing on user-configurable. it's intended to make
// our buffers THP-able.
// Returns an owning raw-buffer pointer of at least `len` bytes; may also
// set FLAG_DONT_CACHE on *ioc when the buffer comes from the huge-page pool.
ceph::unique_leakable_ptr<buffer::raw> KernelDevice::create_custom_aligned(
  const size_t len,
  IOContext* const ioc) const
{
  // just to preserve the logic of create_small_page_aligned().
  if (len < CEPH_PAGE_SIZE) {
    return ceph::buffer::create_small_page_aligned(len);
  } else {
    // Function-local static: the pool set is built once, on first large
    // read, from the configurable's value at that moment (C++11 guarantees
    // thread-safe initialization).
    static HugePagePoolOfPools hp_pools = HugePagePoolOfPools::from_desc(
      cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers"));
    if (auto lucky_raw = hp_pools.try_create(len); lucky_raw) {
      dout(20) << __func__ << " allocated from huge pool"
               << " lucky_raw.data=" << (void*)lucky_raw->get_data()
               << " bdev_read_preallocated_huge_buffers="
               << cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
               << dendl;
      // Mark the IO context so this buffer is not cached downstream —
      // the flag name suggests pooled huge-page buffers must be returned,
      // not retained (NOTE(review): confirm against IOContext consumers).
      ioc->flags |= IOContext::FLAG_DONT_CACHE;
      return lucky_raw;
    } else {
      // fallthrough due to empty buffer pool. this can happen also
      // when the configurable was explicitly set to 0.
      dout(20) << __func__ << " cannot allocate from huge pool"
               << dendl;
    }
  }
  // Fallback path: plain aligned allocation with the configured alignment.
  const size_t custom_alignment = cct->_conf->bdev_read_buffer_alignment;
  dout(20) << __func__ << " with the custom alignment;"
           << " len=" << len
           << " custom_alignment=" << custom_alignment
           << dendl;
  return ceph::buffer::create_aligned(len, custom_alignment);
}
// Synchronous read of `len` bytes at byte offset `off` into *pbl.
// Chooses the buffered or O_DIRECT fd via choose_fd(buffered, ...).
// Returns 0 on success, negative errno on failure. Logs a "stalled read"
// warning when the pread takes longer than bdev_debug_aio_log_age seconds.
int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
                       IOContext *ioc,
                       bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << " " << buffermode(buffered)
          << dendl;
  ceph_assert(is_valid_io(off, len));

  _aio_log_start(ioc, off, len);

  auto start1 = mono_clock::now();

  // Allocate an aligned (possibly huge-page-pooled) destination buffer.
  auto p = ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc));
  int r = ::pread(choose_fd(buffered, WRITE_LIFE_NOT_SET),
                  p->c_str(), len, off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
         << " 0x" << std::hex << off << "~" << len << std::dec
         << " " << buffermode(buffered)
         << " since " << start1 << ", timeout is "
         << age
         << "s" << dendl;
  }
  if (r < 0) {
    // Translate errno; expected media errors become EIO when the caller
    // opted in via allow_eio.
    if (ioc->allow_eio && is_expected_ioerr(-errno)) {
      r = -EIO;
    } else {
      r = -errno;
    }
    derr << __func__ << " 0x" << std::hex << off << "~" << std::left
         << std::dec << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  // Short reads are not tolerated on block devices.
  ceph_assert((uint64_t)r == len);
  pbl->push_back(std::move(p));

  dout(40) << "data:\n";
  pbl->hexdump(*_dout);
  *_dout << dendl;

 out:
  _aio_log_finish(ioc, off, len);
  return r < 0 ? r : 0;
}
// Queue an asynchronous read of `len` bytes at `off` into *pbl.
// When libaio is available and the device was opened with aio+dio, the
// read is appended to ioc->pending_aios (submitted later by the caller);
// otherwise it falls back to the synchronous read() path.
// Returns 0 on the aio path, or read()'s result on the fallback path.
int KernelDevice::aio_read(
  uint64_t off,
  uint64_t len,
  bufferlist *pbl,
  IOContext *ioc)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << dendl;

  int r = 0;
#ifdef HAVE_LIBAIO
  if (aio && dio) {
    ceph_assert(is_valid_io(off, len));
    _aio_log_start(ioc, off, len);
    // The aio always targets the O_DIRECT fd; the buffer therefore must
    // be aligned, which create_custom_aligned() guarantees.
    ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    aio.bl.push_back(
      ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc)));
    aio.bl.prepare_iov(&aio.iov);
    aio.preadv(off, len);
    dout(30) << aio << dendl;
    // *pbl shares the aio's buffers; data appears there once the aio
    // completes.
    pbl->append(aio.bl);
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
            << std::dec << " aio " << &aio << dendl;
  } else
#endif
  {
    // Fallback: synchronous, non-buffered read.
    r = read(off, len, pbl, ioc, false);
  }

  return r;
}
// O_DIRECT read of a range that is NOT block-aligned: reads the enclosing
// block-aligned span into a bounce buffer, then copies the requested
// `len` bytes starting at `off` into `buf`.
// Returns 0 on success, negative errno on failure.
int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
{
  // Expand [off, off+len) to the enclosing block-aligned range.
  uint64_t aligned_off = p2align(off, block_size);
  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
  bufferptr p = ceph::buffer::create_small_page_aligned(aligned_len);
  int r = 0;

  auto start1 = mono_clock::now();
  r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
         << " 0x" << std::hex << off << "~" << len << std::dec
         << " since " << start1 << ", timeout is "
         << age
         << "s" << dendl;
  }

  if (r < 0) {
    r = -errno;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
      << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  // Short reads are not tolerated; the whole aligned span must arrive.
  ceph_assert((uint64_t)r == aligned_len);
  // Copy just the caller's sub-range out of the bounce buffer.
  memcpy(buf, p.c_str() + (off - aligned_off), len);

  dout(40) << __func__ << " data:\n";
  {
    bufferlist bl;
    bl.append(buf, len);
    bl.hexdump(*_dout);
  }
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}
// Read `len` bytes at `off` directly into the caller-supplied `buf`.
// Three paths:
//  - direct io with any misalignment (offset, length, or buffer address)
//    is delegated to direct_read_unaligned();
//  - buffered: loops on pread() until all bytes arrive (short reads are
//    legal on the page-cache path);
//  - direct+aligned: a single pread() which must return exactly `len`.
// Returns 0 on success, negative errno on failure.
int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
                              bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << "buffered " << buffered
          << dendl;
  ceph_assert(len > 0);
  ceph_assert(off < size);
  ceph_assert(off + len <= size);
  int r = 0;
  auto age = cct->_conf->bdev_debug_aio_log_age;

  //if it's direct io and unaligned, we have to use a internal buffer
  if (!buffered && ((off % block_size != 0)
                    || (len % block_size != 0)
                    || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
    return direct_read_unaligned(off, len, buf);

  auto start1 = mono_clock::now();
  if (buffered) {
    //buffered read
    // off0 preserved only for the stall log; off advances as we consume.
    auto off0 = off;
    char *t = buf;
    uint64_t left = len;
    while (left > 0) {
      r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
      if (r < 0) {
        r = -errno;
        derr << __func__ << " 0x" << std::hex << off << "~" << left
          << std::dec << " error: " << cpp_strerror(r) << dendl;
        goto out;
      }
      off += r;
      t += r;
      left -= r;
    }
    if (mono_clock::now() - start1 >= make_timespan(age)) {
      derr << __func__ << " stalled read "
           << " 0x" << std::hex << off0 << "~" << len << std::dec
           << " (buffered) since " << start1 << ", timeout is "
           << age
           << "s" << dendl;
    }
  } else {
    //direct and aligned read
    r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
    if (mono_clock::now() - start1 >= make_timespan(age)) {
      derr << __func__ << " stalled read "
           << " 0x" << std::hex << off << "~" << len << std::dec
           << " (direct) since " << start1 << ", timeout is "
           << age
           << "s" << dendl;
    }
    if (r < 0) {
      r = -errno;
      derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
           << off << "~" << std::left << std::dec << " error: " << cpp_strerror(r)
           << dendl;
      goto out;
    }
    // Direct path must deliver everything in one call.
    ceph_assert((uint64_t)r == len);
  }

  dout(40) << __func__ << " data:\n";
  {
    bufferlist bl;
    bl.append(buf, len);
    bl.hexdump(*_dout);
  }
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}
1436 int KernelDevice::invalidate_cache(uint64_t off
, uint64_t len
)
1438 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1440 ceph_assert(off
% block_size
== 0);
1441 ceph_assert(len
% block_size
== 0);
1442 int r
= posix_fadvise(fd_buffereds
[WRITE_LIFE_NOT_SET
], off
, len
, POSIX_FADV_DONTNEED
);
1445 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
1446 << " error: " << cpp_strerror(r
) << dendl
;