// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <limits>

#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/file.h>
#include <sys/mman.h>

#include <boost/container/flat_map.hpp>
#include <boost/lockfree/queue.hpp>

#include "KernelDevice.h"
#include "include/buffer_raw.h"
#include "include/intarith.h"
#include "include/types.h"
#include "include/compat.h"
#include "include/stringify.h"
#include "include/str_map.h"
#include "common/blkdev.h"
#include "common/buffer_instrumentation.h"
#include "common/errno.h"
#if defined(__FreeBSD__)
#include "bsm/audit_errno.h"
#endif
#include "common/debug.h"
#include "common/numa.h"

#include "global/global_context.h"
#include "io_uring.h"

#define dout_context cct
#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << this << " " << path << ") "

using std::list;
using std::map;
using std::string;
using std::vector;

using ceph::bufferlist;
using ceph::bufferptr;
using ceph::make_timespan;
using ceph::mono_clock;
using ceph::operator <<;

KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
  : BlockDevice(cct, cb, cbpriv),
    aio(false), dio(false),
    discard_callback(d_cb),
    discard_callback_priv(d_cbpriv),
    aio_stop(false),
    discard_started(false),
    discard_stop(false),
    aio_thread(this),
    discard_thread(this),
    injecting_crash(0)
{
  fd_directs.resize(WRITE_LIFE_MAX, -1);
  fd_buffereds.resize(WRITE_LIFE_MAX, -1);

  bool use_ioring = cct->_conf.get_val<bool>("bdev_ioring");
  unsigned int iodepth = cct->_conf->bdev_aio_max_queue_depth;

  if (use_ioring && ioring_queue_t::supported()) {
    bool use_ioring_hipri = cct->_conf.get_val<bool>("bdev_ioring_hipri");
    bool use_ioring_sqthread_poll = cct->_conf.get_val<bool>("bdev_ioring_sqthread_poll");
    io_queue = std::make_unique<ioring_queue_t>(iodepth, use_ioring_hipri, use_ioring_sqthread_poll);
  } else {
    static bool once;
    if (use_ioring && !once) {
      derr << "WARNING: io_uring API is not supported! Falling back to libaio!"
           << dendl;
      once = true;
    }
    io_queue = std::make_unique<aio_queue_t>(iodepth);
  }
}

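// Take an exclusive advisory lock on the device so two processes
// cannot use it at once.  Prefer an OFD (open file description) lock,
// which stays with the fd rather than the process; fall back to
// flock(2) where F_OFD_SETLK is unsupported (EINVAL).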
int KernelDevice::_lock()
{
  // When the block device changes (e.g. on hotplug), systemd-udevd
  // briefly opens it, reads some information, and closes it again.
  // That transient open can make our lock attempt fail, so retry here.
  int fd = fd_directs[WRITE_LIFE_NOT_SET];
  dout(10) << __func__ << " fd=" << fd << dendl;
  uint64_t nr_tries = 0;
  for (;;) {
    struct flock fl = { .l_type = F_WRLCK,
                        .l_whence = SEEK_SET };
    int r = ::fcntl(fd, F_OFD_SETLK, &fl);
    if (r < 0) {
      if (errno == EINVAL) {
        r = ::flock(fd, LOCK_EX | LOCK_NB);
      }
    }
    if (r == 0) {
      return 0;
    }
    if (errno != EAGAIN) {
      return -errno;
    }
    dout(1) << __func__ << " flock busy on " << path << dendl;
    if (const uint64_t max_retry =
          cct->_conf.get_val<uint64_t>("bdev_flock_retry");
        max_retry > 0 && nr_tries++ == max_retry) {
      return -EAGAIN;
    }
    double retry_interval =
      cct->_conf.get_val<double>("bdev_flock_retry_interval");
    std::this_thread::sleep_for(ceph::make_timespan(retry_interval));
  }
}

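// Open a pair of fds (O_DIRECT and buffered) for every write-life
// hint so writes can later be steered, via choose_fd(), to the fd
// whose F_SET_FILE_RW_HINT matches the caller's hint.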
int KernelDevice::open(const string& p)
{
  path = p;
  int r = 0, i = 0;
  dout(1) << __func__ << " path " << path << dendl;

  for (i = 0; i < WRITE_LIFE_MAX; i++) {
    int fd = ::open(path.c_str(), O_RDWR | O_DIRECT);
    if (fd < 0) {
      r = -errno;
      break;
    }
    fd_directs[i] = fd;

    fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
    if (fd < 0) {
      r = -errno;
      break;
    }
    fd_buffereds[i] = fd;
  }

  if (i != WRITE_LIFE_MAX) {
    derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
    goto out_fail;
  }

#if defined(F_SET_FILE_RW_HINT)
  for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) {
    if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) {
      r = -errno;
      break;
    }
    if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) {
      r = -errno;
      break;
    }
  }
  if (i != WRITE_LIFE_MAX) {
    enable_wrt = false;
    dout(0) << "fcntl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl;
  }
#endif

  dio = true;
  aio = cct->_conf->bdev_aio;
  if (!aio) {
    ceph_abort_msg("non-aio not supported");
  }

  // disable readahead as it will wreak havoc on our mix of
  // directio/aio and buffered io.
  r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM);
  if (r) {
    r = -r;
    derr << __func__ << " posix_fadvise got: " << cpp_strerror(r) << dendl;
    goto out_fail;
  }

  if (lock_exclusive) {
    r = _lock();
    if (r < 0) {
      derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
           << dendl;
      goto out_fail;
    }
  }

  struct stat st;
  r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st);
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
    goto out_fail;
  }

  // Operate as though the block size is 4 KB.  The backing file
  // blksize doesn't strictly matter except that some file systems may
  // require a read/modify/write if we write something smaller than
  // it.
  block_size = cct->_conf->bdev_block_size;
  if (block_size != (unsigned)st.st_blksize) {
    dout(1) << __func__ << " backing device/file reports st_blksize "
            << st.st_blksize << ", using bdev_block_size "
            << block_size << " anyway" << dendl;
  }

  {
    BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]);
    BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]);

    if (S_ISBLK(st.st_mode)) {
      int64_t s;
      r = blkdev_direct.get_size(&s);
      if (r < 0) {
        goto out_fail;
      }
      size = s;
    } else {
      size = st.st_size;
    }

    char partition[PATH_MAX], devname[PATH_MAX];
    if ((r = blkdev_buffered.partition(partition, PATH_MAX)) ||
        (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) {
      derr << "unable to get device name for " << path << ": "
           << cpp_strerror(r) << dendl;
      rotational = true;
    } else {
      dout(20) << __func__ << " devname " << devname << dendl;
      rotational = blkdev_buffered.is_rotational();
      support_discard = blkdev_buffered.support_discard();
      optimal_io_size = blkdev_buffered.get_optimal_io_size();
      this->devname = devname;
      _detect_vdo();
    }
  }

  r = _post_open();
  if (r < 0) {
    goto out_fail;
  }

  r = _aio_start();
  if (r < 0) {
    goto out_fail;
  }
  _discard_start();

  // round size down to an even block
  size &= ~(block_size - 1);

  dout(1) << __func__
          << " size " << size
          << " (0x" << std::hex << size << std::dec << ", "
          << byte_u_t(size) << ")"
          << " block_size " << block_size
          << " (" << byte_u_t(block_size) << ")"
          << " " << (rotational ? "rotational" : "non-rotational")
          << " discard " << (support_discard ? "supported" : "not supported")
          << dendl;
  return 0;

out_fail:
  for (i = 0; i < WRITE_LIFE_MAX; i++) {
    if (fd_directs[i] >= 0) {
      VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
      fd_directs[i] = -1;
    } else {
      break;
    }
    if (fd_buffereds[i] >= 0) {
      VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
      fd_buffereds[i] = -1;
    } else {
      break;
    }
  }
  return r;
}

int KernelDevice::get_devices(std::set<std::string> *ls) const
{
  if (devname.empty()) {
    return 0;
  }
  get_raw_devices(devname, ls);
  return 0;
}

void KernelDevice::close()
{
  dout(1) << __func__ << dendl;
  _aio_stop();
  _discard_stop();
  _pre_close();

  if (vdo_fd >= 0) {
    VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
    vdo_fd = -1;
  }

  for (int i = 0; i < WRITE_LIFE_MAX; i++) {
    ceph_assert(fd_directs[i] >= 0);
    VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
    fd_directs[i] = -1;

    ceph_assert(fd_buffereds[i] >= 0);
    VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
    fd_buffereds[i] = -1;
  }
  path.clear();
}

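// Populate *pm with the device's properties, each key prefixed with
// the supplied prefix (e.g. "bluestore_bdev_").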
int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
{
  (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard);
  (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
  (*pm)[prefix + "size"] = stringify(get_size());
  (*pm)[prefix + "block_size"] = stringify(get_block_size());
  (*pm)[prefix + "optimal_io_size"] = stringify(get_optimal_io_size());
  (*pm)[prefix + "driver"] = "KernelDevice";
  if (rotational) {
    (*pm)[prefix + "type"] = "hdd";
  } else {
    (*pm)[prefix + "type"] = "ssd";
  }
  if (vdo_fd >= 0) {
    (*pm)[prefix + "vdo"] = "true";
    uint64_t total, avail;
    get_vdo_utilization(vdo_fd, &total, &avail);
    (*pm)[prefix + "vdo_physical_size"] = stringify(total);
  }

  {
    string res_names;
    std::set<std::string> devnames;
    if (get_devices(&devnames) == 0) {
      for (auto& dev : devnames) {
        if (!res_names.empty()) {
          res_names += ",";
        }
        res_names += dev;
      }
      if (res_names.size()) {
        (*pm)[prefix + "devices"] = res_names;
      }
    }
  }

  struct stat st;
  int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
  if (r < 0)
    return -errno;
  if (S_ISBLK(st.st_mode)) {
    (*pm)[prefix + "access_mode"] = "blk";

    char buffer[1024] = {0};
    BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
    if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
      (*pm)[prefix + "partition_path"] = "unknown";
    } else {
      (*pm)[prefix + "partition_path"] = buffer;
    }
    buffer[0] = '\0';
    // the dev node is the whole-disk device, not the partition
    if (r = blkdev.wholedisk(buffer, sizeof(buffer)); r) {
      (*pm)[prefix + "dev_node"] = "unknown";
    } else {
      (*pm)[prefix + "dev_node"] = buffer;
    }
    if (!r) {
      return 0;
    }
    buffer[0] = '\0';
    blkdev.model(buffer, sizeof(buffer));
    (*pm)[prefix + "model"] = buffer;

    buffer[0] = '\0';
    blkdev.dev(buffer, sizeof(buffer));
    (*pm)[prefix + "dev"] = buffer;

    // nvme exposes a serial number
    buffer[0] = '\0';
    blkdev.serial(buffer, sizeof(buffer));
    (*pm)[prefix + "serial"] = buffer;

    // numa
    int node;
    r = blkdev.get_numa_node(&node);
    if (r >= 0) {
      (*pm)[prefix + "numa_node"] = stringify(node);
    }
  } else {
    (*pm)[prefix + "access_mode"] = "file";
    (*pm)[prefix + "path"] = path;
  }
  return 0;
}

void KernelDevice::_detect_vdo()
{
  vdo_fd = get_vdo_stats_handle(devname.c_str(), &vdo_name);
  if (vdo_fd >= 0) {
    dout(1) << __func__ << " VDO volume " << vdo_name
            << " maps to " << devname << dendl;
  } else {
    dout(20) << __func__ << " no VDO volume maps to " << devname << dendl;
  }
}

bool KernelDevice::get_thin_utilization(uint64_t *total, uint64_t *avail) const
{
  if (vdo_fd < 0) {
    return false;
  }
  return get_vdo_utilization(vdo_fd, total, avail);
}

int KernelDevice::choose_fd(bool buffered, int write_hint) const
{
#if defined(F_SET_FILE_RW_HINT)
  if (!enable_wrt)
    write_hint = WRITE_LIFE_NOT_SET;
#else
  // Without WRITE_LIFE capabilities, only one fd per mode is used.
  // RocksDB may still pass write hints > 0, so clamp the hint here
  // rather than trusting the caller.
  write_hint = WRITE_LIFE_NOT_SET;
#endif
  return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
}

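// Illustrative example: a caller issuing a short-lived direct write
//   int fd = choose_fd(false, WRITE_LIFE_SHORT);
// gets fd_directs[WRITE_LIFE_SHORT] when write hints are enabled
// (enable_wrt), and fd_directs[WRITE_LIFE_NOT_SET] otherwise.
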
int KernelDevice::flush()
{
  // protect flush with a mutex.  note that we are not really protecting
  // data here.  instead, we are ensuring that if any flush() caller
  // sees that io_since_flush is true, they block any racing callers
  // until the flush is observed.  that allows racing threads to be
  // calling flush while still ensuring that *any* of them that got an
  // aio completion notification will not return before that aio is
  // stable on disk: whichever thread sees the flag first will block
  // followers until the aio is stable.
  std::lock_guard l(flush_mutex);

  bool expect = true;
  if (!io_since_flush.compare_exchange_strong(expect, false)) {
    dout(10) << __func__ << " no-op (no ios since last flush), flag is "
             << (int)io_since_flush.load() << dendl;
    return 0;
  }

  dout(10) << __func__ << " start" << dendl;
  if (cct->_conf->bdev_inject_crash) {
    ++injecting_crash;
    // sleep for a moment to give other threads a chance to submit or
    // wait on io that races with a flush.
    derr << __func__ << " injecting crash. first we sleep..." << dendl;
    sleep(cct->_conf->bdev_inject_crash_flush_delay);
    derr << __func__ << " and now we die" << dendl;
    cct->_log->flush();
    _exit(1);
  }
  utime_t start = ceph_clock_now();
  int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
  utime_t end = ceph_clock_now();
  utime_t dur = end - start;
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  dout(5) << __func__ << " in " << dur << dendl;
  return r;
}

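// Initialize the io queue (io_uring or libaio) against the direct fds
// and spawn the completion-reaping thread.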
int KernelDevice::_aio_start()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    int r = io_queue->init(fd_directs);
    if (r < 0) {
      if (r == -EAGAIN) {
        derr << __func__ << " io_setup(2) failed with EAGAIN; "
             << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
      } else {
        derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
      }
      return r;
    }
    aio_thread.create("bstore_aio");
  }
  return 0;
}

void KernelDevice::_aio_stop()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    aio_stop = true;
    aio_thread.join();
    aio_stop = false;
    io_queue->shutdown();
  }
}

int KernelDevice::_discard_start()
{
  discard_thread.create("bstore_discard");
  return 0;
}

void KernelDevice::_discard_stop()
{
  dout(10) << __func__ << dendl;
  {
    std::unique_lock l(discard_lock);
    while (!discard_started) {
      discard_cond.wait(l);
    }
    discard_stop = true;
    discard_cond.notify_all();
  }
  discard_thread.join();
  {
    std::lock_guard l(discard_lock);
    discard_stop = false;
  }
  dout(10) << __func__ << " stopped" << dendl;
}

void KernelDevice::discard_drain()
{
  dout(10) << __func__ << dendl;
  std::unique_lock l(discard_lock);
  while (!discard_queued.empty() || discard_running) {
    discard_cond.wait(l);
  }
}

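// Errors that the kernel block layer can legitimately hand back for a
// failed bio; anything outside this set is treated as a bug rather
// than an IO failure.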
static bool is_expected_ioerr(const int r)
{
  // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
  return (r == -EOPNOTSUPP || r == -ETIMEDOUT || r == -ENOSPC ||
          r == -ENOLINK || r == -EREMOTEIO || r == -EAGAIN || r == -EIO ||
          r == -ENODATA || r == -EILSEQ || r == -ENOMEM ||
#if defined(__linux__)
          r == -EREMCHG || r == -EBADE
#elif defined(__FreeBSD__)
          r == -BSM_ERRNO_EREMCHG || r == -BSM_ERRNO_EBADE
#endif
          );
}

void KernelDevice::_aio_thread()
{
  dout(10) << __func__ << " start" << dendl;
  int inject_crash_count = 0;
  while (!aio_stop) {
    dout(40) << __func__ << " polling" << dendl;
    int max = cct->_conf->bdev_aio_reap_max;
    aio_t *aio[max];
    int r = io_queue->get_next_completed(cct->_conf->bdev_aio_poll_ms,
                                         aio, max);
    if (r < 0) {
      derr << __func__ << " got " << cpp_strerror(r) << dendl;
      ceph_abort_msg("got unexpected error from io_getevents");
    }
    if (r > 0) {
      dout(30) << __func__ << " got " << r << " completed aios" << dendl;
      for (int i = 0; i < r; ++i) {
        IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
        _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
        if (aio[i]->queue_item.is_linked()) {
          std::lock_guard l(debug_queue_lock);
          debug_aio_unlink(*aio[i]);
        }

        // set flag indicating new ios have completed.  we do this *before*
        // any completion or notifications so that any user flush() that
        // follows the observed io completion will include this io.  Note
        // that an earlier, racing flush() could observe and clear this
        // flag, but that also ensures that the IO will be stable before the
        // later flush() occurs.
        io_since_flush.store(true);

        long r = aio[i]->get_return_value();
        if (r < 0) {
          derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")"
               << dendl;
          if (ioc->allow_eio && is_expected_ioerr(r)) {
            derr << __func__ << " translating the error to EIO for upper layer"
                 << dendl;
            ioc->set_return_value(-EIO);
          } else {
            if (is_expected_ioerr(r)) {
              note_io_error_event(
                devname.c_str(),
                path.c_str(),
                r,
#if defined(HAVE_POSIXAIO)
                aio[i]->aio.aiocb.aio_lio_opcode,
#else
                aio[i]->iocb.aio_lio_opcode,
#endif
                aio[i]->offset,
                aio[i]->length);
              ceph_abort_msg(
                "Unexpected IO error. "
                "This may suggest a hardware issue. "
                "Please check your kernel log!");
            }
            ceph_abort_msg(
              "Unexpected IO error. "
              "This may suggest a hardware issue. Please check your dmesg!");
          }
        } else if (aio[i]->length != (uint64_t)r) {
          derr << "aio to 0x" << std::hex << aio[i]->offset
               << "~" << aio[i]->length << std::dec
               << " but returned: " << r << dendl;
          ceph_abort_msg("unexpected aio return value: does not match length");
        }

        dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
                 << " ioc " << ioc
                 << " with " << (ioc->num_running.load() - 1)
                 << " aios left" << dendl;

        // NOTE: once we decrement num_running and either call the callback
        // or aio_wake, we cannot touch ioc or aio[] as the caller may
        // free them.
        if (ioc->priv) {
          if (--ioc->num_running == 0) {
            aio_callback(aio_callback_priv, ioc->priv);
          }
        } else {
          ioc->try_aio_wake();
        }
      }
    }
    if (cct->_conf->bdev_debug_aio) {
      utime_t now = ceph_clock_now();
      std::lock_guard l(debug_queue_lock);
      if (debug_oldest) {
        if (debug_stall_since == utime_t()) {
          debug_stall_since = now;
        } else {
          if (cct->_conf->bdev_debug_aio_suicide_timeout) {
            utime_t cutoff = now;
            cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
            if (debug_stall_since < cutoff) {
              derr << __func__ << " stalled aio " << debug_oldest
                   << " since " << debug_stall_since << ", timeout is "
                   << cct->_conf->bdev_debug_aio_suicide_timeout
                   << "s, suicide" << dendl;
              ceph_abort_msg("stalled aio... buggy kernel or bad device?");
            }
          }
        }
      }
    }
    if (cct->_conf->bdev_inject_crash) {
      ++inject_crash_count;
      if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
          cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
        derr << __func__ << " bdev_inject_crash trigger from aio thread"
             << dendl;
        cct->_log->flush();
        _exit(1);
      }
    }
  }
  dout(10) << __func__ << " end" << dendl;
}

void KernelDevice::_discard_thread()
{
  std::unique_lock l(discard_lock);
  ceph_assert(!discard_started);
  discard_started = true;
  discard_cond.notify_all();
  while (true) {
    ceph_assert(discard_finishing.empty());
    if (discard_queued.empty()) {
      if (discard_stop)
        break;
      dout(20) << __func__ << " sleep" << dendl;
      discard_cond.notify_all(); // for the thread trying to drain...
      discard_cond.wait(l);
      dout(20) << __func__ << " wake" << dendl;
    } else {
      discard_finishing.swap(discard_queued);
      discard_running = true;
      l.unlock();
      dout(20) << __func__ << " finishing" << dendl;
      for (auto p = discard_finishing.begin(); p != discard_finishing.end(); ++p) {
        discard(p.get_start(), p.get_len());
      }

      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
      discard_finishing.clear();
      l.lock();
      discard_running = false;
    }
  }
  dout(10) << __func__ << " finish" << dendl;
  discard_started = false;
}

int KernelDevice::queue_discard(interval_set<uint64_t> &to_release)
{
  if (!support_discard)
    return -1;

  if (to_release.empty())
    return 0;

  std::lock_guard l(discard_lock);
  discard_queued.insert(to_release);
  discard_cond.notify_all();
  return 0;
}

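// Debug-only bookkeeping (bdev_debug_inflight_ios): track in-flight
// extents so overlapping in-flight IOs can be detected and aborted on.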
void KernelDevice::_aio_log_start(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
           << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    if (debug_inflight.intersects(offset, length)) {
      derr << __func__ << " inflight overlap of 0x"
           << std::hex
           << offset << "~" << length << std::dec
           << " with " << debug_inflight << dendl;
      ceph_abort();
    }
    debug_inflight.insert(offset, length);
  }
}

void KernelDevice::debug_aio_link(aio_t& aio)
{
  if (debug_queue.empty()) {
    debug_oldest = &aio;
  }
  debug_queue.push_back(aio);
}

void KernelDevice::debug_aio_unlink(aio_t& aio)
{
  if (aio.queue_item.is_linked()) {
    debug_queue.erase(debug_queue.iterator_to(aio));
    if (debug_oldest == &aio) {
      auto age = cct->_conf->bdev_debug_aio_log_age;
      if (age && debug_stall_since != utime_t()) {
        utime_t cutoff = ceph_clock_now();
        cutoff -= age;
        if (debug_stall_since < cutoff) {
          derr << __func__ << " stalled aio " << debug_oldest
               << " since " << debug_stall_since << ", timeout is "
               << age
               << "s" << dendl;
        }
      }

      if (debug_queue.empty()) {
        debug_oldest = nullptr;
      } else {
        debug_oldest = &debug_queue.front();
      }
      debug_stall_since = utime_t();
    }
  }
}

void KernelDevice::_aio_log_finish(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " " << aio << " 0x"
           << std::hex << offset << "~" << length << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    std::lock_guard l(debug_lock);
    debug_inflight.erase(offset, length);
  }
}

void KernelDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc
           << " pending " << ioc->num_pending.load()
           << " running " << ioc->num_running.load()
           << dendl;

  if (ioc->num_pending.load() == 0) {
    return;
  }

  // move these aside, and get our end iterator position now, as the
  // aios might complete as soon as they are submitted and queue more
  // wal aio's.
  list<aio_t>::iterator e = ioc->running_aios.begin();
  ioc->running_aios.splice(e, ioc->pending_aios);

  int pending = ioc->num_pending.load();
  ioc->num_running += pending;
  ioc->num_pending -= pending;
  ceph_assert(ioc->num_pending.load() == 0);  // we should be the only thread doing this
  ceph_assert(ioc->pending_aios.size() == 0);

  if (cct->_conf->bdev_debug_aio) {
    list<aio_t>::iterator p = ioc->running_aios.begin();
    while (p != e) {
      dout(30) << __func__ << " " << *p << dendl;
      std::lock_guard l(debug_queue_lock);
      debug_aio_link(*p++);
    }
  }

  void *priv = static_cast<void*>(ioc);
  int r, retries = 0;
  // num of pending aios should not overflow when passed to submit_batch()
  ceph_assert(pending <= std::numeric_limits<uint16_t>::max());
  r = io_queue->submit_batch(ioc->running_aios.begin(), e,
                             pending, priv, &retries);

  if (retries)
    derr << __func__ << " retries " << retries << dendl;
  if (r < 0) {
    derr << " aio submit got " << cpp_strerror(r) << dendl;
    ceph_assert(r == 0);
  }
}

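// Synchronous write path, also used as the fallback when aio is
// disabled or the write is buffered.  Loops around pwritev(2) to
// handle short writes by advancing through the iovec array.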
int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
{
  uint64_t len = bl.length();
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
          << std::dec << " " << buffermode(buffered) << dendl;
  if (cct->_conf->bdev_inject_crash &&
      rand() % cct->_conf->bdev_inject_crash == 0) {
    derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
         << off << "~" << len << std::dec << dendl;
    ++injecting_crash;
    return 0;
  }
  vector<iovec> iov;
  bl.prepare_iov(&iov);

  auto left = len;
  auto o = off;
  size_t idx = 0;
  do {
    auto r = ::pwritev(choose_fd(buffered, write_hint),
                       &iov[idx], iov.size() - idx, o);

    if (r < 0) {
      r = -errno;
      derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
      return r;
    }
    o += r;
    left -= r;
    if (left) {
      // skip fully processed IOVs
      while (idx < iov.size() && (size_t)r >= iov[idx].iov_len) {
        r -= iov[idx++].iov_len;
      }
      // update partially processed one if any
      if (r) {
        ceph_assert(idx < iov.size());
        ceph_assert((size_t)r < iov[idx].iov_len);
        iov[idx].iov_base = static_cast<char*>(iov[idx].iov_base) + r;
        iov[idx].iov_len -= r;
        r = 0;
      }
      ceph_assert(r == 0);
    }
  } while (left);

#ifdef HAVE_SYNC_FILE_RANGE
  if (buffered) {
    // initiate IO and wait till it completes
    auto r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE|SYNC_FILE_RANGE_WAIT_AFTER|SYNC_FILE_RANGE_WAIT_BEFORE);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
      return r;
    }
  }
#endif

  io_since_flush.store(true);

  return 0;
}

int KernelDevice::write(
  uint64_t off,
  bufferlist &bl,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
           << " " << buffermode(buffered)
           << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
               << dendl;
    return 0;
  }

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data:\n";
  bl.hexdump(*_dout);
  *_dout << dendl;

  return _sync_write(off, bl, buffered, write_hint);
}

int KernelDevice::aio_write(
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
           << " " << buffermode(buffered)
           << dendl;
  ceph_assert(is_valid_io(off, len));
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
               << dendl;
    return 0;
  }

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data:\n";
  bl.hexdump(*_dout);
  *_dout << dendl;

  _aio_log_start(ioc, off, len);

#ifdef HAVE_LIBAIO
  if (aio && dio && !buffered) {
    if (cct->_conf->bdev_inject_crash &&
        rand() % cct->_conf->bdev_inject_crash == 0) {
      derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
           << off << "~" << len << std::dec
           << dendl;
      // generate a real io so that aio_wait behaves properly, but make it
      // a read instead of write, and toss the result.
      ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
      ++ioc->num_pending;
      auto& aio = ioc->pending_aios.back();
      aio.bl.push_back(
        ceph::buffer::ptr_node::create(ceph::buffer::create_small_page_aligned(len)));
      aio.bl.prepare_iov(&aio.iov);
      aio.preadv(off, len);
      ++injecting_crash;
    } else {
      if (bl.length() <= RW_IO_MAX) {
        // fast path (non-huge write)
        ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
        ++ioc->num_pending;
        auto& aio = ioc->pending_aios.back();
        bl.prepare_iov(&aio.iov);
        aio.bl.claim_append(bl);
        aio.pwritev(off, len);
        dout(30) << aio << dendl;
        dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
                << std::dec << " aio " << &aio << dendl;
      } else {
        // write in RW_IO_MAX-sized chunks
        uint64_t prev_len = 0;
        while (prev_len < bl.length()) {
          bufferlist tmp;
          if (prev_len + RW_IO_MAX < bl.length()) {
            tmp.substr_of(bl, prev_len, RW_IO_MAX);
          } else {
            tmp.substr_of(bl, prev_len, bl.length() - prev_len);
          }
          auto len = tmp.length();
          ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
          ++ioc->num_pending;
          auto& aio = ioc->pending_aios.back();
          tmp.prepare_iov(&aio.iov);
          aio.bl.claim_append(tmp);
          aio.pwritev(off + prev_len, len);
          dout(30) << aio << dendl;
          dout(5) << __func__ << " 0x" << std::hex << off + prev_len
                  << "~" << len
                  << std::dec << " aio " << &aio << " (piece)" << dendl;
          prev_len += len;
        }
      }
    }
  } else
#endif
  {
    int r = _sync_write(off, bl, buffered, write_hint);
    _aio_log_finish(ioc, off, len);
    if (r < 0)
      return r;
  }
  return 0;
}

int KernelDevice::discard(uint64_t offset, uint64_t len)
{
  int r = 0;
  if (cct->_conf->objectstore_blackhole) {
    lderr(cct) << __func__ << " objectstore_blackhole=true, throwing out IO"
               << dendl;
    return 0;
  }
  if (support_discard) {
    dout(10) << __func__
             << " 0x" << std::hex << offset << "~" << len << std::dec
             << dendl;

    r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len);
  }
  return r;
}

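// Fixed-size pool of hugepage-backed read buffers.  Regions are
// mmap-ed up front and handed out as buffer::raw instances; a raw's
// destructor pushes its region back onto the lock-free queue instead
// of munmap-ing it, so buffers are recycled for the pool's lifetime.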
struct ExplicitHugePagePool {
  using region_queue_t = boost::lockfree::queue<void*>;
  using instrumented_raw = ceph::buffer_instrumentation::instrumented_raw<
    BlockDevice::hugepaged_raw_marker_t>;

  struct mmaped_buffer_raw : public instrumented_raw {
    region_queue_t& region_q; // for recycling

    mmaped_buffer_raw(void* mmaped_region, ExplicitHugePagePool& parent)
      : instrumented_raw(static_cast<char*>(mmaped_region), parent.buffer_size),
        region_q(parent.region_q) {
      // the `mmaped_region` has been passed to `raw` as the buffer's `data`
    }
    ~mmaped_buffer_raw() override {
      // don't delete or munmap; recycle the region instead
      region_q.push(data);
    }
    raw* clone_empty() override {
      // the entire cloning facility is used solely by the dev-only MemDB.
      // see: https://github.com/ceph/ceph/pull/36282
      ceph_abort_msg("this should never be called on this path!");
    }
  };

  ExplicitHugePagePool(const size_t buffer_size, size_t buffers_in_pool)
    : buffer_size(buffer_size), region_q(buffers_in_pool) {
    while (buffers_in_pool--) {
      void* const mmaped_region = ::mmap(
        nullptr,
        buffer_size,
        PROT_READ | PROT_WRITE,
#if defined(__FreeBSD__)
        // FreeBSD doesn't have MAP_HUGETLB nor MAP_POPULATE but it has
        // different, more automated / implicit mechanisms.  However,
        // we want to mimic the Linux behavior as closely as possible
        // also in the matter of error handling, which is the reason
        // behind MAP_ALIGNED_SUPER.
        // See: https://lists.freebsd.org/pipermail/freebsd-questions/2014-August/260578.html
        MAP_PRIVATE | MAP_ANONYMOUS | MAP_PREFAULT_READ | MAP_ALIGNED_SUPER,
#else
        MAP_PRIVATE | MAP_ANONYMOUS | MAP_POPULATE | MAP_HUGETLB,
#endif // __FreeBSD__
        -1,
        0);
      if (mmaped_region == MAP_FAILED) {
        ceph_abort("can't allocate huge buffer;"
                   " /proc/sys/vm/nr_hugepages misconfigured?");
      } else {
        region_q.push(mmaped_region);
      }
    }
  }
  ~ExplicitHugePagePool() {
    void* mmaped_region;
    while (region_q.pop(mmaped_region)) {
      ::munmap(mmaped_region, buffer_size);
    }
  }

  ceph::unique_leakable_ptr<buffer::raw> try_create() {
    if (void* mmaped_region; region_q.pop(mmaped_region)) {
      return ceph::unique_leakable_ptr<buffer::raw> {
        new mmaped_buffer_raw(mmaped_region, *this)
      };
    } else {
      // oops, empty queue.
      return nullptr;
    }
  }

  size_t get_buffer_size() const {
    return buffer_size;
  }

private:
  const size_t buffer_size;
  region_queue_t region_q;
};

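// A set of ExplicitHugePagePools, one per configured buffer size,
// looked up by exact size in try_create().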
struct HugePagePoolOfPools {
  HugePagePoolOfPools(const std::map<size_t, size_t> conf)
    : pools(conf.size(), [conf] (size_t index, auto emplacer) {
        ceph_assert(index < conf.size());
        // this could be replaced with a state-mutating lambda and
        // `conf::erase()` but performance is not a concern here.
        const auto [buffer_size, buffers_in_pool] =
          *std::next(std::begin(conf), index);
        emplacer.emplace(buffer_size, buffers_in_pool);
      }) {
  }

  ceph::unique_leakable_ptr<buffer::raw> try_create(const size_t size) {
    // thanks to `conf` being a `std::map` we store the pools sorted
    // by buffer size.  that would allow a log(n) lookup, but I doubt
    // admins want dozens of accelerated buffer sizes; let's keep this
    // simple for now.
    if (auto iter = std::find_if(std::begin(pools), std::end(pools),
                                 [size] (const auto& pool) {
                                   return size == pool.get_buffer_size();
                                 });
        iter != std::end(pools)) {
      return iter->try_create();
    }
    return nullptr;
  }

  static HugePagePoolOfPools from_desc(const std::string& conf);

private:
  // let's have some space inside (for 2 MB and 4 MB perhaps?)
  // NOTE: we need tiny_vector as the boost::lockfree queue inside
  // the pool is not movable.
  ceph::containers::tiny_vector<ExplicitHugePagePool, 2> pools;
};

HugePagePoolOfPools HugePagePoolOfPools::from_desc(const std::string& desc) {
  std::map<size_t, size_t> conf; // buffer_size -> buffers_in_pool
  std::map<std::string, std::string> exploded_str_conf;
  get_str_map(desc, &exploded_str_conf);
  for (const auto& [buffer_size_s, buffers_in_pool_s] : exploded_str_conf) {
    size_t buffer_size, buffers_in_pool;
    if (sscanf(buffer_size_s.c_str(), "%zu", &buffer_size) != 1) {
      ceph_abort("can't parse a key in the configuration");
    }
    if (sscanf(buffers_in_pool_s.c_str(), "%zu", &buffers_in_pool) != 1) {
      ceph_abort("can't parse a value in the configuration");
    }
    conf[buffer_size] = buffers_in_pool;
  }
  return HugePagePoolOfPools{std::move(conf)};
}

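// Illustrative example (assuming the usual key=value list syntax
// accepted by get_str_map()): setting
//   bdev_read_preallocated_huge_buffers = "2097152=512,4194304=64"
// yields one pool of 512 2 MiB buffers and one of 64 4 MiB buffers.
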
// create a read buffer based on the user-configured pools; this is
// intended to make our buffers THP-able.
ceph::unique_leakable_ptr<buffer::raw> KernelDevice::create_custom_aligned(
  const size_t len,
  IOContext* const ioc) const
{
  // just to preserve the logic of create_small_page_aligned().
  if (len < CEPH_PAGE_SIZE) {
    return ceph::buffer::create_small_page_aligned(len);
  } else {
    static HugePagePoolOfPools hp_pools = HugePagePoolOfPools::from_desc(
      cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
    );
    if (auto lucky_raw = hp_pools.try_create(len); lucky_raw) {
      dout(20) << __func__ << " allocated from huge pool"
               << " lucky_raw.data=" << (void*)lucky_raw->get_data()
               << " bdev_read_preallocated_huge_buffers="
               << cct->_conf.get_val<std::string>("bdev_read_preallocated_huge_buffers")
               << dendl;
      ioc->flags |= IOContext::FLAG_DONT_CACHE;
      return lucky_raw;
    } else {
      // fall through due to an empty buffer pool.  this can also happen
      // when the configurable was explicitly set to 0.
      dout(20) << __func__ << " cannot allocate from huge pool"
               << dendl;
    }
  }
  const size_t custom_alignment = cct->_conf->bdev_read_buffer_alignment;
  dout(20) << __func__ << " with the custom alignment;"
           << " len=" << len
           << " custom_alignment=" << custom_alignment
           << dendl;
  return ceph::buffer::create_aligned(len, custom_alignment);
}

int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
                       IOContext *ioc,
                       bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << " " << buffermode(buffered)
          << dendl;
  ceph_assert(is_valid_io(off, len));

  _aio_log_start(ioc, off, len);

  auto start1 = mono_clock::now();

  auto p = ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc));
  int r = ::pread(choose_fd(buffered, WRITE_LIFE_NOT_SET),
                  p->c_str(), len, off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
         << " 0x" << std::hex << off << "~" << len << std::dec
         << " " << buffermode(buffered)
         << " since " << start1 << ", timeout is "
         << age
         << "s" << dendl;
  }
  if (r < 0) {
    if (ioc->allow_eio && is_expected_ioerr(r)) {
      r = -EIO;
    } else {
      r = -errno;
    }
    derr << __func__ << " 0x" << std::hex << off << "~" << len
         << std::dec << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  ceph_assert((uint64_t)r == len);
  pbl->push_back(std::move(p));

  dout(40) << "data:\n";
  pbl->hexdump(*_dout);
  *_dout << dendl;

 out:
  _aio_log_finish(ioc, off, len);
  return r < 0 ? r : 0;
}

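// Queue an asynchronous read when running with aio+dio; otherwise
// fall back to the synchronous read() path.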
int KernelDevice::aio_read(
  uint64_t off,
  uint64_t len,
  bufferlist *pbl,
  IOContext *ioc)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << dendl;

  int r = 0;
#ifdef HAVE_LIBAIO
  if (aio && dio) {
    ceph_assert(is_valid_io(off, len));
    _aio_log_start(ioc, off, len);
    ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    aio.bl.push_back(
      ceph::buffer::ptr_node::create(create_custom_aligned(len, ioc)));
    aio.bl.prepare_iov(&aio.iov);
    aio.preadv(off, len);
    dout(30) << aio << dendl;
    pbl->append(aio.bl);
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
            << std::dec << " aio " << &aio << dendl;
  } else
#endif
  {
    r = read(off, len, pbl, ioc, false);
  }

  return r;
}

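// Service an unaligned read over O_DIRECT by rounding the range out
// to block_size, reading into an aligned bounce buffer, and copying
// the requested bytes back out to the caller's buffer.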
int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
{
  uint64_t aligned_off = p2align(off, block_size);
  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
  bufferptr p = ceph::buffer::create_small_page_aligned(aligned_len);
  int r = 0;

  auto start1 = mono_clock::now();
  r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
         << " 0x" << std::hex << off << "~" << len << std::dec
         << " since " << start1 << ", timeout is "
         << age
         << "s" << dendl;
  }

  if (r < 0) {
    r = -errno;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
         << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  ceph_assert((uint64_t)r == aligned_len);
  memcpy(buf, p.c_str() + (off - aligned_off), len);

  dout(40) << __func__ << " data:\n";
  bufferlist bl;
  bl.append(buf, len);
  bl.hexdump(*_dout);
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}

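// Synchronous read into a caller-provided buffer.  Unaligned direct
// reads bounce through direct_read_unaligned(); buffered reads loop
// on pread(2) to handle short reads.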
int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
                              bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << " buffered " << buffered
          << dendl;
  ceph_assert(len > 0);
  ceph_assert(off < size);
  ceph_assert(off + len <= size);
  int r = 0;
  auto age = cct->_conf->bdev_debug_aio_log_age;

  // if it's direct io and unaligned, we have to use an internal buffer
  if (!buffered && ((off % block_size != 0)
                    || (len % block_size != 0)
                    || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
    return direct_read_unaligned(off, len, buf);

  auto start1 = mono_clock::now();
  if (buffered) {
    // buffered read
    auto off0 = off;
    char *t = buf;
    uint64_t left = len;
    while (left > 0) {
      r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
      if (r < 0) {
        r = -errno;
        derr << __func__ << " 0x" << std::hex << off << "~" << left
             << std::dec << " error: " << cpp_strerror(r) << dendl;
        goto out;
      }
      off += r;
      t += r;
      left -= r;
    }
    if (mono_clock::now() - start1 >= make_timespan(age)) {
      derr << __func__ << " stalled read "
           << " 0x" << std::hex << off0 << "~" << len << std::dec
           << " (buffered) since " << start1 << ", timeout is "
           << age
           << "s" << dendl;
    }
  } else {
    // direct and aligned read
    r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
    if (mono_clock::now() - start1 >= make_timespan(age)) {
      derr << __func__ << " stalled read "
           << " 0x" << std::hex << off << "~" << len << std::dec
           << " (direct) since " << start1 << ", timeout is "
           << age
           << "s" << dendl;
    }
    if (r < 0) {
      r = -errno;
      derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
           << off << "~" << len << std::dec << " error: " << cpp_strerror(r)
           << dendl;
      goto out;
    }
    ceph_assert((uint64_t)r == len);
  }

  dout(40) << __func__ << " data:\n";
  bufferlist bl;
  bl.append(buf, len);
  bl.hexdump(*_dout);
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}

1410
1411int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
1412{
1413 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1414 << dendl;
11fdf7f2
TL
1415 ceph_assert(off % block_size == 0);
1416 ceph_assert(len % block_size == 0);
1417 int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED);
7c673cae
FG
1418 if (r) {
1419 r = -r;
1420 derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1421 << " error: " << cpp_strerror(r) << dendl;
1422 }
1423 return r;
1424}