git.proxmox.com Git - ceph.git/blobdiff - ceph/src/os/bluestore/NVMEDevice.cc
import 15.2.0 Octopus source
index 72e5db9aea31dd1654391bc23313df1bd640b97f..dc3745ce40756e51539701526976c2e50bbaf5d1 100644
 
 #include <unistd.h>
 #include <stdlib.h>
+#include <strings.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <fcntl.h>
 #include <unistd.h>
 
 #include <chrono>
+#include <fstream>
 #include <functional>
 #include <map>
 #include <thread>
-#include <xmmintrin.h>
 
 #include <spdk/nvme.h>
 
-#include <rte_lcore.h>
-
+#include "include/intarith.h"
 #include "include/stringify.h"
 #include "include/types.h"
 #include "include/compat.h"
-#include "common/align.h"
 #include "common/errno.h"
 #include "common/debug.h"
 #include "common/perf_counters.h"
-#include "common/io_priority.h"
 
 #include "NVMEDevice.h"
 
 #undef dout_prefix
 #define dout_prefix *_dout << "bdev(" << sn << ") "
 
-static constexpr uint16_t data_buffer_default_num = 2048;
+thread_local SharedDriverQueueData *queue_t;
+
+static constexpr uint16_t data_buffer_default_num = 1024;
 
 static constexpr uint32_t data_buffer_size = 8192;
 
 static constexpr uint16_t inline_segment_num = 32;
 
-static thread_local int queue_id = -1;
-
 enum {
   l_bluestore_nvmedevice_first = 632430,
-  l_bluestore_nvmedevice_aio_write_lat,
+  l_bluestore_nvmedevice_write_lat,
   l_bluestore_nvmedevice_read_lat,
   l_bluestore_nvmedevice_flush_lat,
-  l_bluestore_nvmedevice_aio_write_queue_lat,
+  l_bluestore_nvmedevice_write_queue_lat,
   l_bluestore_nvmedevice_read_queue_lat,
   l_bluestore_nvmedevice_flush_queue_lat,
   l_bluestore_nvmedevice_queue_ops,
@@ -72,17 +70,6 @@ enum {
 
 static void io_complete(void *t, const struct spdk_nvme_cpl *completion);
 
-int dpdk_thread_adaptor(void *f)
-{
-  (*static_cast<std::function<void ()>*>(f))();
-  return 0;
-}
-
-struct IOSegment {
-  uint32_t len;
-  void *addr;
-};
-
 struct IORequest {
   uint16_t cur_seg_idx = 0;
   uint16_t nseg;
@@ -91,198 +78,49 @@ struct IORequest {
   void **extra_segs = nullptr;
 };
 
-class SharedDriverQueueData {
-  SharedDriverData *driver;
-  spdk_nvme_ctrlr *ctrlr;
-  spdk_nvme_ns *ns;
-  std::string sn;
-  uint64_t block_size;
-  uint32_t sector_size;
-  uint32_t core_id;
-  uint32_t queueid;
-  struct spdk_nvme_qpair *qpair;
-  std::function<void ()> run_func;
-  friend class AioCompletionThread;
-
-  bool aio_stop = false;
-  void _aio_thread();
-  int alloc_buf_from_pool(Task *t, bool write);
-
-  std::atomic_bool queue_empty;
-  Mutex queue_lock;
-  Cond queue_cond;
-  std::queue<Task*> task_queue;
-
-  Mutex flush_lock;
-  Cond flush_cond;
-  std::atomic_int flush_waiters;
-  std::set<uint64_t> flush_waiter_seqs;
-
-  public:
-    std::atomic_ulong completed_op_seq, queue_op_seq;
-    std::vector<void*> data_buf_mempool;
-    PerfCounters *logger = nullptr;
-
-    SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
-                          const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
-      : driver(driver),
-        ctrlr(c),
-        ns(ns),
-       sn(sn_tag),
-        block_size(block_size),
-        sector_size(sector_size),
-        core_id(core),
-        queueid(queue_id),
-        run_func([this]() { _aio_thread(); }),
-        queue_empty(false),
-        queue_lock("NVMEDevice::queue_lock"),
-        flush_lock("NVMEDevice::flush_lock"),
-        flush_waiters(0),
-        completed_op_seq(0), queue_op_seq(0) {
-
-    qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
-    PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
-                          l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
-    b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completing latency");
-    b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
-    b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
-    b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
-    b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
-    b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
-    b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
-    b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
-    b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
-    logger = b.create_perf_counters();
-    g_ceph_context->get_perfcounters_collection()->add(logger);
-   }
-
-   void queue_task(Task *t, uint64_t ops = 1) {
-    queue_op_seq += ops;
-    Mutex::Locker l(queue_lock);
-    task_queue.push(t);
-    if (queue_empty.load()) {
-      queue_empty = false;
-      queue_cond.Signal();
-    }
-  }
-
-  void flush_wait() {
-    uint64_t cur_seq = queue_op_seq.load();
-    uint64_t left = cur_seq - completed_op_seq.load();
-    if (cur_seq > completed_op_seq) {
-      // TODO: this may contains read op
-      dout(10) << __func__ << " existed inflight ops " << left << dendl;
-      Mutex::Locker l(flush_lock);
-      ++flush_waiters;
-      flush_waiter_seqs.insert(cur_seq);
-      while (cur_seq > completed_op_seq.load()) {
-       flush_cond.Wait(flush_lock);
-      }
-      flush_waiter_seqs.erase(cur_seq);
-      --flush_waiters;
-    }
-  }
-
-  void start() {
-    int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
-                                  core_id);
-    assert(r == 0);
-
-  }
-
-  void stop() {
-    {
-      Mutex::Locker l(queue_lock);
-      aio_stop = true;
-      queue_cond.Signal();
-    }
-    int r = rte_eal_wait_lcore(core_id);
-    assert(r == 0);
-    aio_stop = false;
-  }
-
-  ~SharedDriverQueueData() {
-    g_ceph_context->get_perfcounters_collection()->remove(logger);
-    if (!qpair) {
-      spdk_nvme_ctrlr_free_io_qpair(qpair); 
-    }
-    delete logger;
-  }
-};
+struct Task;
 
 class SharedDriverData {
   unsigned id;
-  uint32_t core_id;
-
-  std::string sn;
+  spdk_nvme_transport_id trid;
   spdk_nvme_ctrlr *ctrlr;
   spdk_nvme_ns *ns;
-  uint64_t block_size = 0;
-  uint32_t sector_size = 0;
+  uint32_t block_size = 0;
   uint64_t size = 0;
-  uint32_t queue_number;
-  std::vector<SharedDriverQueueData*> queues;
-
-  void _aio_start() {
-     for (auto &&it : queues)
-             it->start();
-  }
-  void _aio_stop() {
-      for (auto &&it : queues)
-             it->stop();
-  }
 
   public:
   std::vector<NVMEDevice*> registered_devices;
-  SharedDriverData(unsigned _id, const std::string &sn_tag,
-                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
-      : id(_id),
-        sn(sn_tag),
+  friend class SharedDriverQueueData;
+  SharedDriverData(unsigned id_, const spdk_nvme_transport_id& trid_,
+                   spdk_nvme_ctrlr *c, spdk_nvme_ns *ns_)
+      : id(id_),
+        trid(trid_),
         ctrlr(c),
-        ns(ns) {
-    int i;
-    sector_size = spdk_nvme_ns_get_sector_size(ns);
-    block_size = std::max(CEPH_PAGE_SIZE, sector_size);
-    size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
-
-    RTE_LCORE_FOREACH_SLAVE(i) {
-      queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
-   }
-
-   _aio_start();
+        ns(ns_) {
+    block_size = spdk_nvme_ns_get_extended_sector_size(ns);
+    size = spdk_nvme_ns_get_size(ns);
   }
 
-  bool is_equal(const string &tag) const { return sn == tag; }
-  ~SharedDriverData() {
-    for (auto p : queues) {
-      delete p;
-   }
+  bool is_equal(const spdk_nvme_transport_id& trid2) const {
+    return spdk_nvme_transport_id_compare(&trid, &trid2) == 0;
   }
-
-  SharedDriverQueueData *get_queue(uint32_t i) {
-       return queues.at(i%queue_number);
+  ~SharedDriverData() {
   }
 
   void register_device(NVMEDevice *device) {
-    // in case of registered_devices, we stop thread now.
-    // Because release is really a rare case, we could bear this
-    _aio_stop();
     registered_devices.push_back(device);
-    _aio_start();
   }
 
   void remove_device(NVMEDevice *device) {
-    _aio_stop();
     std::vector<NVMEDevice*> new_devices;
     for (auto &&it : registered_devices) {
       if (it != device)
         new_devices.push_back(it);
     }
     registered_devices.swap(new_devices);
-    _aio_start();
   }
 
-  uint64_t get_block_size() {
+  uint32_t get_block_size() {
     return block_size;
   }
   uint64_t get_size() {
@@ -290,27 +128,121 @@ class SharedDriverData {
   }
 };
 
+class SharedDriverQueueData {
+  NVMEDevice *bdev;
+  SharedDriverData *driver;
+  spdk_nvme_ctrlr *ctrlr;
+  spdk_nvme_ns *ns;
+  std::string sn;
+  uint32_t block_size;
+  uint32_t max_queue_depth;
+  struct spdk_nvme_qpair *qpair;
+  bool reap_io = false;
+  int alloc_buf_from_pool(Task *t, bool write);
+
+  public:
+    uint32_t current_queue_depth = 0;
+    std::atomic_ulong completed_op_seq, queue_op_seq;
+    std::vector<void*> data_buf_mempool;
+    PerfCounters *logger = nullptr;
+    void _aio_handle(Task *t, IOContext *ioc);
+
+    SharedDriverQueueData(NVMEDevice *bdev, SharedDriverData *driver)
+      : bdev(bdev),
+        driver(driver) {
+    ctrlr = driver->ctrlr;
+    ns = driver->ns;
+    block_size = driver->block_size;
+
+    struct spdk_nvme_io_qpair_opts opts = {};
+    spdk_nvme_ctrlr_get_default_io_qpair_opts(ctrlr, &opts, sizeof(opts));
+    opts.qprio = SPDK_NVME_QPRIO_URGENT;
+    // usable queue depth should be reduced by 1 to avoid overflow.
+    max_queue_depth = opts.io_queue_size - 1;
+    qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, &opts, sizeof(opts));
+    ceph_assert(qpair != NULL);
+
+    // allocate spdk dma memory
+    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+      void *b = spdk_dma_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
+      if (!b) {
+        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
+        ceph_assert(b);
+      }
+      data_buf_mempool.push_back(b);
+    }
+
+    PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
+                          l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
+    b.add_time_avg(l_bluestore_nvmedevice_write_lat, "write_lat", "Average write completing latency");
+    b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completing latency");
+    b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completing latency");
+    b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
+    b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
+    b.add_time_avg(l_bluestore_nvmedevice_write_queue_lat, "write_queue_lat", "Average queue write request latency");
+    b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
+    b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
+    b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Alloc data buffer failed count");
+    logger = b.create_perf_counters();
+    g_ceph_context->get_perfcounters_collection()->add(logger);
+    bdev->queue_number++;
+    if (bdev->queue_number.load() == 1)
+      reap_io = true;
+  }
+
+  ~SharedDriverQueueData() {
+    g_ceph_context->get_perfcounters_collection()->remove(logger);
+    if (qpair) {
+      spdk_nvme_ctrlr_free_io_qpair(qpair);
+      bdev->queue_number--;
+    }
+
+    // free all spdk dma memory;
+    if (!data_buf_mempool.empty()) {
+      for (uint16_t i = 0; i < data_buffer_default_num; i++) {
+        void *b = data_buf_mempool[i];
+        ceph_assert(b);
+        spdk_dma_free(b);
+      }
+      data_buf_mempool.clear();
+    }
+
+    delete logger;
+  }
+};
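The class above replaces the old per-core polling threads: each OSD thread that submits I/O lazily owns one SharedDriverQueueData (its own SPDK qpair plus DMA buffer pool) through the thread_local queue_t declared near the top of this file, so no queue lock is needed on the submission path. A minimal sketch of that pattern, mirroring NVMEDevice::aio_submit() further down (the helper name is illustrative only, not part of the patch):

  // Illustrative helper, not part of the patch.
  static void submit_from_this_thread(NVMEDevice *bdev, SharedDriverData *driver,
                                      Task *t, IOContext *ioc)
  {
    if (!queue_t)                                          // first I/O from this thread
      queue_t = new SharedDriverQueueData(bdev, driver);   // allocates qpair + buffer pool
    queue_t->_aio_handle(t, ioc);                          // submit and poll to completion
  }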
+
 struct Task {
   NVMEDevice *device;
   IOContext *ctx = nullptr;
   IOCommand command;
   uint64_t offset;
   uint64_t len;
-  bufferlist write_bl;
+  bufferlist bl;
   std::function<void()> fill_cb;
   Task *next = nullptr;
   int64_t return_code;
+  Task *primary = nullptr;
   ceph::coarse_real_clock::time_point start;
-  IORequest io_request;
-  std::mutex lock;
-  std::condition_variable cond;
-  SharedDriverQueueData *queue;
-  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
+  IORequest io_request = {};
+  ceph::mutex lock = ceph::make_mutex("Task::lock");
+  ceph::condition_variable cond;
+  SharedDriverQueueData *queue = nullptr;
+  // reference count held by subtasks.
+  int ref = 0;
+  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0,
+       Task *p = nullptr)
     : device(dev), command(c), offset(off), len(l),
-      return_code(rc),
-      start(ceph::coarse_real_clock::now()) {}
+      return_code(rc), primary(p),
+      start(ceph::coarse_real_clock::now()) {
+        if (primary) {
+          primary->ref++;
+          return_code = primary->return_code;
+        }
+     }
   ~Task() {
-    assert(!io_request.nseg);
+    if (primary)
+      primary->ref--;
+    ceph_assert(!io_request.nseg);
   }
   void release_segs(SharedDriverQueueData *queue_data) {
     if (io_request.extra_segs) {
@@ -321,6 +253,7 @@ struct Task {
       for (uint16_t i = 0; i < io_request.nseg; i++)
         queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
     }
+    ctx->total_nseg -= io_request.nseg;
     io_request.nseg = 0;
   }
 
@@ -338,16 +271,6 @@ struct Task {
       copied += need_copy;
     }
   }
-
-  void io_wait() {
-    std::unique_lock<std::mutex> l(lock);
-    cond.wait(l);
-  }
-
-  void io_wake() {
-    std::lock_guard<std::mutex> l(lock);
-    cond.notify_all();
-  }
 };
 
 static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
@@ -355,7 +278,7 @@ static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
   Task *t = static_cast<Task*>(cb_arg);
   uint32_t i = sgl_offset / data_buffer_size;
   uint32_t offset = i * data_buffer_size;
-  assert(i <= t->io_request.nseg);
+  ceph_assert(i <= t->io_request.nseg);
 
   for (; i < t->io_request.nseg; i++) {
     offset += data_buffer_size;
@@ -424,56 +347,53 @@ int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
     data_buf_mempool.pop_back();
   }
   t->io_request.nseg = count;
+  t->ctx->total_nseg += count;
   if (write) {
-    auto blp = t->write_bl.begin();
+    auto blp = t->bl.begin();
     uint32_t len = 0;
     uint16_t i = 0;
     for (; i < count - 1; ++i) {
       blp.copy(data_buffer_size, static_cast<char*>(segs[i]));
       len += data_buffer_size;
     }
-    blp.copy(t->write_bl.length() - len, static_cast<char*>(segs[i]));
+    blp.copy(t->bl.length() - len, static_cast<char*>(segs[i]));
   }
 
   return 0;
 }
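For reference, the segment accounting that alloc_buf_from_pool() relies on (its head lies outside this hunk, so the exact count computation below is an assumption): an I/O of t->len bytes is carved into data_buffer_size (8 KiB) pool buffers, and only requests needing more than inline_segment_num (32) segments, i.e. larger than 256 KiB, presumably fall back to the heap-allocated extra_segs array:

  // Sketch only; segments_needed()/needs_extra_segs() are illustrative names.
  static uint32_t segments_needed(uint64_t len)
  {
    return (len + data_buffer_size - 1) / data_buffer_size;   // ceil(len / 8 KiB)
  }
  static bool needs_extra_segs(uint64_t len)
  {
    return segments_needed(len) > inline_segment_num;         // more than 32 segments
  }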
 
-void SharedDriverQueueData::_aio_thread()
+void SharedDriverQueueData::_aio_handle(Task *t, IOContext *ioc)
 {
-  dout(1) << __func__ << " start" << dendl;
+  dout(20) << __func__ << " start" << dendl;
 
-  if (data_buf_mempool.empty()) {
-    for (uint16_t i = 0; i < data_buffer_default_num; i++) {
-      void *b = spdk_zmalloc(data_buffer_size, CEPH_PAGE_SIZE, NULL);
-      if (!b) {
-        derr << __func__ << " failed to create memory pool for nvme data buffer" << dendl;
-        assert(b);
-      }
-      data_buf_mempool.push_back(b);
-    }
-  }
-
-  Task *t = nullptr;
   int r = 0;
   uint64_t lba_off, lba_count;
+  uint32_t max_io_completion = (uint32_t)g_conf().get_val<uint64_t>("bluestore_spdk_max_io_completion");
+  uint64_t io_sleep_in_us = g_conf().get_val<uint64_t>("bluestore_spdk_io_sleep");
 
   ceph::coarse_real_clock::time_point cur, start
     = ceph::coarse_real_clock::now();
-  while (true) {
-    bool inflight = queue_op_seq.load() - completed_op_seq.load();
+  while (ioc->num_running) {
  again:
     dout(40) << __func__ << " polling" << dendl;
-    if (inflight) {
-      if (!spdk_nvme_qpair_process_completions(qpair, g_conf->bluestore_spdk_max_io_completion)) {
-        dout(30) << __func__ << " idle, have a pause" << dendl;
-        _mm_pause();
+    if (current_queue_depth) {
+      r = spdk_nvme_qpair_process_completions(qpair, max_io_completion);
+      if (r < 0) {
+        ceph_abort();
+      } else if (r == 0) {
+        usleep(io_sleep_in_us);
       }
     }
 
     for (; t; t = t->next) {
+      if (current_queue_depth == max_queue_depth) {
+        // no slots
+        goto again;
+      }
+
       t->queue = this;
-      lba_off = t->offset / sector_size;
-      lba_count = t->len / sector_size;
+      lba_off = t->offset / block_size;
+      lba_count = t->len / block_size;
       switch (t->command) {
         case IOCommand::WRITE_COMMAND:
         {
@@ -488,15 +408,15 @@ void SharedDriverQueueData::_aio_thread()
               ns, qpair, lba_off, lba_count, io_complete, t, 0,
               data_buf_reset_sgl, data_buf_next_sge);
           if (r < 0) {
-            derr << __func__ << " failed to do write command" << dendl;
+            derr << __func__ << " failed to do write command: " << cpp_strerror(r) << dendl;
             t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
             t->release_segs(this);
             delete t;
             ceph_abort();
           }
           cur = ceph::coarse_real_clock::now();
-          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
-          logger->tinc(l_bluestore_nvmedevice_aio_write_queue_lat, dur);
+          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
+          logger->tinc(l_bluestore_nvmedevice_write_queue_lat, dur);
           break;
         }
         case IOCommand::READ_COMMAND:
@@ -512,13 +432,13 @@ void SharedDriverQueueData::_aio_thread()
               ns, qpair, lba_off, lba_count, io_complete, t, 0,
               data_buf_reset_sgl, data_buf_next_sge);
           if (r < 0) {
-            derr << __func__ << " failed to read" << dendl;
+            derr << __func__ << " failed to read: " << cpp_strerror(r) << dendl;
             t->release_segs(this);
             delete t;
             ceph_abort();
           } else {
             cur = ceph::coarse_real_clock::now();
-            auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+            auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
             logger->tinc(l_bluestore_nvmedevice_read_queue_lat, dur);
           }
           break;
@@ -528,59 +448,29 @@ void SharedDriverQueueData::_aio_thread()
           dout(20) << __func__ << " flush command issueed " << dendl;
           r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
           if (r < 0) {
-            derr << __func__ << " failed to flush" << dendl;
+            derr << __func__ << " failed to flush: " << cpp_strerror(r) << dendl;
             t->release_segs(this);
             delete t;
             ceph_abort();
           } else {
             cur = ceph::coarse_real_clock::now();
-            auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+            auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - t->start);
             logger->tinc(l_bluestore_nvmedevice_flush_queue_lat, dur);
           }
           break;
         }
       }
+      current_queue_depth++;
     }
-
-    if (!queue_empty.load()) {
-      Mutex::Locker l(queue_lock);
-      if (!task_queue.empty()) {
-        t = task_queue.front();
-        task_queue.pop();
-        logger->set(l_bluestore_nvmedevice_queue_ops, task_queue.size());
-      }
-      if (!t)
-        queue_empty = true;
-    } else {
-      if (flush_waiters.load()) {
-        Mutex::Locker l(flush_lock);
-        if (*flush_waiter_seqs.begin() <= completed_op_seq.load())
-          flush_cond.Signal();
-      }
-
-      if (!inflight) {
-        // be careful, here we need to let each thread reap its own, currently it is done
-        // by only one dedicatd dpdk thread
-        if(!queueid) {
-          for (auto &&it : driver->registered_devices)
-            it->reap_ioc();
-        }
-
-        Mutex::Locker l(queue_lock);
-        if (queue_empty.load()) {
-         cur = ceph::coarse_real_clock::now();
-          auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
-          logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
-          if (aio_stop)
-            break;
-          queue_cond.Wait(queue_lock);
-          start = ceph::coarse_real_clock::now();
-        }
-      }
-    }
+    cur = ceph::coarse_real_clock::now();
+    auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(cur - start);
+    logger->tinc(l_bluestore_nvmedevice_polling_lat, dur);
+    start = ceph::coarse_real_clock::now();
   }
-  assert(data_buf_mempool.size() == data_buffer_default_num);
-  dout(1) << __func__ << " end" << dendl;
+
+  if (reap_io)
+    bdev->reap_ioc();
+  dout(20) << __func__ << " end" << dendl;
 }
 
 #define dout_subsys ceph_subsys_bdev
@@ -590,31 +480,40 @@ void SharedDriverQueueData::_aio_thread()
 class NVMEManager {
  public:
   struct ProbeContext {
-    string sn_tag;
+    spdk_nvme_transport_id trid;
     NVMEManager *manager;
     SharedDriverData *driver;
     bool done;
   };
 
  private:
-  Mutex lock;
-  bool init = false;
+  ceph::mutex lock = ceph::make_mutex("NVMEManager::lock");
+  bool stopping = false;
   std::vector<SharedDriverData*> shared_driver_datas;
   std::thread dpdk_thread;
-  std::mutex probe_queue_lock;
-  std::condition_variable probe_queue_cond;
+  ceph::mutex probe_queue_lock = ceph::make_mutex("NVMEManager::probe_queue_lock");
+  ceph::condition_variable probe_queue_cond;
   std::list<ProbeContext*> probe_queue;
 
  public:
-  NVMEManager()
-      : lock("NVMEDevice::NVMEManager::lock") {}
-  int try_get(const string &sn_tag, SharedDriverData **driver);
-  void register_ctrlr(const string &sn_tag, spdk_nvme_ctrlr *c, struct spdk_pci_device *pci_dev,
-                      SharedDriverData **driver) {
-    assert(lock.is_locked());
+  NVMEManager() {}
+  ~NVMEManager() {
+    if (!dpdk_thread.joinable())
+      return;
+    {
+      std::lock_guard guard(probe_queue_lock);
+      stopping = true;
+      probe_queue_cond.notify_all();
+    }
+    dpdk_thread.join();
+  }
+
+  int try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver);
+  void register_ctrlr(const spdk_nvme_transport_id& trid, spdk_nvme_ctrlr *c, SharedDriverData **driver) {
+    ceph_assert(ceph_mutex_is_locked(lock));
     spdk_nvme_ns *ns;
     int num_ns = spdk_nvme_ctrlr_get_num_ns(c);
-    assert(num_ns >= 1);
+    ceph_assert(num_ns >= 1);
     if (num_ns > 1) {
       dout(0) << __func__ << " namespace count larger than 1, currently only use the first namespace" << dendl;
     }
@@ -623,13 +522,12 @@ class NVMEManager {
       derr << __func__ << " failed to get namespace at 1" << dendl;
       ceph_abort();
     }
-    dout(1) << __func__ << " successfully attach nvme device at" << spdk_pci_device_get_bus(pci_dev)
-            << ":" << spdk_pci_device_get_dev(pci_dev) << ":" << spdk_pci_device_get_func(pci_dev) << dendl;
+    dout(1) << __func__ << " successfully attach nvme device at" << trid.traddr << dendl;
 
     // only support one device per osd now!
-    assert(shared_driver_datas.empty());
-    // index 0 is occured by master thread
-    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
+    ceph_assert(shared_driver_datas.empty());
+    // index 0 is occupied by the master thread
+    shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, trid, c, ns));
     *driver = shared_driver_datas.back();
   }
 };
@@ -639,40 +537,17 @@ static NVMEManager manager;
 static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, struct spdk_nvme_ctrlr_opts *opts)
 {
   NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
-  char serial_number[128];
-  struct spdk_pci_addr pci_addr;
-  struct spdk_pci_device *pci_dev = NULL;
-  int result = 0;
 
   if (trid->trtype != SPDK_NVME_TRANSPORT_PCIE) {
     dout(0) << __func__ << " only probe local nvme device" << dendl;
     return false;
   }
 
-  result = spdk_pci_addr_parse(&pci_addr, trid->traddr);
-  if (result) {
-    dout(0) << __func__ << " failed to get pci address from %s, " << trid->traddr << " return value is: %d" << result << dendl;
-    return false;
-  }
-
-  pci_dev = spdk_pci_get_device(&pci_addr);
-  if (!pci_dev) {
-    dout(0) << __func__ << " failed to get pci device" << dendl; 
-    return false;
-  }
-
-  dout(0) << __func__ << " found device at bus: " << spdk_pci_device_get_bus(pci_dev)
-          << ":" << spdk_pci_device_get_dev(pci_dev) << ":"
-          << spdk_pci_device_get_func(pci_dev) << " vendor:0x" << spdk_pci_device_get_vendor_id(pci_dev) << " device:0x" << spdk_pci_device_get_device_id(pci_dev)
-          << dendl;
-  result = spdk_pci_device_get_serial_number(pci_dev, serial_number, 128);
-  if (result < 0) {
-    dout(10) << __func__ << " failed to get serial number from %p" << pci_dev << dendl;
-    return false;
-  }
-
-  if (ctx->sn_tag.compare(string(serial_number, 16))) {
-    dout(0) << __func__ << " device serial number (" << ctx->sn_tag << ") not match " << serial_number << dendl;
+  dout(0) << __func__ << " found device at: "
+         << "trtype=" << spdk_nvme_transport_id_trtype_str(trid->trtype) << ", "
+          << "traddr=" << trid->traddr << dendl;
+  if (spdk_nvme_transport_id_compare(&ctx->trid, trid)) {
+    dout(0) << __func__ << " device traddr (" << ctx->trid.traddr << ") not match " << trid->traddr << dendl;
     return false;
   }
 
@@ -682,90 +557,87 @@ static bool probe_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid, st
 static void attach_cb(void *cb_ctx, const struct spdk_nvme_transport_id *trid,
                       struct spdk_nvme_ctrlr *ctrlr, const struct spdk_nvme_ctrlr_opts *opts)
 {
-  struct spdk_pci_addr pci_addr;
-  struct spdk_pci_device *pci_dev = NULL;
-
-  spdk_pci_addr_parse(&pci_addr, trid->traddr);
-
-  pci_dev = spdk_pci_get_device(&pci_addr);
-  if (!pci_dev) {
-    dout(0) << __func__ << " failed to get pci device" << dendl; 
-    assert(pci_dev);
-  }
-
-  NVMEManager::ProbeContext *ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
-  ctx->manager->register_ctrlr(ctx->sn_tag, ctrlr, pci_dev, &ctx->driver);
+  auto ctx = static_cast<NVMEManager::ProbeContext*>(cb_ctx);
+  ctx->manager->register_ctrlr(ctx->trid, ctrlr, &ctx->driver);
 }
 
-int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
+static int hex2dec(unsigned char c)
 {
-  Mutex::Locker l(lock);
-  int r = 0;
-  unsigned long long core_value;
-  uint32_t core_num = 0;
-  int m_core_arg = -1;
-  uint32_t mem_size_arg = g_conf->bluestore_spdk_mem;
-  char *coremask_arg = (char *)g_conf->bluestore_spdk_coremask.c_str();
-
-  if (sn_tag.empty()) {
-    r = -ENOENT;
-    derr << __func__ << " empty serial number: " << cpp_strerror(r) << dendl;
-    return r;
-  }
+  if (isdigit(c))
+    return c - '0';
+  else if (isupper(c))
+    return c - 'A' + 10;
+  else
+    return c - 'a' + 10;
+}
 
-  core_value = strtoll(coremask_arg, NULL, 16);
-  for (uint32_t i = 0; i < sizeof(long long) * 8; i++) {
-    bool tmp = (core_value >> i) & 0x1;
-    if (tmp) {
-      core_num += 1;
-      // select the least signficant bit as the master core
-      if(m_core_arg < 0) {
-        m_core_arg = i;
-      }
+static int find_first_bitset(const string& s)
+{
+  auto e = s.rend();
+  if (s.compare(0, 2, "0x") == 0 ||
+      s.compare(0, 2, "0X") == 0) {
+    advance(e, -2);
+  }
+  auto p = s.rbegin();
+  for (int pos = 0; p != e; ++p, pos += 4) {
+    if (!isxdigit(*p)) {
+      return -EINVAL;
+    }
+    if (int val = hex2dec(*p); val != 0) {
+      return pos + ffs(val);
     }
   }
+  return 0;
+}
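A worked example of how the coremask string maps to the DPDK master core chosen in NVMEManager::try_get() below (values computed from the code above; illustration only):

  // find_first_bitset("0x1")  == 1  -> m_core_arg = 0 (master core 0)
  // find_first_bitset("0x4")  == 3  -> m_core_arg = 2 (master core 2)
  // find_first_bitset("0x30") == 5  -> m_core_arg = 4 (master core 4)
  // find_first_bitset("0x0")  == 0 and find_first_bitset("zz") == -EINVAL
  //   both make try_get() fail with -ENOENT.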
 
-  // at least two cores are needed for using spdk
-  if (core_num < 2) {
-    r = -ENOENT;
-    derr << __func__ << " invalid spdk coremask, at least two cores are needed: "
-         << cpp_strerror(r) << dendl;
-    return r;
-  }
-
+int NVMEManager::try_get(const spdk_nvme_transport_id& trid, SharedDriverData **driver)
+{
+  std::lock_guard l(lock);
   for (auto &&it : shared_driver_datas) {
-    if (it->is_equal(sn_tag)) {
+    if (it->is_equal(trid)) {
       *driver = it;
       return 0;
     }
   }
 
-  if (!init) {
-    init = true;
+  auto coremask_arg = g_conf().get_val<std::string>("bluestore_spdk_coremask");
+  int m_core_arg = find_first_bitset(coremask_arg);
+  // at least one core is needed for using spdk
+  if (m_core_arg <= 0) {
+    derr << __func__ << " invalid bluestore_spdk_coremask, "
+        << "at least one core is needed" << dendl;
+    return -ENOENT;
+  }
+  m_core_arg -= 1;
+
+  uint32_t mem_size_arg = (uint32_t)g_conf().get_val<Option::size_t>("bluestore_spdk_mem");
+
+  if (!dpdk_thread.joinable()) {
     dpdk_thread = std::thread(
       [this, coremask_arg, m_core_arg, mem_size_arg]() {
         static struct spdk_env_opts opts;
         int r;
 
         spdk_env_opts_init(&opts);
-        opts.name = "ceph-osd";
-        opts.core_mask = coremask_arg;
-        opts.dpdk_master_core = m_core_arg;
-        opts.dpdk_mem_size = mem_size_arg;
+        opts.name = "nvme-device-manager";
+        opts.core_mask = coremask_arg.c_str();
+        opts.master_core = m_core_arg;
+        opts.mem_size = mem_size_arg;
         spdk_env_init(&opts);
+        spdk_unaffinitize_thread();
 
         spdk_nvme_retry_count = g_ceph_context->_conf->bdev_nvme_retry_count;
         if (spdk_nvme_retry_count < 0)
           spdk_nvme_retry_count = SPDK_NVME_DEFAULT_RETRY_COUNT;
 
-        std::unique_lock<std::mutex> l(probe_queue_lock);
-        while (true) {
+        std::unique_lock l(probe_queue_lock);
+        while (!stopping) {
           if (!probe_queue.empty()) {
             ProbeContext* ctxt = probe_queue.front();
             probe_queue.pop_front();
             r = spdk_nvme_probe(NULL, ctxt, probe_cb, attach_cb, NULL);
             if (r < 0) {
-              assert(!ctxt->driver);
+              ceph_assert(!ctxt->driver);
               derr << __func__ << " device probe nvme failed" << dendl;
             }
             ctxt->done = true;
@@ -774,14 +646,16 @@ int NVMEManager::try_get(const string &sn_tag, SharedDriverData **driver)
             probe_queue_cond.wait(l);
           }
         }
+        for (auto p : probe_queue)
+          p->done = true;
+        probe_queue_cond.notify_all();
       }
     );
-    dpdk_thread.detach();
   }
 
-  ProbeContext ctx = {sn_tag, this, nullptr, false};
+  ProbeContext ctx{trid, this, nullptr, false};
   {
-    std::unique_lock<std::mutex> l(probe_queue_lock);
+    std::unique_lock l(probe_queue_lock);
     probe_queue.push_back(&ctx);
     while (!ctx.done)
       probe_queue_cond.wait(l);
@@ -799,14 +673,14 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
   IOContext *ctx = task->ctx;
   SharedDriverQueueData *queue = task->queue;
 
-  assert(queue != NULL);
-  assert(ctx != NULL);
-  ++queue->completed_op_seq;
+  ceph_assert(queue != NULL);
+  ceph_assert(ctx != NULL);
+  --queue->current_queue_depth;
   auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
       ceph::coarse_real_clock::now() - task->start);
   if (task->command == IOCommand::WRITE_COMMAND) {
-    queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
-    assert(!spdk_nvme_cpl_is_error(completion));
+    queue->logger->tinc(l_bluestore_nvmedevice_write_lat, dur);
+    ceph_assert(!spdk_nvme_cpl_is_error(completion));
     dout(20) << __func__ << " write/zero op successfully, left "
              << queue->queue_op_seq - queue->completed_op_seq << dendl;
     // check waiting count before doing callback (which may
@@ -822,29 +696,33 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
     delete task;
   } else if (task->command == IOCommand::READ_COMMAND) {
     queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
-    assert(!spdk_nvme_cpl_is_error(completion));
+    ceph_assert(!spdk_nvme_cpl_is_error(completion));
     dout(20) << __func__ << " read op successfully" << dendl;
     task->fill_cb();
     task->release_segs(queue);
     // read submitted by AIO
-    if(!task->return_code) {
+    if (!task->return_code) {
       if (ctx->priv) {
        if (!--ctx->num_running) {
           task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
        }
       } else {
-       ctx->try_aio_wake();
+        ctx->try_aio_wake();
       }
       delete task;
     } else {
-      task->return_code = 0;
-      if (!--ctx->num_running) {
-        task->io_wake();
+      if (Task* primary = task->primary; primary != nullptr) {
+        delete task;
+        if (!primary->ref)
+          primary->return_code = 0;
+      } else {
+         task->return_code = 0;
       }
+      --ctx->num_running;
     }
   } else {
-    assert(task->command == IOCommand::FLUSH_COMMAND);
-    assert(!spdk_nvme_cpl_is_error(completion));
+    ceph_assert(task->command == IOCommand::FLUSH_COMMAND);
+    ceph_assert(!spdk_nvme_cpl_is_error(completion));
     queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
     dout(20) << __func__ << " flush op successfully" << dendl;
     task->return_code = 0;
@@ -856,60 +734,37 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
 #define dout_prefix *_dout << "bdev(" << name << ") "
 
 NVMEDevice::NVMEDevice(CephContext* cct, aio_callback_t cb, void *cbpriv)
-  :   BlockDevice(cct),
-      driver(nullptr),
-      size(0),
-      block_size(0),
-      aio_stop(false),
-      buffer_lock("NVMEDevice::buffer_lock"),
-      aio_callback(cb),
-      aio_callback_priv(cbpriv)
+  :   BlockDevice(cct, cb, cbpriv),
+      driver(nullptr)
 {
 }
 
-
 int NVMEDevice::open(const string& p)
 {
-  int r = 0;
   dout(1) << __func__ << " path " << p << dendl;
 
-  string serial_number;
-  int fd = ::open(p.c_str(), O_RDONLY);
-  if (fd < 0) {
-    r = -errno;
-    derr << __func__ << " unable to open " << p << ": " << cpp_strerror(r)
-        << dendl;
-    return r;
+  std::ifstream ifs(p);
+  if (!ifs) {
+    derr << __func__ << " unable to open " << p << dendl;
+    return -1;
   }
-  char buf[100];
-  r = ::read(fd, buf, sizeof(buf));
-  VOID_TEMP_FAILURE_RETRY(::close(fd));
-  fd = -1; // defensive
-  if (r <= 0) {
-    if (r == 0) {
-      r = -EINVAL;
-    } else {
-      r = -errno;
-    }
-    derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r) << dendl;
+  string val;
+  std::getline(ifs, val);
+  spdk_nvme_transport_id trid;
+  if (int r = spdk_nvme_transport_id_parse(&trid, val.c_str()); r) {
+    derr << __func__ << " unable to read " << p << ": " << cpp_strerror(r)
+        << dendl;
     return r;
   }
-  /* scan buf from the beginning with isxdigit. */
-  int i = 0;
-  while (i < r && isxdigit(buf[i])) {
-    i++;
-  }
-  serial_number = string(buf, i);
-  r = manager.try_get(serial_number, &driver);
-  if (r < 0) {
-    derr << __func__ << " failed to get nvme device with sn " << serial_number << dendl;
+  if (int r = manager.try_get(trid, &driver); r < 0) {
+    derr << __func__ << " failed to get nvme device with transport address " << trid.traddr << dendl;
     return r;
   }
 
   driver->register_device(this);
   block_size = driver->get_block_size();
   size = driver->get_size();
-  name = serial_number;
+  name = trid.traddr;
 
   //nvme is non-rotational device.
   rotational = false;
@@ -921,6 +776,7 @@ int NVMEDevice::open(const string& p)
           << " block_size " << block_size << " (" << byte_u_t(block_size)
           << ")" << dendl;
 
+
   return 0;
 }
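The rewritten open() no longer reads a serial number: it expects the first line of the file at p to be an SPDK transport-id string that spdk_nvme_transport_id_parse() understands. A hypothetical example (the path below is illustrative, not from the patch):

  //   $ cat /var/lib/ceph/osd/ceph-0/block      (hypothetical path)
  //   trtype:PCIe traddr:0000:01:00.0
  //
  // parses into trid.trtype = SPDK_NVME_TRANSPORT_PCIE and
  // trid.traddr = "0000:01:00.0", which is then matched against devices
  // found by spdk_nvme_probe().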
 
@@ -936,7 +792,7 @@ void NVMEDevice::close()
   dout(1) << __func__ << " end" << dendl;
 }
 
-int NVMEDevice::collect_metadata(string prefix, map<string,string> *pm) const
+int NVMEDevice::collect_metadata(const string& prefix, map<string,string> *pm) const
 {
   (*pm)[prefix + "rotational"] = "0";
   (*pm)[prefix + "size"] = stringify(get_size());
@@ -951,17 +807,6 @@ int NVMEDevice::collect_metadata(string prefix, map<string,string> *pm) const
 
 int NVMEDevice::flush()
 {
-  dout(10) << __func__ << " start" << dendl;
-  auto start = ceph::coarse_real_clock::now();
-
-  if(queue_id == -1)
-    queue_id = ceph_gettid();
-  SharedDriverQueueData *queue = driver->get_queue(queue_id);
-  assert(queue != NULL);
-  queue->flush_wait();
-  auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
-      ceph::coarse_real_clock::now() - start);
-  queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
   return 0;
 }
 
@@ -975,12 +820,88 @@ void NVMEDevice::aio_submit(IOContext *ioc)
   if (pending && t) {
     ioc->num_running += pending;
     ioc->num_pending -= pending;
-    assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
+    ceph_assert(ioc->num_pending.load() == 0);  // we should be only thread doing this
     // Only need to push the first entry
-  if(queue_id == -1)
-    queue_id = ceph_gettid();
-    driver->get_queue(queue_id)->queue_task(t, pending);
     ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
+    if (!queue_t)
+       queue_t = new SharedDriverQueueData(this, driver);
+    queue_t->_aio_handle(t, ioc);
+  }
+}
+
+static void ioc_append_task(IOContext *ioc, Task *t)
+{
+  Task *first, *last;
+
+  first = static_cast<Task*>(ioc->nvme_task_first);
+  last = static_cast<Task*>(ioc->nvme_task_last);
+  if (last)
+    last->next = t;
+  if (!first)
+    ioc->nvme_task_first = t;
+  ioc->nvme_task_last = t;
+  ++ioc->num_pending;
+}
+
+static void write_split(
+    NVMEDevice *dev,
+    uint64_t off,
+    bufferlist &bl,
+    IOContext *ioc)
+{
+  uint64_t remain_len = bl.length(), begin = 0, write_size;
+  Task *t;
+  // This value may need to come from configuration later.
+  uint64_t split_size = 131072; // 128KB.
+
+  while (remain_len > 0) {
+    write_size = std::min(remain_len, split_size);
+    t = new Task(dev, IOCommand::WRITE_COMMAND, off + begin, write_size);
+    // TODO: if upper layer alloc memory with known physical address,
+    // we can reduce this copy
+    bl.splice(0, write_size, &t->bl);
+    remain_len -= write_size;
+    t->ctx = ioc;
+    ioc_append_task(ioc, t);
+    begin += write_size;
+  }
+}
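A worked example of the split (illustration only): a 300 KiB aio_write() at offset 0x10000 is queued as three WRITE_COMMAND tasks capped at split_size (128 KiB):

  //   task 1: offset 0x10000, len 131072
  //   task 2: offset 0x30000, len 131072
  //   task 3: offset 0x50000, len  45056   (the remaining 44 KiB)
  // bl.splice() moves the payload into each task's bl, so the caller's
  // bufferlist is drained by the time write_split() returns.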
+
+static void make_read_tasks(
+    NVMEDevice *dev,
+    uint64_t aligned_off,
+    IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary,
+    uint64_t orig_off, uint64_t orig_len)
+{
+  // This value may need to come from configuration later.
+  uint64_t split_size = 131072; // 128KB.
+  uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len;
+  auto begin = aligned_off;
+  const auto aligned_end = begin + aligned_len;
+
+  for (; begin < aligned_end; begin += split_size) {
+    auto read_size = std::min(aligned_end - begin, split_size);
+    auto tmp_len = std::min(remain_orig_len, read_size - tmp_off);
+    Task *t = nullptr;
+
+    if (primary && (aligned_len <= split_size)) {
+      t = primary;
+    } else {
+      t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary);
+    }
+
+    t->ctx = ioc;
+
+    // TODO: if upper layer alloc memory with known physical address,
+    // we can reduce this copy
+    t->fill_cb = [buf, t, tmp_off, tmp_len]  {
+      t->copy_to_buf(buf, tmp_off, tmp_len);
+    };
+
+    ioc_append_task(ioc, t);
+    remain_orig_len -= tmp_len;
+    buf += tmp_len;
+    tmp_off = 0;
   }
 }
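And for the read side (illustration only): a fully aligned 256 KiB read() builds two 128 KiB READ_COMMAND subtasks, both pointing at the caller's stack Task as their primary:

  //   subtask 1: begin = off,          copies 131072 bytes into buf
  //   subtask 2: begin = off + 131072, copies 131072 bytes into buf + 131072
  // When the last subtask completes, primary->ref drops to zero and
  // io_complete() clears primary->return_code; ioc->num_running reaching zero
  // then lets _aio_handle() return, so read() can hand back t.return_code.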
 
@@ -988,50 +909,35 @@ int NVMEDevice::aio_write(
     uint64_t off,
     bufferlist &bl,
     IOContext *ioc,
-    bool buffered)
+    bool buffered,
+    int write_hint)
 {
   uint64_t len = bl.length();
   dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc
            << " buffered " << buffered << dendl;
-  assert(off % block_size == 0);
-  assert(len % block_size == 0);
-  assert(len > 0);
-  assert(off < size);
-  assert(off + len <= size);
-
-  Task *t = new Task(this, IOCommand::WRITE_COMMAND, off, len);
-
-  // TODO: if upper layer alloc memory with known physical address,
-  // we can reduce this copy
-  t->write_bl = std::move(bl);
-
-  if (buffered) {
-    // Only need to push the first entry
-    if(queue_id == -1)
-      queue_id = ceph_gettid();
-    driver->get_queue(queue_id)->queue_task(t);
-  } else {
-    t->ctx = ioc;
-    Task *first = static_cast<Task*>(ioc->nvme_task_first);
-    Task *last = static_cast<Task*>(ioc->nvme_task_last);
-    if (last)
-      last->next = t;
-    if (!first)
-      ioc->nvme_task_first = t;
-    ioc->nvme_task_last = t;
-    ++ioc->num_pending;
-  }
+  ceph_assert(is_valid_io(off, len));
 
+  write_split(this, off, bl, ioc);
   dout(5) << __func__ << " " << off << "~" << len << dendl;
 
   return 0;
 }
 
-int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered)
+int NVMEDevice::write(uint64_t off, bufferlist &bl, bool buffered, int write_hint)
 {
-  // FIXME: there is presumably a more efficient way to do this...
+  uint64_t len = bl.length();
+  dout(20) << __func__ << " " << off << "~" << len << " buffered "
+           << buffered << dendl;
+  ceph_assert(off % block_size == 0);
+  ceph_assert(len % block_size == 0);
+  ceph_assert(len > 0);
+  ceph_assert(off < size);
+  ceph_assert(off + len <= size);
+
   IOContext ioc(cct, NULL);
-  aio_write(off, bl, &ioc, buffered);
+  write_split(this, off, bl, &ioc);
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+  aio_submit(&ioc);
   ioc.aio_wait();
   return 0;
 }
@@ -1041,32 +947,20 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
                      bool buffered)
 {
   dout(5) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
-  assert(off % block_size == 0);
-  assert(len % block_size == 0);
-  assert(len > 0);
-  assert(off < size);
-  assert(off + len <= size);
-
-  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1);
-  bufferptr p = buffer::create_page_aligned(len);
-  int r = 0;
-  t->ctx = ioc;
+  ceph_assert(is_valid_io(off, len));
+
+  Task t(this, IOCommand::READ_COMMAND, off, len, 1);
+  bufferptr p = buffer::create_small_page_aligned(len);
   char *buf = p.c_str();
-  t->fill_cb = [buf, t]() {
-    t->copy_to_buf(buf, 0, t->len);
-  };
-  ++ioc->num_running;
-  if(queue_id == -1)
-    queue_id = ceph_gettid();
-  driver->get_queue(queue_id)->queue_task(t);
 
-  while(t->return_code > 0) {
-    t->io_wait();
-  }
+  ceph_assert(ioc->nvme_task_first == nullptr);
+  ceph_assert(ioc->nvme_task_last == nullptr);
+  make_read_tasks(this, off, ioc, buf, len, &t, off, len);
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
+  aio_submit(ioc);
+
   pbl->push_back(std::move(p));
-  r = t->return_code;
-  delete t;
-  return r;
+  return t.return_code;
 }
 
 int NVMEDevice::aio_read(
@@ -1076,62 +970,33 @@ int NVMEDevice::aio_read(
     IOContext *ioc)
 {
   dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
-  assert(off % block_size == 0);
-  assert(len % block_size == 0);
-  assert(len > 0);
-  assert(off < size);
-  assert(off + len <= size);
-
-  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);
-
-  bufferptr p = buffer::create_page_aligned(len);
+  ceph_assert(is_valid_io(off, len));
+  bufferptr p = buffer::create_small_page_aligned(len);
   pbl->append(p);
-  t->ctx = ioc;
-  char *buf = p.c_str();
-  t->fill_cb = [buf, t]() {
-    t->copy_to_buf(buf, 0, t->len);
-  };
-
-  Task *first = static_cast<Task*>(ioc->nvme_task_first);
-  Task *last = static_cast<Task*>(ioc->nvme_task_last);
-  if (last)
-    last->next = t;
-  if (!first)
-    ioc->nvme_task_first = t;
-  ioc->nvme_task_last = t;
-  ++ioc->num_pending;
+  char* buf = p.c_str();
 
+  make_read_tasks(this, off, ioc, buf, len, NULL, off, len);
+  dout(5) << __func__ << " " << off << "~" << len << dendl;
   return 0;
 }
 
 int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
 {
-  assert(len > 0);
-  assert(off < size);
-  assert(off + len <= size);
+  ceph_assert(len > 0);
+  ceph_assert(off < size);
+  ceph_assert(off + len <= size);
 
-  uint64_t aligned_off = align_down(off, block_size);
-  uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
+  uint64_t aligned_off = p2align(off, block_size);
+  uint64_t aligned_len = p2roundup(off+len, block_size) - aligned_off;
   dout(5) << __func__ << " " << off << "~" << len
           << " aligned " << aligned_off << "~" << aligned_len << dendl;
   IOContext ioc(g_ceph_context, nullptr);
-  Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
-  int r = 0;
-  t->ctx = &ioc;
-  t->fill_cb = [buf, t, off, len]() {
-    t->copy_to_buf(buf, off-t->offset, len);
-  };
-  ++ioc.num_running;
-  if(queue_id == -1)
-    queue_id = ceph_gettid();
-  driver->get_queue(queue_id)->queue_task(t);
+  Task t(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
 
-  while(t->return_code > 0) {
-    t->io_wait();
-  }
-  r = t->return_code;
-  delete t;
-  return r;
+  make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, &t, off, len);
+  aio_submit(&ioc);
+
+  return t.return_code;
 }
 
 int NVMEDevice::invalidate_cache(uint64_t off, uint64_t len)