]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/bluestore/KernelDevice.cc
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / os / bluestore / KernelDevice.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include <unistd.h>
16 #include <stdlib.h>
17 #include <sys/types.h>
18 #include <sys/stat.h>
19 #include <fcntl.h>
20 #include <sys/file.h>
21
22 #include "KernelDevice.h"
23 #include "include/types.h"
24 #include "include/compat.h"
25 #include "include/stringify.h"
26 #include "common/blkdev.h"
27 #include "common/errno.h"
28 #if defined(__FreeBSD__)
29 #include "bsm/audit_errno.h"
30 #endif
31 #include "common/debug.h"
32 #include "common/align.h"
33 #include "common/numa.h"
34
35 #include "global/global_context.h"
36
37 #define dout_context cct
38 #define dout_subsys ceph_subsys_bdev
39 #undef dout_prefix
40 #define dout_prefix *_dout << "bdev(" << this << " " << path << ") "
41
// Construct a kernel block device wrapper.  Two callback pairs are taken:
// cb/cbpriv fire when ordinary aios complete (see _aio_thread), and
// d_cb/d_cbpriv fire when a batch of queued discards has been issued
// (see _discard_thread).
KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv, aio_callback_t d_cb, void *d_cbpriv)
  : BlockDevice(cct, cb, cbpriv),
    aio(false), dio(false),
    aio_queue(cct->_conf->bdev_aio_max_queue_depth),
    discard_callback(d_cb),
    discard_callback_priv(d_cbpriv),
    aio_stop(false),
    discard_started(false),
    discard_stop(false),
    aio_thread(this),
    discard_thread(this),
    injecting_crash(0)
{
  // One O_DIRECT fd and one buffered fd per write-lifetime hint slot;
  // -1 marks "not open".  The slots are populated by open().
  fd_directs.resize(WRITE_LIFE_MAX, -1);
  fd_buffereds.resize(WRITE_LIFE_MAX, -1);
}
58
59 int KernelDevice::_lock()
60 {
61 dout(10) << __func__ << " " << fd_directs[WRITE_LIFE_NOT_SET] << dendl;
62 int r = ::flock(fd_directs[WRITE_LIFE_NOT_SET], LOCK_EX | LOCK_NB);
63 if (r < 0) {
64 derr << __func__ << " flock failed on " << path << dendl;
65 return -errno;
66 }
67 return 0;
68 }
69
70 int KernelDevice::open(const string& p)
71 {
72 path = p;
73 int r = 0, i = 0;
74 dout(1) << __func__ << " path " << path << dendl;
75
76 for (i = 0; i < WRITE_LIFE_MAX; i++) {
77 int fd = ::open(path.c_str(), O_RDWR | O_DIRECT);
78 if (fd < 0) {
79 r = -errno;
80 break;
81 }
82 fd_directs[i] = fd;
83
84 fd = ::open(path.c_str(), O_RDWR | O_CLOEXEC);
85 if (fd < 0) {
86 r = -errno;
87 break;
88 }
89 fd_buffereds[i] = fd;
90 }
91
92 if (i != WRITE_LIFE_MAX) {
93 derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
94 goto out_fail;
95 }
96
97 #if defined(F_SET_FILE_RW_HINT)
98 for (i = WRITE_LIFE_NONE; i < WRITE_LIFE_MAX; i++) {
99 if (fcntl(fd_directs[i], F_SET_FILE_RW_HINT, &i) < 0) {
100 r = -errno;
101 break;
102 }
103 if (fcntl(fd_buffereds[i], F_SET_FILE_RW_HINT, &i) < 0) {
104 r = -errno;
105 break;
106 }
107 }
108 if (i != WRITE_LIFE_MAX) {
109 enable_wrt = false;
110 dout(0) << "ioctl(F_SET_FILE_RW_HINT) on " << path << " failed: " << cpp_strerror(r) << dendl;
111 }
112 #endif
113
114 dio = true;
115 aio = cct->_conf->bdev_aio;
116 if (!aio) {
117 ceph_abort_msg("non-aio not supported");
118 }
119
120 // disable readahead as it will wreak havoc on our mix of
121 // directio/aio and buffered io.
122 r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], 0, 0, POSIX_FADV_RANDOM);
123 if (r) {
124 r = -r;
125 derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
126 goto out_fail;
127 }
128
129 if (lock_exclusive) {
130 r = _lock();
131 if (r < 0) {
132 derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
133 << dendl;
134 goto out_fail;
135 }
136 }
137
138 struct stat st;
139 r = ::fstat(fd_directs[WRITE_LIFE_NOT_SET], &st);
140 if (r < 0) {
141 r = -errno;
142 derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
143 goto out_fail;
144 }
145
146 // Operate as though the block size is 4 KB. The backing file
147 // blksize doesn't strictly matter except that some file systems may
148 // require a read/modify/write if we write something smaller than
149 // it.
150 block_size = cct->_conf->bdev_block_size;
151 if (block_size != (unsigned)st.st_blksize) {
152 dout(1) << __func__ << " backing device/file reports st_blksize "
153 << st.st_blksize << ", using bdev_block_size "
154 << block_size << " anyway" << dendl;
155 }
156
157
158 {
159 BlkDev blkdev_direct(fd_directs[WRITE_LIFE_NOT_SET]);
160 BlkDev blkdev_buffered(fd_buffereds[WRITE_LIFE_NOT_SET]);
161
162 if (S_ISBLK(st.st_mode)) {
163 int64_t s;
164 r = blkdev_direct.get_size(&s);
165 if (r < 0) {
166 goto out_fail;
167 }
168 size = s;
169 } else {
170 size = st.st_size;
171 }
172
173 char partition[PATH_MAX], devname[PATH_MAX];
174 if ((r = blkdev_buffered.partition(partition, PATH_MAX)) ||
175 (r = blkdev_buffered.wholedisk(devname, PATH_MAX))) {
176 derr << "unable to get device name for " << path << ": "
177 << cpp_strerror(r) << dendl;
178 rotational = true;
179 } else {
180 dout(20) << __func__ << " devname " << devname << dendl;
181 rotational = blkdev_buffered.is_rotational();
182 support_discard = blkdev_buffered.support_discard();
183 this->devname = devname;
184 _detect_vdo();
185 }
186 }
187
188 r = _aio_start();
189 if (r < 0) {
190 goto out_fail;
191 }
192 _discard_start();
193
194 // round size down to an even block
195 size &= ~(block_size - 1);
196
197 dout(1) << __func__
198 << " size " << size
199 << " (0x" << std::hex << size << std::dec << ", "
200 << byte_u_t(size) << ")"
201 << " block_size " << block_size
202 << " (" << byte_u_t(block_size) << ")"
203 << " " << (rotational ? "rotational" : "non-rotational")
204 << " discard " << (support_discard ? "supported" : "not supported")
205 << dendl;
206 return 0;
207
208 out_fail:
209 for (i = 0; i < WRITE_LIFE_MAX; i++) {
210 if (fd_directs[i] >= 0) {
211 VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
212 fd_directs[i] = -1;
213 } else {
214 break;
215 }
216 if (fd_buffereds[i] >= 0) {
217 VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
218 fd_buffereds[i] = -1;
219 } else {
220 break;
221 }
222 }
223 return r;
224 }
225
226 int KernelDevice::get_devices(std::set<std::string> *ls)
227 {
228 if (devname.empty()) {
229 return 0;
230 }
231 get_raw_devices(devname, ls);
232 return 0;
233 }
234
235 void KernelDevice::close()
236 {
237 dout(1) << __func__ << dendl;
238 _aio_stop();
239 _discard_stop();
240
241 if (vdo_fd >= 0) {
242 VOID_TEMP_FAILURE_RETRY(::close(vdo_fd));
243 vdo_fd = -1;
244 }
245
246 for (int i = 0; i < WRITE_LIFE_MAX; i++) {
247 assert(fd_directs[i] >= 0);
248 VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
249 fd_directs[i] = -1;
250
251 assert(fd_buffereds[i] >= 0);
252 VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
253 fd_buffereds[i] = -1;
254 }
255 path.clear();
256 }
257
258 int KernelDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
259 {
260 (*pm)[prefix + "support_discard"] = stringify((int)(bool)support_discard);
261 (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
262 (*pm)[prefix + "size"] = stringify(get_size());
263 (*pm)[prefix + "block_size"] = stringify(get_block_size());
264 (*pm)[prefix + "driver"] = "KernelDevice";
265 if (rotational) {
266 (*pm)[prefix + "type"] = "hdd";
267 } else {
268 (*pm)[prefix + "type"] = "ssd";
269 }
270 if (vdo_fd >= 0) {
271 (*pm)[prefix + "vdo"] = "true";
272 uint64_t total, avail;
273 get_vdo_utilization(vdo_fd, &total, &avail);
274 (*pm)[prefix + "vdo_physical_size"] = stringify(total);
275 }
276
277 struct stat st;
278 int r = ::fstat(fd_buffereds[WRITE_LIFE_NOT_SET], &st);
279 if (r < 0)
280 return -errno;
281 if (S_ISBLK(st.st_mode)) {
282 (*pm)[prefix + "access_mode"] = "blk";
283
284 char buffer[1024] = {0};
285 BlkDev blkdev{fd_buffereds[WRITE_LIFE_NOT_SET]};
286 if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
287 (*pm)[prefix + "partition_path"] = "unknown";
288 } else {
289 (*pm)[prefix + "partition_path"] = buffer;
290 }
291 buffer[0] = '\0';
292 if (r = blkdev.partition(buffer, sizeof(buffer)); r) {
293 (*pm)[prefix + "dev_node"] = "unknown";
294 } else {
295 (*pm)[prefix + "dev_node"] = buffer;
296 }
297 if (!r) {
298 return 0;
299 }
300 buffer[0] = '\0';
301 blkdev.model(buffer, sizeof(buffer));
302 (*pm)[prefix + "model"] = buffer;
303
304 buffer[0] = '\0';
305 blkdev.dev(buffer, sizeof(buffer));
306 (*pm)[prefix + "dev"] = buffer;
307
308 // nvme exposes a serial number
309 buffer[0] = '\0';
310 blkdev.serial(buffer, sizeof(buffer));
311 (*pm)[prefix + "serial"] = buffer;
312
313 if (blkdev.is_nvme())
314 (*pm)[prefix + "type"] = "nvme";
315
316 // numa
317 int node;
318 r = blkdev.get_numa_node(&node);
319 if (r >= 0) {
320 (*pm)[prefix + "numa_node"] = stringify(node);
321 }
322 } else {
323 (*pm)[prefix + "access_mode"] = "file";
324 (*pm)[prefix + "path"] = path;
325 }
326 return 0;
327 }
328
329 void KernelDevice::_detect_vdo()
330 {
331 vdo_fd = get_vdo_stats_handle(devname.c_str(), &vdo_name);
332 if (vdo_fd >= 0) {
333 dout(1) << __func__ << " VDO volume " << vdo_name
334 << " maps to " << devname << dendl;
335 } else {
336 dout(20) << __func__ << " no VDO volume maps to " << devname << dendl;
337 }
338 return;
339 }
340
341 bool KernelDevice::get_thin_utilization(uint64_t *total, uint64_t *avail) const
342 {
343 if (vdo_fd < 0) {
344 return false;
345 }
346 return get_vdo_utilization(vdo_fd, total, avail);
347 }
348
349 int KernelDevice::choose_fd(bool buffered, int write_hint) const
350 {
351 assert(write_hint >= WRITE_LIFE_NOT_SET && write_hint < WRITE_LIFE_MAX);
352 if (!enable_wrt)
353 write_hint = WRITE_LIFE_NOT_SET;
354 return buffered ? fd_buffereds[write_hint] : fd_directs[write_hint];
355 }
356
// Make all completed ios stable on disk via fdatasync on the O_DIRECT fd.
// Returns 0 (an fdatasync failure aborts the process).
int KernelDevice::flush()
{
  // protect flush with a mutex.  note that we are not really protecting
  // data here.  instead, we're ensuring that if any flush() caller
  // sees that io_since_flush is true, they block any racing callers
  // until the flush is observed.  that allows racing threads to be
  // calling flush while still ensuring that *any* of them that got an
  // aio completion notification will not return before that aio is
  // stable on disk: whichever thread sees the flag first will block
  // followers until the aio is stable.
  std::lock_guard l(flush_mutex);

  // Atomically claim the dirty flag; only the claimant pays for the
  // fdatasync, later callers under the same mutex become no-ops.
  bool expect = true;
  if (!io_since_flush.compare_exchange_strong(expect, false)) {
    dout(10) << __func__ << " no-op (no ios since last flush), flag is "
	     << (int)io_since_flush.load() << dendl;
    return 0;
  }

  dout(10) << __func__ << " start" << dendl;
  if (cct->_conf->bdev_inject_crash) {
    ++injecting_crash;
    // sleep for a moment to give other threads a chance to submit or
    // wait on io that races with a flush.
    derr << __func__ << " injecting crash. first we sleep..." << dendl;
    sleep(cct->_conf->bdev_inject_crash_flush_delay);
    derr << __func__ << " and now we die" << dendl;
    cct->_log->flush();
    _exit(1);
  }
  // Time the fdatasync so slow devices show up at dout(5).
  utime_t start = ceph_clock_now();
  int r = ::fdatasync(fd_directs[WRITE_LIFE_NOT_SET]);
  utime_t end = ceph_clock_now();
  utime_t dur = end - start;
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  dout(5) << __func__ << " in " << dur << dendl;;
  return r;
}
399
400 int KernelDevice::_aio_start()
401 {
402 if (aio) {
403 dout(10) << __func__ << dendl;
404 int r = aio_queue.init();
405 if (r < 0) {
406 if (r == -EAGAIN) {
407 derr << __func__ << " io_setup(2) failed with EAGAIN; "
408 << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
409 } else {
410 derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
411 }
412 return r;
413 }
414 aio_thread.create("bstore_aio");
415 }
416 return 0;
417 }
418
419 void KernelDevice::_aio_stop()
420 {
421 if (aio) {
422 dout(10) << __func__ << dendl;
423 aio_stop = true;
424 aio_thread.join();
425 aio_stop = false;
426 aio_queue.shutdown();
427 }
428 }
429
// Spawn the background discard thread.  Started unconditionally;
// queue_discard() refuses entries when discard is unsupported, so the
// thread simply sleeps in that case.
int KernelDevice::_discard_start()
{
  discard_thread.create("bstore_discard");
  return 0;
}
435
// Shut down the discard thread started by _discard_start().
void KernelDevice::_discard_stop()
{
  dout(10) << __func__ << dendl;
  {
    std::unique_lock l(discard_lock);
    // Wait for the thread to have entered its loop (it sets
    // discard_started under discard_lock) before signalling it to stop.
    while (!discard_started) {
      discard_cond.wait(l);
    }
    discard_stop = true;
    discard_cond.notify_all();
  }
  discard_thread.join();
  {
    std::lock_guard l(discard_lock);
    // Reset so a subsequent open() can start a fresh discard thread.
    discard_stop = false;
  }
  dout(10) << __func__ << " stopped" << dendl;
}
454
455 void KernelDevice::discard_drain()
456 {
457 dout(10) << __func__ << dendl;
458 std::unique_lock l(discard_lock);
459 while (!discard_queued.empty() || discard_running) {
460 discard_cond.wait(l);
461 }
462 }
463
// Classify a negative errno as an io error the kernel block layer is
// known to surface (see blk_errors[] in linux block/blk-core.c), as
// opposed to a programming error such as EBADF or EINVAL.
static bool is_expected_ioerr(const int r)
{
  // https://lxr.missinglinkelectronics.com/linux+v4.15/block/blk-core.c#L135
  switch (r) {
  case -EOPNOTSUPP:
  case -ETIMEDOUT:
  case -ENOSPC:
  case -ENOLINK:
  case -EREMOTEIO:
  case -EAGAIN:
  case -EIO:
  case -ENODATA:
  case -EILSEQ:
  case -ENOMEM:
#if defined(__linux__)
  case -EREMCHG:
  case -EBADE:
#elif defined(__FreeBSD__)
  case -BSM_ERRNO_EREMCHG:
  case -BSM_ERRNO_EBADE:
#endif
    return true;
  default:
    return false;
  }
}
477
// Body of the aio completion-polling thread: reap completed aios from the
// kernel queue, account them, and wake/notify whoever is waiting on each
// IOContext.  Runs until _aio_stop() sets aio_stop.
void KernelDevice::_aio_thread()
{
  dout(10) << __func__ << " start" << dendl;
  int inject_crash_count = 0;
  while (!aio_stop) {
    dout(40) << __func__ << " polling" << dendl;
    int max = cct->_conf->bdev_aio_reap_max;
    // NOTE(review): variable-length array — a GCC/Clang extension, not
    // standard C++.
    aio_t *aio[max];
    int r = aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms,
					 aio, max);
    if (r < 0) {
      derr << __func__ << " got " << cpp_strerror(r) << dendl;
      ceph_abort_msg("got unexpected error from io_getevents");
    }
    if (r > 0) {
      dout(30) << __func__ << " got " << r << " completed aios" << dendl;
      for (int i = 0; i < r; ++i) {
	IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
	_aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
	if (aio[i]->queue_item.is_linked()) {
	  std::lock_guard l(debug_queue_lock);
	  debug_aio_unlink(*aio[i]);
	}

	// set flag indicating new ios have completed.  we do this *before*
	// any completion or notifications so that any user flush() that
	// follows the observed io completion will include this io.  Note
	// that an earlier, racing flush() could observe and clear this
	// flag, but that also ensures that the IO will be stable before the
	// later flush() occurs.
	io_since_flush.store(true);

	// per-aio result; shadows the reap count above deliberately.
	long r = aio[i]->get_return_value();
	if (r < 0) {
	  derr << __func__ << " got r=" << r << " (" << cpp_strerror(r) << ")"
	       << dendl;
	  if (ioc->allow_eio && is_expected_ioerr(r)) {
	    derr << __func__ << " translating the error to EIO for upper layer"
		 << dendl;
	    ioc->set_return_value(-EIO);
	  } else {
	    if (is_expected_ioerr(r)) {
	      note_io_error_event(
		devname.c_str(),
		path.c_str(),
		r,
#if defined(HAVE_POSIXAIO)
		aio[i]->aio.aiocb.aio_lio_opcode,
#else
		aio[i]->iocb.aio_lio_opcode,
#endif
		aio[i]->offset,
		aio[i]->length);
	      ceph_abort_msg(
		"Unexpected IO error. "
		"This may suggest a hardware issue. "
		"Please check your kernel log!");
	    }
	    // NOTE(review): this second abort is only reachable when
	    // is_expected_ioerr(r) is false; the near-duplicate message
	    // above looks like a merge artifact — verify against upstream.
	    ceph_abort_msg(
	      "Unexpected IO error. "
	      "This may suggest HW issue. Please check your dmesg!");
	  }
	} else if (aio[i]->length != (uint64_t)r) {
	  derr << "aio to " << aio[i]->offset << "~" << aio[i]->length
	       << " but returned: " << r << dendl;
	  ceph_abort_msg("unexpected aio return value: does not match length");
	}

	dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
		 << " ioc " << ioc
		 << " with " << (ioc->num_running.load() - 1)
		 << " aios left" << dendl;

	// NOTE: once we decrement num_running and either call the callback
	// or call aio_wake we cannot touch ioc or aio[] as the caller
	// may free it.
	if (ioc->priv) {
	  if (--ioc->num_running == 0) {
	    aio_callback(aio_callback_priv, ioc->priv);
	  }
	} else {
	  ioc->try_aio_wake();
	}
      }
    }
    // Optional stalled-aio watchdog: if the oldest linked aio has been
    // outstanding longer than the suicide timeout, abort.
    if (cct->_conf->bdev_debug_aio) {
      utime_t now = ceph_clock_now();
      std::lock_guard l(debug_queue_lock);
      if (debug_oldest) {
	if (debug_stall_since == utime_t()) {
	  debug_stall_since = now;
	} else {
	  if (cct->_conf->bdev_debug_aio_suicide_timeout) {
	    utime_t cutoff = now;
	    cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
	    if (debug_stall_since < cutoff) {
	      derr << __func__ << " stalled aio " << debug_oldest
		   << " since " << debug_stall_since << ", timeout is "
		   << cct->_conf->bdev_debug_aio_suicide_timeout
		   << "s, suicide" << dendl;
	      ceph_abort_msg("stalled aio... buggy kernel or bad device?");
	    }
	  }
	}
      }
    }
    reap_ioc();
    // Inject a crash once enough poll iterations add up to the configured
    // number of seconds.
    if (cct->_conf->bdev_inject_crash) {
      ++inject_crash_count;
      if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
	  cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
	derr << __func__ << " bdev_inject_crash trigger from aio thread"
	     << dendl;
	cct->_log->flush();
	_exit(1);
      }
    }
  }
  reap_ioc();
  dout(10) << __func__ << " end" << dendl;
}
599
// Body of the background discard thread: batches queued extents, issues
// discard() for each while the lock is dropped, then hands the finished
// set to the registered callback.
void KernelDevice::_discard_thread()
{
  std::unique_lock l(discard_lock);
  ceph_assert(!discard_started);
  discard_started = true;
  discard_cond.notify_all();  // wake _discard_stop() waiting for startup
  while (true) {
    ceph_assert(discard_finishing.empty());
    if (discard_queued.empty()) {
      // Only honor the stop request once the queue has drained.
      if (discard_stop)
	break;
      dout(20) << __func__ << " sleep" << dendl;
      discard_cond.notify_all(); // for the thread trying to drain...
      discard_cond.wait(l);
      dout(20) << __func__ << " wake" << dendl;
    } else {
      // Claim the whole queue, then drop the lock while issuing the
      // (potentially slow) discards.
      discard_finishing.swap(discard_queued);
      discard_running = true;
      l.unlock();
      dout(20) << __func__ << " finishing" << dendl;
      for (auto p = discard_finishing.begin();p != discard_finishing.end(); ++p) {
	discard(p.get_start(), p.get_len());
      }

      discard_callback(discard_callback_priv, static_cast<void*>(&discard_finishing));
      discard_finishing.clear();
      l.lock();
      discard_running = false;
    }
  }
  dout(10) << __func__ << " finish" << dendl;
  discard_started = false;
}
633
634 int KernelDevice::queue_discard(interval_set<uint64_t> &to_release)
635 {
636 if (!support_discard)
637 return -1;
638
639 if (to_release.empty())
640 return 0;
641
642 std::lock_guard l(discard_lock);
643 discard_queued.insert(to_release);
644 discard_cond.notify_all();
645 return 0;
646 }
647
648 void KernelDevice::_aio_log_start(
649 IOContext *ioc,
650 uint64_t offset,
651 uint64_t length)
652 {
653 dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
654 << std::dec << dendl;
655 if (cct->_conf->bdev_debug_inflight_ios) {
656 std::lock_guard l(debug_lock);
657 if (debug_inflight.intersects(offset, length)) {
658 derr << __func__ << " inflight overlap of 0x"
659 << std::hex
660 << offset << "~" << length << std::dec
661 << " with " << debug_inflight << dendl;
662 ceph_abort();
663 }
664 debug_inflight.insert(offset, length);
665 }
666 }
667
668 void KernelDevice::debug_aio_link(aio_t& aio)
669 {
670 if (debug_queue.empty()) {
671 debug_oldest = &aio;
672 }
673 debug_queue.push_back(aio);
674 }
675
// Remove an aio from the debug tracking queue (caller holds
// debug_queue_lock), maintaining the debug_oldest / debug_stall_since
// bookkeeping used for stall reporting.
void KernelDevice::debug_aio_unlink(aio_t& aio)
{
  if (aio.queue_item.is_linked()) {
    debug_queue.erase(debug_queue.iterator_to(aio));
    if (debug_oldest == &aio) {
      // If the departing oldest aio had been stalled longer than the
      // configured log age, report it before advancing.
      auto age = cct->_conf->bdev_debug_aio_log_age;
      if (age && debug_stall_since != utime_t()) {
	utime_t cutoff = ceph_clock_now();
	cutoff -= age;
	if (debug_stall_since < cutoff) {
	  derr << __func__ << " stalled aio " << debug_oldest
	       << " since " << debug_stall_since << ", timeout is "
	       << age
	       << "s" << dendl;
	}
      }

      if (debug_queue.empty()) {
	debug_oldest = nullptr;
      } else {
	debug_oldest = &debug_queue.front();
      }
      // The stall clock restarts for the new oldest entry.
      debug_stall_since = utime_t();
    }
  }
}
702
703 void KernelDevice::_aio_log_finish(
704 IOContext *ioc,
705 uint64_t offset,
706 uint64_t length)
707 {
708 dout(20) << __func__ << " " << aio << " 0x"
709 << std::hex << offset << "~" << length << std::dec << dendl;
710 if (cct->_conf->bdev_debug_inflight_ios) {
711 std::lock_guard l(debug_lock);
712 debug_inflight.erase(offset, length);
713 }
714 }
715
// Submit every pending aio on ioc to the kernel queue.  Only one thread
// may submit a given IOContext at a time (asserted below).
void KernelDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc
	   << " pending " << ioc->num_pending.load()
	   << " running " << ioc->num_running.load()
	   << dendl;

  if (ioc->num_pending.load() == 0) {
    return;
  }

  // move these aside, and get our end iterator position now, as the
  // aios might complete as soon as they are submitted and queue more
  // wal aio's.
  list<aio_t>::iterator e = ioc->running_aios.begin();
  ioc->running_aios.splice(e, ioc->pending_aios);

  // Account the batch as running before submission; completions may race
  // with the rest of this function.
  int pending = ioc->num_pending.load();
  ioc->num_running += pending;
  ioc->num_pending -= pending;
  ceph_assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
  ceph_assert(ioc->pending_aios.size() == 0);

  if (cct->_conf->bdev_debug_aio) {
    list<aio_t>::iterator p = ioc->running_aios.begin();
    while (p != e) {
      dout(30) << __func__ << " " << *p << dendl;
      std::lock_guard l(debug_queue_lock);
      debug_aio_link(*p++);
    }
  }

  void *priv = static_cast<void*>(ioc);
  int r, retries = 0;
  // Submit exactly [begin, e): anything spliced in after this point
  // belongs to a later submit.
  r = aio_queue.submit_batch(ioc->running_aios.begin(), e,
			     pending, priv, &retries);

  if (retries)
    derr << __func__ << " retries " << retries << dendl;
  if (r < 0) {
    derr << " aio submit got " << cpp_strerror(r) << dendl;
    ceph_assert(r == 0);
  }
}
760
// Synchronously write bl at offset off via pwritev on the fd selected by
// (buffered, write_hint).  Returns 0 on success or a negative errno.
// Durability is deferred to flush(); for buffered writes the kernel
// writeback is merely *initiated* here via sync_file_range.
int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
{
  uint64_t len = bl.length();
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	  << std::dec << (buffered ? " (buffered)" : " (direct)") << dendl;
  // Crash-injection testing: silently drop the io.
  if (cct->_conf->bdev_inject_crash &&
      rand() % cct->_conf->bdev_inject_crash == 0) {
    derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
	 << off << "~" << len << std::dec << dendl;
    ++injecting_crash;
    return 0;
  }
  vector<iovec> iov;
  bl.prepare_iov(&iov);
  // NOTE(review): a short pwritev (0 <= r < len) is treated as success
  // here — presumably assumed not to happen for block devices; verify.
  int r = ::pwritev(choose_fd(buffered, write_hint),
		    &iov[0], iov.size(), off);

  if (r < 0) {
    r = -errno;
    derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
    return r;
  }
#ifdef HAVE_SYNC_FILE_RANGE
  if (buffered) {
    // initiate IO (but do not wait)
    r = ::sync_file_range(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, SYNC_FILE_RANGE_WRITE);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
      return r;
    }
  }
#endif

  // Mark the device dirty so the next flush() actually fdatasyncs.
  io_since_flush.store(true);

  return 0;
}
799
// Synchronous write entry point: validates the io, rebuilds bl when
// alignment or iov-count constraints require it, then delegates to
// _sync_write().  Returns 0 or a negative errno.
int KernelDevice::write(
  uint64_t off,
  bufferlist &bl,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	   << (buffered ? " (buffered)" : " (direct)")
	   << dendl;
  ceph_assert(is_valid_io(off, len));

  // Direct io needs block-aligned memory; any write must also fit within
  // IOV_MAX segments for pwritev.
  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  return _sync_write(off, bl, buffered, write_hint);
}
822
// Queue an asynchronous write on ioc when libaio + direct io are in
// effect; otherwise fall back to a synchronous write.  Queued aios are
// not issued until the caller invokes aio_submit(ioc).
// Returns 0 or a negative errno (sync path only).
int KernelDevice::aio_write(
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	   << (buffered ? " (buffered)" : " (direct)")
	   << dendl;
  ceph_assert(is_valid_io(off, len));

  // Direct io needs block-aligned memory; any write must also fit within
  // IOV_MAX segments.
  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size, IOV_MAX)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  _aio_log_start(ioc, off, len);

#ifdef HAVE_LIBAIO
  if (aio && dio && !buffered) {
    ioc->pending_aios.push_back(aio_t(ioc, choose_fd(false, write_hint)));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    if (cct->_conf->bdev_inject_crash &&
	rand() % cct->_conf->bdev_inject_crash == 0) {
      derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
	   << off << "~" << len << std::dec
	   << dendl;
      // generate a real io so that aio_wait behaves properly, but make it
      // a read instead of write, and toss the result.
      aio.pread(off, len);
      ++injecting_crash;
    } else {
      // The aio takes ownership of bl's buffers until completion.
      bl.prepare_iov(&aio.iov);
      dout(30) << aio << dendl;
      aio.bl.claim_append(bl);
      aio.pwritev(off, len);
    }
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	    << std::dec << " aio " << &aio << dendl;
  } else
#endif
  {
    int r = _sync_write(off, bl, buffered, write_hint);
    _aio_log_finish(ioc, off, len);
    if (r < 0)
      return r;
  }
  return 0;
}
878
879 int KernelDevice::discard(uint64_t offset, uint64_t len)
880 {
881 int r = 0;
882 if (support_discard) {
883 dout(10) << __func__
884 << " 0x" << std::hex << offset << "~" << len << std::dec
885 << dendl;
886
887 r = BlkDev{fd_directs[WRITE_LIFE_NOT_SET]}.discard((int64_t)offset, (int64_t)len);
888 }
889 return r;
890 }
891
892 int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
893 IOContext *ioc,
894 bool buffered)
895 {
896 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
897 << (buffered ? " (buffered)" : " (direct)")
898 << dendl;
899 ceph_assert(is_valid_io(off, len));
900
901 _aio_log_start(ioc, off, len);
902
903 auto start1 = mono_clock::now();
904
905 auto p = buffer::ptr_node::create(buffer::create_small_page_aligned(len));
906 int r = ::pread(buffered ? fd_buffereds[WRITE_LIFE_NOT_SET] : fd_directs[WRITE_LIFE_NOT_SET],
907 p->c_str(), len, off);
908 auto age = cct->_conf->bdev_debug_aio_log_age;
909 if (mono_clock::now() - start1 >= make_timespan(age)) {
910 derr << __func__ << " stalled read "
911 << " 0x" << std::hex << off << "~" << len << std::dec
912 << (buffered ? " (buffered)" : " (direct)")
913 << " since " << start1 << ", timeout is "
914 << age
915 << "s" << dendl;
916 }
917
918 if (r < 0) {
919 if (ioc->allow_eio && is_expected_ioerr(r)) {
920 r = -EIO;
921 } else {
922 r = -errno;
923 }
924 goto out;
925 }
926 ceph_assert((uint64_t)r == len);
927 pbl->push_back(std::move(p));
928
929 dout(40) << "data: ";
930 pbl->hexdump(*_dout);
931 *_dout << dendl;
932
933 out:
934 _aio_log_finish(ioc, off, len);
935 return r < 0 ? r : 0;
936 }
937
// Asynchronous read: queues an aio on ioc when libaio + direct io are
// active; otherwise performs a synchronous direct read().
int KernelDevice::aio_read(
  uint64_t off,
  uint64_t len,
  bufferlist *pbl,
  IOContext *ioc)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	  << dendl;

  int r = 0;
#ifdef HAVE_LIBAIO
  if (aio && dio) {
    ceph_assert(is_valid_io(off, len));
    _aio_log_start(ioc, off, len);
    ioc->pending_aios.push_back(aio_t(ioc, fd_directs[WRITE_LIFE_NOT_SET]));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    aio.pread(off, len);
    dout(30) << aio << dendl;
    // pbl shares the aio's buffer now; it is filled in when the aio
    // completes, so the caller must wait before consuming it.
    pbl->append(aio.bl);
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
	    << std::dec << " aio " << &aio << dendl;
  } else
#endif
  {
    r = read(off, len, pbl, ioc, false);
  }

  return r;
}
968
// Serve an unaligned direct read by reading the enclosing block-aligned
// range into a page-aligned bounce buffer, then copying the requested
// span into buf.  Returns 0 or a negative errno.
int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
{
  uint64_t aligned_off = align_down(off, block_size);
  uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
  bufferptr p = buffer::create_small_page_aligned(aligned_len);
  int r = 0;

  auto start1 = mono_clock::now();
  r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], p.c_str(), aligned_len, aligned_off);
  auto age = cct->_conf->bdev_debug_aio_log_age;
  // NOTE(review): with bdev_debug_aio_log_age == 0 this warning fires on
  // every read (elapsed >= 0s) — an `age &&` guard looks intended; verify.
  if (mono_clock::now() - start1 >= make_timespan(age)) {
    derr << __func__ << " stalled read "
	 << " 0x" << std::hex << off << "~" << len << std::dec
	 << " since " << start1 << ", timeout is "
	 << age
	 << "s" << dendl;
  }

  if (r < 0) {
    r = -errno;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
	 << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  ceph_assert((uint64_t)r == aligned_len);
  // Copy just the caller's window out of the bounce buffer.
  memcpy(buf, p.c_str() + (off - aligned_off), len);

  dout(40) << __func__ << " data: ";
  bufferlist bl;
  bl.append(buf, len);
  bl.hexdump(*_dout);
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}
1005
1006 int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
1007 bool buffered)
1008 {
1009 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1010 << dendl;
1011 ceph_assert(len > 0);
1012 ceph_assert(off < size);
1013 ceph_assert(off + len <= size);
1014 int r = 0;
1015 auto age = cct->_conf->bdev_debug_aio_log_age;
1016
1017 //if it's direct io and unaligned, we have to use a internal buffer
1018 if (!buffered && ((off % block_size != 0)
1019 || (len % block_size != 0)
1020 || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
1021 return direct_read_unaligned(off, len, buf);
1022
1023 auto start1 = mono_clock::now();
1024 if (buffered) {
1025 //buffered read
1026 auto off0 = off;
1027 char *t = buf;
1028 uint64_t left = len;
1029 while (left > 0) {
1030 r = ::pread(fd_buffereds[WRITE_LIFE_NOT_SET], t, left, off);
1031 if (r < 0) {
1032 r = -errno;
1033 derr << __func__ << " 0x" << std::hex << off << "~" << left
1034 << std::dec << " error: " << cpp_strerror(r) << dendl;
1035 goto out;
1036 }
1037 off += r;
1038 t += r;
1039 left -= r;
1040 }
1041 if (mono_clock::now() - start1 >= make_timespan(age)) {
1042 derr << __func__ << " stalled read "
1043 << " 0x" << std::hex << off0 << "~" << len << std::dec
1044 << " (buffered) since " << start1 << ", timeout is "
1045 << age
1046 << "s" << dendl;
1047 }
1048 } else {
1049 //direct and aligned read
1050 r = ::pread(fd_directs[WRITE_LIFE_NOT_SET], buf, len, off);
1051 if (mono_clock::now() - start1 >= make_timespan(age)) {
1052 derr << __func__ << " stalled read "
1053 << " 0x" << std::hex << off << "~" << len << std::dec
1054 << " (direct) since " << start1 << ", timeout is "
1055 << age
1056 << "s" << dendl;
1057 }
1058 if (r < 0) {
1059 r = -errno;
1060 derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
1061 << off << "~" << left << std::dec << " error: " << cpp_strerror(r)
1062 << dendl;
1063 goto out;
1064 }
1065 ceph_assert((uint64_t)r == len);
1066 }
1067
1068 dout(40) << __func__ << " data: ";
1069 bufferlist bl;
1070 bl.append(buf, len);
1071 bl.hexdump(*_dout);
1072 *_dout << dendl;
1073
1074 out:
1075 return r < 0 ? r : 0;
1076 }
1077
1078 int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
1079 {
1080 dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1081 << dendl;
1082 ceph_assert(off % block_size == 0);
1083 ceph_assert(len % block_size == 0);
1084 int r = posix_fadvise(fd_buffereds[WRITE_LIFE_NOT_SET], off, len, POSIX_FADV_DONTNEED);
1085 if (r) {
1086 r = -r;
1087 derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
1088 << " error: " << cpp_strerror(r) << dendl;
1089 }
1090 return r;
1091 }