1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 // vim: ts=8 sw=2 smarttab
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2015 XSky <haomai@xsky.com>
9 * Author: Haomai Wang <haomaiwang@gmail.com>
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
21 #include <sys/types.h>
32 #include <spdk/nvme.h>
34 #include "include/stringify.h"
35 #include "include/types.h"
36 #include "include/compat.h"
37 #include "common/align.h"
38 #include "common/errno.h"
39 #include "common/debug.h"
40 #include "common/perf_counters.h"
42 #include "NVMEDevice.h"
44 #define dout_context g_ceph_context
45 #define dout_subsys ceph_subsys_bdev
47 #define dout_prefix *_dout << "bdev(" << sn << ") "
// NOTE(review): this file appears garbled by extraction (statements split across
// lines, original line numbers fused into the text); code kept verbatim below.
// Per-thread pointer to the SPDK queue data, created lazily on first aio_submit().
49 thread_local SharedDriverQueueData
*queue_t
;
// Number of DMA buffers pre-allocated into each queue's mempool (see ctor below).
51 static constexpr uint16_t data_buffer_default_num
= 1024;
// Size in bytes of one pooled DMA buffer segment.
53 static constexpr uint32_t data_buffer_size
= 8192;
// Max segments kept inline in IORequest before spilling to heap (extra_segs).
55 static constexpr uint16_t inline_segment_num
= 32;
// Perf-counter indices for NVMEDevice latency/ops tracking.
// NOTE(review): the enclosing enum's opening line is not visible in this chunk.
58 l_bluestore_nvmedevice_first
= 632430,
59 l_bluestore_nvmedevice_write_lat
,
60 l_bluestore_nvmedevice_read_lat
,
61 l_bluestore_nvmedevice_flush_lat
,
62 l_bluestore_nvmedevice_write_queue_lat
,
63 l_bluestore_nvmedevice_read_queue_lat
,
64 l_bluestore_nvmedevice_flush_queue_lat
,
65 l_bluestore_nvmedevice_queue_ops
,
66 l_bluestore_nvmedevice_polling_lat
,
67 l_bluestore_nvmedevice_buffer_alloc_failed
,
68 l_bluestore_nvmedevice_last
// SPDK per-I/O completion callback (defined later in this file); `t` is a Task*.
71 static void io_complete(void *t
, const struct spdk_nvme_cpl
*completion
);
// Scatter-gather cursor state for one in-flight I/O, consumed by
// data_buf_reset_sgl()/data_buf_next_sge() below.
// NOTE(review): the enclosing struct's header line is not visible in this chunk.
74 uint16_t cur_seg_idx
= 0;
76 uint32_t cur_seg_left
= 0;
// Inline segment array used when the request needs <= inline_segment_num segments;
// otherwise extra_segs points at a heap array allocated in alloc_buf_from_pool().
77 void *inline_segs
[inline_segment_num
];
78 void **extra_segs
= nullptr;
// Per-controller shared state: transport id, controller/namespace handles,
// geometry (block_size/size), and the NVMEDevice instances bound to it.
// NOTE(review): several interior lines (braces, member decls) are missing from
// this chunk; code kept verbatim.
83 class SharedDriverData
{
85 spdk_nvme_transport_id trid
;
86 spdk_nvme_ctrlr
*ctrlr
;
88 uint32_t block_size
= 0;
92 std::vector
<NVMEDevice
*> registered_devices
;
93 friend class SharedDriverQueueData
;
// Ctor caches namespace geometry from SPDK.
94 SharedDriverData(unsigned id_
, const spdk_nvme_transport_id
& trid_
,
95 spdk_nvme_ctrlr
*c
, spdk_nvme_ns
*ns_
)
100 block_size
= spdk_nvme_ns_get_extended_sector_size(ns
);
101 size
= spdk_nvme_ns_get_size(ns
);
// True when trid2 refers to the same transport endpoint as this driver.
104 bool is_equal(const spdk_nvme_transport_id
& trid2
) const {
105 return spdk_nvme_transport_id_compare(&trid
, &trid2
) == 0;
107 ~SharedDriverData() {
110 void register_device(NVMEDevice
*device
) {
111 registered_devices
.push_back(device
);
// Rebuilds registered_devices without `device` (filter-and-swap idiom).
114 void remove_device(NVMEDevice
*device
) {
115 std::vector
<NVMEDevice
*> new_devices
;
116 for (auto &&it
: registered_devices
) {
118 new_devices
.push_back(it
);
120 registered_devices
.swap(new_devices
);
123 uint32_t get_block_size() {
126 uint64_t get_size() {
// Per-thread I/O queue: owns an SPDK qpair, a pool of DMA data buffers and the
// perf counters for this thread. Created per submitting thread (see aio_submit).
// NOTE(review): interior lines are missing from this chunk; code kept verbatim.
131 class SharedDriverQueueData
{
133 SharedDriverData
*driver
;
134 spdk_nvme_ctrlr
*ctrlr
;
138 uint32_t max_queue_depth
;
139 struct spdk_nvme_qpair
*qpair
;
140 bool reap_io
= false;
141 int alloc_buf_from_pool(Task
*t
, bool write
);
144 uint32_t current_queue_depth
= 0;
145 std::atomic_ulong completed_op_seq
, queue_op_seq
;
// Free-list of pre-allocated spdk_dma_zmalloc'd buffers, data_buffer_size each.
146 std::vector
<void*> data_buf_mempool
;
147 PerfCounters
*logger
= nullptr;
148 void _aio_handle(Task
*t
, IOContext
*ioc
);
// Ctor: allocates the qpair, fills the DMA buffer pool, and registers a
// per-thread PerfCounters instance named after `this`.
150 SharedDriverQueueData(NVMEDevice
*bdev
, SharedDriverData
*driver
)
153 ctrlr
= driver
->ctrlr
;
155 block_size
= driver
->block_size
;
157 struct spdk_nvme_io_qpair_opts opts
= {};
158 spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr
, &opts
, sizeof(opts
));
159 opts
.qprio
= SPDK_NVME_QPRIO_URGENT
;
160 // usable queue depth should be one less than io_queue_size to avoid overflow.
161 max_queue_depth
= opts
.io_queue_size
- 1;
162 qpair
= spdk_nvme_ctrlr_alloc_io_qpair(ctrlr
, &opts
, sizeof(opts
));
163 ceph_assert(qpair
!= NULL
);
165 // allocate spdk dma memory
166 for (uint16_t i
= 0; i
< data_buffer_default_num
; i
++) {
167 void *b
= spdk_dma_zmalloc(data_buffer_size
, CEPH_PAGE_SIZE
, NULL
);
169 derr
<< __func__
<< " failed to create memory pool for nvme data buffer" << dendl
;
172 data_buf_mempool
.push_back(b
);
175 PerfCountersBuilder
b(g_ceph_context
, string("NVMEDevice-AIOThread-"+stringify(this)),
176 l_bluestore_nvmedevice_first
, l_bluestore_nvmedevice_last
);
177 b
.add_time_avg(l_bluestore_nvmedevice_write_lat
, "write_lat", "Average write completing latency");
178 b
.add_time_avg(l_bluestore_nvmedevice_read_lat
, "read_lat", "Average read completing latency");
179 b
.add_time_avg(l_bluestore_nvmedevice_flush_lat
, "flush_lat", "Average flush completing latency");
180 b
.add_u64(l_bluestore_nvmedevice_queue_ops
, "queue_ops", "Operations in nvme queue");
181 b
.add_time_avg(l_bluestore_nvmedevice_polling_lat
, "polling_lat", "Average polling latency");
182 b
.add_time_avg(l_bluestore_nvmedevice_write_queue_lat
, "write_queue_lat", "Average queue write request latency");
183 b
.add_time_avg(l_bluestore_nvmedevice_read_queue_lat
, "read_queue_lat", "Average queue read request latency");
184 b
.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat
, "flush_queue_lat", "Average queue flush request latency");
185 b
.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed
, "buffer_alloc_failed", "Alloc data buffer failed count");
186 logger
= b
.create_perf_counters();
187 g_ceph_context
->get_perfcounters_collection()->add(logger
);
188 bdev
->queue_number
++;
189 if (bdev
->queue_number
.load() == 1)
// Dtor: unregisters perf counters, frees the qpair and the DMA pool.
193 ~SharedDriverQueueData() {
194 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
196 spdk_nvme_ctrlr_free_io_qpair(qpair
);
197 bdev
->queue_number
--;
200 // free all spdk dma memory;
201 if (!data_buf_mempool
.empty()) {
202 for (uint16_t i
= 0; i
< data_buffer_default_num
; i
++) {
203 void *b
= data_buf_mempool
[i
];
207 data_buf_mempool
.clear();
// Task: one queued NVMe operation (write/read/flush), linked via `next` into
// the IOContext's pending list. NOTE(review): the struct's header line and some
// members are not visible in this chunk; code kept verbatim.
216 IOContext
*ctx
= nullptr;
// Deferred copy-out hook used by read paths to move data into the caller buffer.
221 std::function
<void()> fill_cb
;
222 Task
*next
= nullptr;
// Enqueue timestamp for queue-latency perf counters.
224 ceph::coarse_real_clock::time_point start
;
225 IORequest io_request
;
226 ceph::mutex lock
= ceph::make_mutex("Task::lock");
227 ceph::condition_variable cond
;
228 SharedDriverQueueData
*queue
= nullptr;
// rc seeds return_code; 0 = fire-and-forget AIO, nonzero = synchronous caller waits.
229 Task(NVMEDevice
*dev
, IOCommand c
, uint64_t off
, uint64_t l
, int64_t rc
= 0)
230 : device(dev
), command(c
), offset(off
), len(l
),
232 start(ceph::coarse_real_clock::now()) {}
// Destructor sanity check: all segments must have been released first.
234 ceph_assert(!io_request
.nseg
);
236 void release_segs(SharedDriverQueueData
*queue_data
) {
237 if (io_request
.extra_segs
) {
238 for (uint16_t i
= 0; i
< io_request
.nseg
; i
++)
239 queue_data
->data_buf_mempool
.push_back(io_request
.extra_segs
[i
]);
240 delete io_request
.extra_segs
;
241 } else if (io_request
.nseg
) {
242 for (uint16_t i
= 0; i
< io_request
.nseg
; i
++)
243 queue_data
->data_buf_mempool
.push_back(io_request
.inline_segs
[i
]);
245 ctx
->total_nseg
-= io_request
.nseg
;
// Copy `len` bytes starting at logical offset `off` within this task's
// segment list into `buf`. Walks segments data_buffer_size bytes at a time;
// `off` only applies within the first segment touched.
// NOTE(review): the loop header and accumulator declarations are missing from
// this chunk — presumably a while-loop over `left`/`copied`; confirm upstream.
249 void copy_to_buf(char *buf
, uint64_t off
, uint64_t len
) {
// Pick whichever segment array is in use (heap spill vs inline).
252 void **segs
= io_request
.extra_segs
? io_request
.extra_segs
: io_request
.inline_segs
;
255 char *src
= static_cast<char*>(segs
[i
++]);
256 uint64_t need_copy
= std::min(left
, data_buffer_size
-off
);
257 memcpy(buf
+copied
, src
+off
, need_copy
);
// SPDK SGL reset callback: position the request cursor at the segment
// containing byte `sgl_offset`, recording the segment index and how many
// bytes of that segment remain to be transferred (cur_seg_left).
// NOTE(review): loop-exit lines are missing from this chunk; code kept verbatim.
265 static void data_buf_reset_sgl(void *cb_arg
, uint32_t sgl_offset
)
267 Task
*t
= static_cast<Task
*>(cb_arg
);
// Index of the segment that would start exactly at sgl_offset.
268 uint32_t i
= sgl_offset
/ data_buffer_size
;
269 uint32_t offset
= i
* data_buffer_size
;
270 ceph_assert(i
<= t
->io_request
.nseg
);
// Advance until `offset` is the END of the segment covering sgl_offset.
272 for (; i
< t
->io_request
.nseg
; i
++) {
273 offset
+= data_buffer_size
;
274 if (offset
> sgl_offset
) {
281 t
->io_request
.cur_seg_idx
= i
;
282 t
->io_request
.cur_seg_left
= offset
- sgl_offset
;
// SPDK SGL iterator callback: emit the next scatter-gather element for the
// task — its address and length — honoring any partial-segment resume point
// left by data_buf_reset_sgl (cur_seg_left).
// NOTE(review): some return/brace lines are missing from this chunk.
286 static int data_buf_next_sge(void *cb_arg
, void **address
, uint32_t *length
)
290 Task
*t
= static_cast<Task
*>(cb_arg
);
// All segments already consumed.
291 if (t
->io_request
.cur_seg_idx
>= t
->io_request
.nseg
) {
297 addr
= t
->io_request
.extra_segs
? t
->io_request
.extra_segs
[t
->io_request
.cur_seg_idx
] : t
->io_request
.inline_segs
[t
->io_request
.cur_seg_idx
];
299 size
= data_buffer_size
;
// Last segment may be shorter than a full buffer: use len % data_buffer_size.
300 if (t
->io_request
.cur_seg_idx
== t
->io_request
.nseg
- 1) {
301 uint64_t tail
= t
->len
% data_buffer_size
;
303 size
= (uint32_t) tail
;
// Resuming mid-segment after a reset: emit only the remaining tail once.
307 if (t
->io_request
.cur_seg_left
) {
308 *address
= (void *)((uint64_t)addr
+ size
- t
->io_request
.cur_seg_left
);
309 *length
= t
->io_request
.cur_seg_left
;
310 t
->io_request
.cur_seg_left
= 0;
316 t
->io_request
.cur_seg_idx
++;
// Carve `t->len` bytes worth of DMA buffers out of data_buf_mempool for the
// task. Uses the inline segment array when the count fits, otherwise heap-
// allocates extra_segs. For writes, also copies the task's bufferlist into the
// segments. Presumably returns negative when the pool is too small (the early-
// return lines are missing from this chunk — confirm upstream).
320 int SharedDriverQueueData::alloc_buf_from_pool(Task
*t
, bool write
)
322 uint64_t count
= t
->len
/ data_buffer_size
;
// Round up for any partial tail segment.
323 if (t
->len
% data_buffer_size
)
// Not enough pooled buffers: caller retries later (perf counter incremented).
326 if (count
> data_buf_mempool
.size())
328 if (count
<= inline_segment_num
) {
329 segs
= t
->io_request
.inline_segs
;
// Heap spill path — released by Task::release_segs.
331 t
->io_request
.extra_segs
= new void*[count
];
332 segs
= t
->io_request
.extra_segs
;
// Pop buffers off the free-list back-to-front.
334 for (uint16_t i
= 0; i
< count
; i
++) {
335 segs
[i
] = data_buf_mempool
.back();
336 data_buf_mempool
.pop_back();
338 t
->io_request
.nseg
= count
;
339 t
->ctx
->total_nseg
+= count
;
// Write path: stage the payload into the DMA segments.
341 auto blp
= t
->bl
.begin();
344 for (; i
< count
- 1; ++i
) {
345 blp
.copy(data_buffer_size
, static_cast<char*>(segs
[i
]));
346 len
+= data_buffer_size
;
// Final (possibly short) segment gets the remainder.
348 blp
.copy(t
->bl
.length() - len
, static_cast<char*>(segs
[i
]));
// Main submit-and-poll loop for one batch of tasks: walks the task chain,
// issues writev/readv/flush to the qpair, then polls for completions until
// the IOContext has no running ops. Queue-latency perf counters are bumped at
// each submit; polling latency once per loop iteration.
// NOTE(review): many brace/return/continue lines are missing from this chunk;
// code kept verbatim.
354 void SharedDriverQueueData::_aio_handle(Task
*t
, IOContext
*ioc
)
356 dout(20) << __func__
<< " start" << dendl
;
359 uint64_t lba_off
, lba_count
;
360 uint32_t max_io_completion
= (uint32_t)g_conf().get_val
<uint64_t>("bluestore_spdk_max_io_completion");
361 uint64_t io_sleep_in_us
= g_conf().get_val
<uint64_t>("bluestore_spdk_io_sleep");
363 ceph::coarse_real_clock::time_point cur
, start
364 = ceph::coarse_real_clock::now();
// Keep going until every op issued through this IOContext has completed.
365 while (ioc
->num_running
) {
367 dout(40) << __func__
<< " polling" << dendl
;
// Reap completions only when something is actually in flight.
368 if (current_queue_depth
) {
369 r
= spdk_nvme_qpair_process_completions(qpair
, max_io_completion
);
// Nothing reaped: back off briefly to avoid burning the CPU.
373 usleep(io_sleep_in_us
);
// Submit phase: walk the linked task chain.
377 for (; t
; t
= t
->next
) {
// Qpair full — stop submitting until completions free slots.
378 if (current_queue_depth
== max_queue_depth
) {
// Byte offsets/lengths are converted to LBAs for SPDK.
384 lba_off
= t
->offset
/ block_size
;
385 lba_count
= t
->len
/ block_size
;
386 switch (t
->command
) {
387 case IOCommand::WRITE_COMMAND
:
389 dout(20) << __func__
<< " write command issued " << lba_off
<< "~" << lba_count
<< dendl
;
390 r
= alloc_buf_from_pool(t
, true);
// Buffer pool exhausted: count it and presumably retry this task later.
392 logger
->inc(l_bluestore_nvmedevice_buffer_alloc_failed
);
396 r
= spdk_nvme_ns_cmd_writev(
397 ns
, qpair
, lba_off
, lba_count
, io_complete
, t
, 0,
398 data_buf_reset_sgl
, data_buf_next_sge
);
400 derr
<< __func__
<< " failed to do write command" << dendl
;
// Submission failed: unlink the chain and give the buffers back.
401 t
->ctx
->nvme_task_first
= t
->ctx
->nvme_task_last
= nullptr;
402 t
->release_segs(this);
406 cur
= ceph::coarse_real_clock::now();
407 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- t
->start
);
408 logger
->tinc(l_bluestore_nvmedevice_write_queue_lat
, dur
);
411 case IOCommand::READ_COMMAND
:
413 dout(20) << __func__
<< " read command issued " << lba_off
<< "~" << lba_count
<< dendl
;
414 r
= alloc_buf_from_pool(t
, false);
416 logger
->inc(l_bluestore_nvmedevice_buffer_alloc_failed
);
420 r
= spdk_nvme_ns_cmd_readv(
421 ns
, qpair
, lba_off
, lba_count
, io_complete
, t
, 0,
422 data_buf_reset_sgl
, data_buf_next_sge
);
424 derr
<< __func__
<< " failed to read" << dendl
;
425 t
->release_segs(this);
429 cur
= ceph::coarse_real_clock::now();
430 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- t
->start
);
431 logger
->tinc(l_bluestore_nvmedevice_read_queue_lat
, dur
);
435 case IOCommand::FLUSH_COMMAND
:
437 dout(20) << __func__
<< " flush command issueed " << dendl
;
438 r
= spdk_nvme_ns_cmd_flush(ns
, qpair
, io_complete
, t
);
440 derr
<< __func__
<< " failed to flush" << dendl
;
441 t
->release_segs(this);
445 cur
= ceph::coarse_real_clock::now();
446 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- t
->start
);
447 logger
->tinc(l_bluestore_nvmedevice_flush_queue_lat
, dur
);
// One more op in flight; decremented in io_complete().
452 current_queue_depth
++;
454 cur
= ceph::coarse_real_clock::now();
455 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(cur
- start
);
456 logger
->tinc(l_bluestore_nvmedevice_polling_lat
, dur
);
457 start
= ceph::coarse_real_clock::now();
462 dout(20) << __func__
<< " end" << dendl
;
465 #define dout_subsys ceph_subsys_bdev
467 #define dout_prefix *_dout << "bdev "
// NVMEManager: process-wide singleton that owns the DPDK/SPDK environment
// thread and serializes device probing through probe_queue.
// NOTE(review): the class header and several interior lines are not visible in
// this chunk; code kept verbatim.
// Context handed to the SPDK probe/attach callbacks for one probe request.
471 struct ProbeContext
{
472 spdk_nvme_transport_id trid
;
473 NVMEManager
*manager
;
474 SharedDriverData
*driver
;
479 ceph::mutex lock
= ceph::make_mutex("NVMEManager::lock");
480 bool stopping
= false;
481 std::vector
<SharedDriverData
*> shared_driver_datas
;
// Dedicated thread that runs spdk_env_init and services probe_queue.
482 std::thread dpdk_thread
;
483 ceph::mutex probe_queue_lock
= ceph::make_mutex("NVMEManager::probe_queue_lock");
484 ceph::condition_variable probe_queue_cond
;
485 std::list
<ProbeContext
*> probe_queue
;
// Shutdown path: wake the dpdk thread so it can observe `stopping`.
490 if (!dpdk_thread
.joinable())
493 std::lock_guard
guard(probe_queue_lock
);
495 probe_queue_cond
.notify_all();
500 int try_get(const spdk_nvme_transport_id
& trid
, SharedDriverData
**driver
);
// Called from attach_cb with `lock` held: wrap the attached controller's first
// namespace in a SharedDriverData and hand it back through *driver.
501 void register_ctrlr(const spdk_nvme_transport_id
& trid
, spdk_nvme_ctrlr
*c
, SharedDriverData
**driver
) {
502 ceph_assert(ceph_mutex_is_locked(lock
));
504 int num_ns
= spdk_nvme_ctrlr_get_num_ns(c
);
505 ceph_assert(num_ns
>= 1);
507 dout(0) << __func__
<< " namespace count larger than 1, currently only use the first namespace" << dendl
;
// SPDK namespaces are 1-indexed.
509 ns
= spdk_nvme_ctrlr_get_ns(c
, 1);
511 derr
<< __func__
<< " failed to get namespace at 1" << dendl
;
514 dout(1) << __func__
<< " successfully attach nvme device at" << trid
.traddr
<< dendl
;
516 // only support one device per osd now!
517 ceph_assert(shared_driver_datas
.empty());
518 // index 0 is occupied by the master thread
519 shared_driver_datas
.push_back(new SharedDriverData(shared_driver_datas
.size()+1, trid
, c
, ns
));
520 *driver
= shared_driver_datas
.back();
// File-local singleton used by NVMEDevice::open().
524 static NVMEManager manager
;
// SPDK probe filter: accept only the local PCIe device whose transport id
// matches the one requested in the ProbeContext; presumably returns false to
// skip non-matching devices (return lines missing from this chunk).
526 static bool probe_cb(void *cb_ctx
, const struct spdk_nvme_transport_id
*trid
, struct spdk_nvme_ctrlr_opts
*opts
)
528 NVMEManager::ProbeContext
*ctx
= static_cast<NVMEManager::ProbeContext
*>(cb_ctx
);
// Only local PCIe-attached NVMe is supported.
530 if (trid
->trtype
!= SPDK_NVME_TRANSPORT_PCIE
) {
531 dout(0) << __func__
<< " only probe local nvme device" << dendl
;
535 dout(0) << __func__
<< " found device at: "
536 << "trtype=" << spdk_nvme_transport_id_trtype_str(trid
->trtype
) << ", "
537 << "traddr=" << trid
->traddr
<< dendl
;
// Nonzero compare == different endpoint: not the device we were asked for.
538 if (spdk_nvme_transport_id_compare(&ctx
->trid
, trid
)) {
539 dout(0) << __func__
<< " device traddr (" << ctx
->trid
.traddr
<< ") not match " << trid
->traddr
<< dendl
;
// SPDK attach callback: a controller accepted by probe_cb has been attached;
// register it with the manager, which fills ctx->driver for the waiter.
546 static void attach_cb(void *cb_ctx
, const struct spdk_nvme_transport_id
*trid
,
547 struct spdk_nvme_ctrlr
*ctrlr
, const struct spdk_nvme_ctrlr_opts
*opts
)
549 auto ctx
= static_cast<NVMEManager::ProbeContext
*>(cb_ctx
);
550 ctx
->manager
->register_ctrlr(ctx
->trid
, ctrlr
, &ctx
->driver
);
// Look up (or create) the SharedDriverData for `trid`. First checks the cache;
// otherwise lazily starts the DPDK environment thread, then enqueues a
// ProbeContext and blocks on probe_queue_cond until the probe thread fills
// ctx.driver. NOTE(review): many brace/return lines are missing; code verbatim.
553 int NVMEManager::try_get(const spdk_nvme_transport_id
& trid
, SharedDriverData
**driver
)
555 std::lock_guard
l(lock
);
// Fast path: device already probed.
556 for (auto &&it
: shared_driver_datas
) {
557 if (it
->is_equal(trid
)) {
// Derive the DPDK master core from the configured coremask.
563 auto coremask_arg
= g_conf().get_val
<std::string
>("bluestore_spdk_coremask");
566 auto core_value
= stoull(coremask_arg
, nullptr, 16);
// ffsll: index of lowest set bit -> first usable core (1-based).
567 m_core_arg
= ffsll(core_value
);
// stoull throws invalid_argument/out_of_range, both logic_error-derived... NOTE(review):
// out_of_range derives from runtime_error, not logic_error — confirm intent upstream.
568 } catch (const std::logic_error
& e
) {
569 derr
<< __func__
<< " invalid bluestore_spdk_coremask: "
570 << coremask_arg
<< dendl
;
573 // at least one core is needed for using spdk
574 if (m_core_arg
== 0) {
575 derr
<< __func__
<< " invalid bluestore_spdk_coremask, "
576 << "at least one core is needed" << dendl
;
581 uint32_t mem_size_arg
= (uint32_t)g_conf().get_val
<Option::size_t>("bluestore_spdk_mem");
// Start the long-lived DPDK/SPDK environment thread exactly once.
583 if (!dpdk_thread
.joinable()) {
584 dpdk_thread
= std::thread(
585 [this, coremask_arg
, m_core_arg
, mem_size_arg
]() {
// static: spdk_env_init keeps referencing opts after this scope.
586 static struct spdk_env_opts opts
;
589 spdk_env_opts_init(&opts
);
590 opts
.name
= "nvme-device-manager";
591 opts
.core_mask
= coremask_arg
.c_str();
592 opts
.master_core
= m_core_arg
;
593 opts
.mem_size
= mem_size_arg
;
594 spdk_env_init(&opts
);
595 spdk_unaffinitize_thread();
597 spdk_nvme_retry_count
= g_ceph_context
->_conf
->bdev_nvme_retry_count
;
598 if (spdk_nvme_retry_count
< 0)
599 spdk_nvme_retry_count
= SPDK_NVME_DEFAULT_RETRY_COUNT
;
// Service loop: pop probe requests, run spdk_nvme_probe, signal waiters.
601 std::unique_lock
l(probe_queue_lock
);
603 if (!probe_queue
.empty()) {
604 ProbeContext
* ctxt
= probe_queue
.front();
605 probe_queue
.pop_front();
606 r
= spdk_nvme_probe(NULL
, ctxt
, probe_cb
, attach_cb
, NULL
);
608 ceph_assert(!ctxt
->driver
);
609 derr
<< __func__
<< " device probe nvme failed" << dendl
;
612 probe_queue_cond
.notify_all();
// Queue empty: sleep until a new request (or shutdown) arrives.
614 probe_queue_cond
.wait(l
);
617 for (auto p
: probe_queue
)
619 probe_queue_cond
.notify_all();
// Enqueue our request and wait for the probe thread to fill ctx.driver.
624 ProbeContext ctx
{trid
, this, nullptr, false};
626 std::unique_lock
l(probe_queue_lock
);
627 probe_queue
.push_back(&ctx
);
629 probe_queue_cond
.wait(l
);
633 *driver
= ctx
.driver
;
// SPDK completion callback for every command submitted by _aio_handle().
// Decrements the queue depth, records per-command completion latency, returns
// DMA segments to the pool, and fires the device aio_callback when the owning
// IOContext has no more running ops. Runs on the polling thread.
// NOTE(review): several brace/else lines are missing from this chunk.
638 void io_complete(void *t
, const struct spdk_nvme_cpl
*completion
)
640 Task
*task
= static_cast<Task
*>(t
);
641 IOContext
*ctx
= task
->ctx
;
642 SharedDriverQueueData
*queue
= task
->queue
;
644 ceph_assert(queue
!= NULL
);
645 ceph_assert(ctx
!= NULL
);
// Free a qpair slot for the submit loop.
646 --queue
->current_queue_depth
;
647 auto dur
= std::chrono::duration_cast
<std::chrono::nanoseconds
>(
648 ceph::coarse_real_clock::now() - task
->start
);
649 if (task
->command
== IOCommand::WRITE_COMMAND
) {
650 queue
->logger
->tinc(l_bluestore_nvmedevice_write_lat
, dur
);
// I/O errors are fatal here: SPDK writes are not expected to fail.
651 ceph_assert(!spdk_nvme_cpl_is_error(completion
));
652 dout(20) << __func__
<< " write/zero op successfully, left "
653 << queue
->queue_op_seq
- queue
->completed_op_seq
<< dendl
;
654 // check waiting count before doing callback (which may
655 // destroy this ioc).
657 if (!--ctx
->num_running
) {
658 task
->device
->aio_callback(task
->device
->aio_callback_priv
, ctx
->priv
);
663 task
->release_segs(queue
);
665 } else if (task
->command
== IOCommand::READ_COMMAND
) {
666 queue
->logger
->tinc(l_bluestore_nvmedevice_read_lat
, dur
);
667 ceph_assert(!spdk_nvme_cpl_is_error(completion
));
668 dout(20) << __func__
<< " read op successfully" << dendl
;
// fill_cb (set by read paths) presumably runs near here to copy data out —
// the invoking line is missing from this chunk; confirm upstream.
670 task
->release_segs(queue
);
671 // read submitted by AIO
// return_code == 0 marks a fire-and-forget aio_read: notify via callback.
672 if (!task
->return_code
) {
674 if (!--ctx
->num_running
) {
675 task
->device
->aio_callback(task
->device
->aio_callback_priv
, ctx
->priv
);
// Synchronous read: clear return_code so the waiter can proceed.
682 task
->return_code
= 0;
686 ceph_assert(task
->command
== IOCommand::FLUSH_COMMAND
);
687 ceph_assert(!spdk_nvme_cpl_is_error(completion
));
688 queue
->logger
->tinc(l_bluestore_nvmedevice_flush_lat
, dur
);
689 dout(20) << __func__
<< " flush op successfully" << dendl
;
690 task
->return_code
= 0;
696 #define dout_prefix *_dout << "bdev(" << name << ") "
// NVMEDevice: BlockDevice backed by a user-space SPDK NVMe driver.
// NOTE(review): the rest of the member-init list is not visible in this chunk.
698 NVMEDevice::NVMEDevice(CephContext
* cct
, aio_callback_t cb
, void *cbpriv
)
699 : BlockDevice(cct
, cb
, cbpriv
),
// Open the device: `p` is the path of a text file whose first line is an SPDK
// transport-id string. Parses it, acquires/creates the shared driver through
// the manager, registers this device, and caches geometry.
// NOTE(review): return statements and some lines are missing from this chunk.
704 int NVMEDevice::open(const string
& p
)
706 dout(1) << __func__
<< " path " << p
<< dendl
;
708 std::ifstream
ifs(p
);
710 derr
<< __func__
<< " unable to open " << p
<< dendl
;
// First line of the file is the transport id (e.g. "trtype:PCIe traddr:...").
714 std::getline(ifs
, val
);
715 spdk_nvme_transport_id trid
;
716 if (int r
= spdk_nvme_transport_id_parse(&trid
, val
.c_str()); r
) {
717 derr
<< __func__
<< " unable to read " << p
<< ": " << cpp_strerror(r
)
// Blocks until the probe thread has attached the controller.
721 if (int r
= manager
.try_get(trid
, &driver
); r
< 0) {
722 derr
<< __func__
<< " failed to get nvme device with transport address " << trid
.traddr
<< dendl
;
726 driver
->register_device(this);
727 block_size
= driver
->get_block_size();
728 size
= driver
->get_size();
731 //nvme is non-rotational device.
734 // round size down to an even block
// Assumes block_size is a power of two (mask trick).
735 size
&= ~(block_size
- 1);
737 dout(1) << __func__
<< " size " << size
<< " (" << byte_u_t(size
) << ")"
738 << " block_size " << block_size
<< " (" << byte_u_t(block_size
)
// Detach this device from the shared driver; the driver itself stays alive
// for the process (owned by the manager).
745 void NVMEDevice::close()
747 dout(1) << __func__
<< dendl
;
752 driver
->remove_device(this);
754 dout(1) << __func__
<< " end" << dendl
;
// Fill *pm with device metadata, each key prefixed with `prefix`
// (e.g. "bluestore_bdev_"). Always reports non-rotational, spdk access mode.
757 int NVMEDevice::collect_metadata(const string
& prefix
, map
<string
,string
> *pm
) const
759 (*pm
)[prefix
+ "rotational"] = "0";
760 (*pm
)[prefix
+ "size"] = stringify(get_size());
761 (*pm
)[prefix
+ "block_size"] = stringify(get_block_size());
762 (*pm
)[prefix
+ "driver"] = "NVMEDevice";
763 (*pm
)[prefix
+ "type"] = "nvme";
764 (*pm
)[prefix
+ "access_mode"] = "spdk";
765 (*pm
)[prefix
+ "nvme_serial_number"] = name
;
// Flush entry point; body not visible in this chunk — presumably a no-op or
// delegated elsewhere (flush is issued per-IOContext via FLUSH_COMMAND tasks).
770 int NVMEDevice::flush()
// Move the IOContext's pending task chain into "running" state and hand it to
// this thread's SharedDriverQueueData (created lazily into thread-local
// queue_t), which submits and polls until completion.
775 void NVMEDevice::aio_submit(IOContext
*ioc
)
777 dout(20) << __func__
<< " ioc " << ioc
<< " pending "
778 << ioc
->num_pending
.load() << " running "
779 << ioc
->num_running
.load() << dendl
;
780 int pending
= ioc
->num_pending
.load();
781 Task
*t
= static_cast<Task
*>(ioc
->nvme_task_first
);
// Atomically migrate pending -> running before submission.
783 ioc
->num_running
+= pending
;
784 ioc
->num_pending
-= pending
;
785 ceph_assert(ioc
->num_pending
.load() == 0); // we should be only thread doing this
786 // Only need to push the first entry
787 ioc
->nvme_task_first
= ioc
->nvme_task_last
= nullptr;
// First use on this thread: create the per-thread queue.
789 queue_t
= new SharedDriverQueueData(this, driver
);
790 queue_t
->_aio_handle(t
, ioc
);
// Split a large write into <=128KB WRITE_COMMAND tasks and append them to the
// IOContext's task chain. Consumes `bl` via splice (zero-length on return).
// NOTE(review): the parameter list and chain-append else-branch lines are
// missing from this chunk; code kept verbatim.
794 static void write_split(
800 uint64_t remain_len
= bl
.length(), begin
= 0, write_size
;
801 Task
*t
, *first
, *last
;
802 // This value may need to be got from configuration later.
803 uint64_t split_size
= 131072; // 128KB.
805 while (remain_len
> 0) {
806 write_size
= std::min(remain_len
, split_size
);
807 t
= new Task(dev
, IOCommand::WRITE_COMMAND
, off
+ begin
, write_size
);
808 // TODO: if upper layer alloc memory with known physical address,
809 // we can reduce this copy
// splice moves the front write_size bytes of bl into the task's bufferlist.
810 bl
.splice(0, write_size
, &t
->bl
);
811 remain_len
-= write_size
;
// Append the new task to the IOContext's singly linked chain.
813 first
= static_cast<Task
*>(ioc
->nvme_task_first
);
814 last
= static_cast<Task
*>(ioc
->nvme_task_last
);
818 ioc
->nvme_task_first
= t
;
819 ioc
->nvme_task_last
= t
;
// Queue an asynchronous write: validates the range, then defers to
// write_split() which chains WRITE_COMMAND tasks onto the ioc.
// NOTE(review): parameter list and return are not visible in this chunk.
825 int NVMEDevice::aio_write(
832 uint64_t len
= bl
.length();
833 dout(20) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
834 << " buffered " << buffered
<< dendl
;
835 ceph_assert(is_valid_io(off
, len
));
837 write_split(this, off
, bl
, ioc
);
838 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;
// Synchronous write: same splitting as aio_write but with a local IOContext;
// the submit path (not visible here) presumably runs it to completion before
// returning. write_hint is accepted for interface parity and unused here.
843 int NVMEDevice::write(uint64_t off
, bufferlist
&bl
, bool buffered
, int write_hint
)
845 uint64_t len
= bl
.length();
846 dout(20) << __func__
<< " " << off
<< "~" << len
<< " buffered "
847 << buffered
<< dendl
;
// Writes must be block-aligned and inside the device.
848 ceph_assert(off
% block_size
== 0);
849 ceph_assert(len
% block_size
== 0);
850 ceph_assert(len
> 0);
851 ceph_assert(off
< size
);
852 ceph_assert(off
+ len
<= size
);
854 IOContext
ioc(cct
, NULL
);
855 write_split(this, off
, bl
, &ioc
);
856 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;
// Synchronous read: builds a READ_COMMAND task (rc=1 marks it synchronous so
// io_complete leaves the wakeup to the waiter), points fill_cb at a
// page-aligned bufferptr, and hands the task to the ioc.
// NOTE(review): submit/wait lines are missing from this chunk; code verbatim.
862 int NVMEDevice::read(uint64_t off
, uint64_t len
, bufferlist
*pbl
,
866 dout(5) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
<< dendl
;
867 ceph_assert(is_valid_io(off
, len
));
869 Task
*t
= new Task(this, IOCommand::READ_COMMAND
, off
, len
, 1);
870 bufferptr p
= buffer::create_small_page_aligned(len
);
873 char *buf
= p
.c_str();
// Runs on completion: copy DMA segments into the caller-visible buffer.
// Capturing `buf` (raw char*) is safe only while `p` is alive — it is moved
// into *pbl below and outlives the wait.
874 t
->fill_cb
= [buf
, t
]() {
875 t
->copy_to_buf(buf
, 0, t
->len
);
879 ioc
->nvme_task_first
= t
;
883 pbl
->push_back(std::move(p
));
// Asynchronous read: like read() but rc defaults to 0 (AIO path — io_complete
// fires the device callback when the ioc drains) and the task is appended to
// the ioc's existing chain rather than submitted immediately.
// NOTE(review): parameter list and chain-append branch lines are missing.
889 int NVMEDevice::aio_read(
895 dout(20) << __func__
<< " " << off
<< "~" << len
<< " ioc " << ioc
<< dendl
;
896 ceph_assert(is_valid_io(off
, len
));
898 Task
*t
= new Task(this, IOCommand::READ_COMMAND
, off
, len
);
900 bufferptr p
= buffer::create_small_page_aligned(len
);
903 char* buf
= p
.c_str();
// Completion hook: copy DMA segments into the caller-visible buffer.
904 t
->fill_cb
= [buf
, t
]() {
905 t
->copy_to_buf(buf
, 0, t
->len
);
// Append to the ioc's singly linked task chain.
908 Task
*first
= static_cast<Task
*>(ioc
->nvme_task_first
);
909 Task
*last
= static_cast<Task
*>(ioc
->nvme_task_last
);
913 ioc
->nvme_task_first
= t
;
914 ioc
->nvme_task_last
= t
;
// Synchronous unaligned read: widens [off, off+len) to block boundaries,
// reads the aligned span, then fill_cb copies just the requested window
// (off - t->offset within the aligned buffer) into `buf`.
// NOTE(review): submit/wait lines are missing from this chunk; code verbatim.
920 int NVMEDevice::read_random(uint64_t off
, uint64_t len
, char *buf
, bool buffered
)
922 ceph_assert(len
> 0);
923 ceph_assert(off
< size
);
924 ceph_assert(off
+ len
<= size
);
926 uint64_t aligned_off
= align_down(off
, block_size
);
927 uint64_t aligned_len
= align_up(off
+len
, block_size
) - aligned_off
;
928 dout(5) << __func__
<< " " << off
<< "~" << len
929 << " aligned " << aligned_off
<< "~" << aligned_len
<< dendl
;
930 IOContext
ioc(g_ceph_context
, nullptr);
// rc=1: synchronous task, waiter handles completion.
931 Task
*t
= new Task(this, IOCommand::READ_COMMAND
, aligned_off
, aligned_len
, 1);
934 t
->fill_cb
= [buf
, t
, off
, len
]() {
935 t
->copy_to_buf(buf
, off
-t
->offset
, len
);
939 ioc
.nvme_task_first
= t
;
// Cache invalidation hook; body continues past this chunk — presumably a
// no-op for SPDK (no kernel page cache involved). Confirm upstream.
948 int NVMEDevice::invalidate_cache(uint64_t off
, uint64_t len
)
950 dout(5) << __func__
<< " " << off
<< "~" << len
<< dendl
;