1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
17 #include <sys/types.h>
21 #include "KernelDevice.h"
22 #include "include/types.h"
23 #include "include/compat.h"
24 #include "include/stringify.h"
25 #include "common/errno.h"
26 #include "common/debug.h"
27 #include "common/blkdev.h"
28 #include "common/align.h"
29 #include "common/blkdev.h"
31 #define dout_context cct
32 #define dout_subsys ceph_subsys_bdev
34 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
36 KernelDevice::KernelDevice(CephContext
* cct
, aio_callback_t cb
, void *cbpriv
)
40 size(0), block_size(0),
41 fs(NULL
), aio(false), dio(false),
42 debug_lock("KernelDevice::debug_lock"),
43 aio_queue(cct
->_conf
->bdev_aio_max_queue_depth
),
45 aio_callback_priv(cbpriv
),
52 int KernelDevice::_lock()
55 memset(&l
, 0, sizeof(l
));
57 l
.l_whence
= SEEK_SET
;
58 int r
= ::fcntl(fd_direct
, F_SETLK
, &l
);
64 int KernelDevice::open(const string
& p
)
68 dout(1) << __func__
<< " path " << path
<< dendl
;
70 fd_direct
= ::open(path
.c_str(), O_RDWR
| O_DIRECT
| O_CLOEXEC
);
73 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
76 fd_buffered
= ::open(path
.c_str(), O_RDWR
| O_CLOEXEC
);
77 if (fd_buffered
< 0) {
79 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
83 aio
= cct
->_conf
->bdev_aio
;
85 assert(0 == "non-aio not supported");
88 // disable readahead as it will wreak havoc on our mix of
89 // directio/aio and buffered io.
90 r
= posix_fadvise(fd_buffered
, 0, 0, POSIX_FADV_RANDOM
);
93 derr
<< __func__
<< " open got: " << cpp_strerror(r
) << dendl
;
99 derr
<< __func__
<< " failed to lock " << path
<< ": " << cpp_strerror(r
)
105 r
= ::fstat(fd_direct
, &st
);
108 derr
<< __func__
<< " fstat got " << cpp_strerror(r
) << dendl
;
112 // Operate as though the block size is 4 KB. The backing file
113 // blksize doesn't strictly matter except that some file systems may
114 // require a read/modify/write if we write something smaller than
116 block_size
= cct
->_conf
->bdev_block_size
;
117 if (block_size
!= (unsigned)st
.st_blksize
) {
118 dout(1) << __func__
<< " backing device/file reports st_blksize "
119 << st
.st_blksize
<< ", using bdev_block_size "
120 << block_size
<< " anyway" << dendl
;
123 if (S_ISBLK(st
.st_mode
)) {
125 r
= get_block_device_size(fd_direct
, &s
);
133 if (cct
->_conf
->get_val
<bool>("bdev_inject_bad_size")) {
134 derr
<< "injecting bad size; actual 0x" << std::hex
<< size
135 << " but using 0x" << (size
& ~block_size
) << std::dec
<< dendl
;
136 size
&= ~(block_size
);
140 char partition
[PATH_MAX
], devname
[PATH_MAX
];
141 r
= get_device_by_fd(fd_buffered
, partition
, devname
, sizeof(devname
));
143 derr
<< "unable to get device name for " << path
<< ": "
144 << cpp_strerror(r
) << dendl
;
147 dout(20) << __func__
<< " devname " << devname
<< dendl
;
148 rotational
= block_device_is_rotational(devname
);
157 fs
= FS::create_by_fd(fd_direct
);
160 // round size down to an even block
161 size
&= ~(block_size
- 1);
165 << " (0x" << std::hex
<< size
<< std::dec
<< ", "
166 << byte_u_t(size
) << ")"
167 << " block_size " << block_size
168 << " (" << byte_u_t(block_size
) << ")"
169 << " " << (rotational
? "rotational" : "non-rotational")
174 VOID_TEMP_FAILURE_RETRY(::close(fd_buffered
));
177 VOID_TEMP_FAILURE_RETRY(::close(fd_direct
));
182 void KernelDevice::close()
184 dout(1) << __func__
<< dendl
;
191 assert(fd_direct
>= 0);
192 VOID_TEMP_FAILURE_RETRY(::close(fd_direct
));
195 assert(fd_buffered
>= 0);
196 VOID_TEMP_FAILURE_RETRY(::close(fd_buffered
));
202 static string
get_dev_property(const char *dev
, const char *property
)
204 char val
[1024] = {0};
205 get_block_device_string_property(dev
, property
, val
, sizeof(val
));
209 int KernelDevice::collect_metadata(string prefix
, map
<string
,string
> *pm
) const
211 (*pm
)[prefix
+ "rotational"] = stringify((int)(bool)rotational
);
212 (*pm
)[prefix
+ "size"] = stringify(get_size());
213 (*pm
)[prefix
+ "block_size"] = stringify(get_block_size());
214 (*pm
)[prefix
+ "driver"] = "KernelDevice";
216 (*pm
)[prefix
+ "type"] = "hdd";
218 (*pm
)[prefix
+ "type"] = "ssd";
222 int r
= ::fstat(fd_buffered
, &st
);
225 if (S_ISBLK(st
.st_mode
)) {
226 (*pm
)[prefix
+ "access_mode"] = "blk";
227 char partition_path
[PATH_MAX
];
228 char dev_node
[PATH_MAX
];
229 int rc
= get_device_by_fd(fd_buffered
, partition_path
, dev_node
, PATH_MAX
);
233 (*pm
)[prefix
+ "partition_path"] = "unknown";
234 (*pm
)[prefix
+ "dev_node"] = "unknown";
237 (*pm
)[prefix
+ "partition_path"] = string(partition_path
);
238 (*pm
)[prefix
+ "dev_node"] = "unknown";
242 (*pm
)[prefix
+ "partition_path"] = string(partition_path
);
243 (*pm
)[prefix
+ "dev_node"] = string(dev_node
);
244 (*pm
)[prefix
+ "model"] = get_dev_property(dev_node
, "device/model");
245 (*pm
)[prefix
+ "dev"] = get_dev_property(dev_node
, "dev");
247 // nvme exposes a serial number
248 string serial
= get_dev_property(dev_node
, "device/serial");
249 if (serial
.length()) {
250 (*pm
)[prefix
+ "serial"] = serial
;
253 // nvme has a device/device/* structure; infer from that. there
254 // is probably a better way?
255 string nvme_vendor
= get_dev_property(dev_node
, "device/device/vendor");
256 if (nvme_vendor
.length()) {
257 (*pm
)[prefix
+ "type"] = "nvme";
262 (*pm
)[prefix
+ "access_mode"] = "file";
263 (*pm
)[prefix
+ "path"] = path
;
268 int KernelDevice::flush()
270 // protect flush with a mutex. note that we are not really protecting
271 // data here. instead, we're ensuring that if any flush() caller
272 // sees that io_since_flush is true, they block any racing callers
273 // until the flush is observed. that allows racing threads to be
274 // calling flush while still ensuring that *any* of them that got an
275 // aio completion notification will not return before that aio is
276 // stable on disk: whichever thread sees the flag first will block
277 // followers until the aio is stable.
278 std::lock_guard
<std::mutex
> l(flush_mutex
);
281 if (!io_since_flush
.compare_exchange_strong(expect
, false)) {
282 dout(10) << __func__
<< " no-op (no ios since last flush), flag is "
283 << (int)io_since_flush
.load() << dendl
;
287 dout(10) << __func__
<< " start" << dendl
;
288 if (cct
->_conf
->bdev_inject_crash
) {
290 // sleep for a moment to give other threads a chance to submit or
291 // wait on io that races with a flush.
292 derr
<< __func__
<< " injecting crash. first we sleep..." << dendl
;
293 sleep(cct
->_conf
->bdev_inject_crash_flush_delay
);
294 derr
<< __func__
<< " and now we die" << dendl
;
298 utime_t start
= ceph_clock_now();
299 int r
= ::fdatasync(fd_direct
);
300 utime_t end
= ceph_clock_now();
301 utime_t dur
= end
- start
;
304 derr
<< __func__
<< " fdatasync got: " << cpp_strerror(r
) << dendl
;
307 dout(5) << __func__
<< " in " << dur
<< dendl
;;
311 int KernelDevice::_aio_start()
314 dout(10) << __func__
<< dendl
;
315 int r
= aio_queue
.init();
318 derr
<< __func__
<< " io_setup(2) failed with EAGAIN; "
319 << "try increasing /proc/sys/fs/aio-max-nr" << dendl
;
321 derr
<< __func__
<< " io_setup(2) failed: " << cpp_strerror(r
) << dendl
;
325 aio_thread
.create("bstore_aio");
330 void KernelDevice::_aio_stop()
333 dout(10) << __func__
<< dendl
;
337 aio_queue
.shutdown();
// Return true if the negative errno `r` is one of the I/O errors the
// Linux block layer can legitimately report for a request, per
// https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
// Anything else indicates a software bug rather than a device error.
static bool is_expected_ioerr(const int r)
{
  switch (r) {
  case -EOPNOTSUPP:
  case -ETIMEDOUT:
  case -ENOSPC:
  case -ENOLINK:
  case -EREMOTEIO:
  case -EBADE:
  case -ENODATA:
  case -EILSEQ:
  case -ENOMEM:
  case -EAGAIN:
  case -EREMCHG:
  case -EIO:
    return true;
  default:
    return false;
  }
}
350 void KernelDevice::_aio_thread()
352 dout(10) << __func__
<< " start" << dendl
;
353 int inject_crash_count
= 0;
355 dout(40) << __func__
<< " polling" << dendl
;
356 int max
= cct
->_conf
->bdev_aio_reap_max
;
358 int r
= aio_queue
.get_next_completed(cct
->_conf
->bdev_aio_poll_ms
,
361 derr
<< __func__
<< " got " << cpp_strerror(r
) << dendl
;
362 assert(0 == "got unexpected error from io_getevents");
365 dout(30) << __func__
<< " got " << r
<< " completed aios" << dendl
;
366 for (int i
= 0; i
< r
; ++i
) {
367 IOContext
*ioc
= static_cast<IOContext
*>(aio
[i
]->priv
);
368 _aio_log_finish(ioc
, aio
[i
]->offset
, aio
[i
]->length
);
369 if (aio
[i
]->queue_item
.is_linked()) {
370 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
371 debug_aio_unlink(*aio
[i
]);
374 // set flag indicating new ios have completed. we do this *before*
375 // any completion or notifications so that any user flush() that
376 // follows the observed io completion will include this io. Note
377 // that an earlier, racing flush() could observe and clear this
378 // flag, but that also ensures that the IO will be stable before the
379 // later flush() occurs.
380 io_since_flush
.store(true);
382 long r
= aio
[i
]->get_return_value();
384 derr
<< __func__
<< " got r=" << r
<< " (" << cpp_strerror(r
) << ")"
386 if (ioc
->allow_eio
&& is_expected_ioerr(r
)) {
387 derr
<< __func__
<< " translating the error to EIO for upper layer"
389 ioc
->set_return_value(-EIO
);
391 assert(0 == "got unexpected error from aio_t::get_return_value. "
392 "This may suggest HW issue. Please check your dmesg!");
394 } else if (aio
[i
]->length
!= (uint64_t)r
) {
395 derr
<< "aio to " << aio
[i
]->offset
<< "~" << aio
[i
]->length
396 << " but returned: " << r
<< dendl
;
397 assert(0 == "unexpected aio error");
400 dout(10) << __func__
<< " finished aio " << aio
[i
] << " r " << r
402 << " with " << (ioc
->num_running
.load() - 1)
403 << " aios left" << dendl
;
405 // NOTE: once num_running and we either call the callback or
406 // call aio_wake we cannot touch ioc or aio[] as the caller
409 if (--ioc
->num_running
== 0) {
410 aio_callback(aio_callback_priv
, ioc
->priv
);
417 if (cct
->_conf
->bdev_debug_aio
) {
418 utime_t now
= ceph_clock_now();
419 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
421 if (debug_stall_since
== utime_t()) {
422 debug_stall_since
= now
;
424 utime_t cutoff
= now
;
425 cutoff
-= cct
->_conf
->bdev_debug_aio_suicide_timeout
;
426 if (debug_stall_since
< cutoff
) {
427 derr
<< __func__
<< " stalled aio " << debug_oldest
428 << " since " << debug_stall_since
<< ", timeout is "
429 << cct
->_conf
->bdev_debug_aio_suicide_timeout
430 << "s, suicide" << dendl
;
431 assert(0 == "stalled aio... buggy kernel or bad device?");
437 if (cct
->_conf
->bdev_inject_crash
) {
438 ++inject_crash_count
;
439 if (inject_crash_count
* cct
->_conf
->bdev_aio_poll_ms
/ 1000 >
440 cct
->_conf
->bdev_inject_crash
+ cct
->_conf
->bdev_inject_crash_flush_delay
) {
441 derr
<< __func__
<< " bdev_inject_crash trigger from aio thread"
449 dout(10) << __func__
<< " end" << dendl
;
452 void KernelDevice::_aio_log_start(
457 dout(20) << __func__
<< " 0x" << std::hex
<< offset
<< "~" << length
458 << std::dec
<< dendl
;
459 if (cct
->_conf
->bdev_debug_inflight_ios
) {
460 Mutex::Locker
l(debug_lock
);
461 if (debug_inflight
.intersects(offset
, length
)) {
462 derr
<< __func__
<< " inflight overlap of 0x"
464 << offset
<< "~" << length
<< std::dec
465 << " with " << debug_inflight
<< dendl
;
468 debug_inflight
.insert(offset
, length
);
472 void KernelDevice::debug_aio_link(aio_t
& aio
)
474 if (debug_queue
.empty()) {
477 debug_queue
.push_back(aio
);
480 void KernelDevice::debug_aio_unlink(aio_t
& aio
)
482 if (aio
.queue_item
.is_linked()) {
483 debug_queue
.erase(debug_queue
.iterator_to(aio
));
484 if (debug_oldest
== &aio
) {
485 if (debug_queue
.empty()) {
486 debug_oldest
= nullptr;
488 debug_oldest
= &debug_queue
.front();
490 debug_stall_since
= utime_t();
495 void KernelDevice::_aio_log_finish(
500 dout(20) << __func__
<< " " << aio
<< " 0x"
501 << std::hex
<< offset
<< "~" << length
<< std::dec
<< dendl
;
502 if (cct
->_conf
->bdev_debug_inflight_ios
) {
503 Mutex::Locker
l(debug_lock
);
504 debug_inflight
.erase(offset
, length
);
508 void KernelDevice::aio_submit(IOContext
*ioc
)
510 dout(20) << __func__
<< " ioc " << ioc
511 << " pending " << ioc
->num_pending
.load()
512 << " running " << ioc
->num_running
.load()
515 if (ioc
->num_pending
.load() == 0) {
519 // move these aside, and get our end iterator position now, as the
520 // aios might complete as soon as they are submitted and queue more
522 list
<aio_t
>::iterator e
= ioc
->running_aios
.begin();
523 ioc
->running_aios
.splice(e
, ioc
->pending_aios
);
525 int pending
= ioc
->num_pending
.load();
526 ioc
->num_running
+= pending
;
527 ioc
->num_pending
-= pending
;
528 assert(ioc
->num_pending
.load() == 0); // we should be only thread doing this
529 assert(ioc
->pending_aios
.size() == 0);
531 if (cct
->_conf
->bdev_debug_aio
) {
532 list
<aio_t
>::iterator p
= ioc
->running_aios
.begin();
534 for (auto& io
: p
->iov
)
535 dout(30) << __func__
<< " iov " << (void*)io
.iov_base
536 << " len " << io
.iov_len
<< dendl
;
538 std::lock_guard
<std::mutex
> l(debug_queue_lock
);
539 debug_aio_link(*p
++);
543 void *priv
= static_cast<void*>(ioc
);
545 r
= aio_queue
.submit_batch(ioc
->running_aios
.begin(), e
,
546 ioc
->num_running
.load(), priv
, &retries
);
549 derr
<< __func__
<< " retries " << retries
<< dendl
;
551 derr
<< " aio submit got " << cpp_strerror(r
) << dendl
;
556 int KernelDevice::_sync_write(uint64_t off
, bufferlist
&bl
, bool buffered
)
558 uint64_t len
= bl
.length();
559 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
560 << std::dec
<< " buffered" << dendl
;
561 if (cct
->_conf
->bdev_inject_crash
&&
562 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
563 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
564 << off
<< "~" << len
<< std::dec
<< dendl
;
569 bl
.prepare_iov(&iov
);
570 int r
= ::pwritev(buffered
? fd_buffered
: fd_direct
,
571 &iov
[0], iov
.size(), off
);
575 derr
<< __func__
<< " pwritev error: " << cpp_strerror(r
) << dendl
;
579 // initiate IO (but do not wait)
580 r
= ::sync_file_range(fd_buffered
, off
, len
, SYNC_FILE_RANGE_WRITE
);
583 derr
<< __func__
<< " sync_file_range error: " << cpp_strerror(r
) << dendl
;
588 io_since_flush
.store(true);
593 int KernelDevice::write(
598 uint64_t len
= bl
.length();
599 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
600 << (buffered
? " (buffered)" : " (direct)")
602 assert(off
% block_size
== 0);
603 assert(len
% block_size
== 0);
606 assert(off
+ len
<= size
);
608 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
609 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
, IOV_MAX
)) {
610 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
612 dout(40) << "data: ";
616 return _sync_write(off
, bl
, buffered
);
619 int KernelDevice::aio_write(
625 uint64_t len
= bl
.length();
626 dout(20) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
627 << (buffered
? " (buffered)" : " (direct)")
629 assert(off
% block_size
== 0);
630 assert(len
% block_size
== 0);
633 assert(off
+ len
<= size
);
635 if ((!buffered
|| bl
.get_num_buffers() >= IOV_MAX
) &&
636 bl
.rebuild_aligned_size_and_memory(block_size
, block_size
, IOV_MAX
)) {
637 dout(20) << __func__
<< " rebuilding buffer to be aligned" << dendl
;
639 dout(40) << "data: ";
643 _aio_log_start(ioc
, off
, len
);
646 if (aio
&& dio
&& !buffered
) {
647 ioc
->pending_aios
.push_back(aio_t(ioc
, fd_direct
));
649 aio_t
& aio
= ioc
->pending_aios
.back();
650 if (cct
->_conf
->bdev_inject_crash
&&
651 rand() % cct
->_conf
->bdev_inject_crash
== 0) {
652 derr
<< __func__
<< " bdev_inject_crash: dropping io 0x" << std::hex
653 << off
<< "~" << len
<< std::dec
655 // generate a real io so that aio_wait behaves properly, but make it
656 // a read instead of write, and toss the result.
660 bl
.prepare_iov(&aio
.iov
);
661 for (unsigned i
=0; i
<aio
.iov
.size(); ++i
) {
662 dout(30) << "aio " << i
<< " " << aio
.iov
[i
].iov_base
663 << " " << aio
.iov
[i
].iov_len
<< dendl
;
665 aio
.bl
.claim_append(bl
);
666 aio
.pwritev(off
, len
);
668 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
669 << std::dec
<< " aio " << &aio
<< dendl
;
673 int r
= _sync_write(off
, bl
, buffered
);
674 _aio_log_finish(ioc
, off
, len
);
681 int KernelDevice::read(uint64_t off
, uint64_t len
, bufferlist
*pbl
,
685 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
686 << (buffered
? " (buffered)" : " (direct)")
688 assert(off
% block_size
== 0);
689 assert(len
% block_size
== 0);
692 assert(off
+ len
<= size
);
694 _aio_log_start(ioc
, off
, len
);
696 bufferptr p
= buffer::create_page_aligned(len
);
697 int r
= ::pread(buffered
? fd_buffered
: fd_direct
,
698 p
.c_str(), len
, off
);
703 assert((uint64_t)r
== len
);
704 pbl
->push_back(std::move(p
));
706 dout(40) << "data: ";
707 pbl
->hexdump(*_dout
);
711 _aio_log_finish(ioc
, off
, len
);
712 return r
< 0 ? r
: 0;
715 int KernelDevice::aio_read(
721 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
727 _aio_log_start(ioc
, off
, len
);
728 ioc
->pending_aios
.push_back(aio_t(ioc
, fd_direct
));
730 aio_t
& aio
= ioc
->pending_aios
.back();
732 for (unsigned i
=0; i
<aio
.iov
.size(); ++i
) {
733 dout(30) << "aio " << i
<< " " << aio
.iov
[i
].iov_base
734 << " " << aio
.iov
[i
].iov_len
<< dendl
;
737 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
738 << std::dec
<< " aio " << &aio
<< dendl
;
742 r
= read(off
, len
, pbl
, ioc
, false);
748 int KernelDevice::direct_read_unaligned(uint64_t off
, uint64_t len
, char *buf
)
750 uint64_t aligned_off
= align_down(off
, block_size
);
751 uint64_t aligned_len
= align_up(off
+len
, block_size
) - aligned_off
;
752 bufferptr p
= buffer::create_page_aligned(aligned_len
);
755 r
= ::pread(fd_direct
, p
.c_str(), aligned_len
, aligned_off
);
758 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
759 << " error: " << cpp_strerror(r
) << dendl
;
762 assert((uint64_t)r
== aligned_len
);
763 memcpy(buf
, p
.c_str() + (off
- aligned_off
), len
);
765 dout(40) << __func__
<< " data: ";
772 return r
< 0 ? r
: 0;
775 int KernelDevice::read_random(uint64_t off
, uint64_t len
, char *buf
,
778 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
782 assert(off
+ len
<= size
);
785 //if it's direct io and unaligned, we have to use a internal buffer
786 if (!buffered
&& ((off
% block_size
!= 0)
787 || (len
% block_size
!= 0)
788 || (uintptr_t(buf
) % CEPH_PAGE_SIZE
!= 0)))
789 return direct_read_unaligned(off
, len
, buf
);
796 r
= ::pread(fd_buffered
, t
, left
, off
);
799 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << left
800 << std::dec
<< " error: " << cpp_strerror(r
) << dendl
;
808 //direct and aligned read
809 r
= ::pread(fd_direct
, buf
, len
, off
);
812 derr
<< __func__
<< " direct_aligned_read" << " 0x" << std::hex
813 << off
<< "~" << left
<< std::dec
<< " error: " << cpp_strerror(r
)
817 assert((uint64_t)r
== len
);
820 dout(40) << __func__
<< " data: ";
827 return r
< 0 ? r
: 0;
830 int KernelDevice::invalidate_cache(uint64_t off
, uint64_t len
)
832 dout(5) << __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
834 assert(off
% block_size
== 0);
835 assert(len
% block_size
== 0);
836 int r
= posix_fadvise(fd_buffered
, off
, len
, POSIX_FADV_DONTNEED
);
839 derr
<< __func__
<< " 0x" << std::hex
<< off
<< "~" << len
<< std::dec
840 << " error: " << cpp_strerror(r
) << dendl
;