1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <sys/types.h>
21 #include "KernelDevice.h"
22 #include "include/types.h"
23 #include "include/compat.h"
24 #include "include/stringify.h"
25 #include "common/errno.h"
26 #include "common/debug.h"
27 #include "common/blkdev.h"
28 #include "common/align.h"
29 #include "common/blkdev.h"
31 #define dout_context cct
32 #define dout_subsys ceph_subsys_bdev
34 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
36 KernelDevice::KernelDevice(CephContext
* cct
, aio_callback_t cb
, void *cbpriv
)
40 size(0), block_size(0),
41 fs(NULL
), aio(false), dio(false),
42 debug_lock("KernelDevice::debug_lock"),
43 aio_queue(cct
->_conf
->bdev_aio_max_queue_depth
),
45 aio_callback_priv(cbpriv
),
52 int KernelDevice::_lock()
55 memset(&l
, 0, sizeof(l
));
57 l
.l_whence
= SEEK_SET
;
58 int r
= ::fcntl(fd_direct
, F_SETLK
, &l
);
64 int KernelDevice::open(const string
& p
)
68 dout(1) << __func__
<< " path " << path
<< dendl
;
70 fd_direct
= ::open(path
.c_str(), O_RDWR
| O_DIRECT
);
73 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
76 fd_buffered
= ::open(path
.c_str(), O_RDWR
);
77 if (fd_buffered
< 0) {
79 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
83 aio
= cct
->_conf
->bdev_aio
;
85 assert(0 == "non-aio not supported");
88 // disable readahead as it will wreak havoc on our mix of
89 // directio/aio and buffered io.
90 r
= posix_fadvise(fd_buffered
, 0, 0, POSIX_FADV_RANDOM
);
93 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
99 derr
<< __func__
<< " failed to lock " << path
<< ": " << cpp_strerror(r
)
105 r
= ::fstat(fd_direct
, &st
);
108 derr
<< __func__
<< " fstat got " << cpp_strerror(r
) << dendl
;
112 // Operate as though the block size is 4 KB. The backing file
113 // blksize doesn't strictly matter except that some file systems may
114 // require a read/modify/write if we write something smaller than
116 block_size
= cct
->_conf
->bdev_block_size
;
117 if (block_size
!= (unsigned)st
.st_blksize
) {
118 dout(1) << __func__
<< " backing device/file reports st_blksize "
119 << st
.st_blksize
<< ", using bdev_block_size "
120 << block_size
<< " anyway" << dendl
;
123 if (S_ISBLK(st
.st_mode
)) {
125 r
= get_block_device_size(fd_direct
, &s
);
135 char partition
[PATH_MAX
], devname
[PATH_MAX
];
136 r
= get_device_by_fd(fd_buffered
, partition
, devname
, sizeof(devname
));
138 derr
<< "unable to get device name for " << path
<< ": "
139 << cpp_strerror(r
) << dendl
;
142 dout(20) << __func__
<< " devname " << devname
<< dendl
;
143 rotational
= block_device_is_rotational(devname
);
152 fs
= FS::create_by_fd(fd_direct
);
155 // round size down to an even block
156 size
&= ~(block_size
- 1);
160 << " (0x" << std::hex
<< size
<< std::dec
<< ", "
161 << pretty_si_t(size
) << "B)"
162 << " block_size " << block_size
163 << " (" << pretty_si_t(block_size
) << "B)"
164 << " " << (rotational
? "rotational" : "non-rotational")
169 VOID_TEMP_FAILURE_RETRY(::close(fd_buffered
));
172 VOID_TEMP_FAILURE_RETRY(::close(fd_direct
));
177 void KernelDevice::close()
179 dout(1) << __func__
<< dendl
;
186 assert(fd_direct
>= 0);
187 VOID_TEMP_FAILURE_RETRY(::close(fd_direct
));
190 assert(fd_buffered
>= 0);
191 VOID_TEMP_FAILURE_RETRY(::close(fd_buffered
));
197 static string
get_dev_property(const char *dev
, const char *property
)
199 char val
[1024] = {0};
200 get_block_device_string_property(dev
, property
, val
, sizeof(val
));
204 int KernelDevice::collect_metadata(string prefix
, map
<string
,string
> *pm
) const
206 (*pm
)[prefix
+ "rotational"] = stringify((int)(bool)rotational
);
207 (*pm
)[prefix
+ "size"] = stringify(get_size());
208 (*pm
)[prefix
+ "block_size"] = stringify(get_block_size());
209 (*pm
)[prefix
+ "driver"] = "KernelDevice";
211 (*pm
)[prefix
+ "type"] = "hdd";
213 (*pm
)[prefix
+ "type"] = "ssd";
217 int r
= ::fstat(fd_buffered
, &st
);
220 if (S_ISBLK(st
.st_mode
)) {
221 (*pm
)[prefix
+ "access_mode"] = "blk";
222 char partition_path
[PATH_MAX
];
223 char dev_node
[PATH_MAX
];
224 int rc
= get_device_by_fd(fd_buffered
, partition_path
, dev_node
, PATH_MAX
);
228 (*pm
)[prefix
+ "partition_path"] = "unknown";
229 (*pm
)[prefix
+ "dev_node"] = "unknown";
232 (*pm
)[prefix
+ "partition_path"] = string(partition_path
);
233 (*pm
)[prefix
+ "dev_node"] = "unknown";
237 (*pm
)[prefix
+ "partition_path"] = string(partition_path
);
238 (*pm
)[prefix
+ "dev_node"] = string(dev_node
);
239 (*pm
)[prefix
+ "model"] = get_dev_property(dev_node
, "device/model");
240 (*pm
)[prefix
+ "dev"] = get_dev_property(dev_node
, "dev");
242 // nvme exposes a serial number
243 string serial
= get_dev_property(dev_node
, "device/serial");
244 if (serial
.length()) {
245 (*pm
)[prefix
+ "serial"] = serial
;
248 // nvme has a device/device/* structure; infer from that. there
249 // is probably a better way?
250 string nvme_vendor
= get_dev_property(dev_node
, "device/device/vendor");
251 if (nvme_vendor
.length()) {
252 (*pm
)[prefix
+ "type"] = "nvme";
257 (*pm
)[prefix
+ "access_mode"] = "file";
258 (*pm
)[prefix
+ "path"] = path
;
263 int KernelDevice::flush()
265 // protect flush with a mutex. note that we are not really protecting
266 // data here. instead, we're ensuring that if any flush() caller
267 // sees that io_since_flush is true, they block any racing callers
268 // until the flush is observed. that allows racing threads to be
269 // calling flush while still ensuring that *any* of them that got an
270 // aio completion notification will not return before that aio is
271 // stable on disk: whichever thread sees the flag first will block
272 // followers until the aio is stable.
273 std::lock_guard
<std::mutex
> l(flush_mutex
);
276 if (!io_since_flush
.compare_exchange_strong(expect
, false)) {
277 dout(10) << __func__
<< " no-op (no ios since last flush), flag is "
278 << (int)io_since_flush
.load() << dendl
;
282 dout(10) << __func__
<< " start" << dendl
;
283 if (cct
->_conf
->bdev_inject_crash
) {
285 // sleep for a moment to give other threads a chance to submit or
286 // wait on io that races with a flush.
287 derr
<< __func__
<< " injecting crash. first we sleep..." << dendl
;
288 sleep(cct
->_conf
->bdev_inject_crash_flush_delay
);
289 derr
<< __func__
<< " and now we die" << dendl
;
293 utime_t start
= ceph_clock_now();
294 int r
= ::fdatasync(fd_direct
);
295 utime_t end
= ceph_clock_now();
296 utime_t dur
= end
- start
;
299 derr
<< __func__
<< " fdatasync got: " << cpp_strerror(r
) << dendl
;
302 dout(5) << __func__
<< " in " << dur
<< dendl
;;
306 int KernelDevice::_aio_start()
309 dout(10) << __func__
<< dendl
;
310 int r
= aio_queue
.init();
313 derr
<< __func__
<< " io_setup(2) failed with EAGAIN; "
314 << "try increasing /proc/sys/fs/aio-max-nr" << dendl
;
316 derr
<< __func__
<< " io_setup(2) failed: " << cpp_strerror(r
) << dendl
;
320 aio_thread
.create("bstore_aio");
325 void KernelDevice::_aio_stop()
328 dout(10) << __func__
<< dendl
;
332 aio_queue
.shutdown();
336 void KernelDevice::_aio_thread()
338 dout(10) << __func__
<< " start" << dendl
;
339 int inject_crash_count
= 0;
341 dout(40) << __func__
<< " polling" << dendl
;
342 int max
= cct
->_conf
->bdev_aio_reap_max
;
344 int r
= aio_queue
.get_next_completed(cct
->_conf
->bdev_aio_poll_ms
,
347 derr
<< __func__
<< " got " << cpp_strerror(r
) << dendl
;
350 dout(30) << __func__
<< " got " << r
<< " completed aios" << dendl
;
351 for (int i
= 0; i
< r
; ++i
) {
352 IOContext
*ioc
= static_cast<IOContext
*>(aio
[i
]->priv
);
353 _aio_log_finish(ioc
, aio
[i
]->offset
, aio
[i
]->length
);
354 if (aio
[i
]->queue_item
.is_linked()) {
355 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
356 debug_aio_unlink(*aio
[i
]);
359 // set flag indicating new ios have completed. we do this *before*
360 // any completion or notifications so that any user flush() that
361 // follows the observed io completion will include this io. Note
362 // that an earlier, racing flush() could observe and clear this
363 // flag, but that also ensures that the IO will be stable before the
364 // later flush() occurs.
365 io_since_flush
.store(true);
367 int r
= aio
[i
]->get_return_value();
368 dout(10) << __func__
<< " finished aio " << aio
[i
] << " r " << r
370 << " with " << (ioc
->num_running
.load() - 1)
371 << " aios left" << dendl
;
374 // NOTE: once num_running is decremented and we either call the callback or
375 // call aio_wake we cannot touch ioc or aio[] as the caller
378 if (--ioc
->num_running
== 0) {
379 aio_callback(aio_callback_priv
, ioc
->priv
);
386 if (cct
->_conf
->bdev_debug_aio
) {
387 utime_t now
= ceph_clock_now();
388 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
390 if (debug_stall_since
== utime_t()) {
391 debug_stall_since
= now
;
393 utime_t cutoff
= now
;
394 cutoff
-= cct
->_conf
->bdev_debug_aio_suicide_timeout
;
395 if (debug_stall_since
< cutoff
) {
396 derr
<< __func__
<< " stalled aio " << debug_oldest
397 << " since " << debug_stall_since
<< ", timeout is "
398 << cct
->_conf
->bdev_debug_aio_suicide_timeout
399 << "s, suicide" << dendl
;
400 assert(0 == "stalled aio... buggy kernel or bad device?");
406 if (cct
->_conf
->bdev_inject_crash
) {
407 ++inject_crash_count
;
408 if (inject_crash_count
* cct
->_conf
->bdev_aio_poll_ms
/ 1000 >
409 cct
->_conf
->bdev_inject_crash
+ cct
->_conf
->bdev_inject_crash_flush_delay
) {
410 derr
<< __func__
<< " bdev_inject_crash trigger from aio thread"
418 dout(10) << __func__
<< " end" << dendl
;
421 void KernelDevice::_aio_log_start(
426 dout(20) << __func__
<< " 0x" << std::hex
<< offset
<< "~" << length
427 << std::dec
<< dendl
;
428 if (cct
->_conf
->bdev_debug_inflight_ios
) {
429 Mutex::Locker
l(debug_lock
);
430 if (debug_inflight
.intersects(offset
, length
)) {
431 derr
<< __func__
<< " inflight overlap of 0x"
433 << offset
<< "~" << length
<< std::dec
434 << " with " << debug_inflight
<< dendl
;
437 debug_inflight
.insert(offset
, length
);
441 void KernelDevice::debug_aio_link(aio_t
& aio
)
443 if (debug_queue
.empty()) {
446 debug_queue
.push_back(aio
);
449 void KernelDevice::debug_aio_unlink(aio_t
& aio
)
451 if (aio
.queue_item
.is_linked()) {
452 debug_queue
.erase(debug_queue
.iterator_to(aio
));
453 if (debug_oldest
== &aio
) {
454 if (debug_queue
.empty()) {
455 debug_oldest
= nullptr;
457 debug_oldest
= &debug_queue
.front();
459 debug_stall_since
= utime_t();
464 void KernelDevice::_aio_log_finish(
469 dout(20) << __func__
<< " " << aio
<< " 0x"
470 << std::hex
<< offset
<< "~" << length
<< std::dec
<< dendl
;
471 if (cct
->_conf
->bdev_debug_inflight_ios
) {
472 Mutex::Locker
l(debug_lock
);
473 debug_inflight
.erase(offset
, length
);
477 void KernelDevice::aio_submit(IOContext
*ioc
)
479 dout(20) << __func__
<< " ioc " << ioc
480 << " pending " << ioc
->num_pending
.load()
481 << " running " << ioc
->num_running
.load()
484 if (ioc
->num_pending
.load() == 0) {
488 // move these aside, and get our end iterator position now, as the
489 // aios might complete as soon as they are submitted and queue more
491 list
<aio_t
>::iterator e
= ioc
->running_aios
.begin();
492 ioc
->running_aios
.splice(e
, ioc
->pending_aios
);
494 int pending
= ioc
->num_pending
.load();
495 ioc
->num_running
+= pending
;
496 ioc
->num_pending
-= pending
;
497 assert(ioc
->num_pending
.load() == 0); // we should be only thread doing this
498 assert(ioc
->pending_aios
.size() == 0);
500 if (cct
->_conf
->bdev_debug_aio
) {
501 list
<aio_t
>::iterator p
= ioc
->running_aios
.begin();
503 for (auto& io
: p
->iov
)
504 dout(30) << __func__
<< " iov " << (void*)io
.iov_base
505 << " len " << io
.iov_len
<< dendl
;
507 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
508 debug_aio_link(*p
++);
512 void *priv
= static_cast<void*>(ioc
);
514 r
= aio_queue
.submit_batch(ioc
->running_aios
.begin(), e
,
515 ioc
->num_running
.load(), priv
, &retries
);
518 derr
<< __func__
<< " retries " << retries
<< dendl
;
520 derr
<< " aio submit got " << cpp_strerror(r
) << dendl
;
525 int KernelDevice::_sync_write(uint64_t off
, bufferlist
&bl
, bool buffered
)
527 uint64_t len
= bl
.length();
528 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
529 << std::dec
<< " buffered" << dendl
;
530 if (cct
->_conf
->bdev_inject_crash
&&
531 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
532 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
533 << off
<< "~" << len
<< std::dec
<< dendl
;
538 bl
.prepare_iov(&iov
);
539 int r
= ::pwritev(buffered
? fd_buffered
: fd_direct
,
540 &iov
[0], iov
.size(), off
);
544 derr
<< __func__
<< " pwritev error: " << cpp_strerror(r
) << dendl
;
548 // initiate IO (but do not wait)
549 r
= ::sync_file_range(fd_buffered
, off
, len
, SYNC_FILE_RANGE_WRITE
);
552 derr
<< __func__
<< " sync_file_range error: " << cpp_strerror(r
) << dendl
;
557 io_since_flush
.store(true);
562 int KernelDevice::write(
567 uint64_t len
= bl
.length();
568 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
569 << (buffered
? " (buffered)" : " (direct)")
571 assert(off
% block_size
== 0);
572 assert(len
% block_size
== 0);
575 assert(off
+ len
<= size
);
577 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
578 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
)) {
579 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
581 dout(40) << "data: ";
585 return _sync_write(off
, bl
, buffered
);
588 int KernelDevice::aio_write(
594 uint64_t len
= bl
.length();
595 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
596 << (buffered
? " (buffered)" : " (direct)")
598 assert(off
% block_size
== 0);
599 assert(len
% block_size
== 0);
602 assert(off
+ len
<= size
);
604 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
605 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
)) {
606 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
608 dout(40) << "data: ";
612 _aio_log_start(ioc
, off
, len
);
615 if (aio
&& dio
&& !buffered
) {
616 ioc
->pending_aios
.push_back(aio_t(ioc
, fd_direct
));
618 aio_t
& aio
= ioc
->pending_aios
.back();
619 if (cct
->_conf
->bdev_inject_crash
&&
620 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
621 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
622 << off
<< "~" << len
<< std::dec
624 // generate a real io so that aio_wait behaves properly, but make it
625 // a read instead of write, and toss the result.
629 bl
.prepare_iov(&aio
.iov
);
630 for (unsigned i
=0; i
<aio
.iov
.size(); ++i
) {
631 dout(30) << "aio " << i
<< " " << aio
.iov
[i
].iov_base
632 << " " << aio
.iov
[i
].iov_len
<< dendl
;
634 aio
.bl
.claim_append(bl
);
635 aio
.pwritev(off
, len
);
637 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
638 << std::dec
<< " aio " << &aio
<< dendl
;
642 int r
= _sync_write(off
, bl
, buffered
);
643 _aio_log_finish(ioc
, off
, len
);
650 int KernelDevice::read(uint64_t off
, uint64_t len
, bufferlist
*pbl
,
654 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
655 << (buffered
? " (buffered)" : " (direct)")
657 assert(off
% block_size
== 0);
658 assert(len
% block_size
== 0);
661 assert(off
+ len
<= size
);
663 _aio_log_start(ioc
, off
, len
);
665 bufferptr p
= buffer::create_page_aligned(len
);
666 int r
= ::pread(buffered
? fd_buffered
: fd_direct
,
667 p
.c_str(), len
, off
);
672 assert((uint64_t)r
== len
);
673 pbl
->push_back(std::move(p
));
675 dout(40) << "data: ";
676 pbl
->hexdump(*_dout
);
680 _aio_log_finish(ioc
, off
, len
);
681 return r
< 0 ? r
: 0;
684 int KernelDevice::aio_read(
690 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
696 _aio_log_start(ioc
, off
, len
);
697 ioc
->pending_aios
.push_back(aio_t(ioc
, fd_direct
));
699 aio_t
& aio
= ioc
->pending_aios
.back();
701 for (unsigned i
=0; i
<aio
.iov
.size(); ++i
) {
702 dout(30) << "aio " << i
<< " " << aio
.iov
[i
].iov_base
703 << " " << aio
.iov
[i
].iov_len
<< dendl
;
706 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
707 << std::dec
<< " aio " << &aio
<< dendl
;
711 r
= read(off
, len
, pbl
, ioc
, false);
717 int KernelDevice::direct_read_unaligned(uint64_t off
, uint64_t len
, char *buf
)
719 uint64_t aligned_off
= align_down(off
, block_size
);
720 uint64_t aligned_len
= align_up(off
+len
, block_size
) - aligned_off
;
721 bufferptr p
= buffer::create_page_aligned(aligned_len
);
724 r
= ::pread(fd_direct
, p
.c_str(), aligned_len
, aligned_off
);
727 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
728 << " error: " << cpp_strerror(r
) << dendl
;
731 assert((uint64_t)r
== aligned_len
);
732 memcpy(buf
, p
.c_str() + (off
- aligned_off
), len
);
734 dout(40) << __func__
<< " data: ";
741 return r
< 0 ? r
: 0;
744 int KernelDevice::read_random(uint64_t off
, uint64_t len
, char *buf
,
747 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
751 assert(off
+ len
<= size
);
754 // if it's direct io and unaligned, we have to use an internal buffer
755 if (!buffered
&& ((off
% block_size
!= 0)
756 || (len
% block_size
!= 0)
757 || (uintptr_t(buf
) % CEPH_PAGE_SIZE
!= 0)))
758 return direct_read_unaligned(off
, len
, buf
);
765 r
= ::pread(fd_buffered
, t
, left
, off
);
768 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << left
769 << std::dec
<< " error: " << cpp_strerror(r
) << dendl
;
777 //direct and aligned read
778 r
= ::pread(fd_direct
, buf
, len
, off
);
781 derr
<< __func__
<< " direct_aligned_read" << " 0x" << std::hex
782 << off
<< "~" << left
<< std::dec
<< " error: " << cpp_strerror(r
)
786 assert((uint64_t)r
== len
);
789 dout(40) << __func__
<< " data: ";
796 return r
< 0 ? r
: 0;
799 int KernelDevice::invalidate_cache(uint64_t off
, uint64_t len
)
801 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
803 assert(off
% block_size
== 0);
804 assert(len
% block_size
== 0);
805 int r
= posix_fadvise(fd_buffered
, off
, len
, POSIX_FADV_DONTNEED
);
808 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
809 << " error: " << cpp_strerror(r
) << dendl
;