// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include "KernelDevice.h"
#include "include/types.h"
#include "include/compat.h"
#include "include/stringify.h"
#include "common/errno.h"
#include "common/debug.h"
#include "common/blkdev.h"
#include "common/align.h"

#define dout_context cct
#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << this << " " << path << ") "

KernelDevice::KernelDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
  : BlockDevice(cct),
    fd_direct(-1),
    fd_buffered(-1),
    size(0), block_size(0),
    fs(NULL), aio(false), dio(false),
    debug_lock("KernelDevice::debug_lock"),
    aio_queue(cct->_conf->bdev_aio_max_queue_depth),
    aio_callback(cb),
    aio_callback_priv(cbpriv),
    aio_stop(false),
    aio_thread(this),
    injecting_crash(0)
{
}

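// Take an exclusive advisory lock on the whole device.  With l_start and
// l_len left at zero (via the memset) and l_whence = SEEK_SET, the lock
// covers the entire file.  F_SETLK is non-blocking, so a second process
// opening the same device fails immediately instead of waiting.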
int KernelDevice::_lock()
{
  struct flock l;
  memset(&l, 0, sizeof(l));
  l.l_type = F_WRLCK;
  l.l_whence = SEEK_SET;
  int r = ::fcntl(fd_direct, F_SETLK, &l);
  if (r < 0)
    return -errno;
  return 0;
}

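// Open the device twice: fd_direct (O_RDWR | O_DIRECT) carries the
// aligned direct/aio traffic, while fd_buffered goes through the page
// cache for callers that request buffered I/O.  Readahead is disabled on
// the buffered fd below because, as the comment there notes, it interacts
// badly with this mix of direct and buffered I/O.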
int KernelDevice::open(const string& p)
{
  path = p;
  int r = 0;
  dout(1) << __func__ << " path " << path << dendl;

  fd_direct = ::open(path.c_str(), O_RDWR | O_DIRECT);
  if (fd_direct < 0) {
    r = -errno;
    derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
    return r;
  }
  fd_buffered = ::open(path.c_str(), O_RDWR);
  if (fd_buffered < 0) {
    r = -errno;
    derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
    goto out_direct;
  }
  dio = true;
  aio = cct->_conf->bdev_aio;
  if (!aio) {
    assert(0 == "non-aio not supported");
  }

  // disable readahead as it will wreak havoc on our mix of
  // directio/aio and buffered io.
  r = posix_fadvise(fd_buffered, 0, 0, POSIX_FADV_RANDOM);
  if (r) {
    r = -r;
    derr << __func__ << " open got: " << cpp_strerror(r) << dendl;
    goto out_fail;
  }

  r = _lock();
  if (r < 0) {
    derr << __func__ << " failed to lock " << path << ": " << cpp_strerror(r)
         << dendl;
    goto out_fail;
  }

  struct stat st;
  r = ::fstat(fd_direct, &st);
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fstat got " << cpp_strerror(r) << dendl;
    goto out_fail;
  }

  // Operate as though the block size is 4 KB.  The backing file
  // blksize doesn't strictly matter except that some file systems may
  // require a read/modify/write if we write something smaller than
  // it.
  block_size = cct->_conf->bdev_block_size;
  if (block_size != (unsigned)st.st_blksize) {
    dout(1) << __func__ << " backing device/file reports st_blksize "
            << st.st_blksize << ", using bdev_block_size "
            << block_size << " anyway" << dendl;
  }

  if (S_ISBLK(st.st_mode)) {
    int64_t s;
    r = get_block_device_size(fd_direct, &s);
    if (r < 0) {
      goto out_fail;
    }
    size = s;
  } else {
    size = st.st_size;
  }

  {
    char partition[PATH_MAX], devname[PATH_MAX];
    r = get_device_by_fd(fd_buffered, partition, devname, sizeof(devname));
    if (r < 0) {
      derr << "unable to get device name for " << path << ": "
           << cpp_strerror(r) << dendl;
      rotational = true;
    } else {
      dout(20) << __func__ << " devname " << devname << dendl;
      rotational = block_device_is_rotational(devname);
    }
  }

  r = _aio_start();
  if (r < 0) {
    goto out_fail;
  }

  fs = FS::create_by_fd(fd_direct);
  assert(fs);

  // round size down to an even block
  size &= ~(block_size - 1);

  dout(1) << __func__
          << " size " << size
          << " (0x" << std::hex << size << std::dec << ", "
          << pretty_si_t(size) << "B)"
          << " block_size " << block_size
          << " (" << pretty_si_t(block_size) << "B)"
          << " " << (rotational ? "rotational" : "non-rotational")
          << dendl;
  return 0;

 out_fail:
  VOID_TEMP_FAILURE_RETRY(::close(fd_buffered));
  fd_buffered = -1;
 out_direct:
  VOID_TEMP_FAILURE_RETRY(::close(fd_direct));
  fd_direct = -1;
  return r;
}

void KernelDevice::close()
{
  dout(1) << __func__ << dendl;
  _aio_stop();

  assert(fs);
  delete fs;
  fs = NULL;

  assert(fd_direct >= 0);
  VOID_TEMP_FAILURE_RETRY(::close(fd_direct));
  fd_direct = -1;

  assert(fd_buffered >= 0);
  VOID_TEMP_FAILURE_RETRY(::close(fd_buffered));
  fd_buffered = -1;

  path.clear();
}

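// Read one sysfs attribute for a device node (e.g. "device/model" under
// /sys/block/<dev>/).  The return value of
// get_block_device_string_property() is deliberately ignored: on failure
// val stays zeroed and an empty string is returned.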
static string get_dev_property(const char *dev, const char *property)
{
  char val[1024] = {0};
  get_block_device_string_property(dev, property, val, sizeof(val));
  return val;
}

int KernelDevice::collect_metadata(string prefix, map<string,string> *pm) const
{
  (*pm)[prefix + "rotational"] = stringify((int)(bool)rotational);
  (*pm)[prefix + "size"] = stringify(get_size());
  (*pm)[prefix + "block_size"] = stringify(get_block_size());
  (*pm)[prefix + "driver"] = "KernelDevice";
  if (rotational) {
    (*pm)[prefix + "type"] = "hdd";
  } else {
    (*pm)[prefix + "type"] = "ssd";
  }

  struct stat st;
  int r = ::fstat(fd_buffered, &st);
  if (r < 0)
    return -errno;
  if (S_ISBLK(st.st_mode)) {
    (*pm)[prefix + "access_mode"] = "blk";
    char partition_path[PATH_MAX];
    char dev_node[PATH_MAX];
    int rc = get_device_by_fd(fd_buffered, partition_path, dev_node, PATH_MAX);
    switch (rc) {
    case -EOPNOTSUPP:
    case -EINVAL:
      (*pm)[prefix + "partition_path"] = "unknown";
      (*pm)[prefix + "dev_node"] = "unknown";
      break;
    case -ENODEV:
      (*pm)[prefix + "partition_path"] = string(partition_path);
      (*pm)[prefix + "dev_node"] = "unknown";
      break;
    default:
      {
        (*pm)[prefix + "partition_path"] = string(partition_path);
        (*pm)[prefix + "dev_node"] = string(dev_node);
        (*pm)[prefix + "model"] = get_dev_property(dev_node, "device/model");
        (*pm)[prefix + "dev"] = get_dev_property(dev_node, "dev");

        // nvme exposes a serial number
        string serial = get_dev_property(dev_node, "device/serial");
        if (serial.length()) {
          (*pm)[prefix + "serial"] = serial;
        }

        // nvme has a device/device/* structure; infer from that.  there
        // is probably a better way?
        string nvme_vendor = get_dev_property(dev_node, "device/device/vendor");
        if (nvme_vendor.length()) {
          (*pm)[prefix + "type"] = "nvme";
        }
      }
    }
  } else {
    (*pm)[prefix + "access_mode"] = "file";
    (*pm)[prefix + "path"] = path;
  }
  return 0;
}

int KernelDevice::flush()
{
  // protect flush with a mutex.  note that we are not really protecting
  // data here.  instead, we're ensuring that if any flush() caller
  // sees that io_since_flush is true, they block any racing callers
  // until the flush is observed.  that allows racing threads to be
  // calling flush while still ensuring that *any* of them that got an
  // aio completion notification will not return before that aio is
  // stable on disk: whichever thread sees the flag first will block
  // followers until the aio is stable.
  std::lock_guard<std::mutex> l(flush_mutex);

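  // compare_exchange_strong(expect = true, desired = false): if the flag
  // is set, atomically clear it and fall through to the fdatasync; if it
  // is already clear, another caller has flushed (or no io has completed)
  // and we can return.  Because racing callers block on flush_mutex above,
  // a loser only returns after the winner's fdatasync has completed.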
  bool expect = true;
  if (!io_since_flush.compare_exchange_strong(expect, false)) {
    dout(10) << __func__ << " no-op (no ios since last flush), flag is "
             << (int)io_since_flush.load() << dendl;
    return 0;
  }

  dout(10) << __func__ << " start" << dendl;
  if (cct->_conf->bdev_inject_crash) {
    ++injecting_crash;
    // sleep for a moment to give other threads a chance to submit or
    // wait on io that races with a flush.
    derr << __func__ << " injecting crash. first we sleep..." << dendl;
    sleep(cct->_conf->bdev_inject_crash_flush_delay);
    derr << __func__ << " and now we die" << dendl;
    cct->_log->flush();
    _exit(1);
  }
  utime_t start = ceph_clock_now();
  int r = ::fdatasync(fd_direct);
  utime_t end = ceph_clock_now();
  utime_t dur = end - start;
  if (r < 0) {
    r = -errno;
    derr << __func__ << " fdatasync got: " << cpp_strerror(r) << dendl;
    ceph_abort();
  }
  dout(5) << __func__ << " in " << dur << dendl;
  return r;
}

int KernelDevice::_aio_start()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    int r = aio_queue.init();
    if (r < 0) {
      if (r == -EAGAIN) {
        derr << __func__ << " io_setup(2) failed with EAGAIN; "
             << "try increasing /proc/sys/fs/aio-max-nr" << dendl;
      } else {
        derr << __func__ << " io_setup(2) failed: " << cpp_strerror(r) << dendl;
      }
      return r;
    }
    aio_thread.create("bstore_aio");
  }
  return 0;
}

void KernelDevice::_aio_stop()
{
  if (aio) {
    dout(10) << __func__ << dendl;
    aio_stop = true;
    aio_thread.join();
    aio_stop = false;
    aio_queue.shutdown();
  }
}

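// Completion thread: poll the kernel aio queue in batches of up to
// sixteen events, mark io_since_flush *before* notifying anyone, then
// either run the registered callback or wake a synchronous waiter.  It
// also optionally watches for stalled aios and injected crashes.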
void KernelDevice::_aio_thread()
{
  dout(10) << __func__ << " start" << dendl;
  int inject_crash_count = 0;
  while (!aio_stop) {
    dout(40) << __func__ << " polling" << dendl;
    const int max = 16;
    aio_t *aio[max];
    int r = aio_queue.get_next_completed(cct->_conf->bdev_aio_poll_ms,
                                         aio, max);
    if (r < 0) {
      derr << __func__ << " got " << cpp_strerror(r) << dendl;
    }
    if (r > 0) {
      dout(30) << __func__ << " got " << r << " completed aios" << dendl;
      for (int i = 0; i < r; ++i) {
        IOContext *ioc = static_cast<IOContext*>(aio[i]->priv);
        _aio_log_finish(ioc, aio[i]->offset, aio[i]->length);
        if (aio[i]->queue_item.is_linked()) {
          std::lock_guard<std::mutex> l(debug_queue_lock);
          debug_aio_unlink(*aio[i]);
        }

        // set flag indicating new ios have completed.  we do this *before*
        // any completion or notifications so that any user flush() that
        // follows the observed io completion will include this io.  Note
        // that an earlier, racing flush() could observe and clear this
        // flag, but that also ensures that the IO will be stable before the
        // later flush() occurs.
        io_since_flush.store(true);

        int r = aio[i]->get_return_value();
        dout(10) << __func__ << " finished aio " << aio[i] << " r " << r
                 << " ioc " << ioc
                 << " with " << (ioc->num_running.load() - 1)
                 << " aios left" << dendl;
        assert(r >= 0);

        // NOTE: once we decrement num_running and either call the
        // callback or wake the waiter, we cannot touch ioc or aio[]
        // as the caller may free them.
        if (ioc->priv) {
          if (--ioc->num_running == 0) {
            aio_callback(aio_callback_priv, ioc->priv);
          }
        } else {
          ioc->try_aio_wake();
        }
      }
    }
    if (cct->_conf->bdev_debug_aio) {
      utime_t now = ceph_clock_now();
      std::lock_guard<std::mutex> l(debug_queue_lock);
      if (debug_oldest) {
        if (debug_stall_since == utime_t()) {
          debug_stall_since = now;
        } else {
          utime_t cutoff = now;
          cutoff -= cct->_conf->bdev_debug_aio_suicide_timeout;
          if (debug_stall_since < cutoff) {
            derr << __func__ << " stalled aio " << debug_oldest
                 << " since " << debug_stall_since << ", timeout is "
                 << cct->_conf->bdev_debug_aio_suicide_timeout
                 << "s, suicide" << dendl;
            assert(0 == "stalled aio... buggy kernel or bad device?");
          }
        }
      }
    }
    reap_ioc();
    if (cct->_conf->bdev_inject_crash) {
      ++inject_crash_count;
      if (inject_crash_count * cct->_conf->bdev_aio_poll_ms / 1000 >
          cct->_conf->bdev_inject_crash + cct->_conf->bdev_inject_crash_flush_delay) {
        derr << __func__ << " bdev_inject_crash trigger from aio thread"
             << dendl;
        cct->_log->flush();
        _exit(1);
      }
    }
  }
  reap_ioc();
  dout(10) << __func__ << " end" << dendl;
}

void KernelDevice::_aio_log_start(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
           << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    Mutex::Locker l(debug_lock);
    if (debug_inflight.intersects(offset, length)) {
      derr << __func__ << " inflight overlap of 0x"
           << std::hex
           << offset << "~" << length << std::dec
           << " with " << debug_inflight << dendl;
      ceph_abort();
    }
    debug_inflight.insert(offset, length);
  }
}

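// The debug queue (guarded by debug_queue_lock) tracks aios that have
// been submitted but not yet reaped, with debug_oldest pointing at the
// longest-outstanding one; the aio thread uses it above to detect
// stalls.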
void KernelDevice::debug_aio_link(aio_t& aio)
{
  if (debug_queue.empty()) {
    debug_oldest = &aio;
  }
  debug_queue.push_back(aio);
}

void KernelDevice::debug_aio_unlink(aio_t& aio)
{
  if (aio.queue_item.is_linked()) {
    debug_queue.erase(debug_queue.iterator_to(aio));
    if (debug_oldest == &aio) {
      if (debug_queue.empty()) {
        debug_oldest = nullptr;
      } else {
        debug_oldest = &debug_queue.front();
      }
      debug_stall_since = utime_t();
    }
  }
}

void KernelDevice::_aio_log_finish(
  IOContext *ioc,
  uint64_t offset,
  uint64_t length)
{
  dout(20) << __func__ << " " << ioc << " 0x"
           << std::hex << offset << "~" << length << std::dec << dendl;
  if (cct->_conf->bdev_debug_inflight_ios) {
    Mutex::Locker l(debug_lock);
    debug_inflight.erase(offset, length);
  }
}

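// Typical caller sequence (a sketch; the concrete IOContext handling in
// BlueStore differs in detail):
//
//   IOContext ioc(cct, priv);
//   bdev->aio_write(off, bl, &ioc, false);  // queue onto ioc->pending_aios
//   bdev->aio_submit(&ioc);                 // pending -> running, io_submit(2)
//   ioc.aio_wait();                         // or wait for aio_callback
//   bdev->flush();                          // make the completed writes durable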
void KernelDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc
           << " pending " << ioc->num_pending.load()
           << " running " << ioc->num_running.load()
           << dendl;
  if (ioc->num_pending.load() == 0) {
    return;
  }
  // move these aside, and get our end iterator position now, as the
  // aios might complete as soon as they are submitted and queue more
  // wal aios.
  list<aio_t>::iterator e = ioc->running_aios.begin();
  ioc->running_aios.splice(e, ioc->pending_aios);
  list<aio_t>::iterator p = ioc->running_aios.begin();

  int pending = ioc->num_pending.load();
  ioc->num_running += pending;
  ioc->num_pending -= pending;
  assert(ioc->num_pending.load() == 0);  // we should be the only thread doing this

  bool done = false;
  while (!done) {
    aio_t& aio = *p;
    aio.priv = static_cast<void*>(ioc);
    dout(20) << __func__ << " aio " << &aio << " fd " << aio.fd
             << " 0x" << std::hex << aio.offset << "~" << aio.length
             << std::dec << dendl;
    for (auto& io : aio.iov)
      dout(30) << __func__ << " iov " << (void*)io.iov_base
               << " len " << io.iov_len << dendl;

    // be careful: as soon as we submit aio we race with completion.
    // since we are holding a ref take care not to dereference txc at
    // all after that point.
    list<aio_t>::iterator cur = p;
    ++p;
    done = (p == e);

    // do not dereference txc (or its contents) after we submit (if
    // done == true and we don't loop)
    int retries = 0;
    if (cct->_conf->bdev_debug_aio) {
      std::lock_guard<std::mutex> l(debug_queue_lock);
      debug_aio_link(*cur);
    }
    int r = aio_queue.submit(*cur, &retries);
    if (retries)
      derr << __func__ << " retries " << retries << dendl;
    if (r) {
      derr << __func__ << " aio submit got " << cpp_strerror(r) << dendl;
      assert(r == 0);
    }
  }
}

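// Synchronous write path.  pwritev(2) on fd_direct transfers the data to
// the device; on fd_buffered it only reaches the page cache, so
// sync_file_range(2) is used to start writeback without waiting for it.
// Durability in both cases still comes from a later flush()/fdatasync.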
int KernelDevice::_sync_write(uint64_t off, bufferlist &bl, bool buffered)
{
  uint64_t len = bl.length();
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
          << std::dec << (buffered ? " (buffered)" : " (direct)") << dendl;
  if (cct->_conf->bdev_inject_crash &&
      rand() % cct->_conf->bdev_inject_crash == 0) {
    derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
         << off << "~" << len << std::dec << dendl;
    ++injecting_crash;
    return 0;
  }
  vector<iovec> iov;
  bl.prepare_iov(&iov);
  int r = ::pwritev(buffered ? fd_buffered : fd_direct,
                    &iov[0], iov.size(), off);

  if (r < 0) {
    r = -errno;
    derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
    return r;
  }
  if (buffered) {
    // initiate IO (but do not wait)
    r = ::sync_file_range(fd_buffered, off, len, SYNC_FILE_RANGE_WRITE);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " sync_file_range error: " << cpp_strerror(r) << dendl;
      return r;
    }
  }

  io_since_flush.store(true);

  return 0;
}

int KernelDevice::write(
  uint64_t off,
  bufferlist &bl,
  bool buffered)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
           << (buffered ? " (buffered)" : " (direct)")
           << dendl;
  assert(off % block_size == 0);
  assert(len % block_size == 0);
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  return _sync_write(off, bl, buffered);
}

int KernelDevice::aio_write(
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc,
  bool buffered)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
           << (buffered ? " (buffered)" : " (direct)")
           << dendl;
  assert(off % block_size == 0);
  assert(len % block_size == 0);
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);

  if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
      bl.rebuild_aligned_size_and_memory(block_size, block_size)) {
    dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
  }
  dout(40) << "data: ";
  bl.hexdump(*_dout);
  *_dout << dendl;

  _aio_log_start(ioc, off, len);

#ifdef HAVE_LIBAIO
  if (aio && dio && !buffered) {
    ioc->pending_aios.push_back(aio_t(ioc, fd_direct));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    if (cct->_conf->bdev_inject_crash &&
        rand() % cct->_conf->bdev_inject_crash == 0) {
      derr << __func__ << " bdev_inject_crash: dropping io 0x" << std::hex
           << off << "~" << len << std::dec
           << dendl;
      // generate a real io so that aio_wait behaves properly, but make it
      // a read instead of write, and toss the result.
      aio.pread(off, len);
      ++injecting_crash;
    } else {
      bl.prepare_iov(&aio.iov);
      for (unsigned i = 0; i < aio.iov.size(); ++i) {
        dout(30) << "aio " << i << " " << aio.iov[i].iov_base
                 << " " << aio.iov[i].iov_len << dendl;
      }
      aio.bl.claim_append(bl);
      aio.pwritev(off, len);
    }
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
            << std::dec << " aio " << &aio << dendl;
  } else
#endif
  {
    int r = _sync_write(off, bl, buffered);
    _aio_log_finish(ioc, off, len);
    if (r < 0)
      return r;
  }
  return 0;
}

int KernelDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
                       IOContext *ioc,
                       bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << (buffered ? " (buffered)" : " (direct)")
          << dendl;
  assert(off % block_size == 0);
  assert(len % block_size == 0);
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);

  _aio_log_start(ioc, off, len);

  bufferptr p = buffer::create_page_aligned(len);
  int r = ::pread(buffered ? fd_buffered : fd_direct,
                  p.c_str(), len, off);
  if (r < 0) {
    r = -errno;
    goto out;
  }
  assert((uint64_t)r == len);
  pbl->push_back(std::move(p));

  dout(40) << "data: ";
  pbl->hexdump(*_dout);
  *_dout << dendl;

 out:
  _aio_log_finish(ioc, off, len);
  return r < 0 ? r : 0;
}

int KernelDevice::aio_read(
  uint64_t off,
  uint64_t len,
  bufferlist *pbl,
  IOContext *ioc)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << dendl;

  int r = 0;
#ifdef HAVE_LIBAIO
  if (aio && dio) {
    _aio_log_start(ioc, off, len);
    ioc->pending_aios.push_back(aio_t(ioc, fd_direct));
    ++ioc->num_pending;
    aio_t& aio = ioc->pending_aios.back();
    aio.pread(off, len);
    for (unsigned i = 0; i < aio.iov.size(); ++i) {
      dout(30) << "aio " << i << " " << aio.iov[i].iov_base
               << " " << aio.iov[i].iov_len << dendl;
    }
    pbl->append(aio.bl);
    dout(5) << __func__ << " 0x" << std::hex << off << "~" << len
            << std::dec << " aio " << &aio << dendl;
  } else
#endif
  {
    r = read(off, len, pbl, ioc, false);
  }

  return r;
}

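// O_DIRECT requires the offset, length and buffer address to be aligned,
// so an unaligned read is widened to the enclosing aligned range, read
// into a page-aligned bounce buffer, and the requested slice memcpy'd
// out.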
int KernelDevice::direct_read_unaligned(uint64_t off, uint64_t len, char *buf)
{
  uint64_t aligned_off = align_down(off, block_size);
  uint64_t aligned_len = align_up(off + len, block_size) - aligned_off;
  bufferptr p = buffer::create_page_aligned(aligned_len);
  int r = 0;

  r = ::pread(fd_direct, p.c_str(), aligned_len, aligned_off);
  if (r < 0) {
    r = -errno;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
         << " error: " << cpp_strerror(r) << dendl;
    goto out;
  }
  assert((uint64_t)r == aligned_len);
  memcpy(buf, p.c_str() + (off - aligned_off), len);

  dout(40) << __func__ << " data: ";
  bufferlist bl;
  bl.append(buf, len);
  bl.hexdump(*_dout);
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}

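// Random read with three paths: unaligned direct reads bounce through
// direct_read_unaligned(); buffered reads loop because pread(2) on a
// cached fd may return short; aligned direct reads go to the device in a
// single pread.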
int KernelDevice::read_random(uint64_t off, uint64_t len, char *buf,
                              bool buffered)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << dendl;
  assert(len > 0);
  assert(off < size);
  assert(off + len <= size);
  int r = 0;

  // if it's direct io and unaligned, we have to use an internal buffer
  if (!buffered && ((off % block_size != 0)
                    || (len % block_size != 0)
                    || (uintptr_t(buf) % CEPH_PAGE_SIZE != 0)))
    return direct_read_unaligned(off, len, buf);

  if (buffered) {
    // buffered read
    char *t = buf;
    uint64_t left = len;
    while (left > 0) {
      r = ::pread(fd_buffered, t, left, off);
      if (r < 0) {
        r = -errno;
        derr << __func__ << " 0x" << std::hex << off << "~" << left
             << std::dec << " error: " << cpp_strerror(r) << dendl;
        goto out;
      }
      off += r;
      t += r;
      left -= r;
    }
  } else {
    // direct and aligned read
    r = ::pread(fd_direct, buf, len, off);
    if (r < 0) {
      r = -errno;
      derr << __func__ << " direct_aligned_read" << " 0x" << std::hex
           << off << "~" << len << std::dec << " error: " << cpp_strerror(r)
           << dendl;
      goto out;
    }
    assert((uint64_t)r == len);
  }

  dout(40) << __func__ << " data: ";
  bufferlist bl;
  bl.append(buf, len);
  bl.hexdump(*_dout);
  *_dout << dendl;

 out:
  return r < 0 ? r : 0;
}

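// Drop any cached pages for this range (POSIX_FADV_DONTNEED) so that
// subsequent buffered reads fetch from the device instead of the page
// cache.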
int KernelDevice::invalidate_cache(uint64_t off, uint64_t len)
{
  dout(5) << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
          << dendl;
  assert(off % block_size == 0);
  assert(len % block_size == 0);
  int r = posix_fadvise(fd_buffered, off, len, POSIX_FADV_DONTNEED);
  if (r) {
    r = -r;
    derr << __func__ << " 0x" << std::hex << off << "~" << len << std::dec
         << " error: " << cpp_strerror(r) << dendl;
  }
  return r;
}