// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
#include <sys/types.h>

#include "KernelDevice.h"
#include "include/intarith.h"
#include "include/types.h"
#include "include/compat.h"
#include "include/stringify.h"
#include "common/blkdev.h"
#include "common/errno.h"
#if defined(__FreeBSD__)
#include "bsm/audit_errno.h"
#endif
#include "common/debug.h"
#include "common/numa.h"

#include "global/global_context.h"
#include "ceph_io_uring.h"
#define dout_context cct
#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv,
			   aio_callback_t d_cb, void *d_cbpriv)
  : BlockDevice(cct, cb, cbpriv),
    aio(false), dio(false),
    discard_callback(d_cb),
    discard_callback_priv(d_cbpriv),
    discard_started(false)
{
  fd_directs.resize(WRITE_LIFE_MAX, -1);
  fd_buffereds.resize(WRITE_LIFE_MAX, -1);

  bool use_ioring = g_ceph_context->_conf.get_val<bool>("bluestore_ioring");
  unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;

  if (use_ioring && ioring_queue_t::supported()) {
    io_queue = std::make_unique<ioring_queue_t>(iodepth);
  } else {
    static bool once;
    if (use_ioring && !once) {
      derr << "WARNING: io_uring API is not supported! Fallback to libaio!"
	   << dendl;
      once = true;
    }
    io_queue = std::make_unique<aio_queue_t>(iodepth);
  }
}
int KernelDevice::_lock()
{
  dout(10) << __func__ << " " << fd_directs[WRITE_LIFE_NOT_SET] << dendl;
  int r = ::flock(fd_directs[WRITE_LIFE_NOT_SET], LOCK_EX | LOCK_NB);
  if (r < 0) {
    derr << __func__ << " flock failed on " << path << dendl;
    return -errno;
  }
  return 0;
}
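
// Illustrative sketch (not from the original file): the advisory-lock
// pattern used by _lock() above, reduced to a standalone helper.
// flock(LOCK_EX | LOCK_NB) returns immediately with EWOULDBLOCK if
// another process holds the lock, which lets a second opener fail fast
// instead of hanging. Assumes <sys/file.h>; the helper name is
// hypothetical.
namespace {
[[maybe_unused]] int try_exclusive_lock(int fd)
{
  // Non-blocking exclusive advisory lock; negative errno on contention.
  if (::flock(fd, LOCK_EX | LOCK_NB) < 0) {
    return -errno;   // typically -EWOULDBLOCK when already locked
  }
  return 0;
}
} // namespace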
int KernelDevice::open(const string& p)
{
  path = p;
  int r = 0, i = 0;
  dout(1) << __func__ << " path " << path << dendl;

  for (i = 0; i < WRITE_LIFE_MAX; i++) {
    int fd = ::open(path.c_str(), O_RDWR | O_DIRECT);
    if (fd < 0) {
      r = -errno;
      break;
    }
    fd_directs[i] = fd;

    fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
    if (fd < 0) {
      r = -errno;
      break;
    }
    fd_buffereds[i] = fd;
  }
  if (i != WRITE_LIFE_MAX) {
    derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
    goto out_fail;
  }
#if defined(F_SET_FILE_RW_HINT)
  for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) {
    if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) {
      r = -errno;
      break;
    }
    if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) {
      r = -errno;
      break;
    }
  }
  if (i != WRITE_LIFE_MAX) {
    enable_wrt = false;
    dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path
	    << " failed: " << cpp_strerror(r) << dendl;
  }
#endif
  dio = true;
  aio = cct->_conf->bdev_aio;
  if (!aio) {
    ceph_abort_msg("non-aio not supported");
  }
  // disable readahead as it will wreak havoc on our mix of
  // directio/aio and buffered io.
  r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM);
  if (r) {
    r = -r;
    derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl;
    goto out_fail;
  }
  if (lock_exclusive) {
    r = _lock();
    if (r < 0) {
      derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
	   << dendl;
      goto out_fail;
    }
  }
  struct stat st;
  r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st);
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
    goto out_fail;
  }
  // Operate as though the block size is 4 KB.  The backing file
  // blksize doesn't strictly matter except that some file systems may
  // require a read/modify/write if we write something smaller than
  // it.
  block_size = cct->_conf->bdev_block_size;
  if (block_size != (unsigned)st.st_blksize) {
    dout(1) << __func__ << " backing device/file reports st_blksize "
	    << st.st_blksize << ", using bdev_block_size "
	    << block_size << " anyway" << dendl;
  }
  {
    BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]);
    BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]);

    if (S_ISBLK(st.st_mode)) {
      int64_t s;
      r = blkdev_direct.get_size(&s);
      if (r < 0) {
	goto out_fail;
      }
      size = s;
    } else {
      size = st.st_size;
    }

    char partition[PATH_MAX], devname[PATH_MAX];
    if ((r = blkdev_buffered.partition(partition, PATH_MAX)) ||
	(r = blkdev_buffered.wholedisk(devname, PATH_MAX))) {
      derr << "unable to get device name for " << path << ": "
	   << cpp_strerror(r) << dendl;
    } else {
      dout(20) << __func__ << " devname " << devname << dendl;
      rotational = blkdev_buffered.is_rotational();
      support_discard = blkdev_buffered.support_discard();
      this->devname = devname;
      _detect_vdo();
    }
  }
  // round size down to an even block
  size &= ~(block_size - 1);

  dout(1) << __func__
	  << " size " << size
	  << " (0x" << std::hex << size << std::dec << ", "
	  << byte_u_t(size) << ")"
	  << " block_size " << block_size
	  << " (" << byte_u_t(block_size) << ")"
	  << " " << (rotational ? "rotational" : "non-rotational")
	  << " discard " << (support_discard ? "supported" : "not supported")
	  << dendl;
  return 0;
out_fail:
  for (i = 0; i < WRITE_LIFE_MAX; i++) {
    if (fd_directs[i] >= 0) {
      VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
      fd_directs[i] = -1;
    } else {
      break;
    }
    if (fd_buffereds[i] >= 0) {
      VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
      fd_buffereds[i] = -1;
    } else {
      break;
    }
  }
  return r;
}
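
// Illustrative sketch (not from the original file): the round-down in
// open() above, `size &= ~(block_size - 1)`, only works when block_size
// is a power of two. A standalone equivalent with an explicit check;
// the helper name is hypothetical.
namespace {
[[maybe_unused]] uint64_t round_down_to_block(uint64_t size, uint64_t block_size)
{
  // Power-of-two check: exactly one bit set.
  ceph_assert(block_size && (block_size & (block_size - 1)) == 0);
  return size & ~(block_size - 1);   // e.g. 4097 with 4096 blocks -> 4096
}
} // namespace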
int KernelDevice::get_devices(std::set<std::string> *ls) const
{
  if (devname.empty()) {
    return 0;
  }
  get_raw_devices(devname, ls);
  return 0;
}
void KernelDevice::close()
{
  dout(1) << __func__ << dendl;
  _aio_stop();
  _discard_stop();

  if (vdo_fd >= 0) {
    VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
    vdo_fd = -1;
  }

  for (int i = 0; i < WRITE_LIFE_MAX; i++) {
    assert(fd_directs[i] >= 0);
    VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
    fd_directs[i] = -1;

    assert(fd_buffereds[i] >= 0);
    VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
    fd_buffereds[i] = -1;
  }
  path.clear();
}
int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
{
  (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard);
  (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
  (*pm)[prefix + "size"] = stringify(get_size());
  (*pm)[prefix + "block_size"] = stringify(get_block_size());
  (*pm)[prefix + "driver"] = "KernelDevice";
  if (rotational) {
    (*pm)[prefix + "type"] = "hdd";
  } else {
    (*pm)[prefix + "type"] = "ssd";
  }
  if (vdo_fd >= 0) {
    (*pm)[prefix + "vdo"] = "true";
    uint64_t total, avail;
    get_vdo_utilization(vdo_fd, &total, &avail);
    (*pm)[prefix + "vdo_physical_size"] = stringify(total);
  }
  {
    std::string res_names;
    std::set<std::string> devnames;
    if (get_devices(&devnames) == 0) {
      for (auto& dev : devnames) {
	if (!res_names.empty()) {
	  res_names += ",";
	}
	res_names += dev;
      }
      if (res_names.size()) {
	(*pm)[prefix + "devices"] = res_names;
      }
    }
  }
  struct stat st;
  int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
  if (r < 0)
    return -errno;
  if (S_ISBLK(st.st_mode)) {
    (*pm)[prefix + "access_mode"] = "blk";

    char buffer[1024] = {0};
    BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
    if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
      (*pm)[prefix + "partition_path"] = "unknown";
    } else {
      (*pm)[prefix + "partition_path"] = buffer;
    }
    buffer[0] = '\0';
    if (r = blkdev.wholedisk(buffer, sizeof(buffer)); r) {
      (*pm)[prefix + "dev_node"] = "unknown";
    } else {
      (*pm)[prefix + "dev_node"] = buffer;
    }

    buffer[0] = '\0';
    blkdev.model(buffer, sizeof(buffer));
    (*pm)[prefix + "model"] = buffer;

    buffer[0] = '\0';
    blkdev.dev(buffer, sizeof(buffer));
    (*pm)[prefix + "dev"] = buffer;

    // nvme exposes a serial number
    buffer[0] = '\0';
    blkdev.serial(buffer, sizeof(buffer));
    (*pm)[prefix + "serial"] = buffer;

    int node;
    r = blkdev.get_numa_node(&node);
    if (r >= 0) {
      (*pm)[prefix + "numa_node"] = stringify(node);
    }
  } else {
    (*pm)[prefix + "access_mode"] = "file";
    (*pm)[prefix + "path"] = path;
  }
  return 0;
}
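
// Illustrative sketch (not from the original file): how a caller might
// consume the map filled by collect_metadata(). The prefix value and the
// helper name are hypothetical; assumes <map> and <string>.
namespace {
[[maybe_unused]] std::string device_type_from_metadata(
  const std::map<std::string, std::string>& md,
  const std::string& prefix)
{
  // collect_metadata("p_", &md) writes keys like "p_type" ("hdd"/"ssd"),
  // "p_size", "p_rotational", "p_access_mode", etc.
  auto it = md.find(prefix + "type");
  return it == md.end() ? std::string("unknown") : it->second;
}
} // namespace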
void KernelDevice::_detect_vdo()
{
  vdo_fd = get_vdo_stats_handle(devname.c_str(), &vdo_name);
  if (vdo_fd >= 0) {
    dout(1) << __func__ << " VDO volume " << vdo_name
	    << " maps to " << devname << dendl;
  } else {
    dout(20) << __func__ << " no VDO volume maps to " << devname << dendl;
  }
}
bool KernelDevice::get_thin_utilization(uint64_t *total, uint64_t *avail) const
{
  if (vdo_fd < 0) {
    return false;
  }
  return get_vdo_utilization(vdo_fd, total, avail);
}
int KernelDevice::choose_fd(bool buffered, int write_hint) const
{
  assert(write_hint >= WRITE_LIFE_NOT_SET && write_hint < WRITE_LIFE_MAX);
  if (!enable_wrt)
    write_hint = WRITE_LIFE_NOT_SET;
  return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
}
int KernelDevice::flush()
{
  // protect flush with a mutex.  note that we are not really protecting
  // data here.  instead, we're ensuring that if any flush() caller
  // sees that io_since_flush is true, they block any racing callers
  // until the flush is observed.  that allows racing threads to be
  // calling flush while still ensuring that *any* of them that got an
  // aio completion notification will not return before that aio is
  // stable on disk: whichever thread sees the flag first will block
  // followers until the aio is stable.
  std::lock_guard l(flush_mutex);

  bool expect = true;
  if (!io_since_flush.compare_exchange_strong(expect, false)) {
    dout(10) << __func__ << " no-op (no ios since last flush), flag is "
	     << (int)io_since_flush.load() << dendl;
    return 0;
  }

  dout(10) << __func__ << " start" << dendl;
  if (cct->_conf->bdev_inject_crash) {
    // sleep for a moment to give other threads a chance to submit or
    // wait on io that races with a flush.
    derr << __func__ << " injecting crash. first we sleep..." << dendl;
    sleep(cct->_conf->bdev_inject_crash_flush_delay);
    derr << __func__ << " and now we die" << dendl;
    cct->_log->flush();
    _exit(1);
  }
  utime_t start = ceph_clock_now();
  int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
  utime_t end = ceph_clock_now();
  utime_t dur = end - start;
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  dout(5) << __func__ << " in " << dur << dendl;
  return r;
}
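
// Illustrative sketch (not from the original file): the io_since_flush
// gate used by flush() above, isolated. Exactly one of any set of racing
// flushers wins the compare-exchange and performs the expensive
// fdatasync; the rest see the flag already cleared and return, while the
// mutex keeps them from returning before the winner's sync completes.
// Assumes <atomic>, <mutex>, <unistd.h>; names are hypothetical.
namespace {
struct flush_gate_sketch {
  std::mutex mutex;
  std::atomic<bool> dirty{false};

  void mark_io() { dirty.store(true); }

  int flush(int fd) {
    std::lock_guard l(mutex);	       // followers block behind the winner
    bool expect = true;
    if (!dirty.compare_exchange_strong(expect, false)) {
      return 0;			       // nothing to flush since last sync
    }
    return ::fdatasync(fd) < 0 ? -errno : 0;
  }
};
} // namespace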
int KernelDevice::_aio_start()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    int r = io_queue->init(fd_directs);
    if (r < 0) {
      if (r == -EAGAIN) {
	derr << __func__ << " io_setup(2) failed with EAGAIN; "
	     << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
      } else {
	derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
      }
      return r;
    }
    aio_thread.create("bstore_aio");
  }
  return 0;
}
void KernelDevice::_aio_stop()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    aio_stop = true;
    aio_thread.join();
    aio_stop = false;
    io_queue->shutdown();
  }
}
int KernelDevice::_discard_start()
{
  discard_thread.create("bstore_discard");
  return 0;
}
void KernelDevice::_discard_stop()
{
  dout(10) << __func__ << dendl;
  {
    std::unique_lock l(discard_lock);
    while (!discard_started) {
      discard_cond.wait(l);
    }
    discard_stop = true;
    discard_cond.notify_all();
  }
  discard_thread.join();
  {
    std::lock_guard l(discard_lock);
    discard_stop = false;
  }
  dout(10) << __func__ << " stopped" << dendl;
}
void KernelDevice::discard_drain()
{
  dout(10) << __func__ << dendl;
  std::unique_lock l(discard_lock);
  while (!discard_queued.empty() || discard_running) {
    discard_cond.wait(l);
  }
}
static bool is_expected_ioerr(const int r)
{
  // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
  return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
	  r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
	  r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
#if defined(__linux__)
	  r == -EREMCHG || r == -EBADE
#elif defined(__FreeBSD__)
	  r == -BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE
#endif
	  );
}
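
// Illustrative sketch (not from the original file): how the aio
// completion path below applies is_expected_ioerr(). Errors the kernel
// block layer is known to surface are collapsed to -EIO for callers that
// opted in via allow_eio; anything else is left for the caller's abort
// policy. This helper itself is hypothetical; assumes <cerrno>.
namespace {
[[maybe_unused]] long normalize_aio_error(long r, bool allow_eio)
{
  if (r >= 0) {
    return r;			// success: bytes transferred
  }
  if (allow_eio && is_expected_ioerr(r)) {
    return -EIO;		// translate to a single, expected error
  }
  return r;			// unexpected: caller decides what to do
}
} // namespace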
void KernelDevice::_aio_thread()
{
  dout(10) << __func__ << " start" << dendl;
  int inject_crash_count = 0;
  while (!aio_stop) {
    dout(40) << __func__ << " polling" << dendl;
    int max = cct->_conf->bdev_aio_reap_max;
    aio_t *aio[max];
    int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
					 aio, max);
    if (r < 0) {
      derr << __func__ << " got " << cpp_strerror(r) << dendl;
      ceph_abort_msg("got unexpected error from io_getevents");
    }
    if (r > 0) {
      dout(30) << __func__ << " got " << r << " completed aios" << dendl;
      for (int i = 0; i < r; ++i) {
	IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
	_aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
	if (aio[i]->queue_item.is_linked()) {
	  std::lock_guard l(debug_queue_lock);
	  debug_aio_unlink(*aio[i]);
	}

	// set flag indicating new ios have completed.  we do this *before*
	// any completion or notifications so that any user flush() that
	// follows the observed io completion will include this io.  Note
	// that an earlier, racing flush() could observe and clear this
	// flag, but that also ensures that the IO will be stable before the
	// later flush() occurs.
	io_since_flush.store(true);
	long r = aio[i]->get_return_value();
	if (r < 0) {
	  derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")"
	       << dendl;
	  if (ioc->allow_eio && is_expected_ioerr(r)) {
	    derr << __func__ << " translating the error to EIO for upper layer"
		 << dendl;
	    ioc->set_return_value(-EIO);
	  } else {
	    if (is_expected_ioerr(r)) {
	      note_io_error_event(
		devname.c_str(),
		path.c_str(),
		r,
#if defined(HAVE_POSIXAIO)
		aio[i]->aio.aiocb.aio_lio_opcode,
#else
		aio[i]->iocb.aio_lio_opcode,
#endif
		aio[i]->offset,
		aio[i]->length);
	      ceph_abort_msg(
		"Unexpected IO error. "
		"This may suggest a hardware issue. "
		"Please check your kernel log!");
	    }
	    ceph_abort_msg(
	      "Unexpected IO error. "
	      "This may suggest HW issue. Please check your dmesg!");
	  }
	} else if (aio[i]->length != (uint64_t)r) {
	  derr << "aio to 0x" << std::hex << aio[i]->offset
	       << "~" << aio[i]->length << std::dec
	       << " but returned: " << r << dendl;
	  ceph_abort_msg("unexpected aio return value: does not match length");
	}

	dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
		 << " with " << (ioc->num_running.load() - 1)
		 << " aios left" << dendl;

	// NOTE: once we decrement num_running and either call the callback
	// or aio_wake, we cannot touch ioc or aio[] as the caller may free
	// it.
	if (--ioc->num_running == 0) {
	  aio_callback(aio_callback_priv, ioc->priv);
	}
      }
    }
    if (cct->_conf->bdev_debug_aio) {
      utime_t now = ceph_clock_now();
      std::lock_guard l(debug_queue_lock);
      if (debug_oldest) {
	if (debug_stall_since == utime_t()) {
	  debug_stall_since = now;
	} else if (cct->_conf->bdev_debug_aio_suicide_timeout) {
	  utime_t cutoff = now;
	  cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
	  if (debug_stall_since < cutoff) {
	    derr << __func__ << " stalled aio " << debug_oldest
		 << " since " << debug_stall_since << ", timeout is "
		 << cct->_conf->bdev_debug_aio_suicide_timeout
		 << "s, suicide" << dendl;
	    ceph_abort_msg("stalled aio... buggy kernel or bad device?");
	  }
	}
      }
    }
    if (cct->_conf->bdev_inject_crash) {
      ++inject_crash_count;
      if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
	  cct->_conf->bdev_inject_crash +
	  cct->_conf->bdev_inject_crash_flush_delay) {
	derr << __func__ << " bdev_inject_crash trigger from aio thread"
	     << dendl;
	cct->_log->flush();
	_exit(1);
      }
    }
  }
  dout(10) << __func__ << " end" << dendl;
}
void KernelDevice::_discard_thread()
{
  std::unique_lock l(discard_lock);
  ceph_assert(!discard_started);
  discard_started = true;
  discard_cond.notify_all();
  while (true) {
    ceph_assert(discard_finishing.empty());
    if (discard_queued.empty()) {
      if (discard_stop)
	break;
      dout(20) << __func__ << " sleep" << dendl;
      discard_cond.notify_all(); // for the thread trying to drain...
      discard_cond.wait(l);
      dout(20) << __func__ << " wake" << dendl;
    } else {
      discard_finishing.swap(discard_queued);
      discard_running = true;
      l.unlock();
      dout(20) << __func__ << " finishing" << dendl;
      for (auto p = discard_finishing.begin();
	   p != discard_finishing.end(); ++p) {
	discard(p.get_start(), p.get_len());
      }
      discard_callback(discard_callback_priv,
		       static_cast<void*>(&discard_finishing));
      discard_finishing.clear();
      l.lock();
      discard_running = false;
    }
  }
  dout(10) << __func__ << " finish" << dendl;
  discard_started = false;
}
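
// Illustrative sketch (not from the original file): the queue-swap
// pattern used by _discard_thread(). The consumer swaps the shared queue
// for an empty one under the lock, then drops the lock while processing
// the batch, so producers (queue_discard) are only blocked for the O(1)
// swap. Assumes <condition_variable>, <list>, <mutex>; types and names
// are hypothetical.
namespace {
struct batch_consumer_sketch {
  std::mutex lock;
  std::condition_variable cond;
  std::list<uint64_t> queued, finishing;
  bool stop = false;

  void consume_loop() {
    std::unique_lock l(lock);
    while (!stop) {
      if (queued.empty()) {
	cond.wait(l);
	continue;
      }
      finishing.swap(queued);	 // O(1): producers barely blocked
      l.unlock();
      finishing.clear();	 // stand-in for the per-extent discard() calls
      l.lock();
    }
  }
};
} // namespace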
int KernelDevice::queue_discard(interval_set<uint64_t> &to_release)
{
  if (!support_discard)
    return -1;

  if (to_release.empty())
    return 0;

  std::lock_guard l(discard_lock);
  discard_queued.insert(to_release);
  discard_cond.notify_all();
  return 0;
}
void KernelDevice::_aio_log_start(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
	   << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    if (debug_inflight.intersects(offset, length)) {
      derr << __func__ << " inflight overlap of 0x"
	   << std::hex << offset << "~" << length << std::dec
	   << " with " << debug_inflight << dendl;
      ceph_abort();
    }
    debug_inflight.insert(offset, length);
  }
}
void KernelDevice::debug_aio_link(aio_t& aio)
{
  if (debug_queue.empty()) {
    debug_oldest = &aio;
  }
  debug_queue.push_back(aio);
}
void KernelDevice::debug_aio_unlink(aio_t& aio)
{
  if (aio.queue_item.is_linked()) {
    debug_queue.erase(debug_queue.iterator_to(aio));
    if (debug_oldest == &aio) {
      auto age = cct->_conf->bdev_debug_aio_log_age;
      if (age && debug_stall_since != utime_t()) {
	utime_t cutoff = ceph_clock_now();
	cutoff -= age;
	if (debug_stall_since < cutoff) {
	  derr << __func__ << " stalled aio " << debug_oldest
	       << " since " << debug_stall_since << ", timeout is "
	       << age << "s" << dendl;
	}
      }

      if (debug_queue.empty()) {
	debug_oldest = nullptr;
      } else {
	debug_oldest = &debug_queue.front();
      }
      debug_stall_since = utime_t();
    }
  }
}
void KernelDevice::_aio_log_finish(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " " << aio << " 0x"
	   << std::hex << offset << "~" << length << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    debug_inflight.erase(offset, length);
  }
}
void KernelDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc
	   << " pending " << ioc->num_pending.load()
	   << " running " << ioc->num_running.load()
	   << dendl;

  if (ioc->num_pending.load() == 0) {
    return;
  }

  // move these aside, and get our end iterator position now, as the
  // aios might complete as soon as they are submitted and queue more
  // aios.
  list<aio_t>::iterator e = ioc->running_aios.begin();
  ioc->running_aios.splice(e, ioc->pending_aios);

  int pending = ioc->num_pending.load();
  ioc->num_running += pending;
  ioc->num_pending -= pending;
  ceph_assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
  ceph_assert(ioc->pending_aios.size() == 0);

  if (cct->_conf->bdev_debug_aio) {
    list<aio_t>::iterator p = ioc->running_aios.begin();
    while (p != e) {
      dout(30) << __func__ << " " << *p << dendl;
      std::lock_guard l(debug_queue_lock);
      debug_aio_link(*p++);
    }
  }

  void *priv = static_cast<void*>(ioc);
  int r, retries = 0;
  r = io_queue->submit_batch(ioc->running_aios.begin(), e,
			     pending, priv, &retries);

  if (retries)
    derr << __func__ << " retries " << retries << dendl;
  if (r < 0) {
    derr << " aio submit got " << cpp_strerror(r) << dendl;
    ceph_assert(r == 0);
  }
}
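
// Illustrative sketch (not from the original file): why aio_submit()
// captures the end iterator before splicing. std::list::splice moves
// nodes without invalidating iterators, so [running.begin(), e) still
// delimits exactly the batch being submitted even if completions start
// queueing new pending aios concurrently. Assumes <list>; names are
// hypothetical.
namespace {
[[maybe_unused]] void splice_batch_sketch(std::list<int>& running,
					  std::list<int>& pending)
{
  auto e = running.begin();	 // old head marks the end of the new batch
  running.splice(e, pending);	 // new batch now occupies [begin(), e)
  for (auto p = running.begin(); p != e; ++p) {
    // submit *p ...
  }
}
} // namespace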
int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered,
			      int write_hint)
{
  uint64_t len = bl.length();
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	  << std::dec << (buffered ? " (buffered)" : " (direct)") << dendl;
  if (cct->_conf->bdev_inject_crash &&
      rand() % cct->_conf->bdev_inject_crash == 0) {
    derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
	 << off << "~" << len << std::dec << dendl;
    return 0;
  }
  vector<iovec> iov;
  bl.prepare_iov(&iov);

  uint64_t o = off;
  size_t idx = 0;
  do {
    auto r = ::pwritev(choose_fd(buffered, write_hint),
		       &iov[idx], iov.size() - idx, o);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
      return r;
    }
    o += r;
    if (o == off + len) {
      break;
    }
    // skip fully processed IOVs
    while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) {
      r -= iov[idx++].iov_len;
    }
    // update partially processed one if any
    if (r) {
      ceph_assert(idx < iov.size());
      ceph_assert((size_t)r < iov[idx].iov_len);
      iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r;
      iov[idx].iov_len -= r;
    }
  } while (true);

  if (buffered) {
#ifdef HAVE_SYNC_FILE_RANGE
    // initiate IO and wait till it completes
    auto r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len,
			       SYNC_FILE_RANGE_WRITE |
			       SYNC_FILE_RANGE_WAIT_AFTER |
			       SYNC_FILE_RANGE_WAIT_BEFORE);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " sync_file_range error: " << cpp_strerror(r)
	   << dendl;
      return r;
    }
#endif
  }

  io_since_flush.store(true);

  return 0;
}
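
// Illustrative sketch (not from the original file): the short-write
// resume logic from _sync_write() above, isolated. pwritev() may
// transfer fewer bytes than requested; the loop advances past fully
// written iovecs and trims the partially written one before retrying.
// Assumes <sys/uio.h> and <vector>; the helper name is hypothetical.
namespace {
[[maybe_unused]] ssize_t pwritev_all(int fd, std::vector<iovec> iov, off_t off)
{
  size_t idx = 0;
  while (idx < iov.size()) {
    ssize_t r = ::pwritev(fd, &iov[idx], iov.size() - idx, off);
    if (r < 0) {
      return -errno;
    }
    off += r;
    while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) {
      r -= iov[idx++].iov_len;	 // this iovec was fully written
    }
    if (r && idx < iov.size()) {
      iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r;
      iov[idx].iov_len -= r;	 // resume mid-iovec on the next call
    }
  }
  return 0;
}
} // namespace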
int KernelDevice::write(
  uint64_t off,
  bufferlist &bl,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	   << (buffered ? " (buffered)" : " (direct)")
	   << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
	       << dendl;
    return 0;
  }

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  return _sync_write(off, bl, buffered, write_hint);
}
int KernelDevice::aio_write(
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	   << (buffered ? " (buffered)" : " (direct)")
	   << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
	       << dendl;
    return 0;
  }

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  _aio_log_start(ioc, off, len);

  if (aio && dio && !buffered) {
    if (cct->_conf->bdev_inject_crash &&
	rand() % cct->_conf->bdev_inject_crash == 0) {
      derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
	   << off << "~" << len << std::dec
	   << dendl;
      // generate a real io so that aio_wait behaves properly, but make it
      // a read instead of write, and toss the result.
      ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
      ++ioc->num_pending;
      auto& aio = ioc->pending_aios.back();
      bufferptr p = buffer::create_small_page_aligned(len);
      aio.bl.append(std::move(p));
      aio.bl.prepare_iov(&aio.iov);
      aio.preadv(off, len);
    } else if (bl.length() <= RW_IO_MAX) {
      // fast path (non-huge write)
      ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
      ++ioc->num_pending;
      auto& aio = ioc->pending_aios.back();
      bl.prepare_iov(&aio.iov);
      aio.bl.claim_append(bl);
      aio.pwritev(off, len);
      dout(30) << aio << dendl;
      dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	      << std::dec << " aio " << &aio << dendl;
    } else {
      // write in RW_IO_MAX-sized chunks
      uint64_t prev_len = 0;
      while (prev_len < bl.length()) {
	bufferlist tmp;
	if (prev_len + RW_IO_MAX < bl.length()) {
	  tmp.substr_of(bl, prev_len, RW_IO_MAX);
	} else {
	  tmp.substr_of(bl, prev_len, bl.length() - prev_len);
	}
	auto len = tmp.length();
	ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
	++ioc->num_pending;
	auto& aio = ioc->pending_aios.back();
	tmp.prepare_iov(&aio.iov);
	aio.bl.claim_append(tmp);
	aio.pwritev(off + prev_len, len);
	dout(30) << aio << dendl;
	dout(5) << __func__ << " 0x" << std::hex << off + prev_len
		<< "~" << len
		<< std::dec << " aio " << &aio << " (piece)" << dendl;
	prev_len += len;
      }
    }
  } else {
    int r = _sync_write(off, bl, buffered, write_hint);
    _aio_log_finish(ioc, off, len);
    if (r < 0)
      return r;
  }
  return 0;
}
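
// Illustrative sketch (not from the original file): the RW_IO_MAX
// chunking arithmetic from aio_write() above, reduced to offsets and
// lengths. Each piece covers [off + prev, off + prev + piece) and the
// last piece is the remainder. Assumes <vector>, <utility>, <algorithm>;
// the helper name is hypothetical.
namespace {
[[maybe_unused]] std::vector<std::pair<uint64_t, uint64_t>>
chunk_extents(uint64_t off, uint64_t len, uint64_t max_io)
{
  std::vector<std::pair<uint64_t, uint64_t>> pieces;
  for (uint64_t prev = 0; prev < len; ) {
    uint64_t piece = std::min(max_io, len - prev);
    pieces.emplace_back(off + prev, piece);
    prev += piece;
  }
  return pieces;   // e.g. (0, 3*max_io + 1) -> three max_io pieces + 1 byte
}
} // namespace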
int KernelDevice::discard(uint64_t offset, uint64_t len)
{
  int r = 0;
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
	       << dendl;
    return 0;
  }
  if (support_discard) {
    dout(10) << __func__
	     << " 0x" << std::hex << offset << "~" << len << std::dec
	     << dendl;
    r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset,
						       (int64_t)len);
  }
  return r;
}
int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
		       IOContext *ioc,
		       bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	  << (buffered ? " (buffered)" : " (direct)")
	  << dendl;
  ceph_assert(is_valid_io(off, len));

  _aio_log_start(ioc, off, len);

  auto start1 = mono_clock::now();

  auto p = buffer::ptr_node::create(buffer::create_small_page_aligned(len));
  int r = ::pread(buffered ? fd_buffereds[WRITE_LIFE_NOT_SET]
			   : fd_directs[WRITE_LIFE_NOT_SET],
		  p->c_str(), len, off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
	 << " 0x" << std::hex << off << "~" << len << std::dec
	 << (buffered ? " (buffered)" : " (direct)")
	 << " since " << start1 << ", timeout is "
	 << age << "s" << dendl;
  }
  if (r < 0) {
    r = -errno;
    if (ioc->allow_eio && is_expected_ioerr(r)) {
      r = -EIO;
    }
    goto out;
  }
  ceph_assert((uint64_t)r == len);
  pbl->push_back(std::move(p));

  dout(40) << "data: ";
  pbl->hexdump(*_dout);
  *_dout << dendl;

 out:
  _aio_log_finish(ioc, off, len);
  return r < 0 ? r : 0;
}
int KernelDevice::aio_read(
  uint64_t off,
  uint64_t len,
  bufferlist *pbl,
  IOContext *ioc)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	  << dendl;

  int r = 0;
  if (aio && dio) {
    ceph_assert(is_valid_io(off, len));
    _aio_log_start(ioc, off, len);
    ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    bufferptr p = buffer::create_small_page_aligned(len);
    aio.bl.append(std::move(p));
    aio.bl.prepare_iov(&aio.iov);
    aio.preadv(off, len);
    dout(30) << aio << dendl;
    pbl->append(aio.bl);
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	    << std::dec << " aio " << &aio << dendl;
  } else {
    r = read(off, len, pbl, ioc, false);
  }
  return r;
}
int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
{
  uint64_t aligned_off = p2align(off, block_size);
  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
  bufferptr p = buffer::create_small_page_aligned(aligned_len);
  int r = 0;

  auto start1 = mono_clock::now();
  r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len,
	      aligned_off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
	 << " 0x" << std::hex << off << "~" << len << std::dec
	 << " since " << start1 << ", timeout is "
	 << age << "s" << dendl;
  }

  if (r < 0) {
    r = -errno;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	 << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  ceph_assert((uint64_t)r == aligned_len);
  memcpy(buf, p.c_str() + (off - aligned_off), len);

  dout(40) << __func__ << " data: ";
  {
    bufferlist bl;
    bl.append(buf, len);
    bl.hexdump(*_dout);
  }
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}
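
// Illustrative sketch (not from the original file): the alignment
// arithmetic used by direct_read_unaligned(). p2align/p2roundup (from
// include/intarith.h) round down/up to a power-of-two boundary; the
// aligned read covers the requested range and the caller copies out the
// interior. Worked example in the comments; the helper is hypothetical.
namespace {
[[maybe_unused]] void aligned_extent_sketch()
{
  const uint64_t block = 4096;
  const uint64_t off = 5000, len = 100;		   // unaligned request
  uint64_t aligned_off = off & ~(block - 1);	   // p2align   -> 4096
  uint64_t aligned_end =
    (off + len + block - 1) & ~(block - 1);	   // p2roundup -> 8192
  uint64_t aligned_len = aligned_end - aligned_off; // 4096
  // read [4096, 8192), then copy out [off - aligned_off, +len) = [904, 1004)
  (void)aligned_len;
}
} // namespace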
int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
			      bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	  << " buffered " << buffered
	  << dendl;
  ceph_assert(len > 0);
  ceph_assert(off < size);
  ceph_assert(off + len <= size);
  int r = 0;
  auto age = cct->_conf->bdev_debug_aio_log_age;

  // if it's direct io and unaligned, we have to use an internal buffer
  if (!buffered && ((off % block_size != 0)
		    || (len % block_size != 0)
		    || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
    return direct_read_unaligned(off, len, buf);

  auto start1 = mono_clock::now();
  if (buffered) {
    // buffered read
    auto off0 = off;
    char *t = buf;
    uint64_t left = len;
    while (left > 0) {
      r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
      if (r < 0) {
	r = -errno;
	derr << __func__ << " 0x" << std::hex << off << "~" << left
	     << std::dec << " error: " << cpp_strerror(r) << dendl;
	goto out;
      }
      off += r;
      t += r;
      left -= r;
    }
    if (mono_clock::now() - start1 >= make_timespan(age)) {
      derr << __func__ << " stalled read "
	   << " 0x" << std::hex << off0 << "~" << len << std::dec
	   << " (buffered) since " << start1 << ", timeout is "
	   << age << "s" << dendl;
    }
  } else {
    // direct and aligned read
    r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
    if (mono_clock::now() - start1 >= make_timespan(age)) {
      derr << __func__ << " stalled read "
	   << " 0x" << std::hex << off << "~" << len << std::dec
	   << " (direct) since " << start1 << ", timeout is "
	   << age << "s" << dendl;
    }
    if (r < 0) {
      r = -errno;
      derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
	   << off << "~" << len << std::dec << " error: " << cpp_strerror(r)
	   << dendl;
      goto out;
    }
    ceph_assert((uint64_t)r == len);
  }

  dout(40) << __func__ << " data: ";
  {
    bufferlist bl;
    bl.append(buf, len);
    bl.hexdump(*_dout);
  }
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}
int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	  << dendl;
  ceph_assert(off % block_size == 0);
  ceph_assert(len % block_size == 0);
  int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len,
			POSIX_FADV_DONTNEED);
  if (r) {
    r = -r;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	 << " error: " << cpp_strerror(r) << dendl;
  }
  return r;
}
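
// Illustrative sketch (not from the original file): posix_fadvise()
// returns the error number directly rather than setting errno, which is
// why invalidate_cache() above negates r instead of reading errno. A
// minimal standalone check, assuming <fcntl.h>; the helper name is
// hypothetical.
namespace {
[[maybe_unused]] int drop_page_cache(int fd, off_t off, off_t len)
{
  int r = ::posix_fadvise(fd, off, len, POSIX_FADV_DONTNEED);
  return r ? -r : 0;	// note: -r, not -errno
}
} // namespace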