// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include <sys/types.h>

#include <boost/container/flat_map.hpp>
#include <boost/lockfree/queue.hpp>

#include "KernelDevice.h"
#include "include/buffer_raw.h"
#include "include/intarith.h"
#include "include/types.h"
#include "include/compat.h"
#include "include/stringify.h"
#include "include/str_map.h"
#include "common/blkdev.h"
#include "common/buffer_instrumentation.h"
#include "common/errno.h"
#if defined(__FreeBSD__)
#include "bsm/audit_errno.h"
#endif
#include "common/debug.h"
#include "common/numa.h"

#include "global/global_context.h"
#define dout_context cct
#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
using ceph::bufferlist;
using ceph::bufferptr;
using ceph::make_timespan;
using ceph::mono_clock;
using ceph::operator <<;
KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv,
                           aio_callback_t d_cb, void *d_cbpriv)
  : BlockDevice(cct, cb, cbpriv),
    aio(false), dio(false),
    discard_callback(d_cb),
    discard_callback_priv(d_cbpriv),
    discard_started(false)
{
  fd_directs.resize(WRITE_LIFE_MAX, -1);
  fd_buffereds.resize(WRITE_LIFE_MAX, -1);

  bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring");
  unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;

  if (use_ioring && ioring_queue_t::supported()) {
    bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri");
    bool use_ioring_sqthread_poll = cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll");
    io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri,
                                                use_ioring_sqthread_poll);
  } else {
    static bool once;
    if (use_ioring && !once) {
      derr << "WARNING: io_uring API is not supported! Fallback to libaio!"
           << dendl;
      once = true;
    }
    io_queue = std::make_unique<aio_queue_t>(iodepth);
  }
}
int KernelDevice::_lock()
{
  // When the block changes, systemd-udevd will open the block,
  // read some information and close it. Then a failure occurs here.
  // So we need to try again here.
  int fd = fd_directs[WRITE_LIFE_NOT_SET];
  dout(10) << __func__ << " fd=" << fd << dendl;
  uint64_t nr_tries = 0;
  for (;;) {
    struct flock fl = { .l_type = F_WRLCK,
                        .l_whence = SEEK_SET };
    int r = ::fcntl(fd, F_OFD_SETLK, &fl);
    if (r < 0) {
      if (errno == EINVAL) {
        r = ::flock(fd, LOCK_EX | LOCK_NB);
      }
    }
    if (r == 0) {
      return 0;
    }
    if (errno != EAGAIN) {
      return -errno;
    }
    dout(1) << __func__ << " flock busy on " << path << dendl;
    if (const uint64_t max_retry =
        cct->_conf.get_val<uint64_t>("bdev_flock_retry");
        max_retry > 0 && nr_tries++ == max_retry) {
      return -EAGAIN;
    }
    double retry_interval =
      cct->_conf.get_val<double>("bdev_flock_retry_interval");
    std::this_thread::sleep_for(ceph::make_timespan(retry_interval));
  }
}
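// Worked example of the retry loop above (illustrative only; the exact
// defaults vary by release): with bdev_flock_retry=3 and
// bdev_flock_retry_interval=0.1, a lock held briefly by a racing
// systemd-udevd open is retried up to 3 times, so _lock() waits at most
// about 3 * 0.1s = 0.3s before giving up and returning -EAGAIN.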
int KernelDevice::open(const string& p)
{
  path = p;
  int r = 0, i = 0;
  dout(1) << __func__ << " path " << path << dendl;

  for (i = 0; i < WRITE_LIFE_MAX; i++) {
    int fd = ::open(path.c_str(), O_RDWR | O_DIRECT);
    if (fd < 0) {
      r = -errno;
      break;
    }
    fd_directs[i] = fd;

    fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
    if (fd < 0) {
      r = -errno;
      break;
    }
    fd_buffereds[i] = fd;
  }
  if (i != WRITE_LIFE_MAX) {
    derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
    goto out_fail;
  }

#if defined(F_SET_FILE_RW_HINT)
  for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) {
    if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) {
      r = -errno;
      break;
    }
    if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) {
      r = -errno;
      break;
    }
  }
  if (i != WRITE_LIFE_MAX) {
    enable_wrt = false;
    dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path
            << " failed: " << cpp_strerror(r) << dendl;
  }
#endif

  dio = true;
  aio = cct->_conf->bdev_aio;
  if (!aio) {
    ceph_abort_msg("non-aio not supported");
  }

  // disable readahead as it will wreak havoc on our mix of
  // directio/aio and buffered io.
  r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM);
  if (r) {
    r = -r;
    derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl;
    goto out_fail;
  }

  if (lock_exclusive) {
    r = _lock();
    if (r < 0) {
      derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
           << dendl;
      goto out_fail;
    }
  }

  struct stat st;
  r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st);
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
    goto out_fail;
  }

  // Operate as though the block size is 4 KB.  The backing file
  // blksize doesn't strictly matter except that some file systems may
  // require a read/modify/write if we write something smaller than
  // it.
  block_size = cct->_conf->bdev_block_size;
  if (block_size != (unsigned)st.st_blksize) {
    dout(1) << __func__ << " backing device/file reports st_blksize "
            << st.st_blksize << ", using bdev_block_size "
            << block_size << " anyway" << dendl;
  }

  {
    BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]);
    BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]);

    if (S_ISBLK(st.st_mode)) {
      int64_t s;
      r = blkdev_direct.get_size(&s);
      if (r < 0) {
        goto out_fail;
      }
      size = s;
    } else {
      size = st.st_size;
    }

    char partition[PATH_MAX], devname[PATH_MAX];
    if ((r = blkdev_buffered.partition(partition, PATH_MAX)) ||
        (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) {
      derr << "unable to get device name for " << path << ": "
           << cpp_strerror(r) << dendl;
      rotational = true;
    } else {
      dout(20) << __func__ << " devname " << devname << dendl;
      rotational = blkdev_buffered.is_rotational();
      support_discard = blkdev_buffered.support_discard();
      optimal_io_size = blkdev_buffered.get_optimal_io_size();
      this->devname = devname;
      _detect_vdo();
    }
  }

  // round size down to an even block
  size &= ~(block_size - 1);

  dout(1) << __func__ << " size " << size
          << " (0x" << std::hex << size << std::dec << ", "
          << byte_u_t(size) << ")"
          << " block_size " << block_size
          << " (" << byte_u_t(block_size) << ")"
          << " " << (rotational ? "rotational" : "non-rotational")
          << " discard " << (support_discard ? "supported" : "not supported")
          << dendl;
  return 0;

out_fail:
  for (i = 0; i < WRITE_LIFE_MAX; i++) {
    if (fd_directs[i] >= 0) {
      VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
      fd_directs[i] = -1;
    } else {
      break;
    }
    if (fd_buffereds[i] >= 0) {
      VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
      fd_buffereds[i] = -1;
    } else {
      break;
    }
  }
  return r;
}
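// Sketch of the fd layout set up by open() (descriptive comment, not in
// the original source): two parallel fd arrays are kept, indexed by the
// kernel write-life hints, e.g.
//
//   fd_directs[WRITE_LIFE_NOT_SET]   // O_DIRECT fd, no lifetime hint
//   fd_directs[WRITE_LIFE_SHORT]     // O_DIRECT fd hinted for short-lived data
//   fd_buffereds[WRITE_LIFE_LONG]    // buffered fd hinted for long-lived data
//
// Every fd refers to the same device/file; only the F_SET_FILE_RW_HINT
// value attached to it differs, which lets flash translation layers group
// data by expected lifetime.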
int KernelDevice::get_devices(std::set<std::string> *ls) const
{
  if (devname.empty()) {
    return 0;
  }
  get_raw_devices(devname, ls);
  return 0;
}
void KernelDevice::close()
{
  dout(1) << __func__ << dendl;
  _aio_stop();
  if (discard_thread.is_started()) {
    _discard_stop();
  }

  if (vdo_fd >= 0) {
    VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
    vdo_fd = -1;
  }

  for (int i = 0; i < WRITE_LIFE_MAX; i++) {
    assert(fd_directs[i] >= 0);
    VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
    fd_directs[i] = -1;

    assert(fd_buffereds[i] >= 0);
    VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
    fd_buffereds[i] = -1;
  }
  path.clear();
}
int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
{
  (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard);
  (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
  (*pm)[prefix + "size"] = stringify(get_size());
  (*pm)[prefix + "block_size"] = stringify(get_block_size());
  (*pm)[prefix + "optimal_io_size"] = stringify(get_optimal_io_size());
  (*pm)[prefix + "driver"] = "KernelDevice";
  if (rotational) {
    (*pm)[prefix + "type"] = "hdd";
  } else {
    (*pm)[prefix + "type"] = "ssd";
  }
  if (vdo_fd >= 0) {
    (*pm)[prefix + "vdo"] = "true";
    uint64_t total, avail;
    get_vdo_utilization(vdo_fd, &total, &avail);
    (*pm)[prefix + "vdo_physical_size"] = stringify(total);
  }

  {
    string res_names;
    std::set<std::string> devnames;
    if (get_devices(&devnames) == 0) {
      for (auto& dev : devnames) {
        if (!res_names.empty()) {
          res_names += ",";
        }
        res_names += dev;
      }
      if (res_names.size()) {
        (*pm)[prefix + "devices"] = res_names;
      }
    }
  }

  struct stat st;
  int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
  if (r < 0)
    return -errno;
  if (S_ISBLK(st.st_mode)) {
    (*pm)[prefix + "access_mode"] = "blk";

    char buffer[1024] = {0};
    BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
    if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
      (*pm)[prefix + "partition_path"] = "unknown";
    } else {
      (*pm)[prefix + "partition_path"] = buffer;
    }
    buffer[0] = '\0';
    if (r = blkdev.wholedisk(buffer, sizeof(buffer)); r) {
      (*pm)[prefix + "dev_node"] = "unknown";
    } else {
      (*pm)[prefix + "dev_node"] = buffer;
    }
    buffer[0] = '\0';
    blkdev.model(buffer, sizeof(buffer));
    (*pm)[prefix + "model"] = buffer;

    buffer[0] = '\0';
    blkdev.dev(buffer, sizeof(buffer));
    (*pm)[prefix + "dev"] = buffer;

    // nvme exposes a serial number
    buffer[0] = '\0';
    blkdev.serial(buffer, sizeof(buffer));
    (*pm)[prefix + "serial"] = buffer;

    // numa
    int node;
    r = blkdev.get_numa_node(&node);
    if (r >= 0) {
      (*pm)[prefix + "numa_node"] = stringify(node);
    }
  } else {
    (*pm)[prefix + "access_mode"] = "file";
    (*pm)[prefix + "path"] = path;
  }
  return 0;
}
void KernelDevice::_detect_vdo()
{
  vdo_fd = get_vdo_stats_handle(devname.c_str(), &vdo_name);
  if (vdo_fd >= 0) {
    dout(1) << __func__ << " VDO volume " << vdo_name
            << " maps to " << devname << dendl;
  } else {
    dout(20) << __func__ << " no VDO volume maps to " << devname << dendl;
  }
}
bool KernelDevice::get_thin_utilization(uint64_t *total, uint64_t *avail) const
{
  if (vdo_fd < 0) {
    return false;
  }
  return get_vdo_utilization(vdo_fd, total, avail);
}
int KernelDevice::choose_fd(bool buffered, int write_hint) const
{
#if defined(F_SET_FILE_RW_HINT)
  if (!enable_wrt)
    write_hint = WRITE_LIFE_NOT_SET;
#else
  // Without WRITE_LIFE capabilities, only one file is used.
  // And rocksdb sets this value also to > 0, so we need to catch this here
  // instead of trusting rocksdb to set write_hint.
  write_hint = WRITE_LIFE_NOT_SET;
#endif
  return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
}
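// Usage sketch (illustrative, not in the original source): with hints
// enabled,
//   choose_fd(false, WRITE_LIFE_SHORT) -> fd_directs[WRITE_LIFE_SHORT]
//   choose_fd(true,  WRITE_LIFE_SHORT) -> fd_buffereds[WRITE_LIFE_SHORT]
// and when hints are unavailable (or disabled) both collapse to the
// WRITE_LIFE_NOT_SET slot.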
int KernelDevice::flush()
{
  // protect flush with a mutex.  note that we are not really protecting
  // data here.  instead, we're ensuring that if any flush() caller
  // sees that io_since_flush is true, they block any racing callers
  // until the flush is observed.  that allows racing threads to be
  // calling flush while still ensuring that *any* of them that got an
  // aio completion notification will not return before that aio is
  // stable on disk: whichever thread sees the flag first will block
  // followers until the aio is stable.
  std::lock_guard l(flush_mutex);

  bool expect = true;
  if (!io_since_flush.compare_exchange_strong(expect, false)) {
    dout(10) << __func__ << " no-op (no ios since last flush), flag is "
             << (int)io_since_flush.load() << dendl;
    return 0;
  }

  dout(10) << __func__ << " start" << dendl;
  if (cct->_conf->bdev_inject_crash) {
    // sleep for a moment to give other threads a chance to submit or
    // wait on io that races with a flush.
    derr << __func__ << " injecting crash. first we sleep..." << dendl;
    sleep(cct->_conf->bdev_inject_crash_flush_delay);
    derr << __func__ << " and now we die" << dendl;
    cct->_log->flush();
    _exit(1);
  }
  utime_t start = ceph_clock_now();
  int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
  utime_t end = ceph_clock_now();
  utime_t dur = end - start;
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  dout(5) << __func__ << " in " << dur << dendl;
  return r;
}
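// Timeline sketch of the flag protocol above (illustrative only):
//   T1: an aio completes        -> io_since_flush.store(true)
//   T2: thread A calls flush()  -> takes flush_mutex, CAS true->false
//                                  succeeds, runs fdatasync
//   T3: thread B calls flush()  -> blocks on flush_mutex until A is done,
//                                  then its CAS fails and it returns 0;
//                                  the io B observed is already stable.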
int KernelDevice::_aio_start()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    int r = io_queue->init(fd_directs);
    if (r < 0) {
      if (r == -EAGAIN) {
        derr << __func__ << " io_setup(2) failed with EAGAIN; "
             << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
      } else {
        derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
      }
      return r;
    }
    aio_thread.create("bstore_aio");
  }
  return 0;
}

void KernelDevice::_aio_stop()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    aio_stop = true;
    aio_thread.join();
    aio_stop = false;
    io_queue->shutdown();
  }
}
int KernelDevice::_discard_start()
{
  discard_thread.create("bstore_discard");
  return 0;
}

void KernelDevice::_discard_stop()
{
  dout(10) << __func__ << dendl;
  {
    std::unique_lock l(discard_lock);
    while (!discard_started) {
      discard_cond.wait(l);
    }
    discard_stop = true;
    discard_cond.notify_all();
  }
  discard_thread.join();
  {
    std::lock_guard l(discard_lock);
    discard_stop = false;
  }
  dout(10) << __func__ << " stopped" << dendl;
}
void KernelDevice::discard_drain()
{
  dout(10) << __func__ << dendl;
  std::unique_lock l(discard_lock);
  while (!discard_queued.empty() || discard_running) {
    discard_cond.wait(l);
  }
}
static bool is_expected_ioerr(const int r)
{
  // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
  return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
          r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
          r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
#if defined(__linux__)
          r == -EREMCHG || r == -EBADE
#elif defined(__FreeBSD__)
          r == -BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE
#endif
          );
}
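// Note (descriptive comment, not in the original source): the values
// compared above are negated errno codes as surfaced by
// aio_t::get_return_value(), e.g. a failed aio read shows up as r == -EIO,
// which is why every constant is negated.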
void KernelDevice::_aio_thread()
{
  dout(10) << __func__ << " start" << dendl;
  int inject_crash_count = 0;
  while (!aio_stop) {
    dout(40) << __func__ << " polling" << dendl;
    int max = cct->_conf->bdev_aio_reap_max;
    aio_t *aio[max];
    int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
                                         aio, max);
    if (r < 0) {
      derr << __func__ << " got " << cpp_strerror(r) << dendl;
      ceph_abort_msg("got unexpected error from io_getevents");
    }
    if (r > 0) {
      dout(30) << __func__ << " got " << r << " completed aios" << dendl;
      for (int i = 0; i < r; ++i) {
        IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
        _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
        if (aio[i]->queue_item.is_linked()) {
          std::lock_guard l(debug_queue_lock);
          debug_aio_unlink(*aio[i]);
        }

        // set flag indicating new ios have completed.  we do this *before*
        // any completion or notifications so that any user flush() that
        // follows the observed io completion will include this io.  Note
        // that an earlier, racing flush() could observe and clear this
        // flag, but that also ensures that the IO will be stable before the
        // later flush() occurs.
        io_since_flush.store(true);

        long r = aio[i]->get_return_value();
        if (r < 0) {
          derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")"
               << dendl;
          if (ioc->allow_eio && is_expected_ioerr(r)) {
            derr << __func__ << " translating the error to EIO for upper layer"
                 << dendl;
            ioc->set_return_value(-EIO);
          } else {
            if (is_expected_ioerr(r)) {
              note_io_error_event(
                devname.c_str(),
                path.c_str(),
                r,
#if defined(HAVE_POSIXAIO)
                aio[i]->aio.aiocb.aio_lio_opcode,
#else
                aio[i]->iocb.aio_lio_opcode,
#endif
                aio[i]->offset,
                aio[i]->length);
              ceph_abort_msg(
                "Unexpected IO error. "
                "This may suggest a hardware issue. "
                "Please check your kernel log!");
            }
            ceph_abort_msg(
              "Unexpected IO error. "
              "This may suggest HW issue. Please check your dmesg!");
          }
        } else if (aio[i]->length != (uint64_t)r) {
          derr << "aio to 0x" << std::hex << aio[i]->offset
               << "~" << aio[i]->length << std::dec
               << " but returned: " << r << dendl;
          ceph_abort_msg("unexpected aio return value: does not match length");
        }

        dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
                 << " ioc " << ioc
                 << " with " << (ioc->num_running.load() - 1)
                 << " aios left" << dendl;

        // NOTE: once we decrement num_running and either call the callback
        // or call aio_wake we cannot touch ioc or aio[] as the caller may
        // free it.
        if (--ioc->num_running == 0) {
          aio_callback(aio_callback_priv, ioc->priv);
        }
      }
    }
    if (cct->_conf->bdev_debug_aio) {
      utime_t now = ceph_clock_now();
      std::lock_guard l(debug_queue_lock);
      if (debug_oldest) {
        if (debug_stall_since == utime_t()) {
          debug_stall_since = now;
        } else if (cct->_conf->bdev_debug_aio_suicide_timeout) {
          utime_t cutoff = now;
          cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
          if (debug_stall_since < cutoff) {
            derr << __func__ << " stalled aio " << debug_oldest
                 << " since " << debug_stall_since << ", timeout is "
                 << cct->_conf->bdev_debug_aio_suicide_timeout
                 << "s, suicide" << dendl;
            ceph_abort_msg("stalled aio... buggy kernel or bad device?");
          }
        }
      }
    }
    if (cct->_conf->bdev_inject_crash) {
      ++inject_crash_count;
      if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
          cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
        derr << __func__ << " bdev_inject_crash trigger from aio thread"
             << dendl;
        cct->_log->flush();
        _exit(1);
      }
    }
  }
  dout(10) << __func__ << " end" << dendl;
}
void KernelDevice::_discard_thread()
{
  std::unique_lock l(discard_lock);
  ceph_assert(!discard_started);
  discard_started = true;
  discard_cond.notify_all();
  while (true) {
    ceph_assert(discard_finishing.empty());
    if (discard_queued.empty()) {
      if (discard_stop)
        break;
      dout(20) << __func__ << " sleep" << dendl;
      discard_cond.notify_all(); // for the thread trying to drain...
      discard_cond.wait(l);
      dout(20) << __func__ << " wake" << dendl;
    } else {
      discard_finishing.swap(discard_queued);
      discard_running = true;
      l.unlock();
      dout(20) << __func__ << " finishing" << dendl;
      for (auto p = discard_finishing.begin(); p != discard_finishing.end(); ++p) {
        discard(p.get_start(), p.get_len());
      }

      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
      discard_finishing.clear();
      l.lock();
      discard_running = false;
    }
  }
  dout(10) << __func__ << " finish" << dendl;
  discard_started = false;
}
int KernelDevice::queue_discard(interval_set<uint64_t> &to_release)
{
  if (!support_discard)
    return -1;

  if (to_release.empty())
    return 0;

  std::lock_guard l(discard_lock);
  discard_queued.insert(to_release);
  discard_cond.notify_all();
  return 0;
}
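// Usage sketch (illustrative, not in the original source): a caller
// releasing extents batches them into an interval_set and hands them to
// the discard thread, e.g.
//
//   interval_set<uint64_t> released;
//   released.insert(0x10000, 0x8000);   // offset, length
//   released.insert(0x40000, 0x4000);
//   bdev->queue_discard(released);      // -1 if discard is unsupported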
void KernelDevice::_aio_log_start(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
           << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    if (debug_inflight.intersects(offset, length)) {
      derr << __func__ << " inflight overlap of 0x"
           << std::hex
           << offset << "~" << length << std::dec
           << " with " << debug_inflight << dendl;
      ceph_abort();
    }
    debug_inflight.insert(offset, length);
  }
}
void KernelDevice::debug_aio_link(aio_t& aio)
{
  if (debug_queue.empty()) {
    debug_oldest = &aio;
  }
  debug_queue.push_back(aio);
}

void KernelDevice::debug_aio_unlink(aio_t& aio)
{
  if (aio.queue_item.is_linked()) {
    debug_queue.erase(debug_queue.iterator_to(aio));
    if (debug_oldest == &aio) {
      auto age = cct->_conf->bdev_debug_aio_log_age;
      if (age && debug_stall_since != utime_t()) {
        utime_t cutoff = ceph_clock_now();
        cutoff -= age;
        if (debug_stall_since < cutoff) {
          derr << __func__ << " stalled aio " << debug_oldest
               << " since " << debug_stall_since << ", timeout is "
               << age << "s" << dendl;
        }
      }

      if (debug_queue.empty()) {
        debug_oldest = nullptr;
      } else {
        debug_oldest = &debug_queue.front();
      }
      debug_stall_since = utime_t();
    }
  }
}
void KernelDevice::_aio_log_finish(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " " << aio << " 0x"
           << std::hex << offset << "~" << length << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    debug_inflight.erase(offset, length);
  }
}
void KernelDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc
           << " pending " << ioc->num_pending.load()
           << " running " << ioc->num_running.load()
           << dendl;

  if (ioc->num_pending.load() == 0) {
    return;
  }

  // move these aside, and get our end iterator position now, as the
  // aios might complete as soon as they are submitted and queue more
  // wal aio's.
  list<aio_t>::iterator e = ioc->running_aios.begin();
  ioc->running_aios.splice(e, ioc->pending_aios);

  int pending = ioc->num_pending.load();
  ioc->num_running += pending;
  ioc->num_pending -= pending;
  ceph_assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
  ceph_assert(ioc->pending_aios.size() == 0);

  if (cct->_conf->bdev_debug_aio) {
    list<aio_t>::iterator p = ioc->running_aios.begin();
    while (p != e) {
      dout(30) << __func__ << " " << *p << dendl;
      std::lock_guard l(debug_queue_lock);
      debug_aio_link(*p++);
    }
  }

  void *priv = static_cast<void*>(ioc);
  int r, retries = 0;
  // num of pending aios should not overflow when passed to submit_batch()
  assert(pending <= std::numeric_limits<uint16_t>::max());
  r = io_queue->submit_batch(ioc->running_aios.begin(), e,
                             pending, priv, &retries);

  if (retries)
    derr << __func__ << " retries " << retries << dendl;
  if (r < 0) {
    derr << " aio submit got " << cpp_strerror(r) << dendl;
    ceph_assert(r == 0);
  }
}
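// Design note (descriptive comment, not in the original source): the end
// iterator `e` is captured *before* the splice because list::splice moves
// the pending aios in front of it; afterwards [running_aios.begin(), e)
// names exactly the newly moved elements, and the range stays valid even
// if some of those aios complete (and queue more io) while submit_batch()
// is still walking it.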
int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
{
  uint64_t len = bl.length();
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
          << std::dec << " " << buffermode(buffered) << dendl;
  if (cct->_conf->bdev_inject_crash &&
      rand() % cct->_conf->bdev_inject_crash == 0) {
    derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
         << off << "~" << len << std::dec << dendl;
    return 0;
  }
  std::vector<iovec> iov;
  bl.prepare_iov(&iov);

  auto left = len;
  auto o = off;
  size_t idx = 0;
  do {
    auto r = ::pwritev(choose_fd(buffered, write_hint),
                       &iov[idx], iov.size() - idx, o);

    if (r < 0) {
      r = -errno;
      derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
      return r;
    }
    o += r;
    left -= r;
    if (left) {
      // skip fully processed IOVs
      while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) {
        r -= iov[idx++].iov_len;
      }
      // update partially processed one if any
      if (r) {
        ceph_assert(idx < iov.size());
        ceph_assert((size_t)r < iov[idx].iov_len);
        iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r;
        iov[idx].iov_len -= r;
      }
    }
  } while (left);

  if (buffered) {
#ifdef HAVE_SYNC_FILE_RANGE
    // initiate IO and wait till it completes
    auto r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len,
                               SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
      return r;
    }
#endif
  }

  io_since_flush.store(true);

  return 0;
}
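// Worked example of the partial-write loop above (illustrative): suppose
// iov = {8K, 8K} and the first ::pwritev returns r = 12K. Then left = 4K,
// the first iov is skipped entirely (r -= 8K leaves r = 4K), and the
// second iov is advanced in place: iov_base += 4K, iov_len = 4K. The next
// pwritev starts at &iov[1] and writes the remaining 4K at offset o + 12K.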
int KernelDevice::write(
  uint64_t off,
  bufferlist &bl,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
           << " " << buffermode(buffered)
           << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
               << dendl;
    return 0;
  }

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data:\n";
  bl.hexdump(*_dout);
  *_dout << dendl;

  return _sync_write(off, bl, buffered, write_hint);
}
int KernelDevice::aio_write(
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
           << " " << buffermode(buffered)
           << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
               << dendl;
    return 0;
  }

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data:\n";
  bl.hexdump(*_dout);
  *_dout << dendl;

  _aio_log_start(ioc, off, len);

#ifdef HAVE_LIBAIO
  if (aio && dio && !buffered) {
    if (cct->_conf->bdev_inject_crash &&
        rand() % cct->_conf->bdev_inject_crash == 0) {
      derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
           << off << "~" << len << std::dec
           << dendl;
      // generate a real io so that aio_wait behaves properly, but make it
      // a read instead of write, and toss the result.
      ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
      ++ioc->num_pending;
      auto& aio = ioc->pending_aios.back();
      aio.bl.push_back(
        ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len)));
      aio.bl.prepare_iov(&aio.iov);
      aio.preadv(off, len);
    } else {
      if (bl.length() <= RW_IO_MAX) {
        // fast path (non-huge write)
        ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
        ++ioc->num_pending;
        auto& aio = ioc->pending_aios.back();
        bl.prepare_iov(&aio.iov);
        aio.bl.claim_append(bl);
        aio.pwritev(off, len);
        dout(30) << aio << dendl;
        dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
                << std::dec << " aio " << &aio << dendl;
      } else {
        // write in RW_IO_MAX-sized chunks
        uint64_t prev_len = 0;
        while (prev_len < bl.length()) {
          bufferlist tmp;
          if (prev_len + RW_IO_MAX < bl.length()) {
            tmp.substr_of(bl, prev_len, RW_IO_MAX);
          } else {
            tmp.substr_of(bl, prev_len, bl.length() - prev_len);
          }
          auto len = tmp.length();
          ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
          ++ioc->num_pending;
          auto& aio = ioc->pending_aios.back();
          tmp.prepare_iov(&aio.iov);
          aio.bl.claim_append(tmp);
          aio.pwritev(off + prev_len, len);
          dout(30) << aio << dendl;
          dout(5) << __func__ << " 0x" << std::hex << off + prev_len
                  << "~" << len
                  << std::dec << " aio " << &aio << " (piece)" << dendl;
          prev_len += len;
        }
      }
    }
  } else
#endif
  {
    int r = _sync_write(off, bl, buffered, write_hint);
    _aio_log_finish(ioc, off, len);
    if (r < 0)
      return r;
  }
  return 0;
}
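// Worked example of the chunking above (illustrative; RW_IO_MAX is roughly
// INT_MAX rounded down to a page boundary, i.e. just under 2 GiB): for a
// 5 GiB bufferlist the loop queues three aios,
//   [off, off + RW_IO_MAX), [off + RW_IO_MAX, off + 2*RW_IO_MAX),
// and a final short piece covering the remainder, each built from a
// substr_of() view of the original bufferlist with its own iovec set.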
int KernelDevice::discard(uint64_t offset, uint64_t len)
{
  int r = 0;
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
               << dendl;
    return 0;
  }
  if (support_discard) {
    dout(10) << __func__
             << " 0x" << std::hex << offset << "~" << len << std::dec
             << dendl;

    r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len);
  }
  return r;
}
struct ExplicitHugePagePool {
  using region_queue_t = boost::lockfree::queue<void*>;
  using instrumented_raw = ceph::buffer_instrumentation::instrumented_raw<
    BlockDevice::hugepaged_raw_marker_t>;

  struct mmaped_buffer_raw : public instrumented_raw {
    region_queue_t& region_q; // for recycling

    mmaped_buffer_raw(void* mmaped_region, ExplicitHugePagePool& parent)
      : instrumented_raw(static_cast<char*>(mmaped_region), parent.buffer_size),
        region_q(parent.region_q) {
      // the `mmaped_region` has been passed to `raw` as the buffer's `data`
    }
    ~mmaped_buffer_raw() override {
      // don't delete nor unmmap; recycle the region instead
      region_q.push(data);
    }
    raw* clone_empty() override {
      // the entire cloning facility is used solely by the dev-only MemDB.
      // see: https://github.com/ceph/ceph/pull/36282
      ceph_abort_msg("this should be never called on this path!");
    }
  };

  ExplicitHugePagePool(const size_t buffer_size, size_t buffers_in_pool)
    : buffer_size(buffer_size), region_q(buffers_in_pool) {
    while (buffers_in_pool--) {
      void* const mmaped_region = ::mmap(
        nullptr,
        buffer_size,
        PROT_READ | PROT_WRITE,
#if defined(__FreeBSD__)
        // FreeBSD doesn't have MAP_HUGETLB nor MAP_POPULATE but it has
        // a different, more automated / implicit mechanisms. However,
        // we want to mimic the Linux behavior as closely as possible
        // also in the matter of error handling which is the reason
        // behind MAP_ALIGNED_SUPER.
        // See: https://lists.freebsd.org/pipermail/freebsd-questions/2014-August/260578.html
        MAP_PRIVATE | MAP_ANONYMOUS | MAP_PREFAULT_READ | MAP_ALIGNED_SUPER,
#else
        MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB,
#endif // __FreeBSD__
        -1,
        0);
      if (mmaped_region == MAP_FAILED) {
        ceph_abort("can't allocate huge buffer;"
                   " /proc/sys/vm/nr_hugepages misconfigured?");
      } else {
        region_q.push(mmaped_region);
      }
    }
  }
  ~ExplicitHugePagePool() {
    void* mmaped_region;
    while (region_q.pop(mmaped_region)) {
      ::munmap(mmaped_region, buffer_size);
    }
  }

  ceph::unique_leakable_ptr<buffer::raw> try_create() {
    if (void* mmaped_region; region_q.pop(mmaped_region)) {
      return ceph::unique_leakable_ptr<buffer::raw> {
        new mmaped_buffer_raw(mmaped_region, *this)
      };
    } else {
      // oops, empty queue.
      return nullptr;
    }
  }

  size_t get_buffer_size() const {
    return buffer_size;
  }

private:
  const size_t buffer_size;
  region_queue_t region_q;
};
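// Operational note (descriptive comment, not in the original source): the
// pool only recycles, it never grows. Every buffer is mmap()ed up front in
// the constructor, try_create() pops a region, and ~mmaped_buffer_raw()
// pushes it back instead of munmap()ing, so steady-state reads reuse the
// same hugepage-backed regions with no further syscalls.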
struct HugePagePoolOfPools {
  HugePagePoolOfPools(const std::map<size_t, size_t> conf)
    : pools(conf.size(), [conf] (size_t index, auto emplacer) {
        ceph_assert(index < conf.size());
        // it could be replaced with a state-mutating lambda and
        // `conf::erase()` but performance is not a concern here.
        const auto [buffer_size, buffers_in_pool] =
          *std::next(std::begin(conf), index);
        emplacer.emplace(buffer_size, buffers_in_pool);
      }) {
  }

  ceph::unique_leakable_ptr<buffer::raw> try_create(const size_t size) {
    // thanks to `conf` being a `std::map` we store the pools
    // sorted by buffer sizes. this would allow to clamp to log(n)
    // but I doubt admins want to have dozens of accelerated buffer
    // sizes. let's keep this simple for now.
    if (auto iter = std::find_if(std::begin(pools), std::end(pools),
                                 [size] (const auto& pool) {
                                   return size == pool.get_buffer_size();
                                 });
        iter != std::end(pools)) {
      return iter->try_create();
    }
    return nullptr;
  }

  static HugePagePoolOfPools from_desc(const std::string& conf);

private:
  // let's have some space inside (for 2 MB and 4 MB perhaps?)
  // NOTE: we need tiny_vector as the boost::lockfree queue inside
  // pool is not-movable.
  ceph::containers::tiny_vector<ExplicitHugePagePool, 2> pools;
};

HugePagePoolOfPools HugePagePoolOfPools::from_desc(const std::string& desc) {
  std::map<size_t, size_t> conf; // buffer_size -> buffers_in_pool
  std::map<std::string, std::string> exploded_str_conf;
  get_str_map(desc, &exploded_str_conf);
  for (const auto& [buffer_size_s, buffers_in_pool_s] : exploded_str_conf) {
    size_t buffer_size, buffers_in_pool;
    if (sscanf(buffer_size_s.c_str(), "%zu", &buffer_size) != 1) {
      ceph_abort("can't parse a key in the configuration");
    }
    if (sscanf(buffers_in_pool_s.c_str(), "%zu", &buffers_in_pool) != 1) {
      ceph_abort("can't parse a value in the configuration");
    }
    conf[buffer_size] = buffers_in_pool;
  }
  return HugePagePoolOfPools{std::move(conf)};
}
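// Example (illustrative; assumes get_str_map's usual "key=value"
// comma/space-separated format):
//   bdev_read_preallocated_huge_buffers = "2097152=512,4194304=64"
// parses into conf = { 2097152 -> 512, 4194304 -> 64 }, i.e. 512 two-MiB
// buffers and 64 four-MiB buffers preallocated on first use.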
// create a buffer based on user-configurable settings. it's intended to
// make our buffers THP-able.
ceph::unique_leakable_ptr<buffer::raw> KernelDevice::create_custom_aligned(
  const size_t len,
  IOContext* const ioc) const
{
  // just to preserve the logic of create_small_page_aligned().
  if (len < CEPH_PAGE_SIZE) {
    return ceph::buffer::create_small_page_aligned(len);
  } else {
    static HugePagePoolOfPools hp_pools = HugePagePoolOfPools::from_desc(
      cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
    );
    if (auto lucky_raw = hp_pools.try_create(len); lucky_raw) {
      dout(20) << __func__ << " allocated from huge pool"
               << " lucky_raw.data=" << (void*)lucky_raw->get_data()
               << " bdev_read_preallocated_huge_buffers="
               << cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
               << dendl;
      ioc->flags |= IOContext::FLAG_DONT_CACHE;
      return lucky_raw;
    } else {
      // fallthrough due to empty buffer pool. this can happen also
      // when the configurable was explicitly set to 0.
      dout(20) << __func__ << " cannot allocate from huge pool"
               << dendl;
    }
  }
  const size_t custom_alignment = cct->_conf->bdev_read_buffer_alignment;
  dout(20) << __func__ << " with the custom alignment;"
           << " len=" << len
           << " custom_alignment=" << custom_alignment
           << dendl;
  return ceph::buffer::create_aligned(len, custom_alignment);
}
int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
                       IOContext *ioc,
                       bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << " " << buffermode(buffered)
          << dendl;
  ceph_assert(is_valid_io(off, len));

  _aio_log_start(ioc, off, len);

  auto start1 = mono_clock::now();

  auto p = ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc));
  int r = ::pread(choose_fd(buffered, WRITE_LIFE_NOT_SET),
                  p->c_str(), len, off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
         << " 0x" << std::hex << off << "~" << len << std::dec
         << " " << buffermode(buffered)
         << " since " << start1 << ", timeout is "
         << age << "s" << dendl;
  }
  if (r < 0) {
    if (ioc->allow_eio && is_expected_ioerr(r)) {
      r = -EIO;
    } else {
      r = -errno;
    }
    derr << __func__ << " 0x" << std::hex << off << "~" << len
         << std::dec << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  ceph_assert((uint64_t)r == len);
  pbl->push_back(std::move(p));

  dout(40) << "data:\n";
  pbl->hexdump(*_dout);
  *_dout << dendl;

 out:
  _aio_log_finish(ioc, off, len);
  return r < 0 ? r : 0;
}
int KernelDevice::aio_read(
  uint64_t off,
  uint64_t len,
  bufferlist *pbl,
  IOContext *ioc)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << dendl;

  int r = 0;
#ifdef HAVE_LIBAIO
  if (aio && dio) {
    ceph_assert(is_valid_io(off, len));
    _aio_log_start(ioc, off, len);
    ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    aio.bl.push_back(
      ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc)));
    aio.bl.prepare_iov(&aio.iov);
    aio.preadv(off, len);
    dout(30) << aio << dendl;
    pbl->append(aio.bl);
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
            << std::dec << " aio " << &aio << dendl;
  } else
#endif
  {
    r = read(off, len, pbl, ioc, false);
  }

  return r;
}
int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
{
  uint64_t aligned_off = p2align(off, block_size);
  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
  bufferptr p = ceph::buffer::create_small_page_aligned(aligned_len);
  int r = 0;

  auto start1 = mono_clock::now();
  r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
         << " 0x" << std::hex << off << "~" << len << std::dec
         << " since " << start1 << ", timeout is "
         << age << "s" << dendl;
  }

  if (r < 0) {
    r = -errno;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
         << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  ceph_assert((uint64_t)r == aligned_len);
  memcpy(buf, p.c_str() + (off - aligned_off), len);

  dout(40) << __func__ << " data:\n";
  {
    bufferlist bl;
    bl.append(buf, len);
    bl.hexdump(*_dout);
  }
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}
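// Worked example of the alignment math above (illustrative): with
// block_size = 4096, off = 5000, len = 2000:
//   aligned_off = p2align(5000, 4096)          = 4096
//   aligned_len = p2roundup(7000, 4096) - 4096 = 8192 - 4096 = 4096
// so one aligned 4K read is issued and the 2000 requested bytes are
// memcpy()ed out starting at offset 5000 - 4096 = 904 within the bounce
// buffer.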
int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
                              bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << " buffered " << buffered
          << dendl;
  ceph_assert(len > 0);
  ceph_assert(off < size);
  ceph_assert(off + len <= size);
  int r = 0;
  auto age = cct->_conf->bdev_debug_aio_log_age;

  // if it's direct io and unaligned, we have to use an internal buffer
  if (!buffered && ((off % block_size != 0)
                    || (len % block_size != 0)
                    || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
    return direct_read_unaligned(off, len, buf);

  auto start1 = mono_clock::now();
  if (buffered) {
    // buffered read
    auto off0 = off;
    char *t = buf;
    uint64_t left = len;
    while (left > 0) {
      r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
      if (r < 0) {
        r = -errno;
        derr << __func__ << " 0x" << std::hex << off << "~" << left
             << std::dec << " error: " << cpp_strerror(r) << dendl;
        goto out;
      }
      off += r;
      t += r;
      left -= r;
    }
    if (mono_clock::now() - start1 >= make_timespan(age)) {
      derr << __func__ << " stalled read "
           << " 0x" << std::hex << off0 << "~" << len << std::dec
           << " (buffered) since " << start1 << ", timeout is "
           << age << "s" << dendl;
    }
  } else {
    // direct and aligned read
    r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
    if (mono_clock::now() - start1 >= make_timespan(age)) {
      derr << __func__ << " stalled read "
           << " 0x" << std::hex << off << "~" << len << std::dec
           << " (direct) since " << start1 << ", timeout is "
           << age << "s" << dendl;
    }
    if (r < 0) {
      r = -errno;
      derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
           << off << "~" << len << std::dec << " error: " << cpp_strerror(r)
           << dendl;
      goto out;
    }
    ceph_assert((uint64_t)r == len);
  }

  dout(40) << __func__ << " data:\n";
  {
    bufferlist bl;
    bl.append(buf, len);
    bl.hexdump(*_dout);
  }
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}
int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << dendl;
  ceph_assert(off % block_size == 0);
  ceph_assert(len % block_size == 0);
  int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED);
  if (r) {
    r = -r;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
         << " error: " << cpp_strerror(r) << dendl;
  }
  return r;
}