]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/librbd/internal.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / librbd / internal.cc
index bd77bb439f849391f5e24939b5ddb45c0be057bf..8b499c283da6446c190f3303b9cb6def1d5bead5 100644 (file)
 #include "common/errno.h"
 #include "common/Throttle.h"
 #include "common/event_socket.h"
-#include "cls/lock/cls_lock_client.h"
+#include "common/perf_counters.h"
+#include "osdc/Striper.h"
 #include "include/stringify.h"
 
+#include "cls/lock/cls_lock_client.h"
 #include "cls/rbd/cls_rbd.h"
 #include "cls/rbd/cls_rbd_types.h"
 #include "cls/rbd/cls_rbd_client.h"
 #include "librbd/Operations.h"
 #include "librbd/Types.h"
 #include "librbd/Utils.h"
+#include "librbd/api/Config.h"
 #include "librbd/api/Image.h"
 #include "librbd/exclusive_lock/AutomaticPolicy.h"
 #include "librbd/exclusive_lock/StandardPolicy.h"
 #include "librbd/image/CloneRequest.h"
 #include "librbd/image/CreateRequest.h"
-#include "librbd/image/RemoveRequest.h"
 #include "librbd/io/AioCompletion.h"
 #include "librbd/io/ImageRequest.h"
 #include "librbd/io/ImageRequestWQ.h"
+#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatchSpec.h"
 #include "librbd/io/ObjectRequest.h"
 #include "librbd/io/ReadResult.h"
 #include "librbd/journal/Types.h"
@@ -50,7 +54,7 @@
 
 #include <boost/scope_exit.hpp>
 #include <boost/variant.hpp>
-#include "include/assert.h"
+#include "include/ceph_assert.h"
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -75,7 +79,7 @@ namespace librbd {
 namespace {
 
 int validate_pool(IoCtx &io_ctx, CephContext *cct) {
-  if (!cct->_conf->get_val<bool>("rbd_validate_pool")) {
+  if (!cct->_conf.get_val<bool>("rbd_validate_pool")) {
     return 0;
   }
 
@@ -109,7 +113,6 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
   return 0;
 }
 
-
 } // anonymous namespace
 
   int detect_format(IoCtx &io_ctx, const string &name,
@@ -196,9 +199,9 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
 
   void trim_image(ImageCtx *ictx, uint64_t newsize, ProgressContext& prog_ctx)
   {
-    assert(ictx->owner_lock.is_locked());
-    assert(ictx->exclusive_lock == nullptr ||
-          ictx->exclusive_lock->is_lock_owner());
+    ceph_assert(ictx->owner_lock.is_locked());
+    ceph_assert(ictx->exclusive_lock == nullptr ||
+                ictx->exclusive_lock->is_lock_owner());
 
     C_SaferCond ctx;
     ictx->snap_lock.get_read();
@@ -229,8 +232,14 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
       off += r;
     } while (r == READ_SIZE);
 
+    static_assert(sizeof(RBD_HEADER_TEXT) == sizeof(RBD_MIGRATE_HEADER_TEXT),
+                  "length of rbd headers must be the same");
+
     if (header.length() < sizeof(RBD_HEADER_TEXT) ||
-       memcmp(RBD_HEADER_TEXT, header.c_str(), sizeof(RBD_HEADER_TEXT))) {
+        (memcmp(RBD_HEADER_TEXT, header.c_str(),
+                sizeof(RBD_HEADER_TEXT)) != 0 &&
+         memcmp(RBD_MIGRATE_HEADER_TEXT, header.c_str(),
+                sizeof(RBD_MIGRATE_HEADER_TEXT)) != 0)) {
       CephContext *cct = (CephContext *)io_ctx.cct();
       lderr(cct) << "unrecognized header format" << dendl;
       return -ENXIO;
@@ -260,9 +269,9 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
   {
     bufferlist cmdbl, emptybl;
     __u8 c = CEPH_OSD_TMAP_SET;
-    ::encode(c, cmdbl);
-    ::encode(imgname, cmdbl);
-    ::encode(emptybl, cmdbl);
+    encode(c, cmdbl);
+    encode(imgname, cmdbl);
+    encode(emptybl, cmdbl);
     return io_ctx.tmap_update(RBD_DIRECTORY, cmdbl);
   }
 
@@ -270,8 +279,8 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
   {
     bufferlist cmdbl;
     __u8 c = CEPH_OSD_TMAP_RM;
-    ::encode(c, cmdbl);
-    ::encode(imgname, cmdbl);
+    encode(c, cmdbl);
+    encode(imgname, cmdbl);
     return io_ctx.tmap_update(RBD_DIRECTORY, cmdbl);
   }
 
@@ -296,6 +305,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     {RBD_IMAGE_OPTION_FEATURES_SET, UINT64},
     {RBD_IMAGE_OPTION_FEATURES_CLEAR, UINT64},
     {RBD_IMAGE_OPTION_DATA_POOL, STR},
+    {RBD_IMAGE_OPTION_FLATTEN, UINT64},
   };
 
   std::string image_option_name(int optname) {
@@ -322,36 +332,13 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
       return "features_clear";
     case RBD_IMAGE_OPTION_DATA_POOL:
       return "data_pool";
+    case RBD_IMAGE_OPTION_FLATTEN:
+      return "flatten";
     default:
       return "unknown (" + stringify(optname) + ")";
     }
   }
 
-  std::ostream &operator<<(std::ostream &os, const ImageOptions &opts) {
-    os << "[";
-
-    const char *delimiter = "";
-    for (auto &i : IMAGE_OPTIONS_TYPE_MAPPING) {
-      if (i.second == STR) {
-       std::string val;
-       if (opts.get(i.first, &val) == 0) {
-         os << delimiter << image_option_name(i.first) << "=" << val;
-         delimiter = ", ";
-       }
-      } else if (i.second == UINT64) {
-       uint64_t val;
-       if (opts.get(i.first, &val) == 0) {
-         os << delimiter << image_option_name(i.first) << "=" << val;
-         delimiter = ", ";
-       }
-      }
-    }
-
-    os << "]";
-
-    return os;
-  }
-
   void image_options_create(rbd_image_options_t* opts)
   {
     image_options_ref* opts_ = new image_options_ref(new image_options_t());
@@ -495,7 +482,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
       IMAGE_OPTIONS_TYPE_MAPPING.find(optname);
 
     if (i == IMAGE_OPTIONS_TYPE_MAPPING.end()) {
-      assert((*opts_)->find(optname) == (*opts_)->end());
+      ceph_assert((*opts_)->find(optname) == (*opts_)->end());
       return -EINVAL;
     }
 
@@ -523,45 +510,6 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     return (*opts_)->empty();
   }
 
-  int list(IoCtx& io_ctx, vector<string>& names)
-  {
-    CephContext *cct = (CephContext *)io_ctx.cct();
-    ldout(cct, 20) << "list " << &io_ctx << dendl;
-
-    bufferlist bl;
-    int r = io_ctx.read(RBD_DIRECTORY, bl, 0, 0);
-    if (r < 0) {
-      if (r == -ENOENT) {
-        r = 0;
-      }
-      return r;
-    }
-
-    // old format images are in a tmap
-    if (bl.length()) {
-      bufferlist::iterator p = bl.begin();
-      bufferlist header;
-      map<string,bufferlist> m;
-      ::decode(header, p);
-      ::decode(m, p);
-      for (map<string,bufferlist>::iterator q = m.begin(); q != m.end(); ++q) {
-       names.push_back(q->first);
-      }
-    }
-
-    map<string, string> images;
-    r = api::Image<>::list_images(io_ctx, &images);
-    if (r < 0) {
-      lderr(cct) << "error listing v2 images: " << cpp_strerror(r) << dendl;
-      return r;
-    }
-    for (const auto& img_pair : images) {
-      names.push_back(img_pair.first);
-    }
-
-    return 0;
-  }
-
   int flatten_children(ImageCtx *ictx, const char* snap_name,
                        ProgressContext& pctx)
   {
@@ -574,111 +522,78 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
 
     RWLock::RLocker l(ictx->snap_lock);
-    snap_t snap_id = ictx->get_snap_id(cls::rbd::UserSnapshotNamespace(), snap_name);
-    ParentSpec parent_spec(ictx->md_ctx.get_id(), ictx->id, snap_id);
-    map< pair<int64_t, string>, set<string> > image_info;
-
-    r = api::Image<>::list_children(ictx, parent_spec, &image_info);
+    snap_t snap_id = ictx->get_snap_id(cls::rbd::UserSnapshotNamespace(),
+                                       snap_name);
+
+    cls::rbd::ParentImageSpec parent_spec{ictx->md_ctx.get_id(),
+                                          ictx->md_ctx.get_namespace(),
+                                          ictx->id, snap_id};
+    std::vector<librbd::linked_image_spec_t> child_images;
+    r = api::Image<>::list_children(ictx, parent_spec, &child_images);
     if (r < 0) {
       return r;
     }
 
-    size_t size = image_info.size();
-    if (size == 0)
+    size_t size = child_images.size();
+    if (size == 0) {
       return 0;
+    }
 
+    librados::IoCtx child_io_ctx;
+    int64_t child_pool_id = -1;
     size_t i = 0;
-    Rados rados(ictx->md_ctx);
-    for ( auto &info : image_info){
-      string pool = info.first.second;
-      IoCtx ioctx;
-      r = rados.ioctx_create2(info.first.first, ioctx);
-      if (r < 0) {
-        lderr(cct) << "Error accessing child image pool " << pool
-                   << dendl;
-        return r;
-      }
-
-      for (auto &id_it : info.second) {
-       ImageCtx *imctx = new ImageCtx("", id_it, NULL, ioctx, false);
-       int r = imctx->state->open(false);
-       if (r < 0) {
-         lderr(cct) << "error opening image: "
-                    << cpp_strerror(r) << dendl;
-         return r;
-       }
-
-        if ((imctx->features & RBD_FEATURE_DEEP_FLATTEN) == 0 &&
-            !imctx->snaps.empty()) {
-          lderr(cct) << "snapshot in-use by " << pool << "/" << imctx->name
-                     << dendl;
-          imctx->state->close();
-          return -EBUSY;
-        }
-
-       librbd::NoOpProgressContext prog_ctx;
-       r = imctx->operations->flatten(prog_ctx);
-       if (r < 0) {
-         lderr(cct) << "error flattening image: " << pool << "/" << id_it
-                    << cpp_strerror(r) << dendl;
-          imctx->state->close();
-         return r;
-       }
-
-       r = imctx->state->close();
+    for (auto &child_image : child_images){
+      std::string pool = child_image.pool_name;
+      if (child_pool_id == -1 ||
+          child_pool_id != child_image.pool_id ||
+          child_io_ctx.get_namespace() != child_image.pool_namespace) {
+        r = util::create_ioctx(ictx->md_ctx, "child image",
+                               child_image.pool_id, child_image.pool_namespace,
+                               &child_io_ctx);
         if (r < 0) {
-          lderr(cct) << "failed to close image: " << cpp_strerror(r) << dendl;
           return r;
         }
-      }
-      pctx.update_progress(++i, size);
-      assert(i <= size);
-    }
 
-    return 0;
-  }
-
-  int list_children(ImageCtx *ictx, set<pair<string, string> >& names)
-  {
-    CephContext *cct = ictx->cct;
-    ldout(cct, 20) << "children list " << ictx->name << dendl;
-
-    int r = ictx->state->refresh_if_required();
-    if (r < 0) {
-      return r;
-    }
+        child_pool_id = child_image.pool_id;
+      }
 
-    RWLock::RLocker l(ictx->snap_lock);
-    ParentSpec parent_spec(ictx->md_ctx.get_id(), ictx->id, ictx->snap_id);
-    map< pair<int64_t, string>, set<string> > image_info;
+      ImageCtx *imctx = new ImageCtx("", child_image.image_id, nullptr,
+                                     child_io_ctx, false);
+      r = imctx->state->open(0);
+      if (r < 0) {
+        lderr(cct) << "error opening image: " << cpp_strerror(r) << dendl;
+        return r;
+      }
 
-    r = api::Image<>::list_children(ictx, parent_spec, &image_info);
-    if (r < 0) {
-      return r;
-    }
+      if ((imctx->features & RBD_FEATURE_DEEP_FLATTEN) == 0 &&
+          !imctx->snaps.empty()) {
+        lderr(cct) << "snapshot in-use by " << pool << "/" << imctx->name
+                   << dendl;
+        imctx->state->close();
+        return -EBUSY;
+      }
 
-    Rados rados(ictx->md_ctx);
-    for ( auto &info : image_info){
-      IoCtx ioctx;
-      r = rados.ioctx_create2(info.first.first, ioctx);
+      librbd::NoOpProgressContext prog_ctx;
+      r = imctx->operations->flatten(prog_ctx);
       if (r < 0) {
-        lderr(cct) << "Error accessing child image pool " << info.first.second
-                   << dendl;
+        lderr(cct) << "error flattening image: " << pool << "/"
+                   << (child_image.pool_namespace.empty() ?
+                        "" : "/" + child_image.pool_namespace)
+                   << child_image.image_name << cpp_strerror(r) << dendl;
+        imctx->state->close();
         return r;
       }
 
-      for (auto &id_it : info.second) {
-       string name;
-       r = cls_client::dir_get_name(&ioctx, RBD_DIRECTORY, id_it, &name);
-       if (r < 0) {
-         lderr(cct) << "Error looking up name for image id " << id_it
-                    << " in pool " << info.first.second << dendl;
-         return r;
-       }
-       names.insert(make_pair(info.first.second, name));
+      r = imctx->state->close();
+      if (r < 0) {
+        lderr(cct) << "failed to close image: " << cpp_strerror(r) << dendl;
+        return r;
       }
+
+      pctx.update_progress(++i, size);
+      ceph_assert(i <= size);
     }
-    
+
     return 0;
   }
 
@@ -731,6 +646,11 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
       return r;
     }
 
+    if (!io_ctx.get_namespace().empty()) {
+      lderr(cct) << "attempting to add v1 image to namespace" << dendl;
+      return -EINVAL;
+    }
+
     ldout(cct, 2) << "adding rbd image to directory..." << dendl;
     r = tmap_set(io_ctx, imgname);
     if (r < 0) {
@@ -774,12 +694,12 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     ImageOptions opts;
 
     int r = opts.set(RBD_IMAGE_OPTION_ORDER, order_);
-    assert(r == 0);
+    ceph_assert(r == 0);
 
     r = create(io_ctx, imgname, "", size, opts, "", "", false);
 
     int r1 = opts.get(RBD_IMAGE_OPTION_ORDER, &order_);
-    assert(r1 == 0);
+    ceph_assert(r1 == 0);
     *order = order_;
 
     return r;
@@ -798,20 +718,20 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     int r;
 
     r = opts.set(RBD_IMAGE_OPTION_FORMAT, format);
-    assert(r == 0);
+    ceph_assert(r == 0);
     r = opts.set(RBD_IMAGE_OPTION_FEATURES, features);
-    assert(r == 0);
+    ceph_assert(r == 0);
     r = opts.set(RBD_IMAGE_OPTION_ORDER, order_);
-    assert(r == 0);
+    ceph_assert(r == 0);
     r = opts.set(RBD_IMAGE_OPTION_STRIPE_UNIT, stripe_unit);
-    assert(r == 0);
+    ceph_assert(r == 0);
     r = opts.set(RBD_IMAGE_OPTION_STRIPE_COUNT, stripe_count);
-    assert(r == 0);
+    ceph_assert(r == 0);
 
     r = create(io_ctx, imgname, "", size, opts, "", "", false);
 
     int r1 = opts.get(RBD_IMAGE_OPTION_ORDER, &order_);
-    assert(r1 == 0);
+    ceph_assert(r1 == 0);
     *order = order_;
 
     return r;
@@ -830,13 +750,19 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
 
     CephContext *cct = (CephContext *)io_ctx.cct();
+    uint64_t flatten;
+    if (opts.get(RBD_IMAGE_OPTION_FLATTEN, &flatten) == 0) {
+      lderr(cct) << "create does not support 'flatten' image option" << dendl;
+      return -EINVAL;
+    }
+
     ldout(cct, 10) << __func__ << " name=" << image_name << ", "
                   << "id= " << id << ", "
                   << "size=" << size << ", opts=" << opts << dendl;
 
     uint64_t format;
     if (opts.get(RBD_IMAGE_OPTION_FORMAT, &format) != 0)
-      format = cct->_conf->get_val<int64_t>("rbd_default_format");
+      format = cct->_conf.get_val<uint64_t>("rbd_default_format");
     bool old_format = format == 1;
 
     // make sure it doesn't already exist, in either format
@@ -853,7 +779,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
 
     uint64_t order = 0;
     if (opts.get(RBD_IMAGE_OPTION_ORDER, &order) != 0 || order == 0) {
-      order = cct->_conf->get_val<int64_t>("rbd_default_order");
+      order = cct->_conf.get_val<uint64_t>("rbd_default_order");
     }
     r = image::CreateRequest<>::validate_order(cct, order);
     if (r < 0) {
@@ -861,15 +787,23 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
 
     if (old_format) {
+      if ( !getenv("RBD_FORCE_ALLOW_V1") ) {
+        lderr(cct) << "Format 1 image creation unsupported. " << dendl;
+        return -EINVAL;
+      }
+      lderr(cct) << "Forced V1 image creation. " << dendl;
       r = create_v1(io_ctx, image_name.c_str(), size, order);
     } else {
       ThreadPool *thread_pool;
       ContextWQ *op_work_queue;
       ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
 
+      ConfigProxy config{cct->_conf};
+      api::Config<>::apply_pool_overrides(io_ctx, &config);
+
       C_SaferCond cond;
       image::CreateRequest<> *req = image::CreateRequest<>::create(
-        io_ctx, image_name, id, size, opts, non_primary_global_image_id,
+        config, io_ctx, image_name, id, size, opts, non_primary_global_image_id,
         primary_mirror_uuid, skip_mirror_enable, op_work_queue, &cond);
       req->send();
 
@@ -877,7 +811,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
 
     int r1 = opts.set(RBD_IMAGE_OPTION_ORDER, order);
-    assert(r1 == 0);
+    ceph_assert(r1 == 0);
 
     return r;
   }
@@ -898,70 +832,81 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     opts.set(RBD_IMAGE_OPTION_STRIPE_UNIT, stripe_unit);
     opts.set(RBD_IMAGE_OPTION_STRIPE_COUNT, stripe_count);
 
-    int r = clone(p_ioctx, p_name, p_snap_name, c_ioctx, c_name, opts);
+    int r = clone(p_ioctx, nullptr, p_name, p_snap_name, c_ioctx, nullptr,
+                  c_name, opts, "", "");
     opts.get(RBD_IMAGE_OPTION_ORDER, &order);
     *c_order = order;
     return r;
   }
 
-  int clone(IoCtx& p_ioctx, const char *p_name, const char *p_snap_name,
-           IoCtx& c_ioctx, const char *c_name, ImageOptions& c_opts)
+  int clone(IoCtx& p_ioctx, const char *p_id, const char *p_name,
+            const char *p_snap_name, IoCtx& c_ioctx, const char *c_id,
+            const char *c_name, ImageOptions& c_opts,
+            const std::string &non_primary_global_image_id,
+            const std::string &primary_mirror_uuid)
   {
+    ceph_assert((p_id == nullptr) ^ (p_name == nullptr));
+
     CephContext *cct = (CephContext *)p_ioctx.cct();
-    if (p_snap_name == NULL) {
+    if (p_snap_name == nullptr) {
       lderr(cct) << "image to be cloned must be a snapshot" << dendl;
       return -EINVAL;
     }
 
-    // make sure parent snapshot exists
-    ImageCtx *p_imctx = new ImageCtx(p_name, "", p_snap_name, p_ioctx, true);
-    int r = p_imctx->state->open(false);
-    if (r < 0) {
-      lderr(cct) << "error opening parent image: "
-                << cpp_strerror(r) << dendl;
-      return r;
-    }
-
-    r = clone(p_imctx, c_ioctx, c_name, "", c_opts, "", "");
-
-    int close_r = p_imctx->state->close();
-    if (r == 0 && close_r < 0) {
-      r = close_r;
+    uint64_t flatten;
+    if (c_opts.get(RBD_IMAGE_OPTION_FLATTEN, &flatten) == 0) {
+      lderr(cct) << "clone does not support 'flatten' image option" << dendl;
+      return -EINVAL;
     }
 
-    if (r < 0) {
-      return r;
+    int r;
+    std::string parent_id;
+    if (p_id == nullptr) {
+      r = cls_client::dir_get_id(&p_ioctx, RBD_DIRECTORY, p_name,
+                                 &parent_id);
+      if (r < 0) {
+        if (r != -ENOENT) {
+          lderr(cct) << "failed to retrieve parent image id: "
+                     << cpp_strerror(r) << dendl;
+        }
+        return r;
+      }
+    } else {
+      parent_id = p_id;
     }
-    return 0;
-  }
 
-  int clone(ImageCtx *p_imctx, IoCtx& c_ioctx, const std::string &c_name,
-            const std::string &c_id, ImageOptions& c_opts,
-            const std::string &non_primary_global_image_id,
-            const std::string &primary_mirror_uuid)
-  {
-    std::string id(c_id);
-    if (id.empty()) {
-      id = util::generate_image_id(c_ioctx);
+    std::string clone_id;
+    if (c_id == nullptr) {
+      clone_id = util::generate_image_id(c_ioctx);
+    } else {
+      clone_id = c_id;
     }
 
-    CephContext *cct = (CephContext *)c_ioctx.cct();
     ldout(cct, 10) << __func__ << " "
                   << "c_name=" << c_name << ", "
-                  << "c_id= " << c_id << ", "
+                  << "c_id= " << clone_id << ", "
                   << "c_opts=" << c_opts << dendl;
 
+    ConfigProxy config{reinterpret_cast<CephContext *>(c_ioctx.cct())->_conf};
+    api::Config<>::apply_pool_overrides(c_ioctx, &config);
+
     ThreadPool *thread_pool;
     ContextWQ *op_work_queue;
     ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
 
     C_SaferCond cond;
     auto *req = image::CloneRequest<>::create(
-      p_imctx, c_ioctx, c_name, id, c_opts,
-      non_primary_global_image_id, primary_mirror_uuid, op_work_queue, &cond);
+      config, p_ioctx, parent_id, p_snap_name, CEPH_NOSNAP, c_ioctx, c_name,
+      clone_id, c_opts, non_primary_global_image_id, primary_mirror_uuid,
+      op_work_queue, &cond);
     req->send();
 
-    return cond.wait();
+    r = cond.wait();
+    if (r < 0) {
+      return r;
+    }
+
+    return 0;
   }
 
   int rename(IoCtx& io_ctx, const char *srcname, const char *dstname)
@@ -971,7 +916,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
                   << dstname << dendl;
 
     ImageCtx *ictx = new ImageCtx(srcname, "", "", io_ctx, false);
-    int r = ictx->state->open(false);
+    int r = ictx->state->open(0);
     if (r < 0) {
       lderr(cct) << "error opening source image: " << cpp_strerror(r) << dendl;
       return r;
@@ -1034,67 +979,6 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     return ictx->get_parent_overlap(ictx->snap_id, overlap);
   }
 
-  int get_parent_info(ImageCtx *ictx, string *parent_pool_name,
-                      string *parent_name, string *parent_id,
-                      string *parent_snap_name)
-  {
-    int r = ictx->state->refresh_if_required();
-    if (r < 0)
-      return r;
-
-    RWLock::RLocker l(ictx->snap_lock);
-    RWLock::RLocker l2(ictx->parent_lock);
-    if (ictx->parent == NULL) {
-      return -ENOENT;
-    }
-
-    ParentSpec parent_spec;
-
-    if (ictx->snap_id == CEPH_NOSNAP) {
-      parent_spec = ictx->parent_md.spec;
-    } else {
-      r = ictx->get_parent_spec(ictx->snap_id, &parent_spec);
-      if (r < 0) {
-       lderr(ictx->cct) << "Can't find snapshot id = " << ictx->snap_id
-                         << dendl;
-       return r;
-      }
-      if (parent_spec.pool_id == -1)
-       return -ENOENT;
-    }
-    if (parent_pool_name) {
-      Rados rados(ictx->md_ctx);
-      r = rados.pool_reverse_lookup(parent_spec.pool_id,
-                                   parent_pool_name);
-      if (r < 0) {
-       lderr(ictx->cct) << "error looking up pool name: " << cpp_strerror(r)
-                        << dendl;
-       return r;
-      }
-    }
-
-    if (parent_snap_name) {
-      RWLock::RLocker l(ictx->parent->snap_lock);
-      r = ictx->parent->get_snap_name(parent_spec.snap_id,
-                                     parent_snap_name);
-      if (r < 0) {
-       lderr(ictx->cct) << "error finding parent snap name: "
-                        << cpp_strerror(r) << dendl;
-       return r;
-      }
-    }
-
-    if (parent_name) {
-      RWLock::RLocker snap_locker(ictx->parent->snap_lock);
-      *parent_name = ictx->parent->name;
-    }
-    if (parent_id) {
-      *parent_id = ictx->parent->id;
-    }
-
-    return 0;
-  }
-
   int get_flags(ImageCtx *ictx, uint64_t *flags)
   {
     int r = ictx->state->refresh_if_required();
@@ -1251,8 +1135,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
   }
 
   int lock_break(ImageCtx *ictx, rbd_lock_mode_t lock_mode,
-                 const std::string &lock_owner)
-  {
+                 const std::string &lock_owner) {
     CephContext *cct = ictx->cct;
     ldout(cct, 20) << __func__ << ": ictx=" << ictx << ", "
                    << "lock_mode=" << lock_mode << ", "
@@ -1312,301 +1195,6 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     return 0;
   }
 
-  int remove(IoCtx& io_ctx, const std::string &image_name,
-             const std::string &image_id, ProgressContext& prog_ctx,
-             bool force, bool from_trash_remove)
-  {
-    CephContext *cct((CephContext *)io_ctx.cct());
-    ldout(cct, 20) << "remove " << &io_ctx << " "
-                   << (image_id.empty() ? image_name : image_id) << dendl;
-
-    ThreadPool *thread_pool;
-    ContextWQ *op_work_queue;
-    ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
-
-    C_SaferCond cond;
-    auto req = librbd::image::RemoveRequest<>::create(
-      io_ctx, image_name, image_id, force, from_trash_remove, prog_ctx,
-      op_work_queue, &cond);
-    req->send();
-
-    return cond.wait();
-  }
-
-  int trash_move(librados::IoCtx &io_ctx, rbd_trash_image_source_t source,
-                 const std::string &image_name, uint64_t delay) {
-    CephContext *cct((CephContext *)io_ctx.cct());
-    ldout(cct, 20) << "trash_move " << &io_ctx << " " << image_name
-                   << dendl;
-
-    std::string image_id;
-    ImageCtx *ictx = new ImageCtx(image_name, "", nullptr, io_ctx, false);
-    int r = ictx->state->open(true);
-    if (r < 0) {
-      ictx = nullptr;
-
-      if (r != -ENOENT) {
-        ldout(cct, 2) << "error opening image: " << cpp_strerror(-r) << dendl;
-        return r;
-      }
-
-      // try to get image id from the directory
-      r = cls_client::dir_get_id(&io_ctx, RBD_DIRECTORY, image_name, &image_id);
-      if (r < 0) {
-        if (r != -ENOENT) {
-          ldout(cct, 2) << "error reading image id from dirctory: "
-                        << cpp_strerror(-r) << dendl;
-        }
-        return r;
-      }
-    } else {
-      if (ictx->old_format) {
-        ictx->state->close();
-        return -EOPNOTSUPP;
-      }
-
-      image_id = ictx->id;
-      ictx->owner_lock.get_read();
-      if (ictx->exclusive_lock != nullptr) {
-        r = ictx->operations->prepare_image_update(false);
-        if (r < 0) {
-         lderr(cct) << "cannot obtain exclusive lock - not removing" << dendl;
-         ictx->owner_lock.put_read();
-         ictx->state->close();
-          return -EBUSY;
-        }
-      }
-    }
-
-    BOOST_SCOPE_EXIT_ALL(ictx, cct) {
-      if (ictx == nullptr)
-        return;
-
-      bool is_locked = ictx->exclusive_lock != nullptr &&
-                       ictx->exclusive_lock->is_lock_owner();
-      if (is_locked) {
-        C_SaferCond ctx;
-        auto exclusive_lock = ictx->exclusive_lock;
-        exclusive_lock->shut_down(&ctx);
-        ictx->owner_lock.put_read();
-        int r = ctx.wait();
-        if (r < 0) {
-          lderr(cct) << "error shutting down exclusive lock" << dendl;
-        }
-        delete exclusive_lock;
-      } else {
-        ictx->owner_lock.put_read();
-      }
-      ictx->state->close();
-    };
-
-    ldout(cct, 2) << "adding image entry to rbd_trash" << dendl;
-    utime_t ts = ceph_clock_now();
-    utime_t deferment_end_time = ts;
-    deferment_end_time += (double)delay;
-    cls::rbd::TrashImageSource trash_source =
-        static_cast<cls::rbd::TrashImageSource>(source);
-    cls::rbd::TrashImageSpec trash_spec(trash_source, image_name, ts,
-                                        deferment_end_time);
-    r = cls_client::trash_add(&io_ctx, image_id, trash_spec);
-    if (r < 0 && r != -EEXIST) {
-      lderr(cct) << "error adding image " << image_name << " to rbd_trash"
-                 << dendl;
-      return r;
-    } else if (r == -EEXIST) {
-      ldout(cct, 10) << "found previous unfinished deferred remove for image:"
-                     << image_id << dendl;
-      // continue with removing image from directory
-    }
-
-    ldout(cct, 2) << "removing id object..." << dendl;
-    r = io_ctx.remove(util::id_obj_name(image_name));
-    if (r < 0 && r != -ENOENT) {
-      lderr(cct) << "error removing id object: " << cpp_strerror(r)
-                 << dendl;
-      return r;
-    }
-
-    ldout(cct, 2) << "removing rbd image from v2 directory..." << dendl;
-    r = cls_client::dir_remove_image(&io_ctx, RBD_DIRECTORY, image_name,
-                                     image_id);
-    if (r < 0) {
-      if (r != -ENOENT) {
-        lderr(cct) << "error removing image from v2 directory: "
-                   << cpp_strerror(-r) << dendl;
-      }
-      return r;
-    }
-
-    return 0;
-  }
-
-  int trash_get(IoCtx &io_ctx, const std::string &id,
-                trash_image_info_t *info) {
-    CephContext *cct((CephContext *)io_ctx.cct());
-    ldout(cct, 20) << __func__ << " " << &io_ctx << dendl;
-
-    cls::rbd::TrashImageSpec spec;
-    int r = cls_client::trash_get(&io_ctx, id, &spec);
-    if (r == -ENOENT) {
-      return r;
-    } else if (r < 0) {
-      lderr(cct) << "error retrieving trash entry: " << cpp_strerror(r)
-                 << dendl;
-      return r;
-    }
-
-    rbd_trash_image_source_t source = static_cast<rbd_trash_image_source_t>(
-      spec.source);
-    *info = trash_image_info_t{id, spec.name, source, spec.deletion_time.sec(),
-                               spec.deferment_end_time.sec()};
-    return 0;
-  }
-
-  int trash_list(IoCtx &io_ctx, vector<trash_image_info_t> &entries) {
-    CephContext *cct((CephContext *)io_ctx.cct());
-    ldout(cct, 20) << "trash_list " << &io_ctx << dendl;
-
-    bool more_entries;
-    uint32_t max_read = 1024;
-    std::string last_read = "";
-    do {
-      map<string, cls::rbd::TrashImageSpec> trash_entries;
-      int r = cls_client::trash_list(&io_ctx, last_read, max_read,
-                                     &trash_entries);
-      if (r < 0 && r != -ENOENT) {
-        lderr(cct) << "error listing rbd trash entries: " << cpp_strerror(r)
-                   << dendl;
-        return r;
-      } else if (r == -ENOENT) {
-        break;
-      }
-
-      if (trash_entries.empty()) {
-        break;
-      }
-
-      for (const auto &entry : trash_entries) {
-        rbd_trash_image_source_t source =
-            static_cast<rbd_trash_image_source_t>(entry.second.source);
-        entries.push_back({entry.first, entry.second.name, source,
-                           entry.second.deletion_time.sec(),
-                           entry.second.deferment_end_time.sec()});
-      }
-      last_read = trash_entries.rbegin()->first;
-      more_entries = (trash_entries.size() >= max_read);
-    } while (more_entries);
-
-    return 0;
-  }
-
-  int trash_remove(IoCtx &io_ctx, const std::string &image_id, bool force,
-                   ProgressContext& prog_ctx) {
-    CephContext *cct((CephContext *)io_ctx.cct());
-    ldout(cct, 20) << "trash_remove " << &io_ctx << " " << image_id
-                   << " " << force << dendl;
-
-    cls::rbd::TrashImageSpec trash_spec;
-    int r = cls_client::trash_get(&io_ctx, image_id, &trash_spec);
-    if (r < 0) {
-      lderr(cct) << "error getting image id " << image_id
-                 << " info from trash: " << cpp_strerror(r) << dendl;
-      return r;
-    }
-
-    utime_t now = ceph_clock_now();
-    if (now < trash_spec.deferment_end_time && !force) {
-      lderr(cct) << "error: deferment time has not expired." << dendl;
-      return -EPERM;
-    }
-
-    r = remove(io_ctx, "", image_id, prog_ctx, false, true);
-    if (r < 0) {
-      lderr(cct) << "error removing image " << image_id
-                 << ", which is pending deletion" << dendl;
-      return r;
-    }
-    r = cls_client::trash_remove(&io_ctx, image_id);
-    if (r < 0 && r != -ENOENT) {
-      lderr(cct) << "error removing image " << image_id
-                 << " from rbd_trash object" << dendl;
-      return r;
-    }
-    return 0;
-  }
-
-  int trash_restore(librados::IoCtx &io_ctx, const std::string &image_id,
-                    const std::string &image_new_name) {
-    CephContext *cct((CephContext *)io_ctx.cct());
-    ldout(cct, 20) << "trash_restore " << &io_ctx << " " << image_id << " "
-                   << image_new_name << dendl;
-
-    cls::rbd::TrashImageSpec trash_spec;
-    int r = cls_client::trash_get(&io_ctx, image_id, &trash_spec);
-    if (r < 0) {
-      lderr(cct) << "error getting image id " << image_id
-                 << " info from trash: " << cpp_strerror(r) << dendl;
-      return r;
-    }
-
-    std::string image_name = image_new_name;
-    if (image_name.empty()) {
-      // if user didn't specify a new name, let's try using the old name
-      image_name = trash_spec.name;
-      ldout(cct, 20) << "restoring image id " << image_id << " with name "
-                     << image_name << dendl;
-    }
-
-    // check if no image exists with the same name
-    bool create_id_obj = true;
-    std::string existing_id;
-    r = cls_client::get_id(&io_ctx, util::id_obj_name(image_name), &existing_id);
-    if (r < 0 && r != -ENOENT) {
-      lderr(cct) << "error checking if image " << image_name << " exists: "
-                 << cpp_strerror(r) << dendl;
-      return r;
-    } else if (r != -ENOENT){
-      // checking if we are recovering from an incomplete restore
-      if (existing_id != image_id) {
-        ldout(cct, 2) << "an image with the same name already exists" << dendl;
-        return -EEXIST;
-      }
-      create_id_obj = false;
-    }
-
-    if (create_id_obj) {
-      ldout(cct, 2) << "adding id object" << dendl;
-      librados::ObjectWriteOperation op;
-      op.create(true);
-      cls_client::set_id(&op, image_id);
-      r = io_ctx.operate(util::id_obj_name(image_name), &op);
-      if (r < 0) {
-        lderr(cct) << "error adding id object for image " << image_name
-                   << ": " << cpp_strerror(r) << dendl;
-        return r;
-      }
-    }
-
-    ldout(cct, 2) << "adding rbd image from v2 directory..." << dendl;
-    r = cls_client::dir_add_image(&io_ctx, RBD_DIRECTORY, image_name,
-                                  image_id);
-    if (r < 0 && r != -EEXIST) {
-      lderr(cct) << "error adding image to v2 directory: "
-                 << cpp_strerror(r) << dendl;
-      return r;
-    }
-
-    ldout(cct, 2) << "removing image from trash..." << dendl;
-    r = cls_client::trash_remove(&io_ctx, image_id);
-    if (r < 0 && r != -ENOENT) {
-      lderr(cct) << "error removing image id " << image_id << " from trash: "
-                 << cpp_strerror(r) << dendl;
-      return r;
-    }
-
-    return 0;
-  }
-
   int snap_list(ImageCtx *ictx, vector<snap_info_t>& snaps)
   {
     ldout(ictx->cct, 20) << "snap_list " << ictx << dendl;
@@ -1693,7 +1281,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
   int snap_get_timestamp(ImageCtx *ictx, uint64_t snap_id, struct timespec *timestamp)
   {
     std::map<librados::snap_t, SnapInfo>::iterator snap_it = ictx->snap_info.find(snap_id);
-    assert(snap_it != ictx->snap_info.end());
+    ceph_assert(snap_it != ictx->snap_info.end());
     utime_t time = snap_it->second.timestamp;
     time.to_timespec(timestamp);
     return 0;
@@ -1729,6 +1317,12 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
           ImageOptions& opts, ProgressContext &prog_ctx, size_t sparse_size)
   {
     CephContext *cct = (CephContext *)dest_md_ctx.cct();
+    uint64_t flatten;
+    if (opts.get(RBD_IMAGE_OPTION_FLATTEN, &flatten) == 0) {
+      lderr(cct) << "copy does not support 'flatten' image option" << dendl;
+      return -EINVAL;
+    }
+
     ldout(cct, 20) << "copy " << src->name
                   << (src->snap_name.length() ? "@" + src->snap_name : "")
                   << " -> " << destname << " opts = " << opts << dendl;
@@ -1768,9 +1362,9 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
     opts.set(RBD_IMAGE_OPTION_ORDER, static_cast<uint64_t>(order));
 
-    ImageCtx *dest = new librbd::ImageCtx(destname, "", NULL,
-                                         dest_md_ctx, false);
-    r = dest->state->open(false);
+    ImageCtx *dest = new librbd::ImageCtx(destname, "", nullptr, dest_md_ctx,
+                                          false);
+    r = dest->state->open(0);
     if (r < 0) {
       lderr(cct) << "failed to read newly created header" << dendl;
       return r;
@@ -1814,7 +1408,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
        m_throttle->end_op(r);
        return;
       }
-      assert(m_bl->length() == (size_t)r);
+      ceph_assert(m_bl->length() == (size_t)r);
 
       if (m_bl->is_zero()) {
        delete m_bl;
@@ -1832,12 +1426,12 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
       });
       auto gather_ctx = new C_Gather(m_dest->cct, end_op_ctx);
 
-      bufferptr m_ptr(m_bl->length());
-      m_bl->rebuild(m_ptr);
+      m_bl->rebuild(buffer::ptr_node::create(m_bl->length()));
       size_t write_offset = 0;
       size_t write_length = 0;
       size_t offset = 0;
       size_t length = m_bl->length();
+      const auto& m_ptr = m_bl->front();
       while (offset < length) {
        if (util::calc_sparse_extent(m_ptr,
                                     m_sparse_size,
@@ -1845,9 +1439,9 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
                                     &write_offset,
                                     &write_length,
                                     &offset)) {
-         bufferptr write_ptr(m_ptr, write_offset, write_length);
          bufferlist *write_bl = new bufferlist();
-         write_bl->push_back(write_ptr);
+         write_bl->push_back(
+           buffer::ptr_node::create(m_ptr, write_offset, write_length));
          Context *ctx = new C_CopyWrite(write_bl, gather_ctx->new_sub());
          auto comp = io::AioCompletion::create(ctx);
 
@@ -1862,7 +1456,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
        }
       }
       delete m_bl;
-      assert(gather_ctx->get_sub_created_count() > 0);
+      ceph_assert(gather_ctx->get_sub_created_count() > 0);
       gather_ctx->activate();
     }
 
@@ -1923,15 +1517,35 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
 
     RWLock::RLocker owner_lock(src->owner_lock);
-    SimpleThrottle throttle(src->concurrent_management_ops, false);
+    SimpleThrottle throttle(src->config.get_val<uint64_t>("rbd_concurrent_management_ops"), false);
     uint64_t period = src->get_stripe_period();
     unsigned fadvise_flags = LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL |
                             LIBRADOS_OP_FLAG_FADVISE_NOCACHE;
+    uint64_t object_id = 0;
     for (uint64_t offset = 0; offset < src_size; offset += period) {
       if (throttle.pending_error()) {
         return throttle.wait_for_ret();
       }
 
+      {
+        RWLock::RLocker snap_locker(src->snap_lock);
+        if (src->object_map != nullptr) {
+          bool skip = true;
+          // each period is related to src->stripe_count objects, check them all
+          for (uint64_t i=0; i < src->stripe_count; i++) {
+            if (object_id < src->object_map->size() &&
+                src->object_map->object_may_exist(object_id)) {
+              skip = false;
+            }
+            ++object_id;
+          }
+
+          if (skip) continue;
+        } else {
+          object_id += src->stripe_count;
+        }
+      }
+
       uint64_t len = min(period, src_size - offset);
       bufferlist *bl = new bufferlist();
       auto ctx = new C_CopyRead(&throttle, dest, offset, bl, sparse_size);
@@ -1953,32 +1567,6 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     return r;
   }
 
-  int snap_set(ImageCtx *ictx, const cls::rbd::SnapshotNamespace &snap_namespace,
-              const char *snap_name)
-  {
-    ldout(ictx->cct, 20) << "snap_set " << ictx << " snap = "
-                        << (snap_name ? snap_name : "NULL") << dendl;
-
-    // ignore return value, since we may be set to a non-existent
-    // snapshot and the user is trying to fix that
-    ictx->state->refresh_if_required();
-
-    C_SaferCond ctx;
-    std::string name(snap_name == nullptr ? "" : snap_name);
-    ictx->state->snap_set(snap_namespace, name, &ctx);
-
-    int r = ctx.wait();
-    if (r < 0) {
-      if (r != -ENOENT) {
-        lderr(ictx->cct) << "failed to " << (name.empty() ? "un" : "") << "set "
-                         << "snapshot: " << cpp_strerror(r) << dendl;
-      }
-      return r;
-    }
-
-    return 0;
-  }
-
   int list_lockers(ImageCtx *ictx,
                   std::list<locker_t> *lockers,
                   bool *exclusive,
@@ -2003,7 +1591,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
        locker_t locker;
        locker.client = stringify(it->first.locker);
        locker.cookie = it->first.cookie;
-       locker.address = stringify(it->second.addr);
+       locker.address = it->second.addr.get_legacy_str();
        lockers->push_back(locker);
       }
     }
@@ -2080,7 +1668,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
       return -EINVAL;
     }
 
-    if (ictx->blacklist_on_break_lock) {
+    if (ictx->config.get_val<bool>("rbd_blacklist_on_break_lock")) {
       typedef std::map<rados::cls::lock::locker_id_t,
                       rados::cls::lock::locker_info_t> Lockers;
       Lockers lockers;
@@ -2099,7 +1687,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
       for (Lockers::iterator it = lockers.begin();
            it != lockers.end(); ++it) {
         if (it->first.locker == lock_client) {
-          client_address = stringify(it->second.addr);
+          client_address = it->second.addr.get_legacy_str();
           break;
         }
       }
@@ -2109,8 +1697,9 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
 
       RWLock::RLocker locker(ictx->md_lock);
       librados::Rados rados(ictx->md_ctx);
-      r = rados.blacklist_add(client_address,
-                             ictx->blacklist_expire_seconds);
+      r = rados.blacklist_add(
+        client_address,
+        ictx->config.get_val<uint64_t>("rbd_blacklist_expire_seconds"));
       if (r < 0) {
         lderr(ictx->cct) << "unable to blacklist client: " << cpp_strerror(r)
                       << dendl;
@@ -2138,7 +1727,8 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
                       int (*cb)(uint64_t, size_t, const char *, void *),
                       void *arg)
   {
-    utime_t start_time, elapsed;
+    coarse_mono_time start_time;
+    ceph::timespan elapsed;
 
     ldout(ictx->cct, 20) << "read_iterate " << ictx << " off = " << off
                         << " len = " << len << dendl;
@@ -2164,7 +1754,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
 
     RWLock::RLocker owner_locker(ictx->owner_lock);
-    start_time = ceph_clock_now();
+    start_time = coarse_mono_clock::now();
     while (left > 0) {
       uint64_t period_off = off - (off % period);
       uint64_t read_len = min(period_off + period - off, left);
@@ -2192,7 +1782,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
       off += ret;
     }
 
-    elapsed = ceph_clock_now() - start_time;
+    elapsed = coarse_mono_clock::now() - start_time;
     ictx->perfcounter->tinc(l_librbd_rd_latency, elapsed);
     ictx->perfcounter->inc(l_librbd_rd);
     ictx->perfcounter->inc(l_librbd_rd_bytes, mylen);
@@ -2202,7 +1792,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
   // validate extent against image size; clip to image size if necessary
   int clip_io(ImageCtx *ictx, uint64_t off, uint64_t *len)
   {
-    assert(ictx->snap_lock.is_locked());
+    ceph_assert(ictx->snap_lock.is_locked());
     uint64_t image_size = ictx->get_image_size(ictx->snap_id);
     bool snap_exists = ictx->snap_exists;
 
@@ -2224,40 +1814,22 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     return 0;
   }
 
-  int flush(ImageCtx *ictx)
+  int invalidate_cache(ImageCtx *ictx)
   {
     CephContext *cct = ictx->cct;
-    ldout(cct, 20) << "flush " << ictx << dendl;
+    ldout(cct, 20) << "invalidate_cache " << ictx << dendl;
 
     int r = ictx->state->refresh_if_required();
     if (r < 0) {
       return r;
     }
 
-    ictx->user_flushed();
     C_SaferCond ctx;
     {
       RWLock::RLocker owner_locker(ictx->owner_lock);
-      ictx->flush(&ctx);
+      ictx->io_object_dispatcher->invalidate_cache(&ctx);
     }
     r = ctx.wait();
-
-    ictx->perfcounter->inc(l_librbd_flush);
-    return r;
-  }
-
-  int invalidate_cache(ImageCtx *ictx)
-  {
-    CephContext *cct = ictx->cct;
-    ldout(cct, 20) << "invalidate_cache " << ictx << dendl;
-
-    int r = ictx->state->refresh_if_required();
-    if (r < 0) {
-      return r;
-    }
-
-    RWLock::RLocker owner_locker(ictx->owner_lock);
-    r = ictx->invalidate_cache(false);
     ictx->perfcounter->inc(l_librbd_invalidate_cache);
     return r;
   }
@@ -2271,9 +1843,8 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
                    << dendl;
     int i = 0;
     Mutex::Locker l(ictx->completed_reqs_lock);
+    numcomp = std::min(numcomp, (int)ictx->completed_reqs.size());
     while (i < numcomp) {
-      if (ictx->completed_reqs.empty())
-        break;
       comps[i++] = ictx->completed_reqs.front();
       ictx->completed_reqs.pop_front();
     }
@@ -2311,10 +1882,18 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     object_t oid;
     uint64_t offset;
     uint64_t length;
+
+    bufferlist read_data;
+    io::ExtentMap extent_map;
+
     C_RBD_Readahead(ImageCtx *ictx, object_t oid, uint64_t offset, uint64_t length)
-      : ictx(ictx), oid(oid), offset(offset), length(length) { }
+      : ictx(ictx), oid(oid), offset(offset), length(length) {
+      ictx->readahead.inc_pending();
+    }
+
     void finish(int r) override {
-      ldout(ictx->cct, 20) << "C_RBD_Readahead on " << oid << ": " << offset << "+" << length << dendl;
+      ldout(ictx->cct, 20) << "C_RBD_Readahead on " << oid << ": "
+                           << offset << "~" << length << dendl;
       ictx->readahead.dec_pending();
     }
   };
@@ -2328,7 +1907,7 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
         ++p) {
       total_bytes += p->second;
     }
-    
+
     ictx->md_lock.get_write();
     bool abort = ictx->readahead_disable_after_bytes != 0 &&
       ictx->total_bytes_read > ictx->readahead_disable_after_bytes;
@@ -2339,9 +1918,10 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     ictx->total_bytes_read += total_bytes;
     ictx->snap_lock.get_read();
     uint64_t image_size = ictx->get_image_size(ictx->snap_id);
+    auto snap_id = ictx->snap_id;
     ictx->snap_lock.put_read();
     ictx->md_lock.put_write();
+
     pair<uint64_t, uint64_t> readahead_extent = ictx->readahead.update(image_extents, image_size);
     uint64_t readahead_offset = readahead_extent.first;
     uint64_t readahead_length = readahead_extent.second;
@@ -2355,11 +1935,13 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
        for (vector<ObjectExtent>::iterator q = p->second.begin(); q != p->second.end(); ++q) {
          ldout(ictx->cct, 20) << "(readahead) oid " << q->oid << " " << q->offset << "~" << q->length << dendl;
 
-         Context *req_comp = new C_RBD_Readahead(ictx, q->oid, q->offset, q->length);
-         ictx->readahead.inc_pending();
-         ictx->aio_read_from_cache(q->oid, q->objectno, NULL,
-                                   q->length, q->offset,
-                                   req_comp, 0, nullptr);
+         auto req_comp = new C_RBD_Readahead(ictx, q->oid, q->offset,
+                                              q->length);
+          auto req = io::ObjectDispatchSpec::create_read(
+            ictx, io::OBJECT_DISPATCH_LAYER_NONE, q->oid.name, q->objectno,
+            q->offset, q->length, snap_id, 0, {}, &req_comp->read_data,
+            &req_comp->extent_map, req_comp);
+          req->send();
        }
       }
       ictx->perfcounter->inc(l_librbd_readahead);
@@ -2367,6 +1949,59 @@ int validate_pool(IoCtx &io_ctx, CephContext *cct) {
     }
   }
 
+  int list_watchers(ImageCtx *ictx,
+                   std::list<librbd::image_watcher_t> &watchers)
+  {
+    int r;
+    std::string header_oid;
+    std::list<obj_watch_t> obj_watchers;
+
+    if (ictx->old_format) {
+      header_oid = util::old_header_name(ictx->name);
+    } else {
+      header_oid = util::header_name(ictx->id);
+    }
+
+    r = ictx->md_ctx.list_watchers(header_oid, &obj_watchers);
+    if (r < 0) {
+      return r;
+    }
+
+    for (auto i = obj_watchers.begin(); i != obj_watchers.end(); ++i) {
+      librbd::image_watcher_t watcher;
+      watcher.addr = i->addr;
+      watcher.id = i->watcher_id;
+      watcher.cookie = i->cookie;
+
+      watchers.push_back(watcher);
+    }
+
+    return 0;
+  }
+
+}
+
+std::ostream &operator<<(std::ostream &os, const librbd::ImageOptions &opts) {
+  os << "[";
+
+  const char *delimiter = "";
+  for (auto &i : librbd::IMAGE_OPTIONS_TYPE_MAPPING) {
+    if (i.second == librbd::STR) {
+      std::string val;
+      if (opts.get(i.first, &val) == 0) {
+        os << delimiter << librbd::image_option_name(i.first) << "=" << val;
+        delimiter = ", ";
+      }
+    } else if (i.second == librbd::UINT64) {
+      uint64_t val;
+      if (opts.get(i.first, &val) == 0) {
+        os << delimiter << librbd::image_option_name(i.first) << "=" << val;
+        delimiter = ", ";
+      }
+    }
+  }
 
+  os << "]";
 
+  return os;
 }