// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
//
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2015 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <unistd.h>
#include <stdlib.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#include <chrono>
#include <fstream>
#include <functional>
#include <map>
#include <thread>

#include <spdk/nvme.h>

#include "include/intarith.h"
#include "include/stringify.h"
#include "include/types.h"
#include "include/compat.h"
#include "common/errno.h"
#include "common/debug.h"
#include "common/perf_counters.h"

#include "NVMEDevice.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << sn << ") "

thread_local SharedDriverQueueData *queue_t;

static constexpr uint16_t data_buffer_default_num = 1024;

static constexpr uint32_t data_buffer_size = 8192;

static constexpr uint16_t inline_segment_num = 32;
56
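// Each queue keeps a private pool of data_buffer_default_num (1024) DMA-safe
// buffers of data_buffer_size (8192) bytes, i.e. 8 MiB per queue. I/O payloads
// are scattered across these fixed buffers; the first inline_segment_num (32)
// segments live inline in the IORequest, and larger I/Os spill into a
// heap-allocated extra_segs array. For example, a 1 MiB write needs
// 1048576 / 8192 = 128 segments, so it uses extra_segs.
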
enum {
  l_bluestore_nvmedevice_first = 632430,
  l_bluestore_nvmedevice_write_lat,
  l_bluestore_nvmedevice_read_lat,
  l_bluestore_nvmedevice_flush_lat,
  l_bluestore_nvmedevice_write_queue_lat,
  l_bluestore_nvmedevice_read_queue_lat,
  l_bluestore_nvmedevice_flush_queue_lat,
  l_bluestore_nvmedevice_queue_ops,
  l_bluestore_nvmedevice_polling_lat,
  l_bluestore_nvmedevice_buffer_alloc_failed,
  l_bluestore_nvmedevice_last
};

static void io_complete(void *t, const struct spdk_nvme_cpl *completion);

struct IORequest {
  uint16_t cur_seg_idx = 0;
  uint16_t nseg;
  uint32_t cur_seg_left = 0;
  void *inline_segs[inline_segment_num];
  void **extra_segs = nullptr;
};

struct Task;

class SharedDriverData {
  unsigned id;
  spdk_nvme_transport_id trid;
  spdk_nvme_ctrlr *ctrlr;
  spdk_nvme_ns *ns;
  uint32_t block_size = 0;
  uint64_t size = 0;

 public:
  std::vector<NVMEDevice*> registered_devices;
  friend class SharedDriverQueueData;
  SharedDriverData(unsigned id_, const spdk_nvme_transport_id& trid_,
                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns_)
    : id(id_),
      trid(trid_),
      ctrlr(c),
      ns(ns_) {
    block_size = spdk_nvme_ns_get_extended_sector_size(ns);
    size = spdk_nvme_ns_get_size(ns);
  }

  bool is_equal(const spdk_nvme_transport_id& trid2) const {
    return spdk_nvme_transport_id_compare(&trid, &trid2) == 0;
  }
  ~SharedDriverData() {
  }

  void register_device(NVMEDevice *device) {
    registered_devices.push_back(device);
  }

  void remove_device(NVMEDevice *device) {
    std::vector<NVMEDevice*> new_devices;
    for (auto &&it : registered_devices) {
      if (it != device)
        new_devices.push_back(it);
    }
    registered_devices.swap(new_devices);
  }

  uint32_t get_block_size() {
    return block_size;
  }
  uint64_t get_size() {
    return size;
  }
};

class SharedDriverQueueData {
  NVMEDevice *bdev;
  SharedDriverData *driver;
  spdk_nvme_ctrlr *ctrlr;
  spdk_nvme_ns *ns;
  std::string sn;
  uint32_t block_size;
  uint32_t max_queue_depth;
  struct spdk_nvme_qpair *qpair;
  bool reap_io = false;
  int alloc_buf_from_pool(Task *t, bool write);

 public:
  uint32_t current_queue_depth = 0;
  std::atomic_ulong completed_op_seq, queue_op_seq;
  std::vector<void*> data_buf_mempool;
  PerfCounters *logger = nullptr;
  void _aio_handle(Task *t, IOContext *ioc);

  SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
    : bdev(bdev),
      driver(driver) {
    ctrlr = driver->ctrlr;
    ns = driver->ns;
    block_size = driver->block_size;

    struct spdk_nvme_io_qpair_opts opts = {};
    spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
    opts.qprio = SPDK_NVME_QPRIO_URGENT;
    // usable queue depth is one less than io_queue_size, to avoid overflow
    max_queue_depth = opts.io_queue_size - 1;
    qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
    ceph_assert(qpair != NULL);

    // allocate spdk dma memory
    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
      void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
      if (!b) {
        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
        ceph_assert(b);
      }
      data_buf_mempool.push_back(b);
    }

    PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
                          l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
    b.add_time_avg(l_bluestore_nvmedevice_write_lat, "write_lat", "Average write completing latency");
    b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
    b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
    b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
    b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
    b.add_time_avg(l_bluestore_nvmedevice_write_queue_lat, "write_queue_lat", "Average queue write request latency");
    b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
    b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
    b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
    logger = b.create_perf_counters();
    g_ceph_context->get_perfcounters_collection()->add(logger);
    bdev->queue_number++;
    if (bdev->queue_number.load() == 1)
      reap_io = true;
  }

  ~SharedDriverQueueData() {
    g_ceph_context->get_perfcounters_collection()->remove(logger);
    if (qpair) {
      spdk_nvme_ctrlr_free_io_qpair(qpair);
      bdev->queue_number--;
    }

    // free all spdk dma memory still held by the pool
    for (void *b : data_buf_mempool) {
      ceph_assert(b);
      spdk_dma_free(b);
    }
    data_buf_mempool.clear();

    delete logger;
  }
};
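
// Queue data is created lazily, one instance per submitting thread, via the
// thread_local queue_t pointer in NVMEDevice::aio_submit(). A sketch of the
// pattern (mirroring aio_submit() below):
//
//   if (!queue_t)
//     queue_t = new SharedDriverQueueData(this, driver);
//   queue_t->_aio_handle(task, ioc);
//
// The first queue created (queue_number == 1) also takes on reaping finished
// IOContexts for the device (reap_io).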

struct Task {
  NVMEDevice *device;
  IOContext *ctx = nullptr;
  IOCommand command;
  uint64_t offset;
  uint64_t len;
  bufferlist bl;
  std::function<void()> fill_cb;
  Task *next = nullptr;
  int64_t return_code;
  Task *primary = nullptr;
  ceph::coarse_real_clock::time_point start;
  IORequest io_request = {};
  ceph::mutex lock = ceph::make_mutex("Task::lock");
  ceph::condition_variable cond;
  SharedDriverQueueData *queue = nullptr;
  // reference count by subtasks.
  int ref = 0;
  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0,
       Task *p = nullptr)
    : device(dev), command(c), offset(off), len(l),
      return_code(rc), primary(p),
      start(ceph::coarse_real_clock::now()) {
    if (primary) {
      primary->ref++;
      return_code = primary->return_code;
    }
  }
  ~Task() {
    if (primary)
      primary->ref--;
    ceph_assert(!io_request.nseg);
  }
  void release_segs(SharedDriverQueueData *queue_data) {
    if (io_request.extra_segs) {
      for (uint16_t i = 0; i < io_request.nseg; i++)
        queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]);
      delete[] io_request.extra_segs;
    } else if (io_request.nseg) {
      for (uint16_t i = 0; i < io_request.nseg; i++)
        queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
    }
    ctx->total_nseg -= io_request.nseg;
    io_request.nseg = 0;
  }

  void copy_to_buf(char *buf, uint64_t off, uint64_t len) {
    uint64_t copied = 0;
    uint64_t left = len;
    void **segs = io_request.extra_segs ? io_request.extra_segs : io_request.inline_segs;
    uint16_t i = 0;
    while (left > 0) {
      char *src = static_cast<char*>(segs[i++]);
      uint64_t need_copy = std::min(left, data_buffer_size - off);
      memcpy(buf + copied, src + off, need_copy);
      off = 0;
      left -= need_copy;
      copied += need_copy;
    }
  }
};
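
// copy_to_buf gathers a contiguous user range out of the fixed 8 KiB segments.
// Worked example: with off = 6144 and len = 4096, the first iteration copies
// min(4096, 8192 - 6144) = 2048 bytes from the tail of segment 0, then off is
// reset to 0 and the remaining 2048 bytes come from the head of segment 1.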

static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
{
  Task *t = static_cast<Task*>(cb_arg);
  uint32_t i = sgl_offset / data_buffer_size;
  uint32_t offset = i * data_buffer_size;
  ceph_assert(i <= t->io_request.nseg);

  for (; i < t->io_request.nseg; i++) {
    offset += data_buffer_size;
    if (offset > sgl_offset) {
      if (offset > t->len)
        offset = t->len;
      break;
    }
  }

  t->io_request.cur_seg_idx = i;
  t->io_request.cur_seg_left = offset - sgl_offset;
}

static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
{
  uint32_t size;
  void *addr;
  Task *t = static_cast<Task*>(cb_arg);
  if (t->io_request.cur_seg_idx >= t->io_request.nseg) {
    *length = 0;
    *address = 0;
    return 0;
  }

  addr = t->io_request.extra_segs ? t->io_request.extra_segs[t->io_request.cur_seg_idx]
                                  : t->io_request.inline_segs[t->io_request.cur_seg_idx];

  size = data_buffer_size;
  if (t->io_request.cur_seg_idx == t->io_request.nseg - 1) {
    uint64_t tail = t->len % data_buffer_size;
    if (tail) {
      size = (uint32_t) tail;
    }
  }

  if (t->io_request.cur_seg_left) {
    *address = (void *)((uint64_t)addr + size - t->io_request.cur_seg_left);
    *length = t->io_request.cur_seg_left;
    t->io_request.cur_seg_left = 0;
  } else {
    *address = addr;
    *length = size;
  }

  t->io_request.cur_seg_idx++;
  return 0;
}
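
// data_buf_reset_sgl and data_buf_next_sge implement SPDK's scatter-gather
// callback contract for spdk_nvme_ns_cmd_readv/writev: before (re)walking a
// request's payload, the driver calls the reset callback with a byte offset
// into the transfer, then calls the next-SGE callback repeatedly to pull one
// (address, length) segment at a time until the transfer is described. Here
// each segment is one pooled 8 KiB buffer, with the last segment trimmed to
// the tail of t->len.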

int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
{
  uint64_t count = t->len / data_buffer_size;
  if (t->len % data_buffer_size)
    ++count;
  void **segs;
  if (count > data_buf_mempool.size())
    return -ENOMEM;
  if (count <= inline_segment_num) {
    segs = t->io_request.inline_segs;
  } else {
    t->io_request.extra_segs = new void*[count];
    segs = t->io_request.extra_segs;
  }
  for (uint16_t i = 0; i < count; i++) {
    segs[i] = data_buf_mempool.back();
    data_buf_mempool.pop_back();
  }
  t->io_request.nseg = count;
  t->ctx->total_nseg += count;
  if (write) {
    auto blp = t->bl.begin();
    uint32_t len = 0;
    uint16_t i = 0;
    for (; i < count - 1; ++i) {
      blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
      len += data_buffer_size;
    }
    blp.copy(t->bl.length() - len, static_cast<char*>(segs[i]));
  }

  return 0;
}
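
// Example: a 20000-byte write yields count = ceil(20000 / 8192) = 3 segments;
// the first two are filled with 8192 bytes each and the last with the
// remaining 3616 bytes. If fewer than 3 buffers remain in the pool, -ENOMEM is
// returned and the caller retries after reaping completions.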

void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
{
  dout(20) << __func__ << " start" << dendl;

  int r = 0;
  uint64_t lba_off, lba_count;
  uint32_t max_io_completion = (uint32_t)g_conf().get_val<uint64_t>("bluestore_spdk_max_io_completion");
  uint64_t io_sleep_in_us = g_conf().get_val<uint64_t>("bluestore_spdk_io_sleep");

  ceph::coarse_real_clock::time_point cur, start
    = ceph::coarse_real_clock::now();
  while (ioc->num_running) {
 again:
    dout(40) << __func__ << " polling" << dendl;
    if (current_queue_depth) {
      r = spdk_nvme_qpair_process_completions(qpair, max_io_completion);
      if (r < 0) {
        ceph_abort();
      } else if (r == 0) {
        usleep(io_sleep_in_us);
      }
    }

    for (; t; t = t->next) {
      if (current_queue_depth == max_queue_depth) {
        // no slots
        goto again;
      }

      t->queue = this;
      lba_off = t->offset / block_size;
      lba_count = t->len / block_size;
      switch (t->command) {
      case IOCommand::WRITE_COMMAND:
      {
        dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
        r = alloc_buf_from_pool(t, true);
        if (r < 0) {
          logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
          goto again;
        }

        r = spdk_nvme_ns_cmd_writev(
          ns, qpair, lba_off, lba_count, io_complete, t, 0,
          data_buf_reset_sgl, data_buf_next_sge);
        if (r < 0) {
          derr << __func__ << " failed to do write command: " << cpp_strerror(r) << dendl;
          t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
          t->release_segs(this);
          delete t;
          ceph_abort();
        }
        cur = ceph::coarse_real_clock::now();
        auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
        logger->tinc(l_bluestore_nvmedevice_write_queue_lat, dur);
        break;
      }
      case IOCommand::READ_COMMAND:
      {
        dout(20) << __func__ << " read command issued " << lba_off << "~" << lba_count << dendl;
        r = alloc_buf_from_pool(t, false);
        if (r < 0) {
          logger->inc(l_bluestore_nvmedevice_buffer_alloc_failed);
          goto again;
        }

        r = spdk_nvme_ns_cmd_readv(
          ns, qpair, lba_off, lba_count, io_complete, t, 0,
          data_buf_reset_sgl, data_buf_next_sge);
        if (r < 0) {
          derr << __func__ << " failed to read: " << cpp_strerror(r) << dendl;
          t->release_segs(this);
          delete t;
          ceph_abort();
        } else {
          cur = ceph::coarse_real_clock::now();
          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
          logger->tinc(l_bluestore_nvmedevice_read_queue_lat, dur);
        }
        break;
      }
      case IOCommand::FLUSH_COMMAND:
      {
        dout(20) << __func__ << " flush command issued" << dendl;
        r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
        if (r < 0) {
          derr << __func__ << " failed to flush: " << cpp_strerror(r) << dendl;
          t->release_segs(this);
          delete t;
          ceph_abort();
        } else {
          cur = ceph::coarse_real_clock::now();
          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
          logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, dur);
        }
        break;
      }
      }
      current_queue_depth++;
    }
    cur = ceph::coarse_real_clock::now();
    auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
    logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
    start = ceph::coarse_real_clock::now();
  }

  if (reap_io)
    bdev->reap_ioc();
  dout(20) << __func__ << " end" << dendl;
}
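
// _aio_handle is a polling loop, not an interrupt-driven one: the submitting
// thread itself both issues the queued tasks and spins on
// spdk_nvme_qpair_process_completions() until ioc->num_running drops to zero,
// sleeping bluestore_spdk_io_sleep microseconds between empty polls.
// Submission back-pressure comes from max_queue_depth and from -ENOMEM on the
// buffer pool, both of which jump back to the polling label to drain
// completions first.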

#define dout_subsys ceph_subsys_bdev
#undef dout_prefix
#define dout_prefix *_dout << "bdev "

class NVMEManager {
 public:
  struct ProbeContext {
    spdk_nvme_transport_id trid;
    NVMEManager *manager;
    SharedDriverData *driver;
    bool done;
  };

 private:
  ceph::mutex lock = ceph::make_mutex("NVMEManager::lock");
  bool stopping = false;
  std::vector<SharedDriverData*> shared_driver_datas;
  std::thread dpdk_thread;
  ceph::mutex probe_queue_lock = ceph::make_mutex("NVMEManager::probe_queue_lock");
  ceph::condition_variable probe_queue_cond;
  std::list<ProbeContext*> probe_queue;

 public:
  NVMEManager() {}
  ~NVMEManager() {
    if (!dpdk_thread.joinable())
      return;
    {
      std::lock_guard guard(probe_queue_lock);
      stopping = true;
      probe_queue_cond.notify_all();
    }
    dpdk_thread.join();
  }

  int try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver);
  void register_ctrlr(const spdk_nvme_transport_id& trid, spdk_nvme_ctrlr *c, SharedDriverData **driver) {
    ceph_assert(ceph_mutex_is_locked(lock));
    spdk_nvme_ns *ns;
    int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
    ceph_assert(num_ns >= 1);
    if (num_ns > 1) {
      dout(0) << __func__ << " namespace count larger than 1, currently only the first namespace is used" << dendl;
    }
    ns = spdk_nvme_ctrlr_get_ns(c, 1);
    if (!ns) {
      derr << __func__ << " failed to get namespace at 1" << dendl;
      ceph_abort();
    }
    dout(1) << __func__ << " successfully attached nvme device at " << trid.traddr << dendl;

    // only support one device per osd now!
    ceph_assert(shared_driver_datas.empty());
    // index 0 is occupied by the master thread
    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, trid, c, ns));
    *driver = shared_driver_datas.back();
  }
};

static NVMEManager manager;

static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
{
  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);

  if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
    dout(0) << __func__ << " only local (PCIe) nvme devices are probed" << dendl;
    return false;
  }

  dout(0) << __func__ << " found device at: "
          << "trtype=" << spdk_nvme_transport_id_trtype_str(trid->trtype) << ", "
          << "traddr=" << trid->traddr << dendl;
  if (spdk_nvme_transport_id_compare(&ctx->trid, trid)) {
    dout(0) << __func__ << " device traddr (" << ctx->trid.traddr << ") does not match " << trid->traddr << dendl;
    return false;
  }

  return true;
}

static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
                      struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
  auto ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
  ctx->manager->register_ctrlr(ctx->trid, ctrlr, &ctx->driver);
}

static int hex2dec(unsigned char c)
{
  if (isdigit(c))
    return c - '0';
  else if (isupper(c))
    return c - 'A' + 10;
  else
    return c - 'a' + 10;
}

static int find_first_bitset(const string& s)
{
  auto e = s.rend();
  if (s.compare(0, 2, "0x") == 0 ||
      s.compare(0, 2, "0X") == 0) {
    advance(e, -2);
  }
  auto p = s.rbegin();
  for (int pos = 0; p != e; ++p, pos += 4) {
    if (!isxdigit(*p)) {
      return -EINVAL;
    }
    if (int val = hex2dec(*p); val != 0) {
      return pos + ffs(val);
    }
  }
  return 0;
}
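
// find_first_bitset returns the 1-based position of the least significant set
// bit in a hex core-mask string, or 0 if no bit is set. Worked example:
// find_first_bitset("0x4") scans the last hex digit '4' (binary 100), and
// pos + ffs(4) = 0 + 3 returns 3; try_get() below subtracts one to select
// DPDK master core 2.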

int NVMEManager::try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver)
{
  std::lock_guard l(lock);
  for (auto &&it : shared_driver_datas) {
    if (it->is_equal(trid)) {
      *driver = it;
      return 0;
    }
  }

  auto coremask_arg = g_conf().get_val<std::string>("bluestore_spdk_coremask");
  int m_core_arg = find_first_bitset(coremask_arg);
  // at least one core is needed for using spdk
  if (m_core_arg <= 0) {
    derr << __func__ << " invalid bluestore_spdk_coremask, "
         << "at least one core is needed" << dendl;
    return -ENOENT;
  }
  m_core_arg -= 1;

  uint32_t mem_size_arg = (uint32_t)g_conf().get_val<Option::size_t>("bluestore_spdk_mem");

  if (!dpdk_thread.joinable()) {
    dpdk_thread = std::thread(
      [this, coremask_arg, m_core_arg, mem_size_arg]() {
        static struct spdk_env_opts opts;
        int r;

        spdk_env_opts_init(&opts);
        opts.name = "nvme-device-manager";
        opts.core_mask = coremask_arg.c_str();
        opts.master_core = m_core_arg;
        opts.mem_size = mem_size_arg;
        spdk_env_init(&opts);
        spdk_unaffinitize_thread();

        spdk_nvme_retry_count = g_ceph_context->_conf->bdev_nvme_retry_count;
        if (spdk_nvme_retry_count < 0)
          spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;

        std::unique_lock l(probe_queue_lock);
        while (!stopping) {
          if (!probe_queue.empty()) {
            ProbeContext* ctxt = probe_queue.front();
            probe_queue.pop_front();
            r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
            if (r < 0) {
              ceph_assert(!ctxt->driver);
              derr << __func__ << " nvme device probe failed" << dendl;
            }
            ctxt->done = true;
            probe_queue_cond.notify_all();
          } else {
            probe_queue_cond.wait(l);
          }
        }
        for (auto p : probe_queue)
          p->done = true;
        probe_queue_cond.notify_all();
      }
    );
  }

  ProbeContext ctx{trid, this, nullptr, false};
  {
    std::unique_lock l(probe_queue_lock);
    probe_queue.push_back(&ctx);
    while (!ctx.done)
      probe_queue_cond.wait(l);
  }
  if (!ctx.driver)
    return -1;
  *driver = ctx.driver;

  return 0;
}

void io_complete(void *t, const struct spdk_nvme_cpl *completion)
{
  Task *task = static_cast<Task*>(t);
  IOContext *ctx = task->ctx;
  SharedDriverQueueData *queue = task->queue;

  ceph_assert(queue != NULL);
  ceph_assert(ctx != NULL);
  --queue->current_queue_depth;
  auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
    ceph::coarse_real_clock::now() - task->start);
  if (task->command == IOCommand::WRITE_COMMAND) {
    queue->logger->tinc(l_bluestore_nvmedevice_write_lat, dur);
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " write/zero op completed, left "
             << queue->queue_op_seq - queue->completed_op_seq << dendl;
    // check waiting count before doing callback (which may
    // destroy this ioc).
    if (ctx->priv) {
      if (!--ctx->num_running) {
        task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
      }
    } else {
      ctx->try_aio_wake();
    }
    task->release_segs(queue);
    delete task;
  } else if (task->command == IOCommand::READ_COMMAND) {
    queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    dout(20) << __func__ << " read op completed" << dendl;
    task->fill_cb();
    task->release_segs(queue);
    // read submitted by AIO
    if (!task->return_code) {
      if (ctx->priv) {
        if (!--ctx->num_running) {
          task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
        }
      } else {
        ctx->try_aio_wake();
      }
      delete task;
    } else {
      if (Task* primary = task->primary; primary != nullptr) {
        delete task;
        if (!primary->ref)
          primary->return_code = 0;
      } else {
        task->return_code = 0;
      }
      --ctx->num_running;
    }
  } else {
    ceph_assert(task->command == IOCommand::FLUSH_COMMAND);
    ceph_assert(!spdk_nvme_cpl_is_error(completion));
    queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
    dout(20) << __func__ << " flush op completed" << dendl;
    task->return_code = 0;
  }
}
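
// Note the return_code protocol on the read path: synchronous reads (read(),
// read_random()) create their primary Task with return_code = 1 and poll in
// _aio_handle() until all subtasks finish, at which point the primary's
// return_code is flipped to 0 above and becomes the caller's return value.
// Asynchronous reads (aio_read()) use return_code = 0 and are completed
// through aio_callback / try_aio_wake instead.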

// ----------------
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << name << ") "

NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
  : BlockDevice(cct, cb, cbpriv),
    driver(nullptr)
{
}

int NVMEDevice::open(const string& p)
{
  dout(1) << __func__ << " path " << p << dendl;

  std::ifstream ifs(p);
  if (!ifs) {
    derr << __func__ << " unable to open " << p << dendl;
    return -1;
  }
  string val;
  std::getline(ifs, val);
  spdk_nvme_transport_id trid;
  if (int r = spdk_nvme_transport_id_parse(&trid, val.c_str()); r) {
    derr << __func__ << " unable to parse " << p << ": " << cpp_strerror(r)
         << dendl;
    return r;
  }
  if (int r = manager.try_get(trid, &driver); r < 0) {
    derr << __func__ << " failed to get nvme device with transport address " << trid.traddr << dendl;
    return r;
  }

  driver->register_device(this);
  block_size = driver->get_block_size();
  size = driver->get_size();
  name = trid.traddr;

  // nvme is a non-rotational device
  rotational = false;

  // round size down to an even block
  size &= ~(block_size - 1);

  dout(1) << __func__ << " size " << size << " (" << byte_u_t(size) << ")"
          << " block_size " << block_size << " (" << byte_u_t(block_size)
          << ")" << dendl;

  return 0;
}

void NVMEDevice::close()
{
  dout(1) << __func__ << dendl;

  delete queue_t;
  queue_t = nullptr;
  name.clear();
  driver->remove_device(this);

  dout(1) << __func__ << " end" << dendl;
}

int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
{
  (*pm)[prefix + "rotational"] = "0";
  (*pm)[prefix + "size"] = stringify(get_size());
  (*pm)[prefix + "block_size"] = stringify(get_block_size());
  (*pm)[prefix + "driver"] = "NVMEDevice";
  (*pm)[prefix + "type"] = "nvme";
  (*pm)[prefix + "access_mode"] = "spdk";
  (*pm)[prefix + "nvme_serial_number"] = name;

  return 0;
}

int NVMEDevice::flush()
{
  return 0;
}

void NVMEDevice::aio_submit(IOContext *ioc)
{
  dout(20) << __func__ << " ioc " << ioc << " pending "
           << ioc->num_pending.load() << " running "
           << ioc->num_running.load() << dendl;
  int pending = ioc->num_pending.load();
  Task *t = static_cast<Task*>(ioc->nvme_task_first);
  if (pending && t) {
    ioc->num_running += pending;
    ioc->num_pending -= pending;
    ceph_assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
    // Only need to push the first entry
    ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
    if (!queue_t)
      queue_t = new SharedDriverQueueData(this, driver);
    queue_t->_aio_handle(t, ioc);
  }
}

static void ioc_append_task(IOContext *ioc, Task *t)
{
  Task *first, *last;

  first = static_cast<Task*>(ioc->nvme_task_first);
  last = static_cast<Task*>(ioc->nvme_task_last);
  if (last)
    last->next = t;
  if (!first)
    ioc->nvme_task_first = t;
  ioc->nvme_task_last = t;
  ++ioc->num_pending;
}

static void write_split(
  NVMEDevice *dev,
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc)
{
  uint64_t remain_len = bl.length(), begin = 0, write_size;
  Task *t;
  // This value may need to come from configuration later.
  uint64_t split_size = 131072; // 128KB.

  while (remain_len > 0) {
    write_size = std::min(remain_len, split_size);
    t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size);
    // TODO: if the upper layer allocates memory with a known physical address,
    // we can avoid this copy
    bl.splice(0, write_size, &t->bl);
    remain_len -= write_size;
    t->ctx = ioc;
    ioc_append_task(ioc, t);
    begin += write_size;
  }
}
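
// Example: a 300 KiB write at offset 0 is split into three WRITE_COMMAND tasks
// covering 0~128K, 128K~128K and 256K~44K; each task splices its slice out of
// the source bufferlist, so bl is consumed as the loop advances.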

static void make_read_tasks(
  NVMEDevice *dev,
  uint64_t aligned_off,
  IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary,
  uint64_t orig_off, uint64_t orig_len)
{
  // This value may need to come from configuration later.
  uint64_t split_size = 131072; // 128KB.
  uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len;
  auto begin = aligned_off;
  const auto aligned_end = begin + aligned_len;

  for (; begin < aligned_end; begin += split_size) {
    auto read_size = std::min(aligned_end - begin, split_size);
    auto tmp_len = std::min(remain_orig_len, read_size - tmp_off);
    Task *t = nullptr;

    if (primary && (aligned_len <= split_size)) {
      t = primary;
    } else {
      t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary);
    }

    t->ctx = ioc;

    // TODO: if the upper layer allocates memory with a known physical address,
    // we can avoid this copy
    t->fill_cb = [buf, t, tmp_off, tmp_len] {
      t->copy_to_buf(buf, tmp_off, tmp_len);
    };

    ioc_append_task(ioc, t);
    remain_orig_len -= tmp_len;
    buf += tmp_len;
    tmp_off = 0;
  }
}
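
// Example: an aligned 256 KiB read is split into two 128 KiB READ_COMMAND
// subtasks chained to the primary task; each subtask's fill_cb copies its
// slice into the destination buffer on completion. A read no larger than
// split_size reuses the primary task itself instead of allocating a subtask.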

int NVMEDevice::aio_write(
  uint64_t off,
  bufferlist &bl,
  IOContext *ioc,
  bool buffered,
  int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
           << " buffered " << buffered << dendl;
  ceph_assert(is_valid_io(off, len));

  write_split(this, off, bl, ioc);
  dout(5) << __func__ << " " << off << "~" << len << dendl;

  return 0;
}

int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
{
  uint64_t len = bl.length();
  dout(20) << __func__ << " " << off << "~" << len << " buffered "
           << buffered << dendl;
  ceph_assert(off % block_size == 0);
  ceph_assert(len % block_size == 0);
  ceph_assert(len > 0);
  ceph_assert(off < size);
  ceph_assert(off + len <= size);

  IOContext ioc(cct, NULL);
  write_split(this, off, bl, &ioc);
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  aio_submit(&ioc);
  ioc.aio_wait();
  return 0;
}

int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
                     IOContext *ioc,
                     bool buffered)
{
  dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
  ceph_assert(is_valid_io(off, len));

  Task t(this, IOCommand::READ_COMMAND, off, len, 1);
  bufferptr p = buffer::create_small_page_aligned(len);
  char *buf = p.c_str();

  ceph_assert(ioc->nvme_task_first == nullptr);
  ceph_assert(ioc->nvme_task_last == nullptr);
  make_read_tasks(this, off, ioc, buf, len, &t, off, len);
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  aio_submit(ioc);

  pbl->push_back(std::move(p));
  return t.return_code;
}

int NVMEDevice::aio_read(
  uint64_t off,
  uint64_t len,
  bufferlist *pbl,
  IOContext *ioc)
{
  dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
  ceph_assert(is_valid_io(off, len));
  bufferptr p = buffer::create_small_page_aligned(len);
  pbl->append(p);
  char* buf = p.c_str();

  make_read_tasks(this, off, ioc, buf, len, NULL, off, len);
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  return 0;
}

int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
{
  ceph_assert(len > 0);
  ceph_assert(off < size);
  ceph_assert(off + len <= size);

  uint64_t aligned_off = p2align(off, block_size);
  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
  dout(5) << __func__ << " " << off << "~" << len
          << " aligned " << aligned_off << "~" << aligned_len << dendl;
  IOContext ioc(g_ceph_context, nullptr);
  Task t(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);

  make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, &t, off, len);
  aio_submit(&ioc);

  return t.return_code;
}
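
// Example: read_random(off=5000, len=3000) with a 4096-byte block size yields
// aligned_off = p2align(5000, 4096) = 4096 and aligned_len =
// p2roundup(8000, 4096) - 4096 = 4096, i.e. one block; the task's fill_cb then
// copies 3000 bytes starting at intra-block offset 5000 - 4096 = 904 into buf.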

int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)
{
  dout(5) << __func__ << " " << off << "~" << len << dendl;
  return 0;
}