2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 #define RBD_DRV_NAME "rbd"
56 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
58 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
60 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
61 #define RBD_MAX_SNAP_NAME_LEN \
62 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
64 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
66 #define RBD_SNAP_HEAD_NAME "-"
68 /* This allows a single page to hold an image name sent by OSD */
69 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
70 #define RBD_IMAGE_ID_LEN_MAX 64
72 #define RBD_OBJ_PREFIX_LEN_MAX 64
76 #define RBD_FEATURE_LAYERING (1<<0)
77 #define RBD_FEATURE_STRIPINGV2 (1<<1)
78 #define RBD_FEATURES_ALL \
79 (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
81 /* Features supported by this (client software) implementation. */
83 #define RBD_FEATURES_SUPPORTED (0)
86 * An RBD device name will be "rbd#", where the "rbd" comes from
87 * RBD_DRV_NAME above, and # is a unique integer identifier.
88 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
89 * enough to hold all possible device names.
91 #define DEV_NAME_LEN 32
92 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
95 * block device image metadata (in-memory version)
97 struct rbd_image_header
{
98 /* These four fields never change for a given rbd image */
105 /* The remaining fields need to be updated occasionally */
107 struct ceph_snap_context
*snapc
;
115 * An rbd image specification.
117 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118 * identify an image. Each rbd_dev structure includes a pointer to
119 * an rbd_spec structure that encapsulates this identity.
121 * Each of the id's in an rbd_spec has an associated name. For a
122 * user-mapped image, the names are supplied and the id's associated
123 * with them are looked up. For a layered image, a parent image is
124 * defined by the tuple, and the names are looked up.
126 * An rbd_dev structure contains a parent_spec pointer which is
127 * non-null if the image it represents is a child in a layered
128 * image. This pointer will refer to the rbd_spec structure used
129 * by the parent rbd_dev for its own identity (i.e., the structure
130 * is shared between the parent and child).
132 * Since these structures are populated once, during the discovery
133 * phase of image construction, they are effectively immutable so
134 * we make no effort to synchronize access to them.
136 * Note that code herein does not assume the image name is known (it
137 * could be a null pointer).
153 * an instance of the client. multiple devices may share an rbd client.
156 struct ceph_client
*client
;
158 struct list_head node
;
161 struct rbd_img_request
;
162 typedef void (*rbd_img_callback_t
)(struct rbd_img_request
*);
164 #define BAD_WHICH U32_MAX /* Good which or bad which, which? */
166 struct rbd_obj_request
;
167 typedef void (*rbd_obj_callback_t
)(struct rbd_obj_request
*);
169 enum obj_request_type
{
170 OBJ_REQUEST_NODATA
, OBJ_REQUEST_BIO
, OBJ_REQUEST_PAGES
174 OBJ_REQ_DONE
, /* completion flag: not done = 0, done = 1 */
175 OBJ_REQ_IMG_DATA
, /* object usage: standalone = 0, image = 1 */
176 OBJ_REQ_KNOWN
, /* EXISTS flag valid: no = 0, yes = 1 */
177 OBJ_REQ_EXISTS
, /* target exists: no = 0, yes = 1 */
180 struct rbd_obj_request
{
181 const char *object_name
;
182 u64 offset
; /* object start byte */
183 u64 length
; /* bytes from offset */
187 * An object request associated with an image will have its
188 * img_data flag set; a standalone object request will not.
190 * A standalone object request will have which == BAD_WHICH
191 * and a null obj_request pointer.
193 * An object request initiated in support of a layered image
194 * object (to check for its existence before a write) will
195 * have which == BAD_WHICH and a non-null obj_request pointer.
197 * Finally, an object request for rbd image data will have
198 * which != BAD_WHICH, and will have a non-null img_request
199 * pointer. The value of which will be in the range
200 * 0..(img_request->obj_request_count-1).
203 struct rbd_obj_request
*obj_request
; /* STAT op */
205 struct rbd_img_request
*img_request
;
207 /* links for img_request->obj_requests list */
208 struct list_head links
;
211 u32 which
; /* posn image request list */
213 enum obj_request_type type
;
215 struct bio
*bio_list
;
222 struct ceph_osd_request
*osd_req
;
224 u64 xferred
; /* bytes transferred */
228 rbd_obj_callback_t callback
;
229 struct completion completion
;
235 IMG_REQ_WRITE
, /* I/O direction: read = 0, write = 1 */
236 IMG_REQ_CHILD
, /* initiator: block = 0, child image = 1 */
237 IMG_REQ_LAYERED
, /* ENOENT handling: normal = 0, layered = 1 */
240 struct rbd_img_request
{
241 struct rbd_device
*rbd_dev
;
242 u64 offset
; /* starting image byte offset */
243 u64 length
; /* byte count from offset */
246 u64 snap_id
; /* for reads */
247 struct ceph_snap_context
*snapc
; /* for writes */
250 struct request
*rq
; /* block request */
251 struct rbd_obj_request
*obj_request
; /* obj req initiator */
253 spinlock_t completion_lock
;/* protects next_completion */
255 rbd_img_callback_t callback
;
256 u64 xferred
;/* aggregate bytes transferred */
257 int result
; /* first nonzero obj_request result */
259 u32 obj_request_count
;
260 struct list_head obj_requests
; /* rbd_obj_request structs */
/*
 * Iterators over the object requests linked on an image request's
 * obj_requests list through each object request's "links" member.
 *
 *   for_each_obj_request       - walk the whole list, head to tail
 *   for_each_obj_request_from  - continue forward from the current oreq
 *   for_each_obj_request_safe  - walk tail to head; n holds the next
 *                                entry so oreq may be removed in the loop
 *
 * NOTE(review): the "_safe" variant presumably runs in reverse because
 * rbd_img_obj_request_del() asserts that the entry being removed has
 * which == obj_request_count after the decrement, i.e. it is the last
 * entry on the list -- confirm before changing the direction.
 */
265 #define for_each_obj_request(ireq, oreq) \
266 list_for_each_entry(oreq, &(ireq)->obj_requests, links)
267 #define for_each_obj_request_from(ireq, oreq) \
268 list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
269 #define for_each_obj_request_safe(ireq, oreq, n) \
270 list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
276 struct list_head node
;
291 int dev_id
; /* blkdev unique id */
293 int major
; /* blkdev assigned major */
294 struct gendisk
*disk
; /* blkdev's gendisk and rq */
296 u32 image_format
; /* Either 1 or 2 */
297 struct rbd_client
*rbd_client
;
299 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
301 spinlock_t lock
; /* queue, flags, open_count */
303 struct rbd_image_header header
;
304 unsigned long flags
; /* possibly lock protected */
305 struct rbd_spec
*spec
;
309 struct ceph_file_layout layout
;
311 struct ceph_osd_event
*watch_event
;
312 struct rbd_obj_request
*watch_request
;
314 struct rbd_spec
*parent_spec
;
316 struct rbd_device
*parent
;
318 /* protects updating the header */
319 struct rw_semaphore header_rwsem
;
321 struct rbd_mapping mapping
;
323 struct list_head node
;
325 /* list of snapshots */
326 struct list_head snaps
;
330 unsigned long open_count
; /* protected by lock */
334 * Flag bits for rbd_dev->flags. If atomicity is required,
335 * rbd_dev->lock is used to protect access.
337 * Currently, only the "removing" flag (which is coupled with the
338 * "open_count" field) requires atomic access.
341 RBD_DEV_FLAG_EXISTS
, /* mapped snapshot has not been deleted */
342 RBD_DEV_FLAG_REMOVING
, /* this mapping is being removed */
345 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
347 static LIST_HEAD(rbd_dev_list
); /* devices */
348 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
350 static LIST_HEAD(rbd_client_list
); /* clients */
351 static DEFINE_SPINLOCK(rbd_client_list_lock
);
353 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
);
354 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
);
356 static void rbd_dev_release(struct device
*dev
);
357 static void rbd_remove_snap_dev(struct rbd_snap
*snap
);
359 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
361 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
363 static int rbd_dev_probe(struct rbd_device
*rbd_dev
);
365 static struct bus_attribute rbd_bus_attrs
[] = {
366 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
367 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
371 static struct bus_type rbd_bus_type
= {
373 .bus_attrs
= rbd_bus_attrs
,
/*
 * Release callback installed on rbd_root_dev.  The function body is
 * empty: it performs no cleanup.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
380 static struct device rbd_root_dev
= {
382 .release
= rbd_root_dev_release
,
385 static __printf(2, 3)
386 void rbd_warn(struct rbd_device
*rbd_dev
, const char *fmt
, ...)
388 struct va_format vaf
;
396 printk(KERN_WARNING
"%s: %pV\n", RBD_DRV_NAME
, &vaf
);
397 else if (rbd_dev
->disk
)
398 printk(KERN_WARNING
"%s: %s: %pV\n",
399 RBD_DRV_NAME
, rbd_dev
->disk
->disk_name
, &vaf
);
400 else if (rbd_dev
->spec
&& rbd_dev
->spec
->image_name
)
401 printk(KERN_WARNING
"%s: image %s: %pV\n",
402 RBD_DRV_NAME
, rbd_dev
->spec
->image_name
, &vaf
);
403 else if (rbd_dev
->spec
&& rbd_dev
->spec
->image_id
)
404 printk(KERN_WARNING
"%s: id %s: %pV\n",
405 RBD_DRV_NAME
, rbd_dev
->spec
->image_id
, &vaf
);
407 printk(KERN_WARNING
"%s: rbd_dev %p: %pV\n",
408 RBD_DRV_NAME
, rbd_dev
, &vaf
);
/*
 * rbd_assert(expr) - debug-only assertion.
 *
 * With RBD_DEBUG defined, a false expr logs the enclosing function,
 * the line number and the text of the expression, then calls BUG().
 * Without RBD_DEBUG the macro expands to a no-op and expr is not
 * evaluated, so expr must not have side effects the code relies on.
 *
 * The expansion is wrapped in do { } while (0) so the macro acts as
 * exactly one statement.  A bare "if (...) { ... }" expansion would
 * capture an "else" that follows the macro at the call site.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
	do {								\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}							\
	} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
425 static void rbd_img_parent_read(struct rbd_obj_request
*obj_request
);
427 static int rbd_dev_refresh(struct rbd_device
*rbd_dev
, u64
*hver
);
428 static int rbd_dev_v2_refresh(struct rbd_device
*rbd_dev
, u64
*hver
);
430 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
432 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
433 bool removing
= false;
435 if ((mode
& FMODE_WRITE
) && rbd_dev
->mapping
.read_only
)
438 spin_lock_irq(&rbd_dev
->lock
);
439 if (test_bit(RBD_DEV_FLAG_REMOVING
, &rbd_dev
->flags
))
442 rbd_dev
->open_count
++;
443 spin_unlock_irq(&rbd_dev
->lock
);
447 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
448 (void) get_device(&rbd_dev
->dev
);
449 set_device_ro(bdev
, rbd_dev
->mapping
.read_only
);
450 mutex_unlock(&ctl_mutex
);
455 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
457 struct rbd_device
*rbd_dev
= disk
->private_data
;
458 unsigned long open_count_before
;
460 spin_lock_irq(&rbd_dev
->lock
);
461 open_count_before
= rbd_dev
->open_count
--;
462 spin_unlock_irq(&rbd_dev
->lock
);
463 rbd_assert(open_count_before
> 0);
465 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
466 put_device(&rbd_dev
->dev
);
467 mutex_unlock(&ctl_mutex
);
472 static const struct block_device_operations rbd_bd_ops
= {
473 .owner
= THIS_MODULE
,
475 .release
= rbd_release
,
479 * Initialize an rbd client instance.
482 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
)
484 struct rbd_client
*rbdc
;
487 dout("%s:\n", __func__
);
488 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
492 kref_init(&rbdc
->kref
);
493 INIT_LIST_HEAD(&rbdc
->node
);
495 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
497 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
498 if (IS_ERR(rbdc
->client
))
500 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
502 ret
= ceph_open_session(rbdc
->client
);
506 spin_lock(&rbd_client_list_lock
);
507 list_add_tail(&rbdc
->node
, &rbd_client_list
);
508 spin_unlock(&rbd_client_list_lock
);
510 mutex_unlock(&ctl_mutex
);
511 dout("%s: rbdc %p\n", __func__
, rbdc
);
516 ceph_destroy_client(rbdc
->client
);
518 mutex_unlock(&ctl_mutex
);
522 ceph_destroy_options(ceph_opts
);
523 dout("%s: error %d\n", __func__
, ret
);
528 static struct rbd_client
*__rbd_get_client(struct rbd_client
*rbdc
)
530 kref_get(&rbdc
->kref
);
536 * Find a ceph client with specific addr and configuration. If
537 * found, bump its reference count.
539 static struct rbd_client
*rbd_client_find(struct ceph_options
*ceph_opts
)
541 struct rbd_client
*client_node
;
544 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
547 spin_lock(&rbd_client_list_lock
);
548 list_for_each_entry(client_node
, &rbd_client_list
, node
) {
549 if (!ceph_compare_options(ceph_opts
, client_node
->client
)) {
550 __rbd_get_client(client_node
);
556 spin_unlock(&rbd_client_list_lock
);
558 return found
? client_node
: NULL
;
568 /* string args above */
571 /* Boolean args above */
575 static match_table_t rbd_opts_tokens
= {
577 /* string args above */
578 {Opt_read_only
, "read_only"},
579 {Opt_read_only
, "ro"}, /* Alternate spelling */
580 {Opt_read_write
, "read_write"},
581 {Opt_read_write
, "rw"}, /* Alternate spelling */
582 /* Boolean args above */
590 #define RBD_READ_ONLY_DEFAULT false
592 static int parse_rbd_opts_token(char *c
, void *private)
594 struct rbd_options
*rbd_opts
= private;
595 substring_t argstr
[MAX_OPT_ARGS
];
596 int token
, intval
, ret
;
598 token
= match_token(c
, rbd_opts_tokens
, argstr
);
602 if (token
< Opt_last_int
) {
603 ret
= match_int(&argstr
[0], &intval
);
605 pr_err("bad mount option arg (not int) "
609 dout("got int token %d val %d\n", token
, intval
);
610 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
611 dout("got string token %d val %s\n", token
,
613 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
614 dout("got Boolean token %d\n", token
);
616 dout("got token %d\n", token
);
621 rbd_opts
->read_only
= true;
624 rbd_opts
->read_only
= false;
634 * Get a ceph client with specific addr and configuration, if one does
635 * not exist create it.
637 static struct rbd_client
*rbd_get_client(struct ceph_options
*ceph_opts
)
639 struct rbd_client
*rbdc
;
641 rbdc
= rbd_client_find(ceph_opts
);
642 if (rbdc
) /* using an existing client */
643 ceph_destroy_options(ceph_opts
);
645 rbdc
= rbd_client_create(ceph_opts
);
651 * Destroy ceph client
653 * Takes rbd_client_list_lock itself to unlink the client, so the caller must not hold it.
655 static void rbd_client_release(struct kref
*kref
)
657 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
659 dout("%s: rbdc %p\n", __func__
, rbdc
);
660 spin_lock(&rbd_client_list_lock
);
661 list_del(&rbdc
->node
);
662 spin_unlock(&rbd_client_list_lock
);
664 ceph_destroy_client(rbdc
->client
);
669 * Drop reference to ceph client node. If it's not referenced anymore, release
672 static void rbd_put_client(struct rbd_client
*rbdc
)
675 kref_put(&rbdc
->kref
, rbd_client_release
);
678 static bool rbd_image_format_valid(u32 image_format
)
680 return image_format
== 1 || image_format
== 2;
683 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
688 /* The header has to start with the magic rbd header text */
689 if (memcmp(&ondisk
->text
, RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
)))
692 /* The bio layer requires at least sector-sized I/O */
694 if (ondisk
->options
.order
< SECTOR_SHIFT
)
697 /* If we use u64 in a few spots we may be able to loosen this */
699 if (ondisk
->options
.order
> 8 * sizeof (int) - 1)
703 * The size of a snapshot header has to fit in a size_t, and
704 * that limits the number of snapshots.
706 snap_count
= le32_to_cpu(ondisk
->snap_count
);
707 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
708 if (snap_count
> size
/ sizeof (__le64
))
712 * Not only that, but the size of the entire snapshot
713 * header must also be representable in a size_t.
715 size
-= snap_count
* sizeof (__le64
);
716 if ((u64
) size
< le64_to_cpu(ondisk
->snap_names_len
))
723 * Create a new header structure, translate header format from the on-disk
726 static int rbd_header_from_disk(struct rbd_image_header
*header
,
727 struct rbd_image_header_ondisk
*ondisk
)
734 memset(header
, 0, sizeof (*header
));
736 snap_count
= le32_to_cpu(ondisk
->snap_count
);
738 len
= strnlen(ondisk
->object_prefix
, sizeof (ondisk
->object_prefix
));
739 header
->object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
740 if (!header
->object_prefix
)
742 memcpy(header
->object_prefix
, ondisk
->object_prefix
, len
);
743 header
->object_prefix
[len
] = '\0';
746 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
748 /* Save a copy of the snapshot names */
750 if (snap_names_len
> (u64
) SIZE_MAX
)
752 header
->snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
753 if (!header
->snap_names
)
756 * Note that rbd_dev_v1_header_read() guarantees
757 * the ondisk buffer we're working with has
758 * snap_names_len bytes beyond the end of the
759 * snapshot id array, this memcpy() is safe.
761 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
764 /* Record each snapshot's size */
766 size
= snap_count
* sizeof (*header
->snap_sizes
);
767 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
768 if (!header
->snap_sizes
)
770 for (i
= 0; i
< snap_count
; i
++)
771 header
->snap_sizes
[i
] =
772 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
774 WARN_ON(ondisk
->snap_names_len
);
775 header
->snap_names
= NULL
;
776 header
->snap_sizes
= NULL
;
779 header
->features
= 0; /* No features support in v1 images */
780 header
->obj_order
= ondisk
->options
.order
;
781 header
->crypt_type
= ondisk
->options
.crypt_type
;
782 header
->comp_type
= ondisk
->options
.comp_type
;
784 /* Allocate and fill in the snapshot context */
786 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
787 size
= sizeof (struct ceph_snap_context
);
788 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
789 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
793 atomic_set(&header
->snapc
->nref
, 1);
794 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
795 header
->snapc
->num_snaps
= snap_count
;
796 for (i
= 0; i
< snap_count
; i
++)
797 header
->snapc
->snaps
[i
] =
798 le64_to_cpu(ondisk
->snaps
[i
].id
);
803 kfree(header
->snap_sizes
);
804 header
->snap_sizes
= NULL
;
805 kfree(header
->snap_names
);
806 header
->snap_names
= NULL
;
807 kfree(header
->object_prefix
);
808 header
->object_prefix
= NULL
;
813 static const char *rbd_snap_name(struct rbd_device
*rbd_dev
, u64 snap_id
)
815 struct rbd_snap
*snap
;
817 if (snap_id
== CEPH_NOSNAP
)
818 return RBD_SNAP_HEAD_NAME
;
820 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
)
821 if (snap_id
== snap
->id
)
827 static int snap_by_name(struct rbd_device
*rbd_dev
, const char *snap_name
)
830 struct rbd_snap
*snap
;
832 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
833 if (!strcmp(snap_name
, snap
->name
)) {
834 rbd_dev
->spec
->snap_id
= snap
->id
;
835 rbd_dev
->mapping
.size
= snap
->size
;
836 rbd_dev
->mapping
.features
= snap
->features
;
845 static int rbd_dev_set_mapping(struct rbd_device
*rbd_dev
)
849 if (!memcmp(rbd_dev
->spec
->snap_name
, RBD_SNAP_HEAD_NAME
,
850 sizeof (RBD_SNAP_HEAD_NAME
))) {
851 rbd_dev
->spec
->snap_id
= CEPH_NOSNAP
;
852 rbd_dev
->mapping
.size
= rbd_dev
->header
.image_size
;
853 rbd_dev
->mapping
.features
= rbd_dev
->header
.features
;
856 ret
= snap_by_name(rbd_dev
, rbd_dev
->spec
->snap_name
);
859 rbd_dev
->mapping
.read_only
= true;
861 set_bit(RBD_DEV_FLAG_EXISTS
, &rbd_dev
->flags
);
867 static void rbd_header_free(struct rbd_image_header
*header
)
869 kfree(header
->object_prefix
);
870 header
->object_prefix
= NULL
;
871 kfree(header
->snap_sizes
);
872 header
->snap_sizes
= NULL
;
873 kfree(header
->snap_names
);
874 header
->snap_names
= NULL
;
875 ceph_put_snap_context(header
->snapc
);
876 header
->snapc
= NULL
;
879 static const char *rbd_segment_name(struct rbd_device
*rbd_dev
, u64 offset
)
885 name
= kmalloc(MAX_OBJ_NAME_SIZE
+ 1, GFP_NOIO
);
888 segment
= offset
>> rbd_dev
->header
.obj_order
;
889 ret
= snprintf(name
, MAX_OBJ_NAME_SIZE
+ 1, "%s.%012llx",
890 rbd_dev
->header
.object_prefix
, segment
);
891 if (ret
< 0 || ret
> MAX_OBJ_NAME_SIZE
) {
892 pr_err("error formatting segment name for #%llu (%d)\n",
901 static u64
rbd_segment_offset(struct rbd_device
*rbd_dev
, u64 offset
)
903 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
905 return offset
& (segment_size
- 1);
908 static u64
rbd_segment_length(struct rbd_device
*rbd_dev
,
909 u64 offset
, u64 length
)
911 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
913 offset
&= segment_size
- 1;
915 rbd_assert(length
<= U64_MAX
- offset
);
916 if (offset
+ length
> segment_size
)
917 length
= segment_size
- offset
;
923 * returns the size of an object in the image
925 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
927 return 1 << header
->obj_order
;
934 static void bio_chain_put(struct bio
*chain
)
940 chain
= chain
->bi_next
;
946 * zeros a bio chain, starting at specific offset
948 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
957 bio_for_each_segment(bv
, chain
, i
) {
958 if (pos
+ bv
->bv_len
> start_ofs
) {
959 int remainder
= max(start_ofs
- pos
, 0);
960 buf
= bvec_kmap_irq(bv
, &flags
);
961 memset(buf
+ remainder
, 0,
962 bv
->bv_len
- remainder
);
963 bvec_kunmap_irq(buf
, &flags
);
968 chain
= chain
->bi_next
;
973 * Clone a portion of a bio, starting at the given byte offset
974 * and continuing for the number of bytes indicated.
976 static struct bio
*bio_clone_range(struct bio
*bio_src
,
985 unsigned short end_idx
;
989 /* Handle the easy case for the caller */
991 if (!offset
&& len
== bio_src
->bi_size
)
992 return bio_clone(bio_src
, gfpmask
);
994 if (WARN_ON_ONCE(!len
))
996 if (WARN_ON_ONCE(len
> bio_src
->bi_size
))
998 if (WARN_ON_ONCE(offset
> bio_src
->bi_size
- len
))
1001 /* Find first affected segment... */
1004 __bio_for_each_segment(bv
, bio_src
, idx
, 0) {
1005 if (resid
< bv
->bv_len
)
1007 resid
-= bv
->bv_len
;
1011 /* ...and the last affected segment */
1014 __bio_for_each_segment(bv
, bio_src
, end_idx
, idx
) {
1015 if (resid
<= bv
->bv_len
)
1017 resid
-= bv
->bv_len
;
1019 vcnt
= end_idx
- idx
+ 1;
1021 /* Build the clone */
1023 bio
= bio_alloc(gfpmask
, (unsigned int) vcnt
);
1025 return NULL
; /* ENOMEM */
1027 bio
->bi_bdev
= bio_src
->bi_bdev
;
1028 bio
->bi_sector
= bio_src
->bi_sector
+ (offset
>> SECTOR_SHIFT
);
1029 bio
->bi_rw
= bio_src
->bi_rw
;
1030 bio
->bi_flags
|= 1 << BIO_CLONED
;
1033 * Copy over our part of the bio_vec, then update the first
1034 * and last (or only) entries.
1036 memcpy(&bio
->bi_io_vec
[0], &bio_src
->bi_io_vec
[idx
],
1037 vcnt
* sizeof (struct bio_vec
));
1038 bio
->bi_io_vec
[0].bv_offset
+= voff
;
1040 bio
->bi_io_vec
[0].bv_len
-= voff
;
1041 bio
->bi_io_vec
[vcnt
- 1].bv_len
= resid
;
1043 bio
->bi_io_vec
[0].bv_len
= len
;
1046 bio
->bi_vcnt
= vcnt
;
1054 * Clone a portion of a bio chain, starting at the given byte offset
1055 * into the first bio in the source chain and continuing for the
1056 * number of bytes indicated. The result is another bio chain of
1057 * exactly the given length, or a null pointer on error.
1059 * The bio_src and offset parameters are both in-out. On entry they
1060 * refer to the first source bio and the offset into that bio where
1061 * the start of data to be cloned is located.
1063 * On return, bio_src is updated to refer to the bio in the source
1064 * chain that contains the first un-cloned byte, and *offset will
1065 * contain the offset of that byte within that bio.
1067 static struct bio
*bio_chain_clone_range(struct bio
**bio_src
,
1068 unsigned int *offset
,
1072 struct bio
*bi
= *bio_src
;
1073 unsigned int off
= *offset
;
1074 struct bio
*chain
= NULL
;
1077 /* Build up a chain of clone bios up to the limit */
1079 if (!bi
|| off
>= bi
->bi_size
|| !len
)
1080 return NULL
; /* Nothing to clone */
1084 unsigned int bi_size
;
1088 rbd_warn(NULL
, "bio_chain exhausted with %u left", len
);
1089 goto out_err
; /* EINVAL; ran out of bio's */
1091 bi_size
= min_t(unsigned int, bi
->bi_size
- off
, len
);
1092 bio
= bio_clone_range(bi
, off
, bi_size
, gfpmask
);
1094 goto out_err
; /* ENOMEM */
1097 end
= &bio
->bi_next
;
1100 if (off
== bi
->bi_size
) {
1111 bio_chain_put(chain
);
1117 * The default/initial value for all object request flags is 0. For
1118 * each flag, once its value is set to 1 it is never reset to 0
1121 static void obj_request_img_data_set(struct rbd_obj_request
*obj_request
)
1123 if (test_and_set_bit(OBJ_REQ_IMG_DATA
, &obj_request
->flags
)) {
1124 struct rbd_device
*rbd_dev
;
1126 rbd_dev
= obj_request
->img_request
->rbd_dev
;
1127 rbd_warn(rbd_dev
, "obj_request %p already marked img_data\n",
1132 static bool obj_request_img_data_test(struct rbd_obj_request
*obj_request
)
1135 return test_bit(OBJ_REQ_IMG_DATA
, &obj_request
->flags
) != 0;
1138 static void obj_request_done_set(struct rbd_obj_request
*obj_request
)
1140 if (test_and_set_bit(OBJ_REQ_DONE
, &obj_request
->flags
)) {
1141 struct rbd_device
*rbd_dev
= NULL
;
1143 if (obj_request_img_data_test(obj_request
))
1144 rbd_dev
= obj_request
->img_request
->rbd_dev
;
1145 rbd_warn(rbd_dev
, "obj_request %p already marked done\n",
1150 static bool obj_request_done_test(struct rbd_obj_request
*obj_request
)
1153 return test_bit(OBJ_REQ_DONE
, &obj_request
->flags
) != 0;
1157 * This sets the KNOWN flag after (possibly) setting the EXISTS
1158 * flag. The latter is set based on the "exists" value provided.
1160 * Note that for our purposes once an object exists it never goes
1161 * away again. It's possible that the response from two existence
1162 * checks are separated by the creation of the target object, and
1163 * the first ("doesn't exist") response arrives *after* the second
1164 * ("does exist"). In that case the late "doesn't exist" response is ignored.
1166 static void obj_request_existence_set(struct rbd_obj_request
*obj_request
,
1170 set_bit(OBJ_REQ_EXISTS
, &obj_request
->flags
);
1171 set_bit(OBJ_REQ_KNOWN
, &obj_request
->flags
);
1175 static bool obj_request_known_test(struct rbd_obj_request
*obj_request
)
1178 return test_bit(OBJ_REQ_KNOWN
, &obj_request
->flags
) != 0;
1181 static bool obj_request_exists_test(struct rbd_obj_request
*obj_request
)
1184 return test_bit(OBJ_REQ_EXISTS
, &obj_request
->flags
) != 0;
1187 static void rbd_obj_request_get(struct rbd_obj_request
*obj_request
)
1189 dout("%s: obj %p (was %d)\n", __func__
, obj_request
,
1190 atomic_read(&obj_request
->kref
.refcount
));
1191 kref_get(&obj_request
->kref
);
1194 static void rbd_obj_request_destroy(struct kref
*kref
);
1195 static void rbd_obj_request_put(struct rbd_obj_request
*obj_request
)
1197 rbd_assert(obj_request
!= NULL
);
1198 dout("%s: obj %p (was %d)\n", __func__
, obj_request
,
1199 atomic_read(&obj_request
->kref
.refcount
));
1200 kref_put(&obj_request
->kref
, rbd_obj_request_destroy
);
1203 static void rbd_img_request_get(struct rbd_img_request
*img_request
)
1205 dout("%s: img %p (was %d)\n", __func__
, img_request
,
1206 atomic_read(&img_request
->kref
.refcount
));
1207 kref_get(&img_request
->kref
);
1210 static void rbd_img_request_destroy(struct kref
*kref
);
1211 static void rbd_img_request_put(struct rbd_img_request
*img_request
)
1213 rbd_assert(img_request
!= NULL
);
1214 dout("%s: img %p (was %d)\n", __func__
, img_request
,
1215 atomic_read(&img_request
->kref
.refcount
));
1216 kref_put(&img_request
->kref
, rbd_img_request_destroy
);
1219 static inline void rbd_img_obj_request_add(struct rbd_img_request
*img_request
,
1220 struct rbd_obj_request
*obj_request
)
1222 rbd_assert(obj_request
->img_request
== NULL
);
1224 /* Image request now owns object's original reference */
1225 obj_request
->img_request
= img_request
;
1226 obj_request
->which
= img_request
->obj_request_count
;
1227 rbd_assert(!obj_request_img_data_test(obj_request
));
1228 obj_request_img_data_set(obj_request
);
1229 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1230 img_request
->obj_request_count
++;
1231 list_add_tail(&obj_request
->links
, &img_request
->obj_requests
);
1232 dout("%s: img %p obj %p w=%u\n", __func__
, img_request
, obj_request
,
1233 obj_request
->which
);
1236 static inline void rbd_img_obj_request_del(struct rbd_img_request
*img_request
,
1237 struct rbd_obj_request
*obj_request
)
1239 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1241 dout("%s: img %p obj %p w=%u\n", __func__
, img_request
, obj_request
,
1242 obj_request
->which
);
1243 list_del(&obj_request
->links
);
1244 rbd_assert(img_request
->obj_request_count
> 0);
1245 img_request
->obj_request_count
--;
1246 rbd_assert(obj_request
->which
== img_request
->obj_request_count
);
1247 obj_request
->which
= BAD_WHICH
;
1248 rbd_assert(obj_request_img_data_test(obj_request
));
1249 rbd_assert(obj_request
->img_request
== img_request
);
1250 obj_request
->img_request
= NULL
;
1251 obj_request
->callback
= NULL
;
1252 rbd_obj_request_put(obj_request
);
1255 static bool obj_request_type_valid(enum obj_request_type type
)
1258 case OBJ_REQUEST_NODATA
:
1259 case OBJ_REQUEST_BIO
:
1260 case OBJ_REQUEST_PAGES
:
1267 static int rbd_obj_request_submit(struct ceph_osd_client
*osdc
,
1268 struct rbd_obj_request
*obj_request
)
1270 dout("%s: osdc %p obj %p\n", __func__
, osdc
, obj_request
);
1272 return ceph_osdc_start_request(osdc
, obj_request
->osd_req
, false);
1275 static void rbd_img_request_complete(struct rbd_img_request
*img_request
)
1278 dout("%s: img %p\n", __func__
, img_request
);
1281 * If no error occurred, compute the aggregate transfer
1282 * count for the image request. We could instead use
1283 * atomic64_cmpxchg() to update it as each object request
1284 * completes; not clear which way is better off hand.
1286 if (!img_request
->result
) {
1287 struct rbd_obj_request
*obj_request
;
1290 for_each_obj_request(img_request
, obj_request
)
1291 xferred
+= obj_request
->xferred
;
1292 img_request
->xferred
= xferred
;
1295 if (img_request
->callback
)
1296 img_request
->callback(img_request
);
1298 rbd_img_request_put(img_request
);
1301 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1303 static int rbd_obj_request_wait(struct rbd_obj_request
*obj_request
)
1305 dout("%s: obj %p\n", __func__
, obj_request
);
1307 return wait_for_completion_interruptible(&obj_request
->completion
);
1311 * The default/initial value for all image request flags is 0. Each
1312 * is conditionally set to 1 at image request initialization time
1313 * and currently never changes thereafter.
1315 static void img_request_write_set(struct rbd_img_request
*img_request
)
1317 set_bit(IMG_REQ_WRITE
, &img_request
->flags
);
1321 static bool img_request_write_test(struct rbd_img_request
*img_request
)
1324 return test_bit(IMG_REQ_WRITE
, &img_request
->flags
) != 0;
1327 static void img_request_child_set(struct rbd_img_request
*img_request
)
1329 set_bit(IMG_REQ_CHILD
, &img_request
->flags
);
1333 static bool img_request_child_test(struct rbd_img_request
*img_request
)
1336 return test_bit(IMG_REQ_CHILD
, &img_request
->flags
) != 0;
1339 static void img_request_layered_set(struct rbd_img_request
*img_request
)
1341 set_bit(IMG_REQ_LAYERED
, &img_request
->flags
);
1345 static bool img_request_layered_test(struct rbd_img_request
*img_request
)
1348 return test_bit(IMG_REQ_LAYERED
, &img_request
->flags
) != 0;
1352 rbd_img_obj_request_read_callback(struct rbd_obj_request
*obj_request
)
1354 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__
,
1355 obj_request
, obj_request
->img_request
, obj_request
->result
,
1356 obj_request
->xferred
, obj_request
->length
);
1358 * ENOENT means a hole in the image. We zero-fill the
1359 * entire length of the request. A short read also implies
1360 * zero-fill to the end of the request. Either way we
1361 * update the xferred count to indicate the whole request
1364 BUG_ON(obj_request
->type
!= OBJ_REQUEST_BIO
);
1365 if (obj_request
->result
== -ENOENT
) {
1366 zero_bio_chain(obj_request
->bio_list
, 0);
1367 obj_request
->result
= 0;
1368 obj_request
->xferred
= obj_request
->length
;
1369 } else if (obj_request
->xferred
< obj_request
->length
&&
1370 !obj_request
->result
) {
1371 zero_bio_chain(obj_request
->bio_list
, obj_request
->xferred
);
1372 obj_request
->xferred
= obj_request
->length
;
1374 obj_request_done_set(obj_request
);
1377 static void rbd_obj_request_complete(struct rbd_obj_request
*obj_request
)
1379 dout("%s: obj %p cb %p\n", __func__
, obj_request
,
1380 obj_request
->callback
);
1381 if (obj_request
->callback
)
1382 obj_request
->callback(obj_request
);
1384 complete_all(&obj_request
->completion
);
/* Handler for ops that need no post-processing: just mark the request done. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1393 static void rbd_osd_read_callback(struct rbd_obj_request
*obj_request
)
1395 struct rbd_img_request
*img_request
= NULL
;
1396 bool layered
= false;
1398 if (obj_request_img_data_test(obj_request
)) {
1399 img_request
= obj_request
->img_request
;
1400 layered
= img_request
&& img_request_layered_test(img_request
);
1406 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__
,
1407 obj_request
, img_request
, obj_request
->result
,
1408 obj_request
->xferred
, obj_request
->length
);
1409 if (layered
&& obj_request
->result
== -ENOENT
)
1410 rbd_img_parent_read(obj_request
);
1411 else if (img_request
)
1412 rbd_img_obj_request_read_callback(obj_request
);
1414 obj_request_done_set(obj_request
);
1417 static void rbd_osd_write_callback(struct rbd_obj_request
*obj_request
)
1419 dout("%s: obj %p result %d %llu\n", __func__
, obj_request
,
1420 obj_request
->result
, obj_request
->length
);
1422 * There is no such thing as a successful short write. Set
1423 * it to our originally-requested length.
1425 obj_request
->xferred
= obj_request
->length
;
1426 obj_request_done_set(obj_request
);
/*
 * Handle completion of a STAT op.  A plain stat call needs nothing
 * beyond marking the request done.  More will be done here when the
 * stat is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1439 static void rbd_osd_req_callback(struct ceph_osd_request
*osd_req
,
1440 struct ceph_msg
*msg
)
1442 struct rbd_obj_request
*obj_request
= osd_req
->r_priv
;
1445 dout("%s: osd_req %p msg %p\n", __func__
, osd_req
, msg
);
1446 rbd_assert(osd_req
== obj_request
->osd_req
);
1447 if (obj_request_img_data_test(obj_request
)) {
1448 rbd_assert(obj_request
->img_request
);
1449 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1451 rbd_assert(obj_request
->which
== BAD_WHICH
);
1454 if (osd_req
->r_result
< 0)
1455 obj_request
->result
= osd_req
->r_result
;
1456 obj_request
->version
= le64_to_cpu(osd_req
->r_reassert_version
.version
);
1458 WARN_ON(osd_req
->r_num_ops
!= 1); /* For now */
1461 * We support a 64-bit length, but ultimately it has to be
1462 * passed to blk_end_request(), which takes an unsigned int.
1464 obj_request
->xferred
= osd_req
->r_reply_op_len
[0];
1465 rbd_assert(obj_request
->xferred
< (u64
)UINT_MAX
);
1466 opcode
= osd_req
->r_ops
[0].op
;
1468 case CEPH_OSD_OP_READ
:
1469 rbd_osd_read_callback(obj_request
);
1471 case CEPH_OSD_OP_WRITE
:
1472 rbd_osd_write_callback(obj_request
);
1474 case CEPH_OSD_OP_STAT
:
1475 rbd_osd_stat_callback(obj_request
);
1477 case CEPH_OSD_OP_CALL
:
1478 case CEPH_OSD_OP_NOTIFY_ACK
:
1479 case CEPH_OSD_OP_WATCH
:
1480 rbd_osd_trivial_callback(obj_request
);
1483 rbd_warn(NULL
, "%s: unsupported op %hu\n",
1484 obj_request
->object_name
, (unsigned short) opcode
);
1488 if (obj_request_done_test(obj_request
))
1489 rbd_obj_request_complete(obj_request
);
1492 static void rbd_osd_req_format(struct rbd_obj_request
*obj_request
,
1495 struct rbd_img_request
*img_request
= obj_request
->img_request
;
1496 struct ceph_osd_request
*osd_req
= obj_request
->osd_req
;
1497 struct ceph_snap_context
*snapc
= NULL
;
1498 u64 snap_id
= CEPH_NOSNAP
;
1499 struct timespec
*mtime
= NULL
;
1500 struct timespec now
;
1502 rbd_assert(osd_req
!= NULL
);
1504 if (write_request
) {
1508 snapc
= img_request
->snapc
;
1509 } else if (img_request
) {
1510 snap_id
= img_request
->snap_id
;
1512 ceph_osdc_build_request(osd_req
, obj_request
->offset
,
1513 snapc
, snap_id
, mtime
);
1516 static struct ceph_osd_request
*rbd_osd_req_create(
1517 struct rbd_device
*rbd_dev
,
1519 struct rbd_obj_request
*obj_request
)
1521 struct ceph_snap_context
*snapc
= NULL
;
1522 struct ceph_osd_client
*osdc
;
1523 struct ceph_osd_request
*osd_req
;
1525 if (obj_request_img_data_test(obj_request
)) {
1526 struct rbd_img_request
*img_request
= obj_request
->img_request
;
1528 rbd_assert(write_request
==
1529 img_request_write_test(img_request
));
1531 snapc
= img_request
->snapc
;
1534 /* Allocate and initialize the request, for the single op */
1536 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1537 osd_req
= ceph_osdc_alloc_request(osdc
, snapc
, 1, false, GFP_ATOMIC
);
1539 return NULL
; /* ENOMEM */
1542 osd_req
->r_flags
= CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
;
1544 osd_req
->r_flags
= CEPH_OSD_FLAG_READ
;
1546 osd_req
->r_callback
= rbd_osd_req_callback
;
1547 osd_req
->r_priv
= obj_request
;
1549 osd_req
->r_oid_len
= strlen(obj_request
->object_name
);
1550 rbd_assert(osd_req
->r_oid_len
< sizeof (osd_req
->r_oid
));
1551 memcpy(osd_req
->r_oid
, obj_request
->object_name
, osd_req
->r_oid_len
);
1553 osd_req
->r_file_layout
= rbd_dev
->layout
; /* struct */
/* Drop a reference to an osd request. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1563 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1565 static struct rbd_obj_request
*rbd_obj_request_create(const char *object_name
,
1566 u64 offset
, u64 length
,
1567 enum obj_request_type type
)
1569 struct rbd_obj_request
*obj_request
;
1573 rbd_assert(obj_request_type_valid(type
));
1575 size
= strlen(object_name
) + 1;
1576 obj_request
= kzalloc(sizeof (*obj_request
) + size
, GFP_KERNEL
);
1580 name
= (char *)(obj_request
+ 1);
1581 obj_request
->object_name
= memcpy(name
, object_name
, size
);
1582 obj_request
->offset
= offset
;
1583 obj_request
->length
= length
;
1584 obj_request
->flags
= 0;
1585 obj_request
->which
= BAD_WHICH
;
1586 obj_request
->type
= type
;
1587 INIT_LIST_HEAD(&obj_request
->links
);
1588 init_completion(&obj_request
->completion
);
1589 kref_init(&obj_request
->kref
);
1591 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__
, object_name
,
1592 offset
, length
, (int)type
, obj_request
);
1597 static void rbd_obj_request_destroy(struct kref
*kref
)
1599 struct rbd_obj_request
*obj_request
;
1601 obj_request
= container_of(kref
, struct rbd_obj_request
, kref
);
1603 dout("%s: obj %p\n", __func__
, obj_request
);
1605 rbd_assert(obj_request
->img_request
== NULL
);
1606 rbd_assert(obj_request
->which
== BAD_WHICH
);
1608 if (obj_request
->osd_req
)
1609 rbd_osd_req_destroy(obj_request
->osd_req
);
1611 rbd_assert(obj_request_type_valid(obj_request
->type
));
1612 switch (obj_request
->type
) {
1613 case OBJ_REQUEST_NODATA
:
1614 break; /* Nothing to do */
1615 case OBJ_REQUEST_BIO
:
1616 if (obj_request
->bio_list
)
1617 bio_chain_put(obj_request
->bio_list
);
1619 case OBJ_REQUEST_PAGES
:
1620 if (obj_request
->pages
)
1621 ceph_release_page_vector(obj_request
->pages
,
1622 obj_request
->page_count
);
1630 * Caller is responsible for filling in the list of object requests
1631 * that comprises the image request, and the Linux request pointer
1632 * (if there is one).
1634 static struct rbd_img_request
*rbd_img_request_create(
1635 struct rbd_device
*rbd_dev
,
1636 u64 offset
, u64 length
,
1640 struct rbd_img_request
*img_request
;
1641 struct ceph_snap_context
*snapc
= NULL
;
1643 img_request
= kmalloc(sizeof (*img_request
), GFP_ATOMIC
);
1647 if (write_request
) {
1648 down_read(&rbd_dev
->header_rwsem
);
1649 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1650 up_read(&rbd_dev
->header_rwsem
);
1651 if (WARN_ON(!snapc
)) {
1653 return NULL
; /* Shouldn't happen */
1658 img_request
->rq
= NULL
;
1659 img_request
->rbd_dev
= rbd_dev
;
1660 img_request
->offset
= offset
;
1661 img_request
->length
= length
;
1662 img_request
->flags
= 0;
1663 if (write_request
) {
1664 img_request_write_set(img_request
);
1665 img_request
->snapc
= snapc
;
1667 img_request
->snap_id
= rbd_dev
->spec
->snap_id
;
1670 img_request_child_set(img_request
);
1671 if (rbd_dev
->parent_spec
)
1672 img_request_layered_set(img_request
);
1673 spin_lock_init(&img_request
->completion_lock
);
1674 img_request
->next_completion
= 0;
1675 img_request
->callback
= NULL
;
1676 img_request
->result
= 0;
1677 img_request
->obj_request_count
= 0;
1678 INIT_LIST_HEAD(&img_request
->obj_requests
);
1679 kref_init(&img_request
->kref
);
1681 rbd_img_request_get(img_request
); /* Avoid a warning */
1682 rbd_img_request_put(img_request
); /* TEMPORARY */
1684 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__
, rbd_dev
,
1685 write_request
? "write" : "read", offset
, length
,
1691 static void rbd_img_request_destroy(struct kref
*kref
)
1693 struct rbd_img_request
*img_request
;
1694 struct rbd_obj_request
*obj_request
;
1695 struct rbd_obj_request
*next_obj_request
;
1697 img_request
= container_of(kref
, struct rbd_img_request
, kref
);
1699 dout("%s: img %p\n", __func__
, img_request
);
1701 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
1702 rbd_img_obj_request_del(img_request
, obj_request
);
1703 rbd_assert(img_request
->obj_request_count
== 0);
1705 if (img_request_write_test(img_request
))
1706 ceph_put_snap_context(img_request
->snapc
);
1708 if (img_request_child_test(img_request
))
1709 rbd_obj_request_put(img_request
->obj_request
);
1714 static bool rbd_img_obj_end_request(struct rbd_obj_request
*obj_request
)
1716 struct rbd_img_request
*img_request
;
1717 unsigned int xferred
;
1721 rbd_assert(obj_request_img_data_test(obj_request
));
1722 img_request
= obj_request
->img_request
;
1724 rbd_assert(obj_request
->xferred
<= (u64
)UINT_MAX
);
1725 xferred
= (unsigned int)obj_request
->xferred
;
1726 result
= obj_request
->result
;
1728 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1730 rbd_warn(rbd_dev
, "%s %llx at %llx (%llx)\n",
1731 img_request_write_test(img_request
) ? "write" : "read",
1732 obj_request
->length
, obj_request
->img_offset
,
1733 obj_request
->offset
);
1734 rbd_warn(rbd_dev
, " result %d xferred %x\n",
1736 if (!img_request
->result
)
1737 img_request
->result
= result
;
1740 if (img_request_child_test(img_request
)) {
1741 rbd_assert(img_request
->obj_request
!= NULL
);
1742 more
= obj_request
->which
< img_request
->obj_request_count
- 1;
1744 rbd_assert(img_request
->rq
!= NULL
);
1745 more
= blk_end_request(img_request
->rq
, result
, xferred
);
1751 static void rbd_img_obj_callback(struct rbd_obj_request
*obj_request
)
1753 struct rbd_img_request
*img_request
;
1754 u32 which
= obj_request
->which
;
1757 rbd_assert(obj_request_img_data_test(obj_request
));
1758 img_request
= obj_request
->img_request
;
1760 dout("%s: img %p obj %p\n", __func__
, img_request
, obj_request
);
1761 rbd_assert(img_request
!= NULL
);
1762 rbd_assert(img_request
->obj_request_count
> 0);
1763 rbd_assert(which
!= BAD_WHICH
);
1764 rbd_assert(which
< img_request
->obj_request_count
);
1765 rbd_assert(which
>= img_request
->next_completion
);
1767 spin_lock_irq(&img_request
->completion_lock
);
1768 if (which
!= img_request
->next_completion
)
1771 for_each_obj_request_from(img_request
, obj_request
) {
1773 rbd_assert(which
< img_request
->obj_request_count
);
1775 if (!obj_request_done_test(obj_request
))
1777 more
= rbd_img_obj_end_request(obj_request
);
1781 rbd_assert(more
^ (which
== img_request
->obj_request_count
));
1782 img_request
->next_completion
= which
;
1784 spin_unlock_irq(&img_request
->completion_lock
);
1787 rbd_img_request_complete(img_request
);
1790 static int rbd_img_request_fill_bio(struct rbd_img_request
*img_request
,
1791 struct bio
*bio_list
)
1793 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1794 struct rbd_obj_request
*obj_request
= NULL
;
1795 struct rbd_obj_request
*next_obj_request
;
1796 bool write_request
= img_request_write_test(img_request
);
1797 unsigned int bio_offset
;
1802 dout("%s: img %p bio %p\n", __func__
, img_request
, bio_list
);
1804 opcode
= write_request
? CEPH_OSD_OP_WRITE
: CEPH_OSD_OP_READ
;
1806 img_offset
= img_request
->offset
;
1807 rbd_assert(img_offset
== bio_list
->bi_sector
<< SECTOR_SHIFT
);
1808 resid
= img_request
->length
;
1809 rbd_assert(resid
> 0);
1811 struct ceph_osd_request
*osd_req
;
1812 const char *object_name
;
1813 unsigned int clone_size
;
1817 object_name
= rbd_segment_name(rbd_dev
, img_offset
);
1820 offset
= rbd_segment_offset(rbd_dev
, img_offset
);
1821 length
= rbd_segment_length(rbd_dev
, img_offset
, resid
);
1822 obj_request
= rbd_obj_request_create(object_name
,
1825 kfree(object_name
); /* object request has its own copy */
1829 rbd_assert(length
<= (u64
) UINT_MAX
);
1830 clone_size
= (unsigned int) length
;
1831 obj_request
->bio_list
= bio_chain_clone_range(&bio_list
,
1832 &bio_offset
, clone_size
,
1834 if (!obj_request
->bio_list
)
1837 osd_req
= rbd_osd_req_create(rbd_dev
, write_request
,
1841 obj_request
->osd_req
= osd_req
;
1842 obj_request
->callback
= rbd_img_obj_callback
;
1844 osd_req_op_extent_init(osd_req
, 0, opcode
, offset
, length
,
1846 osd_req_op_extent_osd_data_bio(osd_req
, 0,
1847 obj_request
->bio_list
, obj_request
->length
);
1848 rbd_osd_req_format(obj_request
, write_request
);
1850 obj_request
->img_offset
= img_offset
;
1851 rbd_img_obj_request_add(img_request
, obj_request
);
1853 img_offset
+= length
;
1860 rbd_obj_request_put(obj_request
);
1862 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
1863 rbd_obj_request_put(obj_request
);
1868 static void rbd_img_obj_exists_callback(struct rbd_obj_request
*obj_request
)
1870 struct rbd_device
*rbd_dev
;
1871 struct ceph_osd_client
*osdc
;
1872 struct rbd_obj_request
*orig_request
;
1875 rbd_assert(!obj_request_img_data_test(obj_request
));
1878 * All we need from the object request is the original
1879 * request and the result of the STAT op. Grab those, then
1880 * we're done with the request.
1882 orig_request
= obj_request
->obj_request
;
1883 obj_request
->obj_request
= NULL
;
1884 rbd_assert(orig_request
);
1885 rbd_assert(orig_request
->img_request
);
1887 result
= obj_request
->result
;
1888 obj_request
->result
= 0;
1890 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__
,
1891 obj_request
, orig_request
, result
,
1892 obj_request
->xferred
, obj_request
->length
);
1893 rbd_obj_request_put(obj_request
);
1895 rbd_assert(orig_request
);
1896 rbd_assert(orig_request
->img_request
);
1897 rbd_dev
= orig_request
->img_request
->rbd_dev
;
1898 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1901 * Our only purpose here is to determine whether the object
1902 * exists, and we don't want to treat the non-existence as
1903 * an error. If something else comes back, transfer the
1904 * error to the original request and complete it now.
1907 obj_request_existence_set(orig_request
, true);
1908 } else if (result
== -ENOENT
) {
1909 obj_request_existence_set(orig_request
, false);
1910 } else if (result
) {
1911 orig_request
->result
= result
;
1916 * Resubmit the original request now that we have recorded
1917 * whether the target object exists.
1919 orig_request
->result
= rbd_obj_request_submit(osdc
, orig_request
);
1921 if (orig_request
->result
)
1922 rbd_obj_request_complete(orig_request
);
1923 rbd_obj_request_put(orig_request
);
1926 static int rbd_img_obj_exists_submit(struct rbd_obj_request
*obj_request
)
1928 struct rbd_obj_request
*stat_request
;
1929 struct rbd_device
*rbd_dev
;
1930 struct ceph_osd_client
*osdc
;
1931 struct page
**pages
= NULL
;
1937 * The response data for a STAT call consists of:
1944 size
= sizeof (__le64
) + sizeof (__le32
) + sizeof (__le32
);
1945 page_count
= (u32
)calc_pages_for(0, size
);
1946 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
1948 return PTR_ERR(pages
);
1951 stat_request
= rbd_obj_request_create(obj_request
->object_name
, 0, 0,
1956 rbd_obj_request_get(obj_request
);
1957 stat_request
->obj_request
= obj_request
;
1958 stat_request
->pages
= pages
;
1959 stat_request
->page_count
= page_count
;
1961 rbd_assert(obj_request
->img_request
);
1962 rbd_dev
= obj_request
->img_request
->rbd_dev
;
1963 stat_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false,
1965 if (!stat_request
->osd_req
)
1967 stat_request
->callback
= rbd_img_obj_exists_callback
;
1969 osd_req_op_init(stat_request
->osd_req
, 0, CEPH_OSD_OP_STAT
);
1970 osd_req_op_raw_data_in_pages(stat_request
->osd_req
, 0, pages
, size
, 0,
1972 rbd_osd_req_format(stat_request
, false);
1974 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1975 ret
= rbd_obj_request_submit(osdc
, stat_request
);
1978 rbd_obj_request_put(obj_request
);
1983 static int rbd_img_request_submit(struct rbd_img_request
*img_request
)
1985 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1986 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1987 struct rbd_obj_request
*obj_request
;
1988 struct rbd_obj_request
*next_obj_request
;
1989 bool write_request
= img_request_write_test(img_request
);
1990 bool layered
= img_request_layered_test(img_request
);
1992 dout("%s: img %p\n", __func__
, img_request
);
1993 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
) {
1999 * We need to know whether the target object exists
2000 * for a layered write. Issue an existence check
2001 * first if we need to.
2003 known
= obj_request_known_test(obj_request
);
2004 object_exists
= known
&& obj_request_exists_test(obj_request
);
2005 if (!write_request
|| !layered
|| object_exists
)
2006 ret
= rbd_obj_request_submit(osdc
, obj_request
);
2008 ret
= rbd_img_obj_exists_submit(obj_request
);
2016 static void rbd_img_parent_read_callback(struct rbd_img_request
*img_request
)
2018 struct rbd_obj_request
*obj_request
;
2020 rbd_assert(img_request_child_test(img_request
));
2022 obj_request
= img_request
->obj_request
;
2023 rbd_assert(obj_request
!= NULL
);
2024 obj_request
->result
= img_request
->result
;
2025 obj_request
->xferred
= img_request
->xferred
;
2027 rbd_img_obj_request_read_callback(obj_request
);
2028 rbd_obj_request_complete(obj_request
);
2031 static void rbd_img_parent_read(struct rbd_obj_request
*obj_request
)
2033 struct rbd_device
*rbd_dev
;
2034 struct rbd_img_request
*img_request
;
2037 rbd_assert(obj_request_img_data_test(obj_request
));
2038 rbd_assert(obj_request
->img_request
!= NULL
);
2039 rbd_assert(obj_request
->result
== (s32
) -ENOENT
);
2040 rbd_assert(obj_request
->type
== OBJ_REQUEST_BIO
);
2042 rbd_dev
= obj_request
->img_request
->rbd_dev
;
2043 rbd_assert(rbd_dev
->parent
!= NULL
);
2044 /* rbd_read_finish(obj_request, obj_request->length); */
2045 img_request
= rbd_img_request_create(rbd_dev
->parent
,
2046 obj_request
->img_offset
,
2047 obj_request
->length
,
2053 rbd_obj_request_get(obj_request
);
2054 img_request
->obj_request
= obj_request
;
2056 result
= rbd_img_request_fill_bio(img_request
, obj_request
->bio_list
);
2060 img_request
->callback
= rbd_img_parent_read_callback
;
2061 result
= rbd_img_request_submit(img_request
);
2068 rbd_img_request_put(img_request
);
2069 obj_request
->result
= result
;
2070 obj_request
->xferred
= 0;
2071 obj_request_done_set(obj_request
);
2074 static int rbd_obj_notify_ack(struct rbd_device
*rbd_dev
,
2075 u64 ver
, u64 notify_id
)
2077 struct rbd_obj_request
*obj_request
;
2078 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2081 obj_request
= rbd_obj_request_create(rbd_dev
->header_name
, 0, 0,
2082 OBJ_REQUEST_NODATA
);
2087 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false, obj_request
);
2088 if (!obj_request
->osd_req
)
2090 obj_request
->callback
= rbd_obj_request_put
;
2092 osd_req_op_watch_init(obj_request
->osd_req
, 0, CEPH_OSD_OP_NOTIFY_ACK
,
2094 rbd_osd_req_format(obj_request
, false);
2096 ret
= rbd_obj_request_submit(osdc
, obj_request
);
2099 rbd_obj_request_put(obj_request
);
2104 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
2106 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
2113 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__
,
2114 rbd_dev
->header_name
, (unsigned long long) notify_id
,
2115 (unsigned int) opcode
);
2116 rc
= rbd_dev_refresh(rbd_dev
, &hver
);
2118 rbd_warn(rbd_dev
, "got notification but failed to "
2119 " update snaps: %d\n", rc
);
2121 rbd_obj_notify_ack(rbd_dev
, hver
, notify_id
);
2125 * Request sync osd watch/unwatch. The value of "start" determines
2126 * whether a watch request is being initiated or torn down.
2128 static int rbd_dev_header_watch_sync(struct rbd_device
*rbd_dev
, int start
)
2130 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2131 struct rbd_obj_request
*obj_request
;
2134 rbd_assert(start
^ !!rbd_dev
->watch_event
);
2135 rbd_assert(start
^ !!rbd_dev
->watch_request
);
2138 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, rbd_dev
,
2139 &rbd_dev
->watch_event
);
2142 rbd_assert(rbd_dev
->watch_event
!= NULL
);
2146 obj_request
= rbd_obj_request_create(rbd_dev
->header_name
, 0, 0,
2147 OBJ_REQUEST_NODATA
);
2151 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, true, obj_request
);
2152 if (!obj_request
->osd_req
)
2156 ceph_osdc_set_request_linger(osdc
, obj_request
->osd_req
);
2158 ceph_osdc_unregister_linger_request(osdc
,
2159 rbd_dev
->watch_request
->osd_req
);
2161 osd_req_op_watch_init(obj_request
->osd_req
, 0, CEPH_OSD_OP_WATCH
,
2162 rbd_dev
->watch_event
->cookie
,
2163 rbd_dev
->header
.obj_version
, start
);
2164 rbd_osd_req_format(obj_request
, true);
2166 ret
= rbd_obj_request_submit(osdc
, obj_request
);
2169 ret
= rbd_obj_request_wait(obj_request
);
2172 ret
= obj_request
->result
;
2177 * A watch request is set to linger, so the underlying osd
2178 * request won't go away until we unregister it. We retain
2179 * a pointer to the object request during that time (in
2180 * rbd_dev->watch_request), so we'll keep a reference to
2181 * it. We'll drop that reference (below) after we've
2185 rbd_dev
->watch_request
= obj_request
;
2190 /* We have successfully torn down the watch request */
2192 rbd_obj_request_put(rbd_dev
->watch_request
);
2193 rbd_dev
->watch_request
= NULL
;
2195 /* Cancel the event if we're tearing down, or on error */
2196 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
2197 rbd_dev
->watch_event
= NULL
;
2199 rbd_obj_request_put(obj_request
);
2205 * Synchronous osd object method call
2207 static int rbd_obj_method_sync(struct rbd_device
*rbd_dev
,
2208 const char *object_name
,
2209 const char *class_name
,
2210 const char *method_name
,
2211 const char *outbound
,
2212 size_t outbound_size
,
2214 size_t inbound_size
,
2217 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2218 struct rbd_obj_request
*obj_request
;
2219 struct page
**pages
;
2224 * Method calls are ultimately read operations. The result
2225 * should placed into the inbound buffer provided. They
2226 * also supply outbound data--parameters for the object
2227 * method. Currently if this is present it will be a
2230 page_count
= (u32
) calc_pages_for(0, inbound_size
);
2231 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
2233 return PTR_ERR(pages
);
2236 obj_request
= rbd_obj_request_create(object_name
, 0, inbound_size
,
2241 obj_request
->pages
= pages
;
2242 obj_request
->page_count
= page_count
;
2244 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false, obj_request
);
2245 if (!obj_request
->osd_req
)
2248 osd_req_op_cls_init(obj_request
->osd_req
, 0, CEPH_OSD_OP_CALL
,
2249 class_name
, method_name
);
2250 if (outbound_size
) {
2251 struct ceph_pagelist
*pagelist
;
2253 pagelist
= kmalloc(sizeof (*pagelist
), GFP_NOFS
);
2257 ceph_pagelist_init(pagelist
);
2258 ceph_pagelist_append(pagelist
, outbound
, outbound_size
);
2259 osd_req_op_cls_request_data_pagelist(obj_request
->osd_req
, 0,
2262 osd_req_op_cls_response_data_pages(obj_request
->osd_req
, 0,
2263 obj_request
->pages
, inbound_size
,
2265 rbd_osd_req_format(obj_request
, false);
2267 ret
= rbd_obj_request_submit(osdc
, obj_request
);
2270 ret
= rbd_obj_request_wait(obj_request
);
2274 ret
= obj_request
->result
;
2278 ceph_copy_from_page_vector(pages
, inbound
, 0, obj_request
->xferred
);
2280 *version
= obj_request
->version
;
2283 rbd_obj_request_put(obj_request
);
2285 ceph_release_page_vector(pages
, page_count
);
2290 static void rbd_request_fn(struct request_queue
*q
)
2291 __releases(q
->queue_lock
) __acquires(q
->queue_lock
)
2293 struct rbd_device
*rbd_dev
= q
->queuedata
;
2294 bool read_only
= rbd_dev
->mapping
.read_only
;
2298 while ((rq
= blk_fetch_request(q
))) {
2299 bool write_request
= rq_data_dir(rq
) == WRITE
;
2300 struct rbd_img_request
*img_request
;
2304 /* Ignore any non-FS requests that filter through. */
2306 if (rq
->cmd_type
!= REQ_TYPE_FS
) {
2307 dout("%s: non-fs request type %d\n", __func__
,
2308 (int) rq
->cmd_type
);
2309 __blk_end_request_all(rq
, 0);
2313 /* Ignore/skip any zero-length requests */
2315 offset
= (u64
) blk_rq_pos(rq
) << SECTOR_SHIFT
;
2316 length
= (u64
) blk_rq_bytes(rq
);
2319 dout("%s: zero-length request\n", __func__
);
2320 __blk_end_request_all(rq
, 0);
2324 spin_unlock_irq(q
->queue_lock
);
2326 /* Disallow writes to a read-only device */
2328 if (write_request
) {
2332 rbd_assert(rbd_dev
->spec
->snap_id
== CEPH_NOSNAP
);
2336 * Quit early if the mapped snapshot no longer
2337 * exists. It's still possible the snapshot will
2338 * have disappeared by the time our request arrives
2339 * at the osd, but there's no sense in sending it if
2342 if (!test_bit(RBD_DEV_FLAG_EXISTS
, &rbd_dev
->flags
)) {
2343 dout("request for non-existent snapshot");
2344 rbd_assert(rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
);
2350 if (WARN_ON(offset
&& length
> U64_MAX
- offset
+ 1))
2351 goto end_request
; /* Shouldn't happen */
2354 img_request
= rbd_img_request_create(rbd_dev
, offset
, length
,
2355 write_request
, false);
2359 img_request
->rq
= rq
;
2361 result
= rbd_img_request_fill_bio(img_request
, rq
->bio
);
2363 result
= rbd_img_request_submit(img_request
);
2365 rbd_img_request_put(img_request
);
2367 spin_lock_irq(q
->queue_lock
);
2369 rbd_warn(rbd_dev
, "%s %llx at %llx result %d\n",
2370 write_request
? "write" : "read",
2371 length
, offset
, result
);
2373 __blk_end_request_all(rq
, result
);
2379 * a queue callback. Makes sure that we don't create a bio that spans across
2380 * multiple osd objects. One exception would be with a single page bios,
2381 * which we handle later at bio_chain_clone_range()
2383 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
2384 struct bio_vec
*bvec
)
2386 struct rbd_device
*rbd_dev
= q
->queuedata
;
2387 sector_t sector_offset
;
2388 sector_t sectors_per_obj
;
2389 sector_t obj_sector_offset
;
2393 * Find how far into its rbd object the partition-relative
2394 * bio start sector is to offset relative to the enclosing
2397 sector_offset
= get_start_sect(bmd
->bi_bdev
) + bmd
->bi_sector
;
2398 sectors_per_obj
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
2399 obj_sector_offset
= sector_offset
& (sectors_per_obj
- 1);
2402 * Compute the number of bytes from that offset to the end
2403 * of the object. Account for what's already used by the bio.
2405 ret
= (int) (sectors_per_obj
- obj_sector_offset
) << SECTOR_SHIFT
;
2406 if (ret
> bmd
->bi_size
)
2407 ret
-= bmd
->bi_size
;
2412 * Don't send back more than was asked for. And if the bio
2413 * was empty, let the whole thing through because: "Note
2414 * that a block device *must* allow a single page to be
2415 * added to an empty bio."
2417 rbd_assert(bvec
->bv_len
<= PAGE_SIZE
);
2418 if (ret
> (int) bvec
->bv_len
|| !bmd
->bi_size
)
2419 ret
= (int) bvec
->bv_len
;
2424 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
2426 struct gendisk
*disk
= rbd_dev
->disk
;
2431 if (disk
->flags
& GENHD_FL_UP
)
2434 blk_cleanup_queue(disk
->queue
);
2438 static int rbd_obj_read_sync(struct rbd_device
*rbd_dev
,
2439 const char *object_name
,
2440 u64 offset
, u64 length
,
2441 char *buf
, u64
*version
)
2444 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2445 struct rbd_obj_request
*obj_request
;
2446 struct page
**pages
= NULL
;
2451 page_count
= (u32
) calc_pages_for(offset
, length
);
2452 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
2454 ret
= PTR_ERR(pages
);
2457 obj_request
= rbd_obj_request_create(object_name
, offset
, length
,
2462 obj_request
->pages
= pages
;
2463 obj_request
->page_count
= page_count
;
2465 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false, obj_request
);
2466 if (!obj_request
->osd_req
)
2469 osd_req_op_extent_init(obj_request
->osd_req
, 0, CEPH_OSD_OP_READ
,
2470 offset
, length
, 0, 0);
2471 osd_req_op_extent_osd_data_pages(obj_request
->osd_req
, 0,
2473 obj_request
->length
,
2474 obj_request
->offset
& ~PAGE_MASK
,
2476 rbd_osd_req_format(obj_request
, false);
2478 ret
= rbd_obj_request_submit(osdc
, obj_request
);
2481 ret
= rbd_obj_request_wait(obj_request
);
2485 ret
= obj_request
->result
;
2489 rbd_assert(obj_request
->xferred
<= (u64
) SIZE_MAX
);
2490 size
= (size_t) obj_request
->xferred
;
2491 ceph_copy_from_page_vector(pages
, buf
, 0, size
);
2492 rbd_assert(size
<= (size_t) INT_MAX
);
2495 *version
= obj_request
->version
;
2498 rbd_obj_request_put(obj_request
);
2500 ceph_release_page_vector(pages
, page_count
);
2506 * Read the complete header for the given rbd device.
2508 * Returns a pointer to a dynamically-allocated buffer containing
2509 * the complete and validated header. Caller can pass the address
2510 * of a variable that will be filled in with the version of the
2511 * header object at the time it was read.
2513 * Returns a pointer-coded errno if a failure occurs.
2515 static struct rbd_image_header_ondisk
*
2516 rbd_dev_v1_header_read(struct rbd_device
*rbd_dev
, u64
*version
)
2518 struct rbd_image_header_ondisk
*ondisk
= NULL
;
2525 * The complete header will include an array of its 64-bit
2526 * snapshot ids, followed by the names of those snapshots as
2527 * a contiguous block of NUL-terminated strings. Note that
2528 * the number of snapshots could change by the time we read
2529 * it in, in which case we re-read it.
2536 size
= sizeof (*ondisk
);
2537 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
2539 ondisk
= kmalloc(size
, GFP_KERNEL
);
2541 return ERR_PTR(-ENOMEM
);
2543 ret
= rbd_obj_read_sync(rbd_dev
, rbd_dev
->header_name
,
2545 (char *) ondisk
, version
);
2548 if (WARN_ON((size_t) ret
< size
)) {
2550 rbd_warn(rbd_dev
, "short header read (want %zd got %d)",
2554 if (!rbd_dev_ondisk_valid(ondisk
)) {
2556 rbd_warn(rbd_dev
, "invalid header");
2560 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
2561 want_count
= snap_count
;
2562 snap_count
= le32_to_cpu(ondisk
->snap_count
);
2563 } while (snap_count
!= want_count
);
2570 return ERR_PTR(ret
);
2574 * reload the ondisk the header
2576 static int rbd_read_header(struct rbd_device
*rbd_dev
,
2577 struct rbd_image_header
*header
)
2579 struct rbd_image_header_ondisk
*ondisk
;
2583 ondisk
= rbd_dev_v1_header_read(rbd_dev
, &ver
);
2585 return PTR_ERR(ondisk
);
2586 ret
= rbd_header_from_disk(header
, ondisk
);
2588 header
->obj_version
= ver
;
2594 static void rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
2596 struct rbd_snap
*snap
;
2597 struct rbd_snap
*next
;
2599 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
2600 rbd_remove_snap_dev(snap
);
2603 static void rbd_update_mapping_size(struct rbd_device
*rbd_dev
)
2607 if (rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
)
2610 size
= (sector_t
) rbd_dev
->header
.image_size
/ SECTOR_SIZE
;
2611 dout("setting size to %llu sectors", (unsigned long long) size
);
2612 rbd_dev
->mapping
.size
= (u64
) size
;
2613 set_capacity(rbd_dev
->disk
, size
);
2617 * only read the first part of the ondisk header, without the snaps info
2619 static int rbd_dev_v1_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
2622 struct rbd_image_header h
;
2624 ret
= rbd_read_header(rbd_dev
, &h
);
2628 down_write(&rbd_dev
->header_rwsem
);
2630 /* Update image size, and check for resize of mapped image */
2631 rbd_dev
->header
.image_size
= h
.image_size
;
2632 rbd_update_mapping_size(rbd_dev
);
2634 /* rbd_dev->header.object_prefix shouldn't change */
2635 kfree(rbd_dev
->header
.snap_sizes
);
2636 kfree(rbd_dev
->header
.snap_names
);
2637 /* osd requests may still refer to snapc */
2638 ceph_put_snap_context(rbd_dev
->header
.snapc
);
2641 *hver
= h
.obj_version
;
2642 rbd_dev
->header
.obj_version
= h
.obj_version
;
2643 rbd_dev
->header
.image_size
= h
.image_size
;
2644 rbd_dev
->header
.snapc
= h
.snapc
;
2645 rbd_dev
->header
.snap_names
= h
.snap_names
;
2646 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
2647 /* Free the extra copy of the object prefix */
2648 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
2649 kfree(h
.object_prefix
);
2651 ret
= rbd_dev_snaps_update(rbd_dev
);
2653 ret
= rbd_dev_snaps_register(rbd_dev
);
2655 up_write(&rbd_dev
->header_rwsem
);
2660 static int rbd_dev_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
2664 rbd_assert(rbd_image_format_valid(rbd_dev
->image_format
));
2665 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2666 if (rbd_dev
->image_format
== 1)
2667 ret
= rbd_dev_v1_refresh(rbd_dev
, hver
);
2669 ret
= rbd_dev_v2_refresh(rbd_dev
, hver
);
2670 mutex_unlock(&ctl_mutex
);
2675 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
2677 struct gendisk
*disk
;
2678 struct request_queue
*q
;
2681 /* create gendisk info */
2682 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
2686 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
2688 disk
->major
= rbd_dev
->major
;
2689 disk
->first_minor
= 0;
2690 disk
->fops
= &rbd_bd_ops
;
2691 disk
->private_data
= rbd_dev
;
2693 q
= blk_init_queue(rbd_request_fn
, &rbd_dev
->lock
);
2697 /* We use the default size, but let's be explicit about it. */
2698 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
2700 /* set io sizes to object size */
2701 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
2702 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
2703 blk_queue_max_segment_size(q
, segment_size
);
2704 blk_queue_io_min(q
, segment_size
);
2705 blk_queue_io_opt(q
, segment_size
);
2707 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
2710 q
->queuedata
= rbd_dev
;
2712 rbd_dev
->disk
= disk
;
2714 set_capacity(rbd_dev
->disk
, rbd_dev
->mapping
.size
/ SECTOR_SIZE
);
2727 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
2729 return container_of(dev
, struct rbd_device
, dev
);
2732 static ssize_t
rbd_size_show(struct device
*dev
,
2733 struct device_attribute
*attr
, char *buf
)
2735 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2738 down_read(&rbd_dev
->header_rwsem
);
2739 size
= get_capacity(rbd_dev
->disk
);
2740 up_read(&rbd_dev
->header_rwsem
);
2742 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
2746 * Note this shows the features for whatever's mapped, which is not
2747 * necessarily the base image.
2749 static ssize_t
rbd_features_show(struct device
*dev
,
2750 struct device_attribute
*attr
, char *buf
)
2752 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2754 return sprintf(buf
, "0x%016llx\n",
2755 (unsigned long long) rbd_dev
->mapping
.features
);
2758 static ssize_t
rbd_major_show(struct device
*dev
,
2759 struct device_attribute
*attr
, char *buf
)
2761 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2763 return sprintf(buf
, "%d\n", rbd_dev
->major
);
2766 static ssize_t
rbd_client_id_show(struct device
*dev
,
2767 struct device_attribute
*attr
, char *buf
)
2769 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2771 return sprintf(buf
, "client%lld\n",
2772 ceph_client_id(rbd_dev
->rbd_client
->client
));
2775 static ssize_t
rbd_pool_show(struct device
*dev
,
2776 struct device_attribute
*attr
, char *buf
)
2778 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2780 return sprintf(buf
, "%s\n", rbd_dev
->spec
->pool_name
);
2783 static ssize_t
rbd_pool_id_show(struct device
*dev
,
2784 struct device_attribute
*attr
, char *buf
)
2786 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2788 return sprintf(buf
, "%llu\n",
2789 (unsigned long long) rbd_dev
->spec
->pool_id
);
2792 static ssize_t
rbd_name_show(struct device
*dev
,
2793 struct device_attribute
*attr
, char *buf
)
2795 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2797 if (rbd_dev
->spec
->image_name
)
2798 return sprintf(buf
, "%s\n", rbd_dev
->spec
->image_name
);
2800 return sprintf(buf
, "(unknown)\n");
2803 static ssize_t
rbd_image_id_show(struct device
*dev
,
2804 struct device_attribute
*attr
, char *buf
)
2806 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2808 return sprintf(buf
, "%s\n", rbd_dev
->spec
->image_id
);
2812 * Shows the name of the currently-mapped snapshot (or
2813 * RBD_SNAP_HEAD_NAME for the base image).
2815 static ssize_t
rbd_snap_show(struct device
*dev
,
2816 struct device_attribute
*attr
,
2819 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2821 return sprintf(buf
, "%s\n", rbd_dev
->spec
->snap_name
);
2825 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2826 * for the parent image. If there is no parent, simply shows
2827 * "(no parent image)".
2829 static ssize_t
rbd_parent_show(struct device
*dev
,
2830 struct device_attribute
*attr
,
2833 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2834 struct rbd_spec
*spec
= rbd_dev
->parent_spec
;
2839 return sprintf(buf
, "(no parent image)\n");
2841 count
= sprintf(bufp
, "pool_id %llu\npool_name %s\n",
2842 (unsigned long long) spec
->pool_id
, spec
->pool_name
);
2847 count
= sprintf(bufp
, "image_id %s\nimage_name %s\n", spec
->image_id
,
2848 spec
->image_name
? spec
->image_name
: "(unknown)");
2853 count
= sprintf(bufp
, "snap_id %llu\nsnap_name %s\n",
2854 (unsigned long long) spec
->snap_id
, spec
->snap_name
);
2859 count
= sprintf(bufp
, "overlap %llu\n", rbd_dev
->parent_overlap
);
2864 return (ssize_t
) (bufp
- buf
);
2867 static ssize_t
rbd_image_refresh(struct device
*dev
,
2868 struct device_attribute
*attr
,
2872 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2875 ret
= rbd_dev_refresh(rbd_dev
, NULL
);
2877 return ret
< 0 ? ret
: size
;
2880 static DEVICE_ATTR(size
, S_IRUGO
, rbd_size_show
, NULL
);
2881 static DEVICE_ATTR(features
, S_IRUGO
, rbd_features_show
, NULL
);
2882 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
2883 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
2884 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
2885 static DEVICE_ATTR(pool_id
, S_IRUGO
, rbd_pool_id_show
, NULL
);
2886 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
2887 static DEVICE_ATTR(image_id
, S_IRUGO
, rbd_image_id_show
, NULL
);
2888 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
2889 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
2890 static DEVICE_ATTR(parent
, S_IRUGO
, rbd_parent_show
, NULL
);
2892 static struct attribute
*rbd_attrs
[] = {
2893 &dev_attr_size
.attr
,
2894 &dev_attr_features
.attr
,
2895 &dev_attr_major
.attr
,
2896 &dev_attr_client_id
.attr
,
2897 &dev_attr_pool
.attr
,
2898 &dev_attr_pool_id
.attr
,
2899 &dev_attr_name
.attr
,
2900 &dev_attr_image_id
.attr
,
2901 &dev_attr_current_snap
.attr
,
2902 &dev_attr_parent
.attr
,
2903 &dev_attr_refresh
.attr
,
2907 static struct attribute_group rbd_attr_group
= {
2911 static const struct attribute_group
*rbd_attr_groups
[] = {
2916 static void rbd_sysfs_dev_release(struct device
*dev
)
2920 static struct device_type rbd_device_type
= {
2922 .groups
= rbd_attr_groups
,
2923 .release
= rbd_sysfs_dev_release
,
2931 static ssize_t
rbd_snap_size_show(struct device
*dev
,
2932 struct device_attribute
*attr
,
2935 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2937 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
2940 static ssize_t
rbd_snap_id_show(struct device
*dev
,
2941 struct device_attribute
*attr
,
2944 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2946 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
2949 static ssize_t
rbd_snap_features_show(struct device
*dev
,
2950 struct device_attribute
*attr
,
2953 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2955 return sprintf(buf
, "0x%016llx\n",
2956 (unsigned long long) snap
->features
);
2959 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
2960 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
2961 static DEVICE_ATTR(snap_features
, S_IRUGO
, rbd_snap_features_show
, NULL
);
2963 static struct attribute
*rbd_snap_attrs
[] = {
2964 &dev_attr_snap_size
.attr
,
2965 &dev_attr_snap_id
.attr
,
2966 &dev_attr_snap_features
.attr
,
2970 static struct attribute_group rbd_snap_attr_group
= {
2971 .attrs
= rbd_snap_attrs
,
2974 static void rbd_snap_dev_release(struct device
*dev
)
2976 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2981 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
2982 &rbd_snap_attr_group
,
2986 static struct device_type rbd_snap_device_type
= {
2987 .groups
= rbd_snap_attr_groups
,
2988 .release
= rbd_snap_dev_release
,
2991 static struct rbd_spec
*rbd_spec_get(struct rbd_spec
*spec
)
2993 kref_get(&spec
->kref
);
2998 static void rbd_spec_free(struct kref
*kref
);
2999 static void rbd_spec_put(struct rbd_spec
*spec
)
3002 kref_put(&spec
->kref
, rbd_spec_free
);
3005 static struct rbd_spec
*rbd_spec_alloc(void)
3007 struct rbd_spec
*spec
;
3009 spec
= kzalloc(sizeof (*spec
), GFP_KERNEL
);
3012 kref_init(&spec
->kref
);
3017 static void rbd_spec_free(struct kref
*kref
)
3019 struct rbd_spec
*spec
= container_of(kref
, struct rbd_spec
, kref
);
3021 kfree(spec
->pool_name
);
3022 kfree(spec
->image_id
);
3023 kfree(spec
->image_name
);
3024 kfree(spec
->snap_name
);
3028 static struct rbd_device
*rbd_dev_create(struct rbd_client
*rbdc
,
3029 struct rbd_spec
*spec
)
3031 struct rbd_device
*rbd_dev
;
3033 rbd_dev
= kzalloc(sizeof (*rbd_dev
), GFP_KERNEL
);
3037 spin_lock_init(&rbd_dev
->lock
);
3039 INIT_LIST_HEAD(&rbd_dev
->node
);
3040 INIT_LIST_HEAD(&rbd_dev
->snaps
);
3041 init_rwsem(&rbd_dev
->header_rwsem
);
3043 rbd_dev
->spec
= spec
;
3044 rbd_dev
->rbd_client
= rbdc
;
3046 /* Initialize the layout used for all rbd requests */
3048 rbd_dev
->layout
.fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
3049 rbd_dev
->layout
.fl_stripe_count
= cpu_to_le32(1);
3050 rbd_dev
->layout
.fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
3051 rbd_dev
->layout
.fl_pg_pool
= cpu_to_le32((u32
) spec
->pool_id
);
3056 static void rbd_dev_destroy(struct rbd_device
*rbd_dev
)
3058 rbd_spec_put(rbd_dev
->parent_spec
);
3059 kfree(rbd_dev
->header_name
);
3060 rbd_put_client(rbd_dev
->rbd_client
);
3061 rbd_spec_put(rbd_dev
->spec
);
3065 static bool rbd_snap_registered(struct rbd_snap
*snap
)
3067 bool ret
= snap
->dev
.type
== &rbd_snap_device_type
;
3068 bool reg
= device_is_registered(&snap
->dev
);
3070 rbd_assert(!ret
^ reg
);
3075 static void rbd_remove_snap_dev(struct rbd_snap
*snap
)
3077 list_del(&snap
->node
);
3078 if (device_is_registered(&snap
->dev
))
3079 device_unregister(&snap
->dev
);
3082 static int rbd_register_snap_dev(struct rbd_snap
*snap
,
3083 struct device
*parent
)
3085 struct device
*dev
= &snap
->dev
;
3088 dev
->type
= &rbd_snap_device_type
;
3089 dev
->parent
= parent
;
3090 dev
->release
= rbd_snap_dev_release
;
3091 dev_set_name(dev
, "%s%s", RBD_SNAP_DEV_NAME_PREFIX
, snap
->name
);
3092 dout("%s: registering device for snapshot %s\n", __func__
, snap
->name
);
3094 ret
= device_register(dev
);
3099 static struct rbd_snap
*__rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
3100 const char *snap_name
,
3101 u64 snap_id
, u64 snap_size
,
3104 struct rbd_snap
*snap
;
3107 snap
= kzalloc(sizeof (*snap
), GFP_KERNEL
);
3109 return ERR_PTR(-ENOMEM
);
3112 snap
->name
= kstrdup(snap_name
, GFP_KERNEL
);
3117 snap
->size
= snap_size
;
3118 snap
->features
= snap_features
;
3126 return ERR_PTR(ret
);
3129 static char *rbd_dev_v1_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
3130 u64
*snap_size
, u64
*snap_features
)
3134 rbd_assert(which
< rbd_dev
->header
.snapc
->num_snaps
);
3136 *snap_size
= rbd_dev
->header
.snap_sizes
[which
];
3137 *snap_features
= 0; /* No features for v1 */
3139 /* Skip over names until we find the one we are looking for */
3141 snap_name
= rbd_dev
->header
.snap_names
;
3143 snap_name
+= strlen(snap_name
) + 1;
3149 * Get the size and object order for an image snapshot, or if
3150 * snap_id is CEPH_NOSNAP, gets this information for the base
3153 static int _rbd_dev_v2_snap_size(struct rbd_device
*rbd_dev
, u64 snap_id
,
3154 u8
*order
, u64
*snap_size
)
3156 __le64 snapid
= cpu_to_le64(snap_id
);
3161 } __attribute__ ((packed
)) size_buf
= { 0 };
3163 ret
= rbd_obj_method_sync(rbd_dev
, rbd_dev
->header_name
,
3165 (char *) &snapid
, sizeof (snapid
),
3166 (char *) &size_buf
, sizeof (size_buf
), NULL
);
3167 dout("%s: rbd_obj_method_sync returned %d\n", __func__
, ret
);
3171 *order
= size_buf
.order
;
3172 *snap_size
= le64_to_cpu(size_buf
.size
);
3174 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3175 (unsigned long long) snap_id
, (unsigned int) *order
,
3176 (unsigned long long) *snap_size
);
3181 static int rbd_dev_v2_image_size(struct rbd_device
*rbd_dev
)
3183 return _rbd_dev_v2_snap_size(rbd_dev
, CEPH_NOSNAP
,
3184 &rbd_dev
->header
.obj_order
,
3185 &rbd_dev
->header
.image_size
);
3188 static int rbd_dev_v2_object_prefix(struct rbd_device
*rbd_dev
)
3194 reply_buf
= kzalloc(RBD_OBJ_PREFIX_LEN_MAX
, GFP_KERNEL
);
3198 ret
= rbd_obj_method_sync(rbd_dev
, rbd_dev
->header_name
,
3199 "rbd", "get_object_prefix",
3201 reply_buf
, RBD_OBJ_PREFIX_LEN_MAX
, NULL
);
3202 dout("%s: rbd_obj_method_sync returned %d\n", __func__
, ret
);
3207 rbd_dev
->header
.object_prefix
= ceph_extract_encoded_string(&p
,
3208 p
+ RBD_OBJ_PREFIX_LEN_MAX
,
3211 if (IS_ERR(rbd_dev
->header
.object_prefix
)) {
3212 ret
= PTR_ERR(rbd_dev
->header
.object_prefix
);
3213 rbd_dev
->header
.object_prefix
= NULL
;
3215 dout(" object_prefix = %s\n", rbd_dev
->header
.object_prefix
);
3224 static int _rbd_dev_v2_snap_features(struct rbd_device
*rbd_dev
, u64 snap_id
,
3227 __le64 snapid
= cpu_to_le64(snap_id
);
3231 } features_buf
= { 0 };
3235 ret
= rbd_obj_method_sync(rbd_dev
, rbd_dev
->header_name
,
3236 "rbd", "get_features",
3237 (char *) &snapid
, sizeof (snapid
),
3238 (char *) &features_buf
, sizeof (features_buf
),
3240 dout("%s: rbd_obj_method_sync returned %d\n", __func__
, ret
);
3244 incompat
= le64_to_cpu(features_buf
.incompat
);
3245 if (incompat
& ~RBD_FEATURES_SUPPORTED
)
3248 *snap_features
= le64_to_cpu(features_buf
.features
);
3250 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3251 (unsigned long long) snap_id
,
3252 (unsigned long long) *snap_features
,
3253 (unsigned long long) le64_to_cpu(features_buf
.incompat
));
3258 static int rbd_dev_v2_features(struct rbd_device
*rbd_dev
)
3260 return _rbd_dev_v2_snap_features(rbd_dev
, CEPH_NOSNAP
,
3261 &rbd_dev
->header
.features
);
3264 static int rbd_dev_v2_parent_info(struct rbd_device
*rbd_dev
)
3266 struct rbd_spec
*parent_spec
;
3268 void *reply_buf
= NULL
;
3276 parent_spec
= rbd_spec_alloc();
3280 size
= sizeof (__le64
) + /* pool_id */
3281 sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
+ /* image_id */
3282 sizeof (__le64
) + /* snap_id */
3283 sizeof (__le64
); /* overlap */
3284 reply_buf
= kmalloc(size
, GFP_KERNEL
);
3290 snapid
= cpu_to_le64(CEPH_NOSNAP
);
3291 ret
= rbd_obj_method_sync(rbd_dev
, rbd_dev
->header_name
,
3292 "rbd", "get_parent",
3293 (char *) &snapid
, sizeof (snapid
),
3294 (char *) reply_buf
, size
, NULL
);
3295 dout("%s: rbd_obj_method_sync returned %d\n", __func__
, ret
);
3301 end
= (char *) reply_buf
+ size
;
3302 ceph_decode_64_safe(&p
, end
, parent_spec
->pool_id
, out_err
);
3303 if (parent_spec
->pool_id
== CEPH_NOPOOL
)
3304 goto out
; /* No parent? No problem. */
3306 /* The ceph file layout needs to fit pool id in 32 bits */
3309 if (WARN_ON(parent_spec
->pool_id
> (u64
) U32_MAX
))
3312 image_id
= ceph_extract_encoded_string(&p
, end
, NULL
, GFP_KERNEL
);
3313 if (IS_ERR(image_id
)) {
3314 ret
= PTR_ERR(image_id
);
3317 parent_spec
->image_id
= image_id
;
3318 ceph_decode_64_safe(&p
, end
, parent_spec
->snap_id
, out_err
);
3319 ceph_decode_64_safe(&p
, end
, overlap
, out_err
);
3321 rbd_dev
->parent_overlap
= overlap
;
3322 rbd_dev
->parent_spec
= parent_spec
;
3323 parent_spec
= NULL
; /* rbd_dev now owns this */
3328 rbd_spec_put(parent_spec
);
3333 static char *rbd_dev_image_name(struct rbd_device
*rbd_dev
)
3335 size_t image_id_size
;
3340 void *reply_buf
= NULL
;
3342 char *image_name
= NULL
;
3345 rbd_assert(!rbd_dev
->spec
->image_name
);
3347 len
= strlen(rbd_dev
->spec
->image_id
);
3348 image_id_size
= sizeof (__le32
) + len
;
3349 image_id
= kmalloc(image_id_size
, GFP_KERNEL
);
3354 end
= (char *) image_id
+ image_id_size
;
3355 ceph_encode_string(&p
, end
, rbd_dev
->spec
->image_id
, (u32
) len
);
3357 size
= sizeof (__le32
) + RBD_IMAGE_NAME_LEN_MAX
;
3358 reply_buf
= kmalloc(size
, GFP_KERNEL
);
3362 ret
= rbd_obj_method_sync(rbd_dev
, RBD_DIRECTORY
,
3363 "rbd", "dir_get_name",
3364 image_id
, image_id_size
,
3365 (char *) reply_buf
, size
, NULL
);
3369 end
= (char *) reply_buf
+ size
;
3370 image_name
= ceph_extract_encoded_string(&p
, end
, &len
, GFP_KERNEL
);
3371 if (IS_ERR(image_name
))
3374 dout("%s: name is %s len is %zd\n", __func__
, image_name
, len
);
3383 * When a parent image gets probed, we only have the pool, image,
3384 * and snapshot ids but not the names of any of them. This call
3385 * is made later to fill in those names. It has to be done after
3386 * rbd_dev_snaps_update() has completed because some of the
3387 * information (in particular, snapshot name) is not available
3390 static int rbd_dev_probe_update_spec(struct rbd_device
*rbd_dev
)
3392 struct ceph_osd_client
*osdc
;
3394 void *reply_buf
= NULL
;
3397 if (rbd_dev
->spec
->pool_name
)
3398 return 0; /* Already have the names */
3400 /* Look up the pool name */
3402 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
3403 name
= ceph_pg_pool_name_by_id(osdc
->osdmap
, rbd_dev
->spec
->pool_id
);
3405 rbd_warn(rbd_dev
, "there is no pool with id %llu",
3406 rbd_dev
->spec
->pool_id
); /* Really a BUG() */
3410 rbd_dev
->spec
->pool_name
= kstrdup(name
, GFP_KERNEL
);
3411 if (!rbd_dev
->spec
->pool_name
)
3414 /* Fetch the image name; tolerate failure here */
3416 name
= rbd_dev_image_name(rbd_dev
);
3418 rbd_dev
->spec
->image_name
= (char *) name
;
3420 rbd_warn(rbd_dev
, "unable to get image name");
3422 /* Look up the snapshot name. */
3424 name
= rbd_snap_name(rbd_dev
, rbd_dev
->spec
->snap_id
);
3426 rbd_warn(rbd_dev
, "no snapshot with id %llu",
3427 rbd_dev
->spec
->snap_id
); /* Really a BUG() */
3431 rbd_dev
->spec
->snap_name
= kstrdup(name
, GFP_KERNEL
);
3432 if(!rbd_dev
->spec
->snap_name
)
3438 kfree(rbd_dev
->spec
->pool_name
);
3439 rbd_dev
->spec
->pool_name
= NULL
;
3444 static int rbd_dev_v2_snap_context(struct rbd_device
*rbd_dev
, u64
*ver
)
3453 struct ceph_snap_context
*snapc
;
3457 * We'll need room for the seq value (maximum snapshot id),
3458 * snapshot count, and array of that many snapshot ids.
3459 * For now we have a fixed upper limit on the number we're
3460 * prepared to receive.
3462 size
= sizeof (__le64
) + sizeof (__le32
) +
3463 RBD_MAX_SNAP_COUNT
* sizeof (__le64
);
3464 reply_buf
= kzalloc(size
, GFP_KERNEL
);
3468 ret
= rbd_obj_method_sync(rbd_dev
, rbd_dev
->header_name
,
3469 "rbd", "get_snapcontext",
3471 reply_buf
, size
, ver
);
3472 dout("%s: rbd_obj_method_sync returned %d\n", __func__
, ret
);
3478 end
= (char *) reply_buf
+ size
;
3479 ceph_decode_64_safe(&p
, end
, seq
, out
);
3480 ceph_decode_32_safe(&p
, end
, snap_count
, out
);
3483 * Make sure the reported number of snapshot ids wouldn't go
3484 * beyond the end of our buffer. But before checking that,
3485 * make sure the computed size of the snapshot context we
3486 * allocate is representable in a size_t.
3488 if (snap_count
> (SIZE_MAX
- sizeof (struct ceph_snap_context
))
3493 if (!ceph_has_room(&p
, end
, snap_count
* sizeof (__le64
)))
3496 size
= sizeof (struct ceph_snap_context
) +
3497 snap_count
* sizeof (snapc
->snaps
[0]);
3498 snapc
= kmalloc(size
, GFP_KERNEL
);
3504 atomic_set(&snapc
->nref
, 1);
3506 snapc
->num_snaps
= snap_count
;
3507 for (i
= 0; i
< snap_count
; i
++)
3508 snapc
->snaps
[i
] = ceph_decode_64(&p
);
3510 rbd_dev
->header
.snapc
= snapc
;
3512 dout(" snap context seq = %llu, snap_count = %u\n",
3513 (unsigned long long) seq
, (unsigned int) snap_count
);
3521 static char *rbd_dev_v2_snap_name(struct rbd_device
*rbd_dev
, u32 which
)
3531 size
= sizeof (__le32
) + RBD_MAX_SNAP_NAME_LEN
;
3532 reply_buf
= kmalloc(size
, GFP_KERNEL
);
3534 return ERR_PTR(-ENOMEM
);
3536 snap_id
= cpu_to_le64(rbd_dev
->header
.snapc
->snaps
[which
]);
3537 ret
= rbd_obj_method_sync(rbd_dev
, rbd_dev
->header_name
,
3538 "rbd", "get_snapshot_name",
3539 (char *) &snap_id
, sizeof (snap_id
),
3540 reply_buf
, size
, NULL
);
3541 dout("%s: rbd_obj_method_sync returned %d\n", __func__
, ret
);
3546 end
= (char *) reply_buf
+ size
;
3547 snap_name
= ceph_extract_encoded_string(&p
, end
, NULL
, GFP_KERNEL
);
3548 if (IS_ERR(snap_name
)) {
3549 ret
= PTR_ERR(snap_name
);
3552 dout(" snap_id 0x%016llx snap_name = %s\n",
3553 (unsigned long long) le64_to_cpu(snap_id
), snap_name
);
3561 return ERR_PTR(ret
);
3564 static char *rbd_dev_v2_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
3565 u64
*snap_size
, u64
*snap_features
)
3571 snap_id
= rbd_dev
->header
.snapc
->snaps
[which
];
3572 ret
= _rbd_dev_v2_snap_size(rbd_dev
, snap_id
, &order
, snap_size
);
3574 return ERR_PTR(ret
);
3575 ret
= _rbd_dev_v2_snap_features(rbd_dev
, snap_id
, snap_features
);
3577 return ERR_PTR(ret
);
3579 return rbd_dev_v2_snap_name(rbd_dev
, which
);
3582 static char *rbd_dev_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
3583 u64
*snap_size
, u64
*snap_features
)
3585 if (rbd_dev
->image_format
== 1)
3586 return rbd_dev_v1_snap_info(rbd_dev
, which
,
3587 snap_size
, snap_features
);
3588 if (rbd_dev
->image_format
== 2)
3589 return rbd_dev_v2_snap_info(rbd_dev
, which
,
3590 snap_size
, snap_features
);
3591 return ERR_PTR(-EINVAL
);
3594 static int rbd_dev_v2_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
3599 down_write(&rbd_dev
->header_rwsem
);
3601 /* Grab old order first, to see if it changes */
3603 obj_order
= rbd_dev
->header
.obj_order
,
3604 ret
= rbd_dev_v2_image_size(rbd_dev
);
3607 if (rbd_dev
->header
.obj_order
!= obj_order
) {
3611 rbd_update_mapping_size(rbd_dev
);
3613 ret
= rbd_dev_v2_snap_context(rbd_dev
, hver
);
3614 dout("rbd_dev_v2_snap_context returned %d\n", ret
);
3617 ret
= rbd_dev_snaps_update(rbd_dev
);
3618 dout("rbd_dev_snaps_update returned %d\n", ret
);
3621 ret
= rbd_dev_snaps_register(rbd_dev
);
3622 dout("rbd_dev_snaps_register returned %d\n", ret
);
3624 up_write(&rbd_dev
->header_rwsem
);
3630 * Scan the rbd device's current snapshot list and compare it to the
3631 * newly-received snapshot context. Remove any existing snapshots
3632 * not present in the new snapshot context. Add a new snapshot for
3633 * any snaphots in the snapshot context not in the current list.
3634 * And verify there are no changes to snapshots we already know
3637 * Assumes the snapshots in the snapshot context are sorted by
3638 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
3639 * are also maintained in that order.)
3641 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
)
3643 struct ceph_snap_context
*snapc
= rbd_dev
->header
.snapc
;
3644 const u32 snap_count
= snapc
->num_snaps
;
3645 struct list_head
*head
= &rbd_dev
->snaps
;
3646 struct list_head
*links
= head
->next
;
3649 dout("%s: snap count is %u\n", __func__
, (unsigned int) snap_count
);
3650 while (index
< snap_count
|| links
!= head
) {
3652 struct rbd_snap
*snap
;
3655 u64 snap_features
= 0;
3657 snap_id
= index
< snap_count
? snapc
->snaps
[index
]
3659 snap
= links
!= head
? list_entry(links
, struct rbd_snap
, node
)
3661 rbd_assert(!snap
|| snap
->id
!= CEPH_NOSNAP
);
3663 if (snap_id
== CEPH_NOSNAP
|| (snap
&& snap
->id
> snap_id
)) {
3664 struct list_head
*next
= links
->next
;
3667 * A previously-existing snapshot is not in
3668 * the new snap context.
3670 * If the now missing snapshot is the one the
3671 * image is mapped to, clear its exists flag
3672 * so we can avoid sending any more requests
3675 if (rbd_dev
->spec
->snap_id
== snap
->id
)
3676 clear_bit(RBD_DEV_FLAG_EXISTS
, &rbd_dev
->flags
);
3677 rbd_remove_snap_dev(snap
);
3678 dout("%ssnap id %llu has been removed\n",
3679 rbd_dev
->spec
->snap_id
== snap
->id
?
3681 (unsigned long long) snap
->id
);
3683 /* Done with this list entry; advance */
3689 snap_name
= rbd_dev_snap_info(rbd_dev
, index
,
3690 &snap_size
, &snap_features
);
3691 if (IS_ERR(snap_name
))
3692 return PTR_ERR(snap_name
);
3694 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count
,
3695 (unsigned long long) snap_id
);
3696 if (!snap
|| (snap_id
!= CEPH_NOSNAP
&& snap
->id
< snap_id
)) {
3697 struct rbd_snap
*new_snap
;
3699 /* We haven't seen this snapshot before */
3701 new_snap
= __rbd_add_snap_dev(rbd_dev
, snap_name
,
3702 snap_id
, snap_size
, snap_features
);
3703 if (IS_ERR(new_snap
)) {
3704 int err
= PTR_ERR(new_snap
);
3706 dout(" failed to add dev, error %d\n", err
);
3711 /* New goes before existing, or at end of list */
3713 dout(" added dev%s\n", snap
? "" : " at end\n");
3715 list_add_tail(&new_snap
->node
, &snap
->node
);
3717 list_add_tail(&new_snap
->node
, head
);
3719 /* Already have this one */
3721 dout(" already present\n");
3723 rbd_assert(snap
->size
== snap_size
);
3724 rbd_assert(!strcmp(snap
->name
, snap_name
));
3725 rbd_assert(snap
->features
== snap_features
);
3727 /* Done with this list entry; advance */
3729 links
= links
->next
;
3732 /* Advance to the next entry in the snapshot context */
3736 dout("%s: done\n", __func__
);
3742 * Scan the list of snapshots and register the devices for any that
3743 * have not already been registered.
3745 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
)
3747 struct rbd_snap
*snap
;
3750 dout("%s:\n", __func__
);
3751 if (WARN_ON(!device_is_registered(&rbd_dev
->dev
)))
3754 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
3755 if (!rbd_snap_registered(snap
)) {
3756 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
3761 dout("%s: returning %d\n", __func__
, ret
);
3766 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
3771 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
3773 dev
= &rbd_dev
->dev
;
3774 dev
->bus
= &rbd_bus_type
;
3775 dev
->type
= &rbd_device_type
;
3776 dev
->parent
= &rbd_root_dev
;
3777 dev
->release
= rbd_dev_release
;
3778 dev_set_name(dev
, "%d", rbd_dev
->dev_id
);
3779 ret
= device_register(dev
);
3781 mutex_unlock(&ctl_mutex
);
3786 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
3788 device_unregister(&rbd_dev
->dev
);
3791 static atomic64_t rbd_dev_id_max
= ATOMIC64_INIT(0);
3794 * Get a unique rbd identifier for the given new rbd_dev, and add
3795 * the rbd_dev to the global list. The minimum rbd id is 1.
3797 static void rbd_dev_id_get(struct rbd_device
*rbd_dev
)
3799 rbd_dev
->dev_id
= atomic64_inc_return(&rbd_dev_id_max
);
3801 spin_lock(&rbd_dev_list_lock
);
3802 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
3803 spin_unlock(&rbd_dev_list_lock
);
3804 dout("rbd_dev %p given dev id %llu\n", rbd_dev
,
3805 (unsigned long long) rbd_dev
->dev_id
);
3809 * Remove an rbd_dev from the global list, and record that its
3810 * identifier is no longer in use.
3812 static void rbd_dev_id_put(struct rbd_device
*rbd_dev
)
3814 struct list_head
*tmp
;
3815 int rbd_id
= rbd_dev
->dev_id
;
3818 rbd_assert(rbd_id
> 0);
3820 dout("rbd_dev %p released dev id %llu\n", rbd_dev
,
3821 (unsigned long long) rbd_dev
->dev_id
);
3822 spin_lock(&rbd_dev_list_lock
);
3823 list_del_init(&rbd_dev
->node
);
3826 * If the id being "put" is not the current maximum, there
3827 * is nothing special we need to do.
3829 if (rbd_id
!= atomic64_read(&rbd_dev_id_max
)) {
3830 spin_unlock(&rbd_dev_list_lock
);
3835 * We need to update the current maximum id. Search the
3836 * list to find out what it is. We're more likely to find
3837 * the maximum at the end, so search the list backward.
3840 list_for_each_prev(tmp
, &rbd_dev_list
) {
3841 struct rbd_device
*rbd_dev
;
3843 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
3844 if (rbd_dev
->dev_id
> max_id
)
3845 max_id
= rbd_dev
->dev_id
;
3847 spin_unlock(&rbd_dev_list_lock
);
3850 * The max id could have been updated by rbd_dev_id_get(), in
3851 * which case it now accurately reflects the new maximum.
3852 * Be careful not to overwrite the maximum value in that
3855 atomic64_cmpxchg(&rbd_dev_id_max
, rbd_id
, max_id
);
3856 dout(" max dev id has been reset\n");
/*
 * Advances *buf past any leading white space so that it points at
 * the first non-space character (if any), and returns the length of
 * the token (run of non-white space characters) that starts there.
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char white_space[] = " \f\n\r\t\v";
	const char *start = *buf;

	start += strspn(start, white_space);	/* Skip to start of token */
	*buf = start;

	return strcspn(start, white_space);	/* Length of the token */
}
/*
 * Finds the next token in *buf and, if the supplied token buffer is
 * big enough, copies the token into it followed by a '\0'.  *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not counting the '\0').
 * The result is 0 if there is no token, and is >= token_size if the
 * token did not fit (in which case nothing is copied).
 *
 * *buf is always advanced past the end of the token found, whether
 * or not the token was copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	const size_t found = next_token(buf);
	const char *start = *buf;

	*buf = start + found;
	if (found >= token_size)
		return found;		/* No room for token plus '\0' */

	memcpy(token, start, found);
	token[found] = '\0';

	return found;
}
3909 * Finds the next token in *buf, dynamically allocates a buffer big
3910 * enough to hold a copy of it, and copies the token into the new
3911 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3912 * that a duplicate buffer is created even for a zero-length token.
3914 * Returns a pointer to the newly-allocated duplicate, or a null
3915 * pointer if memory for the duplicate was not available. If
3916 * the lenp argument is a non-null pointer, the length of the token
3917 * (not including the '\0') is returned in *lenp.
3919 * If successful, the *buf pointer will be updated to point beyond
3920 * the end of the found token.
3922 * Note: uses GFP_KERNEL for allocation.
3924 static inline char *dup_token(const char **buf
, size_t *lenp
)
3929 len
= next_token(buf
);
3930 dup
= kmemdup(*buf
, len
+ 1, GFP_KERNEL
);
3933 *(dup
+ len
) = '\0';
3943 * Parse the options provided for an "rbd add" (i.e., rbd image
3944 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3945 * and the data written is passed here via a NUL-terminated buffer.
3946 * Returns 0 if successful or an error code otherwise.
3948 * The information extracted from these options is recorded in
3949 * the other parameters which return dynamically-allocated
3952 * The address of a pointer that will refer to a ceph options
3953 * structure. Caller must release the returned pointer using
3954 * ceph_destroy_options() when it is no longer needed.
3956 * Address of an rbd options pointer. Fully initialized by
3957 * this function; caller must release with kfree().
3959 * Address of an rbd image specification pointer. Fully
3960 * initialized by this function based on parsed options.
3961 * Caller must release with rbd_spec_put().
3963 * The options passed take this form:
3964 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3967 * A comma-separated list of one or more monitor addresses.
3968 * A monitor address is an ip address, optionally followed
3969 * by a port number (separated by a colon).
3970 * I.e.: ip1[:port1][,ip2[:port2]...]
3972 * A comma-separated list of ceph and/or rbd options.
3974 * The name of the rados pool containing the rbd image.
3976 * The name of the image in that pool to map.
3978 * An optional snapshot id. If provided, the mapping will
3979 * present data from the image at the time that snapshot was
3980 * created. The image head is used if no snapshot id is
3981 * provided. Snapshot mappings are always read-only.
3983 static int rbd_add_parse_args(const char *buf
,
3984 struct ceph_options
**ceph_opts
,
3985 struct rbd_options
**opts
,
3986 struct rbd_spec
**rbd_spec
)
3990 const char *mon_addrs
;
3991 size_t mon_addrs_size
;
3992 struct rbd_spec
*spec
= NULL
;
3993 struct rbd_options
*rbd_opts
= NULL
;
3994 struct ceph_options
*copts
;
3997 /* The first four tokens are required */
3999 len
= next_token(&buf
);
4001 rbd_warn(NULL
, "no monitor address(es) provided");
4005 mon_addrs_size
= len
+ 1;
4009 options
= dup_token(&buf
, NULL
);
4013 rbd_warn(NULL
, "no options provided");
4017 spec
= rbd_spec_alloc();
4021 spec
->pool_name
= dup_token(&buf
, NULL
);
4022 if (!spec
->pool_name
)
4024 if (!*spec
->pool_name
) {
4025 rbd_warn(NULL
, "no pool name provided");
4029 spec
->image_name
= dup_token(&buf
, NULL
);
4030 if (!spec
->image_name
)
4032 if (!*spec
->image_name
) {
4033 rbd_warn(NULL
, "no image name provided");
4038 * Snapshot name is optional; default is to use "-"
4039 * (indicating the head/no snapshot).
4041 len
= next_token(&buf
);
4043 buf
= RBD_SNAP_HEAD_NAME
; /* No snapshot supplied */
4044 len
= sizeof (RBD_SNAP_HEAD_NAME
) - 1;
4045 } else if (len
> RBD_MAX_SNAP_NAME_LEN
) {
4046 ret
= -ENAMETOOLONG
;
4049 spec
->snap_name
= kmemdup(buf
, len
+ 1, GFP_KERNEL
);
4050 if (!spec
->snap_name
)
4052 *(spec
->snap_name
+ len
) = '\0';
4054 /* Initialize all rbd options to the defaults */
4056 rbd_opts
= kzalloc(sizeof (*rbd_opts
), GFP_KERNEL
);
4060 rbd_opts
->read_only
= RBD_READ_ONLY_DEFAULT
;
4062 copts
= ceph_parse_options(options
, mon_addrs
,
4063 mon_addrs
+ mon_addrs_size
- 1,
4064 parse_rbd_opts_token
, rbd_opts
);
4065 if (IS_ERR(copts
)) {
4066 ret
= PTR_ERR(copts
);
4087 * An rbd format 2 image has a unique identifier, distinct from the
4088 * name given to it by the user. Internally, that identifier is
4089 * what's used to specify the names of objects related to the image.
4091 * A special "rbd id" object is used to map an rbd image name to its
4092 * id. If that object doesn't exist, then there is no v2 rbd image
4093 * with the supplied name.
4095 * This function will record the given rbd_dev's image_id field if
4096 * it can be determined, and in that case will return 0. If any
4097 * errors occur a negative errno will be returned and the rbd_dev's
4098 * image_id field will be unchanged (and should be NULL).
4100 static int rbd_dev_image_id(struct rbd_device
*rbd_dev
)
4108 /* If we already have it we don't need to look it up */
4110 if (rbd_dev
->spec
->image_id
)
4114 * When probing a parent image, the image id is already
4115 * known (and the image name likely is not). There's no
4116 * need to fetch the image id again in this case.
4118 if (rbd_dev
->spec
->image_id
)
4122 * First, see if the format 2 image id file exists, and if
4123 * so, get the image's persistent id from it.
4125 size
= sizeof (RBD_ID_PREFIX
) + strlen(rbd_dev
->spec
->image_name
);
4126 object_name
= kmalloc(size
, GFP_NOIO
);
4129 sprintf(object_name
, "%s%s", RBD_ID_PREFIX
, rbd_dev
->spec
->image_name
);
4130 dout("rbd id object name is %s\n", object_name
);
4132 /* Response will be an encoded string, which includes a length */
4134 size
= sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
;
4135 response
= kzalloc(size
, GFP_NOIO
);
4141 ret
= rbd_obj_method_sync(rbd_dev
, object_name
,
4144 response
, RBD_IMAGE_ID_LEN_MAX
, NULL
);
4145 dout("%s: rbd_obj_method_sync returned %d\n", __func__
, ret
);
4150 rbd_dev
->spec
->image_id
= ceph_extract_encoded_string(&p
,
4151 p
+ RBD_IMAGE_ID_LEN_MAX
,
4153 if (IS_ERR(rbd_dev
->spec
->image_id
)) {
4154 ret
= PTR_ERR(rbd_dev
->spec
->image_id
);
4155 rbd_dev
->spec
->image_id
= NULL
;
4157 dout("image_id is %s\n", rbd_dev
->spec
->image_id
);
4166 static int rbd_dev_v1_probe(struct rbd_device
*rbd_dev
)
4171 /* Version 1 images have no id; empty string is used */
4173 rbd_dev
->spec
->image_id
= kstrdup("", GFP_KERNEL
);
4174 if (!rbd_dev
->spec
->image_id
)
4177 /* Record the header object name for this rbd image. */
4179 size
= strlen(rbd_dev
->spec
->image_name
) + sizeof (RBD_SUFFIX
);
4180 rbd_dev
->header_name
= kmalloc(size
, GFP_KERNEL
);
4181 if (!rbd_dev
->header_name
) {
4185 sprintf(rbd_dev
->header_name
, "%s%s",
4186 rbd_dev
->spec
->image_name
, RBD_SUFFIX
);
4188 /* Populate rbd image metadata */
4190 ret
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
4194 /* Version 1 images have no parent (no layering) */
4196 rbd_dev
->parent_spec
= NULL
;
4197 rbd_dev
->parent_overlap
= 0;
4199 rbd_dev
->image_format
= 1;
4201 dout("discovered version 1 image, header name is %s\n",
4202 rbd_dev
->header_name
);
4207 kfree(rbd_dev
->header_name
);
4208 rbd_dev
->header_name
= NULL
;
4209 kfree(rbd_dev
->spec
->image_id
);
4210 rbd_dev
->spec
->image_id
= NULL
;
4215 static int rbd_dev_v2_probe(struct rbd_device
*rbd_dev
)
4222 * Image id was filled in by the caller. Record the header
4223 * object name for this rbd image.
4225 size
= sizeof (RBD_HEADER_PREFIX
) + strlen(rbd_dev
->spec
->image_id
);
4226 rbd_dev
->header_name
= kmalloc(size
, GFP_KERNEL
);
4227 if (!rbd_dev
->header_name
)
4229 sprintf(rbd_dev
->header_name
, "%s%s",
4230 RBD_HEADER_PREFIX
, rbd_dev
->spec
->image_id
);
4232 /* Get the size and object order for the image */
4234 ret
= rbd_dev_v2_image_size(rbd_dev
);
4238 /* Get the object prefix (a.k.a. block_name) for the image */
4240 ret
= rbd_dev_v2_object_prefix(rbd_dev
);
4244 /* Get the and check features for the image */
4246 ret
= rbd_dev_v2_features(rbd_dev
);
4250 /* If the image supports layering, get the parent info */
4252 if (rbd_dev
->header
.features
& RBD_FEATURE_LAYERING
) {
4253 ret
= rbd_dev_v2_parent_info(rbd_dev
);
4258 /* crypto and compression type aren't (yet) supported for v2 images */
4260 rbd_dev
->header
.crypt_type
= 0;
4261 rbd_dev
->header
.comp_type
= 0;
4263 /* Get the snapshot context, plus the header version */
4265 ret
= rbd_dev_v2_snap_context(rbd_dev
, &ver
);
4268 rbd_dev
->header
.obj_version
= ver
;
4270 rbd_dev
->image_format
= 2;
4272 dout("discovered version 2 image, header name is %s\n",
4273 rbd_dev
->header_name
);
4277 rbd_dev
->parent_overlap
= 0;
4278 rbd_spec_put(rbd_dev
->parent_spec
);
4279 rbd_dev
->parent_spec
= NULL
;
4280 kfree(rbd_dev
->header_name
);
4281 rbd_dev
->header_name
= NULL
;
4282 kfree(rbd_dev
->header
.object_prefix
);
4283 rbd_dev
->header
.object_prefix
= NULL
;
4288 static int rbd_dev_probe_finish(struct rbd_device
*rbd_dev
)
4290 struct rbd_device
*parent
= NULL
;
4291 struct rbd_spec
*parent_spec
= NULL
;
4292 struct rbd_client
*rbdc
= NULL
;
4295 /* no need to lock here, as rbd_dev is not registered yet */
4296 ret
= rbd_dev_snaps_update(rbd_dev
);
4300 ret
= rbd_dev_probe_update_spec(rbd_dev
);
4304 ret
= rbd_dev_set_mapping(rbd_dev
);
4308 /* generate unique id: find highest unique id, add one */
4309 rbd_dev_id_get(rbd_dev
);
4311 /* Fill in the device name, now that we have its id. */
4312 BUILD_BUG_ON(DEV_NAME_LEN
4313 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
4314 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->dev_id
);
4316 /* Get our block major device number. */
4318 ret
= register_blkdev(0, rbd_dev
->name
);
4321 rbd_dev
->major
= ret
;
4323 /* Set up the blkdev mapping. */
4325 ret
= rbd_init_disk(rbd_dev
);
4327 goto err_out_blkdev
;
4329 ret
= rbd_bus_add_dev(rbd_dev
);
4334 * At this point cleanup in the event of an error is the job
4335 * of the sysfs code (initiated by rbd_bus_del_dev()).
4337 /* Probe the parent if there is one */
4339 if (rbd_dev
->parent_spec
) {
4341 * We need to pass a reference to the client and the
4342 * parent spec when creating the parent rbd_dev.
4343 * Images related by parent/child relationships
4344 * always share both.
4346 parent_spec
= rbd_spec_get(rbd_dev
->parent_spec
);
4347 rbdc
= __rbd_get_client(rbd_dev
->rbd_client
);
4349 parent
= rbd_dev_create(rbdc
, parent_spec
);
4354 rbdc
= NULL
; /* parent now owns reference */
4355 parent_spec
= NULL
; /* parent now owns reference */
4356 ret
= rbd_dev_probe(parent
);
4358 goto err_out_parent
;
4359 rbd_dev
->parent
= parent
;
4362 down_write(&rbd_dev
->header_rwsem
);
4363 ret
= rbd_dev_snaps_register(rbd_dev
);
4364 up_write(&rbd_dev
->header_rwsem
);
4368 ret
= rbd_dev_header_watch_sync(rbd_dev
, 1);
4372 /* Everything's ready. Announce the disk to the world. */
4374 add_disk(rbd_dev
->disk
);
4376 pr_info("%s: added with size 0x%llx\n", rbd_dev
->disk
->disk_name
,
4377 (unsigned long long) rbd_dev
->mapping
.size
);
4382 rbd_dev_destroy(parent
);
4384 rbd_spec_put(parent_spec
);
4385 rbd_put_client(rbdc
);
4387 /* this will also clean up rest of rbd_dev stuff */
4389 rbd_bus_del_dev(rbd_dev
);
4393 rbd_free_disk(rbd_dev
);
4395 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
4397 rbd_dev_id_put(rbd_dev
);
4399 rbd_remove_all_snaps(rbd_dev
);
4405 * Probe for the existence of the header object for the given rbd
4406 * device. For format 2 images this includes determining the image
4409 static int rbd_dev_probe(struct rbd_device
*rbd_dev
)
4414 * Get the id from the image id object. If it's not a
4415 * format 2 image, we'll get ENOENT back, and we'll assume
4416 * it's a format 1 image.
4418 ret
= rbd_dev_image_id(rbd_dev
);
4420 ret
= rbd_dev_v1_probe(rbd_dev
);
4422 ret
= rbd_dev_v2_probe(rbd_dev
);
4424 dout("probe failed, returning %d\n", ret
);
4429 ret
= rbd_dev_probe_finish(rbd_dev
);
4431 rbd_header_free(&rbd_dev
->header
);
4436 static ssize_t
rbd_add(struct bus_type
*bus
,
4440 struct rbd_device
*rbd_dev
= NULL
;
4441 struct ceph_options
*ceph_opts
= NULL
;
4442 struct rbd_options
*rbd_opts
= NULL
;
4443 struct rbd_spec
*spec
= NULL
;
4444 struct rbd_client
*rbdc
;
4445 struct ceph_osd_client
*osdc
;
4448 if (!try_module_get(THIS_MODULE
))
4451 /* parse add command */
4452 rc
= rbd_add_parse_args(buf
, &ceph_opts
, &rbd_opts
, &spec
);
4454 goto err_out_module
;
4456 rbdc
= rbd_get_client(ceph_opts
);
4461 ceph_opts
= NULL
; /* rbd_dev client now owns this */
4464 osdc
= &rbdc
->client
->osdc
;
4465 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, spec
->pool_name
);
4467 goto err_out_client
;
4468 spec
->pool_id
= (u64
) rc
;
4470 /* The ceph file layout needs to fit pool id in 32 bits */
4472 if (WARN_ON(spec
->pool_id
> (u64
) U32_MAX
)) {
4474 goto err_out_client
;
4477 rbd_dev
= rbd_dev_create(rbdc
, spec
);
4479 goto err_out_client
;
4480 rbdc
= NULL
; /* rbd_dev now owns this */
4481 spec
= NULL
; /* rbd_dev now owns this */
4483 rbd_dev
->mapping
.read_only
= rbd_opts
->read_only
;
4485 rbd_opts
= NULL
; /* done with this */
4487 rc
= rbd_dev_probe(rbd_dev
);
4489 goto err_out_rbd_dev
;
4493 rbd_dev_destroy(rbd_dev
);
4495 rbd_put_client(rbdc
);
4498 ceph_destroy_options(ceph_opts
);
4502 module_put(THIS_MODULE
);
4504 dout("Error adding device %s\n", buf
);
4506 return (ssize_t
) rc
;
4509 static struct rbd_device
*__rbd_get_dev(unsigned long dev_id
)
4511 struct list_head
*tmp
;
4512 struct rbd_device
*rbd_dev
;
4514 spin_lock(&rbd_dev_list_lock
);
4515 list_for_each(tmp
, &rbd_dev_list
) {
4516 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
4517 if (rbd_dev
->dev_id
== dev_id
) {
4518 spin_unlock(&rbd_dev_list_lock
);
4522 spin_unlock(&rbd_dev_list_lock
);
4526 static void rbd_dev_release(struct device
*dev
)
4528 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
4530 if (rbd_dev
->watch_event
)
4531 rbd_dev_header_watch_sync(rbd_dev
, 0);
4533 /* clean up and free blkdev */
4534 rbd_free_disk(rbd_dev
);
4535 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
4537 /* release allocated disk header fields */
4538 rbd_header_free(&rbd_dev
->header
);
4540 /* done with the id, and with the rbd_dev */
4541 rbd_dev_id_put(rbd_dev
);
4542 rbd_assert(rbd_dev
->rbd_client
!= NULL
);
4543 rbd_dev_destroy(rbd_dev
);
4545 /* release module ref */
4546 module_put(THIS_MODULE
);
/*
 * Remove one rbd device: first drop all of its snapshots, then
 * delete the device from the rbd bus.
 */
static void __rbd_remove(struct rbd_device *rbd_dev)
{
	rbd_remove_all_snaps(rbd_dev);

	rbd_bus_del_dev(rbd_dev);
}
4555 static ssize_t
rbd_remove(struct bus_type
*bus
,
4559 struct rbd_device
*rbd_dev
= NULL
;
4564 rc
= strict_strtoul(buf
, 10, &ul
);
4568 /* convert to int; abort if we lost anything in the conversion */
4569 target_id
= (int) ul
;
4570 if (target_id
!= ul
)
4573 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
4575 rbd_dev
= __rbd_get_dev(target_id
);
4581 spin_lock_irq(&rbd_dev
->lock
);
4582 if (rbd_dev
->open_count
)
4585 set_bit(RBD_DEV_FLAG_REMOVING
, &rbd_dev
->flags
);
4586 spin_unlock_irq(&rbd_dev
->lock
);
4590 while (rbd_dev
->parent_spec
) {
4591 struct rbd_device
*first
= rbd_dev
;
4592 struct rbd_device
*second
= first
->parent
;
4593 struct rbd_device
*third
;
4596 * Follow to the parent with no grandparent and
4599 while (second
&& (third
= second
->parent
)) {
4603 __rbd_remove(second
);
4604 rbd_spec_put(first
->parent_spec
);
4605 first
->parent_spec
= NULL
;
4606 first
->parent_overlap
= 0;
4607 first
->parent
= NULL
;
4609 __rbd_remove(rbd_dev
);
4612 mutex_unlock(&ctl_mutex
);
4618 * create control files in sysfs
4621 static int rbd_sysfs_init(void)
4625 ret
= device_register(&rbd_root_dev
);
4629 ret
= bus_register(&rbd_bus_type
);
4631 device_unregister(&rbd_root_dev
);
4636 static void rbd_sysfs_cleanup(void)
4638 bus_unregister(&rbd_bus_type
);
4639 device_unregister(&rbd_root_dev
);
4642 static int __init
rbd_init(void)
4646 if (!libceph_compatible(NULL
)) {
4647 rbd_warn(NULL
, "libceph incompatibility (quitting)");
4651 rc
= rbd_sysfs_init();
4654 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
4658 static void __exit
rbd_exit(void)
4660 rbd_sysfs_cleanup();
4663 module_init(rbd_init
);
4664 module_exit(rbd_exit
);
4666 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
4667 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
4668 MODULE_DESCRIPTION("rados block device");
4670 /* following authorship retained from original osdblk.c */
4671 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
4673 MODULE_LICENSE("GPL");