1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2015 XSky <haomai@xsky.com>
9 * Author: Haomai Wang <haomaiwang@gmail.com>
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
21 #include <sys/types.h>
32 #include <spdk/nvme.h>
34 #include "include/intarith.h"
35 #include "include/stringify.h"
36 #include "include/types.h"
37 #include "include/compat.h"
38 #include "common/errno.h"
39 #include "common/debug.h"
40 #include "common/perf_counters.h"
42 #include "NVMEDevice.h"
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_bdev
47 #define dout_prefix *_dout << "bdev(" << sn << ") "
49 thread_local SharedDriverQueueData
*queue_t
;
51 static constexpr uint16_t data_buffer_default_num
= 1024;
53 static constexpr uint32_t data_buffer_size
= 8192;
55 static constexpr uint16_t inline_segment_num
= 32;
58 l_bluestore_nvmedevice_first
= 632430,
59 l_bluestore_nvmedevice_write_lat
,
60 l_bluestore_nvmedevice_read_lat
,
61 l_bluestore_nvmedevice_flush_lat
,
62 l_bluestore_nvmedevice_write_queue_lat
,
63 l_bluestore_nvmedevice_read_queue_lat
,
64 l_bluestore_nvmedevice_flush_queue_lat
,
65 l_bluestore_nvmedevice_queue_ops
,
66 l_bluestore_nvmedevice_polling_lat
,
67 l_bluestore_nvmedevice_buffer_alloc_failed
,
68 l_bluestore_nvmedevice_last
71 static void io_complete(void *t
, const struct spdk_nvme_cpl
*completion
);
74 uint16_t cur_seg_idx
= 0;
76 uint32_t cur_seg_left
= 0;
77 void *inline_segs
[inline_segment_num
];
78 void **extra_segs
= nullptr;
83 class SharedDriverData
{
85 spdk_nvme_transport_id trid
;
86 spdk_nvme_ctrlr
*ctrlr
;
88 uint32_t block_size
= 0;
92 std::vector
<NVMEDevice
*> registered_devices
;
93 friend class SharedDriverQueueData
;
94 SharedDriverData(unsigned id_
, const spdk_nvme_transport_id
& trid_
,
95 spdk_nvme_ctrlr
*c
, spdk_nvme_ns
*ns_
)
100 block_size
= spdk_nvme_ns_get_extended_sector_size(ns
);
101 size
= spdk_nvme_ns_get_size(ns
);
104 bool is_equal(const spdk_nvme_transport_id
& trid2
) const {
105 return spdk_nvme_transport_id_compare(&trid
, &trid2
) == 0;
107 ~SharedDriverData() {
110 void register_device(NVMEDevice
*device
) {
111 registered_devices
.push_back(device
);
114 void remove_device(NVMEDevice
*device
) {
115 std::vector
<NVMEDevice
*> new_devices
;
116 for (auto &&it
: registered_devices
) {
118 new_devices
.push_back(it
);
120 registered_devices
.swap(new_devices
);
123 uint32_t get_block_size() {
126 uint64_t get_size() {
131 class SharedDriverQueueData
{
133 SharedDriverData
*driver
;
134 spdk_nvme_ctrlr
*ctrlr
;
138 uint32_t max_queue_depth
;
139 struct spdk_nvme_qpair
*qpair
;
140 bool reap_io
= false;
141 int alloc_buf_from_pool(Task
*t
, bool write
);
144 uint32_t current_queue_depth
= 0;
145 std::atomic_ulong completed_op_seq
, queue_op_seq
;
146 std::vector
<void*> data_buf_mempool
;
147 PerfCounters
*logger
= nullptr;
148 void _aio_handle(Task
*t
, IOContext
*ioc
);
150 SharedDriverQueueData(NVMEDevice
*bdev
, SharedDriverData
*driver
)
153 ctrlr
= driver
->ctrlr
;
155 block_size
= driver
->block_size
;
157 struct spdk_nvme_io_qpair_opts opts
= {};
158 spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr
, &opts
, sizeof(opts
));
159 opts
.qprio
= SPDK_NVME_QPRIO_URGENT
;
160 // the usable queue depth is one less than the queue size, to avoid overflow.
161 max_queue_depth
= opts
.io_queue_size
- 1;
162 qpair
= spdk_nvme_ctrlr_alloc_io_qpair(ctrlr
, &opts
, sizeof(opts
));
163 ceph_assert(qpair
!= NULL
);
165 // allocate spdk dma memory
166 for (uint16_t i
= 0; i
< data_buffer_default_num
; i
++) {
167 void *b
= spdk_dma_zmalloc(data_buffer_size
, CEPH_PAGE_SIZE
, NULL
);
169 derr
<< __func__
<< " failed to create memory pool for nvme data buffer" << dendl
;
172 data_buf_mempool
.push_back(b
);
175 PerfCountersBuilder
b(g_ceph_context
, string("NVMEDevice-AIOThread-"+stringify(this)),
176 l_bluestore_nvmedevice_first
, l_bluestore_nvmedevice_last
);
177 b
.add_time_avg(l_bluestore_nvmedevice_write_lat
, "write_lat", "Average write completing latency");
178 b
.add_time_avg(l_bluestore_nvmedevice_read_lat
, "read_lat", "Average read completing latency");
179 b
.add_time_avg(l_bluestore_nvmedevice_flush_lat
, "flush_lat", "Average flush completing latency");
180 b
.add_u64(l_bluestore_nvmedevice_queue_ops
, "queue_ops", "Operations in nvme queue");
181 b
.add_time_avg(l_bluestore_nvmedevice_polling_lat
, "polling_lat", "Average polling latency");
182 b
.add_time_avg(l_bluestore_nvmedevice_write_queue_lat
, "write_queue_lat", "Average queue write request latency");
183 b
.add_time_avg(l_bluestore_nvmedevice_read_queue_lat
, "read_queue_lat", "Average queue read request latency");
184 b
.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat
, "flush_queue_lat", "Average queue flush request latency");
185 b
.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed
, "buffer_alloc_failed", "Alloc data buffer failed count");
186 logger
= b
.create_perf_counters();
187 g_ceph_context
->get_perfcounters_collection()->add(logger
);
188 bdev
->queue_number
++;
189 if (bdev
->queue_number
.load() == 1)
193 ~SharedDriverQueueData() {
194 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
196 spdk_nvme_ctrlr_free_io_qpair(qpair
);
197 bdev
->queue_number
--;
200 // free all spdk dma memory;
201 if (!data_buf_mempool
.empty()) {
202 for (uint16_t i
= 0; i
< data_buffer_default_num
; i
++) {
203 void *b
= data_buf_mempool
[i
];
207 data_buf_mempool
.clear();
216 IOContext
*ctx
= nullptr;
221 std::function
<void()> fill_cb
;
222 Task
*next
= nullptr;
224 Task
*primary
= nullptr;
225 ceph::coarse_real_clock::time_point start
;
226 IORequest io_request
= {};
227 ceph::mutex lock
= ceph::make_mutex("Task::lock");
228 ceph::condition_variable cond
;
229 SharedDriverQueueData
*queue
= nullptr;
230 // reference count by subtasks.
232 Task(NVMEDevice
*dev
, IOCommand c
, uint64_t off
, uint64_t l
, int64_t rc
= 0,
234 : device(dev
), command(c
), offset(off
), len(l
),
235 return_code(rc
), primary(p
),
236 start(ceph::coarse_real_clock::now()) {
239 return_code
= primary
->return_code
;
245 ceph_assert(!io_request
.nseg
);
247 void release_segs(SharedDriverQueueData
*queue_data
) {
248 if (io_request
.extra_segs
) {
249 for (uint16_t i
= 0; i
< io_request
.nseg
; i
++)
250 queue_data
->data_buf_mempool
.push_back(io_request
.extra_segs
[i
]);
251 delete io_request
.extra_segs
;
252 } else if (io_request
.nseg
) {
253 for (uint16_t i
= 0; i
< io_request
.nseg
; i
++)
254 queue_data
->data_buf_mempool
.push_back(io_request
.inline_segs
[i
]);
256 ctx
->total_nseg
-= io_request
.nseg
;
260 void copy_to_buf(char *buf
, uint64_t off
, uint64_t len
) {
263 void **segs
= io_request
.extra_segs
? io_request
.extra_segs
: io_request
.inline_segs
;
266 char *src
= static_cast<char*>(segs
[i
++]);
267 uint64_t need_copy
= std::min(left
, data_buffer_size
-off
);
268 memcpy(buf
+copied
, src
+off
, need_copy
);
276 static void data_buf_reset_sgl(void *cb_arg
, uint32_t sgl_offset
)
278 Task
*t
= static_cast<Task
*>(cb_arg
);
279 uint32_t i
= sgl_offset
/ data_buffer_size
;
280 uint32_t offset
= i
* data_buffer_size
;
281 ceph_assert(i
<= t
->io_request
.nseg
);
283 for (; i
< t
->io_request
.nseg
; i
++) {
284 offset
+= data_buffer_size
;
285 if (offset
> sgl_offset
) {
292 t
->io_request
.cur_seg_idx
= i
;
293 t
->io_request
.cur_seg_left
= offset
- sgl_offset
;
297 static int data_buf_next_sge(void *cb_arg
, void **address
, uint32_t *length
)
301 Task
*t
= static_cast<Task
*>(cb_arg
);
302 if (t
->io_request
.cur_seg_idx
>= t
->io_request
.nseg
) {
308 addr
= t
->io_request
.extra_segs
? t
->io_request
.extra_segs
[t
->io_request
.cur_seg_idx
] : t
->io_request
.inline_segs
[t
->io_request
.cur_seg_idx
];
310 size
= data_buffer_size
;
311 if (t
->io_request
.cur_seg_idx
== t
->io_request
.nseg
- 1) {
312 uint64_t tail
= t
->len
% data_buffer_size
;
314 size
= (uint32_t) tail
;
318 if (t
->io_request
.cur_seg_left
) {
319 *address
= (void *)((uint64_t)addr
+ size
- t
->io_request
.cur_seg_left
);
320 *length
= t
->io_request
.cur_seg_left
;
321 t
->io_request
.cur_seg_left
= 0;
327 t
->io_request
.cur_seg_idx
++;
331 int SharedDriverQueueData::alloc_buf_from_pool(Task
*t
, bool write
)
333 uint64_t count
= t
->len
/ data_buffer_size
;
334 if (t
->len
% data_buffer_size
)
337 if (count
> data_buf_mempool
.size())
339 if (count
<= inline_segment_num
) {
340 segs
= t
->io_request
.inline_segs
;
342 t
->io_request
.extra_segs
= new void*[count
];
343 segs
= t
->io_request
.extra_segs
;
345 for (uint16_t i
= 0; i
< count
; i
++) {
346 segs
[i
] = data_buf_mempool
.back();
347 data_buf_mempool
.pop_back();
349 t
->io_request
.nseg
= count
;
350 t
->ctx
->total_nseg
+= count
;
352 auto blp
= t
->bl
.begin();
355 for (; i
< count
- 1; ++i
) {
356 blp
.copy(data_buffer_size
, static_cast<char*>(segs
[i
]));
357 len
+= data_buffer_size
;
359 blp
.copy(t
->bl
.length() - len
, static_cast<char*>(segs
[i
]));
365 void SharedDriverQueueData::_aio_handle(Task
*t
, IOContext
*ioc
)
367 dout(20) << __func__
<< " start" << dendl
;
370 uint64_t lba_off
, lba_count
;
371 uint32_t max_io_completion
= (uint32_t)g_conf().get_val
<uint64_t>("bluestore_spdk_max_io_completion");
372 uint64_t io_sleep_in_us
= g_conf().get_val
<uint64_t>("bluestore_spdk_io_sleep");
374 ceph::coarse_real_clock::time_point cur
, start
375 = ceph::coarse_real_clock::now();
376 while (ioc
->num_running
) {
378 dout(40) << __func__
<< " polling" << dendl
;
379 if (current_queue_depth
) {
380 r
= spdk_nvme_qpair_process_completions(qpair
, max_io_completion
);
384 usleep(io_sleep_in_us
);
388 for (; t
; t
= t
->next
) {
389 if (current_queue_depth
== max_queue_depth
) {
395 lba_off
= t
->offset
/ block_size
;
396 lba_count
= t
->len
/ block_size
;
397 switch (t
->command
) {
398 case IOCommand::WRITE_COMMAND
:
400 dout(20) << __func__
<< " write command issued " << lba_off
<< "~" << lba_count
<< dendl
;
401 r
= alloc_buf_from_pool(t
, true);
403 logger
->inc(l_bluestore_nvmedevice_buffer_alloc_failed
);
407 r
= spdk_nvme_ns_cmd_writev(
408 ns
, qpair
, lba_off
, lba_count
, io_complete
, t
, 0,
409 data_buf_reset_sgl
, data_buf_next_sge
);
411 derr
<< __func__
<< " failed to do write command: " << cpp_strerror(r
) << dendl
;
412 t
->ctx
->nvme_task_first
= t
->ctx
->nvme_task_last
= nullptr;
413 t
->release_segs(this);
417 cur
= ceph::coarse_real_clock::now();
418 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- t
->start
);
419 logger
->tinc(l_bluestore_nvmedevice_write_queue_lat
, dur
);
422 case IOCommand::READ_COMMAND
:
424 dout(20) << __func__
<< " read command issued " << lba_off
<< "~" << lba_count
<< dendl
;
425 r
= alloc_buf_from_pool(t
, false);
427 logger
->inc(l_bluestore_nvmedevice_buffer_alloc_failed
);
431 r
= spdk_nvme_ns_cmd_readv(
432 ns
, qpair
, lba_off
, lba_count
, io_complete
, t
, 0,
433 data_buf_reset_sgl
, data_buf_next_sge
);
435 derr
<< __func__
<< " failed to read: " << cpp_strerror(r
) << dendl
;
436 t
->release_segs(this);
440 cur
= ceph::coarse_real_clock::now();
441 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- t
->start
);
442 logger
->tinc(l_bluestore_nvmedevice_read_queue_lat
, dur
);
446 case IOCommand::FLUSH_COMMAND
:
448 dout(20) << __func__
<< " flush command issueed " << dendl
;
449 r
= spdk_nvme_ns_cmd_flush(ns
, qpair
, io_complete
, t
);
451 derr
<< __func__
<< " failed to flush: " << cpp_strerror(r
) << dendl
;
452 t
->release_segs(this);
456 cur
= ceph::coarse_real_clock::now();
457 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- t
->start
);
458 logger
->tinc(l_bluestore_nvmedevice_flush_queue_lat
, dur
);
463 current_queue_depth
++;
465 cur
= ceph::coarse_real_clock::now();
466 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- start
);
467 logger
->tinc(l_bluestore_nvmedevice_polling_lat
, dur
);
468 start
= ceph::coarse_real_clock::now();
473 dout(20) << __func__
<< " end" << dendl
;
476 #define dout_subsys ceph_subsys_bdev
478 #define dout_prefix *_dout << "bdev "
482 struct ProbeContext
{
483 spdk_nvme_transport_id trid
;
484 NVMEManager
*manager
;
485 SharedDriverData
*driver
;
490 ceph::mutex lock
= ceph::make_mutex("NVMEManager::lock");
491 bool stopping
= false;
492 std::vector
<SharedDriverData
*> shared_driver_datas
;
493 std::thread dpdk_thread
;
494 ceph::mutex probe_queue_lock
= ceph::make_mutex("NVMEManager::probe_queue_lock");
495 ceph::condition_variable probe_queue_cond
;
496 std::list
<ProbeContext
*> probe_queue
;
501 if (!dpdk_thread
.joinable())
504 std::lock_guard
guard(probe_queue_lock
);
506 probe_queue_cond
.notify_all();
511 int try_get(const spdk_nvme_transport_id
& trid
, SharedDriverData
**driver
);
512 void register_ctrlr(const spdk_nvme_transport_id
& trid
, spdk_nvme_ctrlr
*c
, SharedDriverData
**driver
) {
513 ceph_assert(ceph_mutex_is_locked(lock
));
515 int num_ns
= spdk_nvme_ctrlr_get_num_ns(c
);
516 ceph_assert(num_ns
>= 1);
518 dout(0) << __func__
<< " namespace count larger than 1, currently only use the first namespace" << dendl
;
520 ns
= spdk_nvme_ctrlr_get_ns(c
, 1);
522 derr
<< __func__
<< " failed to get namespace at 1" << dendl
;
525 dout(1) << __func__
<< " successfully attach nvme device at" << trid
.traddr
<< dendl
;
527 // only support one device per osd now!
528 ceph_assert(shared_driver_datas
.empty());
529 // index 0 is occupied by the master thread
530 shared_driver_datas
.push_back(new SharedDriverData(shared_driver_datas
.size()+1, trid
, c
, ns
));
531 *driver
= shared_driver_datas
.back();
535 static NVMEManager manager
;
537 static bool probe_cb(void *cb_ctx
, const struct spdk_nvme_transport_id
*trid
, struct spdk_nvme_ctrlr_opts
*opts
)
539 NVMEManager::ProbeContext
*ctx
= static_cast<NVMEManager::ProbeContext
*>(cb_ctx
);
541 if (trid
->trtype
!= SPDK_NVME_TRANSPORT_PCIE
) {
542 dout(0) << __func__
<< " only probe local nvme device" << dendl
;
546 dout(0) << __func__
<< " found device at: "
547 << "trtype=" << spdk_nvme_transport_id_trtype_str(trid
->trtype
) << ", "
548 << "traddr=" << trid
->traddr
<< dendl
;
549 if (spdk_nvme_transport_id_compare(&ctx
->trid
, trid
)) {
550 dout(0) << __func__
<< " device traddr (" << ctx
->trid
.traddr
<< ") not match " << trid
->traddr
<< dendl
;
557 static void attach_cb(void *cb_ctx
, const struct spdk_nvme_transport_id
*trid
,
558 struct spdk_nvme_ctrlr
*ctrlr
, const struct spdk_nvme_ctrlr_opts
*opts
)
560 auto ctx
= static_cast<NVMEManager::ProbeContext
*>(cb_ctx
);
561 ctx
->manager
->register_ctrlr(ctx
->trid
, ctrlr
, &ctx
->driver
);
564 static int hex2dec(unsigned char c
)
574 static int find_first_bitset(const string
& s
)
577 if (s
.compare(0, 2, "0x") == 0 ||
578 s
.compare(0, 2, "0X") == 0) {
582 for (int pos
= 0; p
!= e
; ++p
, pos
+= 4) {
586 if (int val
= hex2dec(*p
); val
!= 0) {
587 return pos
+ ffs(val
);
593 int NVMEManager::try_get(const spdk_nvme_transport_id
& trid
, SharedDriverData
**driver
)
595 std::lock_guard
l(lock
);
596 for (auto &&it
: shared_driver_datas
) {
597 if (it
->is_equal(trid
)) {
603 auto coremask_arg
= g_conf().get_val
<std::string
>("bluestore_spdk_coremask");
604 int m_core_arg
= find_first_bitset(coremask_arg
);
605 // at least one core is needed for using spdk
606 if (m_core_arg
<= 0) {
607 derr
<< __func__
<< " invalid bluestore_spdk_coremask, "
608 << "at least one core is needed" << dendl
;
613 uint32_t mem_size_arg
= (uint32_t)g_conf().get_val
<Option::size_t>("bluestore_spdk_mem");
615 if (!dpdk_thread
.joinable()) {
616 dpdk_thread
= std::thread(
617 [this, coremask_arg
, m_core_arg
, mem_size_arg
]() {
618 static struct spdk_env_opts opts
;
621 spdk_env_opts_init(&opts
);
622 opts
.name
= "nvme-device-manager";
623 opts
.core_mask
= coremask_arg
.c_str();
624 opts
.master_core
= m_core_arg
;
625 opts
.mem_size
= mem_size_arg
;
626 spdk_env_init(&opts
);
627 spdk_unaffinitize_thread();
629 spdk_nvme_retry_count
= g_ceph_context
->_conf
->bdev_nvme_retry_count
;
630 if (spdk_nvme_retry_count
< 0)
631 spdk_nvme_retry_count
= SPDK_NVME_DEFAULT_RETRY_COUNT
;
633 std::unique_lock
l(probe_queue_lock
);
635 if (!probe_queue
.empty()) {
636 ProbeContext
* ctxt
= probe_queue
.front();
637 probe_queue
.pop_front();
638 r
= spdk_nvme_probe(NULL
, ctxt
, probe_cb
, attach_cb
, NULL
);
640 ceph_assert(!ctxt
->driver
);
641 derr
<< __func__
<< " device probe nvme failed" << dendl
;
644 probe_queue_cond
.notify_all();
646 probe_queue_cond
.wait(l
);
649 for (auto p
: probe_queue
)
651 probe_queue_cond
.notify_all();
656 ProbeContext ctx
{trid
, this, nullptr, false};
658 std::unique_lock
l(probe_queue_lock
);
659 probe_queue
.push_back(&ctx
);
661 probe_queue_cond
.wait(l
);
665 *driver
= ctx
.driver
;
670 void io_complete(void *t
, const struct spdk_nvme_cpl
*completion
)
672 Task
*task
= static_cast<Task
*>(t
);
673 IOContext
*ctx
= task
->ctx
;
674 SharedDriverQueueData
*queue
= task
->queue
;
676 ceph_assert(queue
!= NULL
);
677 ceph_assert(ctx
!= NULL
);
678 --queue
->current_queue_depth
;
679 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(
680 ceph::coarse_real_clock::now() - task
->start
);
681 if (task
->command
== IOCommand::WRITE_COMMAND
) {
682 queue
->logger
->tinc(l_bluestore_nvmedevice_write_lat
, dur
);
683 ceph_assert(!spdk_nvme_cpl_is_error(completion
));
684 dout(20) << __func__
<< " write/zero op successfully, left "
685 << queue
->queue_op_seq
- queue
->completed_op_seq
<< dendl
;
686 // check waiting count before doing callback (which may
687 // destroy this ioc).
689 if (!--ctx
->num_running
) {
690 task
->device
->aio_callback(task
->device
->aio_callback_priv
, ctx
->priv
);
695 task
->release_segs(queue
);
697 } else if (task
->command
== IOCommand::READ_COMMAND
) {
698 queue
->logger
->tinc(l_bluestore_nvmedevice_read_lat
, dur
);
699 ceph_assert(!spdk_nvme_cpl_is_error(completion
));
700 dout(20) << __func__
<< " read op successfully" << dendl
;
702 task
->release_segs(queue
);
703 // read submitted by AIO
704 if (!task
->return_code
) {
706 if (!--ctx
->num_running
) {
707 task
->device
->aio_callback(task
->device
->aio_callback_priv
, ctx
->priv
);
714 if (Task
* primary
= task
->primary
; primary
!= nullptr) {
717 primary
->return_code
= 0;
719 task
->return_code
= 0;
724 ceph_assert(task
->command
== IOCommand::FLUSH_COMMAND
);
725 ceph_assert(!spdk_nvme_cpl_is_error(completion
));
726 queue
->logger
->tinc(l_bluestore_nvmedevice_flush_lat
, dur
);
727 dout(20) << __func__
<< " flush op successfully" << dendl
;
728 task
->return_code
= 0;
734 #define dout_prefix *_dout << "bdev(" << name << ") "
736 NVMEDevice::NVMEDevice(CephContext
* cct
, aio_callback_t cb
, void *cbpriv
)
737 : BlockDevice(cct
, cb
, cbpriv
),
742 int NVMEDevice::open(const string
& p
)
744 dout(1) << __func__
<< " path " << p
<< dendl
;
746 std::ifstream
ifs(p
);
748 derr
<< __func__
<< " unable to open " << p
<< dendl
;
752 std::getline(ifs
, val
);
753 spdk_nvme_transport_id trid
;
754 if (int r
= spdk_nvme_transport_id_parse(&trid
, val
.c_str()); r
) {
755 derr
<< __func__
<< " unable to read " << p
<< ": " << cpp_strerror(r
)
759 if (int r
= manager
.try_get(trid
, &driver
); r
< 0) {
760 derr
<< __func__
<< " failed to get nvme device with transport address " << trid
.traddr
<< dendl
;
764 driver
->register_device(this);
765 block_size
= driver
->get_block_size();
766 size
= driver
->get_size();
769 // nvme is a non-rotational device.
772 // round size down to an even block
773 size
&= ~(block_size
- 1);
775 dout(1) << __func__
<< " size " << size
<< " (" << byte_u_t(size
) << ")"
776 << " block_size " << block_size
<< " (" << byte_u_t(block_size
)
783 void NVMEDevice::close()
785 dout(1) << __func__
<< dendl
;
790 driver
->remove_device(this);
792 dout(1) << __func__
<< " end" << dendl
;
795 int NVMEDevice::collect_metadata(const string
& prefix
, map
<string
,string
> *pm
) const
797 (*pm
)[prefix
+ "rotational"] = "0";
798 (*pm
)[prefix
+ "size"] = stringify(get_size());
799 (*pm
)[prefix
+ "block_size"] = stringify(get_block_size());
800 (*pm
)[prefix
+ "driver"] = "NVMEDevice";
801 (*pm
)[prefix
+ "type"] = "nvme";
802 (*pm
)[prefix
+ "access_mode"] = "spdk";
803 (*pm
)[prefix
+ "nvme_serial_number"] = name
;
808 int NVMEDevice::flush()
813 void NVMEDevice::aio_submit(IOContext
*ioc
)
815 dout(20) << __func__
<< " ioc " << ioc
<< " pending "
816 << ioc
->num_pending
.load() << " running "
817 << ioc
->num_running
.load() << dendl
;
818 int pending
= ioc
->num_pending
.load();
819 Task
*t
= static_cast<Task
*>(ioc
->nvme_task_first
);
821 ioc
->num_running
+= pending
;
822 ioc
->num_pending
-= pending
;
823 ceph_assert(ioc
->num_pending
.load() == 0); // we should be only thread doing this
824 // Only need to push the first entry
825 ioc
->nvme_task_first
= ioc
->nvme_task_last
= nullptr;
827 queue_t
= new SharedDriverQueueData(this, driver
);
828 queue_t
->_aio_handle(t
, ioc
);
832 static void ioc_append_task(IOContext
*ioc
, Task
*t
)
836 first
= static_cast<Task
*>(ioc
->nvme_task_first
);
837 last
= static_cast<Task
*>(ioc
->nvme_task_last
);
841 ioc
->nvme_task_first
= t
;
842 ioc
->nvme_task_last
= t
;
846 static void write_split(
852 uint64_t remain_len
= bl
.length(), begin
= 0, write_size
;
854 // This value may need to be got from configuration later.
855 uint64_t split_size
= 131072; // 128KB.
857 while (remain_len
> 0) {
858 write_size
= std::min(remain_len
, split_size
);
859 t
= new Task(dev
, IOCommand::WRITE_COMMAND
, off
+ begin
, write_size
);
860 // TODO: if the upper layer allocates memory with a known physical address,
861 // we can avoid this copy
862 bl
.splice(0, write_size
, &t
->bl
);
863 remain_len
-= write_size
;
865 ioc_append_task(ioc
, t
);
870 static void make_read_tasks(
872 uint64_t aligned_off
,
873 IOContext
*ioc
, char *buf
, uint64_t aligned_len
, Task
*primary
,
874 uint64_t orig_off
, uint64_t orig_len
)
876 // This value may need to be got from configuration later.
877 uint64_t split_size
= 131072; // 128KB.
878 uint64_t tmp_off
= orig_off
- aligned_off
, remain_orig_len
= orig_len
;
879 auto begin
= aligned_off
;
880 const auto aligned_end
= begin
+ aligned_len
;
882 for (; begin
< aligned_end
; begin
+= split_size
) {
883 auto read_size
= std::min(aligned_end
- begin
, split_size
);
884 auto tmp_len
= std::min(remain_orig_len
, read_size
- tmp_off
);
887 if (primary
&& (aligned_len
<= split_size
)) {
890 t
= new Task(dev
, IOCommand::READ_COMMAND
, begin
, read_size
, 0, primary
);
895 // TODO: if the upper layer allocates memory with a known physical address,
896 // we can avoid this copy
897 t
->fill_cb
= [buf
, t
, tmp_off
, tmp_len
] {
898 t
->copy_to_buf(buf
, tmp_off
, tmp_len
);
901 ioc_append_task(ioc
, t
);
902 remain_orig_len
-= tmp_len
;
908 int NVMEDevice::aio_write(
915 uint64_t len
= bl
.length();
916 dout(20) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
917 << " buffered " << buffered
<< dendl
;
918 ceph_assert(is_valid_io(off
, len
));
920 write_split(this, off
, bl
, ioc
);
921 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;
926 int NVMEDevice::write(uint64_t off
, bufferlist
&bl
, bool buffered
, int write_hint
)
928 uint64_t len
= bl
.length();
929 dout(20) << __func__
<< " " << off
<< "~" << len
<< " buffered "
930 << buffered
<< dendl
;
931 ceph_assert(off
% block_size
== 0);
932 ceph_assert(len
% block_size
== 0);
933 ceph_assert(len
> 0);
934 ceph_assert(off
< size
);
935 ceph_assert(off
+ len
<= size
);
937 IOContext
ioc(cct
, NULL
);
938 write_split(this, off
, bl
, &ioc
);
939 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;
945 int NVMEDevice::read(uint64_t off
, uint64_t len
, bufferlist
*pbl
,
949 dout(5) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
<< dendl
;
950 ceph_assert(is_valid_io(off
, len
));
952 Task
t(this, IOCommand::READ_COMMAND
, off
, len
, 1);
953 bufferptr p
= buffer::create_small_page_aligned(len
);
954 char *buf
= p
.c_str();
956 ceph_assert(ioc
->nvme_task_first
== nullptr);
957 ceph_assert(ioc
->nvme_task_last
== nullptr);
958 make_read_tasks(this, off
, ioc
, buf
, len
, &t
, off
, len
);
959 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;
962 pbl
->push_back(std::move(p
));
963 return t
.return_code
;
966 int NVMEDevice::aio_read(
972 dout(20) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
<< dendl
;
973 ceph_assert(is_valid_io(off
, len
));
974 bufferptr p
= buffer::create_small_page_aligned(len
);
976 char* buf
= p
.c_str();
978 make_read_tasks(this, off
, ioc
, buf
, len
, NULL
, off
, len
);
979 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;
983 int NVMEDevice::read_random(uint64_t off
, uint64_t len
, char *buf
, bool buffered
)
985 ceph_assert(len
> 0);
986 ceph_assert(off
< size
);
987 ceph_assert(off
+ len
<= size
);
989 uint64_t aligned_off
= p2align(off
, block_size
);
990 uint64_t aligned_len
= p2roundup(off
+len
, block_size
) - aligned_off
;
991 dout(5) << __func__
<< " " << off
<< "~" << len
992 << " aligned " << aligned_off
<< "~" << aligned_len
<< dendl
;
993 IOContext
ioc(g_ceph_context
, nullptr);
994 Task
t(this, IOCommand::READ_COMMAND
, aligned_off
, aligned_len
, 1);
996 make_read_tasks(this, aligned_off
, &ioc
, buf
, aligned_len
, &t
, off
, len
);
999 return t
.return_code
;
1002 int NVMEDevice::invalidate_cache(uint64_t off
, uint64_t len
)
1004 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;