1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2015 XSky <haomai@xsky.com>
9 * Author: Haomai Wang <haomaiwang@gmail.com>
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
20 #include <sys/types.h>
29 #include <xmmintrin.h>
31 #include <spdk/nvme.h>
33 #include <rte_lcore.h>
35 #include "include/stringify.h"
36 #include "include/types.h"
37 #include "include/compat.h"
38 #include "common/align.h"
39 #include "common/errno.h"
40 #include "common/debug.h"
41 #include "common/perf_counters.h"
42 #include "common/io_priority.h"
44 #include "NVMEDevice.h"
46 #define dout_context g_ceph_context
47 #define dout_subsys ceph_subsys_bdev
49 #define dout_prefix *_dout << "bdev(" << sn << ") "
// Number of data buffers the AIO thread allocates into data_buf_mempool when
// the pool is empty; the thread asserts the pool is back at this size on exit.
51 static constexpr uint16_t data_buffer_default_num
= 2048;
// Size argument used for each pooled buffer
// (spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL)); also the per-segment
// stride when a request is split into segments.
53 static constexpr uint32_t data_buffer_size
= 8192;
// Capacity of IORequest::inline_segs. Requests needing more segments than this
// get a heap-allocated extra_segs array instead.
55 static constexpr uint16_t inline_segment_num
= 32;
// Per-thread value handed to SharedDriverData::get_queue(); the I/O entry
// points assign it from ceph_gettid(). Starts out as -1.
57 static thread_local
int queue_id
= -1;
60 l_bluestore_nvmedevice_first
= 632430,
61 l_bluestore_nvmedevice_aio_write_lat
,
62 l_bluestore_nvmedevice_read_lat
,
63 l_bluestore_nvmedevice_flush_lat
,
64 l_bluestore_nvmedevice_aio_write_queue_lat
,
65 l_bluestore_nvmedevice_read_queue_lat
,
66 l_bluestore_nvmedevice_flush_queue_lat
,
67 l_bluestore_nvmedevice_queue_ops
,
68 l_bluestore_nvmedevice_polling_lat
,
69 l_bluestore_nvmedevice_buffer_alloc_failed
,
70 l_bluestore_nvmedevice_last
73 static void io_complete(void *t
, const struct spdk_nvme_cpl
*completion
);
// Entry point passed to rte_eal_remote_launch(): interprets f as a pointer to
// a std::function<void ()> and invokes it. The caller in this file passes the
// address of SharedDriverQueueData::run_func.
75 int dpdk_thread_adaptor(void *f
)
77 (*static_cast<std::function
<void ()>*>(f
))();
87 uint16_t cur_seg_idx
= 0;
89 uint32_t cur_seg_left
= 0;
90 void *inline_segs
[inline_segment_num
];
91 void **extra_segs
= nullptr;
94 class SharedDriverQueueData
{
95 SharedDriverData
*driver
;
96 spdk_nvme_ctrlr
*ctrlr
;
100 uint32_t sector_size
;
103 struct spdk_nvme_qpair
*qpair
;
104 std::function
<void ()> run_func
;
105 friend class AioCompletionThread
;
107 bool aio_stop
= false;
109 int alloc_buf_from_pool(Task
*t
, bool write
);
111 std::atomic_bool queue_empty
;
114 std::queue
<Task
*> task_queue
;
118 std::atomic_int flush_waiters
;
119 std::set
<uint64_t> flush_waiter_seqs
;
122 std::atomic_ulong completed_op_seq
, queue_op_seq
;
123 std::vector
<void*> data_buf_mempool
;
124 PerfCounters
*logger
= nullptr;
126 SharedDriverQueueData(SharedDriverData
*driver
, spdk_nvme_ctrlr
*c
, spdk_nvme_ns
*ns
, uint64_t block_size
,
127 const std::string
&sn_tag
, uint32_t sector_size
, uint32_t core
, uint32_t queue_id
)
132 block_size(block_size
),
133 sector_size(sector_size
),
136 run_func([this]() { _aio_thread(); }),
138 queue_lock("NVMEDevice::queue_lock"),
139 flush_lock("NVMEDevice::flush_lock"),
141 completed_op_seq(0), queue_op_seq(0) {
143 qpair
= spdk_nvme_ctrlr_alloc_io_qpair(ctrlr
, SPDK_NVME_QPRIO_URGENT
);
144 PerfCountersBuilder
b(g_ceph_context
, string("NVMEDevice-AIOThread-"+stringify(this)),
145 l_bluestore_nvmedevice_first
, l_bluestore_nvmedevice_last
);
146 b
.add_time_avg(l_bluestore_nvmedevice_aio_write_lat
, "aio_write_lat", "Average write completing latency");
147 b
.add_time_avg(l_bluestore_nvmedevice_read_lat
, "read_lat", "Average read completing latency");
148 b
.add_time_avg(l_bluestore_nvmedevice_flush_lat
, "flush_lat", "Average flush completing latency");
149 b
.add_u64(l_bluestore_nvmedevice_queue_ops
, "queue_ops", "Operations in nvme queue");
150 b
.add_time_avg(l_bluestore_nvmedevice_polling_lat
, "polling_lat", "Average polling latency");
151 b
.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat
, "aio_write_queue_lat", "Average queue write request latency");
152 b
.add_time_avg(l_bluestore_nvmedevice_read_queue_lat
, "read_queue_lat", "Average queue read request latency");
153 b
.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat
, "flush_queue_lat", "Average queue flush request latency");
154 b
.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed
, "buffer_alloc_failed", "Alloc data buffer failed count");
155 logger
= b
.create_perf_counters();
156 g_ceph_context
->get_perfcounters_collection()->add(logger
);
159 void queue_task(Task
*t
, uint64_t ops
= 1) {
161 Mutex::Locker
l(queue_lock
);
163 if (queue_empty
.load()) {
170 uint64_t cur_seq
= queue_op_seq
.load();
171 uint64_t left
= cur_seq
- completed_op_seq
.load();
172 if (cur_seq
> completed_op_seq
) {
173 // TODO: this may also contain read ops
174 dout(10) << __func__
<< " existed inflight ops " << left
<< dendl
;
175 Mutex::Locker
l(flush_lock
);
177 flush_waiter_seqs
.insert(cur_seq
);
178 while (cur_seq
> completed_op_seq
.load()) {
179 flush_cond
.Wait(flush_lock
);
181 flush_waiter_seqs
.erase(cur_seq
);
187 int r
= rte_eal_remote_launch(dpdk_thread_adaptor
, static_cast<void*>(&run_func
),
195 Mutex::Locker
l(queue_lock
);
199 int r
= rte_eal_wait_lcore(core_id
);
204 ~SharedDriverQueueData() {
205 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
207 spdk_nvme_ctrlr_free_io_qpair(qpair
);
213 class SharedDriverData
{
218 spdk_nvme_ctrlr
*ctrlr
;
220 uint64_t block_size
= 0;
221 uint32_t sector_size
= 0;
223 uint32_t queue_number
;
224 std::vector
<SharedDriverQueueData
*> queues
;
227 for (auto &&it
: queues
)
231 for (auto &&it
: queues
)
236 std::vector
<NVMEDevice
*> registered_devices
;
237 SharedDriverData(unsigned _id
, const std::string
&sn_tag
,
238 spdk_nvme_ctrlr
*c
, spdk_nvme_ns
*ns
)
244 sector_size
= spdk_nvme_ns_get_sector_size(ns
);
245 block_size
= std::max(CEPH_PAGE_SIZE
, sector_size
);
246 size
= ((uint64_t)sector_size
) * spdk_nvme_ns_get_num_sectors(ns
);
248 RTE_LCORE_FOREACH_SLAVE(i
) {
249 queues
.push_back(new SharedDriverQueueData(this, ctrlr
, ns
, block_size
, sn
, sector_size
, i
, queue_number
++));
255 bool is_equal(const string
&tag
) const { return sn
== tag
; }
256 ~SharedDriverData() {
257 for (auto p
: queues
) {
262 SharedDriverQueueData
*get_queue(uint32_t i
) {
263 return queues
.at(i
%queue_number
);
266 void register_device(NVMEDevice
*device
) {
267 // in case of registered_devices, we stop thread now.
268 // Because release is really a rare case, we could bear this
270 registered_devices
.push_back(device
);
274 void remove_device(NVMEDevice
*device
) {
276 std::vector
<NVMEDevice
*> new_devices
;
277 for (auto &&it
: registered_devices
) {
279 new_devices
.push_back(it
);
281 registered_devices
.swap(new_devices
);
285 uint64_t get_block_size() {
288 uint64_t get_size() {
295 IOContext
*ctx
= nullptr;
300 std::function
<void()> fill_cb
;
301 Task
*next
= nullptr;
303 ceph::coarse_real_clock::time_point start
;
304 IORequest io_request
;
306 std::condition_variable cond
;
307 SharedDriverQueueData
*queue
;
308 Task(NVMEDevice
*dev
, IOCommand c
, uint64_t off
, uint64_t l
, int64_t rc
= 0)
309 : device(dev
), command(c
), offset(off
), len(l
),
311 start(ceph::coarse_real_clock::now()) {}
313 assert(!io_request
.nseg
);
315 void release_segs(SharedDriverQueueData
*queue_data
) {
316 if (io_request
.extra_segs
) {
317 for (uint16_t i
= 0; i
< io_request
.nseg
; i
++)
318 queue_data
->data_buf_mempool
.push_back(io_request
.extra_segs
[i
]);
319 delete io_request
.extra_segs
;
320 } else if (io_request
.nseg
) {
321 for (uint16_t i
= 0; i
< io_request
.nseg
; i
++)
322 queue_data
->data_buf_mempool
.push_back(io_request
.inline_segs
[i
]);
327 void copy_to_buf(char *buf
, uint64_t off
, uint64_t len
) {
330 void **segs
= io_request
.extra_segs
? io_request
.extra_segs
: io_request
.inline_segs
;
333 char *src
= static_cast<char*>(segs
[i
++]);
334 uint64_t need_copy
= std::min(left
, data_buffer_size
-off
);
335 memcpy(buf
+copied
, src
+off
, need_copy
);
343 std::unique_lock
<std::mutex
> l(lock
);
348 std::lock_guard
<std::mutex
> l(lock
);
353 static void data_buf_reset_sgl(void *cb_arg
, uint32_t sgl_offset
)
355 Task
*t
= static_cast<Task
*>(cb_arg
);
356 uint32_t i
= sgl_offset
/ data_buffer_size
;
357 uint32_t offset
= i
* data_buffer_size
;
358 assert(i
<= t
->io_request
.nseg
);
360 for (; i
< t
->io_request
.nseg
; i
++) {
361 offset
+= data_buffer_size
;
362 if (offset
> sgl_offset
) {
369 t
->io_request
.cur_seg_idx
= i
;
370 t
->io_request
.cur_seg_left
= offset
- sgl_offset
;
374 static int data_buf_next_sge(void *cb_arg
, void **address
, uint32_t *length
)
378 Task
*t
= static_cast<Task
*>(cb_arg
);
379 if (t
->io_request
.cur_seg_idx
>= t
->io_request
.nseg
) {
385 addr
= t
->io_request
.extra_segs
? t
->io_request
.extra_segs
[t
->io_request
.cur_seg_idx
] : t
->io_request
.inline_segs
[t
->io_request
.cur_seg_idx
];
387 size
= data_buffer_size
;
388 if (t
->io_request
.cur_seg_idx
== t
->io_request
.nseg
- 1) {
389 uint64_t tail
= t
->len
% data_buffer_size
;
391 size
= (uint32_t) tail
;
395 if (t
->io_request
.cur_seg_left
) {
396 *address
= (void *)((uint64_t)addr
+ size
- t
->io_request
.cur_seg_left
);
397 *length
= t
->io_request
.cur_seg_left
;
398 t
->io_request
.cur_seg_left
= 0;
404 t
->io_request
.cur_seg_idx
++;
408 int SharedDriverQueueData::alloc_buf_from_pool(Task
*t
, bool write
)
410 uint64_t count
= t
->len
/ data_buffer_size
;
411 if (t
->len
% data_buffer_size
)
414 if (count
> data_buf_mempool
.size())
416 if (count
<= inline_segment_num
) {
417 segs
= t
->io_request
.inline_segs
;
419 t
->io_request
.extra_segs
= new void*[count
];
420 segs
= t
->io_request
.extra_segs
;
422 for (uint16_t i
= 0; i
< count
; i
++) {
423 segs
[i
] = data_buf_mempool
.back();
424 data_buf_mempool
.pop_back();
426 t
->io_request
.nseg
= count
;
428 auto blp
= t
->write_bl
.begin();
431 for (; i
< count
- 1; ++i
) {
432 blp
.copy(data_buffer_size
, static_cast<char*>(segs
[i
]));
433 len
+= data_buffer_size
;
435 blp
.copy(t
->write_bl
.length() - len
, static_cast<char*>(segs
[i
]));
441 void SharedDriverQueueData::_aio_thread()
443 dout(1) << __func__
<< " start" << dendl
;
445 if (data_buf_mempool
.empty()) {
446 for (uint16_t i
= 0; i
< data_buffer_default_num
; i
++) {
447 void *b
= spdk_zmalloc(data_buffer_size
, CEPH_PAGE_SIZE
, NULL
);
449 derr
<< __func__
<< " failed to create memory pool for nvme data buffer" << dendl
;
452 data_buf_mempool
.push_back(b
);
458 uint64_t lba_off
, lba_count
;
460 ceph::coarse_real_clock::time_point cur
, start
461 = ceph::coarse_real_clock::now();
463 bool inflight
= queue_op_seq
.load() - completed_op_seq
.load();
465 dout(40) << __func__
<< " polling" << dendl
;
467 if (!spdk_nvme_qpair_process_completions(qpair
, g_conf
->bluestore_spdk_max_io_completion
)) {
468 dout(30) << __func__
<< " idle, have a pause" << dendl
;
473 for (; t
; t
= t
->next
) {
475 lba_off
= t
->offset
/ sector_size
;
476 lba_count
= t
->len
/ sector_size
;
477 switch (t
->command
) {
478 case IOCommand::WRITE_COMMAND
:
480 dout(20) << __func__
<< " write command issued " << lba_off
<< "~" << lba_count
<< dendl
;
481 r
= alloc_buf_from_pool(t
, true);
483 logger
->inc(l_bluestore_nvmedevice_buffer_alloc_failed
);
487 r
= spdk_nvme_ns_cmd_writev(
488 ns
, qpair
, lba_off
, lba_count
, io_complete
, t
, 0,
489 data_buf_reset_sgl
, data_buf_next_sge
);
491 derr
<< __func__
<< " failed to do write command" << dendl
;
492 t
->ctx
->nvme_task_first
= t
->ctx
->nvme_task_last
= nullptr;
493 t
->release_segs(this);
497 cur
= ceph::coarse_real_clock::now();
498 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- start
);
499 logger
->tinc(l_bluestore_nvmedevice_aio_write_queue_lat
, dur
);
502 case IOCommand::READ_COMMAND
:
504 dout(20) << __func__
<< " read command issued " << lba_off
<< "~" << lba_count
<< dendl
;
505 r
= alloc_buf_from_pool(t
, false);
507 logger
->inc(l_bluestore_nvmedevice_buffer_alloc_failed
);
511 r
= spdk_nvme_ns_cmd_readv(
512 ns
, qpair
, lba_off
, lba_count
, io_complete
, t
, 0,
513 data_buf_reset_sgl
, data_buf_next_sge
);
515 derr
<< __func__
<< " failed to read" << dendl
;
516 t
->release_segs(this);
520 cur
= ceph::coarse_real_clock::now();
521 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- start
);
522 logger
->tinc(l_bluestore_nvmedevice_read_queue_lat
, dur
);
526 case IOCommand::FLUSH_COMMAND
:
528 dout(20) << __func__
<< " flush command issueed " << dendl
;
529 r
= spdk_nvme_ns_cmd_flush(ns
, qpair
, io_complete
, t
);
531 derr
<< __func__
<< " failed to flush" << dendl
;
532 t
->release_segs(this);
536 cur
= ceph::coarse_real_clock::now();
537 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- start
);
538 logger
->tinc(l_bluestore_nvmedevice_flush_queue_lat
, dur
);
545 if (!queue_empty
.load()) {
546 Mutex::Locker
l(queue_lock
);
547 if (!task_queue
.empty()) {
548 t
= task_queue
.front();
550 logger
->set(l_bluestore_nvmedevice_queue_ops
, task_queue
.size());
555 if (flush_waiters
.load()) {
556 Mutex::Locker
l(flush_lock
);
557 if (*flush_waiter_seqs
.begin() <= completed_op_seq
.load())
562 // be careful, here we need to let each thread reap its own; currently it is done
563 // by only one dedicated dpdk thread
565 for (auto &&it
: driver
->registered_devices
)
569 Mutex::Locker
l(queue_lock
);
570 if (queue_empty
.load()) {
571 cur
= ceph::coarse_real_clock::now();
572 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- start
);
573 logger
->tinc(l_bluestore_nvmedevice_polling_lat
, dur
);
576 queue_cond
.Wait(queue_lock
);
577 start
= ceph::coarse_real_clock::now();
582 assert(data_buf_mempool
.size() == data_buffer_default_num
);
583 dout(1) << __func__
<< " end" << dendl
;
586 #define dout_subsys ceph_subsys_bdev
588 #define dout_prefix *_dout << "bdev "
592 struct ProbeContext
{
594 NVMEManager
*manager
;
595 SharedDriverData
*driver
;
602 std::vector
<SharedDriverData
*> shared_driver_datas
;
603 std::thread dpdk_thread
;
604 std::mutex probe_queue_lock
;
605 std::condition_variable probe_queue_cond
;
606 std::list
<ProbeContext
*> probe_queue
;
610 : lock("NVMEDevice::NVMEManager::lock") {}
611 int try_get(const string
&sn_tag
, SharedDriverData
**driver
);
612 void register_ctrlr(const string
&sn_tag
, spdk_nvme_ctrlr
*c
, struct spdk_pci_device
*pci_dev
,
613 SharedDriverData
**driver
) {
614 assert(lock
.is_locked());
616 int num_ns
= spdk_nvme_ctrlr_get_num_ns(c
);
619 dout(0) << __func__
<< " namespace count larger than 1, currently only use the first namespace" << dendl
;
621 ns
= spdk_nvme_ctrlr_get_ns(c
, 1);
623 derr
<< __func__
<< " failed to get namespace at 1" << dendl
;
626 dout(1) << __func__
<< " successfully attach nvme device at" << spdk_pci_device_get_bus(pci_dev
)
627 << ":" << spdk_pci_device_get_dev(pci_dev
) << ":" << spdk_pci_device_get_func(pci_dev
) << dendl
;
629 // only support one device per osd now!
630 assert(shared_driver_datas
.empty());
631 // index 0 is occupied by master thread
632 shared_driver_datas
.push_back(new SharedDriverData(shared_driver_datas
.size()+1, sn_tag
, c
, ns
));
633 *driver
= shared_driver_datas
.back();
637 static NVMEManager manager
;
// Probe callback handed to spdk_nvme_probe() by NVMEManager::try_get.
//
// cb_ctx is the NVMEManager::ProbeContext of the pending request; trid
// describes the controller being offered. The callback checks that the
// transport is PCIe, resolves the PCI device from trid->traddr, reads its
// serial number and compares it with ctx->sn_tag.
// NOTE(review): the return statements are outside the lines visible here;
// presumably each failure branch rejects the device — confirm.
639 static bool probe_cb(void *cb_ctx
, const struct spdk_nvme_transport_id
*trid
, struct spdk_nvme_ctrlr_opts
*opts
)
641 NVMEManager::ProbeContext
*ctx
= static_cast<NVMEManager::ProbeContext
*>(cb_ctx
);
642 char serial_number
[128];
643 struct spdk_pci_addr pci_addr
;
644 struct spdk_pci_device
*pci_dev
= NULL
;
// Only controllers on the PCIe transport are considered.
647 if (trid
->trtype
!= SPDK_NVME_TRANSPORT_PCIE
) {
648 dout(0) << __func__
<< " only probe local nvme device" << dendl
;
652 result
= spdk_pci_addr_parse(&pci_addr
, trid
->traddr
);
// NOTE(review): "%s" and "%d" are printf conversions left inside a stream
// expression; they are emitted literally next to the streamed values.
654 dout(0) << __func__
<< " failed to get pci address from %s, " << trid
->traddr
<< " return value is: %d" << result
<< dendl
;
658 pci_dev
= spdk_pci_get_device(&pci_addr
);
660 dout(0) << __func__
<< " failed to get pci device" << dendl
;
// NOTE(review): the vendor/device ids are labelled "0x" but no std::hex is
// applied in the visible expression, so they look to be printed in decimal.
664 dout(0) << __func__
<< " found device at bus: " << spdk_pci_device_get_bus(pci_dev
)
665 << ":" << spdk_pci_device_get_dev(pci_dev
) << ":"
666 << spdk_pci_device_get_func(pci_dev
) << " vendor:0x" << spdk_pci_device_get_vendor_id(pci_dev
) << " device:0x" << spdk_pci_device_get_device_id(pci_dev
)
// 128 is the size of the serial_number buffer declared above.
668 result
= spdk_pci_device_get_serial_number(pci_dev
, serial_number
, 128);
// NOTE(review): "%p" is another literal printf conversion in a stream message.
670 dout(10) << __func__
<< " failed to get serial number from %p" << pci_dev
<< dendl
;
// Compares the requested tag with exactly the first 16 bytes of the buffer.
// NOTE(review): assumes serial numbers are 16 characters; the log line below
// prints the whole NUL-terminated buffer instead — confirm both are intended.
674 if (ctx
->sn_tag
.compare(string(serial_number
, 16))) {
675 dout(0) << __func__
<< " device serial number (" << ctx
->sn_tag
<< ") not match " << serial_number
<< dendl
;
682 static void attach_cb(void *cb_ctx
, const struct spdk_nvme_transport_id
*trid
,
683 struct spdk_nvme_ctrlr
*ctrlr
, const struct spdk_nvme_ctrlr_opts
*opts
)
685 struct spdk_pci_addr pci_addr
;
686 struct spdk_pci_device
*pci_dev
= NULL
;
688 spdk_pci_addr_parse(&pci_addr
, trid
->traddr
);
690 pci_dev
= spdk_pci_get_device(&pci_addr
);
692 dout(0) << __func__
<< " failed to get pci device" << dendl
;
696 NVMEManager::ProbeContext
*ctx
= static_cast<NVMEManager::ProbeContext
*>(cb_ctx
);
697 ctx
->manager
->register_ctrlr(ctx
->sn_tag
, ctrlr
, pci_dev
, &ctx
->driver
);
// Finds the SharedDriverData for the controller whose serial number is sn_tag,
// probing for it if it is not already known, and stores it in *driver.
//
// Visible steps: take `lock`; validate sn_tag and the configured SPDK core
// mask; look for an existing entry in shared_driver_datas; start the
// background thread that initialises the SPDK environment and serves
// probe_queue; enqueue a ProbeContext for sn_tag and wait for the result.
// NOTE(review): return statements and several branch conditions are outside
// the lines visible here, so the exact return codes are not documented.
700 int NVMEManager::try_get(const string
&sn_tag
, SharedDriverData
**driver
)
702 Mutex::Locker
l(lock
);
704 unsigned long long core_value
;
705 uint32_t core_num
= 0;
707 uint32_t mem_size_arg
= g_conf
->bluestore_spdk_mem
;
// NOTE(review): this pointer into the config string is captured by value in
// the lambda of the detached thread below; presumably the config string
// outlives that thread and is not reassigned — confirm.
708 char *coremask_arg
= (char *)g_conf
->bluestore_spdk_coremask
.c_str();
710 if (sn_tag
.empty()) {
712 derr
<< __func__
<< " empty serial number: " << cpp_strerror(r
) << dendl
;
// Parse the core mask as hexadecimal and walk its bits.
// NOTE(review): strtoll() returns a signed long long (clamped to LLONG_MAX on
// overflow) but core_value is unsigned long long, so a mask using bit 63
// cannot be represented; strtoull() looks like the intended call — confirm.
716 core_value
= strtoll(coremask_arg
, NULL
, 16);
717 for (uint32_t i
= 0; i
< sizeof(long long) * 8; i
++) {
718 bool tmp
= (core_value
>> i
) & 0x1;
721 // select the least significant bit as the master core
728 // at least two cores are needed for using spdk
731 derr
<< __func__
<< " invalid spdk coremask, at least two cores are needed: "
732 << cpp_strerror(r
) << dendl
;
// Look for a driver that already matches sn_tag.
736 for (auto &&it
: shared_driver_datas
) {
737 if (it
->is_equal(sn_tag
)) {
// Background thread: sets up the SPDK environment from the values captured
// above, then handles ProbeContext entries from probe_queue under
// probe_queue_lock.
745 dpdk_thread
= std::thread(
746 [this, coremask_arg
, m_core_arg
, mem_size_arg
]() {
747 static struct spdk_env_opts opts
;
750 spdk_env_opts_init(&opts
);
751 opts
.name
= "ceph-osd";
752 opts
.core_mask
= coremask_arg
;
753 opts
.dpdk_master_core
= m_core_arg
;
754 opts
.dpdk_mem_size
= mem_size_arg
;
755 spdk_env_init(&opts
);
// A negative configured retry count falls back to SPDK's default.
757 spdk_nvme_retry_count
= g_ceph_context
->_conf
->bdev_nvme_retry_count
;
758 if (spdk_nvme_retry_count
< 0)
759 spdk_nvme_retry_count
= SPDK_NVME_DEFAULT_RETRY_COUNT
;
761 std::unique_lock
<std::mutex
> l(probe_queue_lock
);
// Take the oldest pending request and run an SPDK probe for it; probe_cb and
// attach_cb receive the ProbeContext as their cb_ctx.
763 if (!probe_queue
.empty()) {
764 ProbeContext
* ctxt
= probe_queue
.front();
765 probe_queue
.pop_front();
766 r
= spdk_nvme_probe(NULL
, ctxt
, probe_cb
, attach_cb
, NULL
);
768 assert(!ctxt
->driver
);
769 derr
<< __func__
<< " device probe nvme failed" << dendl
;
// Wake the requester(s) waiting on probe_queue_cond.
772 probe_queue_cond
.notify_all();
774 probe_queue_cond
.wait(l
);
// The thread is detached rather than joined.
779 dpdk_thread
.detach();
// The request lives on this stack frame; its address is shared with the probe
// thread through probe_queue.
// NOTE(review): the loop/predicate around the wait below is not visible here;
// presumably this function does not return before the probe thread is done
// with ctx — confirm.
782 ProbeContext ctx
= {sn_tag
, this, nullptr, false};
784 std::unique_lock
<std::mutex
> l(probe_queue_lock
);
785 probe_queue
.push_back(&ctx
);
787 probe_queue_cond
.wait(l
);
// Publish whatever driver the probe attached to the context.
791 *driver
= ctx
.driver
;
796 void io_complete(void *t
, const struct spdk_nvme_cpl
*completion
)
798 Task
*task
= static_cast<Task
*>(t
);
799 IOContext
*ctx
= task
->ctx
;
800 SharedDriverQueueData
*queue
= task
->queue
;
802 assert(queue
!= NULL
);
804 ++queue
->completed_op_seq
;
805 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(
806 ceph::coarse_real_clock::now() - task
->start
);
807 if (task
->command
== IOCommand::WRITE_COMMAND
) {
808 queue
->logger
->tinc(l_bluestore_nvmedevice_aio_write_lat
, dur
);
809 assert(!spdk_nvme_cpl_is_error(completion
));
810 dout(20) << __func__
<< " write/zero op successfully, left "
811 << queue
->queue_op_seq
- queue
->completed_op_seq
<< dendl
;
812 // check waiting count before doing callback (which may
813 // destroy this ioc).
815 if (!--ctx
->num_running
) {
816 task
->device
->aio_callback(task
->device
->aio_callback_priv
, ctx
->priv
);
821 task
->release_segs(queue
);
823 } else if (task
->command
== IOCommand::READ_COMMAND
) {
824 queue
->logger
->tinc(l_bluestore_nvmedevice_read_lat
, dur
);
825 assert(!spdk_nvme_cpl_is_error(completion
));
826 dout(20) << __func__
<< " read op successfully" << dendl
;
828 task
->release_segs(queue
);
829 // read submitted by AIO
830 if(!task
->return_code
) {
832 if (!--ctx
->num_running
) {
833 task
->device
->aio_callback(task
->device
->aio_callback_priv
, ctx
->priv
);
840 task
->return_code
= 0;
841 if (!--ctx
->num_running
) {
846 assert(task
->command
== IOCommand::FLUSH_COMMAND
);
847 assert(!spdk_nvme_cpl_is_error(completion
));
848 queue
->logger
->tinc(l_bluestore_nvmedevice_flush_lat
, dur
);
849 dout(20) << __func__
<< " flush op successfully" << dendl
;
850 task
->return_code
= 0;
856 #define dout_prefix *_dout << "bdev(" << name << ") "
858 NVMEDevice::NVMEDevice(CephContext
* cct
, aio_callback_t cb
, void *cbpriv
)
864 buffer_lock("NVMEDevice::buffer_lock"),
866 aio_callback_priv(cbpriv
)
871 int NVMEDevice::open(const string
& p
)
874 dout(1) << __func__
<< " path " << p
<< dendl
;
876 string serial_number
;
877 int fd
= ::open(p
.c_str(), O_RDONLY
);
880 derr
<< __func__
<< " unable to open " << p
<< ": " << cpp_strerror(r
)
885 r
= ::read(fd
, buf
, sizeof(buf
));
886 VOID_TEMP_FAILURE_RETRY(::close(fd
));
887 fd
= -1; // defensive
894 derr
<< __func__
<< " unable to read " << p
<< ": " << cpp_strerror(r
) << dendl
;
897 /* scan buf from the beginning with isxdigit. */
899 while (i
< r
&& isxdigit(buf
[i
])) {
902 serial_number
= string(buf
, i
);
903 r
= manager
.try_get(serial_number
, &driver
);
905 derr
<< __func__
<< " failed to get nvme device with sn " << serial_number
<< dendl
;
909 driver
->register_device(this);
910 block_size
= driver
->get_block_size();
911 size
= driver
->get_size();
912 name
= serial_number
;
914 //nvme is non-rotational device.
917 // round size down to an even block
918 size
&= ~(block_size
- 1);
920 dout(1) << __func__
<< " size " << size
<< " (" << pretty_si_t(size
) << "B)"
921 << " block_size " << block_size
<< " (" << pretty_si_t(block_size
)
927 void NVMEDevice::close()
929 dout(1) << __func__
<< dendl
;
932 driver
->remove_device(this);
934 dout(1) << __func__
<< " end" << dendl
;
// Writes descriptive key/value pairs about this device into *pm.
//
// prefix: string prepended to every key that is written.
// pm:     destination map; entries with the same keys are overwritten.
//
// "size" and "block_size" come from get_size()/get_block_size(),
// "nvme_serial_number" from the `name` member; the remaining values are fixed
// strings.
937 int NVMEDevice::collect_metadata(string prefix
, map
<string
,string
> *pm
) const
939 (*pm
)[prefix
+ "rotational"] = "0";
940 (*pm
)[prefix
+ "size"] = stringify(get_size());
941 (*pm
)[prefix
+ "block_size"] = stringify(get_block_size());
942 (*pm
)[prefix
+ "driver"] = "NVMEDevice";
943 (*pm
)[prefix
+ "type"] = "nvme";
944 (*pm
)[prefix
+ "access_mode"] = "spdk";
945 (*pm
)[prefix
+ "nvme_serial_number"] = name
;
950 int NVMEDevice::flush()
952 dout(10) << __func__
<< " start" << dendl
;
953 auto start
= ceph::coarse_real_clock::now();
956 queue_id
= ceph_gettid();
957 SharedDriverQueueData
*queue
= driver
->get_queue(queue_id
);
958 assert(queue
!= NULL
);
960 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(
961 ceph::coarse_real_clock::now() - start
);
962 queue
->logger
->tinc(l_bluestore_nvmedevice_flush_lat
, dur
);
966 void NVMEDevice::aio_submit(IOContext
*ioc
)
968 dout(20) << __func__
<< " ioc " << ioc
<< " pending "
969 << ioc
->num_pending
.load() << " running "
970 << ioc
->num_running
.load() << dendl
;
971 int pending
= ioc
->num_pending
.load();
972 Task
*t
= static_cast<Task
*>(ioc
->nvme_task_first
);
974 ioc
->num_running
+= pending
;
975 ioc
->num_pending
-= pending
;
976 assert(ioc
->num_pending
.load() == 0); // we should be only thread doing this
977 // Only need to push the first entry
979 queue_id
= ceph_gettid();
980 driver
->get_queue(queue_id
)->queue_task(t
, pending
);
981 ioc
->nvme_task_first
= ioc
->nvme_task_last
= nullptr;
985 int NVMEDevice::aio_write(
991 uint64_t len
= bl
.length();
992 dout(20) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
993 << " buffered " << buffered
<< dendl
;
994 assert(off
% block_size
== 0);
995 assert(len
% block_size
== 0);
998 assert(off
+ len
<= size
);
1000 Task
*t
= new Task(this, IOCommand::WRITE_COMMAND
, off
, len
);
1002 // TODO: if upper layer alloc memory with known physical address,
1003 // we can reduce this copy
1004 t
->write_bl
= std::move(bl
);
1007 // Only need to push the first entry
1009 queue_id
= ceph_gettid();
1010 driver
->get_queue(queue_id
)->queue_task(t
);
1013 Task
*first
= static_cast<Task
*>(ioc
->nvme_task_first
);
1014 Task
*last
= static_cast<Task
*>(ioc
->nvme_task_last
);
1018 ioc
->nvme_task_first
= t
;
1019 ioc
->nvme_task_last
= t
;
1023 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;
1028 int NVMEDevice::write(uint64_t off
, bufferlist
&bl
, bool buffered
)
1030 // FIXME: there is presumably a more efficient way to do this...
1031 IOContext
ioc(cct
, NULL
);
1032 aio_write(off
, bl
, &ioc
, buffered
);
1037 int NVMEDevice::read(uint64_t off
, uint64_t len
, bufferlist
*pbl
,
1041 dout(5) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
<< dendl
;
1042 assert(off
% block_size
== 0);
1043 assert(len
% block_size
== 0);
1046 assert(off
+ len
<= size
);
1048 Task
*t
= new Task(this, IOCommand::READ_COMMAND
, off
, len
, 1);
1049 bufferptr p
= buffer::create_page_aligned(len
);
1052 char *buf
= p
.c_str();
1053 t
->fill_cb
= [buf
, t
]() {
1054 t
->copy_to_buf(buf
, 0, t
->len
);
1058 queue_id
= ceph_gettid();
1059 driver
->get_queue(queue_id
)->queue_task(t
);
1061 while(t
->return_code
> 0) {
1064 pbl
->push_back(std::move(p
));
1070 int NVMEDevice::aio_read(
1076 dout(20) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
<< dendl
;
1077 assert(off
% block_size
== 0);
1078 assert(len
% block_size
== 0);
1081 assert(off
+ len
<= size
);
1083 Task
*t
= new Task(this, IOCommand::READ_COMMAND
, off
, len
);
1085 bufferptr p
= buffer::create_page_aligned(len
);
1088 char *buf
= p
.c_str();
1089 t
->fill_cb
= [buf
, t
]() {
1090 t
->copy_to_buf(buf
, 0, t
->len
);
1093 Task
*first
= static_cast<Task
*>(ioc
->nvme_task_first
);
1094 Task
*last
= static_cast<Task
*>(ioc
->nvme_task_last
);
1098 ioc
->nvme_task_first
= t
;
1099 ioc
->nvme_task_last
= t
;
1105 int NVMEDevice::read_random(uint64_t off
, uint64_t len
, char *buf
, bool buffered
)
1109 assert(off
+ len
<= size
);
1111 uint64_t aligned_off
= align_down(off
, block_size
);
1112 uint64_t aligned_len
= align_up(off
+len
, block_size
) - aligned_off
;
1113 dout(5) << __func__
<< " " << off
<< "~" << len
1114 << " aligned " << aligned_off
<< "~" << aligned_len
<< dendl
;
1115 IOContext
ioc(g_ceph_context
, nullptr);
1116 Task
*t
= new Task(this, IOCommand::READ_COMMAND
, aligned_off
, aligned_len
, 1);
1119 t
->fill_cb
= [buf
, t
, off
, len
]() {
1120 t
->copy_to_buf(buf
, off
-t
->offset
, len
);
1124 queue_id
= ceph_gettid();
1125 driver
->get_queue(queue_id
)->queue_task(t
);
1127 while(t
->return_code
> 0) {
1135 int NVMEDevice::invalidate_cache(uint64_t off
, uint64_t len
)
1137 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;