import new upstream nautilus stable release 14.2.8
diff --git a/ceph/src/librbd/ImageCtx.cc b/ceph/src/librbd/ImageCtx.cc
index 3f64a8003b05a95a489aa86f08c615a501b43cbc..8375d1a6390647ff63d4e552c5a70861b69612b2 100644
--- a/ceph/src/librbd/ImageCtx.cc
+++ b/ceph/src/librbd/ImageCtx.cc
 #include "librbd/io/AioCompletion.h"
 #include "librbd/io/AsyncOperation.h"
 #include "librbd/io/ImageRequestWQ.h"
+#include "librbd/io/ObjectDispatcher.h"
 #include "librbd/journal/StandardPolicy.h"
 
 #include "osdc/Striper.h"
 #include <boost/bind.hpp>
+#include <boost/algorithm/string/predicate.hpp>
 
 #define dout_subsys ceph_subsys_rbd
 #undef dout_prefix
@@ -61,7 +63,7 @@ public:
     : ThreadPool(cct, "librbd::thread_pool", "tp_librbd", 1,
                  "rbd_op_threads"),
       op_work_queue(new ContextWQ("librbd::op_work_queue",
-                                  cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
+                                  cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
                                   this)) {
     start();
   }
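
The hunk above reflects the Nautilus-era config API: cct->_conf is used as a ConfigProxy object rather than a pointer, and rbd_op_thread_timeout is read as an unsigned value. A minimal sketch of the lookup, using only the option name shown above:

    // Sketch only: typed lookup through the ConfigProxy (no pointer dereference).
    // Pre-Nautilus this was cct->_conf->get_val<int64_t>("rbd_op_thread_timeout").
    uint64_t timeout = cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout");
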
@@ -88,79 +90,6 @@ public:
   }
 };
 
-struct C_FlushCache : public Context {
-  ImageCtx *image_ctx;
-  Context *on_safe;
-
-  C_FlushCache(ImageCtx *_image_ctx, Context *_on_safe)
-    : image_ctx(_image_ctx), on_safe(_on_safe) {
-  }
-  void finish(int r) override {
-    // successful cache flush indicates all IO is now safe
-    image_ctx->flush_cache(on_safe);
-  }
-};
-
-struct C_ShutDownCache : public Context {
-  ImageCtx *image_ctx;
-  Context *on_finish;
-
-  C_ShutDownCache(ImageCtx *_image_ctx, Context *_on_finish)
-    : image_ctx(_image_ctx), on_finish(_on_finish) {
-  }
-  void finish(int r) override {
-    image_ctx->object_cacher->stop();
-    on_finish->complete(r);
-  }
-};
-
-struct C_InvalidateCache : public Context {
-  ImageCtx *image_ctx;
-  bool purge_on_error;
-  bool reentrant_safe;
-  Context *on_finish;
-
-  C_InvalidateCache(ImageCtx *_image_ctx, bool _purge_on_error,
-                    bool _reentrant_safe, Context *_on_finish)
-    : image_ctx(_image_ctx), purge_on_error(_purge_on_error),
-      reentrant_safe(_reentrant_safe), on_finish(_on_finish) {
-  }
-  void finish(int r) override {
-    assert(image_ctx->cache_lock.is_locked());
-    CephContext *cct = image_ctx->cct;
-
-    if (r == -EBLACKLISTED) {
-      lderr(cct) << "Blacklisted during flush!  Purging cache..." << dendl;
-      image_ctx->object_cacher->purge_set(image_ctx->object_set);
-    } else if (r != 0 && purge_on_error) {
-      lderr(cct) << "invalidate cache encountered error "
-                 << cpp_strerror(r) << " !Purging cache..." << dendl;
-      image_ctx->object_cacher->purge_set(image_ctx->object_set);
-    } else if (r != 0) {
-      lderr(cct) << "flush_cache returned " << r << dendl;
-    }
-
-    loff_t unclean = image_ctx->object_cacher->release_set(
-      image_ctx->object_set);
-    if (unclean == 0) {
-      r = 0;
-    } else {
-      lderr(cct) << "could not release all objects from cache: "
-                 << unclean << " bytes remain" << dendl;
-      if (r == 0) {
-        r = -EBUSY;
-      }
-    }
-
-    if (reentrant_safe) {
-      on_finish->complete(r);
-    } else {
-      image_ctx->op_work_queue->queue(on_finish, r);
-    }
-  }
-
-};
-
 } // anonymous namespace
 
   const string ImageCtx::METADATA_CONF_PREFIX = "conf_";
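
The removed C_FlushCache/C_ShutDownCache/C_InvalidateCache helpers all followed the same completion-chaining idiom; with this release the ObjectCacher plumbing appears to move out of ImageCtx and behind the newly included io::ObjectDispatcher layer. A stripped-down sketch of that chaining idiom, with a hypothetical helper name:

    // Hypothetical sketch of the Context-chaining idiom used by the removed helpers.
    struct C_ChainExample : public Context {
      Context *on_finish;
      explicit C_ChainExample(Context *ctx) : on_finish(ctx) {}
      void finish(int r) override {
        // propagate the result to the next stage of the completion pipeline
        on_finish->complete(r);
      }
    };
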
@@ -168,31 +97,30 @@ struct C_InvalidateCache : public Context {
   ImageCtx::ImageCtx(const string &image_name, const string &image_id,
                     const char *snap, IoCtx& p, bool ro)
     : cct((CephContext*)p.cct()),
+      config(cct->_conf),
       perfcounter(NULL),
       snap_id(CEPH_NOSNAP),
       snap_exists(true),
       read_only(ro),
-      flush_encountered(false),
       exclusive_locked(false),
       name(image_name),
       image_watcher(NULL),
       journal(NULL),
       owner_lock(util::unique_lock_name("librbd::ImageCtx::owner_lock", this)),
       md_lock(util::unique_lock_name("librbd::ImageCtx::md_lock", this)),
-      cache_lock(util::unique_lock_name("librbd::ImageCtx::cache_lock", this)),
       snap_lock(util::unique_lock_name("librbd::ImageCtx::snap_lock", this)),
+      timestamp_lock(util::unique_lock_name("librbd::ImageCtx::timestamp_lock", this)),
       parent_lock(util::unique_lock_name("librbd::ImageCtx::parent_lock", this)),
       object_map_lock(util::unique_lock_name("librbd::ImageCtx::object_map_lock", this)),
       async_ops_lock(util::unique_lock_name("librbd::ImageCtx::async_ops_lock", this)),
       copyup_list_lock(util::unique_lock_name("librbd::ImageCtx::copyup_list_lock", this)),
       completed_reqs_lock(util::unique_lock_name("librbd::ImageCtx::completed_reqs_lock", this)),
       extra_read_flags(0),
-      old_format(true),
+      old_format(false),
       order(0), size(0), features(0),
       format_string(NULL),
       id(image_id), parent(NULL),
       stripe_unit(0), stripe_count(0), flags(0),
-      object_cacher(NULL), writeback_handler(NULL), object_set(NULL),
       readahead(),
       total_bytes_read(0),
       state(new ImageState<>(this)),
@@ -207,16 +135,18 @@ struct C_InvalidateCache : public Context {
     if (snap)
       snap_name = snap;
 
+    // FIPS zeroization audit 20191117: this memset is not security related.
     memset(&header, 0, sizeof(header));
 
     ThreadPool *thread_pool;
     get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
     io_work_queue = new io::ImageRequestWQ<>(
       this, "librbd::io_work_queue",
-      cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
+      cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
       thread_pool);
+    io_object_dispatcher = new io::ObjectDispatcher<>(this);
 
-    if (cct->_conf->get_val<bool>("rbd_auto_exclusive_lock_until_manual_request")) {
+    if (cct->_conf.get_val<bool>("rbd_auto_exclusive_lock_until_manual_request")) {
       exclusive_lock_policy = new exclusive_lock::AutomaticPolicy(this);
     } else {
       exclusive_lock_policy = new exclusive_lock::StandardPolicy(this);
@@ -224,34 +154,32 @@ struct C_InvalidateCache : public Context {
     journal_policy = new journal::StandardPolicy<ImageCtx>(this);
   }
 
+  ImageCtx::ImageCtx(const string &image_name, const string &image_id,
+                    uint64_t snap_id, IoCtx& p, bool ro)
+    : ImageCtx(image_name, image_id, "", p, ro) {
+    open_snap_id = snap_id;
+  }
+
   ImageCtx::~ImageCtx() {
-    assert(image_watcher == NULL);
-    assert(exclusive_lock == NULL);
-    assert(object_map == NULL);
-    assert(journal == NULL);
-    assert(asok_hook == NULL);
+    ceph_assert(image_watcher == NULL);
+    ceph_assert(exclusive_lock == NULL);
+    ceph_assert(object_map == NULL);
+    ceph_assert(journal == NULL);
+    ceph_assert(asok_hook == NULL);
 
     if (perfcounter) {
       perf_stop();
     }
-    if (object_cacher) {
-      delete object_cacher;
-      object_cacher = NULL;
-    }
-    if (writeback_handler) {
-      delete writeback_handler;
-      writeback_handler = NULL;
-    }
-    if (object_set) {
-      delete object_set;
-      object_set = NULL;
-    }
     delete[] format_string;
 
     md_ctx.aio_flush();
-    data_ctx.aio_flush();
+    if (data_ctx.is_valid()) {
+      data_ctx.aio_flush();
+    }
     io_work_queue->drain();
 
+    delete io_object_dispatcher;
+
     delete journal_policy;
     delete exclusive_lock_policy;
     delete io_work_queue;
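
Two patterns are worth noting in this hunk: the new snap-id constructor delegates to the primary constructor before recording open_snap_id, and the destructor only touches the data IoCtx when it was actually opened (the data pool may be unavailable). A hypothetical caller sketch of the new constructor (image name, variable names, and the read-only flag are assumptions for illustration):

    // Sketch: construct an ImageCtx bound to a snapshot id rather than a snap name.
    auto ictx = new librbd::ImageCtx("image-name", "", snap_id, io_ctx,
                                     true /* read-only */);
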
@@ -260,13 +188,13 @@ struct C_InvalidateCache : public Context {
   }
 
   void ImageCtx::init() {
-    assert(!header_oid.empty());
-    assert(old_format || !id.empty());
+    ceph_assert(!header_oid.empty());
+    ceph_assert(old_format || !id.empty());
 
     asok_hook = new LibrbdAdminSocketHook(this);
 
     string pname = string("librbd-") + id + string("-") +
-      data_ctx.get_pool_name() + string("-") + name;
+      md_ctx.get_pool_name() + string("-") + name;
     if (!snap_name.empty()) {
       pname += "-";
       pname += snap_name;
@@ -275,47 +203,8 @@ struct C_InvalidateCache : public Context {
     trace_endpoint.copy_name(pname);
     perf_start(pname);
 
-    if (cache) {
-      Mutex::Locker l(cache_lock);
-      ldout(cct, 20) << "enabling caching..." << dendl;
-      writeback_handler = new LibrbdWriteback(this, cache_lock);
-
-      uint64_t init_max_dirty = cache_max_dirty;
-      if (cache_writethrough_until_flush)
-       init_max_dirty = 0;
-      ldout(cct, 20) << "Initial cache settings:"
-                    << " size=" << cache_size
-                    << " num_objects=" << 10
-                    << " max_dirty=" << init_max_dirty
-                    << " target_dirty=" << cache_target_dirty
-                    << " max_dirty_age="
-                    << cache_max_dirty_age << dendl;
-
-      object_cacher = new ObjectCacher(cct, pname, *writeback_handler, cache_lock,
-                                      NULL, NULL,
-                                      cache_size,
-                                      10,  /* reset this in init */
-                                      init_max_dirty,
-                                      cache_target_dirty,
-                                      cache_max_dirty_age,
-                                      cache_block_writes_upfront);
-
-      // size object cache appropriately
-      uint64_t obj = cache_max_dirty_object;
-      if (!obj) {
-       obj = MIN(2000, MAX(10, cache_size / 100 / sizeof(ObjectCacher::Object)));
-      }
-      ldout(cct, 10) << " cache bytes " << cache_size
-       << " -> about " << obj << " objects" << dendl;
-      object_cacher->set_max_objects(obj);
-
-      object_set = new ObjectCacher::ObjectSet(NULL, data_ctx.get_id(), 0);
-      object_set->return_enoent = true;
-      object_cacher->start();
-    }
-
-    readahead.set_trigger_requests(readahead_trigger_requests);
-    readahead.set_max_readahead_size(readahead_max_bytes);
+    ceph_assert(image_watcher == NULL);
+    image_watcher = new ImageWatcher<>(*this);
   }
 
   void ImageCtx::shutdown() {
@@ -326,7 +215,7 @@ struct C_InvalidateCache : public Context {
     asok_hook = nullptr;
   }
 
-  void ImageCtx::init_layout()
+  void ImageCtx::init_layout(int64_t pool_id)
   {
     if (stripe_unit == 0 || stripe_count == 0) {
       stripe_unit = 1ull << order;
@@ -343,7 +232,7 @@ struct C_InvalidateCache : public Context {
     layout.stripe_unit = stripe_unit;
     layout.stripe_count = stripe_count;
     layout.object_size = 1ull << order;
-    layout.pool_id = data_ctx.get_id();  // FIXME: pool id overflow?
+    layout.pool_id = pool_id;  // FIXME: pool id overflow?
 
     delete[] format_string;
     size_t len = object_prefix.length() + 16;
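
init_layout() now receives the data pool id from its caller instead of reading data_ctx.get_id(), which keeps it usable when the data IoCtx is not open. The layout values themselves still derive from the image order; a small worked sketch of the defaults, assuming order 22 (the usual 4 MiB object size):

    // Sketch: default striping when stripe_unit/stripe_count are unset.
    uint8_t order = 22;                       // object size = 2^22
    uint64_t object_size = 1ull << order;     // 4194304 bytes
    uint64_t stripe_unit = object_size;       // one object per stripe unit
    uint64_t stripe_count = 1;                // no striping across objects
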
@@ -373,25 +262,24 @@ struct C_InvalidateCache : public Context {
 
     plb.add_u64_counter(l_librbd_rd, "rd", "Reads", "r", perf_prio);
     plb.add_u64_counter(l_librbd_rd_bytes, "rd_bytes", "Data size in reads",
-                        "rb", perf_prio);
+                        "rb", perf_prio, unit_t(UNIT_BYTES));
     plb.add_time_avg(l_librbd_rd_latency, "rd_latency", "Latency of reads",
                      "rl", perf_prio);
     plb.add_u64_counter(l_librbd_wr, "wr", "Writes", "w", perf_prio);
     plb.add_u64_counter(l_librbd_wr_bytes, "wr_bytes", "Written data",
-                        "wb", perf_prio);
+                        "wb", perf_prio, unit_t(UNIT_BYTES));
     plb.add_time_avg(l_librbd_wr_latency, "wr_latency", "Write latency",
                      "wl", perf_prio);
     plb.add_u64_counter(l_librbd_discard, "discard", "Discards");
-    plb.add_u64_counter(l_librbd_discard_bytes, "discard_bytes", "Discarded data");
+    plb.add_u64_counter(l_librbd_discard_bytes, "discard_bytes", "Discarded data", NULL, 0, unit_t(UNIT_BYTES));
     plb.add_time_avg(l_librbd_discard_latency, "discard_latency", "Discard latency");
     plb.add_u64_counter(l_librbd_flush, "flush", "Flushes");
-    plb.add_u64_counter(l_librbd_aio_flush, "aio_flush", "Async flushes");
-    plb.add_time_avg(l_librbd_aio_flush_latency, "aio_flush_latency", "Latency of async flushes");
+    plb.add_time_avg(l_librbd_flush_latency, "flush_latency", "Latency of flushes");
     plb.add_u64_counter(l_librbd_ws, "ws", "WriteSames");
-    plb.add_u64_counter(l_librbd_ws_bytes, "ws_bytes", "WriteSame data");
+    plb.add_u64_counter(l_librbd_ws_bytes, "ws_bytes", "WriteSame data", NULL, 0, unit_t(UNIT_BYTES));
     plb.add_time_avg(l_librbd_ws_latency, "ws_latency", "WriteSame latency");
     plb.add_u64_counter(l_librbd_cmp, "cmp", "CompareAndWrites");
-    plb.add_u64_counter(l_librbd_cmp_bytes, "cmp_bytes", "Data size in cmps");
+    plb.add_u64_counter(l_librbd_cmp_bytes, "cmp_bytes", "Data size in cmps", NULL, 0, unit_t(UNIT_BYTES));
     plb.add_time_avg(l_librbd_cmp_latency, "cmp_latency", "Latency of cmps");
     plb.add_u64_counter(l_librbd_snap_create, "snap_create", "Snap creations");
     plb.add_u64_counter(l_librbd_snap_remove, "snap_remove", "Snap removals");
@@ -400,7 +288,7 @@ struct C_InvalidateCache : public Context {
     plb.add_u64_counter(l_librbd_notify, "notify", "Updated header notifications");
     plb.add_u64_counter(l_librbd_resize, "resize", "Resizes");
     plb.add_u64_counter(l_librbd_readahead, "readahead", "Read ahead");
-    plb.add_u64_counter(l_librbd_readahead_bytes, "readahead_bytes", "Data size in read ahead");
+    plb.add_u64_counter(l_librbd_readahead_bytes, "readahead_bytes", "Data size in read ahead", NULL, 0, unit_t(UNIT_BYTES));
     plb.add_u64_counter(l_librbd_invalidate_cache, "invalidate_cache", "Cache invalidates");
 
     plb.add_time(l_librbd_opened_time, "opened_time", "Opened time",
@@ -415,7 +303,7 @@ struct C_InvalidateCache : public Context {
   }
 
   void ImageCtx::perf_stop() {
-    assert(perfcounter);
+    ceph_assert(perfcounter);
     cct->get_perfcounters_collection()->remove(perfcounter);
     delete perfcounter;
   }
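
The byte-sized counters in the perf_start() hunk above now carry a unit_t(UNIT_BYTES) annotation so consumers of the perf counter schema can report units; plain event counters stay unannotated. Sketch of the pattern, restating two calls from that hunk:

    // Sketch: byte counters are tagged with UNIT_BYTES; event counters are not.
    plb.add_u64_counter(l_librbd_rd_bytes, "rd_bytes", "Data size in reads",
                        "rb", perf_prio, unit_t(UNIT_BYTES));
    plb.add_u64_counter(l_librbd_rd, "rd", "Reads", "r", perf_prio);
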
@@ -429,24 +317,24 @@ struct C_InvalidateCache : public Context {
     if (snap_id == LIBRADOS_SNAP_HEAD)
       return flags;
 
-    if (balance_snap_reads)
+    if (config.get_val<bool>("rbd_balance_snap_reads"))
       flags |= librados::OPERATION_BALANCE_READS;
-    else if (localize_snap_reads)
+    else if (config.get_val<bool>("rbd_localize_snap_reads"))
       flags |= librados::OPERATION_LOCALIZE_READS;
     return flags;
   }
 
-  int ImageCtx::snap_set(cls::rbd::SnapshotNamespace in_snap_namespace,
-                        string in_snap_name)
-  {
-    assert(snap_lock.is_wlocked());
-    snap_t in_snap_id = get_snap_id(in_snap_namespace, in_snap_name);
-    if (in_snap_id != CEPH_NOSNAP) {
+  int ImageCtx::snap_set(uint64_t in_snap_id) {
+    ceph_assert(snap_lock.is_wlocked());
+    auto it = snap_info.find(in_snap_id);
+    if (in_snap_id != CEPH_NOSNAP && it != snap_info.end()) {
       snap_id = in_snap_id;
-      snap_namespace = in_snap_namespace;
-      snap_name = in_snap_name;
+      snap_namespace = it->second.snap_namespace;
+      snap_name = it->second.name;
       snap_exists = true;
-      data_ctx.snap_set_read(snap_id);
+      if (data_ctx.is_valid()) {
+        data_ctx.snap_set_read(snap_id);
+      }
       return 0;
     }
     return -ENOENT;
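
Snapshot read placement is now resolved straight from the per-image ConfigProxy instead of cached member flags, and snap_set() takes a snapshot id (looked up in snap_info) rather than a namespace/name pair. A hedged caller sketch combining the two; the ictx variable and snapshot name are assumptions:

    // Sketch: resolve a snap name to an id under the lock, then switch to it.
    RWLock::WLocker snap_locker(ictx->snap_lock);
    auto id = ictx->get_snap_id(cls::rbd::UserSnapshotNamespace{}, "snap1");
    if (id != CEPH_NOSNAP && ictx->snap_set(id) == 0) {
      // reads now target the snapshot; snap_set() returns -ENOENT otherwise
    }
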
@@ -454,38 +342,41 @@ struct C_InvalidateCache : public Context {
 
   void ImageCtx::snap_unset()
   {
-    assert(snap_lock.is_wlocked());
+    ceph_assert(snap_lock.is_wlocked());
     snap_id = CEPH_NOSNAP;
     snap_namespace = {};
     snap_name = "";
     snap_exists = true;
-    data_ctx.snap_set_read(snap_id);
+    if (data_ctx.is_valid()) {
+      data_ctx.snap_set_read(snap_id);
+    }
   }
 
-  snap_t ImageCtx::get_snap_id(cls::rbd::SnapshotNamespace in_snap_namespace,
-                              string in_snap_name) const
+  snap_t ImageCtx::get_snap_id(const cls::rbd::SnapshotNamespace& in_snap_namespace,
+                               const string& in_snap_name) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     auto it = snap_ids.find({in_snap_namespace, in_snap_name});
-    if (it != snap_ids.end())
+    if (it != snap_ids.end()) {
       return it->second;
+    }
     return CEPH_NOSNAP;
   }
 
   const SnapInfo* ImageCtx::get_snap_info(snap_t in_snap_id) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     map<snap_t, SnapInfo>::const_iterator it =
       snap_info.find(in_snap_id);
     if (it != snap_info.end())
       return &it->second;
-    return NULL;
+    return nullptr;
   }
 
   int ImageCtx::get_snap_name(snap_t in_snap_id,
                              string *out_snap_name) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     const SnapInfo *info = get_snap_info(in_snap_id);
     if (info) {
       *out_snap_name = info->name;
@@ -497,7 +388,7 @@ struct C_InvalidateCache : public Context {
   int ImageCtx::get_snap_namespace(snap_t in_snap_id,
                                   cls::rbd::SnapshotNamespace *out_snap_namespace) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     const SnapInfo *info = get_snap_info(in_snap_id);
     if (info) {
       *out_snap_namespace = info->snap_namespace;
@@ -507,7 +398,7 @@ struct C_InvalidateCache : public Context {
   }
 
   int ImageCtx::get_parent_spec(snap_t in_snap_id,
-                               ParentSpec *out_pspec) const
+                               cls::rbd::ParentImageSpec *out_pspec) const
   {
     const SnapInfo *info = get_snap_info(in_snap_id);
     if (info) {
@@ -519,7 +410,7 @@ struct C_InvalidateCache : public Context {
 
   uint64_t ImageCtx::get_current_size() const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     return size;
   }
 
@@ -554,10 +445,32 @@ struct C_InvalidateCache : public Context {
     return create_timestamp;
   }
 
+  utime_t ImageCtx::get_access_timestamp() const
+  {
+    return access_timestamp;
+  }
+
+  utime_t ImageCtx::get_modify_timestamp() const
+  {
+    return modify_timestamp;
+  }
+
+  void ImageCtx::set_access_timestamp(utime_t at)
+  {
+    ceph_assert(timestamp_lock.is_wlocked());
+    access_timestamp = at;
+  }
+
+  void ImageCtx::set_modify_timestamp(utime_t mt)
+  {
+    ceph_assert(timestamp_lock.is_locked());
+    modify_timestamp = mt;
+  }
+
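
The new access/modify timestamps are guarded by the timestamp_lock added to the constructor earlier in this diff. A minimal usage sketch, assuming a caller (ictx is an assumption) updates the modify timestamp after a write:

    // Sketch: take the write lock before touching the cached timestamps.
    {
      RWLock::WLocker timestamp_locker(ictx->timestamp_lock);
      ictx->set_modify_timestamp(ceph_clock_now());
    }
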
   int ImageCtx::is_snap_protected(snap_t in_snap_id,
                                  bool *is_protected) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     const SnapInfo *info = get_snap_info(in_snap_id);
     if (info) {
       *is_protected =
@@ -570,7 +483,7 @@ struct C_InvalidateCache : public Context {
   int ImageCtx::is_snap_unprotected(snap_t in_snap_id,
                                    bool *is_unprotected) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     const SnapInfo *info = get_snap_info(in_snap_id);
     if (info) {
       *is_unprotected =
@@ -583,10 +496,11 @@ struct C_InvalidateCache : public Context {
   void ImageCtx::add_snap(cls::rbd::SnapshotNamespace in_snap_namespace,
                          string in_snap_name,
                          snap_t id, uint64_t in_size,
-                         const ParentInfo &parent, uint8_t protection_status,
-                          uint64_t flags, utime_t timestamp)
+                         const ParentImageInfo &parent,
+                          uint8_t protection_status, uint64_t flags,
+                          utime_t timestamp)
   {
-    assert(snap_lock.is_wlocked());
+    ceph_assert(snap_lock.is_wlocked());
     snaps.push_back(id);
     SnapInfo info(in_snap_name, in_snap_namespace,
                  in_size, parent, protection_status, flags, timestamp);
@@ -598,7 +512,7 @@ struct C_InvalidateCache : public Context {
                         string in_snap_name,
                         snap_t id)
   {
-    assert(snap_lock.is_wlocked());
+    ceph_assert(snap_lock.is_wlocked());
     snaps.erase(std::remove(snaps.begin(), snaps.end(), id), snaps.end());
     snap_info.erase(id);
     snap_ids.erase({in_snap_namespace, in_snap_name});
@@ -606,7 +520,7 @@ struct C_InvalidateCache : public Context {
 
   uint64_t ImageCtx::get_image_size(snap_t in_snap_id) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     if (in_snap_id == CEPH_NOSNAP) {
       if (!resize_reqs.empty() &&
           resize_reqs.front()->shrinking()) {
@@ -623,7 +537,7 @@ struct C_InvalidateCache : public Context {
   }
 
   uint64_t ImageCtx::get_object_count(snap_t in_snap_id) const {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     uint64_t image_size = get_image_size(in_snap_id);
     return Striper::get_num_objects(layout, image_size);
   }
@@ -637,13 +551,26 @@ struct C_InvalidateCache : public Context {
   bool ImageCtx::test_features(uint64_t in_features,
                                const RWLock &in_snap_lock) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     return ((features & in_features) == in_features);
   }
 
+  bool ImageCtx::test_op_features(uint64_t in_op_features) const
+  {
+    RWLock::RLocker snap_locker(snap_lock);
+    return test_op_features(in_op_features, snap_lock);
+  }
+
+  bool ImageCtx::test_op_features(uint64_t in_op_features,
+                                  const RWLock &in_snap_lock) const
+  {
+    ceph_assert(snap_lock.is_locked());
+    return ((op_features & in_op_features) == in_op_features);
+  }
+
   int ImageCtx::get_flags(librados::snap_t _snap_id, uint64_t *_flags) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     if (_snap_id == CEPH_NOSNAP) {
       *_flags = flags;
       return 0;
@@ -656,18 +583,20 @@ struct C_InvalidateCache : public Context {
     return -ENOENT;
   }
 
-  int ImageCtx::test_flags(uint64_t flags, bool *flags_set) const
+  int ImageCtx::test_flags(librados::snap_t in_snap_id,
+                           uint64_t flags, bool *flags_set) const
   {
     RWLock::RLocker l(snap_lock);
-    return test_flags(flags, snap_lock, flags_set);
+    return test_flags(in_snap_id, flags, snap_lock, flags_set);
   }
 
-  int ImageCtx::test_flags(uint64_t flags, const RWLock &in_snap_lock,
+  int ImageCtx::test_flags(librados::snap_t in_snap_id,
+                           uint64_t flags, const RWLock &in_snap_lock,
                            bool *flags_set) const
   {
-    assert(snap_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
     uint64_t snap_flags;
-    int r = get_flags(snap_id, &snap_flags);
+    int r = get_flags(in_snap_id, &snap_flags);
     if (r < 0) {
       return r;
     }
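
test_flags() now requires the caller to name the snapshot being queried instead of implicitly using the currently set snap_id. A hedged caller sketch (flag constant from rbd_types.h; the fallback comment is illustrative):

    // Sketch: ask whether the object map is flagged invalid for a given snap id.
    bool flags_set = false;
    if (ictx->test_flags(in_snap_id, RBD_FLAG_OBJECT_MAP_INVALID,
                         &flags_set) == 0 && flags_set) {
      // object map cannot be trusted; assume all objects may exist
    }
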
@@ -677,7 +606,7 @@ struct C_InvalidateCache : public Context {
 
   int ImageCtx::update_flags(snap_t in_snap_id, uint64_t flag, bool enabled)
   {
-    assert(snap_lock.is_wlocked());
+    ceph_assert(snap_lock.is_wlocked());
     uint64_t *_flags;
     if (in_snap_id == CEPH_NOSNAP) {
       _flags = &flags;
@@ -697,10 +626,10 @@ struct C_InvalidateCache : public Context {
     return 0;
   }
 
-  const ParentInfo* ImageCtx::get_parent_info(snap_t in_snap_id) const
+  const ParentImageInfo* ImageCtx::get_parent_info(snap_t in_snap_id) const
   {
-    assert(snap_lock.is_locked());
-    assert(parent_lock.is_locked());
+    ceph_assert(snap_lock.is_locked());
+    ceph_assert(parent_lock.is_locked());
     if (in_snap_id == CEPH_NOSNAP)
       return &parent_md;
     const SnapInfo *info = get_snap_info(in_snap_id);
@@ -711,7 +640,7 @@ struct C_InvalidateCache : public Context {
 
   int64_t ImageCtx::get_parent_pool_id(snap_t in_snap_id) const
   {
-    const ParentInfo *info = get_parent_info(in_snap_id);
+    const auto info = get_parent_info(in_snap_id);
     if (info)
       return info->spec.pool_id;
     return -1;
@@ -719,7 +648,7 @@ struct C_InvalidateCache : public Context {
 
   string ImageCtx::get_parent_image_id(snap_t in_snap_id) const
   {
-    const ParentInfo *info = get_parent_info(in_snap_id);
+    const auto info = get_parent_info(in_snap_id);
     if (info)
       return info->spec.image_id;
     return "";
@@ -727,7 +656,7 @@ struct C_InvalidateCache : public Context {
 
   uint64_t ImageCtx::get_parent_snap_id(snap_t in_snap_id) const
   {
-    const ParentInfo *info = get_parent_info(in_snap_id);
+    const auto info = get_parent_info(in_snap_id);
     if (info)
       return info->spec.snap_id;
     return CEPH_NOSNAP;
@@ -735,8 +664,8 @@ struct C_InvalidateCache : public Context {
 
   int ImageCtx::get_parent_overlap(snap_t in_snap_id, uint64_t *overlap) const
   {
-    assert(snap_lock.is_locked());
-    const ParentInfo *info = get_parent_info(in_snap_id);
+    ceph_assert(snap_lock.is_locked());
+    const auto info = get_parent_info(in_snap_id);
     if (info) {
       *overlap = info->overlap;
       return 0;
@@ -744,128 +673,8 @@ struct C_InvalidateCache : public Context {
     return -ENOENT;
   }
 
-  void ImageCtx::aio_read_from_cache(object_t o, uint64_t object_no,
-                                    bufferlist *bl, size_t len,
-                                    uint64_t off, Context *onfinish,
-                                    int fadvise_flags, ZTracer::Trace *trace) {
-    snap_lock.get_read();
-    ObjectCacher::OSDRead *rd = object_cacher->prepare_read(snap_id, bl, fadvise_flags);
-    snap_lock.put_read();
-    ObjectExtent extent(o, object_no, off, len, 0);
-    extent.oloc.pool = data_ctx.get_id();
-    extent.buffer_extents.push_back(make_pair(0, len));
-    rd->extents.push_back(extent);
-    cache_lock.Lock();
-    int r = object_cacher->readx(rd, object_set, onfinish, trace);
-    cache_lock.Unlock();
-    if (r != 0)
-      onfinish->complete(r);
-  }
-
-  void ImageCtx::write_to_cache(object_t o, const bufferlist& bl, size_t len,
-                               uint64_t off, Context *onfinish,
-                               int fadvise_flags, uint64_t journal_tid,
-                               ZTracer::Trace *trace) {
-    snap_lock.get_read();
-    ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(
-      snapc, bl, ceph::real_time::min(), fadvise_flags, journal_tid);
-    snap_lock.put_read();
-    ObjectExtent extent(o, 0, off, len, 0);
-    extent.oloc.pool = data_ctx.get_id();
-    // XXX: nspace is always default, io_ctx_impl field private
-    //extent.oloc.nspace = data_ctx.io_ctx_impl->oloc.nspace;
-    extent.buffer_extents.push_back(make_pair(0, len));
-    wr->extents.push_back(extent);
-    {
-      Mutex::Locker l(cache_lock);
-      object_cacher->writex(wr, object_set, onfinish, trace);
-    }
-  }
-
-  void ImageCtx::user_flushed() {
-    if (object_cacher && cache_writethrough_until_flush) {
-      md_lock.get_read();
-      bool flushed_before = flush_encountered;
-      md_lock.put_read();
-
-      uint64_t max_dirty = cache_max_dirty;
-      if (!flushed_before && max_dirty > 0) {
-       md_lock.get_write();
-       flush_encountered = true;
-       md_lock.put_write();
-
-       ldout(cct, 10) << "saw first user flush, enabling writeback" << dendl;
-       Mutex::Locker l(cache_lock);
-       object_cacher->set_max_dirty(max_dirty);
-      }
-    }
-  }
-
-  void ImageCtx::flush_cache(Context *onfinish) {
-    cache_lock.Lock();
-    object_cacher->flush_set(object_set, onfinish);
-    cache_lock.Unlock();
-  }
-
-  void ImageCtx::shut_down_cache(Context *on_finish) {
-    if (object_cacher == NULL) {
-      on_finish->complete(0);
-      return;
-    }
-
-    cache_lock.Lock();
-    object_cacher->release_set(object_set);
-    cache_lock.Unlock();
-
-    C_ShutDownCache *shut_down = new C_ShutDownCache(this, on_finish);
-    flush_cache(new C_InvalidateCache(this, true, false, shut_down));
-  }
-
-  int ImageCtx::invalidate_cache(bool purge_on_error) {
-    flush_async_operations();
-    if (object_cacher == NULL) {
-      return 0;
-    }
-
-    cache_lock.Lock();
-    object_cacher->release_set(object_set);
-    cache_lock.Unlock();
-
-    C_SaferCond ctx;
-    flush_cache(new C_InvalidateCache(this, purge_on_error, true, &ctx));
-
-    int result = ctx.wait();
-    return result;
-  }
-
-  void ImageCtx::invalidate_cache(bool purge_on_error, Context *on_finish) {
-    if (object_cacher == NULL) {
-      op_work_queue->queue(on_finish, 0);
-      return;
-    }
-
-    cache_lock.Lock();
-    object_cacher->release_set(object_set);
-    cache_lock.Unlock();
-
-    flush_cache(new C_InvalidateCache(this, purge_on_error, false, on_finish));
-  }
-
-  void ImageCtx::clear_nonexistence_cache() {
-    assert(cache_lock.is_locked());
-    if (!object_cacher)
-      return;
-    object_cacher->clear_nonexistence(object_set);
-  }
-
-  bool ImageCtx::is_cache_empty() {
-    Mutex::Locker locker(cache_lock);
-    return object_cacher->set_is_empty(object_set);
-  }
-
   void ImageCtx::register_watch(Context *on_finish) {
-    assert(image_watcher == NULL);
-    image_watcher = new ImageWatcher<>(*this);
+    ceph_assert(image_watcher != NULL);
     image_watcher->register_watch(on_finish);
   }
 
@@ -891,42 +700,6 @@ struct C_InvalidateCache : public Context {
     return len;
   }
 
-  void ImageCtx::flush_async_operations() {
-    C_SaferCond ctx;
-    flush_async_operations(&ctx);
-    ctx.wait();
-  }
-
-  void ImageCtx::flush_async_operations(Context *on_finish) {
-    {
-      Mutex::Locker l(async_ops_lock);
-      if (!async_ops.empty()) {
-        ldout(cct, 20) << "flush async operations: " << on_finish << " "
-                       << "count=" << async_ops.size() << dendl;
-        async_ops.front()->add_flush_context(on_finish);
-        return;
-      }
-    }
-    on_finish->complete(0);
-  }
-
-  int ImageCtx::flush() {
-    C_SaferCond cond_ctx;
-    flush(&cond_ctx);
-    return cond_ctx.wait();
-  }
-
-  void ImageCtx::flush(Context *on_safe) {
-    // ensure no locks are held when flush is complete
-    on_safe = util::create_async_context_callback(*this, on_safe);
-
-    if (object_cacher != NULL) {
-      // flush cache after completing all in-flight AIO ops
-      on_safe = new C_FlushCache(this, on_safe);
-    }
-    flush_async_operations(on_safe);
-  }
-
   void ImageCtx::cancel_async_requests() {
     C_SaferCond ctx;
     cancel_async_requests(&ctx);
@@ -958,135 +731,112 @@ struct C_InvalidateCache : public Context {
     completed_reqs.clear();
   }
 
-  bool ImageCtx::_filter_metadata_confs(const string &prefix,
-                                        map<string, bool> &configs,
-                                        const map<string, bufferlist> &pairs,
-                                        map<string, bufferlist> *res) {
-    size_t conf_prefix_len = prefix.size();
+  void ImageCtx::apply_metadata(const std::map<std::string, bufferlist> &meta,
+                                bool thread_safe) {
+    ldout(cct, 20) << __func__ << dendl;
+
+    // reset settings back to global defaults
+    for (auto& key : config_overrides) {
+      std::string value;
+      int r = cct->_conf.get_val(key, &value);
+      ceph_assert(r == 0);
 
-    for (auto it : pairs) {
-      if (it.first.compare(0, MIN(conf_prefix_len, it.first.size()), prefix) > 0)
-        return false;
+      config.set_val(key, value);
+    }
+    config_overrides.clear();
 
-      if (it.first.size() <= conf_prefix_len)
+    // extract config overrides
+    for (auto meta_pair : meta) {
+      if (!boost::starts_with(meta_pair.first, METADATA_CONF_PREFIX)) {
         continue;
+      }
 
-      string key = it.first.substr(conf_prefix_len, it.first.size() - conf_prefix_len);
-      auto cit = configs.find(key);
-      if (cit != configs.end()) {
-        cit->second = true;
-        res->insert(make_pair(key, it.second));
+      std::string key = meta_pair.first.substr(METADATA_CONF_PREFIX.size());
+      if (!boost::starts_with(key, "rbd_")) {
+        // ignore non-RBD configuration keys
+        // TODO use option schema to determine applicable subsystem
+        ldout(cct, 0) << __func__ << ": ignoring config " << key << dendl;
+        continue;
       }
-    }
-    return true;
-  }
 
-  void ImageCtx::apply_metadata(const std::map<std::string, bufferlist> &meta,
-                                bool thread_safe) {
-    ldout(cct, 20) << __func__ << dendl;
-    std::map<string, bool> configs = boost::assign::map_list_of(
-        "rbd_non_blocking_aio", false)(
-        "rbd_cache", false)(
-        "rbd_cache_writethrough_until_flush", false)(
-        "rbd_cache_size", false)(
-        "rbd_cache_max_dirty", false)(
-        "rbd_cache_target_dirty", false)(
-        "rbd_cache_max_dirty_age", false)(
-        "rbd_cache_max_dirty_object", false)(
-        "rbd_cache_block_writes_upfront", false)(
-        "rbd_concurrent_management_ops", false)(
-        "rbd_balance_snap_reads", false)(
-        "rbd_localize_snap_reads", false)(
-        "rbd_balance_parent_reads", false)(
-        "rbd_localize_parent_reads", false)(
-        "rbd_sparse_read_threshold_bytes", false)(
-        "rbd_readahead_trigger_requests", false)(
-        "rbd_readahead_max_bytes", false)(
-        "rbd_readahead_disable_after_bytes", false)(
-        "rbd_clone_copy_on_read", false)(
-        "rbd_blacklist_on_break_lock", false)(
-        "rbd_blacklist_expire_seconds", false)(
-        "rbd_request_timed_out_seconds", false)(
-        "rbd_journal_order", false)(
-        "rbd_journal_splay_width", false)(
-        "rbd_journal_commit_age", false)(
-        "rbd_journal_object_flush_interval", false)(
-        "rbd_journal_object_flush_bytes", false)(
-        "rbd_journal_object_flush_age", false)(
-        "rbd_journal_pool", false)(
-        "rbd_journal_max_payload_bytes", false)(
-        "rbd_journal_max_concurrent_object_sets", false)(
-        "rbd_mirroring_resync_after_disconnect", false)(
-        "rbd_mirroring_replay_delay", false)(
-        "rbd_skip_partial_discard", false);
-
-    md_config_t local_config_t;
-    std::map<std::string, bufferlist> res;
-
-    _filter_metadata_confs(METADATA_CONF_PREFIX, configs, meta, &res);
-    for (auto it : res) {
-      std::string val(it.second.c_str(), it.second.length());
-      int j = local_config_t.set_val(it.first.c_str(), val);
-      if (j < 0) {
-        lderr(cct) << __func__ << " failed to set config " << it.first
-                   << " with value " << it.second.c_str() << ": " << j
-                   << dendl;
+      if (config.find_option(key) != nullptr) {
+        std::string val(meta_pair.second.c_str(), meta_pair.second.length());
+        int r = config.set_val(key, val);
+        if (r >= 0) {
+          ldout(cct, 20) << __func__ << ": " << key << "=" << val << dendl;
+          config_overrides.insert(key);
+        } else {
+          lderr(cct) << __func__ << ": failed to set config " << key << " "
+                     << "with value " << val << ": " << cpp_strerror(r)
+                     << dendl;
+        }
       }
     }
 
-#define ASSIGN_OPTION(config, type)                                            \
-    do {                                                                       \
-      string key = "rbd_";                                                    \
-      key = key + #config;                                                    \
-      if (configs[key])                                                        \
-        config = local_config_t.get_val<type>("rbd_"#config);                  \
-      else                                                                     \
-        config = cct->_conf->get_val<type>("rbd_"#config);                     \
-    } while (0);
+#define ASSIGN_OPTION(param, type)              \
+    param = config.get_val<type>("rbd_"#param)
 
+    bool skip_partial_discard = true;
     ASSIGN_OPTION(non_blocking_aio, bool);
     ASSIGN_OPTION(cache, bool);
     ASSIGN_OPTION(cache_writethrough_until_flush, bool);
-    ASSIGN_OPTION(cache_size, int64_t);
-    ASSIGN_OPTION(cache_max_dirty, int64_t);
-    ASSIGN_OPTION(cache_target_dirty, int64_t);
-    ASSIGN_OPTION(cache_max_dirty_age, double);
-    ASSIGN_OPTION(cache_max_dirty_object, int64_t);
-    ASSIGN_OPTION(cache_block_writes_upfront, bool);
-    ASSIGN_OPTION(concurrent_management_ops, int64_t);
-    ASSIGN_OPTION(balance_snap_reads, bool);
-    ASSIGN_OPTION(localize_snap_reads, bool);
-    ASSIGN_OPTION(balance_parent_reads, bool);
-    ASSIGN_OPTION(localize_parent_reads, bool);
-    ASSIGN_OPTION(sparse_read_threshold_bytes, uint64_t);
-    ASSIGN_OPTION(readahead_trigger_requests, int64_t);
-    ASSIGN_OPTION(readahead_max_bytes, int64_t);
-    ASSIGN_OPTION(readahead_disable_after_bytes, int64_t);
+    ASSIGN_OPTION(cache_max_dirty, Option::size_t);
+    ASSIGN_OPTION(sparse_read_threshold_bytes, Option::size_t);
+    ASSIGN_OPTION(readahead_max_bytes, Option::size_t);
+    ASSIGN_OPTION(readahead_disable_after_bytes, Option::size_t);
     ASSIGN_OPTION(clone_copy_on_read, bool);
-    ASSIGN_OPTION(blacklist_on_break_lock, bool);
-    ASSIGN_OPTION(blacklist_expire_seconds, int64_t);
-    ASSIGN_OPTION(request_timed_out_seconds, int64_t);
     ASSIGN_OPTION(enable_alloc_hint, bool);
-    ASSIGN_OPTION(journal_order, uint64_t);
-    ASSIGN_OPTION(journal_splay_width, uint64_t);
-    ASSIGN_OPTION(journal_commit_age, double);
-    ASSIGN_OPTION(journal_object_flush_interval, int64_t);
-    ASSIGN_OPTION(journal_object_flush_bytes, int64_t);
-    ASSIGN_OPTION(journal_object_flush_age, double);
-    ASSIGN_OPTION(journal_max_payload_bytes, uint64_t);
-    ASSIGN_OPTION(journal_max_concurrent_object_sets, int64_t);
-    ASSIGN_OPTION(mirroring_resync_after_disconnect, bool);
-    ASSIGN_OPTION(mirroring_replay_delay, int64_t);
+    ASSIGN_OPTION(mirroring_replay_delay, uint64_t);
+    ASSIGN_OPTION(mtime_update_interval, uint64_t);
+    ASSIGN_OPTION(atime_update_interval, uint64_t);
     ASSIGN_OPTION(skip_partial_discard, bool);
+    ASSIGN_OPTION(discard_granularity_bytes, uint64_t);
     ASSIGN_OPTION(blkin_trace_all, bool);
 
-    if (thread_safe) {
-      ASSIGN_OPTION(journal_pool, std::string);
-    }
+#undef ASSIGN_OPTION
 
     if (sparse_read_threshold_bytes == 0) {
       sparse_read_threshold_bytes = get_object_size();
     }
+    if (!skip_partial_discard) {
+      discard_granularity_bytes = 0;
+    }
+
+    alloc_hint_flags = 0;
+    auto compression_hint = config.get_val<std::string>("rbd_compression_hint");
+    if (compression_hint == "compressible") {
+      alloc_hint_flags |= librados::ALLOC_HINT_FLAG_COMPRESSIBLE;
+    } else if (compression_hint == "incompressible") {
+      alloc_hint_flags |= librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE;
+    }
+
+    io_work_queue->apply_qos_schedule_tick_min(
+      config.get_val<uint64_t>("rbd_qos_schedule_tick_min"));
+
+    io_work_queue->apply_qos_limit(
+      RBD_QOS_IOPS_THROTTLE,
+      config.get_val<uint64_t>("rbd_qos_iops_limit"),
+      config.get_val<uint64_t>("rbd_qos_iops_burst"));
+    io_work_queue->apply_qos_limit(
+      RBD_QOS_BPS_THROTTLE,
+      config.get_val<uint64_t>("rbd_qos_bps_limit"),
+      config.get_val<uint64_t>("rbd_qos_bps_burst"));
+    io_work_queue->apply_qos_limit(
+      RBD_QOS_READ_IOPS_THROTTLE,
+      config.get_val<uint64_t>("rbd_qos_read_iops_limit"),
+      config.get_val<uint64_t>("rbd_qos_read_iops_burst"));
+    io_work_queue->apply_qos_limit(
+      RBD_QOS_WRITE_IOPS_THROTTLE,
+      config.get_val<uint64_t>("rbd_qos_write_iops_limit"),
+      config.get_val<uint64_t>("rbd_qos_write_iops_burst"));
+    io_work_queue->apply_qos_limit(
+      RBD_QOS_READ_BPS_THROTTLE,
+      config.get_val<uint64_t>("rbd_qos_read_bps_limit"),
+      config.get_val<uint64_t>("rbd_qos_read_bps_burst"));
+    io_work_queue->apply_qos_limit(
+      RBD_QOS_WRITE_BPS_THROTTLE,
+      config.get_val<uint64_t>("rbd_qos_write_bps_limit"),
+      config.get_val<uint64_t>("rbd_qos_write_bps_burst"));
   }
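
apply_metadata() now builds the per-image configuration in two steps: image-meta keys prefixed with METADATA_CONF_PREFIX ("conf_") are applied as overrides on the per-image ConfigProxy (and remembered in config_overrides so they can be reset on the next pass), and the cached members are then re-read through the ASSIGN_OPTION macro, which is plain token pasting. A short sketch of one expansion and one override path; the example limit value is an assumption:

    // ASSIGN_OPTION(clone_copy_on_read, bool) expands to:
    clone_copy_on_read = config.get_val<bool>("rbd_clone_copy_on_read");

    // A per-image override arrives as image metadata, e.g.
    //   rbd image-meta set <pool>/<image> conf_rbd_qos_iops_limit 1000
    // apply_metadata() strips the "conf_" prefix to "rbd_qos_iops_limit", sets it
    // on the per-image config, and the value then feeds
    // io_work_queue->apply_qos_limit(RBD_QOS_IOPS_THROTTLE, limit, burst).
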
 
   ExclusiveLock<ImageCtx> *ImageCtx::create_exclusive_lock() {
@@ -1122,27 +872,27 @@ struct C_InvalidateCache : public Context {
   }
 
   exclusive_lock::Policy *ImageCtx::get_exclusive_lock_policy() const {
-    assert(owner_lock.is_locked());
-    assert(exclusive_lock_policy != nullptr);
+    ceph_assert(owner_lock.is_locked());
+    ceph_assert(exclusive_lock_policy != nullptr);
     return exclusive_lock_policy;
   }
 
   void ImageCtx::set_exclusive_lock_policy(exclusive_lock::Policy *policy) {
-    assert(owner_lock.is_wlocked());
-    assert(policy != nullptr);
+    ceph_assert(owner_lock.is_wlocked());
+    ceph_assert(policy != nullptr);
     delete exclusive_lock_policy;
     exclusive_lock_policy = policy;
   }
 
   journal::Policy *ImageCtx::get_journal_policy() const {
-    assert(snap_lock.is_locked());
-    assert(journal_policy != nullptr);
+    ceph_assert(snap_lock.is_locked());
+    ceph_assert(journal_policy != nullptr);
     return journal_policy;
   }
 
   void ImageCtx::set_journal_policy(journal::Policy *policy) {
-    assert(snap_lock.is_wlocked());
-    assert(policy != nullptr);
+    ceph_assert(snap_lock.is_wlocked());
+    ceph_assert(policy != nullptr);
     delete journal_policy;
     journal_policy = policy;
   }
@@ -1154,18 +904,18 @@ struct C_InvalidateCache : public Context {
   void ImageCtx::get_thread_pool_instance(CephContext *cct,
                                           ThreadPool **thread_pool,
                                           ContextWQ **op_work_queue) {
-    ThreadPoolSingleton *thread_pool_singleton;
-    cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
-      thread_pool_singleton, "librbd::thread_pool");
+    auto thread_pool_singleton =
+      &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+       "librbd::thread_pool", false, cct);
     *thread_pool = thread_pool_singleton;
     *op_work_queue = thread_pool_singleton->op_work_queue;
   }
 
   void ImageCtx::get_timer_instance(CephContext *cct, SafeTimer **timer,
                                     Mutex **timer_lock) {
-    SafeTimerSingleton *safe_timer_singleton;
-    cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
-      safe_timer_singleton, "librbd::journal::safe_timer");
+    auto safe_timer_singleton =
+      &cct->lookup_or_create_singleton_object<SafeTimerSingleton>(
+       "librbd::journal::safe_timer", false, cct);
     *timer = safe_timer_singleton;
     *timer_lock = &safe_timer_singleton->lock;
   }