]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blobdiff - drivers/block/rbd.c
rbd: implement feature checks
[mirror_ubuntu-artful-kernel.git] / drivers / block / rbd.c
index 9917943a3572ef577ac7a511ef375d11dae08350..0f260a6e97c44addbf8120d131e4afb1fb3cabb9 100644 (file)
@@ -41,6 +41,8 @@
 
 #include "rbd_types.h"
 
+#define RBD_DEBUG      /* Activate rbd_assert() calls */
+
 /*
  * The basic unit of block I/O is a sector.  It is interpreted in a
  * number of contexts in Linux (blk, bio, genhd), but the default is
 #define        SECTOR_SHIFT    9
 #define        SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
 
+/* It might be useful to have this defined elsewhere too */
+
+#define        U64_MAX ((u64) (~0ULL))
+
 #define RBD_DRV_NAME "rbd"
 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
 
 #define RBD_MINORS_PER_MAJOR   256             /* max minors per blkdev */
 
 #define RBD_MAX_SNAP_NAME_LEN  32
+#define RBD_MAX_SNAP_COUNT     510     /* allows max snapc to fit in 4KB */
 #define RBD_MAX_OPT_LEN                1024
 
 #define RBD_SNAP_HEAD_NAME     "-"
 
+#define RBD_IMAGE_ID_LEN_MAX   64
+#define RBD_OBJ_PREFIX_LEN_MAX 64
+
+/* Feature bits */
+
+#define RBD_FEATURE_LAYERING      1
+
+/* Features supported by this (client software) implementation. */
+
+#define RBD_FEATURES_ALL          (0)
+
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
 #define DEV_NAME_LEN           32
 #define MAX_INT_FORMAT_WIDTH   ((5 * sizeof (int)) / 2 + 1)
 
-#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
+#define RBD_READ_ONLY_DEFAULT          false
 
 /*
  * block device image metadata (in-memory version)
  */
 struct rbd_image_header {
-       u64 image_size;
+       /* These four fields never change for a given rbd image */
        char *object_prefix;
+       u64 features;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
-       struct ceph_snap_context *snapc;
-       size_t snap_names_len;
-       u32 total_snaps;
 
+       /* The remaining fields need to be updated occasionally */
+       u64 image_size;
+       struct ceph_snap_context *snapc;
        char *snap_names;
        u64 *snap_sizes;
 
@@ -91,7 +110,7 @@ struct rbd_image_header {
 };
 
 struct rbd_options {
-       int     notify_timeout;
+       bool    read_only;
 };
 
 /*
@@ -99,7 +118,6 @@ struct rbd_options {
  */
 struct rbd_client {
        struct ceph_client      *client;
-       struct rbd_options      *rbd_opts;
        struct kref             kref;
        struct list_head        node;
 };
@@ -141,6 +159,16 @@ struct rbd_snap {
        u64                     size;
        struct list_head        node;
        u64                     id;
+       u64                     features;
+};
+
+struct rbd_mapping {
+       char                    *snap_name;
+       u64                     snap_id;
+       u64                     size;
+       u64                     features;
+       bool                    snap_exists;
+       bool                    read_only;
 };
 
 /*
@@ -151,8 +179,9 @@ struct rbd_device {
 
        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
-       struct request_queue    *q;
 
+       u32                     image_format;   /* Either 1 or 2 */
+       struct rbd_options      rbd_opts;
        struct rbd_client       *rbd_client;
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -160,6 +189,8 @@ struct rbd_device {
        spinlock_t              lock;           /* queue lock */
 
        struct rbd_image_header header;
+       char                    *image_id;
+       size_t                  image_id_len;
        char                    *image_name;
        size_t                  image_name_len;
        char                    *header_name;
@@ -171,13 +202,8 @@ struct rbd_device {
 
        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
-       /* name of the snapshot this device reads from */
-       char                    *snap_name;
-       /* id of the snapshot this device reads from */
-       u64                     snap_id;        /* current snapshot id */
-       /* whether the snap_id this device reads from still exists */
-       bool                    snap_exists;
-       int                     read_only;
+
+       struct rbd_mapping      mapping;
 
        struct list_head        node;
 
@@ -196,12 +222,10 @@ static DEFINE_SPINLOCK(rbd_dev_list_lock);
 static LIST_HEAD(rbd_client_list);             /* clients */
 static DEFINE_SPINLOCK(rbd_client_list_lock);
 
-static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
+static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
+static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
+
 static void rbd_dev_release(struct device *dev);
-static ssize_t rbd_snap_add(struct device *dev,
-                           struct device_attribute *attr,
-                           const char *buf,
-                           size_t count);
 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
@@ -229,6 +253,18 @@ static struct device rbd_root_dev = {
        .release =      rbd_root_dev_release,
 };
 
+#ifdef RBD_DEBUG
+#define rbd_assert(expr)                                               \
+               if (unlikely(!(expr))) {                                \
+                       printk(KERN_ERR "\nAssertion failure in %s() "  \
+                                               "at line %d:\n\n"       \
+                                       "\trbd_assert(%s);\n\n",        \
+                                       __func__, __LINE__, #expr);     \
+                       BUG();                                          \
+               }
+#else /* !RBD_DEBUG */
+#  define rbd_assert(expr)     ((void) 0)
+#endif /* !RBD_DEBUG */
 
 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
 {
@@ -240,19 +276,19 @@ static void rbd_put_dev(struct rbd_device *rbd_dev)
        put_device(&rbd_dev->dev);
 }
 
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
 
-       rbd_get_dev(rbd_dev);
-
-       set_device_ro(bdev, rbd_dev->read_only);
-
-       if ((mode & FMODE_WRITE) && rbd_dev->read_only)
+       if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;
 
+       rbd_get_dev(rbd_dev);
+       set_device_ro(bdev, rbd_dev->mapping.read_only);
+
        return 0;
 }
 
@@ -275,8 +311,7 @@ static const struct block_device_operations rbd_bd_ops = {
  * Initialize an rbd client instance.
  * We own *ceph_opts.
  */
-static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
-                                           struct rbd_options *rbd_opts)
+static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 {
        struct rbd_client *rbdc;
        int ret = -ENOMEM;
@@ -300,8 +335,6 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
        if (ret < 0)
                goto out_err;
 
-       rbdc->rbd_opts = rbd_opts;
-
        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);
@@ -323,36 +356,52 @@ out_opt:
 }
 
 /*
- * Find a ceph client with specific addr and configuration.
+ * Find a ceph client with specific addr and configuration.  If
+ * found, bump its reference count.
  */
-static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
+static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
 {
        struct rbd_client *client_node;
+       bool found = false;
 
        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;
 
-       list_for_each_entry(client_node, &rbd_client_list, node)
-               if (!ceph_compare_options(ceph_opts, client_node->client))
-                       return client_node;
-       return NULL;
+       spin_lock(&rbd_client_list_lock);
+       list_for_each_entry(client_node, &rbd_client_list, node) {
+               if (!ceph_compare_options(ceph_opts, client_node->client)) {
+                       kref_get(&client_node->kref);
+                       found = true;
+                       break;
+               }
+       }
+       spin_unlock(&rbd_client_list_lock);
+
+       return found ? client_node : NULL;
 }
 
 /*
  * mount options
  */
 enum {
-       Opt_notify_timeout,
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
+       Opt_read_only,
+       Opt_read_write,
+       /* Boolean args above */
+       Opt_last_bool,
 };
 
 static match_table_t rbd_opts_tokens = {
-       {Opt_notify_timeout, "notify_timeout=%d"},
        /* int args above */
        /* string args above */
+       {Opt_read_only, "mapping.read_only"},
+       {Opt_read_only, "ro"},          /* Alternate spelling */
+       {Opt_read_write, "read_write"},
+       {Opt_read_write, "rw"},         /* Alternate spelling */
+       /* Boolean args above */
        {-1, NULL}
 };
 
@@ -377,16 +426,22 @@ static int parse_rbd_opts_token(char *c, void *private)
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
+       } else if (token > Opt_last_string && token < Opt_last_bool) {
+               dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }
 
        switch (token) {
-       case Opt_notify_timeout:
-               rbd_opts->notify_timeout = intval;
+       case Opt_read_only:
+               rbd_opts->read_only = true;
+               break;
+       case Opt_read_write:
+               rbd_opts->read_only = false;
                break;
        default:
-               BUG_ON(token);
+               rbd_assert(false);
+               break;
        }
        return 0;
 }
@@ -395,48 +450,33 @@ static int parse_rbd_opts_token(char *c, void *private)
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
-static struct rbd_client *rbd_get_client(const char *mon_addr,
-                                        size_t mon_addr_len,
-                                        char *options)
+static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
+                               size_t mon_addr_len, char *options)
 {
-       struct rbd_client *rbdc;
+       struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
        struct ceph_options *ceph_opts;
-       struct rbd_options *rbd_opts;
-
-       rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
-       if (!rbd_opts)
-               return ERR_PTR(-ENOMEM);
+       struct rbd_client *rbdc;
 
-       rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
+       rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
 
        ceph_opts = ceph_parse_options(options, mon_addr,
                                        mon_addr + mon_addr_len,
                                        parse_rbd_opts_token, rbd_opts);
-       if (IS_ERR(ceph_opts)) {
-               kfree(rbd_opts);
-               return ERR_CAST(ceph_opts);
-       }
+       if (IS_ERR(ceph_opts))
+               return PTR_ERR(ceph_opts);
 
-       spin_lock(&rbd_client_list_lock);
-       rbdc = __rbd_client_find(ceph_opts);
+       rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
-               kref_get(&rbdc->kref);
-               spin_unlock(&rbd_client_list_lock);
-
                ceph_destroy_options(ceph_opts);
-               kfree(rbd_opts);
-
-               return rbdc;
+       } else {
+               rbdc = rbd_client_create(ceph_opts);
+               if (IS_ERR(rbdc))
+                       return PTR_ERR(rbdc);
        }
-       spin_unlock(&rbd_client_list_lock);
-
-       rbdc = rbd_client_create(ceph_opts, rbd_opts);
-
-       if (IS_ERR(rbdc))
-               kfree(rbd_opts);
+       rbd_dev->rbd_client = rbdc;
 
-       return rbdc;
+       return 0;
 }
 
 /*
@@ -454,7 +494,6 @@ static void rbd_client_release(struct kref *kref)
        spin_unlock(&rbd_client_list_lock);
 
        ceph_destroy_client(rbdc->client);
-       kfree(rbdc->rbd_opts);
        kfree(rbdc);
 }
 
@@ -480,10 +519,38 @@ static void rbd_coll_release(struct kref *kref)
        kfree(coll);
 }
 
+static bool rbd_image_format_valid(u32 image_format)
+{
+       return image_format == 1 || image_format == 2;
+}
+
 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
 {
-       return !memcmp(&ondisk->text,
-                       RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
+       size_t size;
+       u32 snap_count;
+
+       /* The header has to start with the magic rbd header text */
+       if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
+               return false;
+
+       /*
+        * The size of a snapshot header has to fit in a size_t, and
+        * that limits the number of snapshots.
+        */
+       snap_count = le32_to_cpu(ondisk->snap_count);
+       size = SIZE_MAX - sizeof (struct ceph_snap_context);
+       if (snap_count > size / sizeof (__le64))
+               return false;
+
+       /*
+        * Not only that, but the size of the entire the snapshot
+        * header must also be representable in a size_t.
+        */
+       size -= snap_count * sizeof (__le64);
+       if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
+               return false;
+
+       return true;
 }
 
 /*
@@ -491,179 +558,203 @@ static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
  * header.
  */
 static int rbd_header_from_disk(struct rbd_image_header *header,
-                                struct rbd_image_header_ondisk *ondisk,
-                                u32 allocated_snaps)
+                                struct rbd_image_header_ondisk *ondisk)
 {
        u32 snap_count;
+       size_t len;
+       size_t size;
+       u32 i;
 
-       if (!rbd_dev_ondisk_valid(ondisk))
-               return -ENXIO;
+       memset(header, 0, sizeof (*header));
 
        snap_count = le32_to_cpu(ondisk->snap_count);
-       if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
-                                / sizeof (u64))
-               return -EINVAL;
-       header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
-                               snap_count * sizeof(u64),
-                               GFP_KERNEL);
-       if (!header->snapc)
+
+       len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
+       header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
+       if (!header->object_prefix)
                return -ENOMEM;
+       memcpy(header->object_prefix, ondisk->object_prefix, len);
+       header->object_prefix[len] = '\0';
 
        if (snap_count) {
-               header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
-               header->snap_names = kmalloc(header->snap_names_len,
-                                            GFP_KERNEL);
+               u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
+
+               /* Save a copy of the snapshot names */
+
+               if (snap_names_len > (u64) SIZE_MAX)
+                       return -EIO;
+               header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
-                       goto err_snapc;
-               header->snap_sizes = kmalloc(snap_count * sizeof(u64),
-                                            GFP_KERNEL);
+                       goto out_err;
+               /*
+                * Note that rbd_dev_v1_header_read() guarantees
+                * the ondisk buffer we're working with has
+                * snap_names_len bytes beyond the end of the
+                * snapshot id array, this memcpy() is safe.
+                */
+               memcpy(header->snap_names, &ondisk->snaps[snap_count],
+                       snap_names_len);
+
+               /* Record each snapshot's size */
+
+               size = snap_count * sizeof (*header->snap_sizes);
+               header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
-                       goto err_names;
+                       goto out_err;
+               for (i = 0; i < snap_count; i++)
+                       header->snap_sizes[i] =
+                               le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
-               header->snap_names_len = 0;
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }
 
-       header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
-                                       GFP_KERNEL);
-       if (!header->object_prefix)
-               goto err_sizes;
-
-       memcpy(header->object_prefix, ondisk->block_name,
-              sizeof(ondisk->block_name));
-       header->object_prefix[sizeof (ondisk->block_name)] = '\0';
-
-       header->image_size = le64_to_cpu(ondisk->image_size);
+       header->features = 0;   /* No features support in v1 images */
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;
 
+       /* Allocate and fill in the snapshot context */
+
+       header->image_size = le64_to_cpu(ondisk->image_size);
+       size = sizeof (struct ceph_snap_context);
+       size += snap_count * sizeof (header->snapc->snaps[0]);
+       header->snapc = kzalloc(size, GFP_KERNEL);
+       if (!header->snapc)
+               goto out_err;
+
        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
-       header->total_snaps = snap_count;
-
-       if (snap_count && allocated_snaps == snap_count) {
-               int i;
-
-               for (i = 0; i < snap_count; i++) {
-                       header->snapc->snaps[i] =
-                               le64_to_cpu(ondisk->snaps[i].id);
-                       header->snap_sizes[i] =
-                               le64_to_cpu(ondisk->snaps[i].image_size);
-               }
-
-               /* copy snapshot names */
-               memcpy(header->snap_names, &ondisk->snaps[snap_count],
-                       header->snap_names_len);
-       }
+       for (i = 0; i < snap_count; i++)
+               header->snapc->snaps[i] =
+                       le64_to_cpu(ondisk->snaps[i].id);
 
        return 0;
 
-err_sizes:
+out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
-err_names:
        kfree(header->snap_names);
        header->snap_names = NULL;
-err_snapc:
-       kfree(header->snapc);
-       header->snapc = NULL;
+       kfree(header->object_prefix);
+       header->object_prefix = NULL;
 
        return -ENOMEM;
 }
 
-static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
-                       u64 *seq, u64 *size)
+static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 {
-       int i;
-       char *p = header->snap_names;
 
-       for (i = 0; i < header->total_snaps; i++) {
-               if (!strcmp(snap_name, p)) {
+       struct rbd_snap *snap;
 
-                       /* Found it.  Pass back its id and/or size */
+       list_for_each_entry(snap, &rbd_dev->snaps, node) {
+               if (!strcmp(snap_name, snap->name)) {
+                       rbd_dev->mapping.snap_id = snap->id;
+                       rbd_dev->mapping.size = snap->size;
+                       rbd_dev->mapping.features = snap->features;
 
-                       if (seq)
-                               *seq = header->snapc->snaps[i];
-                       if (size)
-                               *size = header->snap_sizes[i];
-                       return i;
+                       return 0;
                }
-               p += strlen(p) + 1;     /* Skip ahead to the next name */
        }
+
        return -ENOENT;
 }
 
-static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
 {
        int ret;
 
-       down_write(&rbd_dev->header_rwsem);
-
-       if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
+       if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
-               rbd_dev->snap_id = CEPH_NOSNAP;
-               rbd_dev->snap_exists = false;
-               rbd_dev->read_only = 0;
-               if (size)
-                       *size = rbd_dev->header.image_size;
+               rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+               rbd_dev->mapping.size = rbd_dev->header.image_size;
+               rbd_dev->mapping.features = rbd_dev->header.features;
+               rbd_dev->mapping.snap_exists = false;
+               rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
+               ret = 0;
        } else {
-               u64 snap_id = 0;
-
-               ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
-                                       &snap_id, size);
+               ret = snap_by_name(rbd_dev, snap_name);
                if (ret < 0)
                        goto done;
-               rbd_dev->snap_id = snap_id;
-               rbd_dev->snap_exists = true;
-               rbd_dev->read_only = 1;
+               rbd_dev->mapping.snap_exists = true;
+               rbd_dev->mapping.read_only = true;
        }
-
-       ret = 0;
+       rbd_dev->mapping.snap_name = snap_name;
 done:
-       up_write(&rbd_dev->header_rwsem);
        return ret;
 }
 
 static void rbd_header_free(struct rbd_image_header *header)
 {
        kfree(header->object_prefix);
+       header->object_prefix = NULL;
        kfree(header->snap_sizes);
+       header->snap_sizes = NULL;
        kfree(header->snap_names);
+       header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
+       header->snapc = NULL;
 }
 
-/*
- * get the actual striped segment name, offset and length
- */
-static u64 rbd_get_segment(struct rbd_image_header *header,
-                          const char *object_prefix,
-                          u64 ofs, u64 len,
-                          char *seg_name, u64 *segofs)
+static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
 {
-       u64 seg = ofs >> header->obj_order;
+       char *name;
+       u64 segment;
+       int ret;
+
+       name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+       if (!name)
+               return NULL;
+       segment = offset >> rbd_dev->header.obj_order;
+       ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
+                       rbd_dev->header.object_prefix, segment);
+       if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
+               pr_err("error formatting segment name for #%llu (%d)\n",
+                       segment, ret);
+               kfree(name);
+               name = NULL;
+       }
+
+       return name;
+}
+
+static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
+{
+       u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
+
+       return offset & (segment_size - 1);
+}
 
-       if (seg_name)
-               snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
-                        "%s.%012llx", object_prefix, seg);
+static u64 rbd_segment_length(struct rbd_device *rbd_dev,
+                               u64 offset, u64 length)
+{
+       u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
 
-       ofs = ofs & ((1 << header->obj_order) - 1);
-       len = min_t(u64, len, (1 << header->obj_order) - ofs);
+       offset &= segment_size - 1;
 
-       if (segofs)
-               *segofs = ofs;
+       rbd_assert(length <= U64_MAX - offset);
+       if (offset + length > segment_size)
+               length = segment_size - offset;
 
-       return len;
+       return length;
 }
 
 static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
 {
-       u64 start_seg = ofs >> header->obj_order;
-       u64 end_seg = (ofs + len - 1) >> header->obj_order;
+       u64 start_seg;
+       u64 end_seg;
+
+       if (!len)
+               return 0;
+       if (len - 1 > U64_MAX - ofs)
+               return -ERANGE;
+
+       start_seg = ofs >> header->obj_order;
+       end_seg = (ofs + len - 1) >> header->obj_order;
+
        return end_seg - start_seg + 1;
 }
 
@@ -725,7 +816,9 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                                   struct bio_pair **bp,
                                   int len, gfp_t gfpmask)
 {
-       struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
+       struct bio *old_chain = *old;
+       struct bio *new_chain = NULL;
+       struct bio *tail;
        int total = 0;
 
        if (*bp) {
@@ -734,9 +827,12 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
        }
 
        while (old_chain && (total < len)) {
+               struct bio *tmp;
+
                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
                if (!tmp)
                        goto err_out;
+               gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
 
                if (total + old_chain->bi_size > len) {
                        struct bio_pair *bp;
@@ -764,24 +860,18 @@ static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                }
 
                tmp->bi_bdev = NULL;
-               gfpmask &= ~__GFP_WAIT;
                tmp->bi_next = NULL;
-
-               if (!new_chain) {
-                       new_chain = tail = tmp;
-               } else {
+               if (new_chain)
                        tail->bi_next = tmp;
-                       tail = tmp;
-               }
+               else
+                       new_chain = tmp;
+               tail = tmp;
                old_chain = old_chain->bi_next;
 
                total += tmp->bi_size;
        }
 
-       BUG_ON(total < len);
-
-       if (tail)
-               tail->bi_next = NULL;
+       rbd_assert(total == len);
 
        *old = old_chain;
 
@@ -939,8 +1029,9 @@ static int rbd_do_request(struct request *rq,
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
-       ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
-                               req, ops);
+       ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
+                                  req, ops);
+       rbd_assert(ret == 0);
 
        ceph_osdc_build_request(req, ofs, &len,
                                ops,
@@ -1031,8 +1122,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
-                          u64 ofs, u64 len,
-                          char *buf,
+                          u64 ofs, u64 inbound_size,
+                          char *inbound,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
 {
@@ -1040,15 +1131,15 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
        struct page **pages;
        int num_pages;
 
-       BUG_ON(ops == NULL);
+       rbd_assert(ops != NULL);
 
-       num_pages = calc_pages_for(ofs , len);
+       num_pages = calc_pages_for(ofs, inbound_size);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);
 
        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
-                         object_name, ofs, len, NULL,
+                         object_name, ofs, inbound_size, NULL,
                          pages, num_pages,
                          flags,
                          ops,
@@ -1058,8 +1149,8 @@ static int rbd_req_sync_op(struct rbd_device *rbd_dev,
        if (ret < 0)
                goto done;
 
-       if ((flags & CEPH_OSD_FLAG_READ) && buf)
-               ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
+       if ((flags & CEPH_OSD_FLAG_READ) && inbound)
+               ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
 
 done:
        ceph_release_page_vector(pages, num_pages);
@@ -1086,14 +1177,11 @@ static int rbd_do_op(struct request *rq,
        struct ceph_osd_req_op *ops;
        u32 payload_len;
 
-       seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+       seg_name = rbd_segment_name(rbd_dev, ofs);
        if (!seg_name)
                return -ENOMEM;
-
-       seg_len = rbd_get_segment(&rbd_dev->header,
-                                 rbd_dev->header.object_prefix,
-                                 ofs, len,
-                                 seg_name, &seg_ofs);
+       seg_len = rbd_segment_length(rbd_dev, ofs, len);
+       seg_ofs = rbd_segment_offset(rbd_dev, ofs);
 
        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
 
@@ -1105,7 +1193,7 @@ static int rbd_do_op(struct request *rq,
        /* we've taken care of segment sizes earlier when we
           cloned the bios. We should never have a segment
           truncated at this point */
-       BUG_ON(seg_len < len);
+       rbd_assert(seg_len == len);
 
        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                             seg_name, seg_ofs, seg_len,
@@ -1225,7 +1313,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
                rbd_dev->header_name, (unsigned long long) notify_id,
                (unsigned int) opcode);
-       rc = rbd_refresh_header(rbd_dev, &hver);
+       rc = rbd_dev_refresh(rbd_dev, &hver);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           " update snaps: %d\n", rbd_dev->major, rc);
@@ -1307,89 +1395,36 @@ static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
        return ret;
 }
 
-struct rbd_notify_info {
-       struct rbd_device *rbd_dev;
-};
-
-static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
-{
-       struct rbd_device *rbd_dev = (struct rbd_device *)data;
-       if (!rbd_dev)
-               return;
-
-       dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
-                       rbd_dev->header_name, (unsigned long long) notify_id,
-                       (unsigned int) opcode);
-}
-
-/*
- * Request sync osd notify
- */
-static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
-{
-       struct ceph_osd_req_op *ops;
-       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-       struct ceph_osd_event *event;
-       struct rbd_notify_info info;
-       int payload_len = sizeof(u32) + sizeof(u32);
-       int ret;
-
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
-       if (!ops)
-               return -ENOMEM;
-
-       info.rbd_dev = rbd_dev;
-
-       ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
-                                    (void *)&info, &event);
-       if (ret < 0)
-               goto fail;
-
-       ops[0].watch.ver = 1;
-       ops[0].watch.flag = 1;
-       ops[0].watch.cookie = event->cookie;
-       ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
-       ops[0].watch.timeout = 12;
-
-       ret = rbd_req_sync_op(rbd_dev, NULL,
-                              CEPH_NOSNAP,
-                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                              ops,
-                              rbd_dev->header_name,
-                              0, 0, NULL, NULL, NULL);
-       if (ret < 0)
-               goto fail_event;
-
-       ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
-       dout("ceph_osdc_wait_event returned %d\n", ret);
-       rbd_destroy_ops(ops);
-       return 0;
-
-fail_event:
-       ceph_osdc_cancel_event(event);
-fail:
-       rbd_destroy_ops(ops);
-       return ret;
-}
-
 /*
- * Request sync osd read
+ * Synchronous osd object method call
  */
 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
-                            const char *data,
-                            int len,
+                            const char *outbound,
+                            size_t outbound_size,
+                            char *inbound,
+                            size_t inbound_size,
+                            int flags,
                             u64 *ver)
 {
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
+       int payload_size;
        int ret;
 
-       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
-                                   class_name_len + method_name_len + len);
+       /*
+        * Any input parameters required by the method we're calling
+        * will be sent along with the class and method names as
+        * part of the message payload.  That data and its size are
+        * supplied via the indata and indata_len fields (named from
+        * the perspective of the server side) in the OSD request
+        * operation.
+        */
+       payload_size = class_name_len + method_name_len + outbound_size;
+       ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
        if (!ops)
                return -ENOMEM;
 
@@ -1398,14 +1433,14 @@ static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
-       ops[0].cls.indata = data;
-       ops[0].cls.indata_len = len;
+       ops[0].cls.indata = outbound;
+       ops[0].cls.indata_len = outbound_size;
 
        ret = rbd_req_sync_op(rbd_dev, NULL,
                               CEPH_NOSNAP,
-                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                              ops,
-                              object_name, 0, 0, NULL, NULL, ver);
+                              flags, ops,
+                              object_name, 0, inbound_size, inbound,
+                              NULL, ver);
 
        rbd_destroy_ops(ops);
 
@@ -1447,10 +1482,6 @@ static void rbd_rq_fn(struct request_queue *q)
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;
 
-               /* peek at request from block layer */
-               if (!rq)
-                       break;
-
                dout("fetched request\n");
 
                /* filter out block requests we don't understand */
@@ -1465,7 +1496,7 @@ static void rbd_rq_fn(struct request_queue *q)
                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
-               if (do_write && rbd_dev->read_only) {
+               if (do_write && rbd_dev->mapping.read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }
@@ -1474,7 +1505,8 @@ static void rbd_rq_fn(struct request_queue *q)
 
                down_read(&rbd_dev->header_rwsem);
 
-               if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
+               if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
+                               !rbd_dev->mapping.snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
@@ -1491,6 +1523,12 @@ static void rbd_rq_fn(struct request_queue *q)
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
 
                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
+               if (num_segs <= 0) {
+                       spin_lock_irq(q->queue_lock);
+                       __blk_end_request_all(rq, num_segs);
+                       ceph_put_snap_context(snapc);
+                       continue;
+               }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
@@ -1502,10 +1540,7 @@ static void rbd_rq_fn(struct request_queue *q)
                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-                       op_size = rbd_get_segment(&rbd_dev->header,
-                                                 rbd_dev->header.object_prefix,
-                                                 ofs, size,
-                                                 NULL, NULL);
+                       op_size = rbd_segment_length(rbd_dev, ofs, size);
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
@@ -1525,7 +1560,7 @@ static void rbd_rq_fn(struct request_queue *q)
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
-                                            rbd_dev->snap_id,
+                                            rbd_dev->mapping.snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);
@@ -1581,8 +1616,6 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
        if (!disk)
                return;
 
-       rbd_header_free(&rbd_dev->header);
-
        if (disk->flags & GENHD_FL_UP)
                del_gendisk(disk);
        if (disk->queue)
@@ -1591,105 +1624,96 @@ static void rbd_free_disk(struct rbd_device *rbd_dev)
 }
 
 /*
- * reload the ondisk the header 
+ * Read the complete header for the given rbd device.
+ *
+ * Returns a pointer to a dynamically-allocated buffer containing
+ * the complete and validated header.  Caller can pass the address
+ * of a variable that will be filled in with the version of the
+ * header object at the time it was read.
+ *
+ * Returns a pointer-coded errno if a failure occurs.
  */
-static int rbd_read_header(struct rbd_device *rbd_dev,
-                          struct rbd_image_header *header)
+static struct rbd_image_header_ondisk *
+rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
 {
-       ssize_t rc;
-       struct rbd_image_header_ondisk *dh;
+       struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
-       u64 ver;
-       size_t len;
+       u64 names_size = 0;
+       u32 want_count;
+       int ret;
 
        /*
-        * First reads the fixed-size header to determine the number
-        * of snapshots, then re-reads it, along with all snapshot
-        * records as well as their stored names.
+        * The complete header will include an array of its 64-bit
+        * snapshot ids, followed by the names of those snapshots as
+        * a contiguous block of NUL-terminated strings.  Note that
+        * the number of snapshots could change by the time we read
+        * it in, in which case we re-read it.
         */
-       len = sizeof (*dh);
-       while (1) {
-               dh = kmalloc(len, GFP_KERNEL);
-               if (!dh)
-                       return -ENOMEM;
-
-               rc = rbd_req_sync_read(rbd_dev,
-                                      CEPH_NOSNAP,
+       do {
+               size_t size;
+
+               kfree(ondisk);
+
+               size = sizeof (*ondisk);
+               size += snap_count * sizeof (struct rbd_image_snap_ondisk);
+               size += names_size;
+               ondisk = kmalloc(size, GFP_KERNEL);
+               if (!ondisk)
+                       return ERR_PTR(-ENOMEM);
+
+               ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                       rbd_dev->header_name,
-                                      0, len,
-                                      (char *)dh, &ver);
-               if (rc < 0)
-                       goto out_dh;
-
-               rc = rbd_header_from_disk(header, dh, snap_count);
-               if (rc < 0) {
-                       if (rc == -ENXIO)
-                               pr_warning("unrecognized header format"
-                                          " for image %s\n",
-                                          rbd_dev->image_name);
-                       goto out_dh;
+                                      0, size,
+                                      (char *) ondisk, version);
+
+               if (ret < 0)
+                       goto out_err;
+               if (WARN_ON((size_t) ret < size)) {
+                       ret = -ENXIO;
+                       pr_warning("short header read for image %s"
+                                       " (want %zd got %d)\n",
+                               rbd_dev->image_name, size, ret);
+                       goto out_err;
+               }
+               if (!rbd_dev_ondisk_valid(ondisk)) {
+                       ret = -ENXIO;
+                       pr_warning("invalid header for image %s\n",
+                               rbd_dev->image_name);
+                       goto out_err;
                }
 
-               if (snap_count == header->total_snaps)
-                       break;
+               names_size = le64_to_cpu(ondisk->snap_names_len);
+               want_count = snap_count;
+               snap_count = le32_to_cpu(ondisk->snap_count);
+       } while (snap_count != want_count);
 
-               snap_count = header->total_snaps;
-               len = sizeof (*dh) +
-                       snap_count * sizeof(struct rbd_image_snap_ondisk) +
-                       header->snap_names_len;
+       return ondisk;
 
-               rbd_header_free(header);
-               kfree(dh);
-       }
-       header->obj_version = ver;
+out_err:
+       kfree(ondisk);
 
-out_dh:
-       kfree(dh);
-       return rc;
+       return ERR_PTR(ret);
 }
 
 /*
- * create a snapshot
+ * reload the ondisk the header
  */
-static int rbd_header_add_snap(struct rbd_device *rbd_dev,
-                              const char *snap_name,
-                              gfp_t gfp_flags)
+static int rbd_read_header(struct rbd_device *rbd_dev,
+                          struct rbd_image_header *header)
 {
-       int name_len = strlen(snap_name);
-       u64 new_snapid;
+       struct rbd_image_header_ondisk *ondisk;
+       u64 ver = 0;
        int ret;
-       void *data, *p, *e;
-       struct ceph_mon_client *monc;
 
-       /* we should create a snapshot only if we're pointing at the head */
-       if (rbd_dev->snap_id != CEPH_NOSNAP)
-               return -EINVAL;
-
-       monc = &rbd_dev->rbd_client->client->monc;
-       ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
-       dout("created snapid=%llu\n", (unsigned long long) new_snapid);
-       if (ret < 0)
-               return ret;
-
-       data = kmalloc(name_len + 16, gfp_flags);
-       if (!data)
-               return -ENOMEM;
-
-       p = data;
-       e = data + name_len + 16;
-
-       ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
-       ceph_encode_64_safe(&p, e, new_snapid, bad);
+       ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
+       if (IS_ERR(ondisk))
+               return PTR_ERR(ondisk);
+       ret = rbd_header_from_disk(header, ondisk);
+       if (ret >= 0)
+               header->obj_version = ver;
+       kfree(ondisk);
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
-                               "rbd", "snap_add",
-                               data, p - data, NULL);
-
-       kfree(data);
-
-       return ret < 0 ? ret : 0;
-bad:
-       return -ERANGE;
+       return ret;
 }
 
 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
@@ -1701,10 +1725,23 @@ static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
                __rbd_remove_snap_dev(snap);
 }
 
+static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
+{
+       sector_t size;
+
+       if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
+               return;
+
+       size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
+       dout("setting size to %llu sectors", (unsigned long long) size);
+       rbd_dev->mapping.size = (u64) size;
+       set_capacity(rbd_dev->disk, size);
+}
+
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
        int ret;
        struct rbd_image_header h;
@@ -1715,13 +1752,9 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
 
        down_write(&rbd_dev->header_rwsem);
 
-       /* resized? */
-       if (rbd_dev->snap_id == CEPH_NOSNAP) {
-               sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
-
-               dout("setting size to %llu sectors", (unsigned long long) size);
-               set_capacity(rbd_dev->disk, size);
-       }
+       /* Update image size, and check for resize of mapped image */
+       rbd_dev->header.image_size = h.image_size;
+       rbd_update_mapping_size(rbd_dev);
 
        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
@@ -1733,28 +1766,32 @@ static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
-       rbd_dev->header.total_snaps = h.total_snaps;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
-       rbd_dev->header.snap_names_len = h.snap_names_len;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);
 
-       ret = __rbd_init_snaps_header(rbd_dev);
+       ret = rbd_dev_snaps_update(rbd_dev);
+       if (!ret)
+               ret = rbd_dev_snaps_register(rbd_dev);
 
        up_write(&rbd_dev->header_rwsem);
 
        return ret;
 }
 
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
        int ret;
 
+       rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       ret = __rbd_refresh_header(rbd_dev, hver);
+       if (rbd_dev->image_format == 1)
+               ret = rbd_dev_v1_refresh(rbd_dev, hver);
+       else
+               ret = rbd_dev_v2_refresh(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);
 
        return ret;
@@ -1764,29 +1801,12 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
 {
        struct gendisk *disk;
        struct request_queue *q;
-       int rc;
        u64 segment_size;
-       u64 total_size = 0;
-
-       /* contact OSD, request size info about the object being mapped */
-       rc = rbd_read_header(rbd_dev, &rbd_dev->header);
-       if (rc)
-               return rc;
-
-       /* no need to lock here, as rbd_dev is not registered yet */
-       rc = __rbd_init_snaps_header(rbd_dev);
-       if (rc)
-               return rc;
-
-       rc = rbd_header_set_snap(rbd_dev, &total_size);
-       if (rc)
-               return rc;
 
        /* create gendisk info */
-       rc = -ENOMEM;
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
-               goto out;
+               return -ENOMEM;
 
        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
@@ -1796,7 +1816,6 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        disk->private_data = rbd_dev;
 
        /* init rq */
-       rc = -ENOMEM;
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;
@@ -1817,20 +1836,14 @@ static int rbd_init_disk(struct rbd_device *rbd_dev)
        q->queuedata = rbd_dev;
 
        rbd_dev->disk = disk;
-       rbd_dev->q = q;
 
-       /* finally, announce the disk to the world */
-       set_capacity(disk, total_size / SECTOR_SIZE);
-       add_disk(disk);
+       set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
 
-       pr_info("%s: added with size 0x%llx\n",
-               disk->disk_name, (unsigned long long)total_size);
        return 0;
-
 out_disk:
        put_disk(disk);
-out:
-       return rc;
+
+       return -ENOMEM;
 }
 
 /*
@@ -1855,6 +1868,19 @@ static ssize_t rbd_size_show(struct device *dev,
        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
 }
 
+/*
+ * Note this shows the features for whatever's mapped, which is not
+ * necessarily the base image.
+ */
+static ssize_t rbd_features_show(struct device *dev,
+                            struct device_attribute *attr, char *buf)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+       return sprintf(buf, "0x%016llx\n",
+                       (unsigned long long) rbd_dev->mapping.features);
+}
+
 static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
 {
@@ -1896,13 +1922,25 @@ static ssize_t rbd_name_show(struct device *dev,
        return sprintf(buf, "%s\n", rbd_dev->image_name);
 }
 
+static ssize_t rbd_image_id_show(struct device *dev,
+                            struct device_attribute *attr, char *buf)
+{
+       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+
+       return sprintf(buf, "%s\n", rbd_dev->image_id);
+}
+
+/*
+ * Shows the name of the currently-mapped snapshot (or
+ * RBD_SNAP_HEAD_NAME for the base image).
+ */
 static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       return sprintf(buf, "%s\n", rbd_dev->snap_name);
+       return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -1913,31 +1951,33 @@ static ssize_t rbd_image_refresh(struct device *dev,
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;
 
-       ret = rbd_refresh_header(rbd_dev, NULL);
+       ret = rbd_dev_refresh(rbd_dev, NULL);
 
        return ret < 0 ? ret : size;
 }
 
 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
+static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
+static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
-static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
 
 static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
+       &dev_attr_features.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
+       &dev_attr_image_id.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
-       &dev_attr_create_snap.attr,
        NULL
 };
 
@@ -1983,12 +2023,24 @@ static ssize_t rbd_snap_id_show(struct device *dev,
        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
 }
 
+static ssize_t rbd_snap_features_show(struct device *dev,
+                               struct device_attribute *attr,
+                               char *buf)
+{
+       struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
+
+       return sprintf(buf, "0x%016llx\n",
+                       (unsigned long long) snap->features);
+}
+
 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
+static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
 
 static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
+       &dev_attr_snap_features.attr,
        NULL,
 };
 
@@ -2013,10 +2065,21 @@ static struct device_type rbd_snap_device_type = {
        .release        = rbd_snap_dev_release,
 };
 
+static bool rbd_snap_registered(struct rbd_snap *snap)
+{
+       bool ret = snap->dev.type == &rbd_snap_device_type;
+       bool reg = device_is_registered(&snap->dev);
+
+       rbd_assert(!ret ^ reg);
+
+       return ret;
+}
+
 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
 {
        list_del(&snap->node);
-       device_unregister(&snap->dev);
+       if (device_is_registered(&snap->dev))
+               device_unregister(&snap->dev);
 }
 
 static int rbd_register_snap_dev(struct rbd_snap *snap,
@@ -2029,13 +2092,17 @@ static int rbd_register_snap_dev(struct rbd_snap *snap,
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
        dev_set_name(dev, "snap_%s", snap->name);
+       dout("%s: registering device for snapshot %s\n", __func__, snap->name);
+
        ret = device_register(dev);
 
        return ret;
 }
 
 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
-                                             int i, const char *name)
+                                               const char *snap_name,
+                                               u64 snap_id, u64 snap_size,
+                                               u64 snap_features)
 {
        struct rbd_snap *snap;
        int ret;
@@ -2045,17 +2112,13 @@ static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                return ERR_PTR(-ENOMEM);
 
        ret = -ENOMEM;
-       snap->name = kstrdup(name, GFP_KERNEL);
+       snap->name = kstrdup(snap_name, GFP_KERNEL);
        if (!snap->name)
                goto err;
 
-       snap->size = rbd_dev->header.snap_sizes[i];
-       snap->id = rbd_dev->header.snapc->snaps[i];
-       if (device_is_registered(&rbd_dev->dev)) {
-               ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
-               if (ret < 0)
-                       goto err;
-       }
+       snap->id = snap_id;
+       snap->size = snap_size;
+       snap->features = snap_features;
 
        return snap;
 
@@ -2066,128 +2129,480 @@ err:
        return ERR_PTR(ret);
 }
 
+static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       char *snap_name;
+
+       rbd_assert(which < rbd_dev->header.snapc->num_snaps);
+
+       *snap_size = rbd_dev->header.snap_sizes[which];
+       *snap_features = 0;     /* No features for v1 */
+
+       /* Skip over names until we find the one we are looking for */
+
+       snap_name = rbd_dev->header.snap_names;
+       while (which--)
+               snap_name += strlen(snap_name) + 1;
+
+       return snap_name;
+}
+
 /*
- * search for the previous snap in a null delimited string list
+ * Get the size and object order for an image snapshot, or if
+ * snap_id is CEPH_NOSNAP, gets this information for the base
+ * image.
  */
-const char *rbd_prev_snap_name(const char *name, const char *start)
+static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
+                               u8 *order, u64 *snap_size)
 {
-       if (name < start + 2)
-               return NULL;
+       __le64 snapid = cpu_to_le64(snap_id);
+       int ret;
+       struct {
+               u8 order;
+               __le64 size;
+       } __attribute__ ((packed)) size_buf = { 0 };
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_size",
+                               (char *) &snapid, sizeof (snapid),
+                               (char *) &size_buf, sizeof (size_buf),
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               return ret;
+
+       *order = size_buf.order;
+       *snap_size = le64_to_cpu(size_buf.size);
+
+       dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
+               (unsigned long long) snap_id, (unsigned int) *order,
+               (unsigned long long) *snap_size);
+
+       return 0;
+}
+
+static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
+{
+       return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
+                                       &rbd_dev->header.obj_order,
+                                       &rbd_dev->header.image_size);
+}
 
-       name -= 2;
-       while (*name) {
-               if (name == start)
-                       return start;
-               name--;
+static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
+{
+       void *reply_buf;
+       int ret;
+       void *p;
+
+       reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
+       if (!reply_buf)
+               return -ENOMEM;
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_object_prefix",
+                               NULL, 0,
+                               reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       p = reply_buf;
+       rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
+                                               p + RBD_OBJ_PREFIX_LEN_MAX,
+                                               NULL, GFP_NOIO);
+
+       if (IS_ERR(rbd_dev->header.object_prefix)) {
+               ret = PTR_ERR(rbd_dev->header.object_prefix);
+               rbd_dev->header.object_prefix = NULL;
+       } else {
+               dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
        }
-       return name + 1;
+
+out:
+       kfree(reply_buf);
+
+       return ret;
 }
 
-/*
- * compare the old list of snapshots that we have to what's in the header
- * and update it accordingly. Note that the header holds the snapshots
- * in a reverse order (from newest to oldest) and we need to go from
- * older to new so that we don't get a duplicate snap name when
- * doing the process (e.g., removed snapshot and recreated a new
- * one with the same name.
- */
-static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
+static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
+               u64 *snap_features)
 {
-       const char *name, *first_name;
-       int i = rbd_dev->header.total_snaps;
-       struct rbd_snap *snap, *old_snap = NULL;
-       struct list_head *p, *n;
+       __le64 snapid = cpu_to_le64(snap_id);
+       struct {
+               __le64 features;
+               __le64 incompat;
+       } features_buf = { 0 };
+       u64 incompat;
+       int ret;
 
-       first_name = rbd_dev->header.snap_names;
-       name = first_name + rbd_dev->header.snap_names_len;
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_features",
+                               (char *) &snapid, sizeof (snapid),
+                               (char *) &features_buf, sizeof (features_buf),
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               return ret;
 
-       list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
-               u64 cur_id;
+       incompat = le64_to_cpu(features_buf.incompat);
+       if (incompat & ~RBD_FEATURES_ALL)
+               return -ENOTSUPP;
 
-               old_snap = list_entry(p, struct rbd_snap, node);
+       *snap_features = le64_to_cpu(features_buf.features);
 
-               if (i)
-                       cur_id = rbd_dev->header.snapc->snaps[i - 1];
+       dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
+               (unsigned long long) snap_id,
+               (unsigned long long) *snap_features,
+               (unsigned long long) le64_to_cpu(features_buf.incompat));
 
-               if (!i || old_snap->id < cur_id) {
-                       /*
-                        * old_snap->id was skipped, thus was
-                        * removed.  If this rbd_dev is mapped to
-                        * the removed snapshot, record that it no
-                        * longer exists, to prevent further I/O.
-                        */
-                       if (rbd_dev->snap_id == old_snap->id)
-                               rbd_dev->snap_exists = false;
-                       __rbd_remove_snap_dev(old_snap);
-                       continue;
-               }
-               if (old_snap->id == cur_id) {
-                       /* we have this snapshot already */
-                       i--;
-                       name = rbd_prev_snap_name(name, first_name);
+       return 0;
+}
+
+static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
+{
+       return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
+                                               &rbd_dev->header.features);
+}
+
+static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
+{
+       size_t size;
+       int ret;
+       void *reply_buf;
+       void *p;
+       void *end;
+       u64 seq;
+       u32 snap_count;
+       struct ceph_snap_context *snapc;
+       u32 i;
+
+       /*
+        * We'll need room for the seq value (maximum snapshot id),
+        * snapshot count, and array of that many snapshot ids.
+        * For now we have a fixed upper limit on the number we're
+        * prepared to receive.
+        */
+       size = sizeof (__le64) + sizeof (__le32) +
+                       RBD_MAX_SNAP_COUNT * sizeof (__le64);
+       reply_buf = kzalloc(size, GFP_KERNEL);
+       if (!reply_buf)
+               return -ENOMEM;
+
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_snapcontext",
+                               NULL, 0,
+                               reply_buf, size,
+                               CEPH_OSD_FLAG_READ, ver);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       ret = -ERANGE;
+       p = reply_buf;
+       end = (char *) reply_buf + size;
+       ceph_decode_64_safe(&p, end, seq, out);
+       ceph_decode_32_safe(&p, end, snap_count, out);
+
+       /*
+        * Make sure the reported number of snapshot ids wouldn't go
+        * beyond the end of our buffer.  But before checking that,
+        * make sure the computed size of the snapshot context we
+        * allocate is representable in a size_t.
+        */
+       if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
+                                / sizeof (u64)) {
+               ret = -EINVAL;
+               goto out;
+       }
+       if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
+               goto out;
+
+       size = sizeof (struct ceph_snap_context) +
+                               snap_count * sizeof (snapc->snaps[0]);
+       snapc = kmalloc(size, GFP_KERNEL);
+       if (!snapc) {
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       atomic_set(&snapc->nref, 1);
+       snapc->seq = seq;
+       snapc->num_snaps = snap_count;
+       for (i = 0; i < snap_count; i++)
+               snapc->snaps[i] = ceph_decode_64(&p);
+
+       rbd_dev->header.snapc = snapc;
+
+       dout("  snap context seq = %llu, snap_count = %u\n",
+               (unsigned long long) seq, (unsigned int) snap_count);
+
+out:
+       kfree(reply_buf);
+
+       return 0;
+}
+
+static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
+{
+       size_t size;
+       void *reply_buf;
+       __le64 snap_id;
+       int ret;
+       void *p;
+       void *end;
+       size_t snap_name_len;
+       char *snap_name;
+
+       size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
+       reply_buf = kmalloc(size, GFP_KERNEL);
+       if (!reply_buf)
+               return ERR_PTR(-ENOMEM);
+
+       snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
+       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+                               "rbd", "get_snapshot_name",
+                               (char *) &snap_id, sizeof (snap_id),
+                               reply_buf, size,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       p = reply_buf;
+       end = (char *) reply_buf + size;
+       snap_name_len = 0;
+       snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
+                               GFP_KERNEL);
+       if (IS_ERR(snap_name)) {
+               ret = PTR_ERR(snap_name);
+               goto out;
+       } else {
+               dout("  snap_id 0x%016llx snap_name = %s\n",
+                       (unsigned long long) le64_to_cpu(snap_id), snap_name);
+       }
+       kfree(reply_buf);
+
+       return snap_name;
+out:
+       kfree(reply_buf);
+
+       return ERR_PTR(ret);
+}
+
+static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       __le64 snap_id;
+       u8 order;
+       int ret;
+
+       snap_id = rbd_dev->header.snapc->snaps[which];
+       ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
+       if (ret)
+               return ERR_PTR(ret);
+       ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
+       if (ret)
+               return ERR_PTR(ret);
+
+       return rbd_dev_v2_snap_name(rbd_dev, which);
+}
+
+static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
+               u64 *snap_size, u64 *snap_features)
+{
+       if (rbd_dev->image_format == 1)
+               return rbd_dev_v1_snap_info(rbd_dev, which,
+                                       snap_size, snap_features);
+       if (rbd_dev->image_format == 2)
+               return rbd_dev_v2_snap_info(rbd_dev, which,
+                                       snap_size, snap_features);
+       return ERR_PTR(-EINVAL);
+}
+
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
+{
+       int ret;
+       __u8 obj_order;
+
+       down_write(&rbd_dev->header_rwsem);
+
+       /* Grab old order first, to see if it changes */
+
+       obj_order = rbd_dev->header.obj_order,
+       ret = rbd_dev_v2_image_size(rbd_dev);
+       if (ret)
+               goto out;
+       if (rbd_dev->header.obj_order != obj_order) {
+               ret = -EIO;
+               goto out;
+       }
+       rbd_update_mapping_size(rbd_dev);
+
+       ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+       dout("rbd_dev_v2_snap_context returned %d\n", ret);
+       if (ret)
+               goto out;
+       ret = rbd_dev_snaps_update(rbd_dev);
+       dout("rbd_dev_snaps_update returned %d\n", ret);
+       if (ret)
+               goto out;
+       ret = rbd_dev_snaps_register(rbd_dev);
+       dout("rbd_dev_snaps_register returned %d\n", ret);
+out:
+       up_write(&rbd_dev->header_rwsem);
+
+       return ret;
+}
+
+/*
+ * Scan the rbd device's current snapshot list and compare it to the
+ * newly-received snapshot context.  Remove any existing snapshots
+ * not present in the new snapshot context.  Add a new snapshot for
+ * any snaphots in the snapshot context not in the current list.
+ * And verify there are no changes to snapshots we already know
+ * about.
+ *
+ * Assumes the snapshots in the snapshot context are sorted by
+ * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
+ * are also maintained in that order.)
+ */
+static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
+{
+       struct ceph_snap_context *snapc = rbd_dev->header.snapc;
+       const u32 snap_count = snapc->num_snaps;
+       struct list_head *head = &rbd_dev->snaps;
+       struct list_head *links = head->next;
+       u32 index = 0;
+
+       dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
+       while (index < snap_count || links != head) {
+               u64 snap_id;
+               struct rbd_snap *snap;
+               char *snap_name;
+               u64 snap_size = 0;
+               u64 snap_features = 0;
+
+               snap_id = index < snap_count ? snapc->snaps[index]
+                                            : CEPH_NOSNAP;
+               snap = links != head ? list_entry(links, struct rbd_snap, node)
+                                    : NULL;
+               rbd_assert(!snap || snap->id != CEPH_NOSNAP);
+
+               if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
+                       struct list_head *next = links->next;
+
+                       /* Existing snapshot not in the new snap context */
+
+                       if (rbd_dev->mapping.snap_id == snap->id)
+                               rbd_dev->mapping.snap_exists = false;
+                       __rbd_remove_snap_dev(snap);
+                       dout("%ssnap id %llu has been removed\n",
+                               rbd_dev->mapping.snap_id == snap->id ?
+                                                               "mapped " : "",
+                               (unsigned long long) snap->id);
+
+                       /* Done with this list entry; advance */
+
+                       links = next;
                        continue;
                }
-               for (; i > 0;
-                    i--, name = rbd_prev_snap_name(name, first_name)) {
-                       if (!name) {
-                               WARN_ON(1);
-                               return -EINVAL;
+
+               snap_name = rbd_dev_snap_info(rbd_dev, index,
+                                       &snap_size, &snap_features);
+               if (IS_ERR(snap_name))
+                       return PTR_ERR(snap_name);
+
+               dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
+                       (unsigned long long) snap_id);
+               if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
+                       struct rbd_snap *new_snap;
+
+                       /* We haven't seen this snapshot before */
+
+                       new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
+                                       snap_id, snap_size, snap_features);
+                       if (IS_ERR(new_snap)) {
+                               int err = PTR_ERR(new_snap);
+
+                               dout("  failed to add dev, error %d\n", err);
+
+                               return err;
                        }
-                       cur_id = rbd_dev->header.snapc->snaps[i];
-                       /* snapshot removal? handle it above */
-                       if (cur_id >= old_snap->id)
-                               break;
-                       /* a new snapshot */
-                       snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
-                       if (IS_ERR(snap))
-                               return PTR_ERR(snap);
-
-                       /* note that we add it backward so using n and not p */
-                       list_add(&snap->node, n);
-                       p = &snap->node;
+
+                       /* New goes before existing, or at end of list */
+
+                       dout("  added dev%s\n", snap ? "" : " at end\n");
+                       if (snap)
+                               list_add_tail(&new_snap->node, &snap->node);
+                       else
+                               list_add_tail(&new_snap->node, head);
+               } else {
+                       /* Already have this one */
+
+                       dout("  already present\n");
+
+                       rbd_assert(snap->size == snap_size);
+                       rbd_assert(!strcmp(snap->name, snap_name));
+                       rbd_assert(snap->features == snap_features);
+
+                       /* Done with this list entry; advance */
+
+                       links = links->next;
                }
+
+               /* Advance to the next entry in the snapshot context */
+
+               index++;
        }
-       /* we're done going over the old snap list, just add what's left */
-       for (; i > 0; i--) {
-               name = rbd_prev_snap_name(name, first_name);
-               if (!name) {
-                       WARN_ON(1);
-                       return -EINVAL;
+       dout("%s: done\n", __func__);
+
+       return 0;
+}
+
+/*
+ * Scan the list of snapshots and register the devices for any that
+ * have not already been registered.
+ */
+static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
+{
+       struct rbd_snap *snap;
+       int ret = 0;
+
+       dout("%s called\n", __func__);
+       if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
+               return -EIO;
+
+       list_for_each_entry(snap, &rbd_dev->snaps, node) {
+               if (!rbd_snap_registered(snap)) {
+                       ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
+                       if (ret < 0)
+                               break;
                }
-               snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
-               if (IS_ERR(snap))
-                       return PTR_ERR(snap);
-               list_add(&snap->node, &rbd_dev->snaps);
        }
+       dout("%s: returning %d\n", __func__, ret);
 
-       return 0;
+       return ret;
 }
 
 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
 {
-       int ret;
        struct device *dev;
-       struct rbd_snap *snap;
+       int ret;
 
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       dev = &rbd_dev->dev;
 
+       dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
        dev->parent = &rbd_root_dev;
        dev->release = rbd_dev_release;
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);
-       if (ret < 0)
-               goto out;
 
-       list_for_each_entry(snap, &rbd_dev->snaps, node) {
-               ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
-               if (ret < 0)
-                       break;
-       }
-out:
        mutex_unlock(&ctl_mutex);
+
        return ret;
 }
 
@@ -2203,7 +2618,7 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
        do {
                ret = rbd_req_sync_watch(rbd_dev);
                if (ret == -ERANGE) {
-                       rc = rbd_refresh_header(rbd_dev, NULL);
+                       rc = rbd_dev_refresh(rbd_dev, NULL);
                        if (rc < 0)
                                return rc;
                }
@@ -2212,33 +2627,37 @@ static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
        return ret;
 }
 
-static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
+static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
 
 /*
  * Get a unique rbd identifier for the given new rbd_dev, and add
  * the rbd_dev to the global list.  The minimum rbd id is 1.
  */
-static void rbd_id_get(struct rbd_device *rbd_dev)
+static void rbd_dev_id_get(struct rbd_device *rbd_dev)
 {
-       rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
+       rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
 
        spin_lock(&rbd_dev_list_lock);
        list_add_tail(&rbd_dev->node, &rbd_dev_list);
        spin_unlock(&rbd_dev_list_lock);
+       dout("rbd_dev %p given dev id %llu\n", rbd_dev,
+               (unsigned long long) rbd_dev->dev_id);
 }
 
 /*
  * Remove an rbd_dev from the global list, and record that its
  * identifier is no longer in use.
  */
-static void rbd_id_put(struct rbd_device *rbd_dev)
+static void rbd_dev_id_put(struct rbd_device *rbd_dev)
 {
        struct list_head *tmp;
        int rbd_id = rbd_dev->dev_id;
        int max_id;
 
-       BUG_ON(rbd_id < 1);
+       rbd_assert(rbd_id > 0);
 
+       dout("rbd_dev %p released dev id %llu\n", rbd_dev,
+               (unsigned long long) rbd_dev->dev_id);
        spin_lock(&rbd_dev_list_lock);
        list_del_init(&rbd_dev->node);
 
@@ -2246,7 +2665,7 @@ static void rbd_id_put(struct rbd_device *rbd_dev)
         * If the id being "put" is not the current maximum, there
         * is nothing special we need to do.
         */
-       if (rbd_id != atomic64_read(&rbd_id_max)) {
+       if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
                spin_unlock(&rbd_dev_list_lock);
                return;
        }
@@ -2267,12 +2686,13 @@ static void rbd_id_put(struct rbd_device *rbd_dev)
        spin_unlock(&rbd_dev_list_lock);
 
        /*
-        * The max id could have been updated by rbd_id_get(), in
+        * The max id could have been updated by rbd_dev_id_get(), in
         * which case it now accurately reflects the new maximum.
         * Be careful not to overwrite the maximum value in that
         * case.
         */
-       atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
+       atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
+       dout("  max dev id has been reset\n");
 }
 
 /*
@@ -2361,28 +2781,31 @@ static inline char *dup_token(const char **buf, size_t *lenp)
 }
 
 /*
- * This fills in the pool_name, image_name, image_name_len, snap_name,
- * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
- * on the list of monitor addresses and other options provided via
- * /sys/bus/rbd/add.
+ * This fills in the pool_name, image_name, image_name_len, rbd_dev,
+ * rbd_md_name, and name fields of the given rbd_dev, based on the
+ * list of monitor addresses and other options provided via
+ * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
+ * copy of the snapshot name to map if successful, or a
+ * pointer-coded error otherwise.
  *
  * Note: rbd_dev is assumed to have been initially zero-filled.
  */
-static int rbd_add_parse_args(struct rbd_device *rbd_dev,
-                             const char *buf,
-                             const char **mon_addrs,
-                             size_t *mon_addrs_size,
-                             char *options,
-                            size_t options_size)
+static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
+                               const char *buf,
+                               const char **mon_addrs,
+                               size_t *mon_addrs_size,
+                               char *options,
+                               size_t options_size)
 {
        size_t len;
-       int ret;
+       char *err_ptr = ERR_PTR(-EINVAL);
+       char *snap_name;
 
        /* The first four tokens are required */
 
        len = next_token(&buf);
        if (!len)
-               return -EINVAL;
+               return err_ptr;
        *mon_addrs_size = len + 1;
        *mon_addrs = buf;
 
@@ -2390,9 +2813,9 @@ static int rbd_add_parse_args(struct rbd_device *rbd_dev,
 
        len = copy_token(&buf, options, options_size);
        if (!len || len >= options_size)
-               return -EINVAL;
+               return err_ptr;
 
-       ret = -ENOMEM;
+       err_ptr = ERR_PTR(-ENOMEM);
        rbd_dev->pool_name = dup_token(&buf, NULL);
        if (!rbd_dev->pool_name)
                goto out_err;
@@ -2401,41 +2824,227 @@ static int rbd_add_parse_args(struct rbd_device *rbd_dev,
        if (!rbd_dev->image_name)
                goto out_err;
 
-       /* Create the name of the header object */
+       /* Snapshot name is optional */
+       len = next_token(&buf);
+       if (!len) {
+               buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
+               len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
+       }
+       snap_name = kmalloc(len + 1, GFP_KERNEL);
+       if (!snap_name)
+               goto out_err;
+       memcpy(snap_name, buf, len);
+       *(snap_name + len) = '\0';
+
+dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
 
-       rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
-                                               + sizeof (RBD_SUFFIX),
-                                       GFP_KERNEL);
-       if (!rbd_dev->header_name)
+       return snap_name;
+
+out_err:
+       kfree(rbd_dev->image_name);
+       rbd_dev->image_name = NULL;
+       rbd_dev->image_name_len = 0;
+       kfree(rbd_dev->pool_name);
+       rbd_dev->pool_name = NULL;
+
+       return err_ptr;
+}
+
+/*
+ * An rbd format 2 image has a unique identifier, distinct from the
+ * name given to it by the user.  Internally, that identifier is
+ * what's used to specify the names of objects related to the image.
+ *
+ * A special "rbd id" object is used to map an rbd image name to its
+ * id.  If that object doesn't exist, then there is no v2 rbd image
+ * with the supplied name.
+ *
+ * This function will record the given rbd_dev's image_id field if
+ * it can be determined, and in that case will return 0.  If any
+ * errors occur a negative errno will be returned and the rbd_dev's
+ * image_id field will be unchanged (and should be NULL).
+ */
+static int rbd_dev_image_id(struct rbd_device *rbd_dev)
+{
+       int ret;
+       size_t size;
+       char *object_name;
+       void *response;
+       void *p;
+
+       /*
+        * First, see if the format 2 image id file exists, and if
+        * so, get the image's persistent id from it.
+        */
+       size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
+       object_name = kmalloc(size, GFP_NOIO);
+       if (!object_name)
+               return -ENOMEM;
+       sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
+       dout("rbd id object name is %s\n", object_name);
+
+       /* Response will be an encoded string, which includes a length */
+
+       size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
+       response = kzalloc(size, GFP_NOIO);
+       if (!response) {
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       ret = rbd_req_sync_exec(rbd_dev, object_name,
+                               "rbd", "get_id",
+                               NULL, 0,
+                               response, RBD_IMAGE_ID_LEN_MAX,
+                               CEPH_OSD_FLAG_READ, NULL);
+       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       if (ret < 0)
+               goto out;
+
+       p = response;
+       rbd_dev->image_id = ceph_extract_encoded_string(&p,
+                                               p + RBD_IMAGE_ID_LEN_MAX,
+                                               &rbd_dev->image_id_len,
+                                               GFP_NOIO);
+       if (IS_ERR(rbd_dev->image_id)) {
+               ret = PTR_ERR(rbd_dev->image_id);
+               rbd_dev->image_id = NULL;
+       } else {
+               dout("image_id is %s\n", rbd_dev->image_id);
+       }
+out:
+       kfree(response);
+       kfree(object_name);
+
+       return ret;
+}
+
+static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
+{
+       int ret;
+       size_t size;
+
+       /* Version 1 images have no id; empty string is used */
+
+       rbd_dev->image_id = kstrdup("", GFP_KERNEL);
+       if (!rbd_dev->image_id)
+               return -ENOMEM;
+       rbd_dev->image_id_len = 0;
+
+       /* Record the header object name for this rbd image. */
+
+       size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
+       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+       if (!rbd_dev->header_name) {
+               ret = -ENOMEM;
                goto out_err;
+       }
        sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
 
+       /* Populate rbd image metadata */
+
+       ret = rbd_read_header(rbd_dev, &rbd_dev->header);
+       if (ret < 0)
+               goto out_err;
+       rbd_dev->image_format = 1;
+
+       dout("discovered version 1 image, header name is %s\n",
+               rbd_dev->header_name);
+
+       return 0;
+
+out_err:
+       kfree(rbd_dev->header_name);
+       rbd_dev->header_name = NULL;
+       kfree(rbd_dev->image_id);
+       rbd_dev->image_id = NULL;
+
+       return ret;
+}
+
+static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
+{
+       size_t size;
+       int ret;
+       u64 ver = 0;
+
        /*
-        * The snapshot name is optional.  If none is is supplied,
-        * we use the default value.
+        * Image id was filled in by the caller.  Record the header
+        * object name for this rbd image.
         */
-       rbd_dev->snap_name = dup_token(&buf, &len);
-       if (!rbd_dev->snap_name)
+       size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
+       rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
+       if (!rbd_dev->header_name)
+               return -ENOMEM;
+       sprintf(rbd_dev->header_name, "%s%s",
+                       RBD_HEADER_PREFIX, rbd_dev->image_id);
+
+       /* Get the size and object order for the image */
+
+       ret = rbd_dev_v2_image_size(rbd_dev);
+       if (ret < 0)
                goto out_err;
-       if (!len) {
-               /* Replace the empty name with the default */
-               kfree(rbd_dev->snap_name);
-               rbd_dev->snap_name
-                       = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
-               if (!rbd_dev->snap_name)
-                       goto out_err;
 
-               memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
-                       sizeof (RBD_SNAP_HEAD_NAME));
-       }
+       /* Get the object prefix (a.k.a. block_name) for the image */
 
-       return 0;
+       ret = rbd_dev_v2_object_prefix(rbd_dev);
+       if (ret < 0)
+               goto out_err;
+
+       /* Get the and check features for the image */
 
+       ret = rbd_dev_v2_features(rbd_dev);
+       if (ret < 0)
+               goto out_err;
+
+       /* crypto and compression type aren't (yet) supported for v2 images */
+
+       rbd_dev->header.crypt_type = 0;
+       rbd_dev->header.comp_type = 0;
+
+       /* Get the snapshot context, plus the header version */
+
+       ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
+       if (ret)
+               goto out_err;
+       rbd_dev->header.obj_version = ver;
+
+       rbd_dev->image_format = 2;
+
+       dout("discovered version 2 image, header name is %s\n",
+               rbd_dev->header_name);
+
+       return -ENOTSUPP;
 out_err:
        kfree(rbd_dev->header_name);
-       kfree(rbd_dev->image_name);
-       kfree(rbd_dev->pool_name);
-       rbd_dev->pool_name = NULL;
+       rbd_dev->header_name = NULL;
+       kfree(rbd_dev->header.object_prefix);
+       rbd_dev->header.object_prefix = NULL;
+
+       return ret;
+}
+
+/*
+ * Probe for the existence of the header object for the given rbd
+ * device.  For format 2 images this includes determining the image
+ * id.
+ */
+static int rbd_dev_probe(struct rbd_device *rbd_dev)
+{
+       int ret;
+
+       /*
+        * Get the id from the image id object.  If it's not a
+        * format 2 image, we'll get ENOENT back, and we'll assume
+        * it's a format 1 image.
+        */
+       ret = rbd_dev_image_id(rbd_dev);
+       if (ret)
+               ret = rbd_dev_v1_probe(rbd_dev);
+       else
+               ret = rbd_dev_v2_probe(rbd_dev);
+       if (ret)
+               dout("probe failed, returning %d\n", ret);
 
        return ret;
 }
@@ -2450,16 +3059,17 @@ static ssize_t rbd_add(struct bus_type *bus,
        size_t mon_addrs_size = 0;
        struct ceph_osd_client *osdc;
        int rc = -ENOMEM;
+       char *snap_name;
 
        if (!try_module_get(THIS_MODULE))
                return -ENODEV;
 
        options = kmalloc(count, GFP_KERNEL);
        if (!options)
-               goto err_nomem;
+               goto err_out_mem;
        rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
        if (!rbd_dev)
-               goto err_nomem;
+               goto err_out_mem;
 
        /* static rbd_device initialization */
        spin_lock_init(&rbd_dev->lock);
@@ -2467,27 +3077,18 @@ static ssize_t rbd_add(struct bus_type *bus,
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);
 
-       /* generate unique id: find highest unique id, add one */
-       rbd_id_get(rbd_dev);
-
-       /* Fill in the device name, now that we have its id. */
-       BUILD_BUG_ON(DEV_NAME_LEN
-                       < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-       sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
-
        /* parse add command */
-       rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
-                               options, count);
-       if (rc)
-               goto err_put_id;
-
-       rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
-                                               options);
-       if (IS_ERR(rbd_dev->rbd_client)) {
-               rc = PTR_ERR(rbd_dev->rbd_client);
-               goto err_put_id;
+       snap_name = rbd_add_parse_args(rbd_dev, buf,
+                               &mon_addrs, &mon_addrs_size, options, count);
+       if (IS_ERR(snap_name)) {
+               rc = PTR_ERR(snap_name);
+               goto err_out_mem;
        }
 
+       rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
+       if (rc < 0)
+               goto err_out_args;
+
        /* pick the pool */
        osdc = &rbd_dev->rbd_client->client->osdc;
        rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
@@ -2495,23 +3096,52 @@ static ssize_t rbd_add(struct bus_type *bus,
                goto err_out_client;
        rbd_dev->pool_id = rc;
 
-       /* register our block device */
-       rc = register_blkdev(0, rbd_dev->name);
+       rc = rbd_dev_probe(rbd_dev);
        if (rc < 0)
                goto err_out_client;
+
+       /* no need to lock here, as rbd_dev is not registered yet */
+       rc = rbd_dev_snaps_update(rbd_dev);
+       if (rc)
+               goto err_out_header;
+
+       rc = rbd_dev_set_mapping(rbd_dev, snap_name);
+       if (rc)
+               goto err_out_header;
+
+       /* generate unique id: find highest unique id, add one */
+       rbd_dev_id_get(rbd_dev);
+
+       /* Fill in the device name, now that we have its id. */
+       BUILD_BUG_ON(DEV_NAME_LEN
+                       < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
+       sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
+
+       /* Get our block major device number. */
+
+       rc = register_blkdev(0, rbd_dev->name);
+       if (rc < 0)
+               goto err_out_id;
        rbd_dev->major = rc;
 
-       rc = rbd_bus_add_dev(rbd_dev);
+       /* Set up the blkdev mapping. */
+
+       rc = rbd_init_disk(rbd_dev);
        if (rc)
                goto err_out_blkdev;
 
+       rc = rbd_bus_add_dev(rbd_dev);
+       if (rc)
+               goto err_out_disk;
+
        /*
         * At this point cleanup in the event of an error is the job
         * of the sysfs code (initiated by rbd_bus_del_dev()).
-        *
-        * Set up and announce blkdev mapping.
         */
-       rc = rbd_init_disk(rbd_dev);
+
+       down_write(&rbd_dev->header_rwsem);
+       rc = rbd_dev_snaps_register(rbd_dev);
+       up_write(&rbd_dev->header_rwsem);
        if (rc)
                goto err_out_bus;
 
@@ -2519,6 +3149,13 @@ static ssize_t rbd_add(struct bus_type *bus,
        if (rc)
                goto err_out_bus;
 
+       /* Everything's ready.  Announce the disk to the world. */
+
+       add_disk(rbd_dev->disk);
+
+       pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
+               (unsigned long long) rbd_dev->mapping.size);
+
        return count;
 
 err_out_bus:
@@ -2528,19 +3165,23 @@ err_out_bus:
        kfree(options);
        return rc;
 
+err_out_disk:
+       rbd_free_disk(rbd_dev);
 err_out_blkdev:
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_id:
+       rbd_dev_id_put(rbd_dev);
+err_out_header:
+       rbd_header_free(&rbd_dev->header);
 err_out_client:
+       kfree(rbd_dev->header_name);
        rbd_put_client(rbd_dev);
-err_put_id:
-       if (rbd_dev->pool_name) {
-               kfree(rbd_dev->snap_name);
-               kfree(rbd_dev->header_name);
-               kfree(rbd_dev->image_name);
-               kfree(rbd_dev->pool_name);
-       }
-       rbd_id_put(rbd_dev);
-err_nomem:
+       kfree(rbd_dev->image_id);
+err_out_args:
+       kfree(rbd_dev->mapping.snap_name);
+       kfree(rbd_dev->image_name);
+       kfree(rbd_dev->pool_name);
+err_out_mem:
        kfree(rbd_dev);
        kfree(options);
 
@@ -2586,12 +3227,16 @@ static void rbd_dev_release(struct device *dev)
        rbd_free_disk(rbd_dev);
        unregister_blkdev(rbd_dev->major, rbd_dev->name);
 
+       /* release allocated disk header fields */
+       rbd_header_free(&rbd_dev->header);
+
        /* done with the id, and with the rbd_dev */
-       kfree(rbd_dev->snap_name);
+       kfree(rbd_dev->mapping.snap_name);
+       kfree(rbd_dev->image_id);
        kfree(rbd_dev->header_name);
        kfree(rbd_dev->pool_name);
        kfree(rbd_dev->image_name);
-       rbd_id_put(rbd_dev);
+       rbd_dev_id_put(rbd_dev);
        kfree(rbd_dev);
 
        /* release module ref */
@@ -2629,47 +3274,7 @@ static ssize_t rbd_remove(struct bus_type *bus,
 
 done:
        mutex_unlock(&ctl_mutex);
-       return ret;
-}
-
-static ssize_t rbd_snap_add(struct device *dev,
-                           struct device_attribute *attr,
-                           const char *buf,
-                           size_t count)
-{
-       struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
-       int ret;
-       char *name = kmalloc(count + 1, GFP_KERNEL);
-       if (!name)
-               return -ENOMEM;
-
-       snprintf(name, count, "%s", buf);
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-       ret = rbd_header_add_snap(rbd_dev,
-                                 name, GFP_KERNEL);
-       if (ret < 0)
-               goto err_unlock;
-
-       ret = __rbd_refresh_header(rbd_dev, NULL);
-       if (ret < 0)
-               goto err_unlock;
-
-       /* shouldn't hold ctl_mutex when notifying.. notify might
-          trigger a watch callback that would need to get that mutex */
-       mutex_unlock(&ctl_mutex);
-
-       /* make a best effort, don't error if failed */
-       rbd_req_sync_notify(rbd_dev);
-
-       ret = count;
-       kfree(name);
-       return ret;
-
-err_unlock:
-       mutex_unlock(&ctl_mutex);
-       kfree(name);
        return ret;
 }