#include <unistd.h>
#include <stdlib.h>
+#include <strings.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <chrono>
+#include <fstream>
#include <functional>
#include <map>
#include <thread>
-#include <xmmintrin.h>
#include <spdk/nvme.h>
-#include <rte_lcore.h>
-
+#include "include/intarith.h"
#include "include/stringify.h"
#include "include/types.h"
#include "include/compat.h"
-#include "common/align.h"
#include "common/errno.h"
#include "common/debug.h"
#include "common/perf_counters.h"
-#include "common/io_priority.h"
#include "NVMEDevice.h"
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << sn << ") "
-static constexpr uint16_t data_buffer_default_num = 2048;
+thread_local SharedDriverQueueData *queue_t;
+
+static constexpr uint16_t data_buffer_default_num = 1024;
static constexpr uint32_t data_buffer_size = 8192;
static constexpr uint16_t inline_segment_num = 32;
-static thread_local int queue_id = -1;
-
enum {
l_bluestore_nvmedevice_first = 632430,
- l_bluestore_nvmedevice_aio_write_lat,
+ l_bluestore_nvmedevice_write_lat,
l_bluestore_nvmedevice_read_lat,
l_bluestore_nvmedevice_flush_lat,
- l_bluestore_nvmedevice_aio_write_queue_lat,
+ l_bluestore_nvmedevice_write_queue_lat,
l_bluestore_nvmedevice_read_queue_lat,
l_bluestore_nvmedevice_flush_queue_lat,
l_bluestore_nvmedevice_queue_ops,
static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
-int dpdk_thread_adaptor(void *f)
-{
- (*static_cast<std::function<void ()>*>(f))();
- return 0;
-}
-
-struct IOSegment {
- uint32_t len;
- void *addr;
-};
-
struct IORequest {
uint16_t cur_seg_idx = 0;
uint16_t nseg;
void **extra_segs = nullptr;
};
-class SharedDriverQueueData {
- SharedDriverData *driver;
- spdk_nvme_ctrlr *ctrlr;
- spdk_nvme_ns *ns;
- std::string sn;
- uint64_t block_size;
- uint32_t sector_size;
- uint32_t core_id;
- uint32_t queueid;
- struct spdk_nvme_qpair *qpair;
- std::function<void ()> run_func;
- friend class AioCompletionThread;
-
- bool aio_stop = false;
- void _aio_thread();
- int alloc_buf_from_pool(Task *t, bool write);
-
- std::atomic_bool queue_empty;
- Mutex queue_lock;
- Cond queue_cond;
- std::queue<Task*> task_queue;
-
- Mutex flush_lock;
- Cond flush_cond;
- std::atomic_int flush_waiters;
- std::set<uint64_t> flush_waiter_seqs;
-
- public:
- std::atomic_ulong completed_op_seq, queue_op_seq;
- std::vector<void*> data_buf_mempool;
- PerfCounters *logger = nullptr;
-
- SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
- const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
- : driver(driver),
- ctrlr(c),
- ns(ns),
- sn(sn_tag),
- block_size(block_size),
- sector_size(sector_size),
- core_id(core),
- queueid(queue_id),
- run_func([this]() { _aio_thread(); }),
- queue_empty(false),
- queue_lock("NVMEDevice::queue_lock"),
- flush_lock("NVMEDevice::flush_lock"),
- flush_waiters(0),
- completed_op_seq(0), queue_op_seq(0) {
-
- qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
- PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
- l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
- b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
- b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
- b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
- b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
- b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
- b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
- b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
- b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
- b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
- logger = b.create_perf_counters();
- g_ceph_context->get_perfcounters_collection()->add(logger);
- }
-
- void queue_task(Task *t, uint64_t ops = 1) {
- queue_op_seq += ops;
- Mutex::Locker l(queue_lock);
- task_queue.push(t);
- if (queue_empty.load()) {
- queue_empty = false;
- queue_cond.Signal();
- }
- }
-
- void flush_wait() {
- uint64_t cur_seq = queue_op_seq.load();
- uint64_t left = cur_seq - completed_op_seq.load();
- if (cur_seq > completed_op_seq) {
- // TODO: this may contains read op
- dout(10) << __func__ << " existed inflight ops " << left << dendl;
- Mutex::Locker l(flush_lock);
- ++flush_waiters;
- flush_waiter_seqs.insert(cur_seq);
- while (cur_seq > completed_op_seq.load()) {
- flush_cond.Wait(flush_lock);
- }
- flush_waiter_seqs.erase(cur_seq);
- --flush_waiters;
- }
- }
-
- void start() {
- int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
- core_id);
- assert(r == 0);
-
- }
-
- void stop() {
- {
- Mutex::Locker l(queue_lock);
- aio_stop = true;
- queue_cond.Signal();
- }
- int r = rte_eal_wait_lcore(core_id);
- assert(r == 0);
- aio_stop = false;
- }
-
- ~SharedDriverQueueData() {
- g_ceph_context->get_perfcounters_collection()->remove(logger);
- if (!qpair) {
- spdk_nvme_ctrlr_free_io_qpair(qpair);
- }
- delete logger;
- }
-};
+struct Task;
class SharedDriverData {
unsigned id;
- uint32_t core_id;
-
- std::string sn;
+ spdk_nvme_transport_id trid;
spdk_nvme_ctrlr *ctrlr;
spdk_nvme_ns *ns;
- uint64_t block_size = 0;
- uint32_t sector_size = 0;
+ uint32_t block_size = 0;
uint64_t size = 0;
- uint32_t queue_number;
- std::vector<SharedDriverQueueData*> queues;
-
- void _aio_start() {
- for (auto &&it : queues)
- it->start();
- }
- void _aio_stop() {
- for (auto &&it : queues)
- it->stop();
- }
public:
std::vector<NVMEDevice*> registered_devices;
- SharedDriverData(unsigned _id, const std::string &sn_tag,
- spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
- : id(_id),
- sn(sn_tag),
+ friend class SharedDriverQueueData;
+ SharedDriverData(unsigned id_, const spdk_nvme_transport_id& trid_,
+ spdk_nvme_ctrlr *c, spdk_nvme_ns *ns_)
+ : id(id_),
+ trid(trid_),
ctrlr(c),
- ns(ns) {
- int i;
- sector_size = spdk_nvme_ns_get_sector_size(ns);
- block_size = std::max(CEPH_PAGE_SIZE, sector_size);
- size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
-
- RTE_LCORE_FOREACH_SLAVE(i) {
- queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
- }
-
- _aio_start();
+ ns(ns_) {
+ block_size = spdk_nvme_ns_get_extended_sector_size(ns);
+ size = spdk_nvme_ns_get_size(ns);
}
- bool is_equal(const string &tag) const { return sn == tag; }
- ~SharedDriverData() {
- for (auto p : queues) {
- delete p;
- }
+ bool is_equal(const spdk_nvme_transport_id& trid2) const {
+ return spdk_nvme_transport_id_compare(&trid, &trid2) == 0;
}
-
- SharedDriverQueueData *get_queue(uint32_t i) {
- return queues.at(i%queue_number);
+ ~SharedDriverData() {
}
void register_device(NVMEDevice *device) {
- // in case of registered_devices, we stop thread now.
- // Because release is really a rare case, we could bear this
- _aio_stop();
registered_devices.push_back(device);
- _aio_start();
}
void remove_device(NVMEDevice *device) {
- _aio_stop();
std::vector<NVMEDevice*> new_devices;
for (auto &&it : registered_devices) {
if (it != device)
new_devices.push_back(it);
}
registered_devices.swap(new_devices);
- _aio_start();
}
- uint64_t get_block_size() {
+ uint32_t get_block_size() {
return block_size;
}
uint64_t get_size() {
}
};
+class SharedDriverQueueData {
+ NVMEDevice *bdev;
+ SharedDriverData *driver;
+ spdk_nvme_ctrlr *ctrlr;
+ spdk_nvme_ns *ns;
+ std::string sn;
+ uint32_t block_size;
+ uint32_t max_queue_depth;
+ struct spdk_nvme_qpair *qpair;
+ bool reap_io = false;
+ int alloc_buf_from_pool(Task *t, bool write);
+
+ public:
+ uint32_t current_queue_depth = 0;
+ std::atomic_ulong completed_op_seq, queue_op_seq;
+ std::vector<void*> data_buf_mempool;
+ PerfCounters *logger = nullptr;
+ void _aio_handle(Task *t, IOContext *ioc);
+
+ SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
+ : bdev(bdev),
+ driver(driver) {
+ ctrlr = driver->ctrlr;
+ ns = driver->ns;
+ block_size = driver->block_size;
+
+ struct spdk_nvme_io_qpair_opts opts = {};
+ spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
+ opts.qprio = SPDK_NVME_QPRIO_URGENT;
+  // usable queue depth should be reduced by 1 to avoid overflow.
+ max_queue_depth = opts.io_queue_size - 1;
+ qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
+ ceph_assert(qpair != NULL);
+
+ // allocate spdk dma memory
+ for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+ void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
+ if (!b) {
+ derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
+ ceph_assert(b);
+ }
+ data_buf_mempool.push_back(b);
+ }
+
+ PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
+ l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
+ b.add_time_avg(l_bluestore_nvmedevice_write_lat, "write_lat", "Average write completing latency");
+ b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
+ b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
+ b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
+ b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
+ b.add_time_avg(l_bluestore_nvmedevice_write_queue_lat, "write_queue_lat", "Average queue write request latency");
+ b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
+ b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
+ b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
+ logger = b.create_perf_counters();
+ g_ceph_context->get_perfcounters_collection()->add(logger);
+ bdev->queue_number++;
+ if (bdev->queue_number.load() == 1)
+ reap_io = true;
+ }
+
+ ~SharedDriverQueueData() {
+ g_ceph_context->get_perfcounters_collection()->remove(logger);
+ if (qpair) {
+ spdk_nvme_ctrlr_free_io_qpair(qpair);
+ bdev->queue_number--;
+ }
+
+ // free all spdk dma memory;
+ if (!data_buf_mempool.empty()) {
+ for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+ void *b = data_buf_mempool[i];
+ ceph_assert(b);
+ spdk_dma_free(b);
+ }
+ data_buf_mempool.clear();
+ }
+
+ delete logger;
+ }
+};
+
struct Task {
NVMEDevice *device;
IOContext *ctx = nullptr;
IOCommand command;
uint64_t offset;
uint64_t len;
- bufferlist write_bl;
+ bufferlist bl;
std::function<void()> fill_cb;
Task *next = nullptr;
int64_t return_code;
+ Task *primary = nullptr;
ceph::coarse_real_clock::time_point start;
- IORequest io_request;
- std::mutex lock;
- std::condition_variable cond;
- SharedDriverQueueData *queue;
- Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
+ IORequest io_request = {};
+ ceph::mutex lock = ceph::make_mutex("Task::lock");
+ ceph::condition_variable cond;
+ SharedDriverQueueData *queue = nullptr;
+  // reference count held by subtasks.
+ int ref = 0;
+ Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0,
+ Task *p = nullptr)
: device(dev), command(c), offset(off), len(l),
- return_code(rc),
- start(ceph::coarse_real_clock::now()) {}
+ return_code(rc), primary(p),
+ start(ceph::coarse_real_clock::now()) {
+ if (primary) {
+ primary->ref++;
+ return_code = primary->return_code;
+ }
+ }
~Task() {
- assert(!io_request.nseg);
+ if (primary)
+ primary->ref--;
+ ceph_assert(!io_request.nseg);
}
void release_segs(SharedDriverQueueData *queue_data) {
if (io_request.extra_segs) {
for (uint16_t i = 0; i < io_request.nseg; i++)
queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
}
+ ctx->total_nseg -= io_request.nseg;
io_request.nseg = 0;
}
copied += need_copy;
}
}
-
- void io_wait() {
- std::unique_lock<std::mutex> l(lock);
- cond.wait(l);
- }
-
- void io_wake() {
- std::lock_guard<std::mutex> l(lock);
- cond.notify_all();
- }
};
static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
Task *t = static_cast<Task*>(cb_arg);
uint32_t i = sgl_offset / data_buffer_size;
uint32_t offset = i * data_buffer_size;
- assert(i <= t->io_request.nseg);
+ ceph_assert(i <= t->io_request.nseg);
for (; i < t->io_request.nseg; i++) {
offset += data_buffer_size;
data_buf_mempool.pop_back();
}
t->io_request.nseg = count;
+ t->ctx->total_nseg += count;
if (write) {
- auto blp = t->write_bl.begin();
+ auto blp = t->bl.begin();
uint32_t len = 0;
uint16_t i = 0;
for (; i < count - 1; ++i) {
blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
len += data_buffer_size;
}
- blp.copy(t->write_bl.length() - len, static_cast<char*>(segs[i]));
+ blp.copy(t->bl.length() - len, static_cast<char*>(segs[i]));
}
return 0;
}
-void SharedDriverQueueData::_aio_thread()
+void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
{
- dout(1) << __func__ << " start" << dendl;
+ dout(20) << __func__ << " start" << dendl;
- if (data_buf_mempool.empty()) {
- for (uint16_t i = 0; i < data_buffer_default_num; i++) {
- void *b = spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
- if (!b) {
- derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
- assert(b);
- }
- data_buf_mempool.push_back(b);
- }
- }
-
- Task *t = nullptr;
int r = 0;
uint64_t lba_off, lba_count;
+ uint32_t max_io_completion = (uint32_t)g_conf().get_val<uint64_t>("bluestore_spdk_max_io_completion");
+ uint64_t io_sleep_in_us = g_conf().get_val<uint64_t>("bluestore_spdk_io_sleep");
ceph::coarse_real_clock::time_point cur, start
= ceph::coarse_real_clock::now();
- while (true) {
- bool inflight = queue_op_seq.load() - completed_op_seq.load();
+ while (ioc->num_running) {
again:
dout(40) << __func__ << " polling" << dendl;
- if (inflight) {
- if (!spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion)) {
- dout(30) << __func__ << " idle, have a pause" << dendl;
- _mm_pause();
+ if (current_queue_depth) {
+ r = spdk_nvme_qpair_process_completions(qpair, max_io_completion);
+ if (r < 0) {
+ ceph_abort();
+ } else if (r == 0) {
+ usleep(io_sleep_in_us);
}
}
for (; t; t = t->next) {
+ if (current_queue_depth == max_queue_depth) {
+ // no slots
+ goto again;
+ }
+
t->queue = this;
- lba_off = t->offset / sector_size;
- lba_count = t->len / sector_size;
+ lba_off = t->offset / block_size;
+ lba_count = t->len / block_size;
switch (t->command) {
case IOCommand::WRITE_COMMAND:
{
ns, qpair, lba_off, lba_count, io_complete, t, 0,
data_buf_reset_sgl, data_buf_next_sge);
if (r < 0) {
- derr << __func__ << " failed to do write command" << dendl;
+ derr << __func__ << " failed to do write command: " << cpp_strerror(r) << dendl;
t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
t->release_segs(this);
delete t;
ceph_abort();
}
cur = ceph::coarse_real_clock::now();
- auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
- logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, dur);
+ auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
+ logger->tinc(l_bluestore_nvmedevice_write_queue_lat, dur);
break;
}
case IOCommand::READ_COMMAND:
ns, qpair, lba_off, lba_count, io_complete, t, 0,
data_buf_reset_sgl, data_buf_next_sge);
if (r < 0) {
- derr << __func__ << " failed to read" << dendl;
+ derr << __func__ << " failed to read: " << cpp_strerror(r) << dendl;
t->release_segs(this);
delete t;
ceph_abort();
} else {
cur = ceph::coarse_real_clock::now();
- auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+ auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
logger->tinc(l_bluestore_nvmedevice_read_queue_lat, dur);
}
break;
dout(20) << __func__ << " flush command issueed " << dendl;
r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
if (r < 0) {
- derr << __func__ << " failed to flush" << dendl;
+ derr << __func__ << " failed to flush: " << cpp_strerror(r) << dendl;
t->release_segs(this);
delete t;
ceph_abort();
} else {
cur = ceph::coarse_real_clock::now();
- auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+ auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, dur);
}
break;
}
}
+ current_queue_depth++;
}
-
- if (!queue_empty.load()) {
- Mutex::Locker l(queue_lock);
- if (!task_queue.empty()) {
- t = task_queue.front();
- task_queue.pop();
- logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
- }
- if (!t)
- queue_empty = true;
- } else {
- if (flush_waiters.load()) {
- Mutex::Locker l(flush_lock);
- if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
- flush_cond.Signal();
- }
-
- if (!inflight) {
- // be careful, here we need to let each thread reap its own, currently it is done
- // by only one dedicatd dpdk thread
- if(!queueid) {
- for (auto &&it : driver->registered_devices)
- it->reap_ioc();
- }
-
- Mutex::Locker l(queue_lock);
- if (queue_empty.load()) {
- cur = ceph::coarse_real_clock::now();
- auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
- logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
- if (aio_stop)
- break;
- queue_cond.Wait(queue_lock);
- start = ceph::coarse_real_clock::now();
- }
- }
- }
+ cur = ceph::coarse_real_clock::now();
+ auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+ logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
+ start = ceph::coarse_real_clock::now();
}
- assert(data_buf_mempool.size() == data_buffer_default_num);
- dout(1) << __func__ << " end" << dendl;
+
+ if (reap_io)
+ bdev->reap_ioc();
+ dout(20) << __func__ << " end" << dendl;
}
#define dout_subsys ceph_subsys_bdev
class NVMEManager {
public:
struct ProbeContext {
- string sn_tag;
+ spdk_nvme_transport_id trid;
NVMEManager *manager;
SharedDriverData *driver;
bool done;
};
private:
- Mutex lock;
- bool init = false;
+ ceph::mutex lock = ceph::make_mutex("NVMEManager::lock");
+ bool stopping = false;
std::vector<SharedDriverData*> shared_driver_datas;
std::thread dpdk_thread;
- std::mutex probe_queue_lock;
- std::condition_variable probe_queue_cond;
+ ceph::mutex probe_queue_lock = ceph::make_mutex("NVMEManager::probe_queue_lock");
+ ceph::condition_variable probe_queue_cond;
std::list<ProbeContext*> probe_queue;
public:
- NVMEManager()
- : lock("NVMEDevice::NVMEManager::lock") {}
- int try_get(const string &sn_tag, SharedDriverData **driver);
- void register_ctrlr(const string &sn_tag, spdk_nvme_ctrlr *c, struct spdk_pci_device *pci_dev,
- SharedDriverData **driver) {
- assert(lock.is_locked());
+ NVMEManager() {}
+ ~NVMEManager() {
+ if (!dpdk_thread.joinable())
+ return;
+ {
+ std::lock_guard guard(probe_queue_lock);
+ stopping = true;
+ probe_queue_cond.notify_all();
+ }
+ dpdk_thread.join();
+ }
+
+ int try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver);
+ void register_ctrlr(const spdk_nvme_transport_id& trid, spdk_nvme_ctrlr *c, SharedDriverData **driver) {
+ ceph_assert(ceph_mutex_is_locked(lock));
spdk_nvme_ns *ns;
int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
- assert(num_ns >= 1);
+ ceph_assert(num_ns >= 1);
if (num_ns > 1) {
dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
}
derr << __func__ << " failed to get namespace at 1" << dendl;
ceph_abort();
}
- dout(1) << __func__ << " successfully attach nvme device at" << spdk_pci_device_get_bus(pci_dev)
- << ":" << spdk_pci_device_get_dev(pci_dev) << ":" << spdk_pci_device_get_func(pci_dev) << dendl;
+ dout(1) << __func__ << " successfully attach nvme device at" << trid.traddr << dendl;
// only support one device per osd now!
- assert(shared_driver_datas.empty());
- // index 0 is occured by master thread
- shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
+ ceph_assert(shared_driver_datas.empty());
+    // index 0 is occupied by the master thread
+ shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, trid, c, ns));
*driver = shared_driver_datas.back();
}
};
static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
{
NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
- char serial_number[128];
- struct spdk_pci_addr pci_addr;
- struct spdk_pci_device *pci_dev = NULL;
- int result = 0;
if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
dout(0) << __func__ << " only probe local nvme device" << dendl;
return false;
}
- result = spdk_pci_addr_parse(&pci_addr, trid->traddr);
- if (result) {
- dout(0) << __func__ << " failed to get pci address from %s, " << trid->traddr << " return value is: %d" << result << dendl;
- return false;
- }
-
- pci_dev = spdk_pci_get_device(&pci_addr);
- if (!pci_dev) {
- dout(0) << __func__ << " failed to get pci device" << dendl;
- return false;
- }
-
- dout(0) << __func__ << " found device at bus: " << spdk_pci_device_get_bus(pci_dev)
- << ":" << spdk_pci_device_get_dev(pci_dev) << ":"
- << spdk_pci_device_get_func(pci_dev) << " vendor:0x" << spdk_pci_device_get_vendor_id(pci_dev) << " device:0x" << spdk_pci_device_get_device_id(pci_dev)
- << dendl;
- result = spdk_pci_device_get_serial_number(pci_dev, serial_number, 128);
- if (result < 0) {
- dout(10) << __func__ << " failed to get serial number from %p" << pci_dev << dendl;
- return false;
- }
-
- if (ctx->sn_tag.compare(string(serial_number, 16))) {
- dout(0) << __func__ << " device serial number (" << ctx->sn_tag << ") not match " << serial_number << dendl;
+ dout(0) << __func__ << " found device at: "
+ << "trtype=" << spdk_nvme_transport_id_trtype_str(trid->trtype) << ", "
+ << "traddr=" << trid->traddr << dendl;
+ if (spdk_nvme_transport_id_compare(&ctx->trid, trid)) {
+ dout(0) << __func__ << " device traddr (" << ctx->trid.traddr << ") not match " << trid->traddr << dendl;
return false;
}
static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
{
- struct spdk_pci_addr pci_addr;
- struct spdk_pci_device *pci_dev = NULL;
-
- spdk_pci_addr_parse(&pci_addr, trid->traddr);
-
- pci_dev = spdk_pci_get_device(&pci_addr);
- if (!pci_dev) {
- dout(0) << __func__ << " failed to get pci device" << dendl;
- assert(pci_dev);
- }
-
- NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
- ctx->manager->register_ctrlr(ctx->sn_tag, ctrlr, pci_dev, &ctx->driver);
+ auto ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
+ ctx->manager->register_ctrlr(ctx->trid, ctrlr, &ctx->driver);
}
-int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
+static int hex2dec(unsigned char c)
{
- Mutex::Locker l(lock);
- int r = 0;
- unsigned long long core_value;
- uint32_t core_num = 0;
- int m_core_arg = -1;
- uint32_t mem_size_arg = g_conf->bluestore_spdk_mem;
- char *coremask_arg = (char *)g_conf->bluestore_spdk_coremask.c_str();
-
- if (sn_tag.empty()) {
- r = -ENOENT;
- derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
- return r;
- }
+ if (isdigit(c))
+ return c - '0';
+ else if (isupper(c))
+ return c - 'A' + 10;
+ else
+ return c - 'a' + 10;
+}
- core_value = strtoll(coremask_arg, NULL, 16);
- for (uint32_t i = 0; i < sizeof(long long) * 8; i++) {
- bool tmp = (core_value >> i) & 0x1;
- if (tmp) {
- core_num += 1;
- // select the least signficant bit as the master core
- if(m_core_arg < 0) {
- m_core_arg = i;
- }
+static int find_first_bitset(const string& s)
+{
+ auto e = s.rend();
+ if (s.compare(0, 2, "0x") == 0 ||
+ s.compare(0, 2, "0X") == 0) {
+ advance(e, -2);
+ }
+ auto p = s.rbegin();
+ for (int pos = 0; p != e; ++p, pos += 4) {
+ if (!isxdigit(*p)) {
+ return -EINVAL;
+ }
+ if (int val = hex2dec(*p); val != 0) {
+ return pos + ffs(val);
}
}
+ return 0;
+}
- // at least two cores are needed for using spdk
- if (core_num < 2) {
- r = -ENOENT;
- derr << __func__ << " invalid spdk coremask, at least two cores are needed: "
- << cpp_strerror(r) << dendl;
- return r;
- }
-
+int NVMEManager::try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver)
+{
+ std::lock_guard l(lock);
for (auto &&it : shared_driver_datas) {
- if (it->is_equal(sn_tag)) {
+ if (it->is_equal(trid)) {
*driver = it;
return 0;
}
}
- if (!init) {
- init = true;
+ auto coremask_arg = g_conf().get_val<std::string>("bluestore_spdk_coremask");
+ int m_core_arg = find_first_bitset(coremask_arg);
+ // at least one core is needed for using spdk
+ if (m_core_arg <= 0) {
+ derr << __func__ << " invalid bluestore_spdk_coremask, "
+ << "at least one core is needed" << dendl;
+ return -ENOENT;
+ }
+ m_core_arg -= 1;
+
+ uint32_t mem_size_arg = (uint32_t)g_conf().get_val<Option::size_t>("bluestore_spdk_mem");
+
+ if (!dpdk_thread.joinable()) {
dpdk_thread = std::thread(
[this, coremask_arg, m_core_arg, mem_size_arg]() {
static struct spdk_env_opts opts;
int r;
spdk_env_opts_init(&opts);
- opts.name = "ceph-osd";
- opts.core_mask = coremask_arg;
- opts.dpdk_master_core = m_core_arg;
- opts.dpdk_mem_size = mem_size_arg;
+ opts.name = "nvme-device-manager";
+ opts.core_mask = coremask_arg.c_str();
+ opts.master_core = m_core_arg;
+ opts.mem_size = mem_size_arg;
spdk_env_init(&opts);
+ spdk_unaffinitize_thread();
spdk_nvme_retry_count = g_ceph_context->_conf->bdev_nvme_retry_count;
if (spdk_nvme_retry_count < 0)
spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;
- std::unique_lock<std::mutex> l(probe_queue_lock);
- while (true) {
+ std::unique_lock l(probe_queue_lock);
+ while (!stopping) {
if (!probe_queue.empty()) {
ProbeContext* ctxt = probe_queue.front();
probe_queue.pop_front();
r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
if (r < 0) {
- assert(!ctxt->driver);
+ ceph_assert(!ctxt->driver);
derr << __func__ << " device probe nvme failed" << dendl;
}
ctxt->done = true;
probe_queue_cond.wait(l);
}
}
+ for (auto p : probe_queue)
+ p->done = true;
+ probe_queue_cond.notify_all();
}
);
- dpdk_thread.detach();
}
- ProbeContext ctx = {sn_tag, this, nullptr, false};
+ ProbeContext ctx{trid, this, nullptr, false};
{
- std::unique_lock<std::mutex> l(probe_queue_lock);
+ std::unique_lock l(probe_queue_lock);
probe_queue.push_back(&ctx);
while (!ctx.done)
probe_queue_cond.wait(l);
IOContext *ctx = task->ctx;
SharedDriverQueueData *queue = task->queue;
- assert(queue != NULL);
- assert(ctx != NULL);
- ++queue->completed_op_seq;
+ ceph_assert(queue != NULL);
+ ceph_assert(ctx != NULL);
+ --queue->current_queue_depth;
auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
ceph::coarse_real_clock::now() - task->start);
if (task->command == IOCommand::WRITE_COMMAND) {
- queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
- assert(!spdk_nvme_cpl_is_error(completion));
+ queue->logger->tinc(l_bluestore_nvmedevice_write_lat, dur);
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
dout(20) << __func__ << " write/zero op successfully, left "
<< queue->queue_op_seq - queue->completed_op_seq << dendl;
// check waiting count before doing callback (which may
delete task;
} else if (task->command == IOCommand::READ_COMMAND) {
queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
- assert(!spdk_nvme_cpl_is_error(completion));
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
dout(20) << __func__ << " read op successfully" << dendl;
task->fill_cb();
task->release_segs(queue);
// read submitted by AIO
- if(!task->return_code) {
+ if (!task->return_code) {
if (ctx->priv) {
if (!--ctx->num_running) {
task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
}
} else {
- ctx->try_aio_wake();
+ ctx->try_aio_wake();
}
delete task;
} else {
- task->return_code = 0;
- if (!--ctx->num_running) {
- task->io_wake();
+ if (Task* primary = task->primary; primary != nullptr) {
+ delete task;
+ if (!primary->ref)
+ primary->return_code = 0;
+ } else {
+ task->return_code = 0;
}
+ --ctx->num_running;
}
} else {
- assert(task->command == IOCommand::FLUSH_COMMAND);
- assert(!spdk_nvme_cpl_is_error(completion));
+ ceph_assert(task->command == IOCommand::FLUSH_COMMAND);
+ ceph_assert(!spdk_nvme_cpl_is_error(completion));
queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
dout(20) << __func__ << " flush op successfully" << dendl;
task->return_code = 0;
#define dout_prefix *_dout << "bdev(" << name << ") "
NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
- : BlockDevice(cct),
- driver(nullptr),
- size(0),
- block_size(0),
- aio_stop(false),
- buffer_lock("NVMEDevice::buffer_lock"),
- aio_callback(cb),
- aio_callback_priv(cbpriv)
+ : BlockDevice(cct, cb, cbpriv),
+ driver(nullptr)
{
}
-
int NVMEDevice::open(const string& p)
{
- int r = 0;
dout(1) << __func__ << " path " << p << dendl;
- string serial_number;
- int fd = ::open(p.c_str(), O_RDONLY);
- if (fd < 0) {
- r = -errno;
- derr << __func__ << " unable to open " << p << ": " << cpp_strerror(r)
- << dendl;
- return r;
+ std::ifstream ifs(p);
+ if (!ifs) {
+ derr << __func__ << " unable to open " << p << dendl;
+ return -1;
}
- char buf[100];
- r = ::read(fd, buf, sizeof(buf));
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- fd = -1; // defensive
- if (r <= 0) {
- if (r == 0) {
- r = -EINVAL;
- } else {
- r = -errno;
- }
- derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r) << dendl;
+ string val;
+ std::getline(ifs, val);
+ spdk_nvme_transport_id trid;
+ if (int r = spdk_nvme_transport_id_parse(&trid, val.c_str()); r) {
+ derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r)
+ << dendl;
return r;
}
- /* scan buf from the beginning with isxdigit. */
- int i = 0;
- while (i < r && isxdigit(buf[i])) {
- i++;
- }
- serial_number = string(buf, i);
- r = manager.try_get(serial_number, &driver);
- if (r < 0) {
- derr << __func__ << " failed to get nvme device with sn " << serial_number << dendl;
+ if (int r = manager.try_get(trid, &driver); r < 0) {
+ derr << __func__ << " failed to get nvme device with transport address " << trid.traddr << dendl;
return r;
}
driver->register_device(this);
block_size = driver->get_block_size();
size = driver->get_size();
- name = serial_number;
+ name = trid.traddr;
//nvme is non-rotational device.
rotational = false;
<< " block_size " << block_size << " (" << byte_u_t(block_size)
<< ")" << dendl;
+
return 0;
}
dout(1) << __func__ << " end" << dendl;
}
-int NVMEDevice::collect_metadata(string prefix, map<string,string> *pm) const
+int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
{
(*pm)[prefix + "rotational"] = "0";
(*pm)[prefix + "size"] = stringify(get_size());
int NVMEDevice::flush()
{
- dout(10) << __func__ << " start" << dendl;
- auto start = ceph::coarse_real_clock::now();
-
- if(queue_id == -1)
- queue_id = ceph_gettid();
- SharedDriverQueueData *queue = driver->get_queue(queue_id);
- assert(queue != NULL);
- queue->flush_wait();
- auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
- ceph::coarse_real_clock::now() - start);
- queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
return 0;
}
if (pending && t) {
ioc->num_running += pending;
ioc->num_pending -= pending;
- assert(ioc->num_pending.load() == 0); // we should be only thread doing this
+ ceph_assert(ioc->num_pending.load() == 0); // we should be only thread doing this
// Only need to push the first entry
- if(queue_id == -1)
- queue_id = ceph_gettid();
- driver->get_queue(queue_id)->queue_task(t, pending);
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
+ if (!queue_t)
+ queue_t = new SharedDriverQueueData(this, driver);
+ queue_t->_aio_handle(t, ioc);
+ }
+}
+
+static void ioc_append_task(IOContext *ioc, Task *t)
+{
+ Task *first, *last;
+
+ first = static_cast<Task*>(ioc->nvme_task_first);
+ last = static_cast<Task*>(ioc->nvme_task_last);
+ if (last)
+ last->next = t;
+ if (!first)
+ ioc->nvme_task_first = t;
+ ioc->nvme_task_last = t;
+ ++ioc->num_pending;
+}
+
+static void write_split(
+ NVMEDevice *dev,
+ uint64_t off,
+ bufferlist &bl,
+ IOContext *ioc)
+{
+ uint64_t remain_len = bl.length(), begin = 0, write_size;
+ Task *t;
+ // This value may need to be got from configuration later.
+ uint64_t split_size = 131072; // 128KB.
+
+ while (remain_len > 0) {
+ write_size = std::min(remain_len, split_size);
+ t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size);
+ // TODO: if upper layer alloc memory with known physical address,
+ // we can reduce this copy
+ bl.splice(0, write_size, &t->bl);
+ remain_len -= write_size;
+ t->ctx = ioc;
+ ioc_append_task(ioc, t);
+ begin += write_size;
+ }
+}
+
+static void make_read_tasks(
+ NVMEDevice *dev,
+ uint64_t aligned_off,
+ IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary,
+ uint64_t orig_off, uint64_t orig_len)
+{
+ // This value may need to be got from configuration later.
+ uint64_t split_size = 131072; // 128KB.
+ uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len;
+ auto begin = aligned_off;
+ const auto aligned_end = begin + aligned_len;
+
+ for (; begin < aligned_end; begin += split_size) {
+ auto read_size = std::min(aligned_end - begin, split_size);
+ auto tmp_len = std::min(remain_orig_len, read_size - tmp_off);
+ Task *t = nullptr;
+
+ if (primary && (aligned_len <= split_size)) {
+ t = primary;
+ } else {
+ t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary);
+ }
+
+ t->ctx = ioc;
+
+ // TODO: if upper layer alloc memory with known physical address,
+ // we can reduce this copy
+ t->fill_cb = [buf, t, tmp_off, tmp_len] {
+ t->copy_to_buf(buf, tmp_off, tmp_len);
+ };
+
+ ioc_append_task(ioc, t);
+ remain_orig_len -= tmp_len;
+ buf += tmp_len;
+ tmp_off = 0;
}
}
uint64_t off,
bufferlist &bl,
IOContext *ioc,
- bool buffered)
+ bool buffered,
+ int write_hint)
{
uint64_t len = bl.length();
dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
<< " buffered " << buffered << dendl;
- assert(off % block_size == 0);
- assert(len % block_size == 0);
- assert(len > 0);
- assert(off < size);
- assert(off + len <= size);
-
- Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
-
- // TODO: if upper layer alloc memory with known physical address,
- // we can reduce this copy
- t->write_bl = std::move(bl);
-
- if (buffered) {
- // Only need to push the first entry
- if(queue_id == -1)
- queue_id = ceph_gettid();
- driver->get_queue(queue_id)->queue_task(t);
- } else {
- t->ctx = ioc;
- Task *first = static_cast<Task*>(ioc->nvme_task_first);
- Task *last = static_cast<Task*>(ioc->nvme_task_last);
- if (last)
- last->next = t;
- if (!first)
- ioc->nvme_task_first = t;
- ioc->nvme_task_last = t;
- ++ioc->num_pending;
- }
+ ceph_assert(is_valid_io(off, len));
+ write_split(this, off, bl, ioc);
dout(5) << __func__ << " " << off << "~" << len << dendl;
return 0;
}
-int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered)
+int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
{
- // FIXME: there is presumably a more efficient way to do this...
+ uint64_t len = bl.length();
+ dout(20) << __func__ << " " << off << "~" << len << " buffered "
+ << buffered << dendl;
+ ceph_assert(off % block_size == 0);
+ ceph_assert(len % block_size == 0);
+ ceph_assert(len > 0);
+ ceph_assert(off < size);
+ ceph_assert(off + len <= size);
+
IOContext ioc(cct, NULL);
- aio_write(off, bl, &ioc, buffered);
+ write_split(this, off, bl, &ioc);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ aio_submit(&ioc);
ioc.aio_wait();
return 0;
}
bool buffered)
{
dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
- assert(off % block_size == 0);
- assert(len % block_size == 0);
- assert(len > 0);
- assert(off < size);
- assert(off + len <= size);
-
- Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1);
- bufferptr p = buffer::create_page_aligned(len);
- int r = 0;
- t->ctx = ioc;
+ ceph_assert(is_valid_io(off, len));
+
+ Task t(this, IOCommand::READ_COMMAND, off, len, 1);
+ bufferptr p = buffer::create_small_page_aligned(len);
char *buf = p.c_str();
- t->fill_cb = [buf, t]() {
- t->copy_to_buf(buf, 0, t->len);
- };
- ++ioc->num_running;
- if(queue_id == -1)
- queue_id = ceph_gettid();
- driver->get_queue(queue_id)->queue_task(t);
- while(t->return_code > 0) {
- t->io_wait();
- }
+ ceph_assert(ioc->nvme_task_first == nullptr);
+ ceph_assert(ioc->nvme_task_last == nullptr);
+ make_read_tasks(this, off, ioc, buf, len, &t, off, len);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
+ aio_submit(ioc);
+
pbl->push_back(std::move(p));
- r = t->return_code;
- delete t;
- return r;
+ return t.return_code;
}
int NVMEDevice::aio_read(
IOContext *ioc)
{
dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
- assert(off % block_size == 0);
- assert(len % block_size == 0);
- assert(len > 0);
- assert(off < size);
- assert(off + len <= size);
-
- Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);
-
- bufferptr p = buffer::create_page_aligned(len);
+ ceph_assert(is_valid_io(off, len));
+ bufferptr p = buffer::create_small_page_aligned(len);
pbl->append(p);
- t->ctx = ioc;
- char *buf = p.c_str();
- t->fill_cb = [buf, t]() {
- t->copy_to_buf(buf, 0, t->len);
- };
-
- Task *first = static_cast<Task*>(ioc->nvme_task_first);
- Task *last = static_cast<Task*>(ioc->nvme_task_last);
- if (last)
- last->next = t;
- if (!first)
- ioc->nvme_task_first = t;
- ioc->nvme_task_last = t;
- ++ioc->num_pending;
+ char* buf = p.c_str();
+ make_read_tasks(this, off, ioc, buf, len, NULL, off, len);
+ dout(5) << __func__ << " " << off << "~" << len << dendl;
return 0;
}
int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
{
- assert(len > 0);
- assert(off < size);
- assert(off + len <= size);
+ ceph_assert(len > 0);
+ ceph_assert(off < size);
+ ceph_assert(off + len <= size);
- uint64_t aligned_off = align_down(off, block_size);
- uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
+ uint64_t aligned_off = p2align(off, block_size);
+ uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
dout(5) << __func__ << " " << off << "~" << len
<< " aligned " << aligned_off << "~" << aligned_len << dendl;
IOContext ioc(g_ceph_context, nullptr);
- Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
- int r = 0;
- t->ctx = &ioc;
- t->fill_cb = [buf, t, off, len]() {
- t->copy_to_buf(buf, off-t->offset, len);
- };
- ++ioc.num_running;
- if(queue_id == -1)
- queue_id = ceph_gettid();
- driver->get_queue(queue_id)->queue_task(t);
+ Task t(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
- while(t->return_code > 0) {
- t->io_wait();
- }
- r = t->return_code;
- delete t;
- return r;
+ make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, &t, off, len);
+ aio_submit(&ioc);
+
+ return t.return_code;
}
int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)