]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/cls/rbd/cls_rbd.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / cls / rbd / cls_rbd.cc
index 0c2d7aee595ec4ac52acc82cab009019225fe8dc..3a810e2e91452bbbba88bd3411538f81fdac75de 100644 (file)
@@ -31,6 +31,7 @@
 #include <errno.h>
 #include <sstream>
 
+#include "include/uuid.h"
 #include "common/bit_vector.hpp"
 #include "common/errno.h"
 #include "objclass/objclass.h"
@@ -60,11 +61,50 @@ CLS_NAME(rbd)
 
 #define RBD_MAX_KEYS_READ 64
 #define RBD_SNAP_KEY_PREFIX "snapshot_"
+#define RBD_SNAP_CHILDREN_KEY_PREFIX "snap_children_"
 #define RBD_DIR_ID_KEY_PREFIX "id_"
 #define RBD_DIR_NAME_KEY_PREFIX "name_"
 #define RBD_METADATA_KEY_PREFIX "metadata_"
 
-#define GROUP_SNAP_SEQ "snap_seq"
+namespace {
+
+uint64_t get_encode_features(cls_method_context_t hctx) {
+  uint64_t features = 0;
+  int8_t require_osd_release = cls_get_required_osd_release(hctx);
+  if (require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+    features |= CEPH_FEATURE_SERVER_NAUTILUS;
+  }
+  return features;
+}
+
+bool calc_sparse_extent(const bufferptr &bp, size_t sparse_size,
+                        uint64_t length, size_t *write_offset,
+                        size_t *write_length, size_t *offset) {
+  size_t extent_size;
+  if (*offset + sparse_size > length) {
+    extent_size = length - *offset;
+  } else {
+    extent_size = sparse_size;
+  }
+
+  bufferptr extent(bp, *offset, extent_size);
+  *offset += extent_size;
+
+  bool extent_is_zero = extent.is_zero();
+  if (!extent_is_zero) {
+    *write_length += extent_size;
+  }
+  if (extent_is_zero && *write_length == 0) {
+    *write_offset += extent_size;
+  }
+
+  if ((extent_is_zero || *offset == length) && *write_length != 0) {
+    return true;
+  }
+  return false;
+}
+
+} // anonymous namespace
 
 static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
 {
@@ -87,7 +127,7 @@ static int snap_read_header(cls_method_context_t hctx, bufferlist& bl)
       return -EINVAL;
 
     header = (struct rbd_obj_header_ondisk *)bl.c_str();
-    assert(header);
+    ceph_assert(header);
 
     if ((snap_count != header->snap_count) ||
         (snap_names_len != header->snap_names_len)) {
@@ -110,8 +150,7 @@ static void key_from_snap_id(snapid_t snap_id, string *out)
   *out = oss.str();
 }
 
-static snapid_t snap_id_from_key(const string &key)
-{
+static snapid_t snap_id_from_key(const string &key) {
   istringstream iss(key);
   uint64_t id;
   iss.ignore(strlen(RBD_SNAP_KEY_PREFIX)) >> std::hex >> id;
@@ -131,8 +170,8 @@ static int read_key(cls_method_context_t hctx, const string &key, T *out)
   }
 
   try {
-    bufferlist::iterator it = bl.begin();
-    ::decode(*out, it);
+    auto it = bl.cbegin();
+    decode(*out, it);
   } catch (const buffer::error &err) {
     CLS_ERR("error decoding %s", key.c_str());
     return -EIO;
@@ -141,6 +180,33 @@ static int read_key(cls_method_context_t hctx, const string &key, T *out)
   return 0;
 }
 
+template <typename T>
+static int write_key(cls_method_context_t hctx, const string &key, const T &t) {
+  bufferlist bl;
+  encode(t, bl);
+
+  int r = cls_cxx_map_set_val(hctx, key, &bl);
+  if (r < 0) {
+    CLS_ERR("failed to set omap key: %s", key.c_str());
+    return r;
+  }
+  return 0;
+}
+
+template <typename T>
+static int write_key(cls_method_context_t hctx, const string &key, const T &t,
+                     uint64_t features) {
+  bufferlist bl;
+  encode(t, bl, features);
+
+  int r = cls_cxx_map_set_val(hctx, key, &bl);
+  if (r < 0) {
+    CLS_ERR("failed to set omap key: %s", key.c_str());
+    return r;
+  }
+  return 0;
+}
+
 static int remove_key(cls_method_context_t hctx, const string &key) {
   int r = cls_cxx_map_remove_key(hctx, key);
   if (r < 0 && r != -ENOENT) {
@@ -161,6 +227,511 @@ static bool is_valid_id(const string &id) {
   return true;
 }
 
+/**
+ * verify that the header object exists
+ *
+ * @return 0 if the object exists, -ENOENT if it does not, or other error
+ */
+static int check_exists(cls_method_context_t hctx)
+{
+  uint64_t size;
+  time_t mtime;
+  return cls_cxx_stat(hctx, &size, &mtime);
+}
+
+namespace image {
+
+/**
+ * check that given feature(s) are set
+ *
+ * @param hctx context
+ * @param need features needed
+ * @return 0 if features are set, negative error (like ENOEXEC) otherwise
+ */
+int require_feature(cls_method_context_t hctx, uint64_t need)
+{
+  uint64_t features;
+  int r = read_key(hctx, "features", &features);
+  if (r == -ENOENT)   // this implies it's an old-style image with no features
+    return -ENOEXEC;
+  if (r < 0)
+    return r;
+  if ((features & need) != need) {
+    CLS_LOG(10, "require_feature missing feature %llx, have %llx",
+            (unsigned long long)need, (unsigned long long)features);
+    return -ENOEXEC;
+  }
+  return 0;
+}
+
+std::string snap_children_key_from_snap_id(snapid_t snap_id)
+{
+  ostringstream oss;
+  oss << RBD_SNAP_CHILDREN_KEY_PREFIX
+      << std::setw(16) << std::setfill('0') << std::hex << snap_id;
+  return oss.str();
+}
+
+int set_op_features(cls_method_context_t hctx, uint64_t op_features,
+                    uint64_t mask) {
+  uint64_t orig_features;
+  int r = read_key(hctx, "features", &orig_features);
+  if (r < 0) {
+    CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  uint64_t orig_op_features = 0;
+  r = read_key(hctx, "op_features", &orig_op_features);
+  if (r < 0 && r != -ENOENT) {
+    CLS_ERR("Could not read op features off disk: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  op_features = (orig_op_features & ~mask) | (op_features & mask);
+  CLS_LOG(10, "op_features=%" PRIu64 " orig_op_features=%" PRIu64,
+          op_features, orig_op_features);
+  if (op_features == orig_op_features) {
+    return 0;
+  }
+
+  uint64_t features = orig_features;
+  if (op_features == 0ULL) {
+    features &= ~RBD_FEATURE_OPERATIONS;
+
+    r = cls_cxx_map_remove_key(hctx, "op_features");
+    if (r == -ENOENT) {
+      r = 0;
+    }
+  } else {
+    features |= RBD_FEATURE_OPERATIONS;
+
+    bufferlist bl;
+    encode(op_features, bl);
+    r = cls_cxx_map_set_val(hctx, "op_features", &bl);
+  }
+
+  if (r < 0) {
+    CLS_ERR("error updating op features: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  if (features != orig_features) {
+    bufferlist bl;
+    encode(features, bl);
+    r = cls_cxx_map_set_val(hctx, "features", &bl);
+    if (r < 0) {
+      CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+  }
+
+  return 0;
+}
+
+int set_migration(cls_method_context_t hctx,
+                  const cls::rbd::MigrationSpec &migration_spec, bool init) {
+  if (init) {
+    bufferlist bl;
+    int r = cls_cxx_map_get_val(hctx, "migration", &bl);
+    if (r != -ENOENT) {
+      if (r == 0) {
+        CLS_LOG(10, "migration already set");
+        return -EEXIST;
+      }
+      CLS_ERR("failed to read migration off disk: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+
+    uint64_t features = 0;
+    r = read_key(hctx, "features", &features);
+    if (r == -ENOENT) {
+      CLS_LOG(20, "no features, assuming v1 format");
+      bufferlist header;
+      r = cls_cxx_read(hctx, 0, sizeof(RBD_HEADER_TEXT), &header);
+      if (r < 0) {
+        CLS_ERR("failed to read v1 header: %s", cpp_strerror(r).c_str());
+        return r;
+      }
+      if (header.length() != sizeof(RBD_HEADER_TEXT)) {
+        CLS_ERR("unrecognized v1 header format");
+        return -ENXIO;
+      }
+      if (memcmp(RBD_HEADER_TEXT, header.c_str(), header.length()) != 0) {
+        if (memcmp(RBD_MIGRATE_HEADER_TEXT, header.c_str(),
+                   header.length()) == 0) {
+          CLS_LOG(10, "migration already set");
+          return -EEXIST;
+        } else {
+          CLS_ERR("unrecognized v1 header format");
+          return -ENXIO;
+        }
+      }
+      if (migration_spec.header_type != cls::rbd::MIGRATION_HEADER_TYPE_SRC) {
+        CLS_LOG(10, "v1 format image can only be migration source");
+        return -EINVAL;
+      }
+
+      header.clear();
+      header.append(RBD_MIGRATE_HEADER_TEXT);
+      r = cls_cxx_write(hctx, 0, header.length(), &header);
+      if (r < 0) {
+        CLS_ERR("error updating v1 header: %s", cpp_strerror(r).c_str());
+        return r;
+      }
+    } else if (r < 0) {
+      CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
+      return r;
+    } else if ((features & RBD_FEATURE_MIGRATING) != 0ULL) {
+      if (migration_spec.header_type != cls::rbd::MIGRATION_HEADER_TYPE_DST) {
+        CLS_LOG(10, "migrating feature already set");
+        return -EEXIST;
+      }
+    } else {
+      features |= RBD_FEATURE_MIGRATING;
+      bl.clear();
+      encode(features, bl);
+      r = cls_cxx_map_set_val(hctx, "features", &bl);
+      if (r < 0) {
+        CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
+        return r;
+      }
+    }
+  }
+
+  bufferlist bl;
+  encode(migration_spec, bl);
+  int r = cls_cxx_map_set_val(hctx, "migration", &bl);
+  if (r < 0) {
+    CLS_ERR("error setting migration: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+int read_migration(cls_method_context_t hctx,
+                   cls::rbd::MigrationSpec *migration_spec) {
+  uint64_t features = 0;
+  int r = read_key(hctx, "features", &features);
+  if (r == -ENOENT) {
+    CLS_LOG(20, "no features, assuming v1 format");
+    bufferlist header;
+    r = cls_cxx_read(hctx, 0, sizeof(RBD_HEADER_TEXT), &header);
+    if (r < 0) {
+      CLS_ERR("failed to read v1 header: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+    if (header.length() != sizeof(RBD_HEADER_TEXT)) {
+      CLS_ERR("unrecognized v1 header format");
+      return -ENXIO;
+    }
+    if (memcmp(RBD_MIGRATE_HEADER_TEXT, header.c_str(), header.length()) != 0) {
+      if (memcmp(RBD_HEADER_TEXT, header.c_str(), header.length()) == 0) {
+        CLS_LOG(10, "migration feature not set");
+        return -EINVAL;
+      } else {
+        CLS_ERR("unrecognized v1 header format");
+        return -ENXIO;
+      }
+    }
+    if (migration_spec->header_type != cls::rbd::MIGRATION_HEADER_TYPE_SRC) {
+      CLS_LOG(10, "v1 format image can only be migration source");
+      return -EINVAL;
+    }
+  } else if (r < 0) {
+    CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
+    return r;
+  } else if ((features & RBD_FEATURE_MIGRATING) == 0ULL) {
+    CLS_LOG(10, "migration feature not set");
+    return -EINVAL;
+  }
+
+  r = read_key(hctx, "migration", migration_spec);
+  if (r < 0) {
+    CLS_ERR("failed to read migration off disk: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+int remove_migration(cls_method_context_t hctx) {
+  int r = remove_key(hctx, "migration");
+  if (r < 0) {
+    return r;
+  }
+
+  uint64_t features = 0;
+  r = read_key(hctx, "features", &features);
+  if (r == -ENOENT) {
+    CLS_LOG(20, "no features, assuming v1 format");
+    bufferlist header;
+    r = cls_cxx_read(hctx, 0, sizeof(RBD_MIGRATE_HEADER_TEXT), &header);
+    if (header.length() != sizeof(RBD_MIGRATE_HEADER_TEXT)) {
+      CLS_ERR("unrecognized v1 header format");
+      return -ENXIO;
+    }
+    if (memcmp(RBD_MIGRATE_HEADER_TEXT, header.c_str(), header.length()) != 0) {
+      if (memcmp(RBD_HEADER_TEXT, header.c_str(), header.length()) == 0) {
+        CLS_LOG(10, "migration feature not set");
+        return -EINVAL;
+      } else {
+        CLS_ERR("unrecognized v1 header format");
+        return -ENXIO;
+      }
+    }
+    header.clear();
+    header.append(RBD_HEADER_TEXT);
+    r = cls_cxx_write(hctx, 0, header.length(), &header);
+    if (r < 0) {
+      CLS_ERR("error updating v1 header: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+  } else if (r < 0) {
+    CLS_ERR("failed to read features off disk: %s", cpp_strerror(r).c_str());
+    return r;
+  } else if ((features & RBD_FEATURE_MIGRATING) == 0ULL) {
+    CLS_LOG(10, "migrating feature not set");
+  } else {
+    features &= ~RBD_FEATURE_MIGRATING;
+    bufferlist bl;
+    encode(features, bl);
+    r = cls_cxx_map_set_val(hctx, "features", &bl);
+    if (r < 0) {
+      CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+  }
+
+  return 0;
+}
+
+namespace snapshot {
+
+template<typename L>
+int iterate(cls_method_context_t hctx, L& lambda) {
+  int max_read = RBD_MAX_KEYS_READ;
+  string last_read = RBD_SNAP_KEY_PREFIX;
+  bool more = false;
+  do {
+    map<string, bufferlist> vals;
+    int r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
+                                max_read, &vals, &more);
+    if (r < 0) {
+      return r;
+    }
+
+    cls_rbd_snap snap_meta;
+    for (auto& val : vals) {
+      auto iter = val.second.cbegin();
+      try {
+       decode(snap_meta, iter);
+      } catch (const buffer::error &err) {
+       CLS_ERR("error decoding snapshot metadata for snap : %s",
+               val.first.c_str());
+       return -EIO;
+      }
+
+      r = lambda(snap_meta);
+      if (r < 0) {
+        return r;
+      }
+    }
+
+    if (!vals.empty()) {
+      last_read = vals.rbegin()->first;
+    }
+  } while (more);
+
+  return 0;
+}
+
+int write(cls_method_context_t hctx, const std::string& snap_key,
+          cls_rbd_snap&& snap) {
+  int r;
+  uint64_t encode_features = get_encode_features(hctx);
+  if (snap.migrate_parent_format(encode_features)) {
+    // ensure the normalized parent link exists before removing it from the
+    // snapshot record
+    cls_rbd_parent on_disk_parent;
+    r = read_key(hctx, "parent", &on_disk_parent);
+    if (r < 0 && r != -ENOENT) {
+      return r;
+    }
+
+    if (!on_disk_parent.exists()) {
+      on_disk_parent = snap.parent;
+      on_disk_parent.head_overlap = std::nullopt;
+
+      r = write_key(hctx, "parent", on_disk_parent, encode_features);
+      if (r < 0) {
+        return r;
+      }
+    }
+
+    // only store the parent overlap in the snapshot
+    snap.parent_overlap = snap.parent.head_overlap;
+    snap.parent = {};
+  }
+
+  r = write_key(hctx, snap_key, snap, encode_features);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
+} // namespace snapshot
+
+namespace parent {
+
+int attach(cls_method_context_t hctx, cls_rbd_parent parent,
+           bool reattach) {
+  int r = check_exists(hctx);
+  if (r < 0) {
+    CLS_LOG(20, "cls_rbd::image::parent::attach: child doesn't exist");
+    return r;
+  }
+
+  r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
+  if (r < 0) {
+    CLS_LOG(20, "cls_rbd::image::parent::attach: child does not support "
+                "layering");
+    return r;
+  }
+
+  CLS_LOG(20, "cls_rbd::image::parent::attach: pool=%" PRIi64 ", ns=%s, id=%s, "
+              "snapid=%" PRIu64 ", size=%" PRIu64,
+          parent.pool_id, parent.pool_namespace.c_str(),
+          parent.image_id.c_str(), parent.snap_id.val,
+          parent.head_overlap.value_or(0ULL));
+  if (!parent.exists() || parent.head_overlap.value_or(0ULL) == 0ULL) {
+    return -EINVAL;
+  }
+
+  // make sure there isn't already a parent
+  cls_rbd_parent on_disk_parent;
+  r = read_key(hctx, "parent", &on_disk_parent);
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  }
+
+  auto on_disk_parent_without_overlap{on_disk_parent};
+  on_disk_parent_without_overlap.head_overlap = parent.head_overlap;
+
+  if (r == 0 &&
+      (on_disk_parent.head_overlap ||
+       on_disk_parent_without_overlap != parent) &&
+      !reattach) {
+    CLS_LOG(20, "cls_rbd::parent::attach: existing legacy parent "
+                "pool=%" PRIi64 ", ns=%s, id=%s, snapid=%" PRIu64 ", "
+                "overlap=%" PRIu64,
+            on_disk_parent.pool_id, on_disk_parent.pool_namespace.c_str(),
+            on_disk_parent.image_id.c_str(), on_disk_parent.snap_id.val,
+            on_disk_parent.head_overlap.value_or(0ULL));
+    return -EEXIST;
+  }
+
+  // our overlap is the min of our size and the parent's size.
+  uint64_t our_size;
+  r = read_key(hctx, "size", &our_size);
+  if (r < 0) {
+    return r;
+  }
+
+  parent.head_overlap = std::min(*parent.head_overlap, our_size);
+
+  r = write_key(hctx, "parent", parent, get_encode_features(hctx));
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+int detach(cls_method_context_t hctx, bool legacy_api) {
+  int r = check_exists(hctx);
+  if (r < 0) {
+    CLS_LOG(20, "cls_rbd::parent::detach: child doesn't exist");
+    return r;
+  }
+
+  uint64_t features;
+  r = read_key(hctx, "features", &features);
+  if (r == -ENOENT || ((features & RBD_FEATURE_LAYERING) == 0)) {
+    CLS_LOG(20, "cls_rbd::image::parent::detach: child does not support "
+                "layering");
+    return -ENOEXEC;
+  } else if (r < 0) {
+    return r;
+  }
+
+  cls_rbd_parent on_disk_parent;
+  r = read_key(hctx, "parent", &on_disk_parent);
+  if (r < 0) {
+    return r;
+  } else if (legacy_api && !on_disk_parent.pool_namespace.empty()) {
+    return -EXDEV;
+  } else if (!on_disk_parent.head_overlap) {
+    return -ENOENT;
+  }
+
+  auto detach_lambda = [hctx, features](const cls_rbd_snap& snap_meta) {
+    if (snap_meta.parent.pool_id != -1 || snap_meta.parent_overlap) {
+      if ((features & RBD_FEATURE_DEEP_FLATTEN) != 0ULL) {
+        // remove parent reference from snapshot
+        cls_rbd_snap snap_meta_copy = snap_meta;
+        snap_meta_copy.parent = {};
+        snap_meta_copy.parent_overlap = std::nullopt;
+
+        std::string snap_key;
+        key_from_snap_id(snap_meta_copy.id, &snap_key);
+        int r = snapshot::write(hctx, snap_key, std::move(snap_meta_copy));
+        if (r < 0) {
+          return r;
+        }
+      } else {
+        return -EEXIST;
+      }
+    }
+    return 0;
+  };
+
+  r = snapshot::iterate(hctx, detach_lambda);
+  bool has_child_snaps = (r == -EEXIST);
+  if (r < 0 && r != -EEXIST) {
+    return r;
+  }
+
+  int8_t require_osd_release = cls_get_required_osd_release(hctx);
+  if (has_child_snaps && require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+    // remove overlap from HEAD revision but keep spec for snapshots
+    on_disk_parent.head_overlap = std::nullopt;
+    r = write_key(hctx, "parent", on_disk_parent, get_encode_features(hctx));
+    if (r < 0) {
+      return r;
+    }
+  } else {
+    r = remove_key(hctx, "parent");
+    if (r < 0 && r != -ENOENT) {
+      return r;
+    }
+  }
+
+  if (!has_child_snaps) {
+    // disable clone child op feature if no longer associated
+    r = set_op_features(hctx, 0, RBD_OPERATION_FEATURE_CLONE_CHILD);
+    if (r < 0) {
+      return r;
+    }
+  }
+  return 0;
+}
+
+} // namespace parent
+} // namespace image
+
 /**
  * Initialize the header with basic metadata.
  * Extra features may initialize more fields in the future.
@@ -187,13 +758,13 @@ int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   int64_t data_pool_id = -1;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(size, iter);
-    ::decode(order, iter);
-    ::decode(features, iter);
-    ::decode(object_prefix, iter);
+    auto iter = in->cbegin();
+    decode(size, iter);
+    decode(order, iter);
+    decode(features, iter);
+    decode(object_prefix, iter);
     if (!iter.end()) {
-      ::decode(data_pool_id, iter);
+      decode(data_pool_id, iter);
     }
   } catch (const buffer::error &err) {
     return -EINVAL;
@@ -223,15 +794,15 @@ int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   bufferlist featuresbl;
   bufferlist object_prefixbl;
   bufferlist snap_seqbl;
-  bufferlist create_timestampbl;
+  bufferlist timestampbl;
   uint64_t snap_seq = 0;
-  utime_t create_timestamp = ceph_clock_now();
-  ::encode(size, sizebl);
-  ::encode(order, orderbl);
-  ::encode(features, featuresbl);
-  ::encode(object_prefix, object_prefixbl);
-  ::encode(snap_seq, snap_seqbl);
-  ::encode(create_timestamp, create_timestampbl);
+  utime_t timestamp = ceph_clock_now();
+  encode(size, sizebl);
+  encode(order, orderbl);
+  encode(features, featuresbl);
+  encode(object_prefix, object_prefixbl);
+  encode(snap_seq, snap_seqbl);
+  encode(timestamp, timestampbl);
 
   map<string, bufferlist> omap_vals;
   omap_vals["size"] = sizebl;
@@ -239,7 +810,14 @@ int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   omap_vals["features"] = featuresbl;
   omap_vals["object_prefix"] = object_prefixbl;
   omap_vals["snap_seq"] = snap_seqbl;
-  omap_vals["create_timestamp"] = create_timestampbl;
+  omap_vals["create_timestamp"] = timestampbl;
+  omap_vals["access_timestamp"] = timestampbl;
+  omap_vals["modify_timestamp"] = timestampbl;
+
+  if ((features & RBD_FEATURE_OPERATIONS) != 0ULL) {
+    CLS_ERR("Attempting to set internal feature: operations");
+    return -EINVAL;
+  }
 
   if (features & RBD_FEATURE_DATA_POOL) {
     if (data_pool_id == -1) {
@@ -248,7 +826,7 @@ int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     }
 
     bufferlist data_pool_id_bl;
-    ::encode(data_pool_id, data_pool_id_bl);
+    encode(data_pool_id, data_pool_id_bl);
     omap_vals["data_pool_id"] = data_pool_id_bl;
   } else if (data_pool_id != -1) {
     CLS_ERR("data pool provided with feature disabled");
@@ -274,34 +852,20 @@ int create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
  */
 int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
-  uint64_t snap_id;
   bool read_only = false;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
+    uint64_t snap_id;
+    decode(snap_id, iter);
     if (!iter.end()) {
-      ::decode(read_only, iter);
+      decode(read_only, iter);
     }
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  CLS_LOG(20, "get_features snap_id=%" PRIu64 ", read_only=%d",
-          snap_id, read_only);
-
-  // NOTE: keep this deprecated snapshot logic to support negative
-  // test cases in older (pre-Infernalis) releases. Remove once older
-  // releases are no longer supported.
-  if (snap_id != CEPH_NOSNAP) {
-    cls_rbd_snap snap;
-    string snapshot_key;
-    key_from_snap_id(snap_id, &snapshot_key);
-    int r = read_key(hctx, snapshot_key, &snap);
-    if (r < 0) {
-      return r;
-    }
-  }
+  CLS_LOG(20, "get_features read_only=%d", read_only);
 
   uint64_t features;
   int r = read_key(hctx, "features", &features);
@@ -312,8 +876,8 @@ int get_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
   uint64_t incompatible = (read_only ? features & RBD_FEATURES_INCOMPATIBLE :
                                       features & RBD_FEATURES_RW_INCOMPATIBLE);
-  ::encode(features, *out);
-  ::encode(incompatible, *out);
+  encode(features, *out);
+  encode(incompatible, *out);
   return 0;
 }
 
@@ -333,10 +897,10 @@ int set_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   uint64_t features;
   uint64_t mask;
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(features, iter);
-    ::decode(mask, iter);
+    decode(features, iter);
+    decode(mask, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -351,6 +915,12 @@ int set_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     return r;
   }
 
+  if ((mask & RBD_FEATURES_INTERNAL) != 0ULL) {
+    CLS_ERR("Attempting to set internal feature: %" PRIu64,
+            static_cast<uint64_t>(mask & RBD_FEATURES_INTERNAL));
+    return -EINVAL;
+  }
+
   // newer clients might attempt to mask off features we don't support
   mask &= RBD_FEATURES_ALL;
 
@@ -374,34 +944,11 @@ int set_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
           features, orig_features);
 
   bufferlist bl;
-  ::encode(features, bl);
+  encode(features, bl);
   r = cls_cxx_map_set_val(hctx, "features", &bl);
-  if (r < 0) {
-    CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
-    return r;
-  }
-  return 0;
-}
-
-/**
- * check that given feature(s) are set
- *
- * @param hctx context
- * @param need features needed
- * @return 0 if features are set, negative error (like ENOEXEC) otherwise
- */
-int require_feature(cls_method_context_t hctx, uint64_t need)
-{
-  uint64_t features;
-  int r = read_key(hctx, "features", &features);
-  if (r == -ENOENT)   // this implies it's an old-style image with no features
-    return -ENOEXEC;
-  if (r < 0)
-    return r;
-  if ((features & need) != need) {
-    CLS_LOG(10, "require_feature missing feature %llx, have %llx",
-            (unsigned long long)need, (unsigned long long)features);
-    return -ENOEXEC;
+  if (r < 0) {
+    CLS_ERR("error updating features: %s", cpp_strerror(r).c_str());
+    return r;
   }
   return 0;
 }
@@ -420,9 +967,9 @@ int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   uint64_t snap_id, size;
   uint8_t order;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -452,8 +999,8 @@ int get_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     size = snap.image_size;
   }
 
-  ::encode(order, *out);
-  ::encode(size, *out);
+  encode(order, *out);
+  encode(size, *out);
 
   return 0;
 }
@@ -469,9 +1016,9 @@ int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   uint64_t size;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(size, iter);
+    decode(size, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -489,7 +1036,7 @@ int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
           (unsigned long long)orig_size);
 
   bufferlist sizebl;
-  ::encode(size, sizebl);
+  encode(size, sizebl);
   r = cls_cxx_map_set_val(hctx, "size", &sizebl);
   if (r < 0) {
     CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
@@ -505,13 +1052,10 @@ int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
       r = 0;
     if (r < 0)
       return r;
-    if (parent.exists() && parent.overlap > size) {
-      bufferlist parentbl;
-      parent.overlap = size;
-      ::encode(parent, parentbl);
-      r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
+    if (parent.exists() && parent.head_overlap.value_or(0ULL) > size) {
+      parent.head_overlap = size;
+      r = write_key(hctx, "parent", parent, get_encode_features(hctx));
       if (r < 0) {
-       CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
        return r;
       }
     }
@@ -520,18 +1064,6 @@ int set_size(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   return 0;
 }
 
-/**
- * verify that the header object exists
- *
- * @return 0 if the object exists, -ENOENT if it does not, or other error
- */
-int check_exists(cls_method_context_t hctx)
-{
-  uint64_t size;
-  time_t mtime;
-  return cls_cxx_stat(hctx, &size, &mtime);
-}
-
 /**
  * get the current protection status of the specified snapshot
  *
@@ -550,9 +1082,9 @@ int get_protection_status(cls_method_context_t hctx, bufferlist *in,
 {
   snapid_t snap_id;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     CLS_LOG(20, "get_protection_status: invalid decode");
     return -EINVAL;
@@ -583,7 +1115,7 @@ int get_protection_status(cls_method_context_t hctx, bufferlist *in,
     return -EIO;
   }
 
-  ::encode(snap.protection_status, *out);
+  encode(snap.protection_status, *out);
   return 0;
 }
 
@@ -604,10 +1136,10 @@ int set_protection_status(cls_method_context_t hctx, bufferlist *in,
   snapid_t snap_id;
   uint8_t status;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
-    ::decode(status, iter);
+    decode(snap_id, iter);
+    decode(status, iter);
   } catch (const buffer::error &err) {
     CLS_LOG(20, "set_protection_status: invalid decode");
     return -EINVAL;
@@ -617,7 +1149,7 @@ int set_protection_status(cls_method_context_t hctx, bufferlist *in,
   if (r < 0)
     return r;
 
-  r = require_feature(hctx, RBD_FEATURE_LAYERING);
+  r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
   if (r < 0) {
     CLS_LOG(20, "image does not support layering");
     return r;
@@ -645,11 +1177,8 @@ int set_protection_status(cls_method_context_t hctx, bufferlist *in,
   }
 
   snap.protection_status = status;
-  bufferlist snapshot_bl;
-  ::encode(snap, snapshot_bl);
-  r = cls_cxx_map_set_val(hctx, snapshot_key, &snapshot_bl);
+  r = image::snapshot::write(hctx, snapshot_key, std::move(snap));
   if (r < 0) {
-    CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
     return r;
   }
 
@@ -676,7 +1205,7 @@ int get_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist
 
   CLS_LOG(20, "get_stripe_unit_count");
 
-  r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
+  r = image::require_feature(hctx, RBD_FEATURE_STRIPINGV2);
   if (r < 0)
     return r;
 
@@ -703,8 +1232,8 @@ int get_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist
   if (r < 0)
     return r;
 
-  ::encode(stripe_unit, *out);
-  ::encode(stripe_count, *out);
+  encode(stripe_unit, *out);
+  encode(stripe_count, *out);
   return 0;
 }
 
@@ -721,10 +1250,10 @@ int set_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist
 {
   uint64_t stripe_unit, stripe_count;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(stripe_unit, iter);
-    ::decode(stripe_count, iter);
+    decode(stripe_unit, iter);
+    decode(stripe_count, iter);
   } catch (const buffer::error &err) {
     CLS_LOG(20, "set_stripe_unit_count: invalid decode");
     return -EINVAL;
@@ -739,7 +1268,7 @@ int set_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist
 
   CLS_LOG(20, "set_stripe_unit_count");
 
-  r = require_feature(hctx, RBD_FEATURE_STRIPINGV2);
+  r = image::require_feature(hctx, RBD_FEATURE_STRIPINGV2);
   if (r < 0)
     return r;
 
@@ -756,14 +1285,14 @@ int set_stripe_unit_count(cls_method_context_t hctx, bufferlist *in, bufferlist
   }
 
   bufferlist bl, bl2;
-  ::encode(stripe_unit, bl);
+  encode(stripe_unit, bl);
   r = cls_cxx_map_set_val(hctx, "stripe_unit", &bl);
   if (r < 0) {
     CLS_ERR("error writing stripe_unit metadata: %s", cpp_strerror(r).c_str());
     return r;
   }
 
-  ::encode(stripe_count, bl2);
+  encode(stripe_count, bl2);
   r = cls_cxx_map_set_val(hctx, "stripe_count", &bl2);
   if (r < 0) {
     CLS_ERR("error writing stripe_count metadata: %s", cpp_strerror(r).c_str());
@@ -787,18 +1316,93 @@ int get_create_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *
     }
   } else {
     try {
-      bufferlist::iterator it = bl.begin();
-      ::decode(timestamp, it);
+      auto it = bl.cbegin();
+      decode(timestamp, it);
     } catch (const buffer::error &err) {
       CLS_ERR("could not decode create_timestamp");
       return -EIO;
     }
   }
 
-  ::encode(timestamp, *out);
+  encode(timestamp, *out);
+  return 0;
+}
+
+/**
+ * get the image access timestamp
+ *
+ * Input:
+ * @param none
+ *
+ * Output:
+ * @param timestamp the image access timestamp
+ *
+ * @returns 0 on success, negative error code upon failure
+ */
+int get_access_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "get_access_timestamp");
+
+  utime_t timestamp;
+  bufferlist bl;
+  int r = cls_cxx_map_get_val(hctx, "access_timestamp", &bl);
+  if (r < 0) {
+    if (r != -ENOENT) {
+      CLS_ERR("error reading access_timestamp: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+  } else {
+    try {
+      auto it = bl.cbegin();
+      decode(timestamp, it);
+    } catch (const buffer::error &err) {
+      CLS_ERR("could not decode access_timestamp");
+      return -EIO;
+    }
+  }
+
+  encode(timestamp, *out);
+  return 0;
+}
+
+/**
+ * get the image modify timestamp
+ *
+ * Input:
+ * @param none
+ *
+ * Output:
+ * @param timestamp the image modify timestamp
+ *
+ * @returns 0 on success, negative error code upon failure
+ */
+int get_modify_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "get_modify_timestamp");
+
+  utime_t timestamp;
+  bufferlist bl;
+  int r = cls_cxx_map_get_val(hctx, "modify_timestamp", &bl);
+  if (r < 0) {
+    if (r != -ENOENT) {
+      CLS_ERR("error reading modify_timestamp: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+  } else {
+    try {
+      auto it = bl.cbegin();
+      decode(timestamp, it);
+    } catch (const buffer::error &err) {
+      CLS_ERR("could not decode modify_timestamp");
+      return -EIO;
+    }
+  }
+
+  encode(timestamp, *out);
   return 0;
 }
 
+
 /**
  * get the image flags
  *
@@ -813,9 +1417,9 @@ int get_create_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *
 int get_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   uint64_t snap_id;
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -840,7 +1444,7 @@ int get_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     flags = snap.flags;
   }
 
-  ::encode(flags, *out);
+  encode(flags, *out);
   return 0;
 }
 
@@ -862,12 +1466,12 @@ int set_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   uint64_t flags;
   uint64_t mask;
   uint64_t snap_id = CEPH_NOSNAP;
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(flags, iter);
-    ::decode(mask, iter);
+    decode(flags, iter);
+    decode(mask, iter);
     if (!iter.end()) {
-      ::decode(snap_id, iter);
+      decode(snap_id, iter);
     }
   } catch (const buffer::error &err) {
     return -EINVAL;
@@ -903,24 +1507,75 @@ int set_flags(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
               flags, mask);
 
   if (snap_id == CEPH_NOSNAP) {
-    bufferlist bl;
-    ::encode(flags, bl);
-    r = cls_cxx_map_set_val(hctx, "flags", &bl);
+    r = write_key(hctx, "flags", flags);
   } else {
     snap_meta.flags = flags;
-
-    bufferlist bl;
-    ::encode(snap_meta, bl);
-    r = cls_cxx_map_set_val(hctx, snap_meta_key, &bl);
+    r = image::snapshot::write(hctx, snap_meta_key, std::move(snap_meta));
   }
 
   if (r < 0) {
-    CLS_ERR("error updating flags: %s", cpp_strerror(r).c_str());
     return r;
   }
   return 0;
 }
 
+/**
+ * Get the operation-based image features
+ *
+ * Input:
+ *
+ * Output:
+ * @param bitmask of enabled op features (uint64_t)
+ * @returns 0 on success, negative error code on failure
+ */
+int op_features_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "op_features_get");
+
+  uint64_t op_features = 0;
+  int r = read_key(hctx, "op_features", &op_features);
+  if (r < 0 && r != -ENOENT) {
+    CLS_ERR("failed to read op features off disk: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  encode(op_features, *out);
+  return 0;
+}
+
+/**
+ * Set the operation-based image features
+ *
+ * Input:
+ * @param op_features image op features
+ * @param mask image op feature mask
+ *
+ * Output:
+ * none
+ *
+ * @returns 0 on success, negative error code upon failure
+ */
+int op_features_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t op_features;
+  uint64_t mask;
+  auto iter = in->cbegin();
+  try {
+    decode(op_features, iter);
+    decode(mask, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  uint64_t unsupported_op_features = (mask & ~RBD_OPERATION_FEATURES_ALL);
+  if (unsupported_op_features != 0ULL) {
+    CLS_ERR("unsupported op features: %" PRIu64, unsupported_op_features);
+    return -EINVAL;
+  }
+
+  return image::set_op_features(hctx, op_features, mask);
+}
+
 /**
  * get the current parent, if any
  *
@@ -939,41 +1594,62 @@ int get_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   uint64_t snap_id;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
   int r = check_exists(hctx);
-  if (r < 0)
+  if (r < 0) {
     return r;
+  }
 
-  CLS_LOG(20, "get_parent snap_id=%llu", (unsigned long long)snap_id);
+  CLS_LOG(20, "get_parent snap_id=%" PRIu64, snap_id);
 
   cls_rbd_parent parent;
-  r = require_feature(hctx, RBD_FEATURE_LAYERING);
+  r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
   if (r == 0) {
-    if (snap_id == CEPH_NOSNAP) {
-      r = read_key(hctx, "parent", &parent);
-      if (r < 0 && r != -ENOENT)
-       return r;
-    } else {
+    r = read_key(hctx, "parent", &parent);
+    if (r < 0 && r != -ENOENT) {
+      return r;
+    } else if (!parent.pool_namespace.empty()) {
+      return -EXDEV;
+    }
+
+    if (snap_id != CEPH_NOSNAP) {
       cls_rbd_snap snap;
-      string snapshot_key;
+      std::string snapshot_key;
       key_from_snap_id(snap_id, &snapshot_key);
       r = read_key(hctx, snapshot_key, &snap);
-      if (r < 0 && r != -ENOENT)
+      if (r < 0 && r != -ENOENT) {
        return r;
-      parent = snap.parent;
+      }
+
+      if (snap.parent.exists()) {
+        // legacy format where full parent spec is written within
+        // each snapshot record
+        parent = snap.parent;
+      } else if (snap.parent_overlap) {
+        // normalized parent reference
+        if (!parent.exists()) {
+          CLS_ERR("get_parent: snap_id=%" PRIu64 ": invalid parent spec",
+                  snap_id);
+          return -EINVAL;
+        }
+        parent.head_overlap = *snap.parent_overlap;
+      } else {
+        // snapshot doesn't have associated parent
+        parent = {};
+      }
     }
   }
 
-  ::encode(parent.pool, *out);
-  ::encode(parent.id, *out);
-  ::encode(parent.snapid, *out);
-  ::encode(parent.overlap, *out);
+  encode(parent.pool_id, *out);
+  encode(parent.image_id, *out);
+  encode(parent.snap_id, *out);
+  encode(parent.head_overlap.value_or(0ULL), *out);
   return 0;
 }
 
@@ -990,68 +1666,23 @@ int get_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
  */
 int set_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
-  int64_t pool;
-  string id;
-  snapid_t snapid;
-  uint64_t size;
-
-  bufferlist::iterator iter = in->begin();
+  cls_rbd_parent parent;
+  auto iter = in->cbegin();
   try {
-    ::decode(pool, iter);
-    ::decode(id, iter);
-    ::decode(snapid, iter);
-    ::decode(size, iter);
+    decode(parent.pool_id, iter);
+    decode(parent.image_id, iter);
+    decode(parent.snap_id, iter);
+
+    uint64_t overlap;
+    decode(overlap, iter);
+    parent.head_overlap = overlap;
   } catch (const buffer::error &err) {
     CLS_LOG(20, "cls_rbd::set_parent: invalid decode");
     return -EINVAL;
   }
 
-  int r = check_exists(hctx);
-  if (r < 0) {
-    CLS_LOG(20, "cls_rbd::set_parent: child already exists");
-    return r;
-  }
-
-  r = require_feature(hctx, RBD_FEATURE_LAYERING);
-  if (r < 0) {
-    CLS_LOG(20, "cls_rbd::set_parent: child does not support layering");
-    return r;
-  }
-
-  CLS_LOG(20, "set_parent pool=%llu id=%s snapid=%llu size=%llu",
-         (unsigned long long)pool, id.c_str(), (unsigned long long)snapid.val,
-         (unsigned long long)size);
-
-  if (pool < 0 || id.length() == 0 || snapid == CEPH_NOSNAP || size == 0) {
-    return -EINVAL;
-  }
-
-  // make sure there isn't already a parent
-  cls_rbd_parent parent;
-  r = read_key(hctx, "parent", &parent);
-  if (r == 0) {
-    CLS_LOG(20, "set_parent existing parent pool=%llu id=%s snapid=%llu"
-            "overlap=%llu", (unsigned long long)parent.pool, parent.id.c_str(),
-           (unsigned long long)parent.snapid.val,
-           (unsigned long long)parent.overlap);
-    return -EEXIST;
-  }
-
-  // our overlap is the min of our size and the parent's size.
-  uint64_t our_size;
-  r = read_key(hctx, "size", &our_size);
-  if (r < 0)
-    return r;
-
-  bufferlist parentbl;
-  parent.pool = pool;
-  parent.id = id;
-  parent.snapid = snapid;
-  parent.overlap = MIN(our_size, size);
-  ::encode(parent, parentbl);
-  r = cls_cxx_map_set_val(hctx, "parent", &parentbl);
+  int r = image::parent::attach(hctx, parent, false);
   if (r < 0) {
-    CLS_ERR("error writing parent: %s", cpp_strerror(r).c_str());
     return r;
   }
 
@@ -1068,91 +1699,176 @@ int set_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
  */
 int remove_parent(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
-  int r = check_exists(hctx);
-  if (r < 0)
+  int r = image::parent::detach(hctx, true);
+  if (r < 0) {
     return r;
+  }
 
-  r = require_feature(hctx, RBD_FEATURE_LAYERING);
-  if (r < 0)
-    return r;
+  return 0;
+}
 
-  uint64_t features;
-  r = read_key(hctx, "features", &features);
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * @param parent spec (cls::rbd::ParentImageSpec)
+ * @returns 0 on success, negative error code on failure
+ */
+int parent_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  int r = check_exists(hctx);
   if (r < 0) {
     return r;
   }
 
-  // remove the parent from all snapshots
-  if ((features & RBD_FEATURE_DEEP_FLATTEN) != 0) {
-    int max_read = RBD_MAX_KEYS_READ;
-    vector<snapid_t> snap_ids;
-    string last_read = RBD_SNAP_KEY_PREFIX;
-    bool more;
+  CLS_LOG(20, "parent_get");
 
-    do {
-      set<string> keys;
-      r = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
+  cls_rbd_parent parent;
+  r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
+  if (r == 0) {
+    r = read_key(hctx, "parent", &parent);
+    if (r < 0 && r != -ENOENT) {
+      return r;
+    } else if (r == -ENOENT) {
+      // examine oldest snapshot to see if it has a denormalized parent
+      auto parent_lambda = [hctx, &parent](const cls_rbd_snap& snap_meta) {
+        if (snap_meta.parent.exists()) {
+          parent = snap_meta.parent;
+        }
+        return 0;
+      };
+
+      r = image::snapshot::iterate(hctx, parent_lambda);
       if (r < 0) {
         return r;
       }
+    }
+  }
 
-      for (std::set<string>::const_iterator it = keys.begin();
-           it != keys.end(); ++it) {
-        if ((*it).find(RBD_SNAP_KEY_PREFIX) != 0) {
-         break;
-        }
+  cls::rbd::ParentImageSpec parent_image_spec{
+    parent.pool_id, parent.pool_namespace, parent.image_id,
+    parent.snap_id};
+  encode(parent_image_spec, *out);
+  return 0;
+}
 
-        uint64_t snap_id = snap_id_from_key(*it);
-        cls_rbd_snap snap_meta;
-        r = read_key(hctx, *it, &snap_meta);
-        if (r < 0) {
-          CLS_ERR("Could not read snapshot: snap_id=%" PRIu64 ": %s",
-                  snap_id, cpp_strerror(r).c_str());
-          return r;
-        }
+/**
+ * Input:
+ * @param snap id (uint64_t) parent snapshot id
+ *
+ * Output:
+ * @param byte overlap of parent image (std::optional<uint64_t>)
+ * @returns 0 on success, negative error code on failure
+ */
+int parent_overlap_get(cls_method_context_t hctx, bufferlist *in,
+                       bufferlist *out) {
+  uint64_t snap_id;
+  auto iter = in->cbegin();
+  try {
+    decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
 
-        snap_meta.parent = cls_rbd_parent();
+  int r = check_exists(hctx);
+  CLS_LOG(20, "parent_overlap_get");
 
-        bufferlist bl;
-        ::encode(snap_meta, bl);
-        r = cls_cxx_map_set_val(hctx, *it, &bl);
-        if (r < 0) {
-          CLS_ERR("Could not update snapshot: snap_id=%" PRIu64 ": %s",
-                  snap_id, cpp_strerror(r).c_str());
-          return r;
-        }
+  std::optional<uint64_t> parent_overlap = std::nullopt;
+  r = image::require_feature(hctx, RBD_FEATURE_LAYERING);
+  if (r == 0) {
+    if (snap_id == CEPH_NOSNAP) {
+      cls_rbd_parent parent;
+      r = read_key(hctx, "parent", &parent);
+      if (r < 0 && r != -ENOENT) {
+        return r;
+      } else if (r == 0) {
+        parent_overlap = parent.head_overlap;
+      }
+    } else {
+      cls_rbd_snap snap;
+      std::string snapshot_key;
+      key_from_snap_id(snap_id, &snapshot_key);
+      r = read_key(hctx, snapshot_key, &snap);
+      if (r < 0) {
+        return r;
       }
 
-      if (!keys.empty()) {
-        last_read = *(keys.rbegin());
+      if (snap.parent_overlap) {
+        parent_overlap = snap.parent_overlap;
+      } else if (snap.parent.exists()) {
+        // legacy format where full parent spec is written within
+        // each snapshot record
+        parent_overlap = snap.parent.head_overlap;
       }
-    } while (more);
+    }
+  };
+
+  encode(parent_overlap, *out);
+  return 0;
+}
+
+/**
+ * Input:
+ * @param parent spec (cls::rbd::ParentImageSpec)
+ * @param size parent size (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int parent_attach(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  cls::rbd::ParentImageSpec parent_image_spec;
+  uint64_t parent_overlap;
+  bool reattach = false;
+
+  auto iter = in->cbegin();
+  try {
+    decode(parent_image_spec, iter);
+    decode(parent_overlap, iter);
+    if (!iter.end()) {
+      decode(reattach, iter);
+    }
+  } catch (const buffer::error &err) {
+    CLS_LOG(20, "cls_rbd::parent_attach: invalid decode");
+    return -EINVAL;
   }
 
-  cls_rbd_parent parent;
-  r = read_key(hctx, "parent", &parent);
-  if (r < 0)
+  int r = image::parent::attach(hctx, {parent_image_spec, parent_overlap},
+                                reattach);
+  if (r < 0) {
     return r;
+  }
 
-  r = cls_cxx_map_remove_key(hctx, "parent");
+  return 0;
+}
+
+/**
+ * Input:
+ * none
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int parent_detach(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  int r = image::parent::detach(hctx, false);
   if (r < 0) {
-    CLS_ERR("error removing parent: %s", cpp_strerror(r).c_str());
     return r;
   }
+
   return 0;
 }
 
+
 /**
  * methods for dealing with rbd_children object
  */
 
-static int decode_parent_common(bufferlist::iterator& it, uint64_t *pool_id,
+static int decode_parent_common(bufferlist::const_iterator& it, uint64_t *pool_id,
                                string *image_id, snapid_t *snap_id)
 {
   try {
-    ::decode(*pool_id, it);
-    ::decode(*image_id, it);
-    ::decode(*snap_id, it);
+    decode(*pool_id, it);
+    decode(*image_id, it);
+    decode(*snap_id, it);
   } catch (const buffer::error &err) {
     CLS_ERR("error decoding parent spec");
     return -EINVAL;
@@ -1163,7 +1879,7 @@ static int decode_parent_common(bufferlist::iterator& it, uint64_t *pool_id,
 static int decode_parent(bufferlist *in, uint64_t *pool_id,
                         string *image_id, snapid_t *snap_id)
 {
-  bufferlist::iterator it = in->begin();
+  auto it = in->cbegin();
   return decode_parent_common(it, pool_id, image_id, snap_id);
 }
 
@@ -1171,12 +1887,12 @@ static int decode_parent_and_child(bufferlist *in, uint64_t *pool_id,
                                   string *image_id, snapid_t *snap_id,
                                   string *c_image_id)
 {
-  bufferlist::iterator it = in->begin();
+  auto it = in->cbegin();
   int r = decode_parent_common(it, pool_id, image_id, snap_id);
   if (r < 0)
     return r;
   try {
-    ::decode(*c_image_id, it);
+    decode(*c_image_id, it);
   } catch (const buffer::error &err) {
     CLS_ERR("error decoding child image id");
     return -EINVAL;
@@ -1187,9 +1903,9 @@ static int decode_parent_and_child(bufferlist *in, uint64_t *pool_id,
 static string parent_key(uint64_t pool_id, string image_id, snapid_t snap_id)
 {
   bufferlist key_bl;
-  ::encode(pool_id, key_bl);
-  ::encode(image_id, key_bl);
-  ::encode(snap_id, key_bl);
+  encode(pool_id, key_bl);
+  encode(image_id, key_bl);
+  encode(snap_id, key_bl);
   return string(key_bl.c_str(), key_bl.length());
 }
 
@@ -1244,7 +1960,7 @@ int add_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
   // write back
   bufferlist childbl;
-  ::encode(children, childbl);
+  encode(children, childbl);
   r = cls_cxx_map_set_val(hctx, key, &childbl);
   if (r < 0)
     CLS_LOG(20, "add_child: omap write failed: %s", cpp_strerror(r).c_str());
@@ -1306,7 +2022,7 @@ int remove_child(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   } else {
     // write back shortened children list
     bufferlist childbl;
-    ::encode(children, childbl);
+    encode(children, childbl);
     r = cls_cxx_map_set_val(hctx, key, &childbl);
     if (r < 0)
       CLS_LOG(20, "remove_child: write omap failed: %s", cpp_strerror(r).c_str());
@@ -1349,7 +2065,7 @@ int get_children(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
       CLS_LOG(20, "get_children: read omap failed: %s", cpp_strerror(r).c_str());
     return r;
   }
-  ::encode(children, *out);
+  encode(children, *out);
   return 0;
 }
 
@@ -1400,8 +2116,8 @@ int get_snapcontext(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   // snap_ids must be descending in a snap context
   std::reverse(snap_ids.begin(), snap_ids.end());
 
-  ::encode(snap_seq, *out);
-  ::encode(snap_ids, *out);
+  encode(snap_seq, *out);
+  encode(snap_ids, *out);
 
   return 0;
 }
@@ -1423,7 +2139,7 @@ int get_object_prefix(cls_method_context_t hctx, bufferlist *in, bufferlist *out
     return r;
   }
 
-  ::encode(object_prefix, *out);
+  encode(object_prefix, *out);
 
   return 0;
 }
@@ -1449,17 +2165,25 @@ int get_data_pool(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     return r;
   }
 
-  ::encode(data_pool_id, *out);
+  encode(data_pool_id, *out);
   return 0;
 }
 
+/**
+ * Input:
+ * @param snap_id which snapshot to query
+ *
+ * Output:
+ * @param name (string) of the snapshot
+ * @returns 0 on success, negative error code on failure
+ */
 int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   uint64_t snap_id;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -1476,18 +2200,28 @@ int get_snapshot_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out
   if (r < 0)
     return r;
 
-  ::encode(snap.name, *out);
+  encode(snap.name, *out);
 
   return 0;
 }
 
+/**
+ * Input:
+ * @param snap_id which snapshot to query
+ *
+ * Output:
+ * @param timestamp (utime_t) of the snapshot
+ * @returns 0 on success, negative error code on failure
+ *
+ * NOTE: deprecated - remove this method after Luminous is unsupported
+ */
 int get_snapshot_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   uint64_t snap_id;
-  
-  bufferlist::iterator iter = in->begin();
+
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -1497,7 +2231,7 @@ int get_snapshot_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist
   if (snap_id == CEPH_NOSNAP) {
     return -EINVAL;
   }
-  
+
   cls_rbd_snap snap;
   string snapshot_key;
   key_from_snap_id(snap_id, &snapshot_key);
@@ -1506,33 +2240,30 @@ int get_snapshot_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist
     return r;
   }
 
-  ::encode(snap.timestamp, *out);
+  encode(snap.timestamp, *out);
   return 0;
 }
 
 /**
- * Retrieve namespace of a snapshot.
- *
  * Input:
- * @param snap_id id of the snapshot (uint64_t)
+ * @param snap_id which snapshot to query
  *
  * Output:
- * @param SnapshotNamespace
- * @returns 0 on success, negative error code on failure.
+ * @param snapshot (cls::rbd::SnapshotInfo)
+ * @returns 0 on success, negative error code on failure
  */
-int get_snapshot_namespace(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int snapshot_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   uint64_t snap_id;
 
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   try {
-    ::decode(snap_id, iter);
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  CLS_LOG(20, "get_snapshot_namespace snap_id=%" PRIu64, snap_id);
-
+  CLS_LOG(20, "snapshot_get snap_id=%llu", (unsigned long long)snap_id);
   if (snap_id == CEPH_NOSNAP) {
     return -EINVAL;
   }
@@ -1545,8 +2276,10 @@ int get_snapshot_namespace(cls_method_context_t hctx, bufferlist *in, bufferlist
     return r;
   }
 
-  ::encode(snap.snapshot_namespace, *out);
-
+  cls::rbd::SnapshotInfo snapshot_info{snap.id, snap.snapshot_namespace,
+                                       snap.name, snap.image_size,
+                                       snap.timestamp, snap.child_count};
+  encode(snapshot_info, *out);
   return 0;
 }
 
@@ -1556,7 +2289,7 @@ int get_snapshot_namespace(cls_method_context_t hctx, bufferlist *in, bufferlist
  * Input:
  * @param snap_name name of the snapshot (string)
  * @param snap_id id of the snapshot (uint64_t)
- * @param snap_namespace namespace of the snapshot (cls::rbd::SnapshotNamespaceOnDisk)
+ * @param snap_namespace namespace of the snapshot (cls::rbd::SnapshotNamespace)
  *
  * Output:
  * @returns 0 on success, negative error code on failure.
@@ -1570,18 +2303,18 @@ int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   uint64_t snap_limit;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(snap_meta.name, iter);
-    ::decode(snap_meta.id, iter);
+    auto iter = in->cbegin();
+    decode(snap_meta.name, iter);
+    decode(snap_meta.id, iter);
     if (!iter.end()) {
-      ::decode(snap_meta.snapshot_namespace, iter);
+      decode(snap_meta.snapshot_namespace, iter);
     }
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
   if (boost::get<cls::rbd::UnknownSnapshotNamespace>(
-        &snap_meta.snapshot_namespace.snapshot_namespace) != nullptr) {
+        &snap_meta.snapshot_namespace) != nullptr) {
     CLS_ERR("Unknown snapshot namespace provided");
     return -EINVAL;
   }
@@ -1609,11 +2342,6 @@ int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     CLS_ERR("Could not read image's size off disk: %s", cpp_strerror(r).c_str());
     return r;
   }
-  r = read_key(hctx, "features", &snap_meta.features);
-  if (r < 0) {
-    CLS_ERR("Could not read image's features off disk: %s", cpp_strerror(r).c_str());
-    return r;
-  }
   r = read_key(hctx, "flags", &snap_meta.flags);
   if (r < 0 && r != -ENOENT) {
     CLS_ERR("Could not read image's flags off disk: %s", cpp_strerror(r).c_str());
@@ -1630,77 +2358,68 @@ int snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
   snap_meta.timestamp = ceph_clock_now();
 
-  int max_read = RBD_MAX_KEYS_READ;
   uint64_t total_read = 0;
-  string last_read = RBD_SNAP_KEY_PREFIX;
-  bool more;
-  do {
-    map<string, bufferlist> vals;
-    r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
-                            max_read, &vals, &more);
-    if (r < 0)
-      return r;
-
-    total_read += vals.size();
-    if (total_read >= snap_limit) {
-      CLS_ERR("Attempt to create snapshot over limit of %" PRIu64, snap_limit);
-      return -EDQUOT;
-    }
-
-    for (map<string, bufferlist>::iterator it = vals.begin();
-        it != vals.end(); ++it) {
-      cls_rbd_snap old_meta;
-      bufferlist::iterator iter = it->second.begin();
-      try {
-       ::decode(old_meta, iter);
-      } catch (const buffer::error &err) {
-       snapid_t snap_id = snap_id_from_key(it->first);
-       CLS_ERR("error decoding snapshot metadata for snap_id: %llu",
-               (unsigned long long)snap_id.val);
-       return -EIO;
+  auto pre_check_lambda =
+    [&snap_meta, &total_read, snap_limit](const cls_rbd_snap& old_meta) {
+      ++total_read;
+      if (total_read >= snap_limit) {
+        CLS_ERR("Attempt to create snapshot over limit of %" PRIu64,
+                snap_limit);
+        return -EDQUOT;
       }
+
       if ((snap_meta.name == old_meta.name &&
            snap_meta.snapshot_namespace == old_meta.snapshot_namespace) ||
          snap_meta.id == old_meta.id) {
-       CLS_LOG(20, "snap_name %s or snap_id %llu matches existing snap %s %llu",
-               snap_meta.name.c_str(), (unsigned long long)snap_meta.id.val,
-               old_meta.name.c_str(), (unsigned long long)old_meta.id.val);
+       CLS_LOG(20, "snap_name %s or snap_id %" PRIu64 " matches existing snap "
+                "%s %" PRIu64, snap_meta.name.c_str(), snap_meta.id.val,
+               old_meta.name.c_str(), old_meta.id.val);
        return -EEXIST;
       }
-    }
+      return 0;
+    };
 
-    if (!vals.empty())
-      last_read = vals.rbegin()->first;
-  } while (more);
+  r = image::snapshot::iterate(hctx, pre_check_lambda);
+  if (r < 0) {
+    return r;
+  }
 
   // snapshot inherits parent, if any
   cls_rbd_parent parent;
   r = read_key(hctx, "parent", &parent);
-  if (r < 0 && r != -ENOENT)
+  if (r < 0 && r != -ENOENT) {
     return r;
+  }
   if (r == 0) {
+    // write helper method will convert to normalized format if required
     snap_meta.parent = parent;
   }
 
-  bufferlist snap_metabl, snap_seqbl;
-  ::encode(snap_meta, snap_metabl);
-  ::encode(snap_meta.id, snap_seqbl);
+  if (cls::rbd::get_snap_namespace_type(snap_meta.snapshot_namespace) ==
+        cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
+    // add snap_trash feature bit if not already enabled
+    r = image::set_op_features(hctx, RBD_OPERATION_FEATURE_SNAP_TRASH,
+                               RBD_OPERATION_FEATURE_SNAP_TRASH);
+    if (r < 0) {
+      return r;
+    }
+  }
 
-  string snapshot_key;
+  r = write_key(hctx, "snap_seq", snap_meta.id);
+  if (r < 0) {
+    return r;
+  }
+
+  std::string snapshot_key;
   key_from_snap_id(snap_meta.id, &snapshot_key);
-  map<string, bufferlist> vals;
-  vals["snap_seq"] = snap_seqbl;
-  vals[snapshot_key] = snap_metabl;
-  r = cls_cxx_map_set_vals(hctx, &vals);
+  r = image::snapshot::write(hctx, snapshot_key, std::move(snap_meta));
   if (r < 0) {
-    CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
     return r;
   }
 
   return 0;
 }
 
-
 /**
  * rename snapshot .
  *
@@ -1715,69 +2434,60 @@ int snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   bufferlist snap_namebl, snap_idbl;
   snapid_t src_snap_id;
-  string src_snap_key,dst_snap_name;
+  string dst_snap_name;
   cls_rbd_snap snap_meta;
   int r;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(src_snap_id, iter);
-    ::decode(dst_snap_name, iter);
+    auto iter = in->cbegin();
+    decode(src_snap_id, iter);
+    decode(dst_snap_name, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
-  
-  CLS_LOG(20, "snapshot_rename id=%llu dst_name=%s", (unsigned long long)src_snap_id.val,
-        dst_snap_name.c_str());
 
-  int max_read = RBD_MAX_KEYS_READ;
-  string last_read = RBD_SNAP_KEY_PREFIX;
-  bool more;
-  do {
-    map<string, bufferlist> vals;
-    r = cls_cxx_map_get_vals(hctx, last_read, RBD_SNAP_KEY_PREFIX,
-                            max_read, &vals, &more);
-    if (r < 0)
-      return r;
+  CLS_LOG(20, "snapshot_rename id=%" PRIu64 ", dst_name=%s",
+          src_snap_id.val, dst_snap_name.c_str());
 
-    for (map<string, bufferlist>::iterator it = vals.begin();
-        it != vals.end(); ++it) {
-      bufferlist::iterator iter = it->second.begin();
-      try {
-       ::decode(snap_meta, iter);
-      } catch (const buffer::error &err) {
-       CLS_ERR("error decoding snapshot metadata for snap : %s",
-               dst_snap_name.c_str());
-       return -EIO;
-      }
-      if (dst_snap_name == snap_meta.name) {
-       CLS_LOG(20, "snap_name %s  matches existing snap with snap id = %llu ",
-               dst_snap_name.c_str(), (unsigned long long)snap_meta.id.val);
-        return -EEXIST;
-      }
+  auto duplicate_name_lambda = [&dst_snap_name](const cls_rbd_snap& snap_meta) {
+    if (cls::rbd::get_snap_namespace_type(snap_meta.snapshot_namespace) ==
+          cls::rbd::SNAPSHOT_NAMESPACE_TYPE_USER &&
+        snap_meta.name == dst_snap_name) {
+      CLS_LOG(20, "snap_name %s matches existing snap with snap id %" PRIu64,
+              dst_snap_name.c_str(), snap_meta.id.val);
+      return -EEXIST;
     }
-    if (!vals.empty())
-      last_read = vals.rbegin()->first;
-  } while (more);
+    return 0;
+  };
+  r = image::snapshot::iterate(hctx, duplicate_name_lambda);
+  if (r < 0) {
+    return r;
+  }
 
+  std::string src_snap_key;
   key_from_snap_id(src_snap_id, &src_snap_key);
-  r = read_key(hctx, src_snap_key, &snap_meta); 
+  r = read_key(hctx, src_snap_key, &snap_meta);
   if (r == -ENOENT) {
-    CLS_LOG(20, "cannot find existing snap with snap id = %llu ", (unsigned long long)src_snap_id);
+    CLS_LOG(20, "cannot find existing snap with snap id = %" PRIu64,
+            src_snap_id.val);
     return r;
   }
-  snap_meta.name = dst_snap_name;
-  bufferlist snap_metabl;
-  ::encode(snap_meta, snap_metabl);
 
-  r = cls_cxx_map_set_val(hctx, src_snap_key, &snap_metabl);
+  if (cls::rbd::get_snap_namespace_type(snap_meta.snapshot_namespace) !=
+        cls::rbd::SNAPSHOT_NAMESPACE_TYPE_USER) {
+    // can only rename user snapshots
+    return -EINVAL;
+  }
+
+  snap_meta.name = dst_snap_name;
+  r = image::snapshot::write(hctx, src_snap_key, std::move(snap_meta));
   if (r < 0) {
-    CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
     return r;
   }
 
   return 0;
 }
+
 /**
  * Removes a snapshot from an rbd header.
  *
@@ -1792,8 +2502,8 @@ int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   snapid_t snap_id;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(snap_id, iter);
+    auto iter = in->cbegin();
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -1808,15 +2518,139 @@ int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   string snapshot_key;
   key_from_snap_id(snap_id, &snapshot_key);
   int r = read_key(hctx, snapshot_key, &snap);
-  if (r == -ENOENT)
+  if (r == -ENOENT) {
     return -ENOENT;
+  }
 
-  if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED)
+  if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED) {
     return -EBUSY;
+  }
+
+  // snapshot is in-use by clone v2 child
+  if (snap.child_count > 0) {
+    return -EBUSY;
+  }
 
-  r = cls_cxx_map_remove_key(hctx, snapshot_key);
+  r = remove_key(hctx, snapshot_key);
+  if (r < 0) {
+    return r;
+  }
+
+  bool has_child_snaps = false;
+  bool has_trash_snaps = false;
+  auto remove_lambda = [snap_id, &has_child_snaps, &has_trash_snaps](
+      const cls_rbd_snap& snap_meta) {
+    if (snap_meta.id != snap_id) {
+      if (snap_meta.parent.pool_id != -1 || snap_meta.parent_overlap) {
+        has_child_snaps = true;
+      }
+
+      if (cls::rbd::get_snap_namespace_type(snap_meta.snapshot_namespace) ==
+            cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
+        has_trash_snaps = true;
+      }
+    }
+    return 0;
+  };
+
+  r = image::snapshot::iterate(hctx, remove_lambda);
+  if (r < 0) {
+    return r;
+  }
+
+  cls_rbd_parent parent;
+  r = read_key(hctx, "parent", &parent);
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  }
+
+  bool has_parent = (r >= 0 && parent.exists());
+  bool is_head_child = (has_parent && parent.head_overlap);
+  int8_t require_osd_release = cls_get_required_osd_release(hctx);
+  if (has_parent && !is_head_child && !has_child_snaps &&
+      require_osd_release >= CEPH_RELEASE_NAUTILUS) {
+    // remove the unused parent image spec
+    r = remove_key(hctx, "parent");
+    if (r < 0 && r != -ENOENT) {
+      return r;
+    }
+  }
+
+  uint64_t op_features_mask = 0ULL;
+  if (!has_child_snaps && !is_head_child) {
+    // disable clone child op feature if no longer associated
+    op_features_mask |= RBD_OPERATION_FEATURE_CLONE_CHILD;
+  }
+  if (!has_trash_snaps) {
+    // remove the snap_trash op feature if not in-use by any other snapshots
+    op_features_mask |= RBD_OPERATION_FEATURE_SNAP_TRASH;
+  }
+
+  if (op_features_mask != 0ULL) {
+    r = image::set_op_features(hctx, 0, op_features_mask);
+    if (r < 0) {
+      return r;
+    }
+  }
+
+  return 0;
+}
+
+/**
+ * Moves a snapshot to the trash namespace.
+ *
+ * Input:
+ * @param snap_id the id of the snapshot to move to the trash (uint64_t)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int snapshot_trash_add(cls_method_context_t hctx, bufferlist *in,
+                       bufferlist *out)
+{
+  snapid_t snap_id;
+
+  try {
+    auto iter = in->cbegin();
+    decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "snapshot_trash_add id=%" PRIu64, snap_id.val);
+
+  cls_rbd_snap snap;
+  std::string snapshot_key;
+  key_from_snap_id(snap_id, &snapshot_key);
+  int r = read_key(hctx, snapshot_key, &snap);
+  if (r == -ENOENT) {
+    return r;
+  }
+
+  if (snap.protection_status != RBD_PROTECTION_STATUS_UNPROTECTED) {
+    return -EBUSY;
+  }
+
+  auto snap_type = cls::rbd::get_snap_namespace_type(snap.snapshot_namespace);
+  if (snap_type == cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
+    return -EEXIST;
+  }
+
+  // add snap_trash feature bit if not already enabled
+  r = image::set_op_features(hctx, RBD_OPERATION_FEATURE_SNAP_TRASH,
+                             RBD_OPERATION_FEATURE_SNAP_TRASH);
+  if (r < 0) {
+    return r;
+  }
+
+  snap.snapshot_namespace = cls::rbd::TrashSnapshotNamespace{snap_type,
+                                                             snap.name};
+  uuid_d uuid_gen;
+  uuid_gen.generate_random();
+  snap.name = uuid_gen.to_string();
+
+  r = image::snapshot::write(hctx, snapshot_key, std::move(snap));
   if (r < 0) {
-    CLS_ERR("error writing snapshot metadata: %s", cpp_strerror(r).c_str());
     return r;
   }
 
@@ -1829,7 +2663,7 @@ int snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 int get_all_features(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   uint64_t all_features = RBD_FEATURES_ALL;
-  ::encode(all_features, *out);
+  encode(all_features, *out);
   return 0;
 }
 
@@ -1890,13 +2724,13 @@ int get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
   string id;
   try {
-    bufferlist::iterator iter = read_bl.begin();
-    ::decode(id, iter);
+    auto iter = read_bl.cbegin();
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EIO;
   }
 
-  ::encode(id, *out);
+  encode(id, *out);
   return 0;
 }
 
@@ -1918,8 +2752,8 @@ int set_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
   string id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -1939,10 +2773,63 @@ int set_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   CLS_LOG(20, "set_id: id=%s", id.c_str());
 
   bufferlist write_bl;
-  ::encode(id, write_bl);
+  encode(id, write_bl);
   return cls_cxx_write(hctx, 0, write_bl.length(), &write_bl);
 }
 
+/**
+ * Update the access timestamp of an image
+ *
+ * Input:
+ * @param none
+ *
+ * Output:
+ * @returns 0 on success, negative error code on other error
+ */
+int set_access_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+   int r = check_exists(hctx);
+   if(r < 0)
+     return r;
+   
+   utime_t timestamp = ceph_clock_now();
+   r = write_key(hctx, "access_timestamp", timestamp);
+   if(r < 0) {
+     CLS_ERR("error setting access_timestamp");
+     return r;
+   }
+
+   return 0;
+}
+
+/**
+ * Update the modify timestamp of an image
+ *
+ * Input:
+ * @param none
+ *
+ * Output:
+ * @returns 0 on success, negative error code on other error
+ */
+
+int set_modify_timestamp(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+   int r = check_exists(hctx);
+   if(r < 0)
+     return r;
+   
+   utime_t timestamp = ceph_clock_now();
+   r = write_key(hctx, "modify_timestamp", timestamp);
+   if(r < 0) {
+     CLS_ERR("error setting modify_timestamp");
+     return r;
+   }
+
+   return 0;
+}
+
+
+
 /*********************** methods for rbd_directory ***********************/
 
 static const string dir_key_for_id(const string &id)
@@ -1986,8 +2873,8 @@ static int dir_add_image_helper(cls_method_context_t hctx,
     return -EBADF;
   }
   bufferlist id_bl, name_bl;
-  ::encode(id, id_bl);
-  ::encode(name, name_bl);
+  encode(id, id_bl);
+  encode(name, name_bl);
   map<string, bufferlist> omap_vals;
   omap_vals[name_key] = id_bl;
   omap_vals[id_key] = name_bl;
@@ -2058,10 +2945,10 @@ int dir_rename_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   string src, dest, id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(src, iter);
-    ::decode(dest, iter);
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(src, iter);
+    decode(dest, iter);
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2089,8 +2976,8 @@ int dir_get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   string name;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(name, iter);
+    auto iter = in->cbegin();
+    decode(name, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2104,7 +2991,7 @@ int dir_get_id(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
       CLS_ERR("error reading id for name '%s': %s", name.c_str(), cpp_strerror(r).c_str());
     return r;
   }
-  ::encode(id, *out);
+  encode(id, *out);
   return 0;
 }
 
@@ -2123,8 +3010,8 @@ int dir_get_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   string id;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2134,10 +3021,13 @@ int dir_get_name(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   string name;
   int r = read_key(hctx, dir_key_for_id(id), &name);
   if (r < 0) {
-    CLS_ERR("error reading name for id '%s': %s", id.c_str(), cpp_strerror(r).c_str());
+    if (r != -ENOENT) {
+      CLS_ERR("error reading name for id '%s': %s", id.c_str(),
+              cpp_strerror(r).c_str());
+    }
     return r;
   }
-  ::encode(name, *out);
+  encode(name, *out);
   return 0;
 }
 
@@ -2160,9 +3050,9 @@ int dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   uint64_t max_return;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(start_after, iter);
-    ::decode(max_return, iter);
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2178,16 +3068,18 @@ int dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
                                  max_read, &vals, &more);
     if (r < 0) {
-      CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
+      if (r != -ENOENT) {
+        CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
+      }
       return r;
     }
 
     for (map<string, bufferlist>::iterator it = vals.begin();
         it != vals.end(); ++it) {
       string id;
-      bufferlist::iterator iter = it->second.begin();
+      auto iter = it->second.cbegin();
       try {
-       ::decode(id, iter);
+       decode(id, iter);
       } catch (const buffer::error &err) {
        CLS_ERR("could not decode id of image '%s'", it->first.c_str());
        return -EIO;
@@ -2202,7 +3094,7 @@ int dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     }
   }
 
-  ::encode(images, *out);
+  encode(images, *out);
 
   return 0;
 }
@@ -2230,9 +3122,9 @@ int dir_add_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
   string name, id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(name, iter);
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(name, iter);
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2255,14 +3147,104 @@ int dir_remove_image(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   string name, id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(name, iter);
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(name, iter);
+    decode(id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  return dir_remove_image_helper(hctx, name, id);
+}
+
+/**
+ * Verify the current state of the directory
+ *
+ * Input:
+ * @param state the DirectoryState of the directory
+ *
+ * Output:
+ * @returns -ENOENT if the state does not match
+ * @returns 0 on success, negative error code on failure
+ */
+int dir_state_assert(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  cls::rbd::DirectoryState directory_state = cls::rbd::DIRECTORY_STATE_READY;
+  try {
+    auto iter = in->cbegin();
+    decode(directory_state, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  cls::rbd::DirectoryState on_disk_directory_state = directory_state;
+  int r = read_key(hctx, "state", &on_disk_directory_state);
+  if (r < 0) {
+    return r;
+  }
+
+  if (directory_state != on_disk_directory_state) {
+    return -ENOENT;
+  }
+  return 0;
+}
+
+/**
+ * Set the current state of the directory
+ *
+ * Input:
+ * @param state the DirectoryState of the directory
+ *
+ * Output:
+ * @returns -ENOENT if the state does not match
+ * @returns 0 on success, negative error code on failure
+ */
+int dir_state_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  cls::rbd::DirectoryState directory_state;
+  try {
+    auto iter = in->cbegin();
+    decode(directory_state, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  return dir_remove_image_helper(hctx, name, id);
+  int r = check_exists(hctx);
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  }
+
+  switch (directory_state) {
+  case cls::rbd::DIRECTORY_STATE_READY:
+    break;
+  case cls::rbd::DIRECTORY_STATE_ADD_DISABLED:
+    {
+      if (r == -ENOENT) {
+        return r;
+      }
+
+      // verify that the directory is empty
+      std::map<std::string, bufferlist> vals;
+      bool more;
+      r = cls_cxx_map_get_vals(hctx, RBD_DIR_NAME_KEY_PREFIX,
+                               RBD_DIR_NAME_KEY_PREFIX, 1, &vals, &more);
+      if (r < 0) {
+        return r;
+      } else if (!vals.empty()) {
+        return -EBUSY;
+      }
+    }
+    break;
+  default:
+    return -EINVAL;
+  }
+
+  r = write_key(hctx, "state", directory_state);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
 }
 
 int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map)
@@ -2283,8 +3265,8 @@ int object_map_read(cls_method_context_t hctx, BitVector<2> &object_map)
   }
 
   try {
-    bufferlist::iterator iter = bl.begin();
-    ::decode(object_map, iter);
+    auto iter = bl.cbegin();
+    decode(object_map, iter);
   } catch (const buffer::error &err) {
     CLS_ERR("failed to decode object map: %s", err.what());
     return -EINVAL;
@@ -2311,7 +3293,7 @@ int object_map_load(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   }
 
   object_map.set_crc_enabled(false);
-  ::encode(object_map, *out);
+  encode(object_map, *out);
   return 0;
 }
 
@@ -2328,8 +3310,8 @@ int object_map_save(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   BitVector<2> object_map;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(object_map, iter);
+    auto iter = in->cbegin();
+    decode(object_map, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2337,7 +3319,7 @@ int object_map_save(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   object_map.set_crc_enabled(true);
 
   bufferlist bl;
-  ::encode(object_map, bl);
+  encode(object_map, bl);
   CLS_LOG(20, "object_map_save: object size=%" PRIu64 ", byte size=%u",
          object_map.size(), bl.length());
   return cls_cxx_write_full(hctx, &bl);
@@ -2358,9 +3340,9 @@ int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out
   uint64_t object_count;
   uint8_t default_state;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(object_count, iter);
-    ::decode(default_state, iter);
+    auto iter = in->cbegin();
+    decode(object_count, iter);
+    decode(default_state, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2379,8 +3361,11 @@ int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out
 
   size_t orig_object_map_size = object_map.size();
   if (object_count < orig_object_map_size) {
-    for (uint64_t i = object_count + 1; i < orig_object_map_size; ++i) {
-      if (object_map[i] != default_state) {
+    auto it = object_map.begin() + object_count;
+    auto end_it = object_map.end() ;
+    uint64_t i = object_count;
+    for (; it != end_it; ++it, ++i) {
+      if (*it != default_state) {
        CLS_ERR("object map indicates object still exists: %" PRIu64, i);
        return -ESTALE;
       }
@@ -2388,13 +3373,15 @@ int object_map_resize(cls_method_context_t hctx, bufferlist *in, bufferlist *out
     object_map.resize(object_count);
   } else if (object_count > orig_object_map_size) {
     object_map.resize(object_count);
-    for (uint64_t i = orig_object_map_size; i < object_count; ++i) {
-      object_map[i] = default_state;
+    auto it = object_map.begin() + orig_object_map_size;
+    auto end_it = object_map.end();
+    for (; it != end_it; ++it) {
+      *it = default_state;
     }
   }
 
   bufferlist map;
-  ::encode(object_map, map);
+  encode(object_map, map);
   CLS_LOG(20, "object_map_resize: object size=%" PRIu64 ", byte size=%u",
          object_count, map.length());
   return cls_cxx_write_full(hctx, &map);
@@ -2419,11 +3406,11 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
   uint8_t new_object_state;
   boost::optional<uint8_t> current_object_state;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(start_object_no, iter);
-    ::decode(end_object_no, iter);
-    ::decode(new_object_state, iter);
-    ::decode(current_object_state, iter);
+    auto iter = in->cbegin();
+    decode(start_object_no, iter);
+    decode(end_object_no, iter);
+    decode(new_object_state, iter);
+    decode(current_object_state, iter);
   } catch (const buffer::error &err) {
     CLS_ERR("failed to decode message");
     return -EINVAL;
@@ -2445,77 +3432,98 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
   }
 
   try {
-    bufferlist::iterator it = header_bl.begin();
+    auto it = header_bl.cbegin();
     object_map.decode_header(it);
   } catch (const buffer::error &err) {
     CLS_ERR("failed to decode object map header: %s", err.what());
     return -EINVAL;
   }
 
+  uint64_t object_byte_offset;
+  uint64_t byte_length;
+  object_map.get_header_crc_extents(&object_byte_offset, &byte_length);
+
   bufferlist footer_bl;
-  r = cls_cxx_read2(hctx, object_map.get_footer_offset(),
-                   size - object_map.get_footer_offset(), &footer_bl,
+  r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &footer_bl,
                     CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
   if (r < 0) {
-    CLS_ERR("object map footer read failed");
+    CLS_ERR("object map footer read header CRC failed");
     return r;
   }
+
   try {
-    bufferlist::iterator it = footer_bl.begin();
-    object_map.decode_footer(it);
+    auto it = footer_bl.cbegin();
+    object_map.decode_header_crc(it);
   } catch (const buffer::error &err) {
-    CLS_ERR("failed to decode object map footer: %s", err.what());
+    CLS_ERR("failed to decode object map header CRC: %s", err.what());
   }
 
   if (start_object_no >= end_object_no || end_object_no > object_map.size()) {
     return -ERANGE;
   }
 
-  uint64_t byte_offset;
-  uint64_t byte_length;
-  object_map.get_data_extents(start_object_no,
-                             end_object_no - start_object_no,
-                             &byte_offset, &byte_length);
+  uint64_t object_count = end_object_no - start_object_no;
+  object_map.get_data_crcs_extents(start_object_no, object_count,
+                                   &object_byte_offset, &byte_length);
+  const auto footer_object_offset = object_byte_offset;
+
+  footer_bl.clear();
+  r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &footer_bl,
+                    CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  if (r < 0) {
+    CLS_ERR("object map footer read data CRCs failed");
+    return r;
+  }
+
+  try {
+    auto it = footer_bl.cbegin();
+    object_map.decode_data_crcs(it, start_object_no);
+  } catch (const buffer::error &err) {
+    CLS_ERR("failed to decode object map data CRCs: %s", err.what());
+  }
+
+  uint64_t data_byte_offset;
+  object_map.get_data_extents(start_object_no, object_count,
+                              &data_byte_offset, &object_byte_offset,
+                              &byte_length);
 
   bufferlist data_bl;
-  r = cls_cxx_read2(hctx, object_map.get_header_length() + byte_offset,
-                   byte_length, &data_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
+  r = cls_cxx_read2(hctx, object_byte_offset, byte_length, &data_bl,
+                    CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
   if (r < 0) {
     CLS_ERR("object map data read failed");
     return r;
   }
 
   try {
-    bufferlist::iterator it = data_bl.begin();
-    object_map.decode_data(it, byte_offset);
+    auto it = data_bl.cbegin();
+    object_map.decode_data(it, data_byte_offset);
   } catch (const buffer::error &err) {
     CLS_ERR("failed to decode data chunk [%" PRIu64 "]: %s",
-           byte_offset, err.what());
+           data_byte_offset, err.what());
     return -EINVAL;
   }
 
   bool updated = false;
-  for (uint64_t object_no = start_object_no; object_no < end_object_no;
-       ++object_no) {
-    uint8_t state = object_map[object_no];
+  auto it = object_map.begin() + start_object_no;
+  auto end_it = object_map.begin() + end_object_no;
+  for (; it != end_it; ++it) {
+    uint8_t state = *it;
     if ((!current_object_state || state == *current_object_state ||
         (*current_object_state == OBJECT_EXISTS &&
          state == OBJECT_EXISTS_CLEAN)) && state != new_object_state) {
-      object_map[object_no] = new_object_state;
+      *it = new_object_state;
       updated = true;
     }
   }
 
   if (updated) {
     CLS_LOG(20, "object_map_update: %" PRIu64 "~%" PRIu64 " -> %" PRIu64,
-           byte_offset, byte_length,
-           object_map.get_header_length() + byte_offset);
+           data_byte_offset, byte_length, object_byte_offset);
 
     bufferlist data_bl;
-    object_map.encode_data(data_bl, byte_offset, byte_length);
-    r = cls_cxx_write2(hctx, object_map.get_header_length() + byte_offset,
-                      data_bl.length(), &data_bl,
+    object_map.encode_data(data_bl, data_byte_offset, byte_length);
+    r = cls_cxx_write2(hctx, object_byte_offset, data_bl.length(), &data_bl,
                        CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
     if (r < 0) {
       CLS_ERR("failed to write object map header: %s", cpp_strerror(r).c_str());
@@ -2523,8 +3531,8 @@ int object_map_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out
     }
 
     footer_bl.clear();
-    object_map.encode_footer(footer_bl);
-    r = cls_cxx_write2(hctx, object_map.get_footer_offset(), footer_bl.length(),
+    object_map.encode_data_crcs(footer_bl, start_object_no, object_count);
+    r = cls_cxx_write2(hctx, footer_object_offset, footer_bl.length(),
                       &footer_bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
     if (r < 0) {
       CLS_ERR("failed to write object map footer: %s", cpp_strerror(r).c_str());
@@ -2557,16 +3565,18 @@ int object_map_snap_add(cls_method_context_t hctx, bufferlist *in,
   }
 
   bool updated = false;
-  for (uint64_t i = 0; i < object_map.size(); ++i) {
-    if (object_map[i] == OBJECT_EXISTS) {
-      object_map[i] = OBJECT_EXISTS_CLEAN;
+  auto it = object_map.begin();
+  auto end_it = object_map.end();
+  for (; it != end_it; ++it) {
+    if (*it == OBJECT_EXISTS) {
+      *it = OBJECT_EXISTS_CLEAN;
       updated = true;
     }
   }
 
   if (updated) {
     bufferlist bl;
-    ::encode(object_map, bl);
+    encode(object_map, bl);
     r = cls_cxx_write_full(hctx, &bl);
   }
   return r;
@@ -2587,8 +3597,8 @@ int object_map_snap_remove(cls_method_context_t hctx, bufferlist *in,
 {
   BitVector<2> src_object_map;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(src_object_map, iter);
+    auto iter = in->cbegin();
+    decode(src_object_map, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2600,17 +3610,24 @@ int object_map_snap_remove(cls_method_context_t hctx, bufferlist *in,
   }
 
   bool updated = false;
-  for (uint64_t i = 0; i < dst_object_map.size(); ++i) {
-    if (dst_object_map[i] == OBJECT_EXISTS_CLEAN &&
-        (i >= src_object_map.size() || src_object_map[i] == OBJECT_EXISTS)) {
-      dst_object_map[i] = OBJECT_EXISTS;
+  auto src_it = src_object_map.begin();
+  auto dst_it = dst_object_map.begin();
+  auto dst_it_end = dst_object_map.end();
+  uint64_t i = 0;
+  for (; dst_it != dst_it_end; ++dst_it) {
+    if (*dst_it == OBJECT_EXISTS_CLEAN &&
+        (i >= src_object_map.size() || *src_it == OBJECT_EXISTS)) {
+      *dst_it = OBJECT_EXISTS;
       updated = true;
     }
+    if (i < src_object_map.size())
+      ++src_it;
+    ++i;
   }
 
   if (updated) {
     bufferlist bl;
-    ::encode(dst_object_map, bl);
+    encode(dst_object_map, bl);
     r = cls_cxx_write_full(hctx, &bl);
   }
   return r;
@@ -2630,7 +3647,7 @@ static const string metadata_name_from_key(const string &key)
  * Input:
  * @param start_after which name to begin listing after
  *        (use the empty string to start at the beginning)
- * @param max_return the maximum number of names to list(if 0 means no limit)
+ * @param max_return the maximum number of names to list
 
  * Output:
  * @param value
@@ -2641,102 +3658,459 @@ int metadata_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   string start_after;
   uint64_t max_return;
 
-  try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(start_after, iter);
-    ::decode(max_return, iter);
-  } catch (const buffer::error &err) {
+  try {
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  // TODO remove implicit support for zero during the N-release
+  if (max_return == 0) {
+    max_return = RBD_MAX_KEYS_READ;
+  }
+
+  map<string, bufferlist> data;
+  string last_read = metadata_key_for_name(start_after);
+  bool more = true;
+
+  while (more && data.size() < max_return) {
+    map<string, bufferlist> raw_data;
+    int max_read = std::min<uint64_t>(RBD_MAX_KEYS_READ, max_return - data.size());
+    int r = cls_cxx_map_get_vals(hctx, last_read, RBD_METADATA_KEY_PREFIX,
+                                 max_read, &raw_data, &more);
+    if (r < 0) {
+      if (r != -ENOENT) {
+        CLS_ERR("failed to read the vals off of disk: %s",
+                cpp_strerror(r).c_str());
+      }
+      return r;
+    }
+
+    for (auto& kv : raw_data) {
+      data[metadata_name_from_key(kv.first)].swap(kv.second);
+    }
+
+    if (!raw_data.empty()) {
+      last_read = raw_data.rbegin()->first;
+    }
+  }
+
+  encode(data, *out);
+  return 0;
+}
+
+/**
+ * Input:
+ * @param data <map(key, value)>
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int metadata_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  map<string, bufferlist> data, raw_data;
+
+  auto iter = in->cbegin();
+  try {
+    decode(data, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  for (map<string, bufferlist>::iterator it = data.begin();
+       it != data.end(); ++it) {
+    CLS_LOG(20, "metadata_set key=%s value=%.*s", it->first.c_str(),
+           it->second.length(), it->second.c_str());
+    raw_data[metadata_key_for_name(it->first)].swap(it->second);
+  }
+  int r = cls_cxx_map_set_vals(hctx, &raw_data);
+  if (r < 0) {
+    CLS_ERR("error writing metadata: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param key
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int metadata_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string key;
+
+  auto iter = in->cbegin();
+  try {
+    decode(key, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "metadata_remove key=%s", key.c_str());
+
+  int r = cls_cxx_map_remove_key(hctx, metadata_key_for_name(key));
+  if (r < 0) {
+    CLS_ERR("error removing metadata: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param key
+ *
+ * Output:
+ * @param metadata value associated with the key
+ * @returns 0 on success, negative error code on failure
+ */
+int metadata_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string key;
+  bufferlist value;
+
+  auto iter = in->cbegin();
+  try {
+    decode(key, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "metadata_get key=%s", key.c_str());
+
+  int r = cls_cxx_map_get_val(hctx, metadata_key_for_name(key), &value);
+  if (r < 0) {
+    if (r != -ENOENT)
+      CLS_ERR("error getting metadata: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  encode(value, *out);
+  return 0;
+}
+
+int snapshot_get_limit(cls_method_context_t hctx, bufferlist *in,
+                      bufferlist *out)
+{
+  uint64_t snap_limit;
+  int r = read_key(hctx, "snap_limit", &snap_limit);
+  if (r == -ENOENT) {
+    snap_limit = UINT64_MAX;
+  } else if (r < 0) {
+    CLS_ERR("error retrieving snapshot limit: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  CLS_LOG(20, "read snapshot limit %" PRIu64, snap_limit);
+  encode(snap_limit, *out);
+
+  return 0;
+}
+
+int snapshot_set_limit(cls_method_context_t hctx, bufferlist *in,
+                      bufferlist *out)
+{
+  int rc;
+  uint64_t new_limit;
+  bufferlist bl;
+  size_t snap_count = 0;
+
+  try {
+    auto iter = in->cbegin();
+    decode(new_limit, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  if (new_limit == UINT64_MAX) {
+    CLS_LOG(20, "remove snapshot limit\n");
+    rc = cls_cxx_map_remove_key(hctx, "snap_limit");
+    return rc;
+  }
+
+  //try to read header as v1 format
+  rc = snap_read_header(hctx, bl);
+
+  // error when reading header
+  if (rc < 0 && rc != -EINVAL) {
+    return rc;
+  } else if (rc >= 0) {
+    // success, the image is v1 format
+    struct rbd_obj_header_ondisk *header;
+    header = (struct rbd_obj_header_ondisk *)bl.c_str();
+    snap_count = header->snap_count;
+  } else {
+    // else, the image is v2 format
+    int max_read = RBD_MAX_KEYS_READ;
+    string last_read = RBD_SNAP_KEY_PREFIX;
+    bool more;
+
+    do {
+      set<string> keys;
+      rc = cls_cxx_map_get_keys(hctx, last_read, max_read, &keys, &more);
+      if (rc < 0) {
+        CLS_ERR("error retrieving snapshots: %s", cpp_strerror(rc).c_str());
+        return rc;
+      }
+      for (auto& key : keys) {
+        if (key.find(RBD_SNAP_KEY_PREFIX) != 0)
+          break;
+        snap_count++;
+      }
+      if (!keys.empty())
+        last_read = *(keys.rbegin());
+    } while (more);
+  }
+
+  if (new_limit < snap_count) {
+    rc = -ERANGE;
+    CLS_LOG(10, "snapshot limit is less than the number of snapshots.\n");
+  } else {
+    CLS_LOG(20, "set snapshot limit to %" PRIu64 "\n", new_limit);
+    bl.clear();
+    encode(new_limit, bl);
+    rc = cls_cxx_map_set_val(hctx, "snap_limit", &bl);
+  }
+
+  return rc;
+}
+
+
+/**
+ * Input:
+ * @param snap id (uint64_t) parent snapshot id
+ * @param child spec (cls::rbd::ChildImageSpec) child image
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int child_attach(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t snap_id;
+  cls::rbd::ChildImageSpec child_image;
+  try {
+    auto it = in->cbegin();
+    decode(snap_id, it);
+    decode(child_image, it);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "child_attach snap_id=%" PRIu64 ", child_pool_id=%" PRIi64 ", "
+              "child_image_id=%s", snap_id, child_image.pool_id,
+               child_image.image_id.c_str());
+
+  cls_rbd_snap snap;
+  std::string snapshot_key;
+  key_from_snap_id(snap_id, &snapshot_key);
+  int r = read_key(hctx, snapshot_key, &snap);
+  if (r < 0) {
+    return r;
+  }
+
+  if (cls::rbd::get_snap_namespace_type(snap.snapshot_namespace) ==
+        cls::rbd::SNAPSHOT_NAMESPACE_TYPE_TRASH) {
+    // cannot attach to a deleted snapshot
+    return -ENOENT;
+  }
+
+  auto children_key = image::snap_children_key_from_snap_id(snap_id);
+  cls::rbd::ChildImageSpecs child_images;
+  r = read_key(hctx, children_key, &child_images);
+  if (r < 0 && r != -ENOENT) {
+    CLS_ERR("error reading snapshot children: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  auto it = child_images.insert(child_image);
+  if (!it.second) {
+    // child already attached to the snapshot
+    return -EEXIST;
+  }
+
+  r = write_key(hctx, children_key, child_images);
+  if (r < 0) {
+    CLS_ERR("error writing snapshot children: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  ++snap.child_count;
+  r = image::snapshot::write(hctx, snapshot_key, std::move(snap));
+  if (r < 0) {
+    return r;
+  }
+
+  r = image::set_op_features(hctx, RBD_OPERATION_FEATURE_CLONE_PARENT,
+                             RBD_OPERATION_FEATURE_CLONE_PARENT);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param snap id (uint64_t) parent snapshot id
+ * @param child spec (cls::rbd::ChildImageSpec) child image
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int child_detach(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  uint64_t snap_id;
+  cls::rbd::ChildImageSpec child_image;
+  try {
+    auto it = in->cbegin();
+    decode(snap_id, it);
+    decode(child_image, it);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "child_detach snap_id=%" PRIu64 ", child_pool_id=%" PRIi64 ", "
+              "child_image_id=%s", snap_id, child_image.pool_id,
+               child_image.image_id.c_str());
+
+  cls_rbd_snap snap;
+  std::string snapshot_key;
+  key_from_snap_id(snap_id, &snapshot_key);
+  int r = read_key(hctx, snapshot_key, &snap);
+  if (r < 0) {
+    return r;
+  }
+
+  auto children_key = image::snap_children_key_from_snap_id(snap_id);
+  cls::rbd::ChildImageSpecs child_images;
+  r = read_key(hctx, children_key, &child_images);
+  if (r < 0 && r != -ENOENT) {
+    CLS_ERR("error reading snapshot children: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  if (snap.child_count != child_images.size()) {
+    // children and reference count don't match
+    CLS_ERR("children reference count mismatch: %" PRIu64, snap_id);
     return -EINVAL;
   }
 
-  map<string, bufferlist> data;
-  string last_read = metadata_key_for_name(start_after);
-  int max_read = max_return ? MIN(RBD_MAX_KEYS_READ, max_return) : RBD_MAX_KEYS_READ;
-  bool more;
+  if (child_images.erase(child_image) == 0) {
+    // child not attached to the snapshot
+    return -ENOENT;
+  }
 
-  do {
-    map<string, bufferlist> raw_data;
-    int r = cls_cxx_map_get_vals(hctx, last_read, RBD_METADATA_KEY_PREFIX,
-                                 max_read, &raw_data, &more);
+  if (child_images.empty()) {
+    r = remove_key(hctx, children_key);
+  } else {
+    r = write_key(hctx, children_key, child_images);
     if (r < 0) {
-      CLS_ERR("failed to read the vals off of disk: %s", cpp_strerror(r).c_str());
+      CLS_ERR("error writing snapshot children: %s", cpp_strerror(r).c_str());
       return r;
     }
-    if (raw_data.empty())
-      break;
+  }
 
-    map<string, bufferlist>::iterator it = raw_data.begin();
-    for (; it != raw_data.end(); ++it)
-      data[metadata_name_from_key(it->first)].swap(it->second);
+  --snap.child_count;
+  r = image::snapshot::write(hctx, snapshot_key, std::move(snap));
+  if (r < 0) {
+    return r;
+  }
 
-    if (!more)
-      break;
+  if (snap.child_count == 0) {
+    auto clone_in_use_lambda = [snap_id](const cls_rbd_snap& snap_meta) {
+      if (snap_meta.id != snap_id && snap_meta.child_count > 0) {
+        return -EEXIST;
+      }
+      return 0;
+    };
 
-    last_read = raw_data.rbegin()->first;
-    if (max_return)
-      max_read = MIN(RBD_MAX_KEYS_READ, max_return - data.size());
-  } while (more);
+    r = image::snapshot::iterate(hctx, clone_in_use_lambda);
+    if (r < 0 && r != -EEXIST) {
+      return r;
+    }
+
+    if (r != -EEXIST) {
+      // remove the clone_v2 op feature if not in-use by any other snapshots
+      r = image::set_op_features(hctx, 0, RBD_OPERATION_FEATURE_CLONE_PARENT);
+      if (r < 0) {
+        return r;
+      }
+    }
+  }
 
-  ::encode(data, *out);
   return 0;
 }
 
 /**
  * Input:
- * @param data <map(key, value)>
+ * @param snap id (uint64_t) parent snapshot id
  *
  * Output:
+ * @param (cls::rbd::ChildImageSpecs) child images
  * @returns 0 on success, negative error code on failure
  */
-int metadata_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+int children_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
-  map<string, bufferlist> data, raw_data;
-
-  bufferlist::iterator iter = in->begin();
+  uint64_t snap_id;
   try {
-    ::decode(data, iter);
+    auto it = in->cbegin();
+    decode(snap_id, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  for (map<string, bufferlist>::iterator it = data.begin();
-       it != data.end(); ++it) {
-    CLS_LOG(20, "metdata_set key=%s value=%.*s", it->first.c_str(),
-           it->second.length(), it->second.c_str());
-    raw_data[metadata_key_for_name(it->first)].swap(it->second);
-  }
-  int r = cls_cxx_map_set_vals(hctx, &raw_data);
+  CLS_LOG(20, "child_detach snap_id=%" PRIu64, snap_id);
+
+  cls_rbd_snap snap;
+  std::string snapshot_key;
+  key_from_snap_id(snap_id, &snapshot_key);
+  int r = read_key(hctx, snapshot_key, &snap);
   if (r < 0) {
-    CLS_ERR("error writing metadata: %s", cpp_strerror(r).c_str());
     return r;
   }
 
+  auto children_key = image::snap_children_key_from_snap_id(snap_id);
+  cls::rbd::ChildImageSpecs child_images;
+  r = read_key(hctx, children_key, &child_images);
+  if (r == -ENOENT) {
+    return r;
+  } else if (r < 0) {
+    CLS_ERR("error reading snapshot children: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  encode(child_images, *out);
   return 0;
 }
 
 /**
+ * Set image migration.
+ *
  * Input:
- * @param key
+ * @param migration_spec (cls::rbd::MigrationSpec) image migration spec
  *
  * Output:
+ *
  * @returns 0 on success, negative error code on failure
  */
-int metadata_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  string key;
-
-  bufferlist::iterator iter = in->begin();
+int migration_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  cls::rbd::MigrationSpec migration_spec;
   try {
-    ::decode(key, iter);
+    auto it = in->cbegin();
+    decode(migration_spec, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  CLS_LOG(20, "metdata_set key=%s", key.c_str());
-
-  int r = cls_cxx_map_remove_key(hctx, metadata_key_for_name(key));
+  int r = image::set_migration(hctx, migration_spec, true);
   if (r < 0) {
-    CLS_ERR("error remove metadata: %s", cpp_strerror(r).c_str());
     return r;
   }
 
@@ -2744,82 +4118,126 @@ int metadata_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 }
 
 /**
+ * Set image migration state.
+ *
  * Input:
- * @param key
+ * @param state (cls::rbd::MigrationState) migration state
+ * @param description (std::string) migration state description
  *
  * Output:
- * @param metadata value associated with the key
+ *
  * @returns 0 on success, negative error code on failure
  */
-int metadata_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
-{
-  string key;
-  bufferlist value;
-
-  bufferlist::iterator iter = in->begin();
+int migration_set_state(cls_method_context_t hctx, bufferlist *in,
+                        bufferlist *out) {
+  cls::rbd::MigrationState state;
+  std::string description;
   try {
-    ::decode(key, iter);
+    auto it = in->cbegin();
+    decode(state, it);
+    decode(description, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  CLS_LOG(20, "metdata_get key=%s", key.c_str());
+  cls::rbd::MigrationSpec migration_spec;
+  int r = image::read_migration(hctx, &migration_spec);
+  if (r < 0) {
+    return r;
+  }
+
+  migration_spec.state = state;
+  migration_spec.state_description = description;
 
-  int r = cls_cxx_map_get_val(hctx, metadata_key_for_name(key), &value);
+  r = image::set_migration(hctx, migration_spec, false);
   if (r < 0) {
-    CLS_ERR("error get metadata: %s", cpp_strerror(r).c_str());
     return r;
   }
 
-  ::encode(value, *out);
   return 0;
 }
 
-int snapshot_get_limit(cls_method_context_t hctx, bufferlist *in,
-                      bufferlist *out)
-{
-  uint64_t snap_limit;
-  int r = read_key(hctx, "snap_limit", &snap_limit);
-  if (r == -ENOENT) {
-    snap_limit = UINT64_MAX;
-  } else if (r < 0) {
-    CLS_ERR("error retrieving snapshot limit: %s", cpp_strerror(r).c_str());
+/**
+ * Get image migration spec.
+ *
+ * Input:
+ *
+ * Output:
+ * @param migration_spec (cls::rbd::MigrationSpec) image migration spec
+ *
+ * @returns 0 on success, negative error code on failure
+ */
+int migration_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
+  cls::rbd::MigrationSpec migration_spec;
+  int r = image::read_migration(hctx, &migration_spec);
+  if (r < 0) {
     return r;
   }
 
-  CLS_LOG(20, "read snapshot limit %" PRIu64, snap_limit);
-  ::encode(snap_limit, *out);
+  encode(migration_spec, *out);
 
   return 0;
 }
 
-int snapshot_set_limit(cls_method_context_t hctx, bufferlist *in,
-                      bufferlist *out)
-{
-  int rc;
-  uint64_t new_limit;
-  bufferlist bl;
+/**
+ * Remove image migration spec.
+ *
+ * Input:
+ *
+ * Output:
+ *
+ * @returns 0 on success, negative error code on failure
+ */
+int migration_remove(cls_method_context_t hctx, bufferlist *in,
+                     bufferlist *out) {
+  int r = image::remove_migration(hctx);
+  if (r < 0) {
+    return r;
+  }
 
+  return 0;
+}
+
+/**
+ * Ensure writer snapc state
+ *
+ * Input:
+ * @param snap id (uint64_t) snap context sequence id
+ * @param state (cls::rbd::AssertSnapcSeqState) snap context state
+ *
+ * Output:
+ * @returns -ERANGE if assertion fails
+ * @returns 0 on success, negative error code on failure
+ */
+int assert_snapc_seq(cls_method_context_t hctx, bufferlist *in,
+                     bufferlist *out)
+{
+  uint64_t snapc_seq;
+  cls::rbd::AssertSnapcSeqState state;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(new_limit, iter);
+    auto it = in->cbegin();
+    decode(snapc_seq, it);
+    decode(state, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  if (new_limit == UINT64_MAX) {
-    CLS_LOG(20, "remove snapshot limit\n");
-    rc = cls_cxx_map_remove_key(hctx, "snap_limit");
-  } else {
-    CLS_LOG(20, "set snapshot limit to %" PRIu64 "\n", new_limit);
-    ::encode(new_limit, bl);
-    rc = cls_cxx_map_set_val(hctx, "snap_limit", &bl);
+  uint64_t snapset_seq;
+  int r = cls_get_snapset_seq(hctx, &snapset_seq);
+  if (r < 0 && r != -ENOENT) {
+    return r;
   }
 
-  return rc;
+  switch (state) {
+  case cls::rbd::ASSERT_SNAPC_SEQ_GT_SNAPSET_SEQ:
+    return (r == -ENOENT || snapc_seq > snapset_seq) ? 0 : -ERANGE;
+  case cls::rbd::ASSERT_SNAPC_SEQ_LE_SNAPSET_SEQ:
+    return (r == -ENOENT || snapc_seq > snapset_seq) ? -ERANGE : 0;
+  default:
+    return -EOPNOTSUPP;
+  }
 }
 
-
 /****************************** Old format *******************************/
 
 int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
@@ -2839,14 +4257,14 @@ int old_snapshots_list(cls_method_context_t hctx, bufferlist *in, bufferlist *ou
          buf + sizeof(*header) + header->snap_count * sizeof(struct rbd_obj_snap_ondisk),
          header->snap_names_len);
 
-  ::encode(header->snap_seq, *out);
-  ::encode(header->snap_count, *out);
+  encode(header->snap_seq, *out);
+  encode(header->snap_count, *out);
 
   for (unsigned i = 0; i < header->snap_count; i++) {
     string s = name;
-    ::encode(header->snaps[i].id, *out);
-    ::encode(header->snaps[i].image_size, *out);
-    ::encode(s, *out);
+    encode(header->snaps[i].id, *out);
+    encode(header->snaps[i].image_size, *out);
+    encode(s, *out);
 
     name += strlen(name) + 1;
     if (name > end)
@@ -2875,13 +4293,13 @@ int old_snapshot_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   const char *snap_name;
   const char *snap_names = ((char *)header) + names_ofs;
   const char *end = snap_names + header->snap_names_len;
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   string s;
   uint64_t snap_id;
 
   try {
-    ::decode(s, iter);
-    ::decode(snap_id, iter);
+    decode(s, iter);
+    decode(snap_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -2962,14 +4380,14 @@ int old_snapshot_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *o
   const char *snap_names = ((char *)header) + names_ofs;
   const char *orig_names = snap_names;
   const char *end = snap_names + header->snap_names_len;
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   string s;
   unsigned i;
   bool found = false;
   struct rbd_obj_snap_ondisk snap;
 
   try {
-    ::decode(s, iter);
+    decode(s, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -3057,13 +4475,13 @@ int old_snapshot_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *o
   const char *snap_names = ((char *)header) + names_ofs;
   const char *orig_names = snap_names;
   const char *end = snap_names + header->snap_names_len;
-  bufferlist::iterator iter = in->begin();
+  auto iter = in->cbegin();
   unsigned i;
   bool found = false;
 
   try {
-    ::decode(src_snap_id, iter);
-    ::decode(dst, iter);
+    decode(src_snap_id, iter);
+    decode(dst, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -3132,6 +4550,7 @@ static const std::string IMAGE_KEY_PREFIX("image_");
 static const std::string GLOBAL_KEY_PREFIX("global_");
 static const std::string STATUS_GLOBAL_KEY_PREFIX("status_global_");
 static const std::string INSTANCE_KEY_PREFIX("instance_");
+static const std::string MIRROR_IMAGE_MAP_KEY_PREFIX("image_map_");
 
 std::string peer_key(const std::string &uuid) {
   return PEER_KEY_PREFIX + uuid;
@@ -3153,6 +4572,10 @@ std::string instance_key(const string &instance_id) {
   return INSTANCE_KEY_PREFIX + instance_id;
 }
 
+std::string mirror_image_map_key(const string& global_image_id) {
+  return MIRROR_IMAGE_MAP_KEY_PREFIX + global_image_id;
+}
+
 int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) {
   bufferlist mirror_uuid_bl;
   int r = cls_cxx_map_get_val(hctx, mirror::UUID, &mirror_uuid_bl);
@@ -3167,6 +4590,22 @@ int uuid_get(cls_method_context_t hctx, std::string *mirror_uuid) {
   return 0;
 }
 
+int list_watchers(cls_method_context_t hctx,
+                  std::set<entity_inst_t> *entities) {
+  obj_list_watch_response_t watchers;
+  int r = cls_cxx_list_watchers(hctx, &watchers);
+  if (r < 0 && r != -ENOENT) {
+    CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  entities->clear();
+  for (auto &w : watchers.entries) {
+    entities->emplace(w.name, w.addr);
+  }
+  return 0;
+}
+
 int read_peers(cls_method_context_t hctx,
                std::vector<cls::rbd::MirrorPeer> *peers) {
   std::string last_read = PEER_KEY_PREFIX;
@@ -3177,15 +4616,17 @@ int read_peers(cls_method_context_t hctx,
     int r = cls_cxx_map_get_vals(hctx, last_read, PEER_KEY_PREFIX.c_str(),
                                  max_read, &vals, &more);
     if (r < 0) {
-      CLS_ERR("error reading peers: %s", cpp_strerror(r).c_str());
+      if (r != -ENOENT) {
+        CLS_ERR("error reading peers: %s", cpp_strerror(r).c_str());
+      }
       return r;
     }
 
     for (auto &it : vals) {
       try {
-        bufferlist::iterator bl_it = it.second.begin();
+        auto bl_it = it.second.cbegin();
         cls::rbd::MirrorPeer peer;
-       ::decode(peer, bl_it);
+       decode(peer, bl_it);
         peers->push_back(peer);
       } catch (const buffer::error &err) {
        CLS_ERR("could not decode peer '%s'", it.first.c_str());
@@ -3211,8 +4652,8 @@ int read_peer(cls_method_context_t hctx, const std::string &id,
   }
 
   try {
-    bufferlist::iterator bl_it = bl.begin();
-    ::decode(*peer, bl_it);
+    auto bl_it = bl.cbegin();
+    decode(*peer, bl_it);
   } catch (const buffer::error &err) {
     CLS_ERR("could not decode peer '%s'", id.c_str());
     return -EIO;
@@ -3223,7 +4664,7 @@ int read_peer(cls_method_context_t hctx, const std::string &id,
 int write_peer(cls_method_context_t hctx, const std::string &id,
                const cls::rbd::MirrorPeer &peer) {
   bufferlist bl;
-  ::encode(peer, bl);
+  encode(peer, bl);
 
   int r = cls_cxx_map_set_val(hctx, peer_key(id), &bl);
   if (r < 0) {
@@ -3247,8 +4688,8 @@ int image_get(cls_method_context_t hctx, const string &image_id,
   }
 
   try {
-    bufferlist::iterator it = bl.begin();
-    ::decode(*mirror_image, it);
+    auto it = bl.cbegin();
+    decode(*mirror_image, it);
   } catch (const buffer::error &err) {
     CLS_ERR("could not decode mirrored image '%s'", image_id.c_str());
     return -EIO;
@@ -3260,7 +4701,7 @@ int image_get(cls_method_context_t hctx, const string &image_id,
 int image_set(cls_method_context_t hctx, const string &image_id,
              const cls::rbd::MirrorImage &mirror_image) {
   bufferlist bl;
-  ::encode(mirror_image, bl);
+  encode(mirror_image, bl);
 
   cls::rbd::MirrorImage existing_mirror_image;
   int r = image_get(hctx, image_id, &existing_mirror_image);
@@ -3300,7 +4741,7 @@ int image_set(cls_method_context_t hctx, const string &image_id,
   }
 
   bufferlist image_id_bl;
-  ::encode(image_id, image_id_bl);
+  encode(image_id, image_id_bl);
   r = cls_cxx_map_set_val(hctx, global_key(mirror_image.global_image_id),
                           &image_id_bl);
   if (r < 0) {
@@ -3363,7 +4804,7 @@ struct MirrorImageStatusOnDisk : cls::rbd::MirrorImageStatus {
 
   void encode_meta(bufferlist &bl, uint64_t features) const {
     ENCODE_START(1, 1, bl);
-    ::encode(origin, bl, features);
+    encode(origin, bl, features);
     ENCODE_FINISH(bl);
   }
 
@@ -3372,13 +4813,13 @@ struct MirrorImageStatusOnDisk : cls::rbd::MirrorImageStatus {
     cls::rbd::MirrorImageStatus::encode(bl);
   }
 
-  void decode_meta(bufferlist::iterator &it) {
+  void decode_meta(bufferlist::const_iterator &it) {
     DECODE_START(1, it);
-    ::decode(origin, it);
+    decode(origin, it);
     DECODE_FINISH(it);
   }
 
-  void decode(bufferlist::iterator &it) {
+  void decode(bufferlist::const_iterator &it) {
     decode_meta(it);
     cls::rbd::MirrorImageStatus::decode(it);
   }
@@ -3392,7 +4833,7 @@ int image_status_set(cls_method_context_t hctx, const string &global_image_id,
   ondisk_status.last_update = ceph_clock_now();
 
   int r = cls_get_request_origin(hctx, &ondisk_status.origin);
-  assert(r == 0);
+  ceph_assert(r == 0);
 
   bufferlist bl;
   encode(ondisk_status, bl, cls_get_features(hctx));
@@ -3419,6 +4860,7 @@ int image_status_remove(cls_method_context_t hctx,
 }
 
 int image_status_get(cls_method_context_t hctx, const string &global_image_id,
+                     const std::set<entity_inst_t> &watchers,
                     cls::rbd::MirrorImageStatus *status) {
 
   bufferlist bl;
@@ -3433,7 +4875,7 @@ int image_status_get(cls_method_context_t hctx, const string &global_image_id,
 
   MirrorImageStatusOnDisk ondisk_status;
   try {
-    bufferlist::iterator it = bl.begin();
+    auto it = bl.cbegin();
     decode(ondisk_status, it);
   } catch (const buffer::error &err) {
     CLS_ERR("could not decode status for mirrored image, global id '%s'",
@@ -3441,23 +4883,9 @@ int image_status_get(cls_method_context_t hctx, const string &global_image_id,
     return -EIO;
   }
 
-  obj_list_watch_response_t watchers;
-  r = cls_cxx_list_watchers(hctx, &watchers);
-  if (r < 0 && r != -ENOENT) {
-    CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
-    return r;
-  }
 
   *status = static_cast<cls::rbd::MirrorImageStatus>(ondisk_status);
-  status->up = false;
-  for (auto &w : watchers.entries) {
-    if (w.name == ondisk_status.origin.name &&
-       w.addr == ondisk_status.origin.addr) {
-      status->up = true;
-      break;
-    }
-  }
-
+  status->up = (watchers.find(ondisk_status.origin) != watchers.end());
   return 0;
 }
 
@@ -3469,14 +4897,22 @@ int image_status_list(cls_method_context_t hctx,
   int max_read = RBD_MAX_KEYS_READ;
   bool more = true;
 
+  std::set<entity_inst_t> watchers;
+  int r = list_watchers(hctx, &watchers);
+  if (r < 0) {
+    return r;
+  }
+
   while (more && mirror_images->size() < max_return) {
     std::map<std::string, bufferlist> vals;
     CLS_LOG(20, "last_read = '%s'", last_read.c_str());
-    int r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read,
-                                 &vals, &more);
+    r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read, &vals,
+                             &more);
     if (r < 0) {
-      CLS_ERR("error reading mirror image directory by name: %s",
-              cpp_strerror(r).c_str());
+      if (r != -ENOENT) {
+        CLS_ERR("error reading mirror image directory by name: %s",
+                cpp_strerror(r).c_str());
+      }
       return r;
     }
 
@@ -3484,9 +4920,9 @@ int image_status_list(cls_method_context_t hctx,
           mirror_images->size() < max_return; ++it) {
       const std::string &image_id = it->first.substr(IMAGE_KEY_PREFIX.size());
       cls::rbd::MirrorImage mirror_image;
-      bufferlist::iterator iter = it->second.begin();
+      auto iter = it->second.cbegin();
       try {
-       ::decode(mirror_image, iter);
+       decode(mirror_image, iter);
       } catch (const buffer::error &err) {
        CLS_ERR("could not decode mirror image payload of image '%s'",
                 image_id.c_str());
@@ -3496,7 +4932,8 @@ int image_status_list(cls_method_context_t hctx,
       (*mirror_images)[image_id] = mirror_image;
 
       cls::rbd::MirrorImageStatus status;
-      int r1 = image_status_get(hctx, mirror_image.global_image_id, &status);
+      int r1 = image_status_get(hctx, mirror_image.global_image_id, watchers,
+                                &status);
       if (r1 < 0) {
        continue;
       }
@@ -3511,22 +4948,15 @@ int image_status_list(cls_method_context_t hctx,
   return 0;
 }
 
-int image_status_get_summary(cls_method_context_t hctx,
-       std::map<cls::rbd::MirrorImageStatusState, int> *states) {
-  obj_list_watch_response_t watchers_;
-  int r = cls_cxx_list_watchers(hctx, &watchers_);
+int image_status_get_summary(
+    cls_method_context_t hctx,
+    std::map<cls::rbd::MirrorImageStatusState, int> *states) {
+  std::set<entity_inst_t> watchers;
+  int r = list_watchers(hctx, &watchers);
   if (r < 0) {
-    if (r != -ENOENT) {
-      CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
-    }
     return r;
   }
 
-  set<entity_inst_t> watchers;
-  for (auto &w : watchers_.entries) {
-    watchers.insert(entity_inst_t(w.name, w.addr));
-  }
-
   states->clear();
 
   string last_read = IMAGE_KEY_PREFIX;
@@ -3537,7 +4967,9 @@ int image_status_get_summary(cls_method_context_t hctx,
     r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX,
                             max_read, &vals, &more);
     if (r < 0) {
-      CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
+      if (r != -ENOENT) {
+        CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
+      }
       return r;
     }
 
@@ -3549,9 +4981,9 @@ int image_status_get_summary(cls_method_context_t hctx,
       }
 
       cls::rbd::MirrorImage mirror_image;
-      bufferlist::iterator iter = list_it.second.begin();
+      auto iter = list_it.second.cbegin();
       try {
-       ::decode(mirror_image, iter);
+       decode(mirror_image, iter);
       } catch (const buffer::error &err) {
        CLS_ERR("could not decode mirror image payload for key '%s'",
                 key.c_str());
@@ -3559,7 +4991,7 @@ int image_status_get_summary(cls_method_context_t hctx,
       }
 
       cls::rbd::MirrorImageStatus status;
-      image_status_get(hctx, mirror_image.global_image_id, &status);
+      image_status_get(hctx, mirror_image.global_image_id, watchers, &status);
 
       cls::rbd::MirrorImageStatusState state = status.up ? status.state :
        cls::rbd::MIRROR_IMAGE_STATUS_STATE_UNKNOWN;
@@ -3575,20 +5007,12 @@ int image_status_get_summary(cls_method_context_t hctx,
 }
 
 int image_status_remove_down(cls_method_context_t hctx) {
-  obj_list_watch_response_t watchers_;
-  int r = cls_cxx_list_watchers(hctx, &watchers_);
+  std::set<entity_inst_t> watchers;
+  int r = list_watchers(hctx, &watchers);
   if (r < 0) {
-    if (r != -ENOENT) {
-      CLS_ERR("error listing watchers: '%s'", cpp_strerror(r).c_str());
-    }
     return r;
   }
 
-  set<entity_inst_t> watchers;
-  for (auto &w : watchers_.entries) {
-    watchers.insert(entity_inst_t(w.name, w.addr));
-  }
-
   string last_read = STATUS_GLOBAL_KEY_PREFIX;
   int max_read = RBD_MAX_KEYS_READ;
   bool more = true;
@@ -3597,7 +5021,9 @@ int image_status_remove_down(cls_method_context_t hctx) {
     r = cls_cxx_map_get_vals(hctx, last_read, STATUS_GLOBAL_KEY_PREFIX,
                             max_read, &vals, &more);
     if (r < 0) {
-      CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
+      if (r != -ENOENT) {
+        CLS_ERR("error reading mirrored images: %s", cpp_strerror(r).c_str());
+      }
       return r;
     }
 
@@ -3611,26 +5037,115 @@ int image_status_remove_down(cls_method_context_t hctx) {
 
       MirrorImageStatusOnDisk status;
       try {
-       bufferlist::iterator it = list_it.second.begin();
-       status.decode_meta(it);
+       auto it = list_it.second.cbegin();
+       status.decode_meta(it);
+      } catch (const buffer::error &err) {
+       CLS_ERR("could not decode status metadata for mirrored image '%s'",
+               key.c_str());
+       return -EIO;
+      }
+
+      if (watchers.find(status.origin) == watchers.end()) {
+       CLS_LOG(20, "removing stale status object for key %s",
+               key.c_str());
+       int r1 = cls_cxx_map_remove_key(hctx, key);
+       if (r1 < 0) {
+         CLS_ERR("error removing stale status for key '%s': %s",
+                 key.c_str(), cpp_strerror(r1).c_str());
+         return r1;
+       }
+      }
+    }
+
+    if (!vals.empty()) {
+      last_read = vals.rbegin()->first;
+    }
+  }
+
+  return 0;
+}
+
+int image_instance_get(cls_method_context_t hctx,
+                       const string &global_image_id,
+                       const std::set<entity_inst_t> &watchers,
+                       entity_inst_t *instance) {
+  bufferlist bl;
+  int r = cls_cxx_map_get_val(hctx, status_global_key(global_image_id), &bl);
+  if (r < 0) {
+    if (r != -ENOENT) {
+      CLS_ERR("error reading status for mirrored image, global id '%s': '%s'",
+              global_image_id.c_str(), cpp_strerror(r).c_str());
+    }
+    return r;
+  }
+
+  MirrorImageStatusOnDisk ondisk_status;
+  try {
+    auto it = bl.cbegin();
+    decode(ondisk_status, it);
+  } catch (const buffer::error &err) {
+    CLS_ERR("could not decode status for mirrored image, global id '%s'",
+            global_image_id.c_str());
+    return -EIO;
+  }
+
+  if (watchers.find(ondisk_status.origin) == watchers.end()) {
+    return -ESTALE;
+  }
+
+  *instance = ondisk_status.origin;
+  return 0;
+}
+
+int image_instance_list(cls_method_context_t hctx,
+                        const std::string &start_after,
+                        uint64_t max_return,
+                        map<std::string, entity_inst_t> *instances) {
+  std::string last_read = image_key(start_after);
+  int max_read = RBD_MAX_KEYS_READ;
+  bool more = true;
+
+  std::set<entity_inst_t> watchers;
+  int r = list_watchers(hctx, &watchers);
+  if (r < 0) {
+    return r;
+  }
+
+  while (more && instances->size() < max_return) {
+    std::map<std::string, bufferlist> vals;
+    CLS_LOG(20, "last_read = '%s'", last_read.c_str());
+    r = cls_cxx_map_get_vals(hctx, last_read, IMAGE_KEY_PREFIX, max_read, &vals,
+                             &more);
+    if (r < 0) {
+      if (r != -ENOENT) {
+        CLS_ERR("error reading mirror image directory by name: %s",
+                cpp_strerror(r).c_str());
+      }
+      return r;
+    }
+
+    for (auto it = vals.begin(); it != vals.end() &&
+           instances->size() < max_return; ++it) {
+      const std::string &image_id = it->first.substr(IMAGE_KEY_PREFIX.size());
+      cls::rbd::MirrorImage mirror_image;
+      auto iter = it->second.cbegin();
+      try {
+        decode(mirror_image, iter);
       } catch (const buffer::error &err) {
-       CLS_ERR("could not decode status metadata for mirrored image '%s'",
-               key.c_str());
-       return -EIO;
+        CLS_ERR("could not decode mirror image payload of image '%s'",
+                image_id.c_str());
+        return -EIO;
       }
 
-      if (watchers.find(status.origin) == watchers.end()) {
-       CLS_LOG(20, "removing stale status object for key %s",
-               key.c_str());
-       int r1 = cls_cxx_map_remove_key(hctx, key);
-       if (r1 < 0) {
-         CLS_ERR("error removing stale status for key '%s': %s",
-                 key.c_str(), cpp_strerror(r1).c_str());
-         return r1;
-       }
+      entity_inst_t instance;
+      r = image_instance_get(hctx, mirror_image.global_image_id, watchers,
+                             &instance);
+      if (r < 0) {
+        continue;
       }
-    }
 
+      (*instances)[image_id] = instance;
+    }
     if (!vals.empty()) {
       last_read = vals.rbegin()->first;
     }
@@ -3689,6 +5204,54 @@ int instances_remove(cls_method_context_t hctx, const string &instance_id) {
   return 0;
 }
 
+int mirror_image_map_list(cls_method_context_t hctx,
+                          const std::string &start_after,
+                          uint64_t max_return,
+                          std::map<std::string, cls::rbd::MirrorImageMap> *image_mapping) {
+  bool more = true;
+  std::string last_read = mirror_image_map_key(start_after);
+
+  while (more && image_mapping->size() < max_return) {
+    std::map<std::string, bufferlist> vals;
+    CLS_LOG(20, "last read: '%s'", last_read.c_str());
+
+    int max_read = std::min<uint64_t>(RBD_MAX_KEYS_READ, max_return - image_mapping->size());
+    int r = cls_cxx_map_get_vals(hctx, last_read, MIRROR_IMAGE_MAP_KEY_PREFIX,
+                                 max_read, &vals, &more);
+    if (r < 0) {
+      CLS_ERR("error reading image map: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+
+    if (vals.empty()) {
+      return 0;
+    }
+
+    for (auto it = vals.begin(); it != vals.end(); ++it) {
+      const std::string &global_image_id =
+        it->first.substr(MIRROR_IMAGE_MAP_KEY_PREFIX.size());
+
+      cls::rbd::MirrorImageMap mirror_image_map;
+      auto iter = it->second.cbegin();
+      try {
+        decode(mirror_image_map, iter);
+      } catch (const buffer::error &err) {
+        CLS_ERR("could not decode image map payload: %s",
+                cpp_strerror(r).c_str());
+        return -EINVAL;
+      }
+
+      image_mapping->insert(std::make_pair(global_image_id, mirror_image_map));
+    }
+
+    if (!vals.empty()) {
+      last_read = vals.rbegin()->first;
+    }
+  }
+
+  return 0;
+}
+
 } // namespace mirror
 
 /**
@@ -3707,7 +5270,7 @@ int mirror_uuid_get(cls_method_context_t hctx, bufferlist *in,
     return r;
   }
 
-  ::encode(mirror_uuid, *out);
+  encode(mirror_uuid, *out);
   return 0;
 }
 
@@ -3722,8 +5285,8 @@ int mirror_uuid_set(cls_method_context_t hctx, bufferlist *in,
                     bufferlist *out) {
   std::string mirror_uuid;
   try {
-    bufferlist::iterator bl_it = in->begin();
-    ::decode(mirror_uuid, bl_it);
+    auto bl_it = in->cbegin();
+    decode(mirror_uuid, bl_it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -3768,7 +5331,7 @@ int mirror_mode_get(cls_method_context_t hctx, bufferlist *in,
     return r;
   }
 
-  ::encode(mirror_mode_decode, *out);
+  encode(mirror_mode_decode, *out);
   return 0;
 }
 
@@ -3783,8 +5346,8 @@ int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
                     bufferlist *out) {
   uint32_t mirror_mode_decode;
   try {
-    bufferlist::iterator bl_it = in->begin();
-    ::decode(mirror_mode_decode, bl_it);
+    auto bl_it = in->cbegin();
+    decode(mirror_mode_decode, bl_it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -3814,7 +5377,7 @@ int mirror_mode_set(cls_method_context_t hctx, bufferlist *in,
     }
 
     bufferlist bl;
-    ::encode(mirror_mode_decode, bl);
+    encode(mirror_mode_decode, bl);
 
     r = cls_cxx_map_set_val(hctx, mirror::MODE, &bl);
     if (r < 0) {
@@ -3862,7 +5425,7 @@ int mirror_peer_list(cls_method_context_t hctx, bufferlist *in,
     return r;
   }
 
-  ::encode(peers, *out);
+  encode(peers, *out);
   return 0;
 }
 
@@ -3877,8 +5440,8 @@ int mirror_peer_add(cls_method_context_t hctx, bufferlist *in,
                     bufferlist *out) {
   cls::rbd::MirrorPeer mirror_peer;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(mirror_peer, it);
+    auto it = in->cbegin();
+    decode(mirror_peer, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -3928,7 +5491,7 @@ int mirror_peer_add(cls_method_context_t hctx, bufferlist *in,
   }
 
   bufferlist bl;
-  ::encode(mirror_peer, bl);
+  encode(mirror_peer, bl);
   r = cls_cxx_map_set_val(hctx, mirror::peer_key(mirror_peer.uuid),
                           &bl);
   if (r < 0) {
@@ -3949,8 +5512,8 @@ int mirror_peer_remove(cls_method_context_t hctx, bufferlist *in,
                        bufferlist *out) {
   std::string uuid;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(uuid, it);
+    auto it = in->cbegin();
+    decode(uuid, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -3976,9 +5539,9 @@ int mirror_peer_set_client(cls_method_context_t hctx, bufferlist *in,
   std::string uuid;
   std::string client_name;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(uuid, it);
-    ::decode(client_name, it);
+    auto it = in->cbegin();
+    decode(uuid, it);
+    decode(client_name, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4010,9 +5573,9 @@ int mirror_peer_set_cluster(cls_method_context_t hctx, bufferlist *in,
   std::string uuid;
   std::string cluster_name;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(uuid, it);
-    ::decode(cluster_name, it);
+    auto it = in->cbegin();
+    decode(uuid, it);
+    decode(cluster_name, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4046,9 +5609,9 @@ int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
   std::string start_after;
   uint64_t max_return;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(start_after, iter);
-    ::decode(max_return, iter);
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4064,8 +5627,10 @@ int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
     int r = cls_cxx_map_get_vals(hctx, last_read, mirror::IMAGE_KEY_PREFIX,
                                  max_read, &vals, &more);
     if (r < 0) {
-      CLS_ERR("error reading mirror image directory by name: %s",
-              cpp_strerror(r).c_str());
+      if (r != -ENOENT) {
+        CLS_ERR("error reading mirror image directory by name: %s",
+                cpp_strerror(r).c_str());
+      }
       return r;
     }
 
@@ -4073,9 +5638,9 @@ int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
       const std::string &image_id =
         it->first.substr(mirror::IMAGE_KEY_PREFIX.size());
       cls::rbd::MirrorImage mirror_image;
-      bufferlist::iterator iter = it->second.begin();
+      auto iter = it->second.cbegin();
       try {
-       ::decode(mirror_image, iter);
+       decode(mirror_image, iter);
       } catch (const buffer::error &err) {
        CLS_ERR("could not decode mirror image payload of image '%s'",
                 image_id.c_str());
@@ -4092,7 +5657,7 @@ int mirror_image_list(cls_method_context_t hctx, bufferlist *in,
     }
   }
 
-  ::encode(mirror_images, *out);
+  encode(mirror_images, *out);
   return 0;
 }
 
@@ -4108,8 +5673,8 @@ int mirror_image_get_image_id(cls_method_context_t hctx, bufferlist *in,
                               bufferlist *out) {
   std::string global_id;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(global_id, it);
+    auto it = in->cbegin();
+    decode(global_id, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4117,12 +5682,14 @@ int mirror_image_get_image_id(cls_method_context_t hctx, bufferlist *in,
   std::string image_id;
   int r = read_key(hctx, mirror::global_key(global_id), &image_id);
   if (r < 0) {
-    CLS_ERR("error retrieving image id for global id '%s': %s",
-            global_id.c_str(), cpp_strerror(r).c_str());
+    if (r != -ENOENT) {
+      CLS_ERR("error retrieving image id for global id '%s': %s",
+              global_id.c_str(), cpp_strerror(r).c_str());
+    }
     return r;
   }
 
-  ::encode(image_id, *out);
+  encode(image_id, *out);
   return 0;
 }
 
@@ -4138,8 +5705,8 @@ int mirror_image_get(cls_method_context_t hctx, bufferlist *in,
                     bufferlist *out) {
   string image_id;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(image_id, it);
+    auto it = in->cbegin();
+    decode(image_id, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4150,7 +5717,7 @@ int mirror_image_get(cls_method_context_t hctx, bufferlist *in,
     return r;
   }
 
-  ::encode(mirror_image, *out);
+  encode(mirror_image, *out);
   return 0;
 }
 
@@ -4168,9 +5735,9 @@ int mirror_image_set(cls_method_context_t hctx, bufferlist *in,
   string image_id;
   cls::rbd::MirrorImage mirror_image;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(image_id, it);
-    ::decode(mirror_image, it);
+    auto it = in->cbegin();
+    decode(image_id, it);
+    decode(mirror_image, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4193,8 +5760,8 @@ int mirror_image_remove(cls_method_context_t hctx, bufferlist *in,
                        bufferlist *out) {
   string image_id;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(image_id, it);
+    auto it = in->cbegin();
+    decode(image_id, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4219,9 +5786,9 @@ int mirror_image_status_set(cls_method_context_t hctx, bufferlist *in,
   string global_image_id;
   cls::rbd::MirrorImageStatus status;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(global_image_id, it);
-    ::decode(status, it);
+    auto it = in->cbegin();
+    decode(global_image_id, it);
+    decode(status, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4244,8 +5811,8 @@ int mirror_image_status_remove(cls_method_context_t hctx, bufferlist *in,
                               bufferlist *out) {
   string global_image_id;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(global_image_id, it);
+    auto it = in->cbegin();
+    decode(global_image_id, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4269,19 +5836,25 @@ int mirror_image_status_get(cls_method_context_t hctx, bufferlist *in,
                            bufferlist *out) {
   string global_image_id;
   try {
-    bufferlist::iterator it = in->begin();
-    ::decode(global_image_id, it);
+    auto it = in->cbegin();
+    decode(global_image_id, it);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
+  std::set<entity_inst_t> watchers;
+  int r = mirror::list_watchers(hctx, &watchers);
+  if (r < 0) {
+    return r;
+  }
+
   cls::rbd::MirrorImageStatus status;
-  int r = mirror::image_status_get(hctx, global_image_id, &status);
+  r = mirror::image_status_get(hctx, global_image_id, watchers, &status);
   if (r < 0) {
     return r;
   }
 
-  ::encode(status, *out);
+  encode(status, *out);
   return 0;
 }
 
@@ -4301,9 +5874,9 @@ int mirror_image_status_list(cls_method_context_t hctx, bufferlist *in,
   std::string start_after;
   uint64_t max_return;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(start_after, iter);
-    ::decode(max_return, iter);
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4316,8 +5889,8 @@ int mirror_image_status_list(cls_method_context_t hctx, bufferlist *in,
     return r;
   }
 
-  ::encode(images, *out);
-  ::encode(statuses, *out);
+  encode(images, *out);
+  encode(statuses, *out);
   return 0;
 }
 
@@ -4338,7 +5911,7 @@ int mirror_image_status_get_summary(cls_method_context_t hctx, bufferlist *in,
     return r;
   }
 
-  ::encode(states, *out);
+  encode(states, *out);
   return 0;
 }
 
@@ -4358,6 +5931,73 @@ int mirror_image_status_remove_down(cls_method_context_t hctx, bufferlist *in,
   return 0;
 }
 
+/**
+ * Input:
+ * @param global_image_id (std::string)
+ *
+ * Output:
+ * @param entity_inst_t - instance
+ * @returns 0 on success, negative error code on failure
+ */
+int mirror_image_instance_get(cls_method_context_t hctx, bufferlist *in,
+                              bufferlist *out) {
+  string global_image_id;
+  try {
+    auto it = in->cbegin();
+    decode(global_image_id, it);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  std::set<entity_inst_t> watchers;
+  int r = mirror::list_watchers(hctx, &watchers);
+  if (r < 0) {
+    return r;
+  }
+
+  entity_inst_t instance;
+  r = mirror::image_instance_get(hctx, global_image_id, watchers, &instance);
+  if (r < 0) {
+    return r;
+  }
+
+  encode(instance, *out, cls_get_features(hctx));
+  return 0;
+}
+
+/**
+ * Input:
+ * @param start_after which name to begin listing after
+ *        (use the empty string to start at the beginning)
+ * @param max_return the maximum number of names to list
+ *
+ * Output:
+ * @param std::map<std::string, entity_inst_t>: image id to instance map
+ * @returns 0 on success, negative error code on failure
+ */
+int mirror_image_instance_list(cls_method_context_t hctx, bufferlist *in,
+                               bufferlist *out) {
+  std::string start_after;
+  uint64_t max_return;
+  try {
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  map<std::string, entity_inst_t> instances;
+  int r = mirror::image_instance_list(hctx, start_after, max_return,
+                                      &instances);
+  if (r < 0) {
+    return r;
+  }
+
+  encode(instances, *out, cls_get_features(hctx));
+  return 0;
+}
+
 /**
  * Input:
  * none
@@ -4375,7 +6015,7 @@ int mirror_instances_list(cls_method_context_t hctx, bufferlist *in,
     return r;
   }
 
-  ::encode(instance_ids, *out);
+  encode(instance_ids, *out);
   return 0;
 }
 
@@ -4390,8 +6030,8 @@ int mirror_instances_add(cls_method_context_t hctx, bufferlist *in,
                          bufferlist *out) {
   std::string instance_id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(instance_id, iter);
+    auto iter = in->cbegin();
+    decode(instance_id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4403,61 +6043,295 @@ int mirror_instances_add(cls_method_context_t hctx, bufferlist *in,
   return 0;
 }
 
-/**
- * Input:
- * @param instance_id (std::string)
- *
- * Output:
- * @returns 0 on success, negative error code on failure
- */
-int mirror_instances_remove(cls_method_context_t hctx, bufferlist *in,
-                            bufferlist *out) {
-  std::string instance_id;
-  try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(instance_id, iter);
-  } catch (const buffer::error &err) {
-    return -EINVAL;
-  }
+/**
+ * Input:
+ * @param instance_id (std::string)
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int mirror_instances_remove(cls_method_context_t hctx, bufferlist *in,
+                            bufferlist *out) {
+  std::string instance_id;
+  try {
+    auto iter = in->cbegin();
+    decode(instance_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  int r = mirror::instances_remove(hctx, instance_id);
+  if (r < 0) {
+    return r;
+  }
+  return 0;
+}
+
+/**
+ * Input:
+ * @param start_after: key to start after
+ * @param max_return: max return items
+ *
+ * Output:
+ * @param std::map<std::string, cls::rbd::MirrorImageMap>: image mapping
+ * @returns 0 on success, negative error code on failure
+ */
+int mirror_image_map_list(cls_method_context_t hctx, bufferlist *in,
+                          bufferlist *out) {
+  std::string start_after;
+  uint64_t max_return;
+  try {
+    auto it = in->cbegin();
+    decode(start_after, it);
+    decode(max_return, it);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  std::map<std::string, cls::rbd::MirrorImageMap> image_mapping;
+  int r = mirror::mirror_image_map_list(hctx, start_after, max_return, &image_mapping);
+  if (r < 0) {
+    return r;
+  }
+
+  encode(image_mapping, *out);
+  return 0;
+}
+
+/**
+ * Input:
+ * @param global_image_id: global image id
+ * @param image_map: image map
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int mirror_image_map_update(cls_method_context_t hctx, bufferlist *in,
+                            bufferlist *out) {
+  std::string global_image_id;
+  cls::rbd::MirrorImageMap image_map;
+
+  try {
+    auto it = in->cbegin();
+    decode(global_image_id, it);
+    decode(image_map, it);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  bufferlist bl;
+  encode(image_map, bl);
+
+  const std::string key = mirror::mirror_image_map_key(global_image_id);
+  int r = cls_cxx_map_set_val(hctx, key, &bl);
+  if (r < 0) {
+    CLS_ERR("error updating image map %s: %s", key.c_str(),
+            cpp_strerror(r).c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Input:
+ * @param global_image_id: global image id
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int mirror_image_map_remove(cls_method_context_t hctx, bufferlist *in,
+                            bufferlist *out) {
+  std::string global_image_id;
+
+  try {
+    auto it = in->cbegin();
+    decode(global_image_id, it);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  const std::string key = mirror::mirror_image_map_key(global_image_id);
+  int r = cls_cxx_map_remove_key(hctx, key);
+  if (r < 0 && r != -ENOENT) {
+    CLS_ERR("error removing image map %s: %s", key.c_str(),
+            cpp_strerror(r).c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+namespace group {
+
+/********************** methods for rbd_group_directory ***********************/
+
+int dir_add(cls_method_context_t hctx,
+            const string &name, const string &id,
+            bool check_for_unique_id)
+{
+  if (!name.size() || !is_valid_id(id)) {
+    CLS_ERR("invalid group name '%s' or id '%s'",
+            name.c_str(), id.c_str());
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "dir_add name=%s id=%s", name.c_str(), id.c_str());
+
+  string name_key = dir_key_for_name(name);
+  string id_key = dir_key_for_id(id);
+  string tmp;
+  int r = read_key(hctx, name_key, &tmp);
+  if (r != -ENOENT) {
+    CLS_LOG(10, "name already exists");
+    return -EEXIST;
+  }
+  r = read_key(hctx, id_key, &tmp);
+  if (r != -ENOENT && check_for_unique_id) {
+    CLS_LOG(10, "id already exists");
+    return -EBADF;
+  }
+  bufferlist id_bl, name_bl;
+  encode(id, id_bl);
+  encode(name, name_bl);
+  map<string, bufferlist> omap_vals;
+  omap_vals[name_key] = id_bl;
+  omap_vals[id_key] = name_bl;
+  return cls_cxx_map_set_vals(hctx, &omap_vals);
+}
+
+int dir_remove(cls_method_context_t hctx,
+               const string &name, const string &id)
+{
+  CLS_LOG(20, "dir_remove name=%s id=%s", name.c_str(), id.c_str());
+
+  string name_key = dir_key_for_name(name);
+  string id_key = dir_key_for_id(id);
+  string stored_name, stored_id;
+
+  int r = read_key(hctx, name_key, &stored_id);
+  if (r < 0) {
+    if (r != -ENOENT)
+      CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+  r = read_key(hctx, id_key, &stored_name);
+  if (r < 0) {
+    if (r != -ENOENT)
+      CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  // check if this op raced with a rename
+  if (stored_name != name || stored_id != id) {
+    CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
+            stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
+    return -ESTALE;
+  }
+
+  r = cls_cxx_map_remove_key(hctx, name_key);
+  if (r < 0) {
+    CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  r = cls_cxx_map_remove_key(hctx, id_key);
+  if (r < 0) {
+    CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+static const string RBD_GROUP_SNAP_KEY_PREFIX = "snapshot_";
+
+std::string snap_key(const std::string &snap_id) {
+  ostringstream oss;
+  oss << RBD_GROUP_SNAP_KEY_PREFIX << snap_id;
+  return oss.str();
+}
+
+int snap_list(cls_method_context_t hctx, cls::rbd::GroupSnapshot start_after,
+              uint64_t max_return,
+              std::vector<cls::rbd::GroupSnapshot> *group_snaps)
+{
+  int max_read = RBD_MAX_KEYS_READ;
+  std::map<string, bufferlist> vals;
+  string last_read = snap_key(start_after.id);
+
+  group_snaps->clear();
+
+  bool more;
+  do {
+    int r = cls_cxx_map_get_vals(hctx, last_read,
+                                RBD_GROUP_SNAP_KEY_PREFIX,
+                                max_read, &vals, &more);
+    if (r < 0)
+      return r;
+
+    for (map<string, bufferlist>::iterator it = vals.begin();
+        it != vals.end() && group_snaps->size() < max_return; ++it) {
+
+      auto iter = it->second.cbegin();
+      cls::rbd::GroupSnapshot snap;
+      try {
+       decode(snap, iter);
+      } catch (const buffer::error &err) {
+       CLS_ERR("error decoding snapshot: %s", it->first.c_str());
+       return -EIO;
+      }
+      CLS_LOG(20, "Discovered snapshot %s %s",
+             snap.name.c_str(),
+             snap.id.c_str());
+      group_snaps->push_back(snap);
+    }
+
+  } while (more && (group_snaps->size() < max_return));
 
-  int r = mirror::instances_remove(hctx, instance_id);
-  if (r < 0) {
-    return r;
-  }
   return 0;
 }
 
-/**
- * Initialize the header with basic metadata.
- * Everything is stored as key/value pairs as omaps in the header object.
- *
- * Input:
- * none
- *
- * Output:
- * @return 0 on success, negative error code on failure
- */
-int group_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static int check_duplicate_snap_name(cls_method_context_t hctx,
+                                    const std::string &snap_name,
+                                    const std::string &snap_id)
 {
-  bufferlist snap_seqbl;
-  uint64_t snap_seq = 0;
-  ::encode(snap_seq, snap_seqbl);
-  int r = cls_cxx_map_set_val(hctx, GROUP_SNAP_SEQ, &snap_seqbl);
-  if (r < 0)
-    return r;
+  const int max_read = 1024;
+  cls::rbd::GroupSnapshot snap_last;
+  std::vector<cls::rbd::GroupSnapshot> page;
+
+  for (;;) {
+    int r = snap_list(hctx, snap_last, max_read, &page);
+    if (r < 0) {
+      return r;
+    }
+    for (auto& snap: page) {
+      if (snap.name == snap_name && snap.id != snap_id) {
+       return -EEXIST;
+      }
+    }
+
+    if (page.size() < max_read) {
+      break;
+    }
+
+    snap_last = *page.rbegin();
+  }
 
   return 0;
 }
 
+} // namespace group
+
 /**
- * List consistency groups from the directory.
+ * List groups from the directory.
  *
  * Input:
  * @param start_after (std::string)
  * @param max_return (int64_t)
  *
  * Output:
- * @param map of consistency groups (name, id)
+ * @param map of groups (name, id)
  * @return 0 on success, negative error code on failure
  */
 int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
@@ -4466,9 +6340,9 @@ int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   uint64_t max_return;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(start_after, iter);
-    ::decode(max_return, iter);
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4484,17 +6358,19 @@ int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     int r = cls_cxx_map_get_vals(hctx, last_read, RBD_DIR_NAME_KEY_PREFIX,
                                  max_read, &vals, &more);
     if (r < 0) {
-      CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
+      if (r != -ENOENT) {
+        CLS_ERR("error reading directory by name: %s", cpp_strerror(r).c_str());
+      }
       return r;
     }
 
     for (pair<string, bufferlist> val: vals) {
       string id;
-      bufferlist::iterator iter = val.second.begin();
+      auto iter = val.second.cbegin();
       try {
-       ::decode(id, iter);
+       decode(id, iter);
       } catch (const buffer::error &err) {
-       CLS_ERR("could not decode id of consistency group '%s'", val.first.c_str());
+       CLS_ERR("could not decode id of group '%s'", val.first.c_str());
        return -EIO;
       }
       CLS_LOG(20, "adding '%s' -> '%s'", dir_name_from_key(val.first).c_str(), id.c_str());
@@ -4507,13 +6383,13 @@ int group_dir_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     }
   }
 
-  ::encode(groups, *out);
+  encode(groups, *out);
 
   return 0;
 }
 
 /**
- * Add a consistency group to the directory.
+ * Add a group to the directory.
  *
  * Input:
  * @param name (std::string)
@@ -4527,52 +6403,55 @@ int group_dir_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   int r = cls_cxx_create(hctx, false);
 
   if (r < 0) {
-    CLS_ERR("could not create consistency group directory: %s",
+    CLS_ERR("could not create group directory: %s",
            cpp_strerror(r).c_str());
     return r;
   }
 
   string name, id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(name, iter);
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(name, iter);
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  if (!name.size() || !is_valid_id(id)) {
-    CLS_ERR("invalid consistency group name '%s' or id '%s'",
-           name.c_str(), id.c_str());
+  return group::dir_add(hctx, name, id, true);
+}
+
+/**
+ * Rename a group to the directory.
+ *
+ * Input:
+ * @param src original name of the group (std::string)
+ * @param dest new name of the group (std::string)
+ * @param id the id of the group (std::string)
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ */
+int group_dir_rename(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string src, dest, id;
+  try {
+    auto iter = in->cbegin();
+    decode(src, iter);
+    decode(dest, iter);
+    decode(id, iter);
+  } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  CLS_LOG(20, "group_dir_add name=%s id=%s", name.c_str(), id.c_str());
+  int r = group::dir_remove(hctx, src, id);
+  if (r < 0)
+    return r;
 
-  string tmp;
-  string name_key = dir_key_for_name(name);
-  string id_key = dir_key_for_id(id);
-  r = read_key(hctx, name_key, &tmp);
-  if (r != -ENOENT) {
-    CLS_LOG(10, "name already exists");
-    return -EEXIST;
-  }
-  r = read_key(hctx, id_key, &tmp);
-  if (r != -ENOENT) {
-    CLS_LOG(10, "id already exists");
-    return -EBADF;
-  }
-  bufferlist id_bl, name_bl;
-  ::encode(id, id_bl);
-  ::encode(name, name_bl);
-  map<string, bufferlist> omap_vals;
-  omap_vals[name_key] = id_bl;
-  omap_vals[id_key] = name_bl;
-  return cls_cxx_map_set_vals(hctx, &omap_vals);
+  return group::dir_add(hctx, dest, id, false);
 }
 
 /**
- * Remove a consistency group from the directory.
+ * Remove a group from the directory.
  *
  * Input:
  * @param name (std::string)
@@ -4585,56 +6464,18 @@ int group_dir_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   string name, id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(name, iter);
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(name, iter);
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
 
-  CLS_LOG(20, "group_dir_remove name=%s id=%s", name.c_str(), id.c_str());
-
-  string stored_name, stored_id;
-  string name_key = dir_key_for_name(name);
-  string id_key = dir_key_for_id(id);
-
-  int r = read_key(hctx, name_key, &stored_id);
-  if (r < 0) {
-    if (r != -ENOENT)
-      CLS_ERR("error reading name to id mapping: %s", cpp_strerror(r).c_str());
-    return r;
-  }
-  r = read_key(hctx, id_key, &stored_name);
-  if (r < 0) {
-    if (r != -ENOENT)
-      CLS_ERR("error reading id to name mapping: %s", cpp_strerror(r).c_str());
-    return r;
-  }
-
-  // check if this op raced with a rename
-  if (stored_name != name || stored_id != id) {
-    CLS_ERR("stored name '%s' and id '%s' do not match args '%s' and '%s'",
-           stored_name.c_str(), stored_id.c_str(), name.c_str(), id.c_str());
-    return -ESTALE;
-  }
-
-  r = cls_cxx_map_remove_key(hctx, name_key);
-  if (r < 0) {
-    CLS_ERR("error removing name: %s", cpp_strerror(r).c_str());
-    return r;
-  }
-
-  r = cls_cxx_map_remove_key(hctx, id_key);
-  if (r < 0) {
-    CLS_ERR("error removing id: %s", cpp_strerror(r).c_str());
-    return r;
-  }
-
-  return 0;
+  return group::dir_remove(hctx, name, id);
 }
 
 /**
- * Set state of an image in the consistency group.
+ * Set state of an image in the group.
  *
  * Input:
  * @param image_status (cls::rbd::GroupImageStatus)
@@ -4648,8 +6489,8 @@ int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
   cls::rbd::GroupImageStatus st;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(st, iter);
+    auto iter = in->cbegin();
+    decode(st, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4657,7 +6498,7 @@ int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   string image_key = st.spec.image_key();
 
   bufferlist image_val_bl;
-  ::encode(st.state, image_val_bl);
+  encode(st.state, image_val_bl);
   int r = cls_cxx_map_set_val(hctx, image_key, &image_val_bl);
   if (r < 0) {
     return r;
@@ -4667,7 +6508,7 @@ int group_image_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 }
 
 /**
- * Remove reference to an image from the consistency group.
+ * Remove reference to an image from the group.
  *
  * Input:
  * @param spec (cls::rbd::GroupImageSpec)
@@ -4681,8 +6522,8 @@ int group_image_remove(cls_method_context_t hctx,
   CLS_LOG(20, "group_image_remove");
   cls::rbd::GroupImageSpec spec;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(spec, iter);
+    auto iter = in->cbegin();
+    decode(spec, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4699,7 +6540,7 @@ int group_image_remove(cls_method_context_t hctx,
 }
 
 /*
- * List images in the consistency group.
+ * List images in the group.
  *
  * Input:
  * @param start_after which name to begin listing after
@@ -4717,9 +6558,9 @@ int group_image_list(cls_method_context_t hctx,
   cls::rbd::GroupImageSpec start_after;
   uint64_t max_return;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(start_after, iter);
-    ::decode(max_return, iter);
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4730,18 +6571,19 @@ int group_image_list(cls_method_context_t hctx,
   std::vector<cls::rbd::GroupImageStatus> res;
   bool more;
   do {
-    int r = cls_cxx_map_get_vals(hctx, last_read,cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX,
-                                 max_read, &vals, &more);
+    int r = cls_cxx_map_get_vals(hctx, last_read,
+                                cls::rbd::RBD_GROUP_IMAGE_KEY_PREFIX,
+                                max_read, &vals, &more);
     if (r < 0)
       return r;
 
     for (map<string, bufferlist>::iterator it = vals.begin();
         it != vals.end() && res.size() < max_return; ++it) {
 
-      bufferlist::iterator iter = it->second.begin();
+      auto iter = it->second.cbegin();
       cls::rbd::GroupImageLinkState state;
       try {
-       ::decode(state, iter);
+       decode(state, iter);
       } catch (const buffer::error &err) {
        CLS_ERR("error decoding state for image: %s", it->first.c_str());
        return -EIO;
@@ -4761,13 +6603,13 @@ int group_image_list(cls_method_context_t hctx,
     }
 
   } while (more && (res.size() < max_return));
-  ::encode(res, *out);
+  encode(res, *out);
 
   return 0;
 }
 
 /**
- * Reference the consistency group this image belongs to.
+ * Reference the group this image belongs to.
  *
  * Input:
  * @param group_id (std::string)
@@ -4776,14 +6618,14 @@ int group_image_list(cls_method_context_t hctx,
  * Output:
  * @return 0 on success, negative error code on failure
  */
-int image_add_group(cls_method_context_t hctx,
+int image_group_add(cls_method_context_t hctx,
                    bufferlist *in, bufferlist *out)
 {
-  CLS_LOG(20, "image_add_group");
+  CLS_LOG(20, "image_group_add");
   cls::rbd::GroupSpec new_group;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(new_group, iter);
+    auto iter = in->cbegin();
+    decode(new_group, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4792,30 +6634,36 @@ int image_add_group(cls_method_context_t hctx,
 
   int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &existing_refbl);
   if (r == 0) {
-    // If we are trying to link this image to the same group then return success.
-    // If this image already belongs to another group then abort.
+    // If we are trying to link this image to the same group then return
+    // success. If this image already belongs to another group then abort.
     cls::rbd::GroupSpec old_group;
     try {
-      bufferlist::iterator iter = existing_refbl.begin();
-      ::decode(old_group, iter);
+      auto iter = existing_refbl.cbegin();
+      decode(old_group, iter);
     } catch (const buffer::error &err) {
       return -EINVAL;
     }
 
-    if ((old_group.group_id != new_group.group_id)
-       || (old_group.pool_id != new_group.pool_id)) {
+    if ((old_group.group_id != new_group.group_id) ||
+        (old_group.pool_id != new_group.pool_id)) {
       return -EEXIST;
     } else {
       return 0; // In this case the values are already correct
     }
-  } else if (r < 0 && r != -ENOENT) { // No entry means this image is not a member of any consistency group. So, we can use it.
+  } else if (r < 0 && r != -ENOENT) {
+    // No entry means this image is not a member of any group.
+    return r;
+  }
+
+  r = image::set_op_features(hctx, RBD_OPERATION_FEATURE_GROUP,
+                             RBD_OPERATION_FEATURE_GROUP);
+  if (r < 0) {
     return r;
   }
 
   bufferlist refbl;
-  ::encode(new_group, refbl);
+  encode(new_group, refbl);
   r = cls_cxx_map_set_val(hctx, RBD_GROUP_REF, &refbl);
-
   if (r < 0) {
     return r;
   }
@@ -4824,7 +6672,7 @@ int image_add_group(cls_method_context_t hctx,
 }
 
 /**
- * Remove image's pointer to the consistency group.
+ * Remove image's pointer to the group.
  *
  * Input:
  * @param cg_id (std::string)
@@ -4833,15 +6681,15 @@ int image_add_group(cls_method_context_t hctx,
  * Output:
  * @return 0 on success, negative error code on failure
  */
-int image_remove_group(cls_method_context_t hctx,
+int image_group_remove(cls_method_context_t hctx,
                       bufferlist *in,
                       bufferlist *out)
 {
-  CLS_LOG(20, "image_remove_group");
+  CLS_LOG(20, "image_group_remove");
   cls::rbd::GroupSpec spec;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(spec, iter);
+    auto iter = in->cbegin();
+    decode(spec, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4853,9 +6701,9 @@ int image_remove_group(cls_method_context_t hctx,
   }
 
   cls::rbd::GroupSpec ref_spec;
-  bufferlist::iterator iter = refbl.begin();
+  auto iter = refbl.cbegin();
   try {
-    ::decode(ref_spec, iter);
+    decode(ref_spec, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4869,41 +6717,205 @@ int image_remove_group(cls_method_context_t hctx,
     return r;
   }
 
+  r = image::set_op_features(hctx, 0, RBD_OPERATION_FEATURE_GROUP);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Retrieve the id and pool of the group this image belongs to.
+ *
+ * Input:
+ * none
+ *
+ * Output:
+ * @param GroupSpec
+ * @return 0 on success, negative error code on failure
+ */
+int image_group_get(cls_method_context_t hctx,
+                   bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "image_group_get");
+  bufferlist refbl;
+  int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  }
+
+  cls::rbd::GroupSpec spec;
+
+  if (r != -ENOENT) {
+    auto iter = refbl.cbegin();
+    try {
+      decode(spec, iter);
+    } catch (const buffer::error &err) {
+      return -EINVAL;
+    }
+  }
+
+  encode(spec, *out);
+  return 0;
+}
+
+/**
+ * Save initial snapshot record.
+ *
+ * Input:
+ * @param GroupSnapshot
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ */
+int group_snap_set(cls_method_context_t hctx,
+                  bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "group_snap_set");
+  cls::rbd::GroupSnapshot group_snap;
+  try {
+    auto iter = in->cbegin();
+    decode(group_snap, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  if (group_snap.name.empty()) {
+    CLS_ERR("group snapshot name is empty");
+    return -EINVAL;
+  }
+  if (group_snap.id.empty()) {
+    CLS_ERR("group snapshot id is empty");
+    return -EINVAL;
+  }
+
+  int r = group::check_duplicate_snap_name(hctx, group_snap.name,
+                                           group_snap.id);
+  if (r < 0) {
+    return r;
+  }
+
+  std::string key = group::snap_key(group_snap.id);
+  if (group_snap.state == cls::rbd::GROUP_SNAPSHOT_STATE_INCOMPLETE) {
+    bufferlist snap_bl;
+    r = cls_cxx_map_get_val(hctx, key, &snap_bl);
+    if (r < 0 && r != -ENOENT) {
+      return r;
+    } else if (r >= 0) {
+      return -EEXIST;
+    }
+  }
+
+  bufferlist obl;
+  encode(group_snap, obl);
+  r = cls_cxx_map_set_val(hctx, key, &obl);
+  return r;
+}
+
+/**
+ * Remove snapshot record.
+ *
+ * Input:
+ * @param id Snapshot id
+ *
+ * Output:
+ * @return 0 on success, negative error code on failure
+ */
+int group_snap_remove(cls_method_context_t hctx,
+                     bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "group_snap_remove");
+  std::string snap_id;
+  try {
+    auto iter = in->cbegin();
+    decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  std::string snap_key = group::snap_key(snap_id);
+
+  CLS_LOG(20, "removing snapshot with key %s", snap_key.c_str());
+  int r = cls_cxx_map_remove_key(hctx, snap_key);
+  return r;
+}
+
+/**
+ * Get group's snapshot by id.
+ *
+ * Input:
+ * @param snapshot_id the id of the snapshot to look for.
+ *
+ * Output:
+ * @param GroupSnapshot the requested snapshot
+ * @return 0 on success, negative error code on failure
+ */
+int group_snap_get_by_id(cls_method_context_t hctx,
+                        bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(20, "group_snap_get_by_id");
+
+  std::string snap_id;
+  try {
+    auto iter = in->cbegin();
+    decode(snap_id, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  bufferlist snapbl;
+
+  int r = cls_cxx_map_get_val(hctx, group::snap_key(snap_id), &snapbl);
+  if (r < 0) {
+    return r;
+  }
+
+  cls::rbd::GroupSnapshot group_snap;
+  auto iter = snapbl.cbegin();
+  try {
+    decode(group_snap, iter);
+  } catch (const buffer::error &err) {
+    CLS_ERR("error decoding snapshot: %s", snap_id.c_str());
+    return -EIO;
+  }
+
+  encode(group_snap, *out);
+
   return 0;
 }
 
 /**
- * Retrieve the id and pool of the consistency group this image belongs to.
+ * List group's snapshots.
  *
  * Input:
- * none
+ * @param start_after which name to begin listing after
+ *       (use the empty string to start at the beginning)
+ * @param max_return the maximum number of snapshots to list
  *
  * Output:
- * @param GroupSpec
+ * @param list of snapshots
  * @return 0 on success, negative error code on failure
  */
-int image_get_group(cls_method_context_t hctx,
+int group_snap_list(cls_method_context_t hctx,
                    bufferlist *in, bufferlist *out)
 {
-  CLS_LOG(20, "image_get_group");
-  bufferlist refbl;
-  int r = cls_cxx_map_get_val(hctx, RBD_GROUP_REF, &refbl);
-  if (r < 0 && r != -ENOENT) {
-    return r;
-  }
-
-  cls::rbd::GroupSpec spec;
+  CLS_LOG(20, "group_snap_list");
 
-  if (r != -ENOENT) {
-    bufferlist::iterator iter = refbl.begin();
-    try {
-      ::decode(spec, iter);
-    } catch (const buffer::error &err) {
-      return -EINVAL;
-    }
+  cls::rbd::GroupSnapshot start_after;
+  uint64_t max_return;
+  try {
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
   }
+  std::vector<cls::rbd::GroupSnapshot> group_snaps;
+  group::snap_list(hctx, start_after, max_return, &group_snaps);
+
+  encode(group_snaps, *out);
 
-  ::encode(spec, *out);
   return 0;
 }
 
@@ -4944,9 +6956,9 @@ int trash_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   string id;
   cls::rbd::TrashImageSpec trash_spec;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(id, iter);
-    ::decode(trash_spec, iter);
+    auto iter = in->cbegin();
+    decode(id, iter);
+    decode(trash_spec, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -4971,7 +6983,7 @@ int trash_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   }
 
   map<string, bufferlist> omap_vals;
-  ::encode(trash_spec, omap_vals[key]);
+  encode(trash_spec, omap_vals[key]);
   return cls_cxx_map_set_vals(hctx, &omap_vals);
 }
 
@@ -4990,8 +7002,8 @@ int trash_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   string id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -5037,9 +7049,9 @@ int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   uint64_t max_return;
 
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(start_after, iter);
-    ::decode(max_return, iter);
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -5056,8 +7068,10 @@ int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     int r = cls_cxx_map_get_vals(hctx, last_read, trash::IMAGE_KEY_PREFIX,
                                  max_read, &raw_data, &more);
     if (r < 0) {
-      CLS_ERR("failed to read the vals off of disk: %s",
-              cpp_strerror(r).c_str());
+      if (r != -ENOENT) {
+        CLS_ERR("failed to read the vals off of disk: %s",
+                cpp_strerror(r).c_str());
+      }
       return r;
     }
     if (raw_data.empty()) {
@@ -5066,7 +7080,7 @@ int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 
     map<string, bufferlist>::iterator it = raw_data.begin();
     for (; it != raw_data.end(); ++it) {
-      ::decode(data[trash::image_id_from_key(it->first)], it->second);
+      decode(data[trash::image_id_from_key(it->first)], it->second);
     }
 
     if (!more) {
@@ -5076,7 +7090,7 @@ int trash_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
     last_read = raw_data.rbegin()->first;
   }
 
-  ::encode(data, *out);
+  encode(data, *out);
   return 0;
 }
 
@@ -5096,8 +7110,8 @@ int trash_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
 {
   string id;
   try {
-    bufferlist::iterator iter = in->begin();
-    ::decode(id, iter);
+    auto iter = in->cbegin();
+    decode(id, iter);
   } catch (const buffer::error &err) {
     return -EINVAL;
   }
@@ -5108,13 +7122,307 @@ int trash_get(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
   string key = trash::image_key(id);
   bufferlist bl;
   int r = cls_cxx_map_get_val(hctx, key, out);
-  if (r != -ENOENT) {
+  if (r < 0 && r != -ENOENT) {
     CLS_ERR("error reading image from trash '%s': '%s'", id.c_str(),
             cpp_strerror(r).c_str());
   }
   return r;
 }
 
+/**
+ * Set state of an image in the rbd_trash object.
+ *
+ * Input:
+ * @param id the id of the image
+ * @param trash_state the state of the image to be set
+ * @param expect_state the expected state of the image
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int trash_state_set(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string id;
+  cls::rbd::TrashImageState trash_state;
+  cls::rbd::TrashImageState expect_state;
+  try {
+    bufferlist::const_iterator iter = in->begin();
+    decode(id, iter);
+    decode(trash_state, iter);
+    decode(expect_state, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  CLS_LOG(20, "trash_state_set id=%s", id.c_str());
+
+  string key = trash::image_key(id);
+  cls::rbd::TrashImageSpec trash_spec;
+  int r = read_key(hctx, key, &trash_spec);
+  if (r < 0) {
+    if (r != -ENOENT) {
+      CLS_ERR("Could not read trash image spec off disk: %s",
+              cpp_strerror(r).c_str());
+    }
+    return r;
+  }
+
+  if (trash_spec.state == expect_state) {
+    trash_spec.state = trash_state;
+    r = write_key(hctx, key, trash_spec);
+    if (r < 0) {
+      CLS_ERR("error setting trash image state: %s", cpp_strerror(r).c_str());
+      return r;
+    }
+
+    return 0;
+  } else if (trash_spec.state == trash_state) {
+    return 0;
+  } else {
+    CLS_ERR("Current trash state: %d do not match expected: %d or set: %d",
+            trash_spec.state, expect_state, trash_state);
+    return -ESTALE;
+  }
+}
+
+namespace nspace {
+
+const std::string NAME_KEY_PREFIX("name_");
+
+std::string key_for_name(const std::string& name) {
+  return NAME_KEY_PREFIX + name;
+}
+
+std::string name_from_key(const std::string &key) {
+  return key.substr(NAME_KEY_PREFIX.size());
+}
+
+} // namespace nspace
+
+/**
+ * Add a namespace to the namespace directory.
+ *
+ * Input:
+ * @param name the name of the namespace
+ *
+ * Output:
+ * @returns -EEXIST if the namespace is already exists
+ * @returns 0 on success, negative error code on failure
+ */
+int namespace_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  std::string name;
+  try {
+    auto iter = in->cbegin();
+    decode(name, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  std::string key(nspace::key_for_name(name));
+  bufferlist value;
+  int r = cls_cxx_map_get_val(hctx, key, &value);
+  if (r < 0 && r != -ENOENT) {
+    return r;
+  } else if (r == 0) {
+    return -EEXIST;
+  }
+
+  r = cls_cxx_map_set_val(hctx, key, &value);
+  if (r < 0) {
+    CLS_ERR("failed to set omap key: %s", key.c_str());
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Remove a namespace from the namespace directory.
+ *
+ * Input:
+ * @param name the name of the namespace
+ *
+ * Output:
+ * @returns 0 on success, negative error code on failure
+ */
+int namespace_remove(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  std::string name;
+  try {
+    auto iter = in->cbegin();
+    decode(name, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  std::string key(nspace::key_for_name(name));
+  bufferlist bl;
+  int r = cls_cxx_map_get_val(hctx, key, &bl);
+  if (r < 0) {
+    return r;
+  }
+
+  r = cls_cxx_map_remove_key(hctx, key);
+  if (r < 0) {
+    return r;
+  }
+
+  return 0;
+}
+
+/**
+ * Returns the list of namespaces in the rbd_namespace object
+ *
+ * Input:
+ * @param start_after which name to begin listing after
+ *        (use the empty string to start at the beginning)
+ * @param max_return the maximum number of names to list
+ *
+ * Output:
+ * @param data list of namespace names
+ * @returns 0 on success, negative error code on failure
+ */
+int namespace_list(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  string start_after;
+  uint64_t max_return;
+  try {
+    auto iter = in->cbegin();
+    decode(start_after, iter);
+    decode(max_return, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  std::list<std::string> data;
+  std::string last_read = nspace::key_for_name(start_after);
+  bool more = true;
+
+  CLS_LOG(20, "namespace_list");
+  while (data.size() < max_return) {
+    std::map<std::string, bufferlist> raw_data;
+    int max_read = std::min<int32_t>(RBD_MAX_KEYS_READ,
+                                     max_return - data.size());
+    int r = cls_cxx_map_get_vals(hctx, last_read, nspace::NAME_KEY_PREFIX,
+                                 max_read, &raw_data, &more);
+    if (r < 0) {
+      if (r != -ENOENT) {
+        CLS_ERR("failed to read the vals off of disk: %s",
+                cpp_strerror(r).c_str());
+      }
+      return r;
+    }
+
+    for (auto& it : raw_data) {
+      data.push_back(nspace::name_from_key(it.first));
+    }
+
+    if (raw_data.empty() || !more) {
+      break;
+    }
+
+    last_read = raw_data.rbegin()->first;
+  }
+
+  encode(data, *out);
+  return 0;
+}
+
+/**
+ *  Reclaim space for zeroed extents
+ *
+ * Input:
+ * @param sparse_size minimal zeroed block to sparse
+ * @param remove_empty boolean, true if the object should be removed if empty
+ *
+ * Output:
+ * @returns -ENOENT if the object does not exist or has been removed
+ * @returns 0 on success, negative error code on failure
+ */
+int sparsify(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  size_t sparse_size;
+  bool remove_empty;
+  try {
+    auto iter = in->cbegin();
+    decode(sparse_size, iter);
+    decode(remove_empty, iter);
+  } catch (const buffer::error &err) {
+    return -EINVAL;
+  }
+
+  int r = check_exists(hctx);
+  if (r < 0) {
+    return r;
+  }
+
+  bufferlist bl;
+  r = cls_cxx_read(hctx, 0, 0, &bl);
+  if (r < 0) {
+    CLS_ERR("failed to read data off of disk: %s", cpp_strerror(r).c_str());
+    return r;
+  }
+
+  if (bl.is_zero()) {
+    if (remove_empty) {
+      CLS_LOG(20, "remove");
+      r = cls_cxx_remove(hctx);
+      if (r < 0) {
+        CLS_ERR("remove failed: %s", cpp_strerror(r).c_str());
+        return r;
+      }
+    } else if (bl.length() > 0) {
+      CLS_LOG(20, "truncate");
+      bufferlist write_bl;
+      r = cls_cxx_replace(hctx, 0, 0, &write_bl);
+      if (r < 0) {
+        CLS_ERR("truncate failed: %s", cpp_strerror(r).c_str());
+        return r;
+      }
+    } else {
+      CLS_LOG(20, "skip empty");
+    }
+    return 0;
+  }
+
+  bl.rebuild(buffer::ptr_node::create(bl.length()));
+  size_t write_offset = 0;
+  size_t write_length = 0;
+  size_t offset = 0;
+  size_t length = bl.length();
+  const auto& ptr = bl.front();
+  bool replace = true;
+  while (offset < length) {
+    if (calc_sparse_extent(ptr, sparse_size, length, &write_offset,
+                           &write_length, &offset)) {
+      if (write_offset == 0 && write_length == length) {
+        CLS_LOG(20, "nothing to do");
+        return 0;
+      }
+      CLS_LOG(20, "write%s %" PRIu64 "~%" PRIu64, (replace ? "(replace)" : ""),
+              write_offset, write_length);
+      bufferlist write_bl;
+      write_bl.push_back(buffer::ptr_node::create(ptr, write_offset,
+                                                  write_length));
+      if (replace) {
+        r = cls_cxx_replace(hctx, write_offset, write_length, &write_bl);
+        replace = false;
+      } else {
+        r = cls_cxx_write(hctx, write_offset, write_length, &write_bl);
+      }
+      if (r < 0) {
+        CLS_ERR("write failed: %s", cpp_strerror(r).c_str());
+        return r;
+      }
+      write_offset = offset;
+      write_length = 0;
+    }
+  }
+
+  return 0;
+}
+
 CLS_INIT(rbd)
 {
   CLS_LOG(20, "Loaded rbd class!");
@@ -5127,14 +7435,22 @@ CLS_INIT(rbd)
   cls_method_handle_t h_set_size;
   cls_method_handle_t h_get_parent;
   cls_method_handle_t h_set_parent;
+  cls_method_handle_t h_remove_parent;
+  cls_method_handle_t h_parent_get;
+  cls_method_handle_t h_parent_overlap_get;
+  cls_method_handle_t h_parent_attach;
+  cls_method_handle_t h_parent_detach;
   cls_method_handle_t h_get_protection_status;
   cls_method_handle_t h_set_protection_status;
   cls_method_handle_t h_get_stripe_unit_count;
   cls_method_handle_t h_set_stripe_unit_count;
   cls_method_handle_t h_get_create_timestamp;
+  cls_method_handle_t h_get_access_timestamp;
+  cls_method_handle_t h_get_modify_timestamp;
   cls_method_handle_t h_get_flags;
   cls_method_handle_t h_set_flags;
-  cls_method_handle_t h_remove_parent;
+  cls_method_handle_t h_op_features_get;
+  cls_method_handle_t h_op_features_set;
   cls_method_handle_t h_add_child;
   cls_method_handle_t h_remove_child;
   cls_method_handle_t h_get_children;
@@ -5142,21 +7458,25 @@ CLS_INIT(rbd)
   cls_method_handle_t h_get_object_prefix;
   cls_method_handle_t h_get_data_pool;
   cls_method_handle_t h_get_snapshot_name;
-  cls_method_handle_t h_get_snapshot_namespace;
   cls_method_handle_t h_get_snapshot_timestamp;
+  cls_method_handle_t h_snapshot_get;
   cls_method_handle_t h_snapshot_add;
   cls_method_handle_t h_snapshot_remove;
   cls_method_handle_t h_snapshot_rename;
+  cls_method_handle_t h_snapshot_trash_add;
   cls_method_handle_t h_get_all_features;
-  cls_method_handle_t h_copyup;
   cls_method_handle_t h_get_id;
   cls_method_handle_t h_set_id;
+  cls_method_handle_t h_set_modify_timestamp;
+  cls_method_handle_t h_set_access_timestamp;
   cls_method_handle_t h_dir_get_id;
   cls_method_handle_t h_dir_get_name;
   cls_method_handle_t h_dir_list;
   cls_method_handle_t h_dir_add_image;
   cls_method_handle_t h_dir_remove_image;
   cls_method_handle_t h_dir_rename_image;
+  cls_method_handle_t h_dir_state_assert;
+  cls_method_handle_t h_dir_state_set;
   cls_method_handle_t h_object_map_load;
   cls_method_handle_t h_object_map_save;
   cls_method_handle_t h_object_map_resize;
@@ -5169,6 +7489,13 @@ CLS_INIT(rbd)
   cls_method_handle_t h_metadata_get;
   cls_method_handle_t h_snapshot_get_limit;
   cls_method_handle_t h_snapshot_set_limit;
+  cls_method_handle_t h_child_attach;
+  cls_method_handle_t h_child_detach;
+  cls_method_handle_t h_children_list;
+  cls_method_handle_t h_migration_set;
+  cls_method_handle_t h_migration_set_state;
+  cls_method_handle_t h_migration_get;
+  cls_method_handle_t h_migration_remove;
   cls_method_handle_t h_old_snapshots_list;
   cls_method_handle_t h_old_snapshot_add;
   cls_method_handle_t h_old_snapshot_remove;
@@ -5193,23 +7520,39 @@ CLS_INIT(rbd)
   cls_method_handle_t h_mirror_image_status_list;
   cls_method_handle_t h_mirror_image_status_get_summary;
   cls_method_handle_t h_mirror_image_status_remove_down;
+  cls_method_handle_t h_mirror_image_instance_get;
+  cls_method_handle_t h_mirror_image_instance_list;
   cls_method_handle_t h_mirror_instances_list;
   cls_method_handle_t h_mirror_instances_add;
   cls_method_handle_t h_mirror_instances_remove;
-  cls_method_handle_t h_group_create;
+  cls_method_handle_t h_mirror_image_map_list;
+  cls_method_handle_t h_mirror_image_map_update;
+  cls_method_handle_t h_mirror_image_map_remove;
   cls_method_handle_t h_group_dir_list;
   cls_method_handle_t h_group_dir_add;
   cls_method_handle_t h_group_dir_remove;
+  cls_method_handle_t h_group_dir_rename;
   cls_method_handle_t h_group_image_remove;
   cls_method_handle_t h_group_image_list;
   cls_method_handle_t h_group_image_set;
-  cls_method_handle_t h_image_add_group;
-  cls_method_handle_t h_image_remove_group;
-  cls_method_handle_t h_image_get_group;
+  cls_method_handle_t h_image_group_add;
+  cls_method_handle_t h_image_group_remove;
+  cls_method_handle_t h_image_group_get;
+  cls_method_handle_t h_group_snap_set;
+  cls_method_handle_t h_group_snap_remove;
+  cls_method_handle_t h_group_snap_get_by_id;
+  cls_method_handle_t h_group_snap_list;
   cls_method_handle_t h_trash_add;
   cls_method_handle_t h_trash_remove;
   cls_method_handle_t h_trash_list;
   cls_method_handle_t h_trash_get;
+  cls_method_handle_t h_trash_state_set;
+  cls_method_handle_t h_namespace_add;
+  cls_method_handle_t h_namespace_remove;
+  cls_method_handle_t h_namespace_list;
+  cls_method_handle_t h_copyup;
+  cls_method_handle_t h_assert_snapc_seq;
+  cls_method_handle_t h_sparsify;
 
   cls_register("rbd", &h_class);
   cls_register_cxx_method(h_class, "create",
@@ -5238,12 +7581,12 @@ CLS_INIT(rbd)
   cls_register_cxx_method(h_class, "get_snapshot_name",
                          CLS_METHOD_RD,
                          get_snapshot_name, &h_get_snapshot_name);
-  cls_register_cxx_method(h_class, "get_snapshot_namespace",
-                         CLS_METHOD_RD,
-                         get_snapshot_namespace, &h_get_snapshot_namespace);
   cls_register_cxx_method(h_class, "get_snapshot_timestamp",
                          CLS_METHOD_RD,
                          get_snapshot_timestamp, &h_get_snapshot_timestamp);
+  cls_register_cxx_method(h_class, "snapshot_get",
+                          CLS_METHOD_RD,
+                          snapshot_get, &h_snapshot_get);
   cls_register_cxx_method(h_class, "snapshot_add",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          snapshot_add, &h_snapshot_add);
@@ -5253,12 +7596,14 @@ CLS_INIT(rbd)
   cls_register_cxx_method(h_class, "snapshot_rename",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          snapshot_rename, &h_snapshot_rename);
+  cls_register_cxx_method(h_class, "snapshot_trash_add",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          snapshot_trash_add, &h_snapshot_trash_add);
   cls_register_cxx_method(h_class, "get_all_features",
                          CLS_METHOD_RD,
                          get_all_features, &h_get_all_features);
-  cls_register_cxx_method(h_class, "copyup",
-                         CLS_METHOD_RD | CLS_METHOD_WR,
-                         copyup, &h_copyup);
+
+  // NOTE: deprecate v1 parent APIs after mimic EOLed
   cls_register_cxx_method(h_class, "get_parent",
                          CLS_METHOD_RD,
                          get_parent, &h_get_parent);
@@ -5268,6 +7613,19 @@ CLS_INIT(rbd)
   cls_register_cxx_method(h_class, "remove_parent",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          remove_parent, &h_remove_parent);
+
+  cls_register_cxx_method(h_class, "parent_get",
+                          CLS_METHOD_RD, parent_get, &h_parent_get);
+  cls_register_cxx_method(h_class, "parent_overlap_get",
+                          CLS_METHOD_RD, parent_overlap_get,
+                          &h_parent_overlap_get);
+  cls_register_cxx_method(h_class, "parent_attach",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          parent_attach, &h_parent_attach);
+  cls_register_cxx_method(h_class, "parent_detach",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          parent_detach, &h_parent_detach);
+
   cls_register_cxx_method(h_class, "set_protection_status",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          set_protection_status, &h_set_protection_status);
@@ -5283,12 +7641,23 @@ CLS_INIT(rbd)
   cls_register_cxx_method(h_class, "get_create_timestamp",
                           CLS_METHOD_RD,
                           get_create_timestamp, &h_get_create_timestamp);
+  cls_register_cxx_method(h_class, "get_access_timestamp",
+                          CLS_METHOD_RD,
+                          get_access_timestamp, &h_get_access_timestamp);
+  cls_register_cxx_method(h_class, "get_modify_timestamp",
+                          CLS_METHOD_RD,
+                          get_modify_timestamp, &h_get_modify_timestamp);
   cls_register_cxx_method(h_class, "get_flags",
                           CLS_METHOD_RD,
                           get_flags, &h_get_flags);
   cls_register_cxx_method(h_class, "set_flags",
                           CLS_METHOD_RD | CLS_METHOD_WR,
                           set_flags, &h_set_flags);
+  cls_register_cxx_method(h_class, "op_features_get", CLS_METHOD_RD,
+                          op_features_get, &h_op_features_get);
+  cls_register_cxx_method(h_class, "op_features_set",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          op_features_set, &h_op_features_set);
   cls_register_cxx_method(h_class, "metadata_list",
                           CLS_METHOD_RD,
                          metadata_list, &h_metadata_list);
@@ -5305,8 +7674,37 @@ CLS_INIT(rbd)
                          CLS_METHOD_RD,
                          snapshot_get_limit, &h_snapshot_get_limit);
   cls_register_cxx_method(h_class, "snapshot_set_limit",
-                         CLS_METHOD_WR,
+                         CLS_METHOD_RD | CLS_METHOD_WR,
                          snapshot_set_limit, &h_snapshot_set_limit);
+  cls_register_cxx_method(h_class, "child_attach",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          child_attach, &h_child_attach);
+  cls_register_cxx_method(h_class, "child_detach",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          child_detach, &h_child_detach);
+  cls_register_cxx_method(h_class, "children_list",
+                          CLS_METHOD_RD,
+                          children_list, &h_children_list);
+  cls_register_cxx_method(h_class, "migration_set",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          migration_set, &h_migration_set);
+  cls_register_cxx_method(h_class, "migration_set_state",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          migration_set_state, &h_migration_set_state);
+  cls_register_cxx_method(h_class, "migration_get",
+                          CLS_METHOD_RD,
+                          migration_get, &h_migration_get);
+  cls_register_cxx_method(h_class, "migration_remove",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          migration_remove, &h_migration_remove);
+
+  cls_register_cxx_method(h_class, "set_modify_timestamp",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                          set_modify_timestamp, &h_set_modify_timestamp);
+
+  cls_register_cxx_method(h_class, "set_access_timestamp",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                          set_access_timestamp, &h_set_access_timestamp);
 
   /* methods for the rbd_children object */
   cls_register_cxx_method(h_class, "add_child",
@@ -5346,6 +7744,11 @@ CLS_INIT(rbd)
   cls_register_cxx_method(h_class, "dir_rename_image",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          dir_rename_image, &h_dir_rename_image);
+  cls_register_cxx_method(h_class, "dir_state_assert", CLS_METHOD_RD,
+                          dir_state_assert, &h_dir_state_assert);
+  cls_register_cxx_method(h_class, "dir_state_set",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                          dir_state_set, &h_dir_state_set);
 
   /* methods for the rbd_object_map.$image_id object */
   cls_register_cxx_method(h_class, "object_map_load",
@@ -5438,6 +7841,12 @@ CLS_INIT(rbd)
                           CLS_METHOD_RD | CLS_METHOD_WR,
                           mirror_image_status_remove_down,
                          &h_mirror_image_status_remove_down);
+  cls_register_cxx_method(h_class, "mirror_image_instance_get", CLS_METHOD_RD,
+                          mirror_image_instance_get,
+                          &h_mirror_image_instance_get);
+  cls_register_cxx_method(h_class, "mirror_image_instance_list", CLS_METHOD_RD,
+                          mirror_image_instance_list,
+                          &h_mirror_image_instance_list);
   cls_register_cxx_method(h_class, "mirror_instances_list", CLS_METHOD_RD,
                           mirror_instances_list, &h_mirror_instances_list);
   cls_register_cxx_method(h_class, "mirror_instances_add",
@@ -5447,10 +7856,17 @@ CLS_INIT(rbd)
                           CLS_METHOD_RD | CLS_METHOD_WR,
                           mirror_instances_remove,
                           &h_mirror_instances_remove);
-  /* methods for the consistency groups feature */
-  cls_register_cxx_method(h_class, "group_create",
-                         CLS_METHOD_RD | CLS_METHOD_WR,
-                         group_create, &h_group_create);
+  cls_register_cxx_method(h_class, "mirror_image_map_list",
+                          CLS_METHOD_RD, mirror_image_map_list,
+                          &h_mirror_image_map_list);
+  cls_register_cxx_method(h_class, "mirror_image_map_update",
+                          CLS_METHOD_WR, mirror_image_map_update,
+                          &h_mirror_image_map_update);
+  cls_register_cxx_method(h_class, "mirror_image_map_remove",
+                          CLS_METHOD_WR, mirror_image_map_remove,
+                          &h_mirror_image_map_remove);
+
+  /* methods for the groups feature */
   cls_register_cxx_method(h_class, "group_dir_list",
                          CLS_METHOD_RD,
                          group_dir_list, &h_group_dir_list);
@@ -5460,24 +7876,39 @@ CLS_INIT(rbd)
   cls_register_cxx_method(h_class, "group_dir_remove",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          group_dir_remove, &h_group_dir_remove);
+  cls_register_cxx_method(h_class, "group_dir_rename",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          group_dir_rename, &h_group_dir_rename);
   cls_register_cxx_method(h_class, "group_image_remove",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          group_image_remove, &h_group_image_remove);
   cls_register_cxx_method(h_class, "group_image_list",
-                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         CLS_METHOD_RD,
                          group_image_list, &h_group_image_list);
   cls_register_cxx_method(h_class, "group_image_set",
                          CLS_METHOD_RD | CLS_METHOD_WR,
                          group_image_set, &h_group_image_set);
-  cls_register_cxx_method(h_class, "image_add_group",
+  cls_register_cxx_method(h_class, "image_group_add",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         image_group_add, &h_image_group_add);
+  cls_register_cxx_method(h_class, "image_group_remove",
                          CLS_METHOD_RD | CLS_METHOD_WR,
-                         image_add_group, &h_image_add_group);
-  cls_register_cxx_method(h_class, "image_remove_group",
+                         image_group_remove, &h_image_group_remove);
+  cls_register_cxx_method(h_class, "image_group_get",
+                         CLS_METHOD_RD,
+                         image_group_get, &h_image_group_get);
+  cls_register_cxx_method(h_class, "group_snap_set",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         group_snap_set, &h_group_snap_set);
+  cls_register_cxx_method(h_class, "group_snap_remove",
                          CLS_METHOD_RD | CLS_METHOD_WR,
-                         image_remove_group, &h_image_remove_group);
-  cls_register_cxx_method(h_class, "image_get_group",
+                         group_snap_remove, &h_group_snap_remove);
+  cls_register_cxx_method(h_class, "group_snap_get_by_id",
                          CLS_METHOD_RD,
-                         image_get_group, &h_image_get_group);
+                         group_snap_get_by_id, &h_group_snap_get_by_id);
+  cls_register_cxx_method(h_class, "group_snap_list",
+                         CLS_METHOD_RD,
+                         group_snap_list, &h_group_snap_list);
 
   /* rbd_trash object methods */
   cls_register_cxx_method(h_class, "trash_add",
@@ -5492,6 +7923,29 @@ CLS_INIT(rbd)
   cls_register_cxx_method(h_class, "trash_get",
                           CLS_METHOD_RD,
                           trash_get, &h_trash_get);
+  cls_register_cxx_method(h_class, "trash_state_set",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          trash_state_set, &h_trash_state_set);
 
-  return;
+  /* rbd_namespace object methods */
+  cls_register_cxx_method(h_class, "namespace_add",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          namespace_add, &h_namespace_add);
+  cls_register_cxx_method(h_class, "namespace_remove",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          namespace_remove, &h_namespace_remove);
+  cls_register_cxx_method(h_class, "namespace_list", CLS_METHOD_RD,
+                          namespace_list, &h_namespace_list);
+
+  /* data object methods */
+  cls_register_cxx_method(h_class, "copyup",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         copyup, &h_copyup);
+  cls_register_cxx_method(h_class, "assert_snapc_seq",
+                          CLS_METHOD_RD | CLS_METHOD_WR,
+                          assert_snapc_seq,
+                          &h_assert_snapc_seq);
+  cls_register_cxx_method(h_class, "sparsify",
+                         CLS_METHOD_RD | CLS_METHOD_WR,
+                         sparsify, &h_sparsify);
 }