// ceph/src/os/bluestore/NVMEDevice.cc (ceph Nautilus 14.2.1 sources)
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2//
3// vim: ts=8 sw=2 smarttab
4/*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2015 XSky <haomai@xsky.com>
8 *
9 * Author: Haomai Wang <haomaiwang@gmail.com>
10 *
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
15 *
16 */
17
18#include <unistd.h>
19#include <stdlib.h>
11fdf7f2 20#include <strings.h>
7c673cae
FG
21#include <sys/types.h>
22#include <sys/stat.h>
23#include <fcntl.h>
24#include <unistd.h>
25
26#include <chrono>
11fdf7f2 27#include <fstream>
7c673cae
FG
28#include <functional>
29#include <map>
30#include <thread>
7c673cae
FG
31
32#include <spdk/nvme.h>
33
7c673cae
FG
34#include "include/stringify.h"
35#include "include/types.h"
36#include "include/compat.h"
37#include "common/align.h"
38#include "common/errno.h"
39#include "common/debug.h"
40#include "common/perf_counters.h"
7c673cae
FG
41
42#include "NVMEDevice.h"
43
44#define dout_context g_ceph_context
45#define dout_subsys ceph_subsys_bdev
46#undef dout_prefix
47#define dout_prefix *_dout << "bdev(" << sn << ") "
48
11fdf7f2
TL
// Per-thread SPDK queue state: each submitting thread lazily gets its own
// SharedDriverQueueData (created in NVMEDevice::aio_submit).
thread_local SharedDriverQueueData *queue_t;

// Number of DMA data buffers preallocated for each queue's mempool.
static constexpr uint16_t data_buffer_default_num = 1024;

// Size in bytes of each DMA data buffer.
static constexpr uint32_t data_buffer_size = 8192;

// Segment pointers kept inline in IORequest before spilling to a heap array.
static constexpr uint16_t inline_segment_num = 32;
56
7c673cae
FG
// Perf counter indices for per-queue NVMe statistics (order defines values;
// do not reorder).
enum {
  l_bluestore_nvmedevice_first = 632430,
  l_bluestore_nvmedevice_write_lat,
  l_bluestore_nvmedevice_read_lat,
  l_bluestore_nvmedevice_flush_lat,
  l_bluestore_nvmedevice_write_queue_lat,
  l_bluestore_nvmedevice_read_queue_lat,
  l_bluestore_nvmedevice_flush_queue_lat,
  l_bluestore_nvmedevice_queue_ops,
  l_bluestore_nvmedevice_polling_lat,
  l_bluestore_nvmedevice_buffer_alloc_failed,
  l_bluestore_nvmedevice_last
};

// SPDK completion callback shared by the read/write/flush submission paths.
static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
7c673cae
FG
73struct IORequest {
74 uint16_t cur_seg_idx = 0;
75 uint16_t nseg;
76 uint32_t cur_seg_left = 0;
77 void *inline_segs[inline_segment_num];
78 void **extra_segs = nullptr;
79};
80
11fdf7f2 81struct Task;
7c673cae
FG
82
83class SharedDriverData {
84 unsigned id;
11fdf7f2 85 spdk_nvme_transport_id trid;
7c673cae
FG
86 spdk_nvme_ctrlr *ctrlr;
87 spdk_nvme_ns *ns;
11fdf7f2 88 uint32_t block_size = 0;
7c673cae 89 uint64_t size = 0;
7c673cae
FG
90
91 public:
92 std::vector<NVMEDevice*> registered_devices;
11fdf7f2
TL
93 friend class SharedDriverQueueData;
94 SharedDriverData(unsigned id_, const spdk_nvme_transport_id& trid_,
95 spdk_nvme_ctrlr *c, spdk_nvme_ns *ns_)
96 : id(id_),
97 trid(trid_),
7c673cae 98 ctrlr(c),
11fdf7f2
TL
99 ns(ns_) {
100 block_size = spdk_nvme_ns_get_extended_sector_size(ns);
101 size = spdk_nvme_ns_get_size(ns);
7c673cae
FG
102 }
103
11fdf7f2
TL
104 bool is_equal(const spdk_nvme_transport_id& trid2) const {
105 return spdk_nvme_transport_id_compare(&trid, &trid2) == 0;
7c673cae 106 }
11fdf7f2 107 ~SharedDriverData() {
7c673cae
FG
108 }
109
110 void register_device(NVMEDevice *device) {
7c673cae 111 registered_devices.push_back(device);
7c673cae
FG
112 }
113
114 void remove_device(NVMEDevice *device) {
7c673cae
FG
115 std::vector<NVMEDevice*> new_devices;
116 for (auto &&it : registered_devices) {
117 if (it != device)
118 new_devices.push_back(it);
119 }
120 registered_devices.swap(new_devices);
7c673cae
FG
121 }
122
11fdf7f2 123 uint32_t get_block_size() {
7c673cae
FG
124 return block_size;
125 }
126 uint64_t get_size() {
127 return size;
128 }
129};
130
11fdf7f2
TL
131class SharedDriverQueueData {
132 NVMEDevice *bdev;
133 SharedDriverData *driver;
134 spdk_nvme_ctrlr *ctrlr;
135 spdk_nvme_ns *ns;
136 std::string sn;
137 uint32_t block_size;
138 uint32_t max_queue_depth;
139 struct spdk_nvme_qpair *qpair;
140 bool reap_io = false;
141 int alloc_buf_from_pool(Task *t, bool write);
142
143 public:
144 uint32_t current_queue_depth = 0;
145 std::atomic_ulong completed_op_seq, queue_op_seq;
146 std::vector<void*> data_buf_mempool;
147 PerfCounters *logger = nullptr;
148 void _aio_handle(Task *t, IOContext *ioc);
149
150 SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
151 : bdev(bdev),
152 driver(driver) {
153 ctrlr = driver->ctrlr;
154 ns = driver->ns;
155 block_size = driver->block_size;
156
157 struct spdk_nvme_io_qpair_opts opts = {};
158 spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
159 opts.qprio = SPDK_NVME_QPRIO_URGENT;
160 // usable queue depth should minus 1 to aovid overflow.
161 max_queue_depth = opts.io_queue_size - 1;
162 qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
163 ceph_assert(qpair != NULL);
164
165 // allocate spdk dma memory
166 for (uint16_t i = 0; i < data_buffer_default_num; i++) {
167 void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
168 if (!b) {
169 derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
170 ceph_assert(b);
171 }
172 data_buf_mempool.push_back(b);
173 }
174
175 PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
176 l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
177 b.add_time_avg(l_bluestore_nvmedevice_write_lat, "write_lat", "Average write completing latency");
178 b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
179 b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
180 b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
181 b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
182 b.add_time_avg(l_bluestore_nvmedevice_write_queue_lat, "write_queue_lat", "Average queue write request latency");
183 b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
184 b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
185 b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
186 logger = b.create_perf_counters();
187 g_ceph_context->get_perfcounters_collection()->add(logger);
188 bdev->queue_number++;
189 if (bdev->queue_number.load() == 1)
190 reap_io = true;
191 }
192
193 ~SharedDriverQueueData() {
194 g_ceph_context->get_perfcounters_collection()->remove(logger);
195 if (qpair) {
196 spdk_nvme_ctrlr_free_io_qpair(qpair);
197 bdev->queue_number--;
198 }
199
200 // free all spdk dma memory;
201 if (!data_buf_mempool.empty()) {
202 for (uint16_t i = 0; i < data_buffer_default_num; i++) {
203 void *b = data_buf_mempool[i];
204 ceph_assert(b);
205 spdk_dma_free(b);
206 }
207 data_buf_mempool.clear();
208 }
209
210 delete logger;
211 }
212};
213
7c673cae
FG
214struct Task {
215 NVMEDevice *device;
216 IOContext *ctx = nullptr;
217 IOCommand command;
218 uint64_t offset;
219 uint64_t len;
11fdf7f2 220 bufferlist bl;
7c673cae
FG
221 std::function<void()> fill_cb;
222 Task *next = nullptr;
223 int64_t return_code;
224 ceph::coarse_real_clock::time_point start;
225 IORequest io_request;
11fdf7f2
TL
226 ceph::mutex lock = ceph::make_mutex("Task::lock");
227 ceph::condition_variable cond;
228 SharedDriverQueueData *queue = nullptr;
7c673cae
FG
229 Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
230 : device(dev), command(c), offset(off), len(l),
231 return_code(rc),
232 start(ceph::coarse_real_clock::now()) {}
233 ~Task() {
11fdf7f2 234 ceph_assert(!io_request.nseg);
7c673cae
FG
235 }
236 void release_segs(SharedDriverQueueData *queue_data) {
237 if (io_request.extra_segs) {
238 for (uint16_t i = 0; i < io_request.nseg; i++)
239 queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]);
240 delete io_request.extra_segs;
241 } else if (io_request.nseg) {
242 for (uint16_t i = 0; i < io_request.nseg; i++)
243 queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
244 }
11fdf7f2 245 ctx->total_nseg -= io_request.nseg;
7c673cae
FG
246 io_request.nseg = 0;
247 }
248
249 void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
250 uint64_t copied = 0;
251 uint64_t left = len;
252 void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
253 uint16_t i = 0;
254 while (left > 0) {
255 char *src = static_cast<char*>(segs[i++]);
256 uint64_t need_copy = std::min(left, data_buffer_size-off);
257 memcpy(buf+copied, src+off, need_copy);
258 off = 0;
259 left -= need_copy;
260 copied += need_copy;
261 }
262 }
7c673cae
FG
263};
264
265static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
266{
267 Task *t = static_cast<Task*>(cb_arg);
268 uint32_t i = sgl_offset / data_buffer_size;
269 uint32_t offset = i * data_buffer_size;
11fdf7f2 270 ceph_assert(i <= t->io_request.nseg);
7c673cae
FG
271
272 for (; i < t->io_request.nseg; i++) {
273 offset += data_buffer_size;
274 if (offset > sgl_offset) {
275 if (offset > t->len)
276 offset = t->len;
277 break;
278 }
279 }
280
281 t->io_request.cur_seg_idx = i;
282 t->io_request.cur_seg_left = offset - sgl_offset;
283 return ;
284}
285
286static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
287{
288 uint32_t size;
289 void *addr;
290 Task *t = static_cast<Task*>(cb_arg);
291 if (t->io_request.cur_seg_idx >= t->io_request.nseg) {
292 *length = 0;
293 *address = 0;
294 return 0;
295 }
296
297 addr = t->io_request.extra_segs ? t->io_request.extra_segs[t->io_request.cur_seg_idx] : t->io_request.inline_segs[t->io_request.cur_seg_idx];
298
299 size = data_buffer_size;
300 if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) {
301 uint64_t tail = t->len % data_buffer_size;
302 if (tail) {
303 size = (uint32_t) tail;
304 }
305 }
306
307 if (t->io_request.cur_seg_left) {
308 *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left);
309 *length = t->io_request.cur_seg_left;
310 t->io_request.cur_seg_left = 0;
311 } else {
312 *address = addr;
313 *length = size;
314 }
315
316 t->io_request.cur_seg_idx++;
317 return 0;
318}
319
// Carve t->len into data_buffer_size segments backed by DMA buffers from
// this queue's mempool.  Returns -ENOMEM (without taking any buffers) when
// the pool cannot satisfy the request; the caller retries after completions
// return buffers.  For writes, the task's bufferlist is copied into the
// segments up front.
int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
{
  uint64_t count = t->len / data_buffer_size;
  if (t->len % data_buffer_size)
    ++count;
  void **segs;
  if (count > data_buf_mempool.size())
    return -ENOMEM;
  if (count <= inline_segment_num) {
    segs = t->io_request.inline_segs;
  } else {
    t->io_request.extra_segs = new void*[count];
    segs = t->io_request.extra_segs;
  }
  // Take buffers from the tail of the pool.  NOTE(review): the uint16_t
  // index is safe only because count is bounded by the pool size (at most
  // data_buffer_default_num) — confirm if the pool is ever enlarged.
  for (uint16_t i = 0; i < count; i++) {
    segs[i] = data_buf_mempool.back();
    data_buf_mempool.pop_back();
  }
  t->io_request.nseg = count;
  t->ctx->total_nseg += count;
  if (write) {
    // Copy the payload into the DMA segments; the final (possibly short)
    // segment receives whatever remains of the bufferlist.
    auto blp = t->bl.begin();
    uint32_t len = 0;
    uint16_t i = 0;
    for (; i < count - 1; ++i) {
      blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
      len += data_buffer_size;
    }
    blp.copy(t->bl.length() - len, static_cast<char*>(segs[i]));
  }

  return 0;
}
353
// Submit the task chain `t` on this queue's qpair and poll until every I/O
// tracked by `ioc` has completed.  Submission and completion share one
// loop: when the qpair is full, or a DMA buffer allocation fails, control
// jumps back to `again` to reap completions and free resources.
void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
{
  dout(20) << __func__ << " start" << dendl;

  int r = 0;
  uint64_t lba_off, lba_count;
  uint32_t max_io_completion = (uint32_t)g_conf().get_val<uint64_t>("bluestore_spdk_max_io_completion");
  uint64_t io_sleep_in_us = g_conf().get_val<uint64_t>("bluestore_spdk_io_sleep");

  ceph::coarse_real_clock::time_point cur, start
    = ceph::coarse_real_clock::now();
  while (ioc->num_running) {
 again:
    dout(40) << __func__ << " polling" << dendl;
    if (current_queue_depth) {
      // Reap up to max_io_completion completions; io_complete() runs
      // synchronously inside this call and decrements current_queue_depth.
      r = spdk_nvme_qpair_process_completions(qpair, max_io_completion);
      if (r < 0) {
        ceph_abort();
      } else if (r == 0) {
        // Nothing completed yet; back off briefly before polling again.
        usleep(io_sleep_in_us);
      }
    }

    // Submit as many queued tasks as the qpair will accept.
    for (; t; t = t->next) {
      if (current_queue_depth == max_queue_depth) {
        // no slots
        goto again;
      }

      t->queue = this;
      // Convert the byte range to LBAs.
      lba_off = t->offset / block_size;
      lba_count = t->len / block_size;
      switch (t->command) {
        case IOCommand::WRITE_COMMAND:
        {
          dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
          r = alloc_buf_from_pool(t, true);
          if (r < 0) {
            // Pool exhausted: poll for completions to recover buffers.
            logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
            goto again;
          }

          r = spdk_nvme_ns_cmd_writev(
            ns, qpair, lba_off, lba_count, io_complete, t, 0,
            data_buf_reset_sgl, data_buf_next_sge);
          if (r < 0) {
            derr << __func__ << " failed to do write command" << dendl;
            t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
            t->release_segs(this);
            delete t;
            ceph_abort();
          }
          cur = ceph::coarse_real_clock::now();
          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
          logger->tinc(l_bluestore_nvmedevice_write_queue_lat, dur);
          break;
        }
        case IOCommand::READ_COMMAND:
        {
          dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
          r = alloc_buf_from_pool(t, false);
          if (r < 0) {
            logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
            goto again;
          }

          r = spdk_nvme_ns_cmd_readv(
            ns, qpair, lba_off, lba_count, io_complete, t, 0,
            data_buf_reset_sgl, data_buf_next_sge);
          if (r < 0) {
            derr << __func__ << " failed to read" << dendl;
            t->release_segs(this);
            delete t;
            ceph_abort();
          } else {
            cur = ceph::coarse_real_clock::now();
            auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
            logger->tinc(l_bluestore_nvmedevice_read_queue_lat, dur);
          }
          break;
        }
        case IOCommand::FLUSH_COMMAND:
        {
          dout(20) << __func__ << " flush command issueed " << dendl;
          r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
          if (r < 0) {
            derr << __func__ << " failed to flush" << dendl;
            t->release_segs(this);
            delete t;
            ceph_abort();
          } else {
            cur = ceph::coarse_real_clock::now();
            auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
            logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, dur);
          }
          break;
        }
      }
      current_queue_depth++;
    }
    cur = ceph::coarse_real_clock::now();
    auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
    logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
    start = ceph::coarse_real_clock::now();
  }

  // The designated queue also reaps IOContexts retired by other threads.
  if (reap_io)
    bdev->reap_ioc();
  dout(20) << __func__ << " end" << dendl;
}
464
465#define dout_subsys ceph_subsys_bdev
466#undef dout_prefix
467#define dout_prefix *_dout << "bdev "
468
// Process-wide manager that owns the SPDK/DPDK environment thread and the
// probed controllers.  Probing runs only on the dedicated dpdk_thread, so
// try_get() enqueues a ProbeContext and blocks until it is serviced there.
class NVMEManager {
 public:
  struct ProbeContext {
    spdk_nvme_transport_id trid;  // transport address to match during probe
    NVMEManager *manager;
    SharedDriverData *driver;     // filled in by attach_cb on success
    bool done;                    // set by the probe thread when finished
  };

 private:
  ceph::mutex lock = ceph::make_mutex("NVMEManager::lock");
  bool stopping = false;
  std::vector<SharedDriverData*> shared_driver_datas;
  std::thread dpdk_thread;
  ceph::mutex probe_queue_lock = ceph::make_mutex("NVMEManager::probe_queue_lock");
  ceph::condition_variable probe_queue_cond;
  std::list<ProbeContext*> probe_queue;

 public:
  NVMEManager() {}
  ~NVMEManager() {
    if (!dpdk_thread.joinable())
      return;
    {
      // Wake the probe loop so it observes `stopping` and exits.
      std::lock_guard guard(probe_queue_lock);
      stopping = true;
      probe_queue_cond.notify_all();
    }
    dpdk_thread.join();
  }

  int try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver);

  // Register a probed controller; uses only its first namespace.  Called
  // from attach_cb on the dpdk thread; `lock` must already be held.
  void register_ctrlr(const spdk_nvme_transport_id& trid, spdk_nvme_ctrlr *c, SharedDriverData **driver) {
    ceph_assert(ceph_mutex_is_locked(lock));
    spdk_nvme_ns *ns;
    int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
    ceph_assert(num_ns >= 1);
    if (num_ns > 1) {
      dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
    }
    ns = spdk_nvme_ctrlr_get_ns(c, 1);
    if (!ns) {
      derr << __func__ << " failed to get namespace at 1" << dendl;
      ceph_abort();
    }
    dout(1) << __func__ << " successfully attach nvme device at" << trid.traddr << dendl;

    // only support one device per osd now!
    ceph_assert(shared_driver_datas.empty());
    // index 0 is occupied by master thread
    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, trid, c, ns));
    *driver = shared_driver_datas.back();
  }
};
523
524static NVMEManager manager;
525
526static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
527{
528 NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
7c673cae
FG
529
530 if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
531 dout(0) << __func__ << " only probe local nvme device" << dendl;
532 return false;
533 }
534
11fdf7f2
TL
535 dout(0) << __func__ << " found device at: "
536 << "trtype=" << spdk_nvme_transport_id_trtype_str(trid->trtype) << ", "
537 << "traddr=" << trid->traddr << dendl;
538 if (spdk_nvme_transport_id_compare(&ctx->trid, trid)) {
539 dout(0) << __func__ << " device traddr (" << ctx->trid.traddr << ") not match " << trid->traddr << dendl;
7c673cae
FG
540 return false;
541 }
542
543 return true;
544}
545
546static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
547 struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
548{
11fdf7f2
TL
549 auto ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
550 ctx->manager->register_ctrlr(ctx->trid, ctrlr, &ctx->driver);
7c673cae
FG
551}
552
// Look up (or create) the SharedDriverData for the given transport id.
// The first call spins up dpdk_thread, which initializes the SPDK
// environment and then services probe requests until shutdown; every call
// that misses the cache enqueues a ProbeContext and blocks until the probe
// thread marks it done.  Returns 0 on success, negative errno on failure.
int NVMEManager::try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver)
{
  std::lock_guard l(lock);
  // Fast path: controller already probed.
  for (auto &&it : shared_driver_datas) {
    if (it->is_equal(trid)) {
      *driver = it;
      return 0;
    }
  }

  // Derive the SPDK master core from the configured hex coremask (lowest
  // set bit; ffsll is 1-based, hence the -1 below).
  auto coremask_arg = g_conf().get_val<std::string>("bluestore_spdk_coremask");
  int m_core_arg = -1;
  try {
    auto core_value = stoull(coremask_arg, nullptr, 16);
    m_core_arg = ffsll(core_value);
  } catch (const std::logic_error& e) {
    derr << __func__ << " invalid bluestore_spdk_coremask: "
         << coremask_arg << dendl;
    return -EINVAL;
  }
  // at least one core is needed for using spdk
  if (m_core_arg == 0) {
    derr << __func__ << " invalid bluestore_spdk_coremask, "
         << "at least one core is needed" << dendl;
    return -ENOENT;
  }
  m_core_arg -= 1;

  uint32_t mem_size_arg = (uint32_t)g_conf().get_val<Option::size_t>("bluestore_spdk_mem");

  if (!dpdk_thread.joinable()) {
    dpdk_thread = std::thread(
      [this, coremask_arg, m_core_arg, mem_size_arg]() {
        // One-time SPDK environment setup on the dedicated thread.
        static struct spdk_env_opts opts;
        int r;

        spdk_env_opts_init(&opts);
        opts.name = "nvme-device-manager";
        opts.core_mask = coremask_arg.c_str();
        opts.master_core = m_core_arg;
        opts.mem_size = mem_size_arg;
        spdk_env_init(&opts);
        spdk_unaffinitize_thread();

        spdk_nvme_retry_count = g_ceph_context->_conf->bdev_nvme_retry_count;
        if (spdk_nvme_retry_count < 0)
          spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;

        // Service probe requests until the destructor sets `stopping`.
        std::unique_lock l(probe_queue_lock);
        while (!stopping) {
          if (!probe_queue.empty()) {
            ProbeContext* ctxt = probe_queue.front();
            probe_queue.pop_front();
            r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
            if (r < 0) {
              ceph_assert(!ctxt->driver);
              derr << __func__ << " device probe nvme failed" << dendl;
            }
            ctxt->done = true;
            probe_queue_cond.notify_all();
          } else {
            probe_queue_cond.wait(l);
          }
        }
        // Shutting down: unblock any waiters still queued.
        for (auto p : probe_queue)
          p->done = true;
        probe_queue_cond.notify_all();
      }
    );
  }

  // Queue our request and wait for the probe thread to service it.
  ProbeContext ctx{trid, this, nullptr, false};
  {
    std::unique_lock l(probe_queue_lock);
    probe_queue.push_back(&ctx);
    while (!ctx.done)
      probe_queue_cond.wait(l);
  }
  if (!ctx.driver)
    return -1;
  *driver = ctx.driver;

  return 0;
}
637
// SPDK completion callback for all commands.  Runs inside
// spdk_nvme_qpair_process_completions() on the polling thread; returns DMA
// segments to the queue's pool and wakes (or calls back) the waiter.
void io_complete(void *t, const struct spdk_nvme_cpl *completion)
{
  Task *task = static_cast<Task*>(t);
  IOContext *ctx = task->ctx;
  SharedDriverQueueData *queue = task->queue;

  ceph_assert(queue != NULL);
  ceph_assert(ctx != NULL);
  --queue->current_queue_depth;
  auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
      ceph::coarse_real_clock::now() - task->start);
  if (task->command == IOCommand::WRITE_COMMAND) {
    queue->logger->tinc(l_bluestore_nvmedevice_write_lat, dur);
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " write/zero op successfully, left "
             << queue->queue_op_seq - queue->completed_op_seq << dendl;
    // check waiting count before doing callback (which may
    // destroy this ioc).
    if (ctx->priv) {
      if (!--ctx->num_running) {
        task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
      }
    } else {
      ctx->try_aio_wake();
    }
    task->release_segs(queue);
    delete task;
  } else if (task->command == IOCommand::READ_COMMAND) {
    queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " read op successfully" << dendl;
    // Copy the data out of the DMA segments before releasing them.
    task->fill_cb();
    task->release_segs(queue);
    // read submitted by AIO
    if (!task->return_code) {
      if (ctx->priv) {
        if (!--ctx->num_running) {
          task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
        }
      } else {
        ctx->try_aio_wake();
      }
      delete task;
    } else {
      // Synchronous read (return_code preset non-zero): signal success and
      // let the submitting caller delete the task after waking.
      task->return_code = 0;
      ctx->try_aio_wake();
    }
  } else {
    ceph_assert(task->command == IOCommand::FLUSH_COMMAND);
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
    dout(20) << __func__ << " flush op successfully" << dendl;
    task->return_code = 0;
  }
}
693
694// ----------------
695#undef dout_prefix
696#define dout_prefix *_dout << "bdev(" << name << ") "
697
// Construct an unopened device; the shared driver is bound later in open().
NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
  : BlockDevice(cct, cb, cbpriv),
    driver(nullptr)
{
}
703
7c673cae
FG
// Open the device.  `p` names a small text file whose first line is an
// SPDK transport-id string; it is parsed and handed to the NVMEManager,
// which probes and attaches the matching controller.
int NVMEDevice::open(const string& p)
{
  dout(1) << __func__ << " path " << p << dendl;

  std::ifstream ifs(p);
  if (!ifs) {
    derr << __func__ << " unable to open " << p << dendl;
    return -1;
  }
  string val;
  std::getline(ifs, val);
  spdk_nvme_transport_id trid;
  if (int r = spdk_nvme_transport_id_parse(&trid, val.c_str()); r) {
    derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r)
         << dendl;
    return r;
  }
  if (int r = manager.try_get(trid, &driver); r < 0) {
    derr << __func__ << " failed to get nvme device with transport address " << trid.traddr << dendl;
    return r;
  }

  driver->register_device(this);
  block_size = driver->get_block_size();
  size = driver->get_size();
  name = trid.traddr;

  //nvme is non-rotational device.
  rotational = false;

  // round size down to an even block
  size &= ~(block_size - 1);

  dout(1) << __func__ << " size " << size << " (" << byte_u_t(size) << ")"
          << " block_size " << block_size << " (" << byte_u_t(block_size)
          << ")" << dendl;


  return 0;
}
744
// Tear down this device's association with the shared driver.
void NVMEDevice::close()
{
  dout(1) << __func__ << dendl;

  // NOTE(review): this frees only the calling thread's thread-local queue;
  // queues created by other submitting threads are not reclaimed here —
  // confirm they are torn down elsewhere.
  delete queue_t;
  queue_t = nullptr;
  name.clear();
  driver->remove_device(this);

  dout(1) << __func__ << " end" << dendl;
}
756
11fdf7f2 757int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
7c673cae
FG
758{
759 (*pm)[prefix + "rotational"] = "0";
760 (*pm)[prefix + "size"] = stringify(get_size());
761 (*pm)[prefix + "block_size"] = stringify(get_block_size());
762 (*pm)[prefix + "driver"] = "NVMEDevice";
763 (*pm)[prefix + "type"] = "nvme";
764 (*pm)[prefix + "access_mode"] = "spdk";
765 (*pm)[prefix + "nvme_serial_number"] = name;
766
767 return 0;
768}
769
// No-op: this backend acknowledges writes only from io_complete(), and no
// flush Task is queued from here.
int NVMEDevice::flush()
{
  return 0;
}
774
// Detach the IOContext's pending task chain and run the submit/poll loop
// to completion on this thread's queue.  The thread-local
// SharedDriverQueueData is created lazily on first use.
void NVMEDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc << " pending "
           << ioc->num_pending.load() << " running "
           << ioc->num_running.load() << dendl;
  int pending = ioc->num_pending.load();
  Task *t = static_cast<Task*>(ioc->nvme_task_first);
  if (pending && t) {
    // Move all pending ops to running before handing off the chain.
    ioc->num_running += pending;
    ioc->num_pending -= pending;
    ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this
    // Only need to push the first entry
    ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
    if (!queue_t)
      queue_t = new SharedDriverQueueData(this, driver);
    queue_t->_aio_handle(t, ioc);
  }
}
793
11fdf7f2
TL
// Chop a write into 128KB WRITE_COMMAND tasks and append them to the
// IOContext's task chain.  Consumes `bl`: splice() moves the payload into
// each task's bufferlist.  Tasks are queued, not submitted — the caller
// invokes aio_submit() afterwards.
static void write_split(
    NVMEDevice *dev,
    uint64_t off,
    bufferlist &bl,
    IOContext *ioc)
{
  uint64_t remain_len = bl.length(), begin = 0, write_size;
  Task *t, *first, *last;
  // This value may need to be got from configuration later.
  uint64_t split_size = 131072; // 128KB.

  while (remain_len > 0) {
    write_size = std::min(remain_len, split_size);
    t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size);
    // TODO: if upper layer alloc memory with known physical address,
    // we can reduce this copy
    bl.splice(0, write_size, &t->bl);
    remain_len -= write_size;
    t->ctx = ioc;
    // Append to the context's singly-linked task chain.
    first = static_cast<Task*>(ioc->nvme_task_first);
    last = static_cast<Task*>(ioc->nvme_task_last);
    if (last)
      last->next = t;
    if (!first)
      ioc->nvme_task_first = t;
    ioc->nvme_task_last = t;
    ++ioc->num_pending;
    begin += write_size;
  }
}
7c673cae 824
11fdf7f2
TL
// Queue (but do not submit) write tasks covering off~bl.length(); the
// caller later invokes aio_submit() on the same IOContext.  `buffered` and
// `write_hint` are accepted for interface compatibility but unused here.
int NVMEDevice::aio_write(
    uint64_t off,
    bufferlist &bl,
    IOContext *ioc,
    bool buffered,
    int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
           << " buffered " << buffered << dendl;
  ceph_assert(is_valid_io(off, len));

  write_split(this, off, bl, ioc);
  dout(5) << __func__ << " " << off << "~" << len << dendl;

  return 0;
}
842
// Synchronous write: queue the split tasks on a private IOContext, submit,
// and block until every task completes.
int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " " << off << "~" << len << " buffered "
           << buffered << dendl;
  // Offset and length must be block aligned and inside the device.
  ceph_assert(off % block_size == 0);
  ceph_assert(len % block_size == 0);
  ceph_assert(len > 0);
  ceph_assert(off < size);
  ceph_assert(off + len <= size);

  IOContext ioc(cct, NULL);
  write_split(this, off, bl, &ioc);
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  aio_submit(&ioc);
  ioc.aio_wait();
  return 0;
}
861
// Synchronous read into a freshly allocated page-aligned buffer appended to
// *pbl.  The task's return_code is preset to 1 so io_complete() leaves the
// task alive for us (it sets return_code to 0 and wakes the waiter).
int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
                     IOContext *ioc,
                     bool buffered)
{
  dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
  ceph_assert(is_valid_io(off, len));

  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1);
  bufferptr p = buffer::create_small_page_aligned(len);
  int r = 0;
  t->ctx = ioc;
  char *buf = p.c_str();
  // Completion copies the DMA segments into our buffer.
  t->fill_cb = [buf, t]() {
    t->copy_to_buf(buf, 0, t->len);
  };

  ++ioc->num_pending;
  ioc->nvme_task_first = t;
  aio_submit(ioc);
  ioc->aio_wait();

  pbl->push_back(std::move(p));
  r = t->return_code;
  delete t;
  return r;
}
888
// Queue an asynchronous read task on the IOContext's chain; the data lands
// in the buffer already appended to *pbl when the completion fires after a
// later aio_submit().
int NVMEDevice::aio_read(
    uint64_t off,
    uint64_t len,
    bufferlist *pbl,
    IOContext *ioc)
{
  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
  ceph_assert(is_valid_io(off, len));

  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);

  bufferptr p = buffer::create_small_page_aligned(len);
  pbl->append(p);
  t->ctx = ioc;
  char* buf = p.c_str();
  // Completion copies the DMA segments into the caller's buffer.
  t->fill_cb = [buf, t]() {
    t->copy_to_buf(buf, 0, t->len);
  };

  // Append to the context's singly-linked task chain.
  Task *first = static_cast<Task*>(ioc->nvme_task_first);
  Task *last = static_cast<Task*>(ioc->nvme_task_last);
  if (last)
    last->next = t;
  if (!first)
    ioc->nvme_task_first = t;
  ioc->nvme_task_last = t;
  ++ioc->num_pending;

  return 0;
}
919
// Synchronous unaligned read: round the range out to block boundaries, read
// the aligned span, then copy just the requested off~len slice into `buf`.
// return_code is preset to 1 so io_complete() leaves the task for us.
int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
{
  ceph_assert(len > 0);
  ceph_assert(off < size);
  ceph_assert(off + len <= size);

  uint64_t aligned_off = align_down(off, block_size);
  uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
  dout(5) << __func__ << " " << off << "~" << len
          << " aligned " << aligned_off << "~" << aligned_len << dendl;
  IOContext ioc(g_ceph_context, nullptr);
  Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
  int r = 0;
  t->ctx = &ioc;
  // Copy only the requested sub-range out of the aligned read.
  t->fill_cb = [buf, t, off, len]() {
    t->copy_to_buf(buf, off-t->offset, len);
  };

  ++ioc.num_pending;
  ioc.nvme_task_first = t;
  aio_submit(&ioc);
  ioc.aio_wait();

  r = t->return_code;
  delete t;
  return r;
}
947
// No device-side cache to invalidate for SPDK; log and report success.
int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
{
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  return 0;
}