1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <sys/types.h>
21 #include "KernelDevice.h"
22 #include "include/types.h"
23 #include "include/compat.h"
24 #include "include/stringify.h"
25 #include "common/errno.h"
26 #include "common/debug.h"
27 #include "common/blkdev.h"
28 #include "common/align.h"
29 #include "common/blkdev.h"
31 #define dout_context cct
32 #define dout_subsys ceph_subsys_bdev
34 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
// Constructor: binds the device to a CephContext; cb/cbpriv are the aio
// completion callback and its opaque argument, invoked from _aio_thread().
// NOTE(review): initializer list is fragmented here — fields not shown
// (e.g. fd_direct/fd_buffered, aio_callback) are presumably initialized in
// the missing lines; confirm against the full file.
36 KernelDevice::KernelDevice(CephContext
* cct
, aio_callback_t cb
, void *cbpriv
)
40 size(0), block_size(0),
41 fs(NULL
), aio(false), dio(false),
42 debug_lock("KernelDevice::debug_lock"),
// queue depth for the libaio submission queue comes from config
43 aio_queue(cct
->_conf
->bdev_aio_max_queue_depth
),
45 aio_callback_priv(cbpriv
),
// Take an advisory whole-file POSIX lock on the direct fd via
// fcntl(F_SETLK) so two processes cannot own the same device at once.
// NOTE(review): only l_whence=SEEK_SET is visible here; initialization of
// l_type/l_start/l_len and the declaration of l are in lines not shown —
// presumably l is a zeroed struct flock (l_start/l_len 0 == whole file).
52 int KernelDevice::_lock()
55 memset(&l
, 0, sizeof(l
));
57 l
.l_whence
= SEEK_SET
;
// non-blocking lock attempt; r is returned to the caller (missing lines)
58 int r
= ::fcntl(fd_direct
, F_SETLK
, &l
);
// Open the backing device/file twice: once with O_DIRECT for direct/aio
// IO and once buffered; probe geometry (size, block size, rotational),
// lock the device, and start the aio machinery (start call not visible
// in this fragment). Returns 0 on success, negative errno on failure.
64 int KernelDevice::open(const string
& p
)
68 dout(1) << __func__
<< " path " << path
<< dendl
;
// direct fd: used for O_DIRECT reads/writes and aio submission
70 fd_direct
= ::open(path
.c_str(), O_RDWR
| O_DIRECT
);
73 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
// buffered fd: used for buffered IO and cache-control fadvise calls
76 fd_buffered
= ::open(path
.c_str(), O_RDWR
);
77 if (fd_buffered
< 0) {
79 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
83 aio
= cct
->_conf
->bdev_aio
;
85 assert(0 == "non-aio not supported");
88 // disable readahead as it will wreak havoc on our mix of
89 // directio/aio and buffered io.
90 r
= posix_fadvise(fd_buffered
, 0, 0, POSIX_FADV_RANDOM
);
93 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
// _lock() failure path: somebody else owns the device
99 derr
<< __func__
<< " failed to lock " << path
<< ": " << cpp_strerror(r
)
105 r
= ::fstat(fd_direct
, &st
);
108 derr
<< __func__
<< " fstat got " << cpp_strerror(r
) << dendl
;
112 // Operate as though the block size is 4 KB. The backing file
113 // blksize doesn't strictly matter except that some file systems may
114 // require a read/modify/write if we write something smaller than
116 block_size
= cct
->_conf
->bdev_block_size
;
117 if (block_size
!= (unsigned)st
.st_blksize
) {
118 dout(1) << __func__
<< " backing device/file reports st_blksize "
119 << st
.st_blksize
<< ", using bdev_block_size "
120 << block_size
<< " anyway" << dendl
;
// block device: query the kernel for the true device size
123 if (S_ISBLK(st
.st_mode
)) {
125 r
= get_block_device_size(fd_direct
, &s
);
// NOTE(review): ~(block_size) clears only the block_size bit(s); a
// round-down mask should be ~(block_size - 1) as used at line 157
// below — looks like a bug or transcription artifact, confirm.
133 size
&= ~(block_size
);
// resolve the kernel device name to test rotational (HDD vs SSD)
136 char partition
[PATH_MAX
], devname
[PATH_MAX
];
137 r
= get_device_by_fd(fd_buffered
, partition
, devname
, sizeof(devname
));
139 derr
<< "unable to get device name for " << path
<< ": "
140 << cpp_strerror(r
) << dendl
;
143 dout(20) << __func__
<< " devname " << devname
<< dendl
;
144 rotational
= block_device_is_rotational(devname
);
// filesystem-specific helper (e.g. clone/zero support) for the fd
153 fs
= FS::create_by_fd(fd_direct
);
156 // round size down to an even block
157 size
&= ~(block_size
- 1);
161 << " (0x" << std::hex
<< size
<< std::dec
<< ", "
162 << pretty_si_t(size
) << "B)"
163 << " block_size " << block_size
164 << " (" << pretty_si_t(block_size
) << "B)"
165 << " " << (rotational
? "rotational" : "non-rotational")
// error unwind: close whichever fds were successfully opened
170 VOID_TEMP_FAILURE_RETRY(::close(fd_buffered
));
173 VOID_TEMP_FAILURE_RETRY(::close(fd_direct
));
// Close both file descriptors. The asserts require open() to have
// succeeded; the fds are presumably reset to -1 in lines not shown here.
178 void KernelDevice::close()
180 dout(1) << __func__
<< dendl
;
187 assert(fd_direct
>= 0);
188 VOID_TEMP_FAILURE_RETRY(::close(fd_direct
));
191 assert(fd_buffered
>= 0);
192 VOID_TEMP_FAILURE_RETRY(::close(fd_buffered
));
// Read a sysfs-style string property (e.g. "device/model") for a block
// device name; returns the value (empty if absent — return stmt not
// visible in this fragment). File-local helper for collect_metadata().
198 static string
get_dev_property(const char *dev
, const char *property
)
200 char val
[1024] = {0};
201 get_block_device_string_property(dev
, property
, val
, sizeof(val
));
// Populate *pm with prefixed key/value metadata describing this device:
// rotational flag, size, block size, driver, hdd/ssd/nvme type, and —
// for block devices — partition path, device node, model, serial.
// Keys that cannot be resolved degrade to "unknown" rather than failing.
205 int KernelDevice::collect_metadata(string prefix
, map
<string
,string
> *pm
) const
207 (*pm
)[prefix
+ "rotational"] = stringify((int)(bool)rotational
);
208 (*pm
)[prefix
+ "size"] = stringify(get_size());
209 (*pm
)[prefix
+ "block_size"] = stringify(get_block_size());
210 (*pm
)[prefix
+ "driver"] = "KernelDevice";
// coarse type from the rotational probe; may be refined to nvme below
212 (*pm
)[prefix
+ "type"] = "hdd";
214 (*pm
)[prefix
+ "type"] = "ssd";
218 int r
= ::fstat(fd_buffered
, &st
);
221 if (S_ISBLK(st
.st_mode
)) {
222 (*pm
)[prefix
+ "access_mode"] = "blk";
223 char partition_path
[PATH_MAX
];
224 char dev_node
[PATH_MAX
];
225 int rc
= get_device_by_fd(fd_buffered
, partition_path
, dev_node
, PATH_MAX
);
// rc-dependent fallbacks (the conditionals are in lines not shown):
// neither name, partition only, or both resolved.
229 (*pm
)[prefix
+ "partition_path"] = "unknown";
230 (*pm
)[prefix
+ "dev_node"] = "unknown";
233 (*pm
)[prefix
+ "partition_path"] = string(partition_path
);
234 (*pm
)[prefix
+ "dev_node"] = "unknown";
238 (*pm
)[prefix
+ "partition_path"] = string(partition_path
);
239 (*pm
)[prefix
+ "dev_node"] = string(dev_node
);
240 (*pm
)[prefix
+ "model"] = get_dev_property(dev_node
, "device/model");
241 (*pm
)[prefix
+ "dev"] = get_dev_property(dev_node
, "dev");
243 // nvme exposes a serial number
244 string serial
= get_dev_property(dev_node
, "device/serial");
245 if (serial
.length()) {
246 (*pm
)[prefix
+ "serial"] = serial
;
249 // nvme has a device/device/* structure; infer from that. there
250 // is probably a better way?
251 string nvme_vendor
= get_dev_property(dev_node
, "device/device/vendor");
252 if (nvme_vendor
.length()) {
253 (*pm
)[prefix
+ "type"] = "nvme";
// plain-file backing store: just record the access mode and path
258 (*pm
)[prefix
+ "access_mode"] = "file";
259 (*pm
)[prefix
+ "path"] = path
;
// Make all completed IO durable with fdatasync, but skip the syscall when
// nothing has completed since the last flush (io_since_flush CAS below).
264 int KernelDevice::flush()
266 // protect flush with a mutex. note that we are not really protecting
267 // data here. instead, we're ensuring that if any flush() caller
268 // sees that io_since_flush is true, they block any racing callers
269 // until the flush is observed. that allows racing threads to be
270 // calling flush while still ensuring that *any* of them that got an
271 // aio completion notification will not return before that aio is
272 // stable on disk: whichever thread sees the flag first will block
273 // followers until the aio is stable.
274 std::lock_guard
<std::mutex
> l(flush_mutex
);
// atomically test-and-clear the dirty flag; 'expect' is declared in a
// line not shown — presumably initialized to true.
277 if (!io_since_flush
.compare_exchange_strong(expect
, false)) {
278 dout(10) << __func__
<< " no-op (no ios since last flush), flag is "
279 << (int)io_since_flush
.load() << dendl
;
283 dout(10) << __func__
<< " start" << dendl
;
// test hook: deliberately crash instead of flushing
284 if (cct
->_conf
->bdev_inject_crash
) {
286 // sleep for a moment to give other threads a chance to submit or
287 // wait on io that races with a flush.
288 derr
<< __func__
<< " injecting crash. first we sleep..." << dendl
;
289 sleep(cct
->_conf
->bdev_inject_crash_flush_delay
);
290 derr
<< __func__
<< " and now we die" << dendl
;
// time the fdatasync so slow flushes are visible in the log
294 utime_t start
= ceph_clock_now();
295 int r
= ::fdatasync(fd_direct
);
296 utime_t end
= ceph_clock_now();
297 utime_t dur
= end
- start
;
300 derr
<< __func__
<< " fdatasync got: " << cpp_strerror(r
) << dendl
;
303 dout(5) << __func__
<< " in " << dur
<< dendl
;;
// Initialize the libaio queue (io_setup) and spawn the completion-polling
// thread. EAGAIN from io_setup usually means the system-wide aio context
// limit is exhausted — hence the /proc hint below.
307 int KernelDevice::_aio_start()
310 dout(10) << __func__
<< dendl
;
311 int r
= aio_queue
.init();
314 derr
<< __func__
<< " io_setup(2) failed with EAGAIN; "
315 << "try increasing /proc/sys/fs/aio-max-nr" << dendl
;
317 derr
<< __func__
<< " io_setup(2) failed: " << cpp_strerror(r
) << dendl
;
// start the thread that runs _aio_thread()
321 aio_thread
.create("bstore_aio");
// Stop the aio machinery: tear down the libaio queue (the thread
// join/signal presumably happens in lines not shown here).
326 void KernelDevice::_aio_stop()
329 dout(10) << __func__
<< dendl
;
333 aio_queue
.shutdown();
// Completion-polling thread body: repeatedly reap finished aios from the
// kernel, log/unlink them from the debug queue, mark io_since_flush, and
// fire the completion callback when an IOContext's last aio finishes.
// Also drives the debug stall detector and the crash-injection test hook.
337 void KernelDevice::_aio_thread()
339 dout(10) << __func__
<< " start" << dendl
;
340 int inject_crash_count
= 0;
342 dout(40) << __func__
<< " polling" << dendl
;
// block up to bdev_aio_poll_ms for completions; r = count reaped
345 int r
= aio_queue
.get_next_completed(cct
->_conf
->bdev_aio_poll_ms
,
348 derr
<< __func__
<< " got " << cpp_strerror(r
) << dendl
;
351 dout(30) << __func__
<< " got " << r
<< " completed aios" << dendl
;
352 for (int i
= 0; i
= 0; i
< r
; ++i
) {
353 IOContext
*ioc
= static_cast<IOContext
*>(aio
[i
]->priv
);
354 _aio_log_finish(ioc
, aio
[i
]->offset
, aio
[i
]->length
);
// remove from the stall-detection queue if it was being tracked
355 if (aio
[i
]->queue_item
.is_linked()) {
356 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
357 debug_aio_unlink(*aio
[i
]);
360 // set flag indicating new ios have completed. we do this *before*
361 // any completion or notifications so that any user flush() that
362 // follows the observed io completion will include this io. Note
363 // that an earlier, racing flush() could observe and clear this
364 // flag, but that also ensures that the IO will be stable before the
365 // later flush() occurs.
366 io_since_flush
.store(true);
// per-aio result (shadows the outer r deliberately — count vs retval)
368 int r
= aio
[i
]->get_return_value();
369 dout(10) << __func__
<< " finished aio " << aio
[i
] << " r " << r
371 << " with " << (ioc
->num_running
.load() - 1)
372 << " aios left" << dendl
;
375 // NOTE: once num_running and we either call the callback or
376 // call aio_wake we cannot touch ioc or aio[] as the caller
// last aio for this context: notify the owner
379 if (--ioc
->num_running
== 0) {
380 aio_callback(aio_callback_priv
, ioc
->priv
);
// stall detector: if the oldest queued aio predates the suicide
// timeout, abort — likely a hung kernel or bad device.
387 if (cct
->_conf
->bdev_debug_aio
) {
388 utime_t now
= ceph_clock_now();
389 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
391 if (debug_stall_since
== utime_t()) {
392 debug_stall_since
= now
;
394 utime_t cutoff
= now
;
395 cutoff
-= cct
->_conf
->bdev_debug_aio_suicide_timeout
;
396 if (debug_stall_since
< cutoff
) {
397 derr
<< __func__
<< " stalled aio " << debug_oldest
398 << " since " << debug_stall_since
<< ", timeout is "
399 << cct
->_conf
->bdev_debug_aio_suicide_timeout
400 << "s, suicide" << dendl
;
401 assert(0 == "stalled aio... buggy kernel or bad device?");
// crash injection: trigger from this thread after the configured
// delay has elapsed (poll count * poll interval).
407 if (cct
->_conf
->bdev_inject_crash
) {
408 ++inject_crash_count
;
409 if (inject_crash_count
* cct
->_conf
->bdev_aio_poll_ms
/ 1000 >
410 cct
->_conf
->bdev_inject_crash
+ cct
->_conf
->bdev_inject_crash_flush_delay
) {
411 derr
<< __func__
<< " bdev_inject_crash trigger from aio thread"
419 dout(10) << __func__
<< " end" << dendl
;
// Debug bookkeeping for an IO about to start: when
// bdev_debug_inflight_ios is set, record [offset, length) in the
// inflight interval set and scream if it overlaps an existing inflight
// IO (which would indicate a racing double-write/read bug upstream).
422 void KernelDevice::_aio_log_start(
427 dout(20) << __func__
<< " 0x" << std::hex
<< offset
<< "~" << length
428 << std::dec
<< dendl
;
429 if (cct
->_conf
->bdev_debug_inflight_ios
) {
430 Mutex::Locker
l(debug_lock
);
431 if (debug_inflight
.intersects(offset
, length
)) {
432 derr
<< __func__
<< " inflight overlap of 0x"
434 << offset
<< "~" << length
<< std::dec
435 << " with " << debug_inflight
<< dendl
;
438 debug_inflight
.insert(offset
, length
);
// Track a submitted aio on the intrusive debug queue used by the stall
// detector in _aio_thread(). Caller holds debug_queue_lock (see callers).
// NOTE(review): the empty-queue branch body (presumably setting
// debug_oldest to &aio) is in lines not shown here — confirm.
442 void KernelDevice::debug_aio_link(aio_t
& aio
)
444 if (debug_queue
.empty()) {
447 debug_queue
.push_back(aio
);
// Remove a completed aio from the debug queue, advancing debug_oldest to
// the next entry (or nullptr) and resetting the stall clock when the
// oldest one finishes. Caller holds debug_queue_lock (see callers).
450 void KernelDevice::debug_aio_unlink(aio_t
& aio
)
452 if (aio
.queue_item
.is_linked()) {
453 debug_queue
.erase(debug_queue
.iterator_to(aio
));
454 if (debug_oldest
== &aio
) {
455 if (debug_queue
.empty()) {
456 debug_oldest
= nullptr;
458 debug_oldest
= &debug_queue
.front();
// the stalled-since timestamp tracks the (new) oldest aio
460 debug_stall_since
= utime_t();
// Debug bookkeeping for a finished IO: drop [offset, length) from the
// inflight interval set recorded by _aio_log_start().
465 void KernelDevice::_aio_log_finish(
470 dout(20) << __func__
<< " " << aio
<< " 0x"
471 << std::hex
<< offset
<< "~" << length
<< std::dec
<< dendl
;
472 if (cct
->_conf
->bdev_debug_inflight_ios
) {
473 Mutex::Locker
l(debug_lock
);
474 debug_inflight
.erase(offset
, length
);
// Submit all pending aios for an IOContext to the kernel. Pending aios
// are spliced onto the running list first (and the counters moved) so
// that completions — which may fire immediately — see consistent state.
// After submit we must not touch ioc or the aio entries again.
478 void KernelDevice::aio_submit(IOContext
*ioc
)
480 dout(20) << __func__
<< " ioc " << ioc
481 << " pending " << ioc
->num_pending
.load()
482 << " running " << ioc
->num_running
.load()
// nothing queued: fast-path return (body in lines not shown)
484 if (ioc
->num_pending
.load() == 0) {
487 // move these aside, and get our end iterator position now, as the
488 // aios might complete as soon as they are submitted and queue more
490 list
<aio_t
>::iterator e
= ioc
->running_aios
.begin();
491 ioc
->running_aios
.splice(e
, ioc
->pending_aios
);
492 list
<aio_t
>::iterator p
= ioc
->running_aios
.begin();
// transfer the pending count to running before any submission
494 int pending
= ioc
->num_pending
.load();
495 ioc
->num_running
+= pending
;
496 ioc
->num_pending
-= pending
;
497 assert(ioc
->num_pending
.load() == 0); // we should be only thread doing this
// per-aio setup loop (loop header in lines not shown): stash the ioc
// so _aio_thread() can find it from the completed aio
502 aio
.priv
= static_cast<void*>(ioc
);
503 dout(20) << __func__
<< " aio " << &aio
<< " fd " << aio
.fd
504 << " 0x" << std::hex
<< aio
.offset
<< "~" << aio
.length
505 << std::dec
<< dendl
;
506 for (auto& io
: aio
.iov
)
507 dout(30) << __func__
<< " iov " << (void*)io
.iov_base
508 << " len " << io
.iov_len
<< dendl
;
510 // be careful: as soon as we submit aio we race with completion.
511 // since we are holding a ref take care not to dereference txc at
512 // all after that point.
513 list
<aio_t
>::iterator cur
= p
;
517 // do not dereference txc (or it's contents) after we submit (if
518 // done == true and we don't loop)
// link into the stall-detection queue before handing to the kernel
520 if (cct
->_conf
->bdev_debug_aio
) {
521 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
522 debug_aio_link(*cur
);
524 int r
= aio_queue
.submit(*cur
, &retries
);
526 derr
<< __func__
<< " retries " << retries
<< dendl
;
528 derr
<< " aio submit got " << cpp_strerror(r
) << dendl
;
// Synchronous write path: pwritev the bufferlist at off via the buffered
// or direct fd. For buffered writes, kick off writeback immediately with
// sync_file_range (initiate only — durability still requires flush()).
// Marks io_since_flush so a later flush() knows there is dirty data.
534 int KernelDevice::_sync_write(uint64_t off
, bufferlist
&bl
, bool buffered
)
536 uint64_t len
= bl
.length();
537 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
538 << std::dec
<< " buffered" << dendl
;
// test hook: randomly drop the write entirely to simulate a crash
539 if (cct
->_conf
->bdev_inject_crash
&&
540 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
541 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
542 << off
<< "~" << len
<< std::dec
<< dendl
;
// build the iovec array from the bufferlist segments
547 bl
.prepare_iov(&iov
);
548 int r
= ::pwritev(buffered
? fd_buffered
: fd_direct
,
549 &iov
[0], iov
.size(), off
);
553 derr
<< __func__
<< " pwritev error: " << cpp_strerror(r
) << dendl
;
557 // initiate IO (but do not wait)
558 r
= ::sync_file_range(fd_buffered
, off
, len
, SYNC_FILE_RANGE_WRITE
);
561 derr
<< __func__
<< " sync_file_range error: " << cpp_strerror(r
) << dendl
;
566 io_since_flush
.store(true);
// Public synchronous write: validates block alignment and bounds, makes
// the bufferlist memory block-aligned if needed (O_DIRECT requires it;
// buffered writes also rebuild when segment count exceeds IOV_MAX), then
// delegates to _sync_write().
571 int KernelDevice::write(
576 uint64_t len
= bl
.length();
577 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
578 << (buffered
? " (buffered)" : " (direct)")
580 assert(off
% block_size
== 0);
581 assert(len
% block_size
== 0);
584 assert(off
+ len
<= size
);
586 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
587 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
)) {
588 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
590 dout(40) << "data: ";
594 return _sync_write(off
, bl
, buffered
);
// Asynchronous write: validates alignment/bounds and rebuilds the buffer
// like write(). If aio+dio are enabled and the write is not buffered,
// queue an aio_t on the IOContext (submitted later by aio_submit());
// otherwise fall back to the synchronous path.
597 int KernelDevice::aio_write(
603 uint64_t len
= bl
.length();
604 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
605 << (buffered
? " (buffered)" : " (direct)")
607 assert(off
% block_size
== 0);
608 assert(len
% block_size
== 0);
611 assert(off
+ len
<= size
);
613 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
614 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
)) {
615 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
617 dout(40) << "data: ";
621 _aio_log_start(ioc
, off
, len
);
// async path: only when libaio and O_DIRECT are both in play
624 if (aio
&& dio
&& !buffered
) {
625 ioc
->pending_aios
.push_back(aio_t(ioc
, fd_direct
));
627 aio_t
& aio
= ioc
->pending_aios
.back();
// test hook: replace the write with a throwaway op (see comment below)
628 if (cct
->_conf
->bdev_inject_crash
&&
629 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
630 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
631 << off
<< "~" << len
<< std::dec
633 // generate a real io so that aio_wait behaves properly, but make it
634 // a read instead of write, and toss the result.
638 bl
.prepare_iov(&aio
.iov
);
639 for (unsigned i
=0; i
<aio
.iov
.size(); ++i
) {
640 dout(30) << "aio " << i
<< " " << aio
.iov
[i
].iov_base
641 << " " << aio
.iov
[i
].iov_len
<< dendl
;
// the aio takes ownership of the data so it outlives the caller
643 aio
.bl
.claim_append(bl
);
644 aio
.pwritev(off
, len
);
646 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
647 << std::dec
<< " aio " << &aio
<< dendl
;
// fallback: do the write synchronously and finish the log entry now
651 int r
= _sync_write(off
, bl
, buffered
);
652 _aio_log_finish(ioc
, off
, len
);
// Synchronous block-aligned read into *pbl. Allocates a page-aligned
// buffer (required for the O_DIRECT fd), preads from the buffered or
// direct fd, and returns 0 on success or the negative pread result.
659 int KernelDevice::read(uint64_t off
, uint64_t len
, bufferlist
*pbl
,
663 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
664 << (buffered
? " (buffered)" : " (direct)")
666 assert(off
% block_size
== 0);
667 assert(len
% block_size
== 0);
670 assert(off
+ len
<= size
);
672 _aio_log_start(ioc
, off
, len
);
674 bufferptr p
= buffer::create_page_aligned(len
);
675 int r
= ::pread(buffered
? fd_buffered
: fd_direct
,
676 p
.c_str(), len
, off
);
// short reads are treated as fatal: the device must return len bytes
681 assert((uint64_t)r
== len
);
682 pbl
->push_back(std::move(p
));
684 dout(40) << "data: ";
685 pbl
->hexdump(*_dout
);
689 _aio_log_finish(ioc
, off
, len
);
690 return r
< 0 ? r
: 0;
// Asynchronous read: queue an aio_t on the IOContext against the direct
// fd (submitted later by aio_submit()). The fragment also shows a
// fallback that performs a synchronous read() — presumably taken when
// aio/dio are unavailable, mirroring aio_write(); confirm in full file.
693 int KernelDevice::aio_read(
699 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
705 _aio_log_start(ioc
, off
, len
);
706 ioc
->pending_aios
.push_back(aio_t(ioc
, fd_direct
));
708 aio_t
& aio
= ioc
->pending_aios
.back();
// the pread setup for the aio is in lines not shown here
710 for (unsigned i
=0; i
<aio
.iov
.size(); ++i
) {
711 dout(30) << "aio " << i
<< " " << aio
.iov
[i
].iov_base
712 << " " << aio
.iov
[i
].iov_len
<< dendl
;
715 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
716 << std::dec
<< " aio " << &aio
<< dendl
;
// synchronous fallback path
720 r
= read(off
, len
, pbl
, ioc
, false);
// O_DIRECT read for an unaligned request: widen [off, off+len) to block
// boundaries, pread the aligned span into a page-aligned bounce buffer,
// then memcpy just the requested bytes into buf.
726 int KernelDevice::direct_read_unaligned(uint64_t off
, uint64_t len
, char *buf
)
728 uint64_t aligned_off
= align_down(off
, block_size
);
729 uint64_t aligned_len
= align_up(off
+len
, block_size
) - aligned_off
;
730 bufferptr p
= buffer::create_page_aligned(aligned_len
);
733 r
= ::pread(fd_direct
, p
.c_str(), aligned_len
, aligned_off
);
736 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
737 << " error: " << cpp_strerror(r
) << dendl
;
// the full aligned span must be returned; short read is fatal
740 assert((uint64_t)r
== aligned_len
);
741 memcpy(buf
, p
.c_str() + (off
- aligned_off
), len
);
743 dout(40) << __func__
<< " data: ";
750 return r
< 0 ? r
: 0;
// Read into a raw buffer without alignment requirements. Unaligned
// direct reads are routed through direct_read_unaligned()'s bounce
// buffer; otherwise read via the buffered fd (in a loop — loop header in
// lines not shown; 't'/'left' track the cursor) or one aligned direct
// pread.
753 int KernelDevice::read_random(uint64_t off
, uint64_t len
, char *buf
,
756 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
760 assert(off
+ len
<= size
);
763 //if it's direct io and unaligned, we have to use a internal buffer
764 if (!buffered
&& ((off
% block_size
!= 0)
765 || (len
% block_size
!= 0)
766 || (uintptr_t(buf
) % CEPH_PAGE_SIZE
!= 0)))
767 return direct_read_unaligned(off
, len
, buf
);
// buffered path: read the remaining 'left' bytes at the cursor 't'
774 r
= ::pread(fd_buffered
, t
, left
, off
);
777 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << left
778 << std::dec
<< " error: " << cpp_strerror(r
) << dendl
;
786 //direct and aligned read
787 r
= ::pread(fd_direct
, buf
, len
, off
);
790 derr
<< __func__
<< " direct_aligned_read" << " 0x" << std::hex
791 << off
<< "~" << left
<< std::dec
<< " error: " << cpp_strerror(r
)
795 assert((uint64_t)r
== len
);
798 dout(40) << __func__
<< " data: ";
805 return r
< 0 ? r
: 0;
// Drop the kernel page cache for [off, off+len) on the buffered fd via
// posix_fadvise(POSIX_FADV_DONTNEED), so subsequent buffered reads hit
// the device rather than stale cached pages written through O_DIRECT.
808 int KernelDevice::invalidate_cache(uint64_t off
, uint64_t len
)
810 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
812 assert(off
% block_size
== 0);
813 assert(len
% block_size
== 0);
814 int r
= posix_fadvise(fd_buffered
, off
, len
, POSIX_FADV_DONTNEED
);
817 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
818 << " error: " << cpp_strerror(r
) << dendl
;