/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>

#include <linux/blkdev.h>

#include "rbd_types.h"
#define RBD_DEBUG	/* Activate rbd_assert() calls */
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
/* It might be useful to have these defined elsewhere */

#define	U8_MAX	((u8) (~0U))
#define	U16_MAX	((u16) (~0U))
#define	U32_MAX	((u32) (~0U))
#define	U64_MAX	((u64) (~0ULL))
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN	\
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64
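
/*
 * Editor's note (illustrative only, not part of the original source): the
 * arithmetic behind the two limits above.  A snapshot device name is the
 * "snap_" prefix followed by the snapshot name, so with NAME_MAX = 255 and
 * a 5-character prefix the name itself may be at most 255 - 5 = 250 bytes.
 * Similarly, each snapshot contributes one 8-byte id to the snapshot
 * context, so roughly 510 * 8 = 4080 bytes of ids plus a small context
 * header is what lets the maximum snapshot context still fit in a 4KB page.
 */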
#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)
/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
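
/*
 * Editor's note (illustrative only, not part of the original source): each
 * byte of an integer contributes at most log10(256) ~= 2.41 decimal digits,
 * so (5 * sizeof (int)) / 2 digits is a safe over-estimate and the "+ 1"
 * leaves room for a sign.  With 4-byte ints this works out to
 * (5 * 4) / 2 + 1 = 11 characters -- enough for "-2147483648" -- and
 * DEV_NAME_LEN (32) comfortably covers "rbd" plus that plus a terminator.
 */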
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */

	/* The remaining fields need to be updated occasionally */

	struct ceph_snap_context *snapc;
/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
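
/*
 * Editor's sketch (hypothetical example values, not from the original
 * source): for a user-mapped, non-layered image the populated identity
 * described above might look like
 *
 *	spec->pool_id  = 2;		spec->pool_name  = "rbd";
 *	spec->image_id = "1014b2aeb31d";	spec->image_name = "foo";
 *	spec->snap_id  = CEPH_NOSNAP;	spec->snap_name  = "-";
 *
 * i.e. the ids are what drive lookups, while the names are what get shown
 * through sysfs and in log messages.
 */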
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};
struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */

	struct rbd_img_request	*img_request;
	struct list_head	links;		/* img_request->obj_requests */
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	struct bio		*bio_list;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */

	rbd_obj_callback_t	callback;
	struct completion	completion;
struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	bool			write_request;	/* false for read */

	struct ceph_snap_context *snapc;	/* for writes */
	u64			snap_id;	/* for reads */

	spinlock_t		completion_lock;/* protects next_completion */

	rbd_img_callback_t	callback;

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */
#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
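
/*
 * Editor's sketch (illustrative only, not part of the original source):
 * how the iteration macros above are typically used.  "img_request",
 * "obj_request" and "next_obj_request" are locals of the obvious types.
 */
#if 0
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	/* Walk every object request that makes up an image request */
	for_each_obj_request(img_request, obj_request)
		dout("obj %p which %u\n", obj_request, obj_request->which);

	/* Safe variant: entries may be removed while iterating */
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
#endif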
	struct list_head	node;
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	unsigned long		open_count;	/* protected by lock */
};
/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);
static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
static struct bus_type rbd_bus_type = {
	.bus_attrs	= rbd_bus_attrs,
static void rbd_root_dev_release(struct device *dev)

static struct device rbd_root_dev = {
	.release	= rbd_root_dev_release,
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))

	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	ceph_destroy_client(rbdc->client);

	mutex_unlock(&ctl_mutex);

	ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
	/* string args above */

	/* Boolean args above */

static match_table_t rbd_opts_tokens = {
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */

#define RBD_READ_ONLY_DEFAULT	false
543 static int parse_rbd_opts_token(char *c
, void *private)
545 struct rbd_options
*rbd_opts
= private;
546 substring_t argstr
[MAX_OPT_ARGS
];
547 int token
, intval
, ret
;
549 token
= match_token(c
, rbd_opts_tokens
, argstr
);
553 if (token
< Opt_last_int
) {
554 ret
= match_int(&argstr
[0], &intval
);
556 pr_err("bad mount option arg (not int) "
560 dout("got int token %d val %d\n", token
, intval
);
561 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
562 dout("got string token %d val %s\n", token
,
564 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
565 dout("got Boolean token %d\n", token
);
567 dout("got token %d\n", token
);
572 rbd_opts
->read_only
= true;
575 rbd_opts
->read_only
= false;
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}
/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
/*
 * Drop reference to ceph client node.  If it's not referenced anymore,
 * release it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}
static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}
634 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
639 /* The header has to start with the magic rbd header text */
640 if (memcmp(&ondisk
->text
, RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
)))
643 /* The bio layer requires at least sector-sized I/O */
645 if (ondisk
->options
.order
< SECTOR_SHIFT
)
648 /* If we use u64 in a few spots we may be able to loosen this */
650 if (ondisk
->options
.order
> 8 * sizeof (int) - 1)
654 * The size of a snapshot header has to fit in a size_t, and
655 * that limits the number of snapshots.
657 snap_count
= le32_to_cpu(ondisk
->snap_count
);
658 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
659 if (snap_count
> size
/ sizeof (__le64
))
663 * Not only that, but the size of the entire the snapshot
664 * header must also be representable in a size_t.
666 size
-= snap_count
* sizeof (__le64
);
667 if ((u64
) size
< le64_to_cpu(ondisk
->snap_names_len
))
674 * Create a new header structure, translate header format from the on-disk
677 static int rbd_header_from_disk(struct rbd_image_header
*header
,
678 struct rbd_image_header_ondisk
*ondisk
)
685 memset(header
, 0, sizeof (*header
));
687 snap_count
= le32_to_cpu(ondisk
->snap_count
);
689 len
= strnlen(ondisk
->object_prefix
, sizeof (ondisk
->object_prefix
));
690 header
->object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
691 if (!header
->object_prefix
)
693 memcpy(header
->object_prefix
, ondisk
->object_prefix
, len
);
694 header
->object_prefix
[len
] = '\0';
697 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
699 /* Save a copy of the snapshot names */
701 if (snap_names_len
> (u64
) SIZE_MAX
)
703 header
->snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
704 if (!header
->snap_names
)
707 * Note that rbd_dev_v1_header_read() guarantees
708 * the ondisk buffer we're working with has
709 * snap_names_len bytes beyond the end of the
710 * snapshot id array, this memcpy() is safe.
712 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
715 /* Record each snapshot's size */
717 size
= snap_count
* sizeof (*header
->snap_sizes
);
718 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
719 if (!header
->snap_sizes
)
721 for (i
= 0; i
< snap_count
; i
++)
722 header
->snap_sizes
[i
] =
723 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
725 WARN_ON(ondisk
->snap_names_len
);
726 header
->snap_names
= NULL
;
727 header
->snap_sizes
= NULL
;
730 header
->features
= 0; /* No features support in v1 images */
731 header
->obj_order
= ondisk
->options
.order
;
732 header
->crypt_type
= ondisk
->options
.crypt_type
;
733 header
->comp_type
= ondisk
->options
.comp_type
;
735 /* Allocate and fill in the snapshot context */
737 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
738 size
= sizeof (struct ceph_snap_context
);
739 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
740 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
744 atomic_set(&header
->snapc
->nref
, 1);
745 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
746 header
->snapc
->num_snaps
= snap_count
;
747 for (i
= 0; i
< snap_count
; i
++)
748 header
->snapc
->snaps
[i
] =
749 le64_to_cpu(ondisk
->snaps
[i
].id
);
754 kfree(header
->snap_sizes
);
755 header
->snap_sizes
= NULL
;
756 kfree(header
->snap_names
);
757 header
->snap_names
= NULL
;
758 kfree(header
->object_prefix
);
759 header
->object_prefix
= NULL
;
764 static const char *rbd_snap_name(struct rbd_device
*rbd_dev
, u64 snap_id
)
766 struct rbd_snap
*snap
;
768 if (snap_id
== CEPH_NOSNAP
)
769 return RBD_SNAP_HEAD_NAME
;
771 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
)
772 if (snap_id
== snap
->id
)
778 static int snap_by_name(struct rbd_device
*rbd_dev
, const char *snap_name
)
781 struct rbd_snap
*snap
;
783 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
784 if (!strcmp(snap_name
, snap
->name
)) {
785 rbd_dev
->spec
->snap_id
= snap
->id
;
786 rbd_dev
->mapping
.size
= snap
->size
;
787 rbd_dev
->mapping
.features
= snap
->features
;
796 static int rbd_dev_set_mapping(struct rbd_device
*rbd_dev
)
800 if (!memcmp(rbd_dev
->spec
->snap_name
, RBD_SNAP_HEAD_NAME
,
801 sizeof (RBD_SNAP_HEAD_NAME
))) {
802 rbd_dev
->spec
->snap_id
= CEPH_NOSNAP
;
803 rbd_dev
->mapping
.size
= rbd_dev
->header
.image_size
;
804 rbd_dev
->mapping
.features
= rbd_dev
->header
.features
;
807 ret
= snap_by_name(rbd_dev
, rbd_dev
->spec
->snap_name
);
810 rbd_dev
->mapping
.read_only
= true;
812 set_bit(RBD_DEV_FLAG_EXISTS
, &rbd_dev
->flags
);
818 static void rbd_header_free(struct rbd_image_header
*header
)
820 kfree(header
->object_prefix
);
821 header
->object_prefix
= NULL
;
822 kfree(header
->snap_sizes
);
823 header
->snap_sizes
= NULL
;
824 kfree(header
->snap_names
);
825 header
->snap_names
= NULL
;
826 ceph_put_snap_context(header
->snapc
);
827 header
->snapc
= NULL
;
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);

	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}
static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
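
/*
 * Editor's sketch (illustrative only, not part of the original source):
 * how an image byte offset maps onto an object ("segment").  Assume the
 * rbd default object order of 22, i.e. 4MB objects, and an object prefix
 * of "rb.0.1234" (both values hypothetical):
 *
 *	offset         = 0xc00123	(12MB + 0x123 bytes into the image)
 *	segment        = offset >> obj_order         = 3
 *	segment offset = offset & (segment_size - 1) = 0x123
 *	object name    = "rb.0.1234.000000000003"
 *
 * rbd_segment_length() additionally clips a request's length so that it
 * never extends past the end of that 4MB object.
 */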
885 static void bio_chain_put(struct bio
*chain
)
891 chain
= chain
->bi_next
;
897 * zeros a bio chain, starting at specific offset
899 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
908 bio_for_each_segment(bv
, chain
, i
) {
909 if (pos
+ bv
->bv_len
> start_ofs
) {
910 int remainder
= max(start_ofs
- pos
, 0);
911 buf
= bvec_kmap_irq(bv
, &flags
);
912 memset(buf
+ remainder
, 0,
913 bv
->bv_len
- remainder
);
914 bvec_kunmap_irq(buf
, &flags
);
919 chain
= chain
->bi_next
;
924 * Clone a portion of a bio, starting at the given byte offset
925 * and continuing for the number of bytes indicated.
927 static struct bio
*bio_clone_range(struct bio
*bio_src
,
936 unsigned short end_idx
;
940 /* Handle the easy case for the caller */
942 if (!offset
&& len
== bio_src
->bi_size
)
943 return bio_clone(bio_src
, gfpmask
);
945 if (WARN_ON_ONCE(!len
))
947 if (WARN_ON_ONCE(len
> bio_src
->bi_size
))
949 if (WARN_ON_ONCE(offset
> bio_src
->bi_size
- len
))
952 /* Find first affected segment... */
955 __bio_for_each_segment(bv
, bio_src
, idx
, 0) {
956 if (resid
< bv
->bv_len
)
962 /* ...and the last affected segment */
965 __bio_for_each_segment(bv
, bio_src
, end_idx
, idx
) {
966 if (resid
<= bv
->bv_len
)
970 vcnt
= end_idx
- idx
+ 1;
972 /* Build the clone */
974 bio
= bio_alloc(gfpmask
, (unsigned int) vcnt
);
976 return NULL
; /* ENOMEM */
978 bio
->bi_bdev
= bio_src
->bi_bdev
;
979 bio
->bi_sector
= bio_src
->bi_sector
+ (offset
>> SECTOR_SHIFT
);
980 bio
->bi_rw
= bio_src
->bi_rw
;
981 bio
->bi_flags
|= 1 << BIO_CLONED
;
984 * Copy over our part of the bio_vec, then update the first
985 * and last (or only) entries.
987 memcpy(&bio
->bi_io_vec
[0], &bio_src
->bi_io_vec
[idx
],
988 vcnt
* sizeof (struct bio_vec
));
989 bio
->bi_io_vec
[0].bv_offset
+= voff
;
991 bio
->bi_io_vec
[0].bv_len
-= voff
;
992 bio
->bi_io_vec
[vcnt
- 1].bv_len
= resid
;
994 bio
->bi_io_vec
[0].bv_len
= len
;
1005 * Clone a portion of a bio chain, starting at the given byte offset
1006 * into the first bio in the source chain and continuing for the
1007 * number of bytes indicated. The result is another bio chain of
1008 * exactly the given length, or a null pointer on error.
1010 * The bio_src and offset parameters are both in-out. On entry they
1011 * refer to the first source bio and the offset into that bio where
1012 * the start of data to be cloned is located.
1014 * On return, bio_src is updated to refer to the bio in the source
1015 * chain that contains first un-cloned byte, and *offset will
1016 * contain the offset of that byte within that bio.
1018 static struct bio
*bio_chain_clone_range(struct bio
**bio_src
,
1019 unsigned int *offset
,
1023 struct bio
*bi
= *bio_src
;
1024 unsigned int off
= *offset
;
1025 struct bio
*chain
= NULL
;
1028 /* Build up a chain of clone bios up to the limit */
1030 if (!bi
|| off
>= bi
->bi_size
|| !len
)
1031 return NULL
; /* Nothing to clone */
1035 unsigned int bi_size
;
1039 rbd_warn(NULL
, "bio_chain exhausted with %u left", len
);
1040 goto out_err
; /* EINVAL; ran out of bio's */
1042 bi_size
= min_t(unsigned int, bi
->bi_size
- off
, len
);
1043 bio
= bio_clone_range(bi
, off
, bi_size
, gfpmask
);
1045 goto out_err
; /* ENOMEM */
1048 end
= &bio
->bi_next
;
1051 if (off
== bi
->bi_size
) {
1062 bio_chain_put(chain
);
1067 static void rbd_obj_request_get(struct rbd_obj_request
*obj_request
)
1069 dout("%s: obj %p (was %d)\n", __func__
, obj_request
,
1070 atomic_read(&obj_request
->kref
.refcount
));
1071 kref_get(&obj_request
->kref
);
1074 static void rbd_obj_request_destroy(struct kref
*kref
);
1075 static void rbd_obj_request_put(struct rbd_obj_request
*obj_request
)
1077 rbd_assert(obj_request
!= NULL
);
1078 dout("%s: obj %p (was %d)\n", __func__
, obj_request
,
1079 atomic_read(&obj_request
->kref
.refcount
));
1080 kref_put(&obj_request
->kref
, rbd_obj_request_destroy
);
1083 static void rbd_img_request_get(struct rbd_img_request
*img_request
)
1085 dout("%s: img %p (was %d)\n", __func__
, img_request
,
1086 atomic_read(&img_request
->kref
.refcount
));
1087 kref_get(&img_request
->kref
);
1090 static void rbd_img_request_destroy(struct kref
*kref
);
1091 static void rbd_img_request_put(struct rbd_img_request
*img_request
)
1093 rbd_assert(img_request
!= NULL
);
1094 dout("%s: img %p (was %d)\n", __func__
, img_request
,
1095 atomic_read(&img_request
->kref
.refcount
));
1096 kref_put(&img_request
->kref
, rbd_img_request_destroy
);
1099 static inline void rbd_img_obj_request_add(struct rbd_img_request
*img_request
,
1100 struct rbd_obj_request
*obj_request
)
1102 rbd_assert(obj_request
->img_request
== NULL
);
1104 rbd_obj_request_get(obj_request
);
1105 obj_request
->img_request
= img_request
;
1106 obj_request
->which
= img_request
->obj_request_count
;
1107 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1108 img_request
->obj_request_count
++;
1109 list_add_tail(&obj_request
->links
, &img_request
->obj_requests
);
1110 dout("%s: img %p obj %p w=%u\n", __func__
, img_request
, obj_request
,
1111 obj_request
->which
);
1114 static inline void rbd_img_obj_request_del(struct rbd_img_request
*img_request
,
1115 struct rbd_obj_request
*obj_request
)
1117 rbd_assert(obj_request
->which
!= BAD_WHICH
);
1119 dout("%s: img %p obj %p w=%u\n", __func__
, img_request
, obj_request
,
1120 obj_request
->which
);
1121 list_del(&obj_request
->links
);
1122 rbd_assert(img_request
->obj_request_count
> 0);
1123 img_request
->obj_request_count
--;
1124 rbd_assert(obj_request
->which
== img_request
->obj_request_count
);
1125 obj_request
->which
= BAD_WHICH
;
1126 rbd_assert(obj_request
->img_request
== img_request
);
1127 obj_request
->img_request
= NULL
;
1128 obj_request
->callback
= NULL
;
1129 rbd_obj_request_put(obj_request
);
1132 static bool obj_request_type_valid(enum obj_request_type type
)
1135 case OBJ_REQUEST_NODATA
:
1136 case OBJ_REQUEST_BIO
:
1137 case OBJ_REQUEST_PAGES
:
1144 static struct ceph_osd_req_op
*rbd_osd_req_op_create(u16 opcode
, ...)
1146 struct ceph_osd_req_op
*op
;
1150 op
= kzalloc(sizeof (*op
), GFP_NOIO
);
1154 va_start(args
, opcode
);
1156 case CEPH_OSD_OP_READ
:
1157 case CEPH_OSD_OP_WRITE
:
1158 /* rbd_osd_req_op_create(READ, offset, length) */
1159 /* rbd_osd_req_op_create(WRITE, offset, length) */
1160 op
->extent
.offset
= va_arg(args
, u64
);
1161 op
->extent
.length
= va_arg(args
, u64
);
1162 if (opcode
== CEPH_OSD_OP_WRITE
)
1163 op
->payload_len
= op
->extent
.length
;
1165 case CEPH_OSD_OP_STAT
:
1167 case CEPH_OSD_OP_CALL
:
1168 /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
1169 op
->cls
.class_name
= va_arg(args
, char *);
1170 size
= strlen(op
->cls
.class_name
);
1171 rbd_assert(size
<= (size_t) U8_MAX
);
1172 op
->cls
.class_len
= size
;
1173 op
->payload_len
= size
;
1175 op
->cls
.method_name
= va_arg(args
, char *);
1176 size
= strlen(op
->cls
.method_name
);
1177 rbd_assert(size
<= (size_t) U8_MAX
);
1178 op
->cls
.method_len
= size
;
1179 op
->payload_len
+= size
;
1182 op
->cls
.indata
= va_arg(args
, void *);
1183 size
= va_arg(args
, size_t);
1184 rbd_assert(size
<= (size_t) U32_MAX
);
1185 op
->cls
.indata_len
= (u32
) size
;
1186 op
->payload_len
+= size
;
1188 case CEPH_OSD_OP_NOTIFY_ACK
:
1189 case CEPH_OSD_OP_WATCH
:
1190 /* rbd_osd_req_op_create(NOTIFY_ACK, cookie, version) */
1191 /* rbd_osd_req_op_create(WATCH, cookie, version, flag) */
1192 op
->watch
.cookie
= va_arg(args
, u64
);
1193 op
->watch
.ver
= va_arg(args
, u64
);
1194 op
->watch
.ver
= cpu_to_le64(op
->watch
.ver
);
1195 if (opcode
== CEPH_OSD_OP_WATCH
&& va_arg(args
, int))
1196 op
->watch
.flag
= (u8
) 1;
1199 rbd_warn(NULL
, "unsupported opcode %hu\n", opcode
);
1209 static void rbd_osd_req_op_destroy(struct ceph_osd_req_op
*op
)
1214 static int rbd_obj_request_submit(struct ceph_osd_client
*osdc
,
1215 struct rbd_obj_request
*obj_request
)
1217 dout("%s: osdc %p obj %p\n", __func__
, osdc
, obj_request
);
1219 return ceph_osdc_start_request(osdc
, obj_request
->osd_req
, false);
1222 static void rbd_img_request_complete(struct rbd_img_request
*img_request
)
1224 dout("%s: img %p\n", __func__
, img_request
);
1225 if (img_request
->callback
)
1226 img_request
->callback(img_request
);
1228 rbd_img_request_put(img_request
);
1231 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1233 static int rbd_obj_request_wait(struct rbd_obj_request
*obj_request
)
1235 dout("%s: obj %p\n", __func__
, obj_request
);
1237 return wait_for_completion_interruptible(&obj_request
->completion
);
1240 static void obj_request_done_init(struct rbd_obj_request
*obj_request
)
1242 atomic_set(&obj_request
->done
, 0);
1246 static void obj_request_done_set(struct rbd_obj_request
*obj_request
)
1250 done
= atomic_inc_return(&obj_request
->done
);
1252 struct rbd_img_request
*img_request
= obj_request
->img_request
;
1253 struct rbd_device
*rbd_dev
;
1255 rbd_dev
= img_request
? img_request
->rbd_dev
: NULL
;
1256 rbd_warn(rbd_dev
, "obj_request %p was already done\n",
1261 static bool obj_request_done_test(struct rbd_obj_request
*obj_request
)
1264 return atomic_read(&obj_request
->done
) != 0;
1268 rbd_img_obj_request_read_callback(struct rbd_obj_request
*obj_request
)
1270 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__
,
1271 obj_request
, obj_request
->img_request
, obj_request
->result
,
1272 obj_request
->xferred
, obj_request
->length
);
1274 * ENOENT means a hole in the image. We zero-fill the
1275 * entire length of the request. A short read also implies
1276 * zero-fill to the end of the request. Either way we
1277 * update the xferred count to indicate the whole request
1280 BUG_ON(obj_request
->type
!= OBJ_REQUEST_BIO
);
1281 if (obj_request
->result
== -ENOENT
) {
1282 zero_bio_chain(obj_request
->bio_list
, 0);
1283 obj_request
->result
= 0;
1284 obj_request
->xferred
= obj_request
->length
;
1285 } else if (obj_request
->xferred
< obj_request
->length
&&
1286 !obj_request
->result
) {
1287 zero_bio_chain(obj_request
->bio_list
, obj_request
->xferred
);
1288 obj_request
->xferred
= obj_request
->length
;
1290 obj_request_done_set(obj_request
);
1293 static void rbd_obj_request_complete(struct rbd_obj_request
*obj_request
)
1295 dout("%s: obj %p cb %p\n", __func__
, obj_request
,
1296 obj_request
->callback
);
1297 if (obj_request
->callback
)
1298 obj_request
->callback(obj_request
);
1300 complete_all(&obj_request
->completion
);
1303 static void rbd_osd_trivial_callback(struct rbd_obj_request
*obj_request
)
1305 dout("%s: obj %p\n", __func__
, obj_request
);
1306 obj_request_done_set(obj_request
);
1309 static void rbd_osd_read_callback(struct rbd_obj_request
*obj_request
)
1311 dout("%s: obj %p result %d %llu/%llu\n", __func__
, obj_request
,
1312 obj_request
->result
, obj_request
->xferred
, obj_request
->length
);
1313 if (obj_request
->img_request
)
1314 rbd_img_obj_request_read_callback(obj_request
);
1316 obj_request_done_set(obj_request
);
1319 static void rbd_osd_write_callback(struct rbd_obj_request
*obj_request
)
1321 dout("%s: obj %p result %d %llu\n", __func__
, obj_request
,
1322 obj_request
->result
, obj_request
->length
);
1324 * There is no such thing as a successful short write.
1325 * Our xferred value is the number of bytes transferred
1326 * back. Set it to our originally-requested length.
1328 obj_request
->xferred
= obj_request
->length
;
1329 obj_request_done_set(obj_request
);
1333 * For a simple stat call there's nothing to do. We'll do more if
1334 * this is part of a write sequence for a layered image.
1336 static void rbd_osd_stat_callback(struct rbd_obj_request
*obj_request
)
1338 dout("%s: obj %p\n", __func__
, obj_request
);
1339 obj_request_done_set(obj_request
);
1342 static void rbd_osd_req_callback(struct ceph_osd_request
*osd_req
,
1343 struct ceph_msg
*msg
)
1345 struct rbd_obj_request
*obj_request
= osd_req
->r_priv
;
1348 dout("%s: osd_req %p msg %p\n", __func__
, osd_req
, msg
);
1349 rbd_assert(osd_req
== obj_request
->osd_req
);
1350 rbd_assert(!!obj_request
->img_request
^
1351 (obj_request
->which
== BAD_WHICH
));
1353 if (osd_req
->r_result
< 0)
1354 obj_request
->result
= osd_req
->r_result
;
1355 obj_request
->version
= le64_to_cpu(osd_req
->r_reassert_version
.version
);
1357 WARN_ON(osd_req
->r_num_ops
!= 1); /* For now */
1360 * We support a 64-bit length, but ultimately it has to be
1361 * passed to blk_end_request(), which takes an unsigned int.
1363 obj_request
->xferred
= osd_req
->r_reply_op_len
[0];
1364 rbd_assert(obj_request
->xferred
< (u64
) UINT_MAX
);
1365 opcode
= osd_req
->r_request_ops
[0].op
;
1367 case CEPH_OSD_OP_READ
:
1368 rbd_osd_read_callback(obj_request
);
1370 case CEPH_OSD_OP_WRITE
:
1371 rbd_osd_write_callback(obj_request
);
1373 case CEPH_OSD_OP_STAT
:
1374 rbd_osd_stat_callback(obj_request
);
1376 case CEPH_OSD_OP_CALL
:
1377 case CEPH_OSD_OP_NOTIFY_ACK
:
1378 case CEPH_OSD_OP_WATCH
:
1379 rbd_osd_trivial_callback(obj_request
);
1382 rbd_warn(NULL
, "%s: unsupported op %hu\n",
1383 obj_request
->object_name
, (unsigned short) opcode
);
1387 if (obj_request_done_test(obj_request
))
1388 rbd_obj_request_complete(obj_request
);
1391 static struct ceph_osd_request
*rbd_osd_req_create(
1392 struct rbd_device
*rbd_dev
,
1394 struct rbd_obj_request
*obj_request
,
1395 struct ceph_osd_req_op
*op
)
1397 struct rbd_img_request
*img_request
= obj_request
->img_request
;
1398 struct ceph_snap_context
*snapc
= NULL
;
1399 struct ceph_osd_client
*osdc
;
1400 struct ceph_osd_request
*osd_req
;
1401 struct timespec now
;
1402 struct timespec
*mtime
;
1403 u64 snap_id
= CEPH_NOSNAP
;
1404 u64 offset
= obj_request
->offset
;
1405 u64 length
= obj_request
->length
;
1408 rbd_assert(img_request
->write_request
== write_request
);
1409 if (img_request
->write_request
)
1410 snapc
= img_request
->snapc
;
1412 snap_id
= img_request
->snap_id
;
1415 /* Allocate and initialize the request, for the single op */
1417 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1418 osd_req
= ceph_osdc_alloc_request(osdc
, snapc
, 1, false, GFP_ATOMIC
);
1420 return NULL
; /* ENOMEM */
1422 rbd_assert(obj_request_type_valid(obj_request
->type
));
1423 switch (obj_request
->type
) {
1424 case OBJ_REQUEST_NODATA
:
1425 break; /* Nothing to do */
1426 case OBJ_REQUEST_BIO
:
1427 rbd_assert(obj_request
->bio_list
!= NULL
);
1428 osd_req
->r_bio
= obj_request
->bio_list
;
1430 case OBJ_REQUEST_PAGES
:
1431 osd_req
->r_pages
= obj_request
->pages
;
1432 osd_req
->r_num_pages
= obj_request
->page_count
;
1433 osd_req
->r_page_alignment
= offset
& ~PAGE_MASK
;
1437 if (write_request
) {
1438 osd_req
->r_flags
= CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
;
1442 osd_req
->r_flags
= CEPH_OSD_FLAG_READ
;
1443 mtime
= NULL
; /* not needed for reads */
1444 offset
= 0; /* These are not used... */
1445 length
= 0; /* ...for osd read requests */
1448 osd_req
->r_callback
= rbd_osd_req_callback
;
1449 osd_req
->r_priv
= obj_request
;
1451 osd_req
->r_oid_len
= strlen(obj_request
->object_name
);
1452 rbd_assert(osd_req
->r_oid_len
< sizeof (osd_req
->r_oid
));
1453 memcpy(osd_req
->r_oid
, obj_request
->object_name
, osd_req
->r_oid_len
);
1455 osd_req
->r_file_layout
= rbd_dev
->layout
; /* struct */
1457 /* osd_req will get its own reference to snapc (if non-null) */
1459 ceph_osdc_build_request(osd_req
, offset
, length
, 1, op
,
1460 snapc
, snap_id
, mtime
);
1465 static void rbd_osd_req_destroy(struct ceph_osd_request
*osd_req
)
1467 ceph_osdc_put_request(osd_req
);
1470 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1472 static struct rbd_obj_request
*rbd_obj_request_create(const char *object_name
,
1473 u64 offset
, u64 length
,
1474 enum obj_request_type type
)
1476 struct rbd_obj_request
*obj_request
;
1480 rbd_assert(obj_request_type_valid(type
));
1482 size
= strlen(object_name
) + 1;
1483 obj_request
= kzalloc(sizeof (*obj_request
) + size
, GFP_KERNEL
);
1487 name
= (char *)(obj_request
+ 1);
1488 obj_request
->object_name
= memcpy(name
, object_name
, size
);
1489 obj_request
->offset
= offset
;
1490 obj_request
->length
= length
;
1491 obj_request
->which
= BAD_WHICH
;
1492 obj_request
->type
= type
;
1493 INIT_LIST_HEAD(&obj_request
->links
);
1494 obj_request_done_init(obj_request
);
1495 init_completion(&obj_request
->completion
);
1496 kref_init(&obj_request
->kref
);
1498 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__
, object_name
,
1499 offset
, length
, (int)type
, obj_request
);
1504 static void rbd_obj_request_destroy(struct kref
*kref
)
1506 struct rbd_obj_request
*obj_request
;
1508 obj_request
= container_of(kref
, struct rbd_obj_request
, kref
);
1510 dout("%s: obj %p\n", __func__
, obj_request
);
1512 rbd_assert(obj_request
->img_request
== NULL
);
1513 rbd_assert(obj_request
->which
== BAD_WHICH
);
1515 if (obj_request
->osd_req
)
1516 rbd_osd_req_destroy(obj_request
->osd_req
);
1518 rbd_assert(obj_request_type_valid(obj_request
->type
));
1519 switch (obj_request
->type
) {
1520 case OBJ_REQUEST_NODATA
:
1521 break; /* Nothing to do */
1522 case OBJ_REQUEST_BIO
:
1523 if (obj_request
->bio_list
)
1524 bio_chain_put(obj_request
->bio_list
);
1526 case OBJ_REQUEST_PAGES
:
1527 if (obj_request
->pages
)
1528 ceph_release_page_vector(obj_request
->pages
,
1529 obj_request
->page_count
);
1537 * Caller is responsible for filling in the list of object requests
1538 * that comprises the image request, and the Linux request pointer
1539 * (if there is one).
1541 static struct rbd_img_request
*rbd_img_request_create(
1542 struct rbd_device
*rbd_dev
,
1543 u64 offset
, u64 length
,
1546 struct rbd_img_request
*img_request
;
1547 struct ceph_snap_context
*snapc
= NULL
;
1549 img_request
= kmalloc(sizeof (*img_request
), GFP_ATOMIC
);
1553 if (write_request
) {
1554 down_read(&rbd_dev
->header_rwsem
);
1555 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1556 up_read(&rbd_dev
->header_rwsem
);
1557 if (WARN_ON(!snapc
)) {
1559 return NULL
; /* Shouldn't happen */
1563 img_request
->rq
= NULL
;
1564 img_request
->rbd_dev
= rbd_dev
;
1565 img_request
->offset
= offset
;
1566 img_request
->length
= length
;
1567 img_request
->write_request
= write_request
;
1569 img_request
->snapc
= snapc
;
1571 img_request
->snap_id
= rbd_dev
->spec
->snap_id
;
1572 spin_lock_init(&img_request
->completion_lock
);
1573 img_request
->next_completion
= 0;
1574 img_request
->callback
= NULL
;
1575 img_request
->obj_request_count
= 0;
1576 INIT_LIST_HEAD(&img_request
->obj_requests
);
1577 kref_init(&img_request
->kref
);
1579 rbd_img_request_get(img_request
); /* Avoid a warning */
1580 rbd_img_request_put(img_request
); /* TEMPORARY */
1582 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__
, rbd_dev
,
1583 write_request
? "write" : "read", offset
, length
,
1589 static void rbd_img_request_destroy(struct kref
*kref
)
1591 struct rbd_img_request
*img_request
;
1592 struct rbd_obj_request
*obj_request
;
1593 struct rbd_obj_request
*next_obj_request
;
1595 img_request
= container_of(kref
, struct rbd_img_request
, kref
);
1597 dout("%s: img %p\n", __func__
, img_request
);
1599 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
1600 rbd_img_obj_request_del(img_request
, obj_request
);
1601 rbd_assert(img_request
->obj_request_count
== 0);
1603 if (img_request
->write_request
)
1604 ceph_put_snap_context(img_request
->snapc
);
1609 static int rbd_img_request_fill_bio(struct rbd_img_request
*img_request
,
1610 struct bio
*bio_list
)
1612 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1613 struct rbd_obj_request
*obj_request
= NULL
;
1614 struct rbd_obj_request
*next_obj_request
;
1615 unsigned int bio_offset
;
1620 dout("%s: img %p bio %p\n", __func__
, img_request
, bio_list
);
1622 opcode
= img_request
->write_request
? CEPH_OSD_OP_WRITE
1625 image_offset
= img_request
->offset
;
1626 rbd_assert(image_offset
== bio_list
->bi_sector
<< SECTOR_SHIFT
);
1627 resid
= img_request
->length
;
1628 rbd_assert(resid
> 0);
1630 const char *object_name
;
1631 unsigned int clone_size
;
1632 struct ceph_osd_req_op
*op
;
1636 object_name
= rbd_segment_name(rbd_dev
, image_offset
);
1639 offset
= rbd_segment_offset(rbd_dev
, image_offset
);
1640 length
= rbd_segment_length(rbd_dev
, image_offset
, resid
);
1641 obj_request
= rbd_obj_request_create(object_name
,
1644 kfree(object_name
); /* object request has its own copy */
1648 rbd_assert(length
<= (u64
) UINT_MAX
);
1649 clone_size
= (unsigned int) length
;
1650 obj_request
->bio_list
= bio_chain_clone_range(&bio_list
,
1651 &bio_offset
, clone_size
,
1653 if (!obj_request
->bio_list
)
1657 * Build up the op to use in building the osd
1658 * request. Note that the contents of the op are
1659 * copied by rbd_osd_req_create().
1661 op
= rbd_osd_req_op_create(opcode
, offset
, length
);
1664 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
,
1665 img_request
->write_request
,
1667 rbd_osd_req_op_destroy(op
);
1668 if (!obj_request
->osd_req
)
1670 /* status and version are initially zero-filled */
1672 rbd_img_obj_request_add(img_request
, obj_request
);
1674 image_offset
+= length
;
1681 rbd_obj_request_put(obj_request
);
1683 for_each_obj_request_safe(img_request
, obj_request
, next_obj_request
)
1684 rbd_obj_request_put(obj_request
);
1689 static void rbd_img_obj_callback(struct rbd_obj_request
*obj_request
)
1691 struct rbd_img_request
*img_request
;
1692 u32 which
= obj_request
->which
;
1695 img_request
= obj_request
->img_request
;
1697 dout("%s: img %p obj %p\n", __func__
, img_request
, obj_request
);
1698 rbd_assert(img_request
!= NULL
);
1699 rbd_assert(img_request
->rq
!= NULL
);
1700 rbd_assert(img_request
->obj_request_count
> 0);
1701 rbd_assert(which
!= BAD_WHICH
);
1702 rbd_assert(which
< img_request
->obj_request_count
);
1703 rbd_assert(which
>= img_request
->next_completion
);
1705 spin_lock_irq(&img_request
->completion_lock
);
1706 if (which
!= img_request
->next_completion
)
1709 for_each_obj_request_from(img_request
, obj_request
) {
1710 unsigned int xferred
;
1714 rbd_assert(which
< img_request
->obj_request_count
);
1716 if (!obj_request_done_test(obj_request
))
1719 rbd_assert(obj_request
->xferred
<= (u64
) UINT_MAX
);
1720 xferred
= (unsigned int) obj_request
->xferred
;
1721 result
= (int) obj_request
->result
;
1723 rbd_warn(NULL
, "obj_request %s result %d xferred %u\n",
1724 img_request
->write_request
? "write" : "read",
1727 more
= blk_end_request(img_request
->rq
, result
, xferred
);
1731 rbd_assert(more
^ (which
== img_request
->obj_request_count
));
1732 img_request
->next_completion
= which
;
1734 spin_unlock_irq(&img_request
->completion_lock
);
1737 rbd_img_request_complete(img_request
);
1740 static int rbd_img_request_submit(struct rbd_img_request
*img_request
)
1742 struct rbd_device
*rbd_dev
= img_request
->rbd_dev
;
1743 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1744 struct rbd_obj_request
*obj_request
;
1746 dout("%s: img %p\n", __func__
, img_request
);
1747 for_each_obj_request(img_request
, obj_request
) {
1750 obj_request
->callback
= rbd_img_obj_callback
;
1751 ret
= rbd_obj_request_submit(osdc
, obj_request
);
1755 * The image request has its own reference to each
1756 * of its object requests, so we can safely drop the
1759 rbd_obj_request_put(obj_request
);
1765 static int rbd_obj_notify_ack(struct rbd_device
*rbd_dev
,
1766 u64 ver
, u64 notify_id
)
1768 struct rbd_obj_request
*obj_request
;
1769 struct ceph_osd_req_op
*op
;
1770 struct ceph_osd_client
*osdc
;
1773 obj_request
= rbd_obj_request_create(rbd_dev
->header_name
, 0, 0,
1774 OBJ_REQUEST_NODATA
);
1779 op
= rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK
, notify_id
, ver
);
1782 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false,
1784 rbd_osd_req_op_destroy(op
);
1785 if (!obj_request
->osd_req
)
1788 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1789 obj_request
->callback
= rbd_obj_request_put
;
1790 ret
= rbd_obj_request_submit(osdc
, obj_request
);
1793 rbd_obj_request_put(obj_request
);
1798 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1800 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1807 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__
,
1808 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1809 (unsigned int) opcode
);
1810 rc
= rbd_dev_refresh(rbd_dev
, &hver
);
1812 rbd_warn(rbd_dev
, "got notification but failed to "
1813 " update snaps: %d\n", rc
);
1815 rbd_obj_notify_ack(rbd_dev
, hver
, notify_id
);
1819 * Request sync osd watch/unwatch. The value of "start" determines
1820 * whether a watch request is being initiated or torn down.
1822 static int rbd_dev_header_watch_sync(struct rbd_device
*rbd_dev
, int start
)
1824 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1825 struct rbd_obj_request
*obj_request
;
1826 struct ceph_osd_req_op
*op
;
1829 rbd_assert(start
^ !!rbd_dev
->watch_event
);
1830 rbd_assert(start
^ !!rbd_dev
->watch_request
);
1833 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, rbd_dev
,
1834 &rbd_dev
->watch_event
);
1837 rbd_assert(rbd_dev
->watch_event
!= NULL
);
1841 obj_request
= rbd_obj_request_create(rbd_dev
->header_name
, 0, 0,
1842 OBJ_REQUEST_NODATA
);
1846 op
= rbd_osd_req_op_create(CEPH_OSD_OP_WATCH
,
1847 rbd_dev
->watch_event
->cookie
,
1848 rbd_dev
->header
.obj_version
, start
);
1851 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, true,
1853 rbd_osd_req_op_destroy(op
);
1854 if (!obj_request
->osd_req
)
1858 ceph_osdc_set_request_linger(osdc
, obj_request
->osd_req
);
1860 ceph_osdc_unregister_linger_request(osdc
,
1861 rbd_dev
->watch_request
->osd_req
);
1862 ret
= rbd_obj_request_submit(osdc
, obj_request
);
1865 ret
= rbd_obj_request_wait(obj_request
);
1868 ret
= obj_request
->result
;
1873 * A watch request is set to linger, so the underlying osd
1874 * request won't go away until we unregister it. We retain
1875 * a pointer to the object request during that time (in
1876 * rbd_dev->watch_request), so we'll keep a reference to
1877 * it. We'll drop that reference (below) after we've
1881 rbd_dev
->watch_request
= obj_request
;
1886 /* We have successfully torn down the watch request */
1888 rbd_obj_request_put(rbd_dev
->watch_request
);
1889 rbd_dev
->watch_request
= NULL
;
1891 /* Cancel the event if we're tearing down, or on error */
1892 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1893 rbd_dev
->watch_event
= NULL
;
1895 rbd_obj_request_put(obj_request
);
1901 * Synchronous osd object method call
1903 static int rbd_obj_method_sync(struct rbd_device
*rbd_dev
,
1904 const char *object_name
,
1905 const char *class_name
,
1906 const char *method_name
,
1907 const char *outbound
,
1908 size_t outbound_size
,
1910 size_t inbound_size
,
1913 struct rbd_obj_request
*obj_request
;
1914 struct ceph_osd_client
*osdc
;
1915 struct ceph_osd_req_op
*op
;
1916 struct page
**pages
;
1921 * Method calls are ultimately read operations but they
1922 * don't involve object data (so no offset or length).
1923 * The result should placed into the inbound buffer
1924 * provided. They also supply outbound data--parameters for
1925 * the object method. Currently if this is present it will
1928 page_count
= (u32
) calc_pages_for(0, inbound_size
);
1929 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
1931 return PTR_ERR(pages
);
1934 obj_request
= rbd_obj_request_create(object_name
, 0, 0,
1939 obj_request
->pages
= pages
;
1940 obj_request
->page_count
= page_count
;
1942 op
= rbd_osd_req_op_create(CEPH_OSD_OP_CALL
, class_name
,
1943 method_name
, outbound
, outbound_size
);
1946 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false,
1948 rbd_osd_req_op_destroy(op
);
1949 if (!obj_request
->osd_req
)
1952 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1953 ret
= rbd_obj_request_submit(osdc
, obj_request
);
1956 ret
= rbd_obj_request_wait(obj_request
);
1960 ret
= obj_request
->result
;
1964 ceph_copy_from_page_vector(pages
, inbound
, 0, obj_request
->xferred
);
1966 *version
= obj_request
->version
;
1969 rbd_obj_request_put(obj_request
);
1971 ceph_release_page_vector(pages
, page_count
);
1976 static void rbd_request_fn(struct request_queue
*q
)
1977 __releases(q
->queue_lock
) __acquires(q
->queue_lock
)
1979 struct rbd_device
*rbd_dev
= q
->queuedata
;
1980 bool read_only
= rbd_dev
->mapping
.read_only
;
1984 while ((rq
= blk_fetch_request(q
))) {
1985 bool write_request
= rq_data_dir(rq
) == WRITE
;
1986 struct rbd_img_request
*img_request
;
1990 /* Ignore any non-FS requests that filter through. */
1992 if (rq
->cmd_type
!= REQ_TYPE_FS
) {
1993 dout("%s: non-fs request type %d\n", __func__
,
1994 (int) rq
->cmd_type
);
1995 __blk_end_request_all(rq
, 0);
1999 /* Ignore/skip any zero-length requests */
2001 offset
= (u64
) blk_rq_pos(rq
) << SECTOR_SHIFT
;
2002 length
= (u64
) blk_rq_bytes(rq
);
2005 dout("%s: zero-length request\n", __func__
);
2006 __blk_end_request_all(rq
, 0);
2010 spin_unlock_irq(q
->queue_lock
);
2012 /* Disallow writes to a read-only device */
2014 if (write_request
) {
2018 rbd_assert(rbd_dev
->spec
->snap_id
== CEPH_NOSNAP
);
2022 * Quit early if the mapped snapshot no longer
2023 * exists. It's still possible the snapshot will
2024 * have disappeared by the time our request arrives
2025 * at the osd, but there's no sense in sending it if
2028 if (!test_bit(RBD_DEV_FLAG_EXISTS
, &rbd_dev
->flags
)) {
2029 dout("request for non-existent snapshot");
2030 rbd_assert(rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
);
2036 if (WARN_ON(offset
&& length
> U64_MAX
- offset
+ 1))
2037 goto end_request
; /* Shouldn't happen */
2040 img_request
= rbd_img_request_create(rbd_dev
, offset
, length
,
2045 img_request
->rq
= rq
;
2047 result
= rbd_img_request_fill_bio(img_request
, rq
->bio
);
2049 result
= rbd_img_request_submit(img_request
);
2051 rbd_img_request_put(img_request
);
2053 spin_lock_irq(q
->queue_lock
);
2055 rbd_warn(rbd_dev
, "obj_request %s result %d\n",
2056 write_request
? "write" : "read", result
);
2057 __blk_end_request_all(rq
, result
);
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
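
/*
 * Editor's sketch (illustrative only, not part of the original source):
 * a worked example of the arithmetic above.  With 4MB objects
 * (obj_order = 22) there are 1 << (22 - 9) = 8192 sectors per object.
 * If a bio starts at device sector 8000 and already holds 64KB, then:
 *
 *	obj_sector_offset   = 8000 & 8191          = 8000
 *	bytes to object end = (8192 - 8000) << 9   = 98304 (96KB)
 *	minus bmd->bi_size (64KB)                  = 32768 (32KB)
 *
 * so at most 32KB more may be merged into this bio before it would have
 * to cross into the next object.
 */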
2108 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
2110 struct gendisk
*disk
= rbd_dev
->disk
;
2115 if (disk
->flags
& GENHD_FL_UP
)
2118 blk_cleanup_queue(disk
->queue
);
2122 static int rbd_obj_read_sync(struct rbd_device
*rbd_dev
,
2123 const char *object_name
,
2124 u64 offset
, u64 length
,
2125 char *buf
, u64
*version
)
2128 struct ceph_osd_req_op
*op
;
2129 struct rbd_obj_request
*obj_request
;
2130 struct ceph_osd_client
*osdc
;
2131 struct page
**pages
= NULL
;
2136 page_count
= (u32
) calc_pages_for(offset
, length
);
2137 pages
= ceph_alloc_page_vector(page_count
, GFP_KERNEL
);
2139 ret
= PTR_ERR(pages
);
2142 obj_request
= rbd_obj_request_create(object_name
, offset
, length
,
2147 obj_request
->pages
= pages
;
2148 obj_request
->page_count
= page_count
;
2150 op
= rbd_osd_req_op_create(CEPH_OSD_OP_READ
, offset
, length
);
2153 obj_request
->osd_req
= rbd_osd_req_create(rbd_dev
, false,
2155 rbd_osd_req_op_destroy(op
);
2156 if (!obj_request
->osd_req
)
2159 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2160 ret
= rbd_obj_request_submit(osdc
, obj_request
);
2163 ret
= rbd_obj_request_wait(obj_request
);
2167 ret
= obj_request
->result
;
2171 rbd_assert(obj_request
->xferred
<= (u64
) SIZE_MAX
);
2172 size
= (size_t) obj_request
->xferred
;
2173 ceph_copy_from_page_vector(pages
, buf
, 0, size
);
2174 rbd_assert(size
<= (size_t) INT_MAX
);
2177 *version
= obj_request
->version
;
2180 rbd_obj_request_put(obj_request
);
2182 ceph_release_page_vector(pages
, page_count
);
2188 * Read the complete header for the given rbd device.
2190 * Returns a pointer to a dynamically-allocated buffer containing
2191 * the complete and validated header. Caller can pass the address
2192 * of a variable that will be filled in with the version of the
2193 * header object at the time it was read.
2195 * Returns a pointer-coded errno if a failure occurs.
2197 static struct rbd_image_header_ondisk
*
2198 rbd_dev_v1_header_read(struct rbd_device
*rbd_dev
, u64
*version
)
2200 struct rbd_image_header_ondisk
*ondisk
= NULL
;
2207 * The complete header will include an array of its 64-bit
2208 * snapshot ids, followed by the names of those snapshots as
2209 * a contiguous block of NUL-terminated strings. Note that
2210 * the number of snapshots could change by the time we read
2211 * it in, in which case we re-read it.
2218 size
= sizeof (*ondisk
);
2219 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
2221 ondisk
= kmalloc(size
, GFP_KERNEL
);
2223 return ERR_PTR(-ENOMEM
);
2225 ret
= rbd_obj_read_sync(rbd_dev
, rbd_dev
->header_name
,
2227 (char *) ondisk
, version
);
2230 if (WARN_ON((size_t) ret
< size
)) {
2232 rbd_warn(rbd_dev
, "short header read (want %zd got %d)",
2236 if (!rbd_dev_ondisk_valid(ondisk
)) {
2238 rbd_warn(rbd_dev
, "invalid header");
2242 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
2243 want_count
= snap_count
;
2244 snap_count
= le32_to_cpu(ondisk
->snap_count
);
2245 } while (snap_count
!= want_count
);
2252 return ERR_PTR(ret
);
2256 * reload the ondisk the header
2258 static int rbd_read_header(struct rbd_device
*rbd_dev
,
2259 struct rbd_image_header
*header
)
2261 struct rbd_image_header_ondisk
*ondisk
;
2265 ondisk
= rbd_dev_v1_header_read(rbd_dev
, &ver
);
2267 return PTR_ERR(ondisk
);
2268 ret
= rbd_header_from_disk(header
, ondisk
);
2270 header
->obj_version
= ver
;
2276 static void rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
2278 struct rbd_snap
*snap
;
2279 struct rbd_snap
*next
;
2281 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
2282 rbd_remove_snap_dev(snap
);
2285 static void rbd_update_mapping_size(struct rbd_device
*rbd_dev
)
2289 if (rbd_dev
->spec
->snap_id
!= CEPH_NOSNAP
)
2292 size
= (sector_t
) rbd_dev
->header
.image_size
/ SECTOR_SIZE
;
2293 dout("setting size to %llu sectors", (unsigned long long) size
);
2294 rbd_dev
->mapping
.size
= (u64
) size
;
2295 set_capacity(rbd_dev
->disk
, size
);
2299 * only read the first part of the ondisk header, without the snaps info
2301 static int rbd_dev_v1_refresh(struct rbd_device
*rbd_dev
, u64
*hver
)
2304 struct rbd_image_header h
;
2306 ret
= rbd_read_header(rbd_dev
, &h
);
2310 down_write(&rbd_dev
->header_rwsem
);
2312 /* Update image size, and check for resize of mapped image */
2313 rbd_dev
->header
.image_size
= h
.image_size
;
2314 rbd_update_mapping_size(rbd_dev
);
2316 /* rbd_dev->header.object_prefix shouldn't change */
2317 kfree(rbd_dev
->header
.snap_sizes
);
2318 kfree(rbd_dev
->header
.snap_names
);
2319 /* osd requests may still refer to snapc */
2320 ceph_put_snap_context(rbd_dev
->header
.snapc
);
2323 *hver
= h
.obj_version
;
2324 rbd_dev
->header
.obj_version
= h
.obj_version
;
2325 rbd_dev
->header
.image_size
= h
.image_size
;
2326 rbd_dev
->header
.snapc
= h
.snapc
;
2327 rbd_dev
->header
.snap_names
= h
.snap_names
;
2328 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
2329 /* Free the extra copy of the object prefix */
2330 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
2331 kfree(h
.object_prefix
);
2333 ret
= rbd_dev_snaps_update(rbd_dev
);
2335 ret
= rbd_dev_snaps_register(rbd_dev
);
2337 up_write(&rbd_dev
->header_rwsem
);
static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev, hver);
	else
		ret = rbd_dev_v2_refresh(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}
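/*
 * Worked example for the queue limits above (assuming the common
 * object order of 22, i.e. 4 MiB objects): rbd_obj_bytes() yields
 * 1 << 22 = 4194304 bytes, so max_hw_sectors becomes 4194304 / 512 =
 * 8192 sectors and io_min/io_opt are 4 MiB, keeping each request
 * within a single RADOS object where possible.
 */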
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) rbd_dev->mapping.features);
}
static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}
static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}
static ssize_t rbd_pool_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}
static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}
static ssize_t rbd_image_id_show(struct device *dev,
				 struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}
/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}
/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			       struct device_attribute *attr,
			       char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
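/*
 * For reference (Documentation/ABI/testing/sysfs-bus-rbd is the
 * authoritative description), these attributes appear under the
 * mapped device's directory, e.g. for the first mapping:
 *
 *	/sys/bus/rbd/devices/0/size
 *	/sys/bus/rbd/devices/0/features
 *	/sys/bus/rbd/devices/0/pool
 *	/sys/bus/rbd/devices/0/current_snap
 *
 * "refresh" is the one write-only attribute, e.g.:
 *
 *	echo 1 > /sys/bus/rbd/devices/0/refresh
 */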
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};
static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long) snap->size);
}
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long) snap->id);
}
static ssize_t rbd_snap_features_show(struct device *dev,
				      struct device_attribute *attr,
				      char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) snap->features);
}
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
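/*
 * Likewise, each registered snapshot appears as a child device named
 * with RBD_SNAP_DEV_NAME_PREFIX.  For a snapshot "snap1" of device 0
 * the paths would look like (illustrative):
 *
 *	/sys/bus/rbd/devices/0/snap_snap1/snap_id
 *	/sys/bus/rbd/devices/0/snap_snap1/snap_size
 *	/sys/bus/rbd/devices/0/snap_snap1/snap_features
 */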
static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}
static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
					 struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	/* Initialize the layout used for all rbd requests */

	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);

	return rbd_dev;
}
static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
static bool rbd_snap_registered(struct rbd_snap *snap)
{
	bool ret = snap->dev.type == &rbd_snap_device_type;
	bool reg = device_is_registered(&snap->dev);

	rbd_assert(!ret ^ reg);

	return ret;
}
static void rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	if (device_is_registered(&snap->dev))
		device_unregister(&snap->dev);
}
static int rbd_register_snap_dev(struct rbd_snap *snap,
				 struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
	dout("%s: registering device for snapshot %s\n", __func__, snap->name);

	ret = device_register(dev);

	return ret;
}
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					   const char *snap_name,
					   u64 snap_id, u64 snap_size,
					   u64 snap_features)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(snap_name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->id = snap_id;
	snap->size = snap_size;
	snap->features = snap_features;

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}
static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	*snap_size = rbd_dev->header.snap_sizes[which];
	*snap_features = 0;	/* No features for v1 */

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;
	return snap_name;
}

/*
 * Get the size and object order for an image snapshot, or if
 * snap_id is CEPH_NOSNAP, gets this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size)
{
	__le64 snapid = cpu_to_le64(snap_id);
	int ret;
	struct {
		u8 order;
		__le64 size;
	} __attribute__ ((packed)) size_buf = { 0 };

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_size",
				(char *) &snapid, sizeof (snapid),
				(char *) &size_buf, sizeof (size_buf), NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	*order = size_buf.order;
	*snap_size = le64_to_cpu(size_buf.size);

	dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
		(unsigned long long) snap_id, (unsigned int) *order,
		(unsigned long long) *snap_size);

	return 0;
}
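/*
 * Payload sketch for the exchange above (comment only): "get_size"
 * takes a single __le64 snapshot id (CEPH_NOSNAP for the base image)
 * and replies with the packed pair { u8 order; __le64 size }.  For
 * example, order = 22 with size = 0x40000000 describes a 1 GiB image
 * striped over 4 MiB (1 << 22 byte) objects.
 */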
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
					&rbd_dev->header.obj_order,
					&rbd_dev->header.image_size);
}
static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
	void *reply_buf;
	void *p;
	int ret;

	reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_object_prefix",
				NULL, 0,
				reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
						p + RBD_OBJ_PREFIX_LEN_MAX,
						NULL, GFP_NOIO);
	if (IS_ERR(rbd_dev->header.object_prefix)) {
		ret = PTR_ERR(rbd_dev->header.object_prefix);
		rbd_dev->header.object_prefix = NULL;
	} else {
		dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
	}
out:
	kfree(reply_buf);

	return ret;
}
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features)
{
	__le64 snapid = cpu_to_le64(snap_id);
	struct {
		__le64 features;
		__le64 incompat;
	} features_buf = { 0 };
	u64 incompat;
	int ret;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_features",
				(char *) &snapid, sizeof (snapid),
				(char *) &features_buf, sizeof (features_buf),
				NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		return ret;

	incompat = le64_to_cpu(features_buf.incompat);
	if (incompat & ~RBD_FEATURES_ALL)
		return -ENXIO;

	*snap_features = le64_to_cpu(features_buf.features);

	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
		(unsigned long long) snap_id,
		(unsigned long long) *snap_features,
		(unsigned long long) le64_to_cpu(features_buf.incompat));

	return 0;
}
static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
	return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
						&rbd_dev->header.features);
}
static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
	struct rbd_spec *parent_spec;
	size_t size;
	void *reply_buf = NULL;
	__le64 snapid;
	void *p;
	void *end;
	char *image_id;
	u64 overlap;
	int ret;

	parent_spec = rbd_spec_alloc();
	if (!parent_spec)
		return -ENOMEM;

	size = sizeof (__le64) +				/* pool_id */
		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
		sizeof (__le64) +				/* snap_id */
		sizeof (__le64);				/* overlap */
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf) {
		ret = -ENOMEM;
		goto out_err;
	}

	snapid = cpu_to_le64(CEPH_NOSNAP);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_parent",
				(char *) &snapid, sizeof (snapid),
				(char *) reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out_err;

	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
	if (parent_spec->pool_id == CEPH_NOPOOL)
		goto out;	/* No parent?  No problem. */

	/* The ceph file layout needs to fit pool id in 32 bits */

	ret = -EIO;
	if (WARN_ON(parent_spec->pool_id > (u64) U32_MAX))
		goto out_err;

	image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(image_id)) {
		ret = PTR_ERR(image_id);
		goto out_err;
	}
	parent_spec->image_id = image_id;
	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
	ceph_decode_64_safe(&p, end, overlap, out_err);

	rbd_dev->parent_overlap = overlap;
	rbd_dev->parent_spec = parent_spec;
	parent_spec = NULL;	/* rbd_dev now owns this */
out:
	ret = 0;
out_err:
	kfree(reply_buf);
	rbd_spec_put(parent_spec);

	return ret;
}
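/*
 * The decode sequence above mirrors the "get_parent" reply layout,
 * roughly:
 *
 *	__le64	pool_id		(CEPH_NOPOOL if the image has no parent)
 *	string	image_id	(__le32 length followed by that many bytes)
 *	__le64	snap_id
 *	__le64	overlap		(bytes of the child backed by the parent)
 *
 * which is why reply_buf is sized for exactly those four fields.
 */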
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
	size_t image_id_size;
	char *image_id;
	size_t size;
	void *p;
	void *end;
	size_t len = 0;
	void *reply_buf = NULL;
	char *image_name = NULL;
	int ret;

	rbd_assert(!rbd_dev->spec->image_name);

	len = strlen(rbd_dev->spec->image_id);
	image_id_size = sizeof (__le32) + len;
	image_id = kmalloc(image_id_size, GFP_KERNEL);
	if (!image_id)
		return NULL;

	p = image_id;
	end = (char *) image_id + image_id_size;
	ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32) len);

	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		goto out;

	ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
				"rbd", "dir_get_name",
				image_id, image_id_size,
				(char *) reply_buf, size, NULL);
	if (ret < 0)
		goto out;
	p = reply_buf;
	end = (char *) reply_buf + size;
	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
	if (IS_ERR(image_name))
		image_name = NULL;
	else
		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
	kfree(reply_buf);
	kfree(image_id);

	return image_name;
}
/*
 * When a parent image gets probed, we only have the pool, image,
 * and snapshot ids but not the names of any of them.  This call
 * is made later to fill in those names.  It has to be done after
 * rbd_dev_snaps_update() has completed because some of the
 * information (in particular, snapshot name) is not available
 * until then.
 */
static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
{
	struct ceph_osd_client *osdc;
	const char *name;
	void *reply_buf = NULL;
	int ret;

	if (rbd_dev->spec->pool_name)
		return 0;	/* Already have the names */

	/* Look up the pool name */

	osdc = &rbd_dev->rbd_client->client->osdc;
	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
	if (!name) {
		rbd_warn(rbd_dev, "there is no pool with id %llu",
			rbd_dev->spec->pool_id);	/* Really a BUG() */
		return -EIO;
	}

	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
	if (!rbd_dev->spec->pool_name)
		return -ENOMEM;

	/* Fetch the image name; tolerate failure here */

	name = rbd_dev_image_name(rbd_dev);
	if (name)
		rbd_dev->spec->image_name = (char *) name;
	else
		rbd_warn(rbd_dev, "unable to get image name");

	/* Look up the snapshot name. */

	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
	if (!name) {
		rbd_warn(rbd_dev, "no snapshot with id %llu",
			rbd_dev->spec->snap_id);	/* Really a BUG() */
		ret = -EIO;
		goto out_err;
	}
	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
	if (!rbd_dev->spec->snap_name) {
		ret = -ENOMEM;
		goto out_err;
	}

	return 0;
out_err:
	kfree(reply_buf);
	kfree(rbd_dev->spec->pool_name);
	rbd_dev->spec->pool_name = NULL;

	return ret;
}
static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
{
	size_t size;
	int ret;
	void *reply_buf;
	void *p;
	void *end;
	u64 seq;
	u32 snap_count;
	struct ceph_snap_context *snapc;
	u32 i;

	/*
	 * We'll need room for the seq value (maximum snapshot id),
	 * snapshot count, and array of that many snapshot ids.
	 * For now we have a fixed upper limit on the number we're
	 * prepared to receive.
	 */
	size = sizeof (__le64) + sizeof (__le32) +
			RBD_MAX_SNAP_COUNT * sizeof (__le64);
	reply_buf = kzalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return -ENOMEM;

	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapcontext",
				NULL, 0,
				reply_buf, size, ver);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	ret = -ERANGE;
	p = reply_buf;
	end = (char *) reply_buf + size;
	ceph_decode_64_safe(&p, end, seq, out);
	ceph_decode_32_safe(&p, end, snap_count, out);

	/*
	 * Make sure the reported number of snapshot ids wouldn't go
	 * beyond the end of our buffer.  But before checking that,
	 * make sure the computed size of the snapshot context we
	 * allocate is representable in a size_t.
	 */
	if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
				 / sizeof (u64))
		goto out;
	if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
		goto out;

	size = sizeof (struct ceph_snap_context) +
				snap_count * sizeof (snapc->snaps[0]);
	snapc = kmalloc(size, GFP_KERNEL);
	if (!snapc) {
		ret = -ENOMEM;
		goto out;
	}

	atomic_set(&snapc->nref, 1);
	snapc->seq = seq;
	snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		snapc->snaps[i] = ceph_decode_64(&p);

	rbd_dev->header.snapc = snapc;

	dout("  snap context seq = %llu, snap_count = %u\n",
		(unsigned long long) seq, (unsigned int) snap_count);

	ret = 0;
out:
	kfree(reply_buf);

	return ret;
}
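/*
 * The "get_snapcontext" reply decoded above has this shape:
 *
 *	__le64	seq			(snapshot sequence number)
 *	__le32	snap_count
 *	__le64	snaps[snap_count]	(ids, highest first)
 *
 * RBD_MAX_SNAP_COUNT bounds snap_count so the whole reply fits in
 * the single buffer allocated here.
 */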
static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	size_t size;
	void *reply_buf;
	__le64 snap_id;
	int ret;
	void *p;
	void *end;
	char *snap_name;

	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
	reply_buf = kmalloc(size, GFP_KERNEL);
	if (!reply_buf)
		return ERR_PTR(-ENOMEM);

	snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
	ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				"rbd", "get_snapshot_name",
				(char *) &snap_id, sizeof (snap_id),
				reply_buf, size, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = reply_buf;
	end = (char *) reply_buf + size;
	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
	if (IS_ERR(snap_name)) {
		ret = PTR_ERR(snap_name);
		goto out;
	}
	dout("  snap_id 0x%016llx snap_name = %s\n",
		(unsigned long long) le64_to_cpu(snap_id), snap_name);
	kfree(reply_buf);

	return snap_name;
out:
	kfree(reply_buf);

	return ERR_PTR(ret);
}
static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	u64 snap_id;
	u8 order;
	int ret;

	snap_id = rbd_dev->header.snapc->snaps[which];
	ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
	if (ret)
		return ERR_PTR(ret);
	ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
	if (ret)
		return ERR_PTR(ret);

	return rbd_dev_v2_snap_name(rbd_dev, which);
}
static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
		u64 *snap_size, u64 *snap_features)
{
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_info(rbd_dev, which,
					snap_size, snap_features);
	if (rbd_dev->image_format == 2)
		return rbd_dev_v2_snap_info(rbd_dev, which,
					snap_size, snap_features);
	return ERR_PTR(-EINVAL);
}
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	u8 obj_order;

	down_write(&rbd_dev->header_rwsem);

	/* Grab old order first, to see if it changes */

	obj_order = rbd_dev->header.obj_order;
	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret)
		goto out;
	if (rbd_dev->header.obj_order != obj_order) {
		ret = -EIO;
		goto out;
	}
	rbd_update_mapping_size(rbd_dev);

	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
	dout("rbd_dev_v2_snap_context returned %d\n", ret);
	if (ret)
		goto out;
	ret = rbd_dev_snaps_update(rbd_dev);
	dout("rbd_dev_snaps_update returned %d\n", ret);
	if (ret)
		goto out;
	ret = rbd_dev_snaps_register(rbd_dev);
	dout("rbd_dev_snaps_register returned %d\n", ret);
out:
	up_write(&rbd_dev->header_rwsem);

	return ret;
}
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;
		char *snap_name;
		u64 snap_size = 0;
		u64 snap_features = 0;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		rbd_assert(!snap || snap->id != CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/*
			 * A previously-existing snapshot is not in
			 * the new snap context.
			 *
			 * If the now missing snapshot is the one the
			 * image is mapped to, clear its exists flag
			 * so we can avoid sending any more requests
			 * to it.
			 */
			if (rbd_dev->spec->snap_id == snap->id)
				clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
			rbd_remove_snap_dev(snap);
			dout("%ssnap id %llu has been removed\n",
				rbd_dev->spec->snap_id == snap->id ?
							"mapped " : "",
				(unsigned long long) snap->id);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		snap_name = rbd_dev_snap_info(rbd_dev, index,
					&snap_size, &snap_features);
		if (IS_ERR(snap_name))
			return PTR_ERR(snap_name);

		dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
			(unsigned long long) snap_id);
		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
					snap_id, snap_size, snap_features);
			if (IS_ERR(new_snap)) {
				int err = PTR_ERR(new_snap);

				dout("  failed to add dev, error %d\n", err);

				return err;
			}

			/* New goes before existing, or at end of list */

			dout("  added dev%s\n", snap ? "" : " at end\n");
			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			dout("  already present\n");

			rbd_assert(snap->size == snap_size);
			rbd_assert(!strcmp(snap->name, snap_name));
			rbd_assert(snap->features == snap_features);

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
	}
	dout("%s: done\n", __func__);

	return 0;
}
/*
 * Scan the list of snapshots and register the devices for any that
 * have not already been registered.
 */
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	int ret = 0;

	dout("%s:\n", __func__);
	if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
		return -EIO;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!rbd_snap_registered(snap)) {
			ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
			if (ret < 0)
				break;
		}
	}
	dout("%s: returning %d\n", __func__, ret);

	return ret;
}
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	struct device *dev;
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	dev = &rbd_dev->dev;
	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);

	mutex_unlock(&ctl_mutex);

	return ret;
}
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
	dout("rbd_dev %p given dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
}
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id = 0;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout("  max dev id has been reset\n");
}
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
	char *dup;
	size_t len;

	len = next_token(buf);
	dup = kmemdup(*buf, len + 1, GFP_KERNEL);
	if (!dup)
		return NULL;
	*(dup + len) = '\0';
	*buf += len;

	if (lenp)
		*lenp = len;

	return dup;
}
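/*
 * Worked example for the helpers above (values illustrative): with
 * buf pointing at "  1.2.3.4:6789 rbd myimage", next_token(&buf)
 * advances buf past the two leading spaces and returns 12, the
 * length of "1.2.3.4:6789", without consuming the token itself.
 * dup_token(&buf, &len) would instead return a kmalloc'd copy of
 * "1.2.3.4:6789", set len to 12, and advance buf to the space
 * before "rbd".
 */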
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_id>
 *      An optional snapshot id.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot id is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
				struct ceph_options **ceph_opts,
				struct rbd_options **opts,
				struct rbd_spec **rbd_spec)
{
	size_t len;
	char *options;
	const char *mon_addrs;
	size_t mon_addrs_size;
	struct rbd_spec *spec = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct ceph_options *copts;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len) {
		rbd_warn(NULL, "no monitor address(es) provided");
		return -EINVAL;
	}
	mon_addrs = buf;
	mon_addrs_size = len + 1;
	buf += len;

	ret = -EINVAL;
	options = dup_token(&buf, NULL);
	if (!options)
		return -ENOMEM;
	if (!*options) {
		rbd_warn(NULL, "no options provided");
		goto out_err;
	}

	spec = rbd_spec_alloc();
	if (!spec)
		goto out_mem;

	spec->pool_name = dup_token(&buf, NULL);
	if (!spec->pool_name)
		goto out_mem;
	if (!*spec->pool_name) {
		rbd_warn(NULL, "no pool name provided");
		goto out_err;
	}

	spec->image_name = dup_token(&buf, NULL);
	if (!spec->image_name)
		goto out_mem;
	if (!*spec->image_name) {
		rbd_warn(NULL, "no image name provided");
		goto out_err;
	}

	/*
	 * Snapshot name is optional; default is to use "-"
	 * (indicating the head/no snapshot).
	 */
	len = next_token(&buf);
	if (!len) {
		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
		ret = -ENAMETOOLONG;
		goto out_err;
	}
	spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
	if (!spec->snap_name)
		goto out_mem;
	*(spec->snap_name + len) = '\0';

	/* Initialize all rbd options to the defaults */

	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		goto out_mem;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	copts = ceph_parse_options(options, mon_addrs,
					mon_addrs + mon_addrs_size - 1,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(copts)) {
		ret = PTR_ERR(copts);
		goto out_err;
	}
	kfree(options);

	*ceph_opts = copts;
	*opts = rbd_opts;
	*rbd_spec = spec;

	return 0;
out_mem:
	ret = -ENOMEM;
out_err:
	kfree(rbd_opts);
	rbd_spec_put(spec);
	kfree(options);

	return ret;
}
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;
	char *object_name;
	void *response;
	void *p;

	/*
	 * When probing a parent image, the image id is already
	 * known (and the image name likely is not).  There's no
	 * need to fetch the image id again in this case.
	 */
	if (rbd_dev->spec->image_id)
		return 0;

	/*
	 * First, see if the format 2 image id file exists, and if
	 * so, get the image's persistent id from it.
	 */
	size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
	object_name = kmalloc(size, GFP_NOIO);
	if (!object_name)
		return -ENOMEM;
	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
	dout("rbd id object name is %s\n", object_name);

	/* Response will be an encoded string, which includes a length */

	size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
	response = kzalloc(size, GFP_NOIO);
	if (!response) {
		ret = -ENOMEM;
		goto out;
	}

	ret = rbd_obj_method_sync(rbd_dev, object_name,
				"rbd", "get_id",
				NULL, 0,
				response, RBD_IMAGE_ID_LEN_MAX, NULL);
	dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
	if (ret < 0)
		goto out;

	p = response;
	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
						p + RBD_IMAGE_ID_LEN_MAX,
						NULL, GFP_NOIO);
	if (IS_ERR(rbd_dev->spec->image_id)) {
		ret = PTR_ERR(rbd_dev->spec->image_id);
		rbd_dev->spec->image_id = NULL;
	} else {
		dout("image_id is %s\n", rbd_dev->spec->image_id);
	}
out:
	kfree(response);
	kfree(object_name);

	return ret;
}
static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
	int ret;
	size_t size;

	/* Version 1 images have no id; empty string is used */

	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
	if (!rbd_dev->spec->image_id)
		return -ENOMEM;

	/* Record the header object name for this rbd image. */

	size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name) {
		ret = -ENOMEM;
		goto out_err;
	}
	sprintf(rbd_dev->header_name, "%s%s",
		rbd_dev->spec->image_name, RBD_SUFFIX);

	/* Populate rbd image metadata */

	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (ret < 0)
		goto out_err;

	/* Version 1 images have no parent (no layering) */

	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;

	rbd_dev->image_format = 1;

	dout("discovered version 1 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->spec->image_id);
	rbd_dev->spec->image_id = NULL;

	return ret;
}
static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
	size_t size;
	int ret;
	u64 ver = 0;

	/*
	 * Image id was filled in by the caller.  Record the header
	 * object name for this rbd image.
	 */
	size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
	if (!rbd_dev->header_name)
		return -ENOMEM;
	sprintf(rbd_dev->header_name, "%s%s",
		RBD_HEADER_PREFIX, rbd_dev->spec->image_id);

	/* Get the size and object order for the image */

	ret = rbd_dev_v2_image_size(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get the object prefix (a.k.a. block_name) for the image */

	ret = rbd_dev_v2_object_prefix(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* Get and check the features for the image */

	ret = rbd_dev_v2_features(rbd_dev);
	if (ret < 0)
		goto out_err;

	/* If the image supports layering, get the parent info */

	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
		ret = rbd_dev_v2_parent_info(rbd_dev);
		if (ret < 0)
			goto out_err;
	}

	/* crypto and compression type aren't (yet) supported for v2 images */

	rbd_dev->header.crypt_type = 0;
	rbd_dev->header.comp_type = 0;

	/* Get the snapshot context, plus the header version */

	ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
	if (ret)
		goto out_err;
	rbd_dev->header.obj_version = ver;

	rbd_dev->image_format = 2;

	dout("discovered version 2 image, header name is %s\n",
		rbd_dev->header_name);

	return 0;
out_err:
	rbd_dev->parent_overlap = 0;
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->header.object_prefix);
	rbd_dev->header.object_prefix = NULL;

	return ret;
}
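/*
 * Naming sketch for the two probe paths (assuming the usual
 * rbd_types.h prefixes): a format 1 image "myimage" keeps its header
 * in the object "myimage.rbd", while a format 2 image stores its id
 * in "rbd_id.myimage" and its header in "rbd_header.<image_id>",
 * which is what rbd_dev_image_id() and the probe functions above
 * construct.
 */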
static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
{
	int ret;

	/* no need to lock here, as rbd_dev is not registered yet */
	ret = rbd_dev_snaps_update(rbd_dev);
	if (ret)
		return ret;

	ret = rbd_dev_probe_update_spec(rbd_dev);
	if (ret)
		goto err_out_snaps;

	ret = rbd_dev_set_mapping(rbd_dev);
	if (ret)
		goto err_out_snaps;

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* Get our block major device number. */

	ret = register_blkdev(0, rbd_dev->name);
	if (ret < 0)
		goto err_out_id;
	rbd_dev->major = ret;

	/* Set up the blkdev mapping. */

	ret = rbd_init_disk(rbd_dev);
	if (ret)
		goto err_out_blkdev;

	ret = rbd_bus_add_dev(rbd_dev);
	if (ret)
		goto err_out_disk;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 */
	down_write(&rbd_dev->header_rwsem);
	ret = rbd_dev_snaps_register(rbd_dev);
	up_write(&rbd_dev->header_rwsem);
	if (ret)
		goto err_out_bus;

	ret = rbd_dev_header_watch_sync(rbd_dev, 1);
	if (ret)
		goto err_out_bus;

	/* Everything's ready.  Announce the disk to the world. */

	add_disk(rbd_dev->disk);

	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
		(unsigned long long) rbd_dev->mapping.size);

	return ret;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);

	return ret;
err_out_disk:
	rbd_free_disk(rbd_dev);
err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
	rbd_dev_id_put(rbd_dev);
err_out_snaps:
	rbd_remove_all_snaps(rbd_dev);

	return ret;
}
/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_probe(struct rbd_device *rbd_dev)
{
	int ret;

	/*
	 * Get the id from the image id object.  If it's not a
	 * format 2 image, we'll get ENOENT back, and we'll assume
	 * it's a format 1 image.
	 */
	ret = rbd_dev_image_id(rbd_dev);
	if (ret)
		ret = rbd_dev_v1_probe(rbd_dev);
	else
		ret = rbd_dev_v2_probe(rbd_dev);
	if (ret) {
		dout("probe failed, returning %d\n", ret);

		return ret;
	}

	ret = rbd_dev_probe_finish(rbd_dev);
	if (ret)
		rbd_header_free(&rbd_dev->header);

	return ret;
}
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	struct ceph_options *ceph_opts = NULL;
	struct rbd_options *rbd_opts = NULL;
	struct rbd_spec *spec = NULL;
	struct rbd_client *rbdc;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* parse add command */
	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
	if (rc < 0)
		goto err_out_module;

	rbdc = rbd_get_client(ceph_opts);
	if (IS_ERR(rbdc)) {
		rc = PTR_ERR(rbdc);
		goto err_out_args;
	}
	ceph_opts = NULL;	/* rbd_dev client now owns this */

	/* pick the pool */
	osdc = &rbdc->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
	if (rc < 0)
		goto err_out_client;
	spec->pool_id = (u64) rc;

	/* The ceph file layout needs to fit pool id in 32 bits */

	if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
		rc = -EIO;
		goto err_out_client;
	}

	rbd_dev = rbd_dev_create(rbdc, spec);
	if (!rbd_dev)
		goto err_out_client;
	rbdc = NULL;		/* rbd_dev now owns this */
	spec = NULL;		/* rbd_dev now owns this */

	rbd_dev->mapping.read_only = rbd_opts->read_only;
	kfree(rbd_opts);
	rbd_opts = NULL;	/* done with this */

	rc = rbd_dev_probe(rbd_dev);
	if (rc < 0)
		goto err_out_rbd_dev;

	return count;
err_out_rbd_dev:
	rbd_dev_destroy(rbd_dev);
err_out_client:
	rbd_put_client(rbdc);
err_out_args:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	kfree(rbd_opts);
	rbd_spec_put(spec);
err_out_module:
	module_put(THIS_MODULE);

	dout("Error adding device %s\n", buf);

	return (ssize_t) rc;
}
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);

	return NULL;
}
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_event)
		rbd_dev_header_watch_sync(rbd_dev, 0);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* release allocated disk header fields */
	rbd_header_free(&rbd_dev->header);

	/* done with the id, and with the rbd_dev */
	rbd_dev_id_put(rbd_dev);
	rbd_assert(rbd_dev->rbd_client != NULL);
	rbd_dev_destroy(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	spin_lock_irq(&rbd_dev->lock);
	if (rbd_dev->open_count)
		ret = -EBUSY;
	else
		set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	if (ret < 0)
		goto done;

	rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);

	return ret;
}
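/*
 * Example (illustrative): a mapping that was assigned device id 1 is
 * torn down with
 *
 *	echo 1 > /sys/bus/rbd/remove
 *
 * which fails with -EBUSY while the block device is still held open.
 */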
/*
 * create control files in sysfs
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
static int __init rbd_init(void)
{
	int rc;

	if (!libceph_compatible(NULL)) {
		rbd_warn(NULL, "libceph incompatibility (quitting)");

		return -EINVAL;
	}
	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");

	return 0;
}

static void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");