1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <limits>
16 #include <unistd.h>
17 #include <stdlib.h>
18 #include <sys/types.h>
19 #include <sys/stat.h>
20 #include <fcntl.h>
21 #include <sys/file.h>
22 #include <sys/mman.h>
23
24 #include <boost/container/flat_map.hpp>
25 #include <boost/lockfree/queue.hpp>
26
27 #include "KernelDevice.h"
28 #include "include/buffer_raw.h"
29 #include "include/intarith.h"
30 #include "include/types.h"
31 #include "include/compat.h"
32 #include "include/stringify.h"
33 #include "include/str_map.h"
34 #include "common/blkdev.h"
35 #include "common/buffer_instrumentation.h"
36 #include "common/errno.h"
37 #if defined(__FreeBSD__)
38 #include "bsm/audit_errno.h"
39 #endif
40 #include "common/debug.h"
41 #include "common/numa.h"
42
43 #include "global/global_context.h"
44 #include "io_uring.h"
45
46 #define dout_context cct
47 #define dout_subsys ceph_subsys_bdev
48 #undef dout_prefix
49 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
50
51 using std::list;
52 using std::map;
53 using std::string;
54 using std::vector;
55
56 using ceph::bufferlist;
57 using ceph::bufferptr;
58 using ceph::make_timespan;
59 using ceph::mono_clock;
60 using ceph::operator <<;
61
62 KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
63 : BlockDevice(cct, cb, cbpriv),
64 aio(false), dio(false),
65 discard_callback(d_cb),
66 discard_callback_priv(d_cbpriv),
67 aio_stop(false),
68 discard_started(false),
69 discard_stop(false),
70 aio_thread(this),
71 discard_thread(this),
72 injecting_crash(0)
73 {
74 fd_directs.resize(WRITE_LIFE_MAX, -1);
75 fd_buffereds.resize(WRITE_LIFE_MAX, -1);
76
77 bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring");
78 unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;
79
80 if (use_ioring && ioring_queue_t::supported()) {
81 bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri");
82 bool use_ioring_sqthread_poll = cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll");
83 io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri, use_ioring_sqthread_poll);
84 } else {
85 static bool once;
86 if (use_ioring && !once) {
87 derr << "WARNING: io_uring API is not supported! Fallback to libaio!"
88 << dendl;
89 once = true;
90 }
91 io_queue = std::make_unique<aio_queue_t>(iodepth);
92 }
93 }
94
95 int KernelDevice::_lock()
96 {
97 // When the block device changes, systemd-udevd will open it,
98 // read some information and close it; that can make the lock attempt
99 // below fail transiently, so we retry here.
100 int fd = fd_directs[WRITE_LIFE_NOT_SET];
101 dout(10) << __func__ << " fd=" << fd << dendl;
102 uint64_t nr_tries = 0;
103 for (;;) {
104 struct flock fl = { .l_type = F_WRLCK,
105 .l_whence = SEEK_SET };
106 int r = ::fcntl(fd, F_OFD_SETLK, &fl);
107 if (r < 0) {
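// F_OFD_SETLK may be rejected with EINVAL by kernels or filesystems that do
// not support open file description locks; fall back to flock() in that case.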
108 if (errno == EINVAL) {
109 r = ::flock(fd, LOCK_EX | LOCK_NB);
110 }
111 }
112 if (r == 0) {
113 return 0;
114 }
115 if (errno != EAGAIN) {
116 return -errno;
117 }
118 dout(1) << __func__ << " flock busy on " << path << dendl;
119 if (const uint64_t max_retry =
120 cct->_conf.get_val<uint64_t>("bdev_flock_retry");
121 max_retry > 0 && nr_tries++ == max_retry) {
122 return -EAGAIN;
123 }
124 double retry_interval =
125 cct->_conf.get_val<double>("bdev_flock_retry_interval");
126 std::this_thread::sleep_for(ceph::make_timespan(retry_interval));
127 }
128 }
129
130 int KernelDevice::open(const string& p)
131 {
132 path = p;
133 int r = 0, i = 0;
134 dout(1) << __func__ << " path " << path << dendl;
135
136 struct stat statbuf;
137 bool is_block;
138 r = stat(path.c_str(), &statbuf);
139 if (r != 0) {
140 derr << __func__ << " stat got: " << cpp_strerror(r) << dendl;
141 goto out_fail;
142 }
143 is_block = (statbuf.st_mode & S_IFMT) == S_IFBLK;
144 for (i = 0; i < WRITE_LIFE_MAX; i++) {
145 int flags = 0;
146 if (lock_exclusive && is_block && (i == 0)) {
147 // If opening a block device, use the O_EXCL flag. It gives us the best protection,
148 // as no other process can overwrite the data for as long as we are running.
149 // For block devices ::flock is not enough,
150 // since two different inodes with the same major/minor can be locked.
151 // Exclusion by O_EXCL works in containers too.
152 flags |= O_EXCL;
153 }
154 int fd = ::open(path.c_str(), O_RDWR | O_DIRECT | flags);
155 if (fd < 0) {
156 r = -errno;
157 break;
158 }
159 fd_directs[i] = fd;
160
161 fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
162 if (fd < 0) {
163 r = -errno;
164 break;
165 }
166 fd_buffereds[i] = fd;
167 }
168
169 if (i != WRITE_LIFE_MAX) {
170 derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
171 goto out_fail;
172 }
173
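// Best-effort: tag each fd with a write lifetime hint (F_SET_FILE_RW_HINT) so
// lower layers can place data by expected lifetime; if the kernel/filesystem
// rejects the hint, write hints are disabled below via enable_wrt.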
174 #if defined(F_SET_FILE_RW_HINT)
175 for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) {
176 if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) {
177 r = -errno;
178 break;
179 }
180 if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) {
181 r = -errno;
182 break;
183 }
184 }
185 if (i != WRITE_LIFE_MAX) {
186 enable_wrt = false;
187 dout(0) << "fcntl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl;
188 }
189 #endif
190
191 dio = true;
192 aio = cct->_conf->bdev_aio;
193 if (!aio) {
194 ceph_abort_msg("non-aio not supported");
195 }
196
197 // disable readahead as it will wreak havoc on our mix of
198 // directio/aio and buffered io.
199 r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM);
200 if (r) {
201 r = -r;
202 derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl;
203 goto out_fail;
204 }
205
206 if (lock_exclusive) {
207 // We need to keep soft locking (via flock()) because O_EXCL does not work for regular files.
208 // This is as good as we can get. Other processes can still overwrite the data,
209 // but at least we are protected from mounting the same device twice in ceph processes.
210 // We also apply soft locking for block devices, as it populates /proc/locks. (see lslocks)
211 r = _lock();
212 if (r < 0) {
213 derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
214 << dendl;
215 goto out_fail;
216 }
217 }
218
219 struct stat st;
220 r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st);
221 if (r < 0) {
222 r = -errno;
223 derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
224 goto out_fail;
225 }
226
227 // Operate as though the block size is 4 KB. The backing file
228 // blksize doesn't strictly matter except that some file systems may
229 // require a read/modify/write if we write something smaller than
230 // it.
231 block_size = cct->_conf->bdev_block_size;
232 if (block_size != (unsigned)st.st_blksize) {
233 dout(1) << __func__ << " backing device/file reports st_blksize "
234 << st.st_blksize << ", using bdev_block_size "
235 << block_size << " anyway" << dendl;
236 }
237
238
239 {
240 BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]);
241 BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]);
242
243 if (S_ISBLK(st.st_mode)) {
244 int64_t s;
245 r = blkdev_direct.get_size(&s);
246 if (r < 0) {
247 goto out_fail;
248 }
249 size = s;
250 } else {
251 size = st.st_size;
252 }
253
254 char partition[PATH_MAX], devname[PATH_MAX];
255 if ((r = blkdev_buffered.partition(partition, PATH_MAX)) ||
256 (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) {
257 derr << "unable to get device name for " << path << ": "
258 << cpp_strerror(r) << dendl;
259 rotational = true;
260 } else {
261 dout(20) << __func__ << " devname " << devname << dendl;
262 rotational = blkdev_buffered.is_rotational();
263 support_discard = blkdev_buffered.support_discard();
264 optimal_io_size = blkdev_buffered.get_optimal_io_size();
265 this->devname = devname;
266 // check if any extended block device plugin recognizes this device
267 // detect_vdo has moved into the VDO plugin
268 int rc = extblkdev::detect_device(cct, devname, ebd_impl);
269 if (rc != 0) {
270 dout(20) << __func__ << " no plugin volume maps to " << devname << dendl;
271 }
272 }
273 }
274
275 r = _post_open();
276 if (r < 0) {
277 goto out_fail;
278 }
279
280 r = _aio_start();
281 if (r < 0) {
282 goto out_fail;
283 }
284 if (support_discard && cct->_conf->bdev_enable_discard && cct->_conf->bdev_async_discard) {
285 _discard_start();
286 }
287
288 // round size down to an even block
289 size &= ~(block_size - 1);
290
291 dout(1) << __func__
292 << " size " << size
293 << " (0x" << std::hex << size << std::dec << ", "
294 << byte_u_t(size) << ")"
295 << " block_size " << block_size
296 << " (" << byte_u_t(block_size) << ")"
297 << " " << (rotational ? "rotational device," : "non-rotational device,")
298 << " discard " << (support_discard ? "supported" : "not supported")
299 << dendl;
300 return 0;
301
302 out_fail:
303 for (i = 0; i < WRITE_LIFE_MAX; i++) {
304 if (fd_directs[i] >= 0) {
305 VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
306 fd_directs[i] = -1;
307 } else {
308 break;
309 }
310 if (fd_buffereds[i] >= 0) {
311 VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
312 fd_buffereds[i] = -1;
313 } else {
314 break;
315 }
316 }
317 return r;
318 }
319
320 int KernelDevice::get_devices(std::set<std::string> *ls) const
321 {
322 if (devname.empty()) {
323 return 0;
324 }
325 get_raw_devices(devname, ls);
326 return 0;
327 }
328
329 void KernelDevice::close()
330 {
331 dout(1) << __func__ << dendl;
332 _aio_stop();
333 if (discard_thread.is_started()) {
334 _discard_stop();
335 }
336 _pre_close();
337
338 extblkdev::release_device(ebd_impl);
339
340 for (int i = 0; i < WRITE_LIFE_MAX; i++) {
341 assert(fd_directs[i] >= 0);
342 VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
343 fd_directs[i] = -1;
344
345 assert(fd_buffereds[i] >= 0);
346 VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
347 fd_buffereds[i] = -1;
348 }
349 path.clear();
350 }
351
352 int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
353 {
354 (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard);
355 (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
356 (*pm)[prefix + "size"] = stringify(get_size());
357 (*pm)[prefix + "block_size"] = stringify(get_block_size());
358 (*pm)[prefix + "optimal_io_size"] = stringify(get_optimal_io_size());
359 (*pm)[prefix + "driver"] = "KernelDevice";
360 if (rotational) {
361 (*pm)[prefix + "type"] = "hdd";
362 } else {
363 (*pm)[prefix + "type"] = "ssd";
364 }
365 // if compression device detected, collect meta data for device
366 // VDO specific meta data has moved into VDO plugin
367 if (ebd_impl) {
368 ebd_impl->collect_metadata(prefix, pm);
369 }
370
371 {
372 string res_names;
373 std::set<std::string> devnames;
374 if (get_devices(&devnames) == 0) {
375 for (auto& dev : devnames) {
376 if (!res_names.empty()) {
377 res_names += ",";
378 }
379 res_names += dev;
380 }
381 if (res_names.size()) {
382 (*pm)[prefix + "devices"] = res_names;
383 }
384 }
385 }
386
387 struct stat st;
388 int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
389 if (r < 0)
390 return -errno;
391 if (S_ISBLK(st.st_mode)) {
392 (*pm)[prefix + "access_mode"] = "blk";
393
394 char buffer[1024] = {0};
395 BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
396 if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
397 (*pm)[prefix + "partition_path"] = "unknown";
398 } else {
399 (*pm)[prefix + "partition_path"] = buffer;
400 }
401 buffer[0] = '\0';
402 if (r = blkdev.wholedisk(buffer, sizeof(buffer)); r) {
403 (*pm)[prefix + "dev_node"] = "unknown";
404 } else {
405 (*pm)[prefix + "dev_node"] = buffer;
406 }
407 if (!r) {
408 return 0;
409 }
410 buffer[0] = '\0';
411 blkdev.model(buffer, sizeof(buffer));
412 (*pm)[prefix + "model"] = buffer;
413
414 buffer[0] = '\0';
415 blkdev.dev(buffer, sizeof(buffer));
416 (*pm)[prefix + "dev"] = buffer;
417
418 // nvme exposes a serial number
419 buffer[0] = '\0';
420 blkdev.serial(buffer, sizeof(buffer));
421 (*pm)[prefix + "serial"] = buffer;
422
423 // numa
424 int node;
425 r = blkdev.get_numa_node(&node);
426 if (r >= 0) {
427 (*pm)[prefix + "numa_node"] = stringify(node);
428 }
429 } else {
430 (*pm)[prefix + "access_mode"] = "file";
431 (*pm)[prefix + "path"] = path;
432 }
433 return 0;
434 }
435
436 int KernelDevice::get_ebd_state(ExtBlkDevState &state) const
437 {
438 // use compression driver plugin to determine physical size and availability
439 // VDO specific get_thin_utilization has moved into VDO plugin
440 if (ebd_impl) {
441 return ebd_impl->get_state(state);
442 }
443 return -ENOENT;
444 }
445
446 int KernelDevice::choose_fd(bool buffered, int write_hint) const
447 {
448 #if defined(F_SET_FILE_RW_HINT)
449 if (!enable_wrt)
450 write_hint = WRITE_LIFE_NOT_SET;
451 #else
452 // Without WRITE_LIFE capabilities, only one file is used.
453 // And rocksdb sets this value also to > 0, so we need to catch this here
454 // instead of trusting rocksdb to set write_hint.
455 write_hint = WRITE_LIFE_NOT_SET;
456 #endif
457 return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
458 }
459
460 int KernelDevice::flush()
461 {
462 // protect flush with a mutex. note that we are not really protecting
463 // data here. instead, we're ensuring that if any flush() caller
464 // sees that io_since_flush is true, they block any racing callers
465 // until the flush is observed. that allows racing threads to be
466 // calling flush while still ensuring that *any* of them that got an
467 // aio completion notification will not return before that aio is
468 // stable on disk: whichever thread sees the flag first will block
469 // followers until the aio is stable.
470 std::lock_guard l(flush_mutex);
471
472 bool expect = true;
473 if (!io_since_flush.compare_exchange_strong(expect, false)) {
474 dout(10) << __func__ << " no-op (no ios since last flush), flag is "
475 << (int)io_since_flush.load() << dendl;
476 return 0;
477 }
478
479 dout(10) << __func__ << " start" << dendl;
480 if (cct->_conf->bdev_inject_crash) {
481 ++injecting_crash;
482 // sleep for a moment to give other threads a chance to submit or
483 // wait on io that races with a flush.
484 derr << __func__ << " injecting crash. first we sleep..." << dendl;
485 sleep(cct->_conf->bdev_inject_crash_flush_delay);
486 derr << __func__ << " and now we die" << dendl;
487 cct->_log->flush();
488 _exit(1);
489 }
490 utime_t start = ceph_clock_now();
491 int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
492 utime_t end = ceph_clock_now();
493 utime_t dur = end - start;
494 if (r < 0) {
495 r = -errno;
496 derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
497 ceph_abort();
498 }
499 dout(5) << __func__ << " in " << dur << dendl;
500 return r;
501 }
502
503 int KernelDevice::_aio_start()
504 {
505 if (aio) {
506 dout(10) << __func__ << dendl;
507 int r = io_queue->init(fd_directs);
508 if (r < 0) {
509 if (r == -EAGAIN) {
510 derr << __func__ << " io_setup(2) failed with EAGAIN; "
511 << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
512 } else {
513 derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
514 }
515 return r;
516 }
517 aio_thread.create("bstore_aio");
518 }
519 return 0;
520 }
521
522 void KernelDevice::_aio_stop()
523 {
524 if (aio) {
525 dout(10) << __func__ << dendl;
526 aio_stop = true;
527 aio_thread.join();
528 aio_stop = false;
529 io_queue->shutdown();
530 }
531 }
532
533 void KernelDevice::_discard_start()
534 {
535 discard_thread.create("bstore_discard");
536 }
537
538 void KernelDevice::_discard_stop()
539 {
540 dout(10) << __func__ << dendl;
541 {
542 std::unique_lock l(discard_lock);
543 while (!discard_started) {
544 discard_cond.wait(l);
545 }
546 discard_stop = true;
547 discard_cond.notify_all();
548 }
549 discard_thread.join();
550 {
551 std::lock_guard l(discard_lock);
552 discard_stop = false;
553 }
554 dout(10) << __func__ << " stopped" << dendl;
555 }
556
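// Block until every queued discard has been processed and the discard thread
// is idle again.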
557 void KernelDevice::discard_drain()
558 {
559 dout(10) << __func__ << dendl;
560 std::unique_lock l(discard_lock);
561 while (!discard_queued.empty() || discard_running) {
562 discard_cond.wait(l);
563 }
564 }
565
566 static bool is_expected_ioerr(const int r)
567 {
568 // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
569 return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
570 r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
571 r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
572 #if defined(__linux__)
573 r == -EREMCHG || r == -EBADE
574 #elif defined(__FreeBSD__)
575 r == - BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE
576 #endif
577 );
578 }
579
580 void KernelDevice::_aio_thread()
581 {
582 dout(10) << __func__ << " start" << dendl;
583 int inject_crash_count = 0;
584 while (!aio_stop) {
585 dout(40) << __func__ << " polling" << dendl;
586 int max = cct->_conf->bdev_aio_reap_max;
587 aio_t *aio[max];
588 int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
589 aio, max);
590 if (r < 0) {
591 derr << __func__ << " got " << cpp_strerror(r) << dendl;
592 ceph_abort_msg("got unexpected error from io_getevents");
593 }
594 if (r > 0) {
595 dout(30) << __func__ << " got " << r << " completed aios" << dendl;
596 for (int i = 0; i < r; ++i) {
597 IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
598 _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
599 if (aio[i]->queue_item.is_linked()) {
600 std::lock_guard l(debug_queue_lock);
601 debug_aio_unlink(*aio[i]);
602 }
603
604 // set flag indicating new ios have completed. we do this *before*
605 // any completion or notifications so that any user flush() that
606 // follows the observed io completion will include this io. Note
607 // that an earlier, racing flush() could observe and clear this
608 // flag, but that also ensures that the IO will be stable before the
609 // later flush() occurs.
610 io_since_flush.store(true);
611
612 long r = aio[i]->get_return_value();
613 if (r < 0) {
614 derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")"
615 << dendl;
616 if (ioc->allow_eio && is_expected_ioerr(r)) {
617 derr << __func__ << " translating the error to EIO for upper layer"
618 << dendl;
619 ioc->set_return_value(-EIO);
620 } else {
621 if (is_expected_ioerr(r)) {
622 note_io_error_event(
623 devname.c_str(),
624 path.c_str(),
625 r,
626 #if defined(HAVE_POSIXAIO)
627 aio[i]->aio.aiocb.aio_lio_opcode,
628 #else
629 aio[i]->iocb.aio_lio_opcode,
630 #endif
631 aio[i]->offset,
632 aio[i]->length);
633 ceph_abort_msg(
634 "Unexpected IO error. "
635 "This may suggest a hardware issue. "
636 "Please check your kernel log!");
637 }
638 ceph_abort_msg(
639 "Unexpected IO error. "
640 "This may suggest HW issue. Please check your dmesg!");
641 }
642 } else if (aio[i]->length != (uint64_t)r) {
643 derr << "aio to 0x" << std::hex << aio[i]->offset
644 << "~" << aio[i]->length << std::dec
645 << " but returned: " << r << dendl;
646 ceph_abort_msg("unexpected aio return value: does not match length");
647 }
648
649 dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
650 << " ioc " << ioc
651 << " with " << (ioc->num_running.load() - 1)
652 << " aios left" << dendl;
653
654 // NOTE: once num_running is decremented and we either call the callback or
655 // call aio_wake, we cannot touch ioc or aio[] as the caller
656 // may free it.
657 if (ioc->priv) {
658 if (--ioc->num_running == 0) {
659 aio_callback(aio_callback_priv, ioc->priv);
660 }
661 } else {
662 ioc->try_aio_wake();
663 }
664 }
665 }
666 if (cct->_conf->bdev_debug_aio) {
667 utime_t now = ceph_clock_now();
668 std::lock_guard l(debug_queue_lock);
669 if (debug_oldest) {
670 if (debug_stall_since == utime_t()) {
671 debug_stall_since = now;
672 } else {
673 if (cct->_conf->bdev_debug_aio_suicide_timeout) {
674 utime_t cutoff = now;
675 cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
676 if (debug_stall_since < cutoff) {
677 derr << __func__ << " stalled aio " << debug_oldest
678 << " since " << debug_stall_since << ", timeout is "
679 << cct->_conf->bdev_debug_aio_suicide_timeout
680 << "s, suicide" << dendl;
681 ceph_abort_msg("stalled aio... buggy kernel or bad device?");
682 }
683 }
684 }
685 }
686 }
687 if (cct->_conf->bdev_inject_crash) {
688 ++inject_crash_count;
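// each poll iteration waits up to bdev_aio_poll_ms, so this product roughly
// approximates the seconds elapsed since crash injection was armed.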
689 if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
690 cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
691 derr << __func__ << " bdev_inject_crash trigger from aio thread"
692 << dendl;
693 cct->_log->flush();
694 _exit(1);
695 }
696 }
697 }
698 dout(10) << __func__ << " end" << dendl;
699 }
700
701 void KernelDevice::_discard_thread()
702 {
703 std::unique_lock l(discard_lock);
704 ceph_assert(!discard_started);
705 discard_started = true;
706 discard_cond.notify_all();
707 while (true) {
708 ceph_assert(discard_finishing.empty());
709 if (discard_queued.empty()) {
710 if (discard_stop)
711 break;
712 dout(20) << __func__ << " sleep" << dendl;
713 discard_cond.notify_all(); // for the thread trying to drain...
714 discard_cond.wait(l);
715 dout(20) << __func__ << " wake" << dendl;
716 } else {
717 discard_finishing.swap(discard_queued);
718 discard_running = true;
719 l.unlock();
720 dout(20) << __func__ << " finishing" << dendl;
721 for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
722 _discard(p.get_start(), p.get_len());
723 }
724
725 discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
726 discard_finishing.clear();
727 l.lock();
728 discard_running = false;
729 }
730 }
731 dout(10) << __func__ << " finish" << dendl;
732 discard_started = false;
733 }
734
735 int KernelDevice::_queue_discard(interval_set<uint64_t> &to_release)
736 {
737 // if bdev_async_discard was enabled on the fly, the discard thread has not been started; fall back to synchronous discard
738 if (!discard_thread.is_started())
739 return -1;
740
741 if (to_release.empty())
742 return 0;
743
744 std::lock_guard l(discard_lock);
745 discard_queued.insert(to_release);
746 discard_cond.notify_all();
747 return 0;
748 }
749
750 // return true only if _queue_discard succeeded, so the caller won't have to do alloc->release;
751 // otherwise return false
752 bool KernelDevice::try_discard(interval_set<uint64_t> &to_release, bool async)
753 {
754 if (!support_discard || !cct->_conf->bdev_enable_discard)
755 return false;
756
757 if (async && discard_thread.is_started()) {
758 return 0 == _queue_discard(to_release);
759 } else {
760 for (auto p = to_release.begin(); p != to_release.end(); ++p) {
761 _discard(p.get_start(), p.get_len());
762 }
763 }
764 return false;
765 }
766
767 void KernelDevice::_aio_log_start(
768 IOContext *ioc,
769 uint64_t offset,
770 uint64_t length)
771 {
772 dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
773 << std::dec << dendl;
774 if (cct->_conf->bdev_debug_inflight_ios) {
775 std::lock_guard l(debug_lock);
776 if (debug_inflight.intersects(offset, length)) {
777 derr << __func__ << " inflight overlap of 0x"
778 << std::hex
779 << offset << "~" << length << std::dec
780 << " with " << debug_inflight << dendl;
781 ceph_abort();
782 }
783 debug_inflight.insert(offset, length);
784 }
785 }
786
787 void KernelDevice::debug_aio_link(aio_t& aio)
788 {
789 if (debug_queue.empty()) {
790 debug_oldest = &aio;
791 }
792 debug_queue.push_back(aio);
793 }
794
795 void KernelDevice::debug_aio_unlink(aio_t& aio)
796 {
797 if (aio.queue_item.is_linked()) {
798 debug_queue.erase(debug_queue.iterator_to(aio));
799 if (debug_oldest == &aio) {
800 auto age = cct->_conf->bdev_debug_aio_log_age;
801 if (age && debug_stall_since != utime_t()) {
802 utime_t cutoff = ceph_clock_now();
803 cutoff -= age;
804 if (debug_stall_since < cutoff) {
805 derr << __func__ << " stalled aio " << debug_oldest
806 << " since " << debug_stall_since << ", timeout is "
807 << age
808 << "s" << dendl;
809 }
810 }
811
812 if (debug_queue.empty()) {
813 debug_oldest = nullptr;
814 } else {
815 debug_oldest = &debug_queue.front();
816 }
817 debug_stall_since = utime_t();
818 }
819 }
820 }
821
822 void KernelDevice::_aio_log_finish(
823 IOContext *ioc,
824 uint64_t offset,
825 uint64_t length)
826 {
827 dout(20) << __func__ << " " << aio << " 0x"
828 << std::hex << offset << "~" << length << std::dec << dendl;
829 if (cct->_conf->bdev_debug_inflight_ios) {
830 std::lock_guard l(debug_lock);
831 debug_inflight.erase(offset, length);
832 }
833 }
834
835 void KernelDevice::aio_submit(IOContext *ioc)
836 {
837 dout(20) << __func__ << " ioc " << ioc
838 << " pending " << ioc->num_pending.load()
839 << " running " << ioc->num_running.load()
840 << dendl;
841
842 if (ioc->num_pending.load() == 0) {
843 return;
844 }
845
846 // move these aside, and get our end iterator position now, as the
847 // aios might complete as soon as they are submitted and queue more
848 // wal aio's.
849 list<aio_t>::iterator e = ioc->running_aios.begin();
850 ioc->running_aios.splice(e, ioc->pending_aios);
851
852 int pending = ioc->num_pending.load();
853 ioc->num_running += pending;
854 ioc->num_pending -= pending;
855 ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this
856 ceph_assert(ioc->pending_aios.size() == 0);
857
858 if (cct->_conf->bdev_debug_aio) {
859 list<aio_t>::iterator p = ioc->running_aios.begin();
860 while (p != e) {
861 dout(30) << __func__ << " " << *p << dendl;
862 std::lock_guard l(debug_queue_lock);
863 debug_aio_link(*p++);
864 }
865 }
866
867 void *priv = static_cast<void*>(ioc);
868 int r, retries = 0;
869 // num of pending aios should not overflow when passed to submit_batch()
870 assert(pending <= std::numeric_limits<uint16_t>::max());
871 r = io_queue->submit_batch(ioc->running_aios.begin(), e,
872 pending, priv, &retries);
873
874 if (retries)
875 derr << __func__ << " retries " << retries << dendl;
876 if (r < 0) {
877 derr << " aio submit got " << cpp_strerror(r) << dendl;
878 ceph_assert(r == 0);
879 }
880 }
881
882 int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
883 {
884 uint64_t len = bl.length();
885 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
886 << std::dec << " " << buffermode(buffered) << dendl;
887 if (cct->_conf->bdev_inject_crash &&
888 rand() % cct->_conf->bdev_inject_crash == 0) {
889 derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
890 << off << "~" << len << std::dec << dendl;
891 ++injecting_crash;
892 return 0;
893 }
894 vector<iovec> iov;
895 bl.prepare_iov(&iov);
896
897 auto left = len;
898 auto o = off;
899 size_t idx = 0;
900 do {
901 auto r = ::pwritev(choose_fd(buffered, write_hint),
902 &iov[idx], iov.size() - idx, o);
903
904 if (r < 0) {
905 r = -errno;
906 derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
907 return r;
908 }
909 o += r;
910 left -= r;
911 if (left) {
912 // skip fully processed IOVs
913 while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) {
914 r -= iov[idx++].iov_len;
915 }
916 // update partially processed one if any
917 if (r) {
918 ceph_assert(idx < iov.size());
919 ceph_assert((size_t)r < iov[idx].iov_len);
920 iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r;
921 iov[idx].iov_len -= r;
922 r = 0;
923 }
924 ceph_assert(r == 0);
925 }
926 } while (left);
927
928 #ifdef HAVE_SYNC_FILE_RANGE
929 if (buffered) {
930 // initiate IO and wait till it completes
931 auto r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE);
932 if (r < 0) {
933 r = -errno;
934 derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
935 return r;
936 }
937 }
938 #endif
939
940 io_since_flush.store(true);
941
942 return 0;
943 }
944
945 int KernelDevice::write(
946 uint64_t off,
947 bufferlist &bl,
948 bool buffered,
949 int write_hint)
950 {
951 uint64_t len = bl.length();
952 dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
953 << " " << buffermode(buffered)
954 << dendl;
955 ceph_assert(is_valid_io(off, len));
956 if (cct->_conf->objectstore_blackhole) {
957 lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
958 << dendl;
959 return 0;
960 }
961
962 if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
963 bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
964 dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
965 }
966 dout(40) << "data:\n";
967 bl.hexdump(*_dout);
968 *_dout << dendl;
969
970 return _sync_write(off, bl, buffered, write_hint);
971 }
972
973 int KernelDevice::aio_write(
974 uint64_t off,
975 bufferlist &bl,
976 IOContext *ioc,
977 bool buffered,
978 int write_hint)
979 {
980 uint64_t len = bl.length();
981 dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
982 << " " << buffermode(buffered)
983 << dendl;
984 ceph_assert(is_valid_io(off, len));
985 if (cct->_conf->objectstore_blackhole) {
986 lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
987 << dendl;
988 return 0;
989 }
990
991 if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
992 bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
993 dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
994 }
995 dout(40) << "data:\n";
996 bl.hexdump(*_dout);
997 *_dout << dendl;
998
999 _aio_log_start(ioc, off, len);
1000
1001 #ifdef HAVE_LIBAIO
1002 if (aio && dio && !buffered) {
1003 if (cct->_conf->bdev_inject_crash &&
1004 rand() % cct->_conf->bdev_inject_crash == 0) {
1005 derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
1006 << off << "~" << len << std::dec
1007 << dendl;
1008 // generate a real io so that aio_wait behaves properly, but make it
1009 // a read instead of write, and toss the result.
1010 ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
1011 ++ioc->num_pending;
1012 auto& aio = ioc->pending_aios.back();
1013 aio.bl.push_back(
1014 ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len)));
1015 aio.bl.prepare_iov(&aio.iov);
1016 aio.preadv(off, len);
1017 ++injecting_crash;
1018 } else {
1019 if (bl.length() <= RW_IO_MAX) {
1020 // fast path (non-huge write)
1021 ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
1022 ++ioc->num_pending;
1023 auto& aio = ioc->pending_aios.back();
1024 bl.prepare_iov(&aio.iov);
1025 aio.bl.claim_append(bl);
1026 aio.pwritev(off, len);
1027 dout(30) << aio << dendl;
1028 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
1029 << std::dec << " aio " << &aio << dendl;
1030 } else {
1031 // write in RW_IO_MAX-sized chunks
1032 uint64_t prev_len = 0;
1033 while (prev_len < bl.length()) {
1034 bufferlist tmp;
1035 if (prev_len + RW_IO_MAX < bl.length()) {
1036 tmp.substr_of(bl, prev_len, RW_IO_MAX);
1037 } else {
1038 tmp.substr_of(bl, prev_len, bl.length() - prev_len);
1039 }
1040 auto len = tmp.length();
1041 ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
1042 ++ioc->num_pending;
1043 auto& aio = ioc->pending_aios.back();
1044 tmp.prepare_iov(&aio.iov);
1045 aio.bl.claim_append(tmp);
1046 aio.pwritev(off + prev_len, len);
1047 dout(30) << aio << dendl;
1048 dout(5) << __func__ << " 0x" << std::hex << off + prev_len
1049 << "~" << len
1050 << std::dec << " aio " << &aio << " (piece)" << dendl;
1051 prev_len += len;
1052 }
1053 }
1054 }
1055 } else
1056 #endif
1057 {
1058 int r = _sync_write(off, bl, buffered, write_hint);
1059 _aio_log_finish(ioc, off, len);
1060 if (r < 0)
1061 return r;
1062 }
1063 return 0;
1064 }
1065
1066 int KernelDevice::_discard(uint64_t offset, uint64_t len)
1067 {
1068 int r = 0;
1069 if (cct->_conf->objectstore_blackhole) {
1070 lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
1071 << dendl;
1072 return 0;
1073 }
1074 dout(10) << __func__
1075 << " 0x" << std::hex << offset << "~" << len << std::dec
1076 << dendl;
1077 r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len);
1078 return r;
1079 }
1080
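// Pre-maps a fixed number of same-sized, hugepage-backed regions and recycles
// them through a lock-free queue, so large reads can reuse buffers without
// paying an mmap()/page-fault cost per IO (see create_custom_aligned() below).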
1081 struct ExplicitHugePagePool {
1082 using region_queue_t = boost::lockfree::queue<void*>;
1083 using instrumented_raw = ceph::buffer_instrumentation::instrumented_raw<
1084 BlockDevice::hugepaged_raw_marker_t>;
1085
1086 struct mmaped_buffer_raw : public instrumented_raw {
1087 region_queue_t& region_q; // for recycling
1088
1089 mmaped_buffer_raw(void* mmaped_region, ExplicitHugePagePool& parent)
1090 : instrumented_raw(static_cast<char*>(mmaped_region), parent.buffer_size),
1091 region_q(parent.region_q) {
1092 // the `mmaped_region` has been passed to `raw` as the buffer's `data`
1093 }
1094 ~mmaped_buffer_raw() override {
1095 // don't delete nor unmmap; recycle the region instead
1096 region_q.push(data);
1097 }
1098 };
1099
1100 ExplicitHugePagePool(const size_t buffer_size, size_t buffers_in_pool)
1101 : buffer_size(buffer_size), region_q(buffers_in_pool) {
1102 while (buffers_in_pool--) {
1103 void* const mmaped_region = ::mmap(
1104 nullptr,
1105 buffer_size,
1106 PROT_READ | PROT_WRITE,
1107 #if defined(__FreeBSD__)
1108 // FreeBSD doesn't have MAP_HUGETLB nor MAP_POPULATE but it has
1109 // a different, more automated / implicit mechanisms. However,
1110 // we want to mimic the Linux behavior as closely as possible
1111 // also in the matter of error handling which is the reason
1112 // behind MAP_ALIGNED_SUPER.
1113 // See: https://lists.freebsd.org/pipermail/freebsd-questions/2014-August/260578.html
1114 MAP_PRIVATE | MAP_ANONYMOUS | MAP_PREFAULT_READ | MAP_ALIGNED_SUPER,
1115 #else
1116 MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB,
1117 #endif // __FreeBSD__
1118 -1,
1119 0);
1120 if (mmaped_region == MAP_FAILED) {
1121 ceph_abort("can't allocate huge buffer;"
1122 " /proc/sys/vm/nr_hugepages misconfigured?");
1123 } else {
1124 region_q.push(mmaped_region);
1125 }
1126 }
1127 }
1128 ~ExplicitHugePagePool() {
1129 void* mmaped_region;
1130 while (region_q.pop(mmaped_region)) {
1131 ::munmap(mmaped_region, buffer_size);
1132 }
1133 }
1134
1135 ceph::unique_leakable_ptr<buffer::raw> try_create() {
1136 if (void* mmaped_region; region_q.pop(mmaped_region)) {
1137 return ceph::unique_leakable_ptr<buffer::raw> {
1138 new mmaped_buffer_raw(mmaped_region, *this)
1139 };
1140 } else {
1141 // oops, empty queue.
1142 return nullptr;
1143 }
1144 }
1145
1146 size_t get_buffer_size() const {
1147 return buffer_size;
1148 }
1149
1150 private:
1151 const size_t buffer_size;
1152 region_queue_t region_q;
1153 };
1154
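// Holds one ExplicitHugePagePool per configured buffer size and routes
// allocation requests to the pool whose buffer size matches exactly.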
1155 struct HugePagePoolOfPools {
1156 HugePagePoolOfPools(const std::map<size_t, size_t> conf)
1157 : pools(conf.size(), [conf] (size_t index, auto emplacer) {
1158 ceph_assert(index < conf.size());
1159 // it could be replaced with a state-mutating lambda and
1160 // `conf::erase()` but performance is not a concern here.
1161 const auto [buffer_size, buffers_in_pool] =
1162 *std::next(std::begin(conf), index);
1163 emplacer.emplace(buffer_size, buffers_in_pool);
1164 }) {
1165 }
1166
1167 ceph::unique_leakable_ptr<buffer::raw> try_create(const size_t size) {
1168 // thanks to `conf` being a `std::map`, the pools are stored
1169 // sorted by buffer size. that would allow clamping the lookup to log(n),
1170 // but I doubt admins want to have dozens of accelerated buffer
1171 // sizes. let's keep this simple for now.
1172 if (auto iter = std::find_if(std::begin(pools), std::end(pools),
1173 [size] (const auto& pool) {
1174 return size == pool.get_buffer_size();
1175 });
1176 iter != std::end(pools)) {
1177 return iter->try_create();
1178 }
1179 return nullptr;
1180 }
1181
1182 static HugePagePoolOfPools from_desc(const std::string& conf);
1183
1184 private:
1185 // let's have some space inside (for 2 MB and 4 MB perhaps?)
1186 // NOTE: we need tiny_vector as the boost::lockfree queue inside
1187 // pool is not-movable.
1188 ceph::containers::tiny_vector<ExplicitHugePagePool, 2> pools;
1189 };
1190
1191
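// Parses a description of `buffer_size=buffer_count` pairs (separated by
// spaces, commas or semicolons), e.g. "2097152=128, 4194304=64", into a
// size -> count map.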
1192 HugePagePoolOfPools HugePagePoolOfPools::from_desc(const std::string& desc) {
1193 std::map<size_t, size_t> conf; // buffer_size -> buffers_in_pool
1194 std::map<std::string, std::string> exploded_str_conf;
1195 get_str_map(desc, &exploded_str_conf);
1196 for (const auto& [buffer_size_s, buffers_in_pool_s] : exploded_str_conf) {
1197 size_t buffer_size, buffers_in_pool;
1198 if (sscanf(buffer_size_s.c_str(), "%zu", &buffer_size) != 1) {
1199 ceph_abort("can't parse a key in the configuration");
1200 }
1201 if (sscanf(buffers_in_pool_s.c_str(), "%zu", &buffers_in_pool) != 1) {
1202 ceph_abort("can't parse a value in the configuration");
1203 }
1204 conf[buffer_size] = buffers_in_pool;
1205 }
1206 return HugePagePoolOfPools{std::move(conf)};
1207 }
1208
1209 // create a buffer based on a user-configurable size; it's intended to make
1210 // our buffers THP-able.
1211 ceph::unique_leakable_ptr<buffer::raw> KernelDevice::create_custom_aligned(
1212 const size_t len,
1213 IOContext* const ioc) const
1214 {
1215 // just to preserve the logic of create_small_page_aligned().
1216 if (len < CEPH_PAGE_SIZE) {
1217 return ceph::buffer::create_small_page_aligned(len);
1218 } else {
1219 static HugePagePoolOfPools hp_pools = HugePagePoolOfPools::from_desc(
1220 cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
1221 );
1222 if (auto lucky_raw = hp_pools.try_create(len); lucky_raw) {
1223 dout(20) << __func__ << " allocated from huge pool"
1224 << " lucky_raw.data=" << (void*)lucky_raw->get_data()
1225 << " bdev_read_preallocated_huge_buffers="
1226 << cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
1227 << dendl;
1228 ioc->flags |= IOContext::FLAG_DONT_CACHE;
1229 return lucky_raw;
1230 } else {
1231 // fallthrough due to empty buffer pool. this can happen also
1232 // when the configurable was explicitly set to 0.
1233 dout(20) << __func__ << " cannot allocate from huge pool"
1234 << dendl;
1235 }
1236 }
1237 const size_t custom_alignment = cct->_conf->bdev_read_buffer_alignment;
1238 dout(20) << __func__ << " with the custom alignment;"
1239 << " len=" << len
1240 << " custom_alignment=" << custom_alignment
1241 << dendl;
1242 return ceph::buffer::create_aligned(len, custom_alignment);
1243 }
1244
1245 int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
1246 IOContext *ioc,
1247 bool buffered)
1248 {
1249 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1250 << " " << buffermode(buffered)
1251 << dendl;
1252 ceph_assert(is_valid_io(off, len));
1253
1254 _aio_log_start(ioc, off, len);
1255
1256 auto start1 = mono_clock::now();
1257
1258 auto p = ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc));
1259 int r = ::pread(choose_fd(buffered, WRITE_LIFE_NOT_SET),
1260 p->c_str(), len, off);
1261 auto age = cct->_conf->bdev_debug_aio_log_age;
1262 if (mono_clock::now() - start1 >= make_timespan(age)) {
1263 derr << __func__ << " stalled read "
1264 << " 0x" << std::hex << off << "~" << len << std::dec
1265 << " " << buffermode(buffered)
1266 << " since " << start1 << ", timeout is "
1267 << age
1268 << "s" << dendl;
1269 }
1270 if (r < 0) {
1271 if (ioc->allow_eio && is_expected_ioerr(-errno)) {
1272 r = -EIO;
1273 } else {
1274 r = -errno;
1275 }
1276 derr << __func__ << " 0x" << std::hex << off << "~" << len
1277 << std::dec << " error: " << cpp_strerror(r) << dendl;
1278 goto out;
1279 }
1280 ceph_assert((uint64_t)r == len);
1281 pbl->push_back(std::move(p));
1282
1283 dout(40) << "data:\n";
1284 pbl->hexdump(*_dout);
1285 *_dout << dendl;
1286
1287 out:
1288 _aio_log_finish(ioc, off, len);
1289 return r < 0 ? r : 0;
1290 }
1291
1292 int KernelDevice::aio_read(
1293 uint64_t off,
1294 uint64_t len,
1295 bufferlist *pbl,
1296 IOContext *ioc)
1297 {
1298 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1299 << dendl;
1300
1301 int r = 0;
1302 #ifdef HAVE_LIBAIO
1303 if (aio && dio) {
1304 ceph_assert(is_valid_io(off, len));
1305 _aio_log_start(ioc, off, len);
1306 ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
1307 ++ioc->num_pending;
1308 aio_t& aio = ioc->pending_aios.back();
1309 aio.bl.push_back(
1310 ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc)));
1311 aio.bl.prepare_iov(&aio.iov);
1312 aio.preadv(off, len);
1313 dout(30) << aio << dendl;
1314 pbl->append(aio.bl);
1315 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
1316 << std::dec << " aio " << &aio << dendl;
1317 } else
1318 #endif
1319 {
1320 r = read(off, len, pbl, ioc, false);
1321 }
1322
1323 return r;
1324 }
1325
1326 int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
1327 {
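// O_DIRECT reads must be block-aligned, so read a block-aligned superset of
// the requested range into a bounce buffer and copy the wanted bytes out.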
1328 uint64_t aligned_off = p2align(off, block_size);
1329 uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
1330 bufferptr p = ceph::buffer::create_small_page_aligned(aligned_len);
1331 int r = 0;
1332
1333 auto start1 = mono_clock::now();
1334 r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off);
1335 auto age = cct->_conf->bdev_debug_aio_log_age;
1336 if (mono_clock::now() - start1 >= make_timespan(age)) {
1337 derr << __func__ << " stalled read "
1338 << " 0x" << std::hex << off << "~" << len << std::dec
1339 << " since " << start1 << ", timeout is "
1340 << age
1341 << "s" << dendl;
1342 }
1343
1344 if (r < 0) {
1345 r = -errno;
1346 derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1347 << " error: " << cpp_strerror(r) << dendl;
1348 goto out;
1349 }
1350 ceph_assert((uint64_t)r == aligned_len);
1351 memcpy(buf, p.c_str() + (off - aligned_off), len);
1352
1353 dout(40) << __func__ << " data:\n";
1354 bufferlist bl;
1355 bl.append(buf, len);
1356 bl.hexdump(*_dout);
1357 *_dout << dendl;
1358
1359 out:
1360 return r < 0 ? r : 0;
1361 }
1362
1363 int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
1364 bool buffered)
1365 {
1366 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1367 << "buffered " << buffered
1368 << dendl;
1369 ceph_assert(len > 0);
1370 ceph_assert(off < size);
1371 ceph_assert(off + len <= size);
1372 int r = 0;
1373 auto age = cct->_conf->bdev_debug_aio_log_age;
1374
1375 // if it's direct io and unaligned, we have to use an internal buffer
1376 if (!buffered && ((off % block_size != 0)
1377 || (len % block_size != 0)
1378 || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
1379 return direct_read_unaligned(off, len, buf);
1380
1381 auto start1 = mono_clock::now();
1382 if (buffered) {
1383 //buffered read
1384 auto off0 = off;
1385 char *t = buf;
1386 uint64_t left = len;
1387 while (left > 0) {
1388 r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
1389 if (r < 0) {
1390 r = -errno;
1391 derr << __func__ << " 0x" << std::hex << off << "~" << left
1392 << std::dec << " error: " << cpp_strerror(r) << dendl;
1393 goto out;
1394 }
1395 off += r;
1396 t += r;
1397 left -= r;
1398 }
1399 if (mono_clock::now() - start1 >= make_timespan(age)) {
1400 derr << __func__ << " stalled read "
1401 << " 0x" << std::hex << off0 << "~" << len << std::dec
1402 << " (buffered) since " << start1 << ", timeout is "
1403 << age
1404 << "s" << dendl;
1405 }
1406 } else {
1407 //direct and aligned read
1408 r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
1409 if (mono_clock::now() - start1 >= make_timespan(age)) {
1410 derr << __func__ << " stalled read "
1411 << " 0x" << std::hex << off << "~" << len << std::dec
1412 << " (direct) since " << start1 << ", timeout is "
1413 << age
1414 << "s" << dendl;
1415 }
1416 if (r < 0) {
1417 r = -errno;
1418 derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
1419 << off << "~" << len << std::dec << " error: " << cpp_strerror(r)
1420 << dendl;
1421 goto out;
1422 }
1423 ceph_assert((uint64_t)r == len);
1424 }
1425
1426 dout(40) << __func__ << " data:\n";
1427 bufferlist bl;
1428 bl.append(buf, len);
1429 bl.hexdump(*_dout);
1430 *_dout << dendl;
1431
1432 out:
1433 return r < 0 ? r : 0;
1434 }
1435
1436 int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
1437 {
1438 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1439 << dendl;
1440 ceph_assert(off % block_size == 0);
1441 ceph_assert(len % block_size == 0);
1442 int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED);
1443 if (r) {
1444 r = -r;
1445 derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1446 << " error: " << cpp_strerror(r) << dendl;
1447 }
1448 return r;
1449 }