/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_SNAP_DEV_NAME_PREFIX "snap_"
65 #define RBD_MAX_SNAP_NAME_LEN \
66 (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
68 #define RBD_MAX_SNAP_COUNT 510 /* allows max snapc to fit in 4KB */
69 #define RBD_MAX_OPT_LEN 1024
71 #define RBD_SNAP_HEAD_NAME "-"
73 /* This allows a single page to hold an image name sent by OSD */
74 #define RBD_IMAGE_NAME_LEN_MAX (PAGE_SIZE - sizeof (__le32) - 1)
75 #define RBD_IMAGE_ID_LEN_MAX 64
77 #define RBD_OBJ_PREFIX_LEN_MAX 64
81 #define RBD_FEATURE_LAYERING 1
83 /* Features supported by this (client software) implementation. */
85 #define RBD_FEATURES_ALL (0)
88 * An RBD device name will be "rbd#", where the "rbd" comes from
89 * RBD_DRV_NAME above, and # is a unique integer identifier.
90 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
91 * enough to hold all possible device names.
93 #define DEV_NAME_LEN 32
94 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
96 #define RBD_READ_ONLY_DEFAULT false
99 * block device image metadata (in-memory version)
101 struct rbd_image_header
{
102 /* These four fields never change for a given rbd image */
109 /* The remaining fields need to be updated occasionally */
111 struct ceph_snap_context
*snapc
;
119 * An rbd image specification.
121 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
131 size_t image_name_len
;
144 * an instance of the client. multiple devices may share an rbd client.
147 struct ceph_client
*client
;
149 struct list_head node
;
153 * a request completion status
155 struct rbd_req_status
{
162 * a collection of requests
164 struct rbd_req_coll
{
168 struct rbd_req_status status
[0];
172 * a single io request
175 struct request
*rq
; /* blk layer request */
176 struct bio
*bio
; /* cloned bio */
177 struct page
**pages
; /* list of used pages */
180 struct rbd_req_coll
*coll
;
187 struct list_head node
;
202 int dev_id
; /* blkdev unique id */
204 int major
; /* blkdev assigned major */
205 struct gendisk
*disk
; /* blkdev's gendisk and rq */
207 u32 image_format
; /* Either 1 or 2 */
208 struct rbd_client
*rbd_client
;
210 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
212 spinlock_t lock
; /* queue lock */
214 struct rbd_image_header header
;
216 struct rbd_spec
*spec
;
220 struct ceph_osd_event
*watch_event
;
221 struct ceph_osd_request
*watch_request
;
223 struct rbd_spec
*parent_spec
;
226 /* protects updating the header */
227 struct rw_semaphore header_rwsem
;
229 struct rbd_mapping mapping
;
231 struct list_head node
;
233 /* list of snapshots */
234 struct list_head snaps
;
238 unsigned long open_count
;
241 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
243 static LIST_HEAD(rbd_dev_list
); /* devices */
244 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
246 static LIST_HEAD(rbd_client_list
); /* clients */
247 static DEFINE_SPINLOCK(rbd_client_list_lock
);
249 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
);
250 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
);
252 static void rbd_dev_release(struct device
*dev
);
253 static void rbd_remove_snap_dev(struct rbd_snap
*snap
);
255 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
257 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
260 static struct bus_attribute rbd_bus_attrs
[] = {
261 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
262 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
266 static struct bus_type rbd_bus_type
= {
268 .bus_attrs
= rbd_bus_attrs
,
271 static void rbd_root_dev_release(struct device
*dev
)
275 static struct device rbd_root_dev
= {
277 .release
= rbd_root_dev_release
,
/*
 * rbd_assert(expr): with RBD_DEBUG defined, log a loud assertion
 * failure and BUG() when "expr" is false; otherwise compile to
 * nothing.  The expansion is wrapped in do { } while (0) so the
 * macro behaves as a single statement and is safe in an unbraced
 * if/else body (the bare "if" form is a dangling-else hazard).
 *
 * NOTE(review): the BUG() line was lost in extraction and is
 * reconstructed from the surrounding error-report logic.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		do {							\
			if (unlikely(!(expr))) {			\
				printk(KERN_ERR "\nAssertion failure in %s() " \
							"at line %d:\n\n" \
						"\trbd_assert(%s);\n\n", \
						__func__, __LINE__, #expr); \
				BUG();					\
			}						\
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
293 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
295 return get_device(&rbd_dev
->dev
);
298 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
300 put_device(&rbd_dev
->dev
);
303 static int rbd_dev_refresh(struct rbd_device
*rbd_dev
, u64
*hver
);
304 static int rbd_dev_v2_refresh(struct rbd_device
*rbd_dev
, u64
*hver
);
306 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
308 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
310 if ((mode
& FMODE_WRITE
) && rbd_dev
->mapping
.read_only
)
313 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
314 rbd_get_dev(rbd_dev
);
315 set_device_ro(bdev
, rbd_dev
->mapping
.read_only
);
316 rbd_dev
->open_count
++;
317 mutex_unlock(&ctl_mutex
);
322 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
324 struct rbd_device
*rbd_dev
= disk
->private_data
;
326 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
327 rbd_assert(rbd_dev
->open_count
> 0);
328 rbd_dev
->open_count
--;
329 rbd_put_dev(rbd_dev
);
330 mutex_unlock(&ctl_mutex
);
335 static const struct block_device_operations rbd_bd_ops
= {
336 .owner
= THIS_MODULE
,
338 .release
= rbd_release
,
342 * Initialize an rbd client instance.
345 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
)
347 struct rbd_client
*rbdc
;
350 dout("rbd_client_create\n");
351 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
355 kref_init(&rbdc
->kref
);
356 INIT_LIST_HEAD(&rbdc
->node
);
358 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
360 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
361 if (IS_ERR(rbdc
->client
))
363 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
365 ret
= ceph_open_session(rbdc
->client
);
369 spin_lock(&rbd_client_list_lock
);
370 list_add_tail(&rbdc
->node
, &rbd_client_list
);
371 spin_unlock(&rbd_client_list_lock
);
373 mutex_unlock(&ctl_mutex
);
375 dout("rbd_client_create created %p\n", rbdc
);
379 ceph_destroy_client(rbdc
->client
);
381 mutex_unlock(&ctl_mutex
);
385 ceph_destroy_options(ceph_opts
);
390 * Find a ceph client with specific addr and configuration. If
391 * found, bump its reference count.
393 static struct rbd_client
*rbd_client_find(struct ceph_options
*ceph_opts
)
395 struct rbd_client
*client_node
;
398 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
401 spin_lock(&rbd_client_list_lock
);
402 list_for_each_entry(client_node
, &rbd_client_list
, node
) {
403 if (!ceph_compare_options(ceph_opts
, client_node
->client
)) {
404 kref_get(&client_node
->kref
);
409 spin_unlock(&rbd_client_list_lock
);
411 return found
? client_node
: NULL
;
421 /* string args above */
424 /* Boolean args above */
428 static match_table_t rbd_opts_tokens
= {
430 /* string args above */
431 {Opt_read_only
, "read_only"},
432 {Opt_read_only
, "ro"}, /* Alternate spelling */
433 {Opt_read_write
, "read_write"},
434 {Opt_read_write
, "rw"}, /* Alternate spelling */
435 /* Boolean args above */
439 static int parse_rbd_opts_token(char *c
, void *private)
441 struct rbd_options
*rbd_opts
= private;
442 substring_t argstr
[MAX_OPT_ARGS
];
443 int token
, intval
, ret
;
445 token
= match_token(c
, rbd_opts_tokens
, argstr
);
449 if (token
< Opt_last_int
) {
450 ret
= match_int(&argstr
[0], &intval
);
452 pr_err("bad mount option arg (not int) "
456 dout("got int token %d val %d\n", token
, intval
);
457 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
458 dout("got string token %d val %s\n", token
,
460 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
461 dout("got Boolean token %d\n", token
);
463 dout("got token %d\n", token
);
468 rbd_opts
->read_only
= true;
471 rbd_opts
->read_only
= false;
481 * Get a ceph client with specific addr and configuration, if one does
482 * not exist create it.
/*
 * Get a ceph client with specific addr and configuration, creating
 * one if no shareable client already exists.  On reuse the caller's
 * ceph_opts is no longer needed and is destroyed here; on creation
 * ownership of ceph_opts passes to rbd_client_create().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
498 * Destroy ceph client
500 * Caller must hold rbd_client_list_lock.
502 static void rbd_client_release(struct kref
*kref
)
504 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
506 dout("rbd_release_client %p\n", rbdc
);
507 spin_lock(&rbd_client_list_lock
);
508 list_del(&rbdc
->node
);
509 spin_unlock(&rbd_client_list_lock
);
511 ceph_destroy_client(rbdc
->client
);
516 * Drop reference to ceph client node. If it's not referenced anymore, release
519 static void rbd_put_client(struct rbd_client
*rbdc
)
522 kref_put(&rbdc
->kref
, rbd_client_release
);
526 * Destroy requests collection
528 static void rbd_coll_release(struct kref
*kref
)
530 struct rbd_req_coll
*coll
=
531 container_of(kref
, struct rbd_req_coll
, kref
);
533 dout("rbd_coll_release %p\n", coll
);
/*
 * rbd images exist in exactly two on-disk formats, 1 and 2;
 * report whether the given format number is one of them.
 */
static bool rbd_image_format_valid(u32 image_format)
{
	switch (image_format) {
	case 1:
	case 2:
		return true;
	default:
		return false;
	}
}
542 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
547 /* The header has to start with the magic rbd header text */
548 if (memcmp(&ondisk
->text
, RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
)))
551 /* The bio layer requires at least sector-sized I/O */
553 if (ondisk
->options
.order
< SECTOR_SHIFT
)
556 /* If we use u64 in a few spots we may be able to loosen this */
558 if (ondisk
->options
.order
> 8 * sizeof (int) - 1)
562 * The size of a snapshot header has to fit in a size_t, and
563 * that limits the number of snapshots.
565 snap_count
= le32_to_cpu(ondisk
->snap_count
);
566 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
567 if (snap_count
> size
/ sizeof (__le64
))
571 * Not only that, but the size of the entire the snapshot
572 * header must also be representable in a size_t.
574 size
-= snap_count
* sizeof (__le64
);
575 if ((u64
) size
< le64_to_cpu(ondisk
->snap_names_len
))
582 * Create a new header structure, translate header format from the on-disk
585 static int rbd_header_from_disk(struct rbd_image_header
*header
,
586 struct rbd_image_header_ondisk
*ondisk
)
593 memset(header
, 0, sizeof (*header
));
595 snap_count
= le32_to_cpu(ondisk
->snap_count
);
597 len
= strnlen(ondisk
->object_prefix
, sizeof (ondisk
->object_prefix
));
598 header
->object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
599 if (!header
->object_prefix
)
601 memcpy(header
->object_prefix
, ondisk
->object_prefix
, len
);
602 header
->object_prefix
[len
] = '\0';
605 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
607 /* Save a copy of the snapshot names */
609 if (snap_names_len
> (u64
) SIZE_MAX
)
611 header
->snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
612 if (!header
->snap_names
)
615 * Note that rbd_dev_v1_header_read() guarantees
616 * the ondisk buffer we're working with has
617 * snap_names_len bytes beyond the end of the
618 * snapshot id array, this memcpy() is safe.
620 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
623 /* Record each snapshot's size */
625 size
= snap_count
* sizeof (*header
->snap_sizes
);
626 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
627 if (!header
->snap_sizes
)
629 for (i
= 0; i
< snap_count
; i
++)
630 header
->snap_sizes
[i
] =
631 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
633 WARN_ON(ondisk
->snap_names_len
);
634 header
->snap_names
= NULL
;
635 header
->snap_sizes
= NULL
;
638 header
->features
= 0; /* No features support in v1 images */
639 header
->obj_order
= ondisk
->options
.order
;
640 header
->crypt_type
= ondisk
->options
.crypt_type
;
641 header
->comp_type
= ondisk
->options
.comp_type
;
643 /* Allocate and fill in the snapshot context */
645 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
646 size
= sizeof (struct ceph_snap_context
);
647 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
648 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
652 atomic_set(&header
->snapc
->nref
, 1);
653 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
654 header
->snapc
->num_snaps
= snap_count
;
655 for (i
= 0; i
< snap_count
; i
++)
656 header
->snapc
->snaps
[i
] =
657 le64_to_cpu(ondisk
->snaps
[i
].id
);
662 kfree(header
->snap_sizes
);
663 header
->snap_sizes
= NULL
;
664 kfree(header
->snap_names
);
665 header
->snap_names
= NULL
;
666 kfree(header
->object_prefix
);
667 header
->object_prefix
= NULL
;
672 static const char *rbd_snap_name(struct rbd_device
*rbd_dev
, u64 snap_id
)
674 struct rbd_snap
*snap
;
676 if (snap_id
== CEPH_NOSNAP
)
677 return RBD_SNAP_HEAD_NAME
;
679 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
)
680 if (snap_id
== snap
->id
)
686 static int snap_by_name(struct rbd_device
*rbd_dev
, const char *snap_name
)
689 struct rbd_snap
*snap
;
691 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
692 if (!strcmp(snap_name
, snap
->name
)) {
693 rbd_dev
->spec
->snap_id
= snap
->id
;
694 rbd_dev
->mapping
.size
= snap
->size
;
695 rbd_dev
->mapping
.features
= snap
->features
;
704 static int rbd_dev_set_mapping(struct rbd_device
*rbd_dev
)
708 if (!memcmp(rbd_dev
->spec
->snap_name
, RBD_SNAP_HEAD_NAME
,
709 sizeof (RBD_SNAP_HEAD_NAME
))) {
710 rbd_dev
->spec
->snap_id
= CEPH_NOSNAP
;
711 rbd_dev
->mapping
.size
= rbd_dev
->header
.image_size
;
712 rbd_dev
->mapping
.features
= rbd_dev
->header
.features
;
715 ret
= snap_by_name(rbd_dev
, rbd_dev
->spec
->snap_name
);
718 rbd_dev
->mapping
.read_only
= true;
720 rbd_dev
->exists
= true;
725 static void rbd_header_free(struct rbd_image_header
*header
)
727 kfree(header
->object_prefix
);
728 header
->object_prefix
= NULL
;
729 kfree(header
->snap_sizes
);
730 header
->snap_sizes
= NULL
;
731 kfree(header
->snap_names
);
732 header
->snap_names
= NULL
;
733 ceph_put_snap_context(header
->snapc
);
734 header
->snapc
= NULL
;
737 static char *rbd_segment_name(struct rbd_device
*rbd_dev
, u64 offset
)
743 name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
746 segment
= offset
>> rbd_dev
->header
.obj_order
;
747 ret
= snprintf(name
, RBD_MAX_SEG_NAME_LEN
, "%s.%012llx",
748 rbd_dev
->header
.object_prefix
, segment
);
749 if (ret
< 0 || ret
>= RBD_MAX_SEG_NAME_LEN
) {
750 pr_err("error formatting segment name for #%llu (%d)\n",
759 static u64
rbd_segment_offset(struct rbd_device
*rbd_dev
, u64 offset
)
761 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
763 return offset
& (segment_size
- 1);
766 static u64
rbd_segment_length(struct rbd_device
*rbd_dev
,
767 u64 offset
, u64 length
)
769 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
771 offset
&= segment_size
- 1;
773 rbd_assert(length
<= U64_MAX
- offset
);
774 if (offset
+ length
> segment_size
)
775 length
= segment_size
- offset
;
780 static int rbd_get_num_segments(struct rbd_image_header
*header
,
788 if (len
- 1 > U64_MAX
- ofs
)
791 start_seg
= ofs
>> header
->obj_order
;
792 end_seg
= (ofs
+ len
- 1) >> header
->obj_order
;
794 return end_seg
- start_seg
+ 1;
798 * returns the size of an object in the image
/*
 * returns the size of an object in the image
 *
 * Shift 1ULL, not 1: rbd_dev_ondisk_valid() permits obj_order up to
 * 8 * sizeof (int) - 1 (i.e. 31), and "1 << 31" overflows a signed
 * int (undefined behavior) before widening to the u64 return type,
 * yielding the wrong size instead of 2 GiB.
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1ULL << header->obj_order;
}
809 static void bio_chain_put(struct bio
*chain
)
815 chain
= chain
->bi_next
;
821 * zeros a bio chain, starting at specific offset
823 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
832 bio_for_each_segment(bv
, chain
, i
) {
833 if (pos
+ bv
->bv_len
> start_ofs
) {
834 int remainder
= max(start_ofs
- pos
, 0);
835 buf
= bvec_kmap_irq(bv
, &flags
);
836 memset(buf
+ remainder
, 0,
837 bv
->bv_len
- remainder
);
838 bvec_kunmap_irq(buf
, &flags
);
843 chain
= chain
->bi_next
;
848 * Clone a portion of a bio, starting at the given byte offset
849 * and continuing for the number of bytes indicated.
851 static struct bio
*bio_clone_range(struct bio
*bio_src
,
860 unsigned short end_idx
;
864 /* Handle the easy case for the caller */
866 if (!offset
&& len
== bio_src
->bi_size
)
867 return bio_clone(bio_src
, gfpmask
);
869 if (WARN_ON_ONCE(!len
))
871 if (WARN_ON_ONCE(len
> bio_src
->bi_size
))
873 if (WARN_ON_ONCE(offset
> bio_src
->bi_size
- len
))
876 /* Find first affected segment... */
879 __bio_for_each_segment(bv
, bio_src
, idx
, 0) {
880 if (resid
< bv
->bv_len
)
886 /* ...and the last affected segment */
889 __bio_for_each_segment(bv
, bio_src
, end_idx
, idx
) {
890 if (resid
<= bv
->bv_len
)
894 vcnt
= end_idx
- idx
+ 1;
896 /* Build the clone */
898 bio
= bio_alloc(gfpmask
, (unsigned int) vcnt
);
900 return NULL
; /* ENOMEM */
902 bio
->bi_bdev
= bio_src
->bi_bdev
;
903 bio
->bi_sector
= bio_src
->bi_sector
+ (offset
>> SECTOR_SHIFT
);
904 bio
->bi_rw
= bio_src
->bi_rw
;
905 bio
->bi_flags
|= 1 << BIO_CLONED
;
908 * Copy over our part of the bio_vec, then update the first
909 * and last (or only) entries.
911 memcpy(&bio
->bi_io_vec
[0], &bio_src
->bi_io_vec
[idx
],
912 vcnt
* sizeof (struct bio_vec
));
913 bio
->bi_io_vec
[0].bv_offset
+= voff
;
915 bio
->bi_io_vec
[0].bv_len
-= voff
;
916 bio
->bi_io_vec
[vcnt
- 1].bv_len
= resid
;
918 bio
->bi_io_vec
[0].bv_len
= len
;
929 * Clone a portion of a bio chain, starting at the given byte offset
930 * into the first bio in the source chain and continuing for the
931 * number of bytes indicated. The result is another bio chain of
932 * exactly the given length, or a null pointer on error.
934 * The bio_src and offset parameters are both in-out. On entry they
935 * refer to the first source bio and the offset into that bio where
936 * the start of data to be cloned is located.
938 * On return, bio_src is updated to refer to the bio in the source
939 * chain that contains first un-cloned byte, and *offset will
940 * contain the offset of that byte within that bio.
942 static struct bio
*bio_chain_clone_range(struct bio
**bio_src
,
943 unsigned int *offset
,
947 struct bio
*bi
= *bio_src
;
948 unsigned int off
= *offset
;
949 struct bio
*chain
= NULL
;
952 /* Build up a chain of clone bios up to the limit */
954 if (!bi
|| off
>= bi
->bi_size
|| !len
)
955 return NULL
; /* Nothing to clone */
959 unsigned int bi_size
;
963 goto out_err
; /* EINVAL; ran out of bio's */
964 bi_size
= min_t(unsigned int, bi
->bi_size
- off
, len
);
965 bio
= bio_clone_range(bi
, off
, bi_size
, gfpmask
);
967 goto out_err
; /* ENOMEM */
973 if (off
== bi
->bi_size
) {
984 bio_chain_put(chain
);
990 * helpers for osd request op vectors.
992 static struct ceph_osd_req_op
*rbd_create_rw_ops(int num_ops
,
993 int opcode
, u32 payload_len
)
995 struct ceph_osd_req_op
*ops
;
997 ops
= kzalloc(sizeof (*ops
) * (num_ops
+ 1), GFP_NOIO
);
1004 * op extent offset and length will be set later on
1005 * in calc_raw_layout()
1007 ops
[0].payload_len
= payload_len
;
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
1017 static void rbd_coll_end_req_index(struct request
*rq
,
1018 struct rbd_req_coll
*coll
,
1022 struct request_queue
*q
;
1025 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
1026 coll
, index
, ret
, (unsigned long long) len
);
1032 blk_end_request(rq
, ret
, len
);
1038 spin_lock_irq(q
->queue_lock
);
1039 coll
->status
[index
].done
= 1;
1040 coll
->status
[index
].rc
= ret
;
1041 coll
->status
[index
].bytes
= len
;
1042 max
= min
= coll
->num_done
;
1043 while (max
< coll
->total
&& coll
->status
[max
].done
)
1046 for (i
= min
; i
<max
; i
++) {
1047 __blk_end_request(rq
, coll
->status
[i
].rc
,
1048 coll
->status
[i
].bytes
);
1050 kref_put(&coll
->kref
, rbd_coll_release
);
1052 spin_unlock_irq(q
->queue_lock
);
1055 static void rbd_coll_end_req(struct rbd_request
*req
,
1058 rbd_coll_end_req_index(req
->rq
, req
->coll
, req
->coll_index
, ret
, len
);
1062 * Send ceph osd request
1064 static int rbd_do_request(struct request
*rq
,
1065 struct rbd_device
*rbd_dev
,
1066 struct ceph_snap_context
*snapc
,
1068 const char *object_name
, u64 ofs
, u64 len
,
1070 struct page
**pages
,
1073 struct ceph_osd_req_op
*ops
,
1074 struct rbd_req_coll
*coll
,
1076 void (*rbd_cb
)(struct ceph_osd_request
*req
,
1077 struct ceph_msg
*msg
),
1078 struct ceph_osd_request
**linger_req
,
1081 struct ceph_osd_request
*req
;
1082 struct ceph_file_layout
*layout
;
1085 struct timespec mtime
= CURRENT_TIME
;
1086 struct rbd_request
*req_data
;
1087 struct ceph_osd_request_head
*reqhead
;
1088 struct ceph_osd_client
*osdc
;
1090 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
1093 rbd_coll_end_req_index(rq
, coll
, coll_index
,
1099 req_data
->coll
= coll
;
1100 req_data
->coll_index
= coll_index
;
1103 dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
1104 object_name
, (unsigned long long) ofs
,
1105 (unsigned long long) len
, coll
, coll_index
);
1107 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1108 req
= ceph_osdc_alloc_request(osdc
, flags
, snapc
, ops
,
1109 false, GFP_NOIO
, pages
, bio
);
1115 req
->r_callback
= rbd_cb
;
1118 req_data
->bio
= bio
;
1119 req_data
->pages
= pages
;
1120 req_data
->len
= len
;
1122 req
->r_priv
= req_data
;
1124 reqhead
= req
->r_request
->front
.iov_base
;
1125 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
1127 strncpy(req
->r_oid
, object_name
, sizeof(req
->r_oid
));
1128 req
->r_oid_len
= strlen(req
->r_oid
);
1130 layout
= &req
->r_file_layout
;
1131 memset(layout
, 0, sizeof(*layout
));
1132 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
1133 layout
->fl_stripe_count
= cpu_to_le32(1);
1134 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
1135 layout
->fl_pg_pool
= cpu_to_le32((int) rbd_dev
->spec
->pool_id
);
1136 ret
= ceph_calc_raw_layout(osdc
, layout
, snapid
, ofs
, &len
, &bno
,
1138 rbd_assert(ret
== 0);
1140 ceph_osdc_build_request(req
, ofs
, &len
,
1144 req
->r_oid
, req
->r_oid_len
);
1147 ceph_osdc_set_request_linger(osdc
, req
);
1151 ret
= ceph_osdc_start_request(osdc
, req
, false);
1156 ret
= ceph_osdc_wait_request(osdc
, req
);
1158 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
1159 dout("reassert_ver=%llu\n",
1160 (unsigned long long)
1161 le64_to_cpu(req
->r_reassert_version
.version
));
1162 ceph_osdc_put_request(req
);
1167 bio_chain_put(req_data
->bio
);
1168 ceph_osdc_put_request(req
);
1170 rbd_coll_end_req(req_data
, ret
, len
);
1176 * Ceph osd op callback
1178 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
1180 struct rbd_request
*req_data
= req
->r_priv
;
1181 struct ceph_osd_reply_head
*replyhead
;
1182 struct ceph_osd_op
*op
;
1188 replyhead
= msg
->front
.iov_base
;
1189 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
1190 op
= (void *)(replyhead
+ 1);
1191 rc
= le32_to_cpu(replyhead
->result
);
1192 bytes
= le64_to_cpu(op
->extent
.length
);
1193 read_op
= (le16_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
1195 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1196 (unsigned long long) bytes
, read_op
, (int) rc
);
1198 if (rc
== -ENOENT
&& read_op
) {
1199 zero_bio_chain(req_data
->bio
, 0);
1201 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
1202 zero_bio_chain(req_data
->bio
, bytes
);
1203 bytes
= req_data
->len
;
1206 rbd_coll_end_req(req_data
, rc
, bytes
);
1209 bio_chain_put(req_data
->bio
);
1211 ceph_osdc_put_request(req
);
/*
 * Minimal OSD completion callback: just drop the request reference
 * (used for fire-and-forget requests such as notify acks).
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1221 * Do a synchronous ceph osd operation
1223 static int rbd_req_sync_op(struct rbd_device
*rbd_dev
,
1224 struct ceph_snap_context
*snapc
,
1227 struct ceph_osd_req_op
*ops
,
1228 const char *object_name
,
1229 u64 ofs
, u64 inbound_size
,
1231 struct ceph_osd_request
**linger_req
,
1235 struct page
**pages
;
1238 rbd_assert(ops
!= NULL
);
1240 num_pages
= calc_pages_for(ofs
, inbound_size
);
1241 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1243 return PTR_ERR(pages
);
1245 ret
= rbd_do_request(NULL
, rbd_dev
, snapc
, snapid
,
1246 object_name
, ofs
, inbound_size
, NULL
,
1256 if ((flags
& CEPH_OSD_FLAG_READ
) && inbound
)
1257 ret
= ceph_copy_from_page_vector(pages
, inbound
, ofs
, ret
);
1260 ceph_release_page_vector(pages
, num_pages
);
1265 * Do an asynchronous ceph osd operation
1267 static int rbd_do_op(struct request
*rq
,
1268 struct rbd_device
*rbd_dev
,
1269 struct ceph_snap_context
*snapc
,
1272 struct rbd_req_coll
*coll
,
1279 struct ceph_osd_req_op
*ops
;
1285 seg_name
= rbd_segment_name(rbd_dev
, ofs
);
1288 seg_len
= rbd_segment_length(rbd_dev
, ofs
, len
);
1289 seg_ofs
= rbd_segment_offset(rbd_dev
, ofs
);
1291 if (rq_data_dir(rq
) == WRITE
) {
1292 opcode
= CEPH_OSD_OP_WRITE
;
1293 flags
= CEPH_OSD_FLAG_WRITE
|CEPH_OSD_FLAG_ONDISK
;
1294 snapid
= CEPH_NOSNAP
;
1295 payload_len
= seg_len
;
1297 opcode
= CEPH_OSD_OP_READ
;
1298 flags
= CEPH_OSD_FLAG_READ
;
1300 snapid
= rbd_dev
->spec
->snap_id
;
1305 ops
= rbd_create_rw_ops(1, opcode
, payload_len
);
1309 /* we've taken care of segment sizes earlier when we
1310 cloned the bios. We should never have a segment
1311 truncated at this point */
1312 rbd_assert(seg_len
== len
);
1314 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
1315 seg_name
, seg_ofs
, seg_len
,
1321 rbd_req_cb
, 0, NULL
);
1323 rbd_destroy_ops(ops
);
1330 * Request sync osd read
1332 static int rbd_req_sync_read(struct rbd_device
*rbd_dev
,
1334 const char *object_name
,
1339 struct ceph_osd_req_op
*ops
;
1342 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_READ
, 0);
1346 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1349 ops
, object_name
, ofs
, len
, buf
, NULL
, ver
);
1350 rbd_destroy_ops(ops
);
1356 * Request sync osd watch
1358 static int rbd_req_sync_notify_ack(struct rbd_device
*rbd_dev
,
1362 struct ceph_osd_req_op
*ops
;
1365 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1369 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1370 ops
[0].watch
.cookie
= notify_id
;
1371 ops
[0].watch
.flag
= 0;
1373 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1374 rbd_dev
->header_name
, 0, 0, NULL
,
1379 rbd_simple_req_cb
, 0, NULL
);
1381 rbd_destroy_ops(ops
);
1385 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1387 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1394 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1395 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1396 (unsigned int) opcode
);
1397 rc
= rbd_dev_refresh(rbd_dev
, &hver
);
1399 pr_warning(RBD_DRV_NAME
"%d got notification but failed to "
1400 " update snaps: %d\n", rbd_dev
->major
, rc
);
1402 rbd_req_sync_notify_ack(rbd_dev
, hver
, notify_id
);
1406 * Request sync osd watch
1408 static int rbd_req_sync_watch(struct rbd_device
*rbd_dev
)
1410 struct ceph_osd_req_op
*ops
;
1411 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1414 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1418 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1419 (void *)rbd_dev
, &rbd_dev
->watch_event
);
1423 ops
[0].watch
.ver
= cpu_to_le64(rbd_dev
->header
.obj_version
);
1424 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1425 ops
[0].watch
.flag
= 1;
1427 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1429 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1431 rbd_dev
->header_name
,
1433 &rbd_dev
->watch_request
, NULL
);
1438 rbd_destroy_ops(ops
);
1442 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1443 rbd_dev
->watch_event
= NULL
;
1445 rbd_destroy_ops(ops
);
1450 * Request sync osd unwatch
1452 static int rbd_req_sync_unwatch(struct rbd_device
*rbd_dev
)
1454 struct ceph_osd_req_op
*ops
;
1457 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1461 ops
[0].watch
.ver
= 0;
1462 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1463 ops
[0].watch
.flag
= 0;
1465 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1467 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1469 rbd_dev
->header_name
,
1470 0, 0, NULL
, NULL
, NULL
);
1473 rbd_destroy_ops(ops
);
1474 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1475 rbd_dev
->watch_event
= NULL
;
1480 * Synchronous osd object method call
1482 static int rbd_req_sync_exec(struct rbd_device
*rbd_dev
,
1483 const char *object_name
,
1484 const char *class_name
,
1485 const char *method_name
,
1486 const char *outbound
,
1487 size_t outbound_size
,
1489 size_t inbound_size
,
1493 struct ceph_osd_req_op
*ops
;
1494 int class_name_len
= strlen(class_name
);
1495 int method_name_len
= strlen(method_name
);
1500 * Any input parameters required by the method we're calling
1501 * will be sent along with the class and method names as
1502 * part of the message payload. That data and its size are
1503 * supplied via the indata and indata_len fields (named from
1504 * the perspective of the server side) in the OSD request
1507 payload_size
= class_name_len
+ method_name_len
+ outbound_size
;
1508 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_CALL
, payload_size
);
1512 ops
[0].cls
.class_name
= class_name
;
1513 ops
[0].cls
.class_len
= (__u8
) class_name_len
;
1514 ops
[0].cls
.method_name
= method_name
;
1515 ops
[0].cls
.method_len
= (__u8
) method_name_len
;
1516 ops
[0].cls
.argc
= 0;
1517 ops
[0].cls
.indata
= outbound
;
1518 ops
[0].cls
.indata_len
= outbound_size
;
1520 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1523 object_name
, 0, inbound_size
, inbound
,
1526 rbd_destroy_ops(ops
);
1528 dout("cls_exec returned %d\n", ret
);
1532 static struct rbd_req_coll
*rbd_alloc_coll(int num_reqs
)
1534 struct rbd_req_coll
*coll
=
1535 kzalloc(sizeof(struct rbd_req_coll
) +
1536 sizeof(struct rbd_req_status
) * num_reqs
,
1541 coll
->total
= num_reqs
;
1542 kref_init(&coll
->kref
);
1547 * block device queue callback
1549 static void rbd_rq_fn(struct request_queue
*q
)
1551 struct rbd_device
*rbd_dev
= q
->queuedata
;
1554 while ((rq
= blk_fetch_request(q
))) {
1559 int num_segs
, cur_seg
= 0;
1560 struct rbd_req_coll
*coll
;
1561 struct ceph_snap_context
*snapc
;
1562 unsigned int bio_offset
;
1564 dout("fetched request\n");
1566 /* filter out block requests we don't understand */
1567 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1568 __blk_end_request_all(rq
, 0);
1572 /* deduce our operation (read, write) */
1573 do_write
= (rq_data_dir(rq
) == WRITE
);
1574 if (do_write
&& rbd_dev
->mapping
.read_only
) {
1575 __blk_end_request_all(rq
, -EROFS
);
1579 spin_unlock_irq(q
->queue_lock
);
1581 down_read(&rbd_dev
->header_rwsem
);
1583 if (!rbd_dev
->exists
) {
1584 rbd_assert(rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
);
1585 up_read(&rbd_dev
->header_rwsem
);
1586 dout("request for non-existent snapshot");
1587 spin_lock_irq(q
->queue_lock
);
1588 __blk_end_request_all(rq
, -ENXIO
);
1592 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1594 up_read(&rbd_dev
->header_rwsem
);
1596 size
= blk_rq_bytes(rq
);
1597 ofs
= blk_rq_pos(rq
) * SECTOR_SIZE
;
1600 dout("%s 0x%x bytes at 0x%llx\n",
1601 do_write
? "write" : "read",
1602 size
, (unsigned long long) blk_rq_pos(rq
) * SECTOR_SIZE
);
1604 num_segs
= rbd_get_num_segments(&rbd_dev
->header
, ofs
, size
);
1605 if (num_segs
<= 0) {
1606 spin_lock_irq(q
->queue_lock
);
1607 __blk_end_request_all(rq
, num_segs
);
1608 ceph_put_snap_context(snapc
);
1611 coll
= rbd_alloc_coll(num_segs
);
1613 spin_lock_irq(q
->queue_lock
);
1614 __blk_end_request_all(rq
, -ENOMEM
);
1615 ceph_put_snap_context(snapc
);
1621 u64 limit
= rbd_segment_length(rbd_dev
, ofs
, size
);
1622 unsigned int chain_size
;
1623 struct bio
*bio_chain
;
1625 BUG_ON(limit
> (u64
) UINT_MAX
);
1626 chain_size
= (unsigned int) limit
;
1627 dout("rq->bio->bi_vcnt=%hu\n", rq
->bio
->bi_vcnt
);
1629 kref_get(&coll
->kref
);
1631 /* Pass a cloned bio chain via an osd request */
1633 bio_chain
= bio_chain_clone_range(&bio
,
1634 &bio_offset
, chain_size
,
1637 (void) rbd_do_op(rq
, rbd_dev
, snapc
,
1639 bio_chain
, coll
, cur_seg
);
1641 rbd_coll_end_req_index(rq
, coll
, cur_seg
,
1642 -ENOMEM
, chain_size
);
1648 kref_put(&coll
->kref
, rbd_coll_release
);
1650 spin_lock_irq(q
->queue_lock
);
1652 ceph_put_snap_context(snapc
);
1657 * a queue callback. Makes sure that we don't create a bio that spans across
1658 * multiple osd objects. One exception would be with a single page bios,
1659 * which we handle later at bio_chain_clone_range()
1661 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1662 struct bio_vec
*bvec
)
1664 struct rbd_device
*rbd_dev
= q
->queuedata
;
1665 sector_t sector_offset
;
1666 sector_t sectors_per_obj
;
1667 sector_t obj_sector_offset
;
1671 * Find how far into its rbd object the partition-relative
1672 * bio start sector is to offset relative to the enclosing
1675 sector_offset
= get_start_sect(bmd
->bi_bdev
) + bmd
->bi_sector
;
1676 sectors_per_obj
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
1677 obj_sector_offset
= sector_offset
& (sectors_per_obj
- 1);
1680 * Compute the number of bytes from that offset to the end
1681 * of the object. Account for what's already used by the bio.
1683 ret
= (int) (sectors_per_obj
- obj_sector_offset
) << SECTOR_SHIFT
;
1684 if (ret
> bmd
->bi_size
)
1685 ret
-= bmd
->bi_size
;
1690 * Don't send back more than was asked for. And if the bio
1691 * was empty, let the whole thing through because: "Note
1692 * that a block device *must* allow a single page to be
1693 * added to an empty bio."
1695 rbd_assert(bvec
->bv_len
<= PAGE_SIZE
);
1696 if (ret
> (int) bvec
->bv_len
|| !bmd
->bi_size
)
1697 ret
= (int) bvec
->bv_len
;
1702 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1704 struct gendisk
*disk
= rbd_dev
->disk
;
1709 if (disk
->flags
& GENHD_FL_UP
)
1712 blk_cleanup_queue(disk
->queue
);
1717 * Read the complete header for the given rbd device.
1719 * Returns a pointer to a dynamically-allocated buffer containing
1720 * the complete and validated header. Caller can pass the address
1721 * of a variable that will be filled in with the version of the
1722 * header object at the time it was read.
1724 * Returns a pointer-coded errno if a failure occurs.
1726 static struct rbd_image_header_ondisk
*
1727 rbd_dev_v1_header_read(struct rbd_device
*rbd_dev
, u64
*version
)
1729 struct rbd_image_header_ondisk
*ondisk
= NULL
;
1736 * The complete header will include an array of its 64-bit
1737 * snapshot ids, followed by the names of those snapshots as
1738 * a contiguous block of NUL-terminated strings. Note that
1739 * the number of snapshots could change by the time we read
1740 * it in, in which case we re-read it.
1747 size
= sizeof (*ondisk
);
1748 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
1750 ondisk
= kmalloc(size
, GFP_KERNEL
);
1752 return ERR_PTR(-ENOMEM
);
1754 ret
= rbd_req_sync_read(rbd_dev
, CEPH_NOSNAP
,
1755 rbd_dev
->header_name
,
1757 (char *) ondisk
, version
);
1761 if (WARN_ON((size_t) ret
< size
)) {
1763 pr_warning("short header read for image %s"
1764 " (want %zd got %d)\n",
1765 rbd_dev
->spec
->image_name
, size
, ret
);
1768 if (!rbd_dev_ondisk_valid(ondisk
)) {
1770 pr_warning("invalid header for image %s\n",
1771 rbd_dev
->spec
->image_name
);
1775 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
1776 want_count
= snap_count
;
1777 snap_count
= le32_to_cpu(ondisk
->snap_count
);
1778 } while (snap_count
!= want_count
);
1785 return ERR_PTR(ret
);
1789 * reload the ondisk the header
1791 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1792 struct rbd_image_header
*header
)
1794 struct rbd_image_header_ondisk
*ondisk
;
1798 ondisk
= rbd_dev_v1_header_read(rbd_dev
, &ver
);
1800 return PTR_ERR(ondisk
);
1801 ret
= rbd_header_from_disk(header
, ondisk
);
1803 header
->obj_version
= ver
;
1809 static void rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1811 struct rbd_snap
*snap
;
1812 struct rbd_snap
*next
;
1814 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
1815 rbd_remove_snap_dev(snap
);
1818 static void rbd_update_mapping_size(struct rbd_device
*rbd_dev
)
1822 if (rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
)
1825 size
= (sector_t
) rbd_dev
->header
.image_size
/ SECTOR_SIZE
;
1826 dout("setting size to %llu sectors", (unsigned long long) size
);
1827 rbd_dev
->mapping
.size
= (u64
) size
;
1828 set_capacity(rbd_dev
->disk
, size
);
1832 * only read the first part of the ondisk header, without the snaps info
1834 static int rbd_dev_v1_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
1837 struct rbd_image_header h
;
1839 ret
= rbd_read_header(rbd_dev
, &h
);
1843 down_write(&rbd_dev
->header_rwsem
);
1845 /* Update image size, and check for resize of mapped image */
1846 rbd_dev
->header
.image_size
= h
.image_size
;
1847 rbd_update_mapping_size(rbd_dev
);
1849 /* rbd_dev->header.object_prefix shouldn't change */
1850 kfree(rbd_dev
->header
.snap_sizes
);
1851 kfree(rbd_dev
->header
.snap_names
);
1852 /* osd requests may still refer to snapc */
1853 ceph_put_snap_context(rbd_dev
->header
.snapc
);
1856 *hver
= h
.obj_version
;
1857 rbd_dev
->header
.obj_version
= h
.obj_version
;
1858 rbd_dev
->header
.image_size
= h
.image_size
;
1859 rbd_dev
->header
.snapc
= h
.snapc
;
1860 rbd_dev
->header
.snap_names
= h
.snap_names
;
1861 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
1862 /* Free the extra copy of the object prefix */
1863 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
1864 kfree(h
.object_prefix
);
1866 ret
= rbd_dev_snaps_update(rbd_dev
);
1868 ret
= rbd_dev_snaps_register(rbd_dev
);
1870 up_write(&rbd_dev
->header_rwsem
);
1875 static int rbd_dev_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
1879 rbd_assert(rbd_image_format_valid(rbd_dev
->image_format
));
1880 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1881 if (rbd_dev
->image_format
== 1)
1882 ret
= rbd_dev_v1_refresh(rbd_dev
, hver
);
1884 ret
= rbd_dev_v2_refresh(rbd_dev
, hver
);
1885 mutex_unlock(&ctl_mutex
);
1890 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
1892 struct gendisk
*disk
;
1893 struct request_queue
*q
;
1896 /* create gendisk info */
1897 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
1901 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
1903 disk
->major
= rbd_dev
->major
;
1904 disk
->first_minor
= 0;
1905 disk
->fops
= &rbd_bd_ops
;
1906 disk
->private_data
= rbd_dev
;
1909 q
= blk_init_queue(rbd_rq_fn
, &rbd_dev
->lock
);
1913 /* We use the default size, but let's be explicit about it. */
1914 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
1916 /* set io sizes to object size */
1917 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
1918 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
1919 blk_queue_max_segment_size(q
, segment_size
);
1920 blk_queue_io_min(q
, segment_size
);
1921 blk_queue_io_opt(q
, segment_size
);
1923 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
1926 q
->queuedata
= rbd_dev
;
1928 rbd_dev
->disk
= disk
;
1930 set_capacity(rbd_dev
->disk
, rbd_dev
->mapping
.size
/ SECTOR_SIZE
);
1943 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
1945 return container_of(dev
, struct rbd_device
, dev
);
1948 static ssize_t
rbd_size_show(struct device
*dev
,
1949 struct device_attribute
*attr
, char *buf
)
1951 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1954 down_read(&rbd_dev
->header_rwsem
);
1955 size
= get_capacity(rbd_dev
->disk
);
1956 up_read(&rbd_dev
->header_rwsem
);
1958 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
1962 * Note this shows the features for whatever's mapped, which is not
1963 * necessarily the base image.
1965 static ssize_t
rbd_features_show(struct device
*dev
,
1966 struct device_attribute
*attr
, char *buf
)
1968 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1970 return sprintf(buf
, "0x%016llx\n",
1971 (unsigned long long) rbd_dev
->mapping
.features
);
1974 static ssize_t
rbd_major_show(struct device
*dev
,
1975 struct device_attribute
*attr
, char *buf
)
1977 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1979 return sprintf(buf
, "%d\n", rbd_dev
->major
);
1982 static ssize_t
rbd_client_id_show(struct device
*dev
,
1983 struct device_attribute
*attr
, char *buf
)
1985 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1987 return sprintf(buf
, "client%lld\n",
1988 ceph_client_id(rbd_dev
->rbd_client
->client
));
1991 static ssize_t
rbd_pool_show(struct device
*dev
,
1992 struct device_attribute
*attr
, char *buf
)
1994 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1996 return sprintf(buf
, "%s\n", rbd_dev
->spec
->pool_name
);
1999 static ssize_t
rbd_pool_id_show(struct device
*dev
,
2000 struct device_attribute
*attr
, char *buf
)
2002 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2004 return sprintf(buf
, "%llu\n",
2005 (unsigned long long) rbd_dev
->spec
->pool_id
);
2008 static ssize_t
rbd_name_show(struct device
*dev
,
2009 struct device_attribute
*attr
, char *buf
)
2011 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2013 if (rbd_dev
->spec
->image_name
)
2014 return sprintf(buf
, "%s\n", rbd_dev
->spec
->image_name
);
2016 return sprintf(buf
, "(unknown)\n");
2019 static ssize_t
rbd_image_id_show(struct device
*dev
,
2020 struct device_attribute
*attr
, char *buf
)
2022 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2024 return sprintf(buf
, "%s\n", rbd_dev
->spec
->image_id
);
2028 * Shows the name of the currently-mapped snapshot (or
2029 * RBD_SNAP_HEAD_NAME for the base image).
2031 static ssize_t
rbd_snap_show(struct device
*dev
,
2032 struct device_attribute
*attr
,
2035 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2037 return sprintf(buf
, "%s\n", rbd_dev
->spec
->snap_name
);
2041 * For an rbd v2 image, shows the pool id, image id, and snapshot id
2042 * for the parent image. If there is no parent, simply shows
2043 * "(no parent image)".
2045 static ssize_t
rbd_parent_show(struct device
*dev
,
2046 struct device_attribute
*attr
,
2049 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2050 struct rbd_spec
*spec
= rbd_dev
->parent_spec
;
2055 return sprintf(buf
, "(no parent image)\n");
2057 count
= sprintf(bufp
, "pool_id %llu\npool_name %s\n",
2058 (unsigned long long) spec
->pool_id
, spec
->pool_name
);
2063 count
= sprintf(bufp
, "image_id %s\nimage_name %s\n", spec
->image_id
,
2064 spec
->image_name
? spec
->image_name
: "(unknown)");
2069 count
= sprintf(bufp
, "snap_id %llu\nsnap_name %s\n",
2070 (unsigned long long) spec
->snap_id
, spec
->snap_name
);
2075 count
= sprintf(bufp
, "overlap %llu\n", rbd_dev
->parent_overlap
);
2080 return (ssize_t
) (bufp
- buf
);
2083 static ssize_t
rbd_image_refresh(struct device
*dev
,
2084 struct device_attribute
*attr
,
2088 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2091 ret
= rbd_dev_refresh(rbd_dev
, NULL
);
2093 return ret
< 0 ? ret
: size
;
2096 static DEVICE_ATTR(size
, S_IRUGO
, rbd_size_show
, NULL
);
2097 static DEVICE_ATTR(features
, S_IRUGO
, rbd_features_show
, NULL
);
2098 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
2099 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
2100 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
2101 static DEVICE_ATTR(pool_id
, S_IRUGO
, rbd_pool_id_show
, NULL
);
2102 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
2103 static DEVICE_ATTR(image_id
, S_IRUGO
, rbd_image_id_show
, NULL
);
2104 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
2105 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
2106 static DEVICE_ATTR(parent
, S_IRUGO
, rbd_parent_show
, NULL
);
2108 static struct attribute
*rbd_attrs
[] = {
2109 &dev_attr_size
.attr
,
2110 &dev_attr_features
.attr
,
2111 &dev_attr_major
.attr
,
2112 &dev_attr_client_id
.attr
,
2113 &dev_attr_pool
.attr
,
2114 &dev_attr_pool_id
.attr
,
2115 &dev_attr_name
.attr
,
2116 &dev_attr_image_id
.attr
,
2117 &dev_attr_current_snap
.attr
,
2118 &dev_attr_parent
.attr
,
2119 &dev_attr_refresh
.attr
,
2123 static struct attribute_group rbd_attr_group
= {
2127 static const struct attribute_group
*rbd_attr_groups
[] = {
2132 static void rbd_sysfs_dev_release(struct device
*dev
)
2136 static struct device_type rbd_device_type
= {
2138 .groups
= rbd_attr_groups
,
2139 .release
= rbd_sysfs_dev_release
,
2147 static ssize_t
rbd_snap_size_show(struct device
*dev
,
2148 struct device_attribute
*attr
,
2151 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2153 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
2156 static ssize_t
rbd_snap_id_show(struct device
*dev
,
2157 struct device_attribute
*attr
,
2160 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2162 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
2165 static ssize_t
rbd_snap_features_show(struct device
*dev
,
2166 struct device_attribute
*attr
,
2169 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2171 return sprintf(buf
, "0x%016llx\n",
2172 (unsigned long long) snap
->features
);
2175 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
2176 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
2177 static DEVICE_ATTR(snap_features
, S_IRUGO
, rbd_snap_features_show
, NULL
);
2179 static struct attribute
*rbd_snap_attrs
[] = {
2180 &dev_attr_snap_size
.attr
,
2181 &dev_attr_snap_id
.attr
,
2182 &dev_attr_snap_features
.attr
,
2186 static struct attribute_group rbd_snap_attr_group
= {
2187 .attrs
= rbd_snap_attrs
,
2190 static void rbd_snap_dev_release(struct device
*dev
)
2192 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2197 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
2198 &rbd_snap_attr_group
,
2202 static struct device_type rbd_snap_device_type
= {
2203 .groups
= rbd_snap_attr_groups
,
2204 .release
= rbd_snap_dev_release
,
/*
 * Take a reference on an rbd_spec.
 * NOTE(review): the tail of this function (presumably returning the
 * spec) is not visible in this chunk — confirm against the full file.
 */
2207 static struct rbd_spec
*rbd_spec_get(struct rbd_spec
*spec
)
2209 kref_get(&spec
->kref
);
/* Forward declaration: kref release callback for an rbd_spec. */
2214 static void rbd_spec_free(struct kref
*kref
);
/*
 * Drop a reference on an rbd_spec; rbd_spec_free() runs when the last
 * reference is released.  NOTE(review): any NULL guard before the
 * kref_put is not visible in this chunk — confirm against the full file.
 */
2215 static void rbd_spec_put(struct rbd_spec
*spec
)
2218 kref_put(&spec
->kref
, rbd_spec_free
);
2221 static struct rbd_spec
*rbd_spec_alloc(void)
2223 struct rbd_spec
*spec
;
2225 spec
= kzalloc(sizeof (*spec
), GFP_KERNEL
);
2228 kref_init(&spec
->kref
);
2230 rbd_spec_put(rbd_spec_get(spec
)); /* TEMPORARY */
/*
 * kref release callback for an rbd_spec: free the name and id strings
 * the spec owns.  NOTE(review): the final free of the spec structure
 * itself is not visible in this chunk — confirm against the full file.
 */
2235 static void rbd_spec_free(struct kref
*kref
)
2237 struct rbd_spec
*spec
= container_of(kref
, struct rbd_spec
, kref
);
2239 kfree(spec
->pool_name
);
2240 kfree(spec
->image_id
);
2241 kfree(spec
->image_name
);
2242 kfree(spec
->snap_name
);
2246 struct rbd_device
*rbd_dev_create(struct rbd_client
*rbdc
,
2247 struct rbd_spec
*spec
)
2249 struct rbd_device
*rbd_dev
;
2251 rbd_dev
= kzalloc(sizeof (*rbd_dev
), GFP_KERNEL
);
2255 spin_lock_init(&rbd_dev
->lock
);
2256 INIT_LIST_HEAD(&rbd_dev
->node
);
2257 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2258 init_rwsem(&rbd_dev
->header_rwsem
);
2260 rbd_dev
->spec
= spec
;
2261 rbd_dev
->rbd_client
= rbdc
;
/*
 * Release the resources held by an rbd_device: its parent spec
 * reference, header object name, ceph client reference, and its own
 * spec reference.  NOTE(review): the final free of the rbd_device
 * itself is not visible in this chunk — confirm against the full file.
 */
2266 static void rbd_dev_destroy(struct rbd_device
*rbd_dev
)
2268 rbd_spec_put(rbd_dev
->parent_spec
);
2269 kfree(rbd_dev
->header_name
);
2270 rbd_put_client(rbd_dev
->rbd_client
);
2271 rbd_spec_put(rbd_dev
->spec
);
/*
 * Report whether a snapshot's device has been registered with the
 * driver core, judged by whether its device type has been set to
 * rbd_snap_device_type.  The assertion checks that this marker and the
 * driver core's own registration state agree (ret == reg).
 * NOTE(review): the return statement is not visible in this chunk.
 */
2275 static bool rbd_snap_registered(struct rbd_snap
*snap
)
2277 bool ret
= snap
->dev
.type
== &rbd_snap_device_type
;
2278 bool reg
= device_is_registered(&snap
->dev
);
2280 rbd_assert(!ret
^ reg
);
2285 static void rbd_remove_snap_dev(struct rbd_snap
*snap
)
2287 list_del(&snap
->node
);
2288 if (device_is_registered(&snap
->dev
))
2289 device_unregister(&snap
->dev
);
2292 static int rbd_register_snap_dev(struct rbd_snap
*snap
,
2293 struct device
*parent
)
2295 struct device
*dev
= &snap
->dev
;
2298 dev
->type
= &rbd_snap_device_type
;
2299 dev
->parent
= parent
;
2300 dev
->release
= rbd_snap_dev_release
;
2301 dev_set_name(dev
, "%s%s", RBD_SNAP_DEV_NAME_PREFIX
, snap
->name
);
2302 dout("%s: registering device for snapshot %s\n", __func__
, snap
->name
);
2304 ret
= device_register(dev
);
2309 static struct rbd_snap
*__rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
2310 const char *snap_name
,
2311 u64 snap_id
, u64 snap_size
,
2314 struct rbd_snap
*snap
;
2317 snap
= kzalloc(sizeof (*snap
), GFP_KERNEL
);
2319 return ERR_PTR(-ENOMEM
);
2322 snap
->name
= kstrdup(snap_name
, GFP_KERNEL
);
2327 snap
->size
= snap_size
;
2328 snap
->features
= snap_features
;
2336 return ERR_PTR(ret
);
2339 static char *rbd_dev_v1_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
2340 u64
*snap_size
, u64
*snap_features
)
2344 rbd_assert(which
< rbd_dev
->header
.snapc
->num_snaps
);
2346 *snap_size
= rbd_dev
->header
.snap_sizes
[which
];
2347 *snap_features
= 0; /* No features for v1 */
2349 /* Skip over names until we find the one we are looking for */
2351 snap_name
= rbd_dev
->header
.snap_names
;
2353 snap_name
+= strlen(snap_name
) + 1;
2359 * Get the size and object order for an image snapshot, or if
2360 * snap_id is CEPH_NOSNAP, gets this information for the base
2363 static int _rbd_dev_v2_snap_size(struct rbd_device
*rbd_dev
, u64 snap_id
,
2364 u8
*order
, u64
*snap_size
)
2366 __le64 snapid
= cpu_to_le64(snap_id
);
2371 } __attribute__ ((packed
)) size_buf
= { 0 };
2373 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2375 (char *) &snapid
, sizeof (snapid
),
2376 (char *) &size_buf
, sizeof (size_buf
),
2377 CEPH_OSD_FLAG_READ
, NULL
);
2378 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2382 *order
= size_buf
.order
;
2383 *snap_size
= le64_to_cpu(size_buf
.size
);
2385 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2386 (unsigned long long) snap_id
, (unsigned int) *order
,
2387 (unsigned long long) *snap_size
);
2392 static int rbd_dev_v2_image_size(struct rbd_device
*rbd_dev
)
2394 return _rbd_dev_v2_snap_size(rbd_dev
, CEPH_NOSNAP
,
2395 &rbd_dev
->header
.obj_order
,
2396 &rbd_dev
->header
.image_size
);
2399 static int rbd_dev_v2_object_prefix(struct rbd_device
*rbd_dev
)
2405 reply_buf
= kzalloc(RBD_OBJ_PREFIX_LEN_MAX
, GFP_KERNEL
);
2409 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2410 "rbd", "get_object_prefix",
2412 reply_buf
, RBD_OBJ_PREFIX_LEN_MAX
,
2413 CEPH_OSD_FLAG_READ
, NULL
);
2414 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2417 ret
= 0; /* rbd_req_sync_exec() can return positive */
2420 rbd_dev
->header
.object_prefix
= ceph_extract_encoded_string(&p
,
2421 p
+ RBD_OBJ_PREFIX_LEN_MAX
,
2424 if (IS_ERR(rbd_dev
->header
.object_prefix
)) {
2425 ret
= PTR_ERR(rbd_dev
->header
.object_prefix
);
2426 rbd_dev
->header
.object_prefix
= NULL
;
2428 dout(" object_prefix = %s\n", rbd_dev
->header
.object_prefix
);
2437 static int _rbd_dev_v2_snap_features(struct rbd_device
*rbd_dev
, u64 snap_id
,
2440 __le64 snapid
= cpu_to_le64(snap_id
);
2444 } features_buf
= { 0 };
2448 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2449 "rbd", "get_features",
2450 (char *) &snapid
, sizeof (snapid
),
2451 (char *) &features_buf
, sizeof (features_buf
),
2452 CEPH_OSD_FLAG_READ
, NULL
);
2453 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2457 incompat
= le64_to_cpu(features_buf
.incompat
);
2458 if (incompat
& ~RBD_FEATURES_ALL
)
2461 *snap_features
= le64_to_cpu(features_buf
.features
);
2463 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2464 (unsigned long long) snap_id
,
2465 (unsigned long long) *snap_features
,
2466 (unsigned long long) le64_to_cpu(features_buf
.incompat
));
2471 static int rbd_dev_v2_features(struct rbd_device
*rbd_dev
)
2473 return _rbd_dev_v2_snap_features(rbd_dev
, CEPH_NOSNAP
,
2474 &rbd_dev
->header
.features
);
2477 static int rbd_dev_v2_parent_info(struct rbd_device
*rbd_dev
)
2479 struct rbd_spec
*parent_spec
;
2481 void *reply_buf
= NULL
;
2490 parent_spec
= rbd_spec_alloc();
2494 size
= sizeof (__le64
) + /* pool_id */
2495 sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
+ /* image_id */
2496 sizeof (__le64
) + /* snap_id */
2497 sizeof (__le64
); /* overlap */
2498 reply_buf
= kmalloc(size
, GFP_KERNEL
);
2504 snapid
= cpu_to_le64(CEPH_NOSNAP
);
2505 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2506 "rbd", "get_parent",
2507 (char *) &snapid
, sizeof (snapid
),
2508 (char *) reply_buf
, size
,
2509 CEPH_OSD_FLAG_READ
, NULL
);
2510 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2516 end
= (char *) reply_buf
+ size
;
2517 ceph_decode_64_safe(&p
, end
, parent_spec
->pool_id
, out_err
);
2518 if (parent_spec
->pool_id
== CEPH_NOPOOL
)
2519 goto out
; /* No parent? No problem. */
2521 image_id
= ceph_extract_encoded_string(&p
, end
, &len
, GFP_KERNEL
);
2522 if (IS_ERR(image_id
)) {
2523 ret
= PTR_ERR(image_id
);
2526 parent_spec
->image_id
= image_id
;
2527 parent_spec
->image_id_len
= len
;
2528 ceph_decode_64_safe(&p
, end
, parent_spec
->snap_id
, out_err
);
2529 ceph_decode_64_safe(&p
, end
, overlap
, out_err
);
2531 rbd_dev
->parent_overlap
= overlap
;
2532 rbd_dev
->parent_spec
= parent_spec
;
2533 parent_spec
= NULL
; /* rbd_dev now owns this */
2538 rbd_spec_put(parent_spec
);
2543 static char *rbd_dev_image_name(struct rbd_device
*rbd_dev
)
2545 size_t image_id_size
;
2550 void *reply_buf
= NULL
;
2552 char *image_name
= NULL
;
2555 rbd_assert(!rbd_dev
->spec
->image_name
);
2557 image_id_size
= sizeof (__le32
) + rbd_dev
->spec
->image_id_len
;
2558 image_id
= kmalloc(image_id_size
, GFP_KERNEL
);
2563 end
= (char *) image_id
+ image_id_size
;
2564 ceph_encode_string(&p
, end
, rbd_dev
->spec
->image_id
,
2565 (u32
) rbd_dev
->spec
->image_id_len
);
2567 size
= sizeof (__le32
) + RBD_IMAGE_NAME_LEN_MAX
;
2568 reply_buf
= kmalloc(size
, GFP_KERNEL
);
2572 ret
= rbd_req_sync_exec(rbd_dev
, RBD_DIRECTORY
,
2573 "rbd", "dir_get_name",
2574 image_id
, image_id_size
,
2575 (char *) reply_buf
, size
,
2576 CEPH_OSD_FLAG_READ
, NULL
);
2580 end
= (char *) reply_buf
+ size
;
2581 image_name
= ceph_extract_encoded_string(&p
, end
, &len
, GFP_KERNEL
);
2582 if (IS_ERR(image_name
))
2585 dout("%s: name is %s len is %zd\n", __func__
, image_name
, len
);
2594 * When a parent image gets probed, we only have the pool, image,
2595 * and snapshot ids but not the names of any of them. This call
2596 * is made later to fill in those names. It has to be done after
2597 * rbd_dev_snaps_update() has completed because some of the
2598 * information (in particular, snapshot name) is not available
2601 static int rbd_dev_probe_update_spec(struct rbd_device
*rbd_dev
)
2603 struct ceph_osd_client
*osdc
;
2605 void *reply_buf
= NULL
;
2608 if (rbd_dev
->spec
->pool_name
)
2609 return 0; /* Already have the names */
2611 /* Look up the pool name */
2613 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2614 name
= ceph_pg_pool_name_by_id(osdc
->osdmap
, rbd_dev
->spec
->pool_id
);
2616 return -EIO
; /* pool id too large (>= 2^31) */
2618 rbd_dev
->spec
->pool_name
= kstrdup(name
, GFP_KERNEL
);
2619 if (!rbd_dev
->spec
->pool_name
)
2622 /* Fetch the image name; tolerate failure here */
2624 name
= rbd_dev_image_name(rbd_dev
);
2626 rbd_dev
->spec
->image_name_len
= strlen(name
);
2627 rbd_dev
->spec
->image_name
= (char *) name
;
2629 pr_warning(RBD_DRV_NAME
"%d "
2630 "unable to get image name for image id %s\n",
2631 rbd_dev
->major
, rbd_dev
->spec
->image_id
);
2634 /* Look up the snapshot name. */
2636 name
= rbd_snap_name(rbd_dev
, rbd_dev
->spec
->snap_id
);
2641 rbd_dev
->spec
->snap_name
= kstrdup(name
, GFP_KERNEL
);
2642 if(!rbd_dev
->spec
->snap_name
)
2648 kfree(rbd_dev
->spec
->pool_name
);
2649 rbd_dev
->spec
->pool_name
= NULL
;
2654 static int rbd_dev_v2_snap_context(struct rbd_device
*rbd_dev
, u64
*ver
)
2663 struct ceph_snap_context
*snapc
;
2667 * We'll need room for the seq value (maximum snapshot id),
2668 * snapshot count, and array of that many snapshot ids.
2669 * For now we have a fixed upper limit on the number we're
2670 * prepared to receive.
2672 size
= sizeof (__le64
) + sizeof (__le32
) +
2673 RBD_MAX_SNAP_COUNT
* sizeof (__le64
);
2674 reply_buf
= kzalloc(size
, GFP_KERNEL
);
2678 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2679 "rbd", "get_snapcontext",
2682 CEPH_OSD_FLAG_READ
, ver
);
2683 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2689 end
= (char *) reply_buf
+ size
;
2690 ceph_decode_64_safe(&p
, end
, seq
, out
);
2691 ceph_decode_32_safe(&p
, end
, snap_count
, out
);
2694 * Make sure the reported number of snapshot ids wouldn't go
2695 * beyond the end of our buffer. But before checking that,
2696 * make sure the computed size of the snapshot context we
2697 * allocate is representable in a size_t.
2699 if (snap_count
> (SIZE_MAX
- sizeof (struct ceph_snap_context
))
2704 if (!ceph_has_room(&p
, end
, snap_count
* sizeof (__le64
)))
2707 size
= sizeof (struct ceph_snap_context
) +
2708 snap_count
* sizeof (snapc
->snaps
[0]);
2709 snapc
= kmalloc(size
, GFP_KERNEL
);
2715 atomic_set(&snapc
->nref
, 1);
2717 snapc
->num_snaps
= snap_count
;
2718 for (i
= 0; i
< snap_count
; i
++)
2719 snapc
->snaps
[i
] = ceph_decode_64(&p
);
2721 rbd_dev
->header
.snapc
= snapc
;
2723 dout(" snap context seq = %llu, snap_count = %u\n",
2724 (unsigned long long) seq
, (unsigned int) snap_count
);
2732 static char *rbd_dev_v2_snap_name(struct rbd_device
*rbd_dev
, u32 which
)
2742 size
= sizeof (__le32
) + RBD_MAX_SNAP_NAME_LEN
;
2743 reply_buf
= kmalloc(size
, GFP_KERNEL
);
2745 return ERR_PTR(-ENOMEM
);
2747 snap_id
= cpu_to_le64(rbd_dev
->header
.snapc
->snaps
[which
]);
2748 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
2749 "rbd", "get_snapshot_name",
2750 (char *) &snap_id
, sizeof (snap_id
),
2752 CEPH_OSD_FLAG_READ
, NULL
);
2753 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
2758 end
= (char *) reply_buf
+ size
;
2759 snap_name
= ceph_extract_encoded_string(&p
, end
, NULL
, GFP_KERNEL
);
2760 if (IS_ERR(snap_name
)) {
2761 ret
= PTR_ERR(snap_name
);
2764 dout(" snap_id 0x%016llx snap_name = %s\n",
2765 (unsigned long long) le64_to_cpu(snap_id
), snap_name
);
2773 return ERR_PTR(ret
);
2776 static char *rbd_dev_v2_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
2777 u64
*snap_size
, u64
*snap_features
)
2783 snap_id
= rbd_dev
->header
.snapc
->snaps
[which
];
2784 ret
= _rbd_dev_v2_snap_size(rbd_dev
, snap_id
, &order
, snap_size
);
2786 return ERR_PTR(ret
);
2787 ret
= _rbd_dev_v2_snap_features(rbd_dev
, snap_id
, snap_features
);
2789 return ERR_PTR(ret
);
2791 return rbd_dev_v2_snap_name(rbd_dev
, which
);
2794 static char *rbd_dev_snap_info(struct rbd_device
*rbd_dev
, u32 which
,
2795 u64
*snap_size
, u64
*snap_features
)
2797 if (rbd_dev
->image_format
== 1)
2798 return rbd_dev_v1_snap_info(rbd_dev
, which
,
2799 snap_size
, snap_features
);
2800 if (rbd_dev
->image_format
== 2)
2801 return rbd_dev_v2_snap_info(rbd_dev
, which
,
2802 snap_size
, snap_features
);
2803 return ERR_PTR(-EINVAL
);
2806 static int rbd_dev_v2_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
2811 down_write(&rbd_dev
->header_rwsem
);
2813 /* Grab old order first, to see if it changes */
2815 obj_order
= rbd_dev
->header
.obj_order
,
2816 ret
= rbd_dev_v2_image_size(rbd_dev
);
2819 if (rbd_dev
->header
.obj_order
!= obj_order
) {
2823 rbd_update_mapping_size(rbd_dev
);
2825 ret
= rbd_dev_v2_snap_context(rbd_dev
, hver
);
2826 dout("rbd_dev_v2_snap_context returned %d\n", ret
);
2829 ret
= rbd_dev_snaps_update(rbd_dev
);
2830 dout("rbd_dev_snaps_update returned %d\n", ret
);
2833 ret
= rbd_dev_snaps_register(rbd_dev
);
2834 dout("rbd_dev_snaps_register returned %d\n", ret
);
2836 up_write(&rbd_dev
->header_rwsem
);
2842 * Scan the rbd device's current snapshot list and compare it to the
2843 * newly-received snapshot context. Remove any existing snapshots
2844 * not present in the new snapshot context. Add a new snapshot for
2845 * any snaphots in the snapshot context not in the current list.
2846 * And verify there are no changes to snapshots we already know
2849 * Assumes the snapshots in the snapshot context are sorted by
2850 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2851 * are also maintained in that order.)
2853 static int rbd_dev_snaps_update(struct rbd_device
*rbd_dev
)
2855 struct ceph_snap_context
*snapc
= rbd_dev
->header
.snapc
;
2856 const u32 snap_count
= snapc
->num_snaps
;
2857 struct list_head
*head
= &rbd_dev
->snaps
;
2858 struct list_head
*links
= head
->next
;
2861 dout("%s: snap count is %u\n", __func__
, (unsigned int) snap_count
);
2862 while (index
< snap_count
|| links
!= head
) {
2864 struct rbd_snap
*snap
;
2867 u64 snap_features
= 0;
2869 snap_id
= index
< snap_count
? snapc
->snaps
[index
]
2871 snap
= links
!= head
? list_entry(links
, struct rbd_snap
, node
)
2873 rbd_assert(!snap
|| snap
->id
!= CEPH_NOSNAP
);
2875 if (snap_id
== CEPH_NOSNAP
|| (snap
&& snap
->id
> snap_id
)) {
2876 struct list_head
*next
= links
->next
;
2878 /* Existing snapshot not in the new snap context */
2880 if (rbd_dev
->spec
->snap_id
== snap
->id
)
2881 rbd_dev
->exists
= false;
2882 rbd_remove_snap_dev(snap
);
2883 dout("%ssnap id %llu has been removed\n",
2884 rbd_dev
->spec
->snap_id
== snap
->id
?
2886 (unsigned long long) snap
->id
);
2888 /* Done with this list entry; advance */
2894 snap_name
= rbd_dev_snap_info(rbd_dev
, index
,
2895 &snap_size
, &snap_features
);
2896 if (IS_ERR(snap_name
))
2897 return PTR_ERR(snap_name
);
2899 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count
,
2900 (unsigned long long) snap_id
);
2901 if (!snap
|| (snap_id
!= CEPH_NOSNAP
&& snap
->id
< snap_id
)) {
2902 struct rbd_snap
*new_snap
;
2904 /* We haven't seen this snapshot before */
2906 new_snap
= __rbd_add_snap_dev(rbd_dev
, snap_name
,
2907 snap_id
, snap_size
, snap_features
);
2908 if (IS_ERR(new_snap
)) {
2909 int err
= PTR_ERR(new_snap
);
2911 dout(" failed to add dev, error %d\n", err
);
2916 /* New goes before existing, or at end of list */
2918 dout(" added dev%s\n", snap
? "" : " at end\n");
2920 list_add_tail(&new_snap
->node
, &snap
->node
);
2922 list_add_tail(&new_snap
->node
, head
);
2924 /* Already have this one */
2926 dout(" already present\n");
2928 rbd_assert(snap
->size
== snap_size
);
2929 rbd_assert(!strcmp(snap
->name
, snap_name
));
2930 rbd_assert(snap
->features
== snap_features
);
2932 /* Done with this list entry; advance */
2934 links
= links
->next
;
2937 /* Advance to the next entry in the snapshot context */
2941 dout("%s: done\n", __func__
);
2947 * Scan the list of snapshots and register the devices for any that
2948 * have not already been registered.
2950 static int rbd_dev_snaps_register(struct rbd_device
*rbd_dev
)
2952 struct rbd_snap
*snap
;
2955 dout("%s called\n", __func__
);
2956 if (WARN_ON(!device_is_registered(&rbd_dev
->dev
)))
2959 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
2960 if (!rbd_snap_registered(snap
)) {
2961 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
2966 dout("%s: returning %d\n", __func__
, ret
);
2971 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
2976 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2978 dev
= &rbd_dev
->dev
;
2979 dev
->bus
= &rbd_bus_type
;
2980 dev
->type
= &rbd_device_type
;
2981 dev
->parent
= &rbd_root_dev
;
2982 dev
->release
= rbd_dev_release
;
2983 dev_set_name(dev
, "%d", rbd_dev
->dev_id
);
2984 ret
= device_register(dev
);
2986 mutex_unlock(&ctl_mutex
);
2991 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
2993 device_unregister(&rbd_dev
->dev
);
2996 static int rbd_init_watch_dev(struct rbd_device
*rbd_dev
)
3001 ret
= rbd_req_sync_watch(rbd_dev
);
3002 if (ret
== -ERANGE
) {
3003 rc
= rbd_dev_refresh(rbd_dev
, NULL
);
3007 } while (ret
== -ERANGE
);
3012 static atomic64_t rbd_dev_id_max
= ATOMIC64_INIT(0);
3015 * Get a unique rbd identifier for the given new rbd_dev, and add
3016 * the rbd_dev to the global list. The minimum rbd id is 1.
3018 static void rbd_dev_id_get(struct rbd_device
*rbd_dev
)
3020 rbd_dev
->dev_id
= atomic64_inc_return(&rbd_dev_id_max
);
3022 spin_lock(&rbd_dev_list_lock
);
3023 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
3024 spin_unlock(&rbd_dev_list_lock
);
3025 dout("rbd_dev %p given dev id %llu\n", rbd_dev
,
3026 (unsigned long long) rbd_dev
->dev_id
);
3030 * Remove an rbd_dev from the global list, and record that its
3031 * identifier is no longer in use.
3033 static void rbd_dev_id_put(struct rbd_device
*rbd_dev
)
3035 struct list_head
*tmp
;
3036 int rbd_id
= rbd_dev
->dev_id
;
3039 rbd_assert(rbd_id
> 0);
3041 dout("rbd_dev %p released dev id %llu\n", rbd_dev
,
3042 (unsigned long long) rbd_dev
->dev_id
);
3043 spin_lock(&rbd_dev_list_lock
);
3044 list_del_init(&rbd_dev
->node
);
3047 * If the id being "put" is not the current maximum, there
3048 * is nothing special we need to do.
3050 if (rbd_id
!= atomic64_read(&rbd_dev_id_max
)) {
3051 spin_unlock(&rbd_dev_list_lock
);
3056 * We need to update the current maximum id. Search the
3057 * list to find out what it is. We're more likely to find
3058 * the maximum at the end, so search the list backward.
3061 list_for_each_prev(tmp
, &rbd_dev_list
) {
3062 struct rbd_device
*rbd_dev
;
3064 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
3065 if (rbd_dev
->dev_id
> max_id
)
3066 max_id
= rbd_dev
->dev_id
;
3068 spin_unlock(&rbd_dev_list_lock
);
3071 * The max id could have been updated by rbd_dev_id_get(), in
3072 * which case it now accurately reflects the new maximum.
3073 * Be careful not to overwrite the maximum value in that
3076 atomic64_cmpxchg(&rbd_dev_id_max
, rbd_id
, max_id
);
3077 dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	* These are the characters that produce nonzero for
	* isspace() in the "C" and "POSIX" locales.
	*/
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;	/* Advance past the token even when it did not fit */

	return len;
}
3130 * Finds the next token in *buf, dynamically allocates a buffer big
3131 * enough to hold a copy of it, and copies the token into the new
3132 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3133 * that a duplicate buffer is created even for a zero-length token.
3135 * Returns a pointer to the newly-allocated duplicate, or a null
3136 * pointer if memory for the duplicate was not available. If
3137 * the lenp argument is a non-null pointer, the length of the token
3138 * (not including the '\0') is returned in *lenp.
3140 * If successful, the *buf pointer will be updated to point beyond
3141 * the end of the found token.
3143 * Note: uses GFP_KERNEL for allocation.
3145 static inline char *dup_token(const char **buf
, size_t *lenp
)
3150 len
= next_token(buf
);
3151 dup
= kmalloc(len
+ 1, GFP_KERNEL
);
3155 memcpy(dup
, *buf
, len
);
3156 *(dup
+ len
) = '\0';
3166 * Parse the options provided for an "rbd add" (i.e., rbd image
3167 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3168 * and the data written is passed here via a NUL-terminated buffer.
3169 * Returns 0 if successful or an error code otherwise.
3171 * The information extracted from these options is recorded in
3172 * the other parameters which return dynamically-allocated
3175 * The address of a pointer that will refer to a ceph options
3176 * structure. Caller must release the returned pointer using
3177 * ceph_destroy_options() when it is no longer needed.
3179 * Address of an rbd options pointer. Fully initialized by
3180 * this function; caller must release with kfree().
3182 * Address of an rbd image specification pointer. Fully
3183 * initialized by this function based on parsed options.
3184 * Caller must release with rbd_spec_put().
3186 * The options passed take this form:
3187 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
3190 * A comma-separated list of one or more monitor addresses.
3191 * A monitor address is an ip address, optionally followed
3192 * by a port number (separated by a colon).
3193 * I.e.: ip1[:port1][,ip2[:port2]...]
3195 * A comma-separated list of ceph and/or rbd options.
3197 * The name of the rados pool containing the rbd image.
3199 * The name of the image in that pool to map.
3201 * An optional snapshot id. If provided, the mapping will
3202 * present data from the image at the time that snapshot was
3203 * created. The image head is used if no snapshot id is
3204 * provided. Snapshot mappings are always read-only.
3206 static int rbd_add_parse_args(const char *buf
,
3207 struct ceph_options
**ceph_opts
,
3208 struct rbd_options
**opts
,
3209 struct rbd_spec
**rbd_spec
)
3213 const char *mon_addrs
;
3214 size_t mon_addrs_size
;
3215 struct rbd_spec
*spec
= NULL
;
3216 struct rbd_options
*rbd_opts
= NULL
;
3217 struct ceph_options
*copts
;
3220 /* The first four tokens are required */
3222 len
= next_token(&buf
);
3224 return -EINVAL
; /* Missing monitor address(es) */
3226 mon_addrs_size
= len
+ 1;
3230 options
= dup_token(&buf
, NULL
);
3234 goto out_err
; /* Missing options */
3236 spec
= rbd_spec_alloc();
3240 spec
->pool_name
= dup_token(&buf
, NULL
);
3241 if (!spec
->pool_name
)
3243 if (!*spec
->pool_name
)
3244 goto out_err
; /* Missing pool name */
3246 spec
->image_name
= dup_token(&buf
, &spec
->image_name_len
);
3247 if (!spec
->image_name
)
3249 if (!*spec
->image_name
)
3250 goto out_err
; /* Missing image name */
3253 * Snapshot name is optional; default is to use "-"
3254 * (indicating the head/no snapshot).
3256 len
= next_token(&buf
);
3258 buf
= RBD_SNAP_HEAD_NAME
; /* No snapshot supplied */
3259 len
= sizeof (RBD_SNAP_HEAD_NAME
) - 1;
3260 } else if (len
> RBD_MAX_SNAP_NAME_LEN
) {
3261 ret
= -ENAMETOOLONG
;
3264 spec
->snap_name
= kmalloc(len
+ 1, GFP_KERNEL
);
3265 if (!spec
->snap_name
)
3267 memcpy(spec
->snap_name
, buf
, len
);
3268 *(spec
->snap_name
+ len
) = '\0';
3270 /* Initialize all rbd options to the defaults */
3272 rbd_opts
= kzalloc(sizeof (*rbd_opts
), GFP_KERNEL
);
3276 rbd_opts
->read_only
= RBD_READ_ONLY_DEFAULT
;
3278 copts
= ceph_parse_options(options
, mon_addrs
,
3279 mon_addrs
+ mon_addrs_size
- 1,
3280 parse_rbd_opts_token
, rbd_opts
);
3281 if (IS_ERR(copts
)) {
3282 ret
= PTR_ERR(copts
);
3303 * An rbd format 2 image has a unique identifier, distinct from the
3304 * name given to it by the user. Internally, that identifier is
3305 * what's used to specify the names of objects related to the image.
3307 * A special "rbd id" object is used to map an rbd image name to its
3308 * id. If that object doesn't exist, then there is no v2 rbd image
3309 * with the supplied name.
3311 * This function will record the given rbd_dev's image_id field if
3312 * it can be determined, and in that case will return 0. If any
3313 * errors occur a negative errno will be returned and the rbd_dev's
3314 * image_id field will be unchanged (and should be NULL).
3316 static int rbd_dev_image_id(struct rbd_device
*rbd_dev
)
3325 * When probing a parent image, the image id is already
3326 * known (and the image name likely is not). There's no
3327 * need to fetch the image id again in this case.
3329 if (rbd_dev
->spec
->image_id
)
3333 * First, see if the format 2 image id file exists, and if
3334 * so, get the image's persistent id from it.
3336 size
= sizeof (RBD_ID_PREFIX
) + rbd_dev
->spec
->image_name_len
;
3337 object_name
= kmalloc(size
, GFP_NOIO
);
3340 sprintf(object_name
, "%s%s", RBD_ID_PREFIX
, rbd_dev
->spec
->image_name
);
3341 dout("rbd id object name is %s\n", object_name
);
3343 /* Response will be an encoded string, which includes a length */
3345 size
= sizeof (__le32
) + RBD_IMAGE_ID_LEN_MAX
;
3346 response
= kzalloc(size
, GFP_NOIO
);
3352 ret
= rbd_req_sync_exec(rbd_dev
, object_name
,
3355 response
, RBD_IMAGE_ID_LEN_MAX
,
3356 CEPH_OSD_FLAG_READ
, NULL
);
3357 dout("%s: rbd_req_sync_exec returned %d\n", __func__
, ret
);
3360 ret
= 0; /* rbd_req_sync_exec() can return positive */
3363 rbd_dev
->spec
->image_id
= ceph_extract_encoded_string(&p
,
3364 p
+ RBD_IMAGE_ID_LEN_MAX
,
3365 &rbd_dev
->spec
->image_id_len
,
3367 if (IS_ERR(rbd_dev
->spec
->image_id
)) {
3368 ret
= PTR_ERR(rbd_dev
->spec
->image_id
);
3369 rbd_dev
->spec
->image_id
= NULL
;
3371 dout("image_id is %s\n", rbd_dev
->spec
->image_id
);
3380 static int rbd_dev_v1_probe(struct rbd_device
*rbd_dev
)
3385 /* Version 1 images have no id; empty string is used */
3387 rbd_dev
->spec
->image_id
= kstrdup("", GFP_KERNEL
);
3388 if (!rbd_dev
->spec
->image_id
)
3390 rbd_dev
->spec
->image_id_len
= 0;
3392 /* Record the header object name for this rbd image. */
3394 size
= rbd_dev
->spec
->image_name_len
+ sizeof (RBD_SUFFIX
);
3395 rbd_dev
->header_name
= kmalloc(size
, GFP_KERNEL
);
3396 if (!rbd_dev
->header_name
) {
3400 sprintf(rbd_dev
->header_name
, "%s%s",
3401 rbd_dev
->spec
->image_name
, RBD_SUFFIX
);
3403 /* Populate rbd image metadata */
3405 ret
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
3409 /* Version 1 images have no parent (no layering) */
3411 rbd_dev
->parent_spec
= NULL
;
3412 rbd_dev
->parent_overlap
= 0;
3414 rbd_dev
->image_format
= 1;
3416 dout("discovered version 1 image, header name is %s\n",
3417 rbd_dev
->header_name
);
3422 kfree(rbd_dev
->header_name
);
3423 rbd_dev
->header_name
= NULL
;
3424 kfree(rbd_dev
->spec
->image_id
);
3425 rbd_dev
->spec
->image_id
= NULL
;
3430 static int rbd_dev_v2_probe(struct rbd_device
*rbd_dev
)
3437 * Image id was filled in by the caller. Record the header
3438 * object name for this rbd image.
3440 size
= sizeof (RBD_HEADER_PREFIX
) + rbd_dev
->spec
->image_id_len
;
3441 rbd_dev
->header_name
= kmalloc(size
, GFP_KERNEL
);
3442 if (!rbd_dev
->header_name
)
3444 sprintf(rbd_dev
->header_name
, "%s%s",
3445 RBD_HEADER_PREFIX
, rbd_dev
->spec
->image_id
);
3447 /* Get the size and object order for the image */
3449 ret
= rbd_dev_v2_image_size(rbd_dev
);
3453 /* Get the object prefix (a.k.a. block_name) for the image */
3455 ret
= rbd_dev_v2_object_prefix(rbd_dev
);
3459 /* Get the and check features for the image */
3461 ret
= rbd_dev_v2_features(rbd_dev
);
3465 /* If the image supports layering, get the parent info */
3467 if (rbd_dev
->header
.features
& RBD_FEATURE_LAYERING
) {
3468 ret
= rbd_dev_v2_parent_info(rbd_dev
);
3473 /* crypto and compression type aren't (yet) supported for v2 images */
3475 rbd_dev
->header
.crypt_type
= 0;
3476 rbd_dev
->header
.comp_type
= 0;
3478 /* Get the snapshot context, plus the header version */
3480 ret
= rbd_dev_v2_snap_context(rbd_dev
, &ver
);
3483 rbd_dev
->header
.obj_version
= ver
;
3485 rbd_dev
->image_format
= 2;
3487 dout("discovered version 2 image, header name is %s\n",
3488 rbd_dev
->header_name
);
3492 rbd_dev
->parent_overlap
= 0;
3493 rbd_spec_put(rbd_dev
->parent_spec
);
3494 rbd_dev
->parent_spec
= NULL
;
3495 kfree(rbd_dev
->header_name
);
3496 rbd_dev
->header_name
= NULL
;
3497 kfree(rbd_dev
->header
.object_prefix
);
3498 rbd_dev
->header
.object_prefix
= NULL
;
3503 static int rbd_dev_probe_finish(struct rbd_device
*rbd_dev
)
3507 /* no need to lock here, as rbd_dev is not registered yet */
3508 ret
= rbd_dev_snaps_update(rbd_dev
);
3512 ret
= rbd_dev_probe_update_spec(rbd_dev
);
3516 ret
= rbd_dev_set_mapping(rbd_dev
);
3520 /* generate unique id: find highest unique id, add one */
3521 rbd_dev_id_get(rbd_dev
);
3523 /* Fill in the device name, now that we have its id. */
3524 BUILD_BUG_ON(DEV_NAME_LEN
3525 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
3526 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->dev_id
);
3528 /* Get our block major device number. */
3530 ret
= register_blkdev(0, rbd_dev
->name
);
3533 rbd_dev
->major
= ret
;
3535 /* Set up the blkdev mapping. */
3537 ret
= rbd_init_disk(rbd_dev
);
3539 goto err_out_blkdev
;
3541 ret
= rbd_bus_add_dev(rbd_dev
);
3546 * At this point cleanup in the event of an error is the job
3547 * of the sysfs code (initiated by rbd_bus_del_dev()).
3549 down_write(&rbd_dev
->header_rwsem
);
3550 ret
= rbd_dev_snaps_register(rbd_dev
);
3551 up_write(&rbd_dev
->header_rwsem
);
3555 ret
= rbd_init_watch_dev(rbd_dev
);
3559 /* Everything's ready. Announce the disk to the world. */
3561 add_disk(rbd_dev
->disk
);
3563 pr_info("%s: added with size 0x%llx\n", rbd_dev
->disk
->disk_name
,
3564 (unsigned long long) rbd_dev
->mapping
.size
);
3568 /* this will also clean up rest of rbd_dev stuff */
3570 rbd_bus_del_dev(rbd_dev
);
3574 rbd_free_disk(rbd_dev
);
3576 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
3578 rbd_dev_id_put(rbd_dev
);
3580 rbd_remove_all_snaps(rbd_dev
);
3586 * Probe for the existence of the header object for the given rbd
3587 * device. For format 2 images this includes determining the image
3590 static int rbd_dev_probe(struct rbd_device
*rbd_dev
)
3595 * Get the id from the image id object. If it's not a
3596 * format 2 image, we'll get ENOENT back, and we'll assume
3597 * it's a format 1 image.
3599 ret
= rbd_dev_image_id(rbd_dev
);
3601 ret
= rbd_dev_v1_probe(rbd_dev
);
3603 ret
= rbd_dev_v2_probe(rbd_dev
);
3605 dout("probe failed, returning %d\n", ret
);
3610 ret
= rbd_dev_probe_finish(rbd_dev
);
3612 rbd_header_free(&rbd_dev
->header
);
3617 static ssize_t
rbd_add(struct bus_type
*bus
,
3621 struct rbd_device
*rbd_dev
= NULL
;
3622 struct ceph_options
*ceph_opts
= NULL
;
3623 struct rbd_options
*rbd_opts
= NULL
;
3624 struct rbd_spec
*spec
= NULL
;
3625 struct rbd_client
*rbdc
;
3626 struct ceph_osd_client
*osdc
;
3629 if (!try_module_get(THIS_MODULE
))
3632 /* parse add command */
3633 rc
= rbd_add_parse_args(buf
, &ceph_opts
, &rbd_opts
, &spec
);
3635 goto err_out_module
;
3637 rbdc
= rbd_get_client(ceph_opts
);
3642 ceph_opts
= NULL
; /* rbd_dev client now owns this */
3645 osdc
= &rbdc
->client
->osdc
;
3646 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, spec
->pool_name
);
3648 goto err_out_client
;
3649 spec
->pool_id
= (u64
) rc
;
3651 rbd_dev
= rbd_dev_create(rbdc
, spec
);
3653 goto err_out_client
;
3654 rbdc
= NULL
; /* rbd_dev now owns this */
3655 spec
= NULL
; /* rbd_dev now owns this */
3657 rbd_dev
->mapping
.read_only
= rbd_opts
->read_only
;
3659 rbd_opts
= NULL
; /* done with this */
3661 rc
= rbd_dev_probe(rbd_dev
);
3663 goto err_out_rbd_dev
;
3667 rbd_dev_destroy(rbd_dev
);
3669 rbd_put_client(rbdc
);
3672 ceph_destroy_options(ceph_opts
);
3676 module_put(THIS_MODULE
);
3678 dout("Error adding device %s\n", buf
);
3680 return (ssize_t
) rc
;
3683 static struct rbd_device
*__rbd_get_dev(unsigned long dev_id
)
3685 struct list_head
*tmp
;
3686 struct rbd_device
*rbd_dev
;
3688 spin_lock(&rbd_dev_list_lock
);
3689 list_for_each(tmp
, &rbd_dev_list
) {
3690 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
3691 if (rbd_dev
->dev_id
== dev_id
) {
3692 spin_unlock(&rbd_dev_list_lock
);
3696 spin_unlock(&rbd_dev_list_lock
);
3700 static void rbd_dev_release(struct device
*dev
)
3702 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
3704 if (rbd_dev
->watch_request
) {
3705 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
3707 ceph_osdc_unregister_linger_request(&client
->osdc
,
3708 rbd_dev
->watch_request
);
3710 if (rbd_dev
->watch_event
)
3711 rbd_req_sync_unwatch(rbd_dev
);
3714 /* clean up and free blkdev */
3715 rbd_free_disk(rbd_dev
);
3716 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
3718 /* release allocated disk header fields */
3719 rbd_header_free(&rbd_dev
->header
);
3721 /* done with the id, and with the rbd_dev */
3722 rbd_dev_id_put(rbd_dev
);
3723 rbd_assert(rbd_dev
->rbd_client
!= NULL
);
3724 rbd_dev_destroy(rbd_dev
);
3726 /* release module ref */
3727 module_put(THIS_MODULE
);
3730 static ssize_t
rbd_remove(struct bus_type
*bus
,
3734 struct rbd_device
*rbd_dev
= NULL
;
3739 rc
= strict_strtoul(buf
, 10, &ul
);
3743 /* convert to int; abort if we lost anything in the conversion */
3744 target_id
= (int) ul
;
3745 if (target_id
!= ul
)
3748 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
3750 rbd_dev
= __rbd_get_dev(target_id
);
3756 if (rbd_dev
->open_count
) {
3761 rbd_remove_all_snaps(rbd_dev
);
3762 rbd_bus_del_dev(rbd_dev
);
3765 mutex_unlock(&ctl_mutex
);
3771 * create control files in sysfs
3774 static int rbd_sysfs_init(void)
3778 ret
= device_register(&rbd_root_dev
);
3782 ret
= bus_register(&rbd_bus_type
);
3784 device_unregister(&rbd_root_dev
);
3789 static void rbd_sysfs_cleanup(void)
3791 bus_unregister(&rbd_bus_type
);
3792 device_unregister(&rbd_root_dev
);
3795 int __init
rbd_init(void)
3799 rc
= rbd_sysfs_init();
3802 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
3806 void __exit
rbd_exit(void)
3808 rbd_sysfs_cleanup();
3811 module_init(rbd_init
);
3812 module_exit(rbd_exit
);
3814 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3815 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3816 MODULE_DESCRIPTION("rados block device");
3818 /* following authorship retained from original osdblk.c */
3819 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3821 MODULE_LICENSE("GPL");