2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define RBD_DEBUG /* Activate rbd_assert() calls */
47 * The basic unit of block I/O is a sector. It is interpreted in a
48 * number of contexts in Linux (blk, bio, genhd), but the default is
49 * universally 512 bytes. These symbols are just slightly more
50 * meaningful than the bare numbers they represent.
52 #define SECTOR_SHIFT 9
53 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
55 /* It might be useful to have this defined elsewhere too */
57 #define U64_MAX ((u64) (~0ULL))
59 #define RBD_DRV_NAME "rbd"
60 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
62 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
64 #define RBD_MAX_SNAP_NAME_LEN 32
65 #define RBD_MAX_OPT_LEN 1024
67 #define RBD_SNAP_HEAD_NAME "-"
70 * An RBD device name will be "rbd#", where the "rbd" comes from
71 * RBD_DRV_NAME above, and # is a unique integer identifier.
72 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
73 * enough to hold all possible device names.
75 #define DEV_NAME_LEN 32
76 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
78 #define RBD_READ_ONLY_DEFAULT false
81 * block device image metadata (in-memory version)
83 struct rbd_image_header
{
84 /* These four fields never change for a given rbd image */
90 /* The remaining fields need to be updated occasionally */
92 struct ceph_snap_context
*snapc
;
104 * an instance of the client. multiple devices may share an rbd client.
107 struct ceph_client
*client
;
109 struct list_head node
;
113 * a request completion status
115 struct rbd_req_status
{
122 * a collection of requests
124 struct rbd_req_coll
{
128 struct rbd_req_status status
[0];
132 * a single io request
135 struct request
*rq
; /* blk layer request */
136 struct bio
*bio
; /* cloned bio */
137 struct page
**pages
; /* list of used pages */
140 struct rbd_req_coll
*coll
;
147 struct list_head node
;
163 int dev_id
; /* blkdev unique id */
165 int major
; /* blkdev assigned major */
166 struct gendisk
*disk
; /* blkdev's gendisk and rq */
168 struct rbd_options rbd_opts
;
169 struct rbd_client
*rbd_client
;
171 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
173 spinlock_t lock
; /* queue lock */
175 struct rbd_image_header header
;
177 size_t image_name_len
;
182 struct ceph_osd_event
*watch_event
;
183 struct ceph_osd_request
*watch_request
;
185 /* protects updating the header */
186 struct rw_semaphore header_rwsem
;
188 struct rbd_mapping mapping
;
190 struct list_head node
;
192 /* list of snapshots */
193 struct list_head snaps
;
199 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
201 static LIST_HEAD(rbd_dev_list
); /* devices */
202 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
204 static LIST_HEAD(rbd_client_list
); /* clients */
205 static DEFINE_SPINLOCK(rbd_client_list_lock
);
207 static int rbd_dev_snap_devs_update(struct rbd_device
*rbd_dev
);
208 static void rbd_dev_release(struct device
*dev
);
209 static ssize_t
rbd_snap_add(struct device
*dev
,
210 struct device_attribute
*attr
,
213 static void __rbd_remove_snap_dev(struct rbd_snap
*snap
);
215 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
217 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
220 static struct bus_attribute rbd_bus_attrs
[] = {
221 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
222 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
226 static struct bus_type rbd_bus_type
= {
228 .bus_attrs
= rbd_bus_attrs
,
231 static void rbd_root_dev_release(struct device
*dev
)
235 static struct device rbd_root_dev
= {
237 .release
= rbd_root_dev_release
,
241 #define rbd_assert(expr) \
242 if (unlikely(!(expr))) { \
243 printk(KERN_ERR "\nAssertion failure in %s() " \
245 "\trbd_assert(%s);\n\n", \
246 __func__, __LINE__, #expr); \
249 #else /* !RBD_DEBUG */
250 # define rbd_assert(expr) ((void) 0)
251 #endif /* !RBD_DEBUG */
253 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
255 return get_device(&rbd_dev
->dev
);
258 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
260 put_device(&rbd_dev
->dev
);
263 static int rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
);
265 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
267 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
269 if ((mode
& FMODE_WRITE
) && rbd_dev
->mapping
.read_only
)
272 rbd_get_dev(rbd_dev
);
273 set_device_ro(bdev
, rbd_dev
->mapping
.read_only
);
278 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
280 struct rbd_device
*rbd_dev
= disk
->private_data
;
282 rbd_put_dev(rbd_dev
);
287 static const struct block_device_operations rbd_bd_ops
= {
288 .owner
= THIS_MODULE
,
290 .release
= rbd_release
,
294 * Initialize an rbd client instance.
297 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
)
299 struct rbd_client
*rbdc
;
302 dout("rbd_client_create\n");
303 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
307 kref_init(&rbdc
->kref
);
308 INIT_LIST_HEAD(&rbdc
->node
);
310 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
312 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
313 if (IS_ERR(rbdc
->client
))
315 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
317 ret
= ceph_open_session(rbdc
->client
);
321 spin_lock(&rbd_client_list_lock
);
322 list_add_tail(&rbdc
->node
, &rbd_client_list
);
323 spin_unlock(&rbd_client_list_lock
);
325 mutex_unlock(&ctl_mutex
);
327 dout("rbd_client_create created %p\n", rbdc
);
331 ceph_destroy_client(rbdc
->client
);
333 mutex_unlock(&ctl_mutex
);
337 ceph_destroy_options(ceph_opts
);
342 * Find a ceph client with specific addr and configuration. If
343 * found, bump its reference count.
345 static struct rbd_client
*rbd_client_find(struct ceph_options
*ceph_opts
)
347 struct rbd_client
*client_node
;
350 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
353 spin_lock(&rbd_client_list_lock
);
354 list_for_each_entry(client_node
, &rbd_client_list
, node
) {
355 if (!ceph_compare_options(ceph_opts
, client_node
->client
)) {
356 kref_get(&client_node
->kref
);
361 spin_unlock(&rbd_client_list_lock
);
363 return found
? client_node
: NULL
;
373 /* string args above */
376 /* Boolean args above */
380 static match_table_t rbd_opts_tokens
= {
382 /* string args above */
383 {Opt_read_only
, "mapping.read_only"},
384 {Opt_read_only
, "ro"}, /* Alternate spelling */
385 {Opt_read_write
, "read_write"},
386 {Opt_read_write
, "rw"}, /* Alternate spelling */
387 /* Boolean args above */
391 static int parse_rbd_opts_token(char *c
, void *private)
393 struct rbd_options
*rbd_opts
= private;
394 substring_t argstr
[MAX_OPT_ARGS
];
395 int token
, intval
, ret
;
397 token
= match_token(c
, rbd_opts_tokens
, argstr
);
401 if (token
< Opt_last_int
) {
402 ret
= match_int(&argstr
[0], &intval
);
404 pr_err("bad mount option arg (not int) "
408 dout("got int token %d val %d\n", token
, intval
);
409 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
410 dout("got string token %d val %s\n", token
,
412 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
413 dout("got Boolean token %d\n", token
);
415 dout("got token %d\n", token
);
420 rbd_opts
->read_only
= true;
423 rbd_opts
->read_only
= false;
433 * Get a ceph client with specific addr and configuration, if one does
434 * not exist create it.
436 static int rbd_get_client(struct rbd_device
*rbd_dev
, const char *mon_addr
,
437 size_t mon_addr_len
, char *options
)
439 struct rbd_options
*rbd_opts
= &rbd_dev
->rbd_opts
;
440 struct ceph_options
*ceph_opts
;
441 struct rbd_client
*rbdc
;
443 rbd_opts
->read_only
= RBD_READ_ONLY_DEFAULT
;
445 ceph_opts
= ceph_parse_options(options
, mon_addr
,
446 mon_addr
+ mon_addr_len
,
447 parse_rbd_opts_token
, rbd_opts
);
448 if (IS_ERR(ceph_opts
))
449 return PTR_ERR(ceph_opts
);
451 rbdc
= rbd_client_find(ceph_opts
);
453 /* using an existing client */
454 ceph_destroy_options(ceph_opts
);
456 rbdc
= rbd_client_create(ceph_opts
);
458 return PTR_ERR(rbdc
);
460 rbd_dev
->rbd_client
= rbdc
;
466 * Destroy ceph client
468 * Caller must hold rbd_client_list_lock.
470 static void rbd_client_release(struct kref
*kref
)
472 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
474 dout("rbd_release_client %p\n", rbdc
);
475 spin_lock(&rbd_client_list_lock
);
476 list_del(&rbdc
->node
);
477 spin_unlock(&rbd_client_list_lock
);
479 ceph_destroy_client(rbdc
->client
);
484 * Drop reference to ceph client node. If it's not referenced anymore, release
487 static void rbd_put_client(struct rbd_device
*rbd_dev
)
489 kref_put(&rbd_dev
->rbd_client
->kref
, rbd_client_release
);
490 rbd_dev
->rbd_client
= NULL
;
494 * Destroy requests collection
496 static void rbd_coll_release(struct kref
*kref
)
498 struct rbd_req_coll
*coll
=
499 container_of(kref
, struct rbd_req_coll
, kref
);
501 dout("rbd_coll_release %p\n", coll
);
505 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
510 /* The header has to start with the magic rbd header text */
511 if (memcmp(&ondisk
->text
, RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
)))
515 * The size of a snapshot header has to fit in a size_t, and
516 * that limits the number of snapshots.
518 snap_count
= le32_to_cpu(ondisk
->snap_count
);
519 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
520 if (snap_count
> size
/ sizeof (__le64
))
524 * Not only that, but the size of the entire the snapshot
525 * header must also be representable in a size_t.
527 size
-= snap_count
* sizeof (__le64
);
528 if ((u64
) size
< le64_to_cpu(ondisk
->snap_names_len
))
535 * Create a new header structure, translate header format from the on-disk
538 static int rbd_header_from_disk(struct rbd_image_header
*header
,
539 struct rbd_image_header_ondisk
*ondisk
)
546 memset(header
, 0, sizeof (*header
));
548 snap_count
= le32_to_cpu(ondisk
->snap_count
);
550 len
= strnlen(ondisk
->object_prefix
, sizeof (ondisk
->object_prefix
));
551 header
->object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
552 if (!header
->object_prefix
)
554 memcpy(header
->object_prefix
, ondisk
->object_prefix
, len
);
555 header
->object_prefix
[len
] = '\0';
558 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
560 /* Save a copy of the snapshot names */
562 if (snap_names_len
> (u64
) SIZE_MAX
)
564 header
->snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
565 if (!header
->snap_names
)
568 * Note that rbd_dev_v1_header_read() guarantees
569 * the ondisk buffer we're working with has
570 * snap_names_len bytes beyond the end of the
571 * snapshot id array, this memcpy() is safe.
573 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
576 /* Record each snapshot's size */
578 size
= snap_count
* sizeof (*header
->snap_sizes
);
579 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
580 if (!header
->snap_sizes
)
582 for (i
= 0; i
< snap_count
; i
++)
583 header
->snap_sizes
[i
] =
584 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
586 WARN_ON(ondisk
->snap_names_len
);
587 header
->snap_names
= NULL
;
588 header
->snap_sizes
= NULL
;
591 header
->obj_order
= ondisk
->options
.order
;
592 header
->crypt_type
= ondisk
->options
.crypt_type
;
593 header
->comp_type
= ondisk
->options
.comp_type
;
595 /* Allocate and fill in the snapshot context */
597 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
598 size
= sizeof (struct ceph_snap_context
);
599 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
600 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
604 atomic_set(&header
->snapc
->nref
, 1);
605 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
606 header
->snapc
->num_snaps
= snap_count
;
607 for (i
= 0; i
< snap_count
; i
++)
608 header
->snapc
->snaps
[i
] =
609 le64_to_cpu(ondisk
->snaps
[i
].id
);
614 kfree(header
->snap_sizes
);
615 header
->snap_sizes
= NULL
;
616 kfree(header
->snap_names
);
617 header
->snap_names
= NULL
;
618 kfree(header
->object_prefix
);
619 header
->object_prefix
= NULL
;
624 static int snap_by_name(struct rbd_image_header
*header
, const char *snap_name
,
628 char *p
= header
->snap_names
;
630 rbd_assert(header
->snapc
!= NULL
);
631 for (i
= 0; i
< header
->snapc
->num_snaps
; i
++) {
632 if (!strcmp(snap_name
, p
)) {
634 /* Found it. Pass back its id and/or size */
637 *seq
= header
->snapc
->snaps
[i
];
639 *size
= header
->snap_sizes
[i
];
642 p
+= strlen(p
) + 1; /* Skip ahead to the next name */
647 static int rbd_header_set_snap(struct rbd_device
*rbd_dev
, char *snap_name
)
651 down_write(&rbd_dev
->header_rwsem
);
653 if (!memcmp(snap_name
, RBD_SNAP_HEAD_NAME
,
654 sizeof (RBD_SNAP_HEAD_NAME
))) {
655 rbd_dev
->mapping
.snap_id
= CEPH_NOSNAP
;
656 rbd_dev
->mapping
.size
= rbd_dev
->header
.image_size
;
657 rbd_dev
->mapping
.snap_exists
= false;
658 rbd_dev
->mapping
.read_only
= rbd_dev
->rbd_opts
.read_only
;
660 ret
= snap_by_name(&rbd_dev
->header
, snap_name
,
661 &rbd_dev
->mapping
.snap_id
,
662 &rbd_dev
->mapping
.size
);
665 rbd_dev
->mapping
.snap_exists
= true;
666 rbd_dev
->mapping
.read_only
= true;
668 rbd_dev
->mapping
.snap_name
= snap_name
;
672 up_write(&rbd_dev
->header_rwsem
);
676 static void rbd_header_free(struct rbd_image_header
*header
)
678 kfree(header
->object_prefix
);
679 header
->object_prefix
= NULL
;
680 kfree(header
->snap_sizes
);
681 header
->snap_sizes
= NULL
;
682 kfree(header
->snap_names
);
683 header
->snap_names
= NULL
;
684 ceph_put_snap_context(header
->snapc
);
685 header
->snapc
= NULL
;
688 static char *rbd_segment_name(struct rbd_device
*rbd_dev
, u64 offset
)
694 name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
697 segment
= offset
>> rbd_dev
->header
.obj_order
;
698 ret
= snprintf(name
, RBD_MAX_SEG_NAME_LEN
, "%s.%012llx",
699 rbd_dev
->header
.object_prefix
, segment
);
700 if (ret
< 0 || ret
>= RBD_MAX_SEG_NAME_LEN
) {
701 pr_err("error formatting segment name for #%llu (%d)\n",
710 static u64
rbd_segment_offset(struct rbd_device
*rbd_dev
, u64 offset
)
712 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
714 return offset
& (segment_size
- 1);
717 static u64
rbd_segment_length(struct rbd_device
*rbd_dev
,
718 u64 offset
, u64 length
)
720 u64 segment_size
= (u64
) 1 << rbd_dev
->header
.obj_order
;
722 offset
&= segment_size
- 1;
724 rbd_assert(length
<= U64_MAX
- offset
);
725 if (offset
+ length
> segment_size
)
726 length
= segment_size
- offset
;
731 static int rbd_get_num_segments(struct rbd_image_header
*header
,
739 if (len
- 1 > U64_MAX
- ofs
)
742 start_seg
= ofs
>> header
->obj_order
;
743 end_seg
= (ofs
+ len
- 1) >> header
->obj_order
;
745 return end_seg
- start_seg
+ 1;
749 * returns the size of an object in the image
751 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
753 return 1 << header
->obj_order
;
760 static void bio_chain_put(struct bio
*chain
)
766 chain
= chain
->bi_next
;
772 * zeros a bio chain, starting at specific offset
774 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
783 bio_for_each_segment(bv
, chain
, i
) {
784 if (pos
+ bv
->bv_len
> start_ofs
) {
785 int remainder
= max(start_ofs
- pos
, 0);
786 buf
= bvec_kmap_irq(bv
, &flags
);
787 memset(buf
+ remainder
, 0,
788 bv
->bv_len
- remainder
);
789 bvec_kunmap_irq(buf
, &flags
);
794 chain
= chain
->bi_next
;
799 * bio_chain_clone - clone a chain of bios up to a certain length.
800 * might return a bio_pair that will need to be released.
802 static struct bio
*bio_chain_clone(struct bio
**old
, struct bio
**next
,
803 struct bio_pair
**bp
,
804 int len
, gfp_t gfpmask
)
806 struct bio
*old_chain
= *old
;
807 struct bio
*new_chain
= NULL
;
812 bio_pair_release(*bp
);
816 while (old_chain
&& (total
< len
)) {
819 tmp
= bio_kmalloc(gfpmask
, old_chain
->bi_max_vecs
);
822 gfpmask
&= ~__GFP_WAIT
; /* can't wait after the first */
824 if (total
+ old_chain
->bi_size
> len
) {
828 * this split can only happen with a single paged bio,
829 * split_bio will BUG_ON if this is not the case
831 dout("bio_chain_clone split! total=%d remaining=%d"
833 total
, len
- total
, old_chain
->bi_size
);
835 /* split the bio. We'll release it either in the next
836 call, or it will have to be released outside */
837 bp
= bio_split(old_chain
, (len
- total
) / SECTOR_SIZE
);
841 __bio_clone(tmp
, &bp
->bio1
);
845 __bio_clone(tmp
, old_chain
);
846 *next
= old_chain
->bi_next
;
856 old_chain
= old_chain
->bi_next
;
858 total
+= tmp
->bi_size
;
861 rbd_assert(total
== len
);
868 dout("bio_chain_clone with err\n");
869 bio_chain_put(new_chain
);
874 * helpers for osd request op vectors.
876 static struct ceph_osd_req_op
*rbd_create_rw_ops(int num_ops
,
877 int opcode
, u32 payload_len
)
879 struct ceph_osd_req_op
*ops
;
881 ops
= kzalloc(sizeof (*ops
) * (num_ops
+ 1), GFP_NOIO
);
888 * op extent offset and length will be set later on
889 * in calc_raw_layout()
891 ops
[0].payload_len
= payload_len
;
896 static void rbd_destroy_ops(struct ceph_osd_req_op
*ops
)
901 static void rbd_coll_end_req_index(struct request
*rq
,
902 struct rbd_req_coll
*coll
,
906 struct request_queue
*q
;
909 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
910 coll
, index
, ret
, (unsigned long long) len
);
916 blk_end_request(rq
, ret
, len
);
922 spin_lock_irq(q
->queue_lock
);
923 coll
->status
[index
].done
= 1;
924 coll
->status
[index
].rc
= ret
;
925 coll
->status
[index
].bytes
= len
;
926 max
= min
= coll
->num_done
;
927 while (max
< coll
->total
&& coll
->status
[max
].done
)
930 for (i
= min
; i
<max
; i
++) {
931 __blk_end_request(rq
, coll
->status
[i
].rc
,
932 coll
->status
[i
].bytes
);
934 kref_put(&coll
->kref
, rbd_coll_release
);
936 spin_unlock_irq(q
->queue_lock
);
939 static void rbd_coll_end_req(struct rbd_request
*req
,
942 rbd_coll_end_req_index(req
->rq
, req
->coll
, req
->coll_index
, ret
, len
);
946 * Send ceph osd request
948 static int rbd_do_request(struct request
*rq
,
949 struct rbd_device
*rbd_dev
,
950 struct ceph_snap_context
*snapc
,
952 const char *object_name
, u64 ofs
, u64 len
,
957 struct ceph_osd_req_op
*ops
,
958 struct rbd_req_coll
*coll
,
960 void (*rbd_cb
)(struct ceph_osd_request
*req
,
961 struct ceph_msg
*msg
),
962 struct ceph_osd_request
**linger_req
,
965 struct ceph_osd_request
*req
;
966 struct ceph_file_layout
*layout
;
969 struct timespec mtime
= CURRENT_TIME
;
970 struct rbd_request
*req_data
;
971 struct ceph_osd_request_head
*reqhead
;
972 struct ceph_osd_client
*osdc
;
974 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
977 rbd_coll_end_req_index(rq
, coll
, coll_index
,
983 req_data
->coll
= coll
;
984 req_data
->coll_index
= coll_index
;
987 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name
,
988 (unsigned long long) ofs
, (unsigned long long) len
);
990 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
991 req
= ceph_osdc_alloc_request(osdc
, flags
, snapc
, ops
,
992 false, GFP_NOIO
, pages
, bio
);
998 req
->r_callback
= rbd_cb
;
1001 req_data
->bio
= bio
;
1002 req_data
->pages
= pages
;
1003 req_data
->len
= len
;
1005 req
->r_priv
= req_data
;
1007 reqhead
= req
->r_request
->front
.iov_base
;
1008 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
1010 strncpy(req
->r_oid
, object_name
, sizeof(req
->r_oid
));
1011 req
->r_oid_len
= strlen(req
->r_oid
);
1013 layout
= &req
->r_file_layout
;
1014 memset(layout
, 0, sizeof(*layout
));
1015 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
1016 layout
->fl_stripe_count
= cpu_to_le32(1);
1017 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
1018 layout
->fl_pg_pool
= cpu_to_le32(rbd_dev
->pool_id
);
1019 ceph_calc_raw_layout(osdc
, layout
, snapid
, ofs
, &len
, &bno
,
1022 ceph_osdc_build_request(req
, ofs
, &len
,
1026 req
->r_oid
, req
->r_oid_len
);
1029 ceph_osdc_set_request_linger(osdc
, req
);
1033 ret
= ceph_osdc_start_request(osdc
, req
, false);
1038 ret
= ceph_osdc_wait_request(osdc
, req
);
1040 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
1041 dout("reassert_ver=%llu\n",
1042 (unsigned long long)
1043 le64_to_cpu(req
->r_reassert_version
.version
));
1044 ceph_osdc_put_request(req
);
1049 bio_chain_put(req_data
->bio
);
1050 ceph_osdc_put_request(req
);
1052 rbd_coll_end_req(req_data
, ret
, len
);
1058 * Ceph osd op callback
1060 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
1062 struct rbd_request
*req_data
= req
->r_priv
;
1063 struct ceph_osd_reply_head
*replyhead
;
1064 struct ceph_osd_op
*op
;
1070 replyhead
= msg
->front
.iov_base
;
1071 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
1072 op
= (void *)(replyhead
+ 1);
1073 rc
= le32_to_cpu(replyhead
->result
);
1074 bytes
= le64_to_cpu(op
->extent
.length
);
1075 read_op
= (le16_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
1077 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1078 (unsigned long long) bytes
, read_op
, (int) rc
);
1080 if (rc
== -ENOENT
&& read_op
) {
1081 zero_bio_chain(req_data
->bio
, 0);
1083 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
1084 zero_bio_chain(req_data
->bio
, bytes
);
1085 bytes
= req_data
->len
;
1088 rbd_coll_end_req(req_data
, rc
, bytes
);
1091 bio_chain_put(req_data
->bio
);
1093 ceph_osdc_put_request(req
);
/*
 * Minimal OSD request completion callback: just drops the request
 * reference.  The reply message itself is ignored.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1103 * Do a synchronous ceph osd operation
1105 static int rbd_req_sync_op(struct rbd_device
*rbd_dev
,
1106 struct ceph_snap_context
*snapc
,
1109 struct ceph_osd_req_op
*ops
,
1110 const char *object_name
,
1113 struct ceph_osd_request
**linger_req
,
1117 struct page
**pages
;
1120 rbd_assert(ops
!= NULL
);
1122 num_pages
= calc_pages_for(ofs
, len
);
1123 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1125 return PTR_ERR(pages
);
1127 ret
= rbd_do_request(NULL
, rbd_dev
, snapc
, snapid
,
1128 object_name
, ofs
, len
, NULL
,
1138 if ((flags
& CEPH_OSD_FLAG_READ
) && buf
)
1139 ret
= ceph_copy_from_page_vector(pages
, buf
, ofs
, ret
);
1142 ceph_release_page_vector(pages
, num_pages
);
1147 * Do an asynchronous ceph osd operation
1149 static int rbd_do_op(struct request
*rq
,
1150 struct rbd_device
*rbd_dev
,
1151 struct ceph_snap_context
*snapc
,
1153 int opcode
, int flags
,
1156 struct rbd_req_coll
*coll
,
1163 struct ceph_osd_req_op
*ops
;
1166 seg_name
= rbd_segment_name(rbd_dev
, ofs
);
1169 seg_len
= rbd_segment_length(rbd_dev
, ofs
, len
);
1170 seg_ofs
= rbd_segment_offset(rbd_dev
, ofs
);
1172 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? seg_len
: 0);
1175 ops
= rbd_create_rw_ops(1, opcode
, payload_len
);
1179 /* we've taken care of segment sizes earlier when we
1180 cloned the bios. We should never have a segment
1181 truncated at this point */
1182 rbd_assert(seg_len
== len
);
1184 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
1185 seg_name
, seg_ofs
, seg_len
,
1191 rbd_req_cb
, 0, NULL
);
1193 rbd_destroy_ops(ops
);
1200 * Request async osd write
1202 static int rbd_req_write(struct request
*rq
,
1203 struct rbd_device
*rbd_dev
,
1204 struct ceph_snap_context
*snapc
,
1207 struct rbd_req_coll
*coll
,
1210 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1212 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1213 ofs
, len
, bio
, coll
, coll_index
);
1217 * Request async osd read
1219 static int rbd_req_read(struct request
*rq
,
1220 struct rbd_device
*rbd_dev
,
1224 struct rbd_req_coll
*coll
,
1227 return rbd_do_op(rq
, rbd_dev
, NULL
,
1231 ofs
, len
, bio
, coll
, coll_index
);
1235 * Request sync osd read
1237 static int rbd_req_sync_read(struct rbd_device
*rbd_dev
,
1239 const char *object_name
,
1244 struct ceph_osd_req_op
*ops
;
1247 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_READ
, 0);
1251 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1254 ops
, object_name
, ofs
, len
, buf
, NULL
, ver
);
1255 rbd_destroy_ops(ops
);
1261 * Request sync osd watch
1263 static int rbd_req_sync_notify_ack(struct rbd_device
*rbd_dev
,
1267 struct ceph_osd_req_op
*ops
;
1270 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1274 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1275 ops
[0].watch
.cookie
= notify_id
;
1276 ops
[0].watch
.flag
= 0;
1278 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1279 rbd_dev
->header_name
, 0, 0, NULL
,
1284 rbd_simple_req_cb
, 0, NULL
);
1286 rbd_destroy_ops(ops
);
1290 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1292 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1299 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1300 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1301 (unsigned int) opcode
);
1302 rc
= rbd_refresh_header(rbd_dev
, &hver
);
1304 pr_warning(RBD_DRV_NAME
"%d got notification but failed to "
1305 " update snaps: %d\n", rbd_dev
->major
, rc
);
1307 rbd_req_sync_notify_ack(rbd_dev
, hver
, notify_id
);
1311 * Request sync osd watch
1313 static int rbd_req_sync_watch(struct rbd_device
*rbd_dev
)
1315 struct ceph_osd_req_op
*ops
;
1316 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1319 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1323 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1324 (void *)rbd_dev
, &rbd_dev
->watch_event
);
1328 ops
[0].watch
.ver
= cpu_to_le64(rbd_dev
->header
.obj_version
);
1329 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1330 ops
[0].watch
.flag
= 1;
1332 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1334 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1336 rbd_dev
->header_name
,
1338 &rbd_dev
->watch_request
, NULL
);
1343 rbd_destroy_ops(ops
);
1347 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1348 rbd_dev
->watch_event
= NULL
;
1350 rbd_destroy_ops(ops
);
1355 * Request sync osd unwatch
1357 static int rbd_req_sync_unwatch(struct rbd_device
*rbd_dev
)
1359 struct ceph_osd_req_op
*ops
;
1362 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1366 ops
[0].watch
.ver
= 0;
1367 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1368 ops
[0].watch
.flag
= 0;
1370 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1372 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1374 rbd_dev
->header_name
,
1375 0, 0, NULL
, NULL
, NULL
);
1378 rbd_destroy_ops(ops
);
1379 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1380 rbd_dev
->watch_event
= NULL
;
1384 struct rbd_notify_info
{
1385 struct rbd_device
*rbd_dev
;
1388 static void rbd_notify_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1390 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1394 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1395 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1396 (unsigned int) opcode
);
1400 * Request sync osd notify
1402 static int rbd_req_sync_notify(struct rbd_device
*rbd_dev
)
1404 struct ceph_osd_req_op
*ops
;
1405 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1406 struct ceph_osd_event
*event
;
1407 struct rbd_notify_info info
;
1408 int payload_len
= sizeof(u32
) + sizeof(u32
);
1411 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY
, payload_len
);
1415 info
.rbd_dev
= rbd_dev
;
1417 ret
= ceph_osdc_create_event(osdc
, rbd_notify_cb
, 1,
1418 (void *)&info
, &event
);
1422 ops
[0].watch
.ver
= 1;
1423 ops
[0].watch
.flag
= 1;
1424 ops
[0].watch
.cookie
= event
->cookie
;
1425 ops
[0].watch
.prot_ver
= RADOS_NOTIFY_VER
;
1426 ops
[0].watch
.timeout
= 12;
1428 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1430 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1432 rbd_dev
->header_name
,
1433 0, 0, NULL
, NULL
, NULL
);
1437 ret
= ceph_osdc_wait_event(event
, CEPH_OSD_TIMEOUT_DEFAULT
);
1438 dout("ceph_osdc_wait_event returned %d\n", ret
);
1439 rbd_destroy_ops(ops
);
1443 ceph_osdc_cancel_event(event
);
1445 rbd_destroy_ops(ops
);
1450 * Request sync osd read
1452 static int rbd_req_sync_exec(struct rbd_device
*rbd_dev
,
1453 const char *object_name
,
1454 const char *class_name
,
1455 const char *method_name
,
1460 struct ceph_osd_req_op
*ops
;
1461 int class_name_len
= strlen(class_name
);
1462 int method_name_len
= strlen(method_name
);
1465 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_CALL
,
1466 class_name_len
+ method_name_len
+ len
);
1470 ops
[0].cls
.class_name
= class_name
;
1471 ops
[0].cls
.class_len
= (__u8
) class_name_len
;
1472 ops
[0].cls
.method_name
= method_name
;
1473 ops
[0].cls
.method_len
= (__u8
) method_name_len
;
1474 ops
[0].cls
.argc
= 0;
1475 ops
[0].cls
.indata
= data
;
1476 ops
[0].cls
.indata_len
= len
;
1478 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1480 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1482 object_name
, 0, 0, NULL
, NULL
, ver
);
1484 rbd_destroy_ops(ops
);
1486 dout("cls_exec returned %d\n", ret
);
1490 static struct rbd_req_coll
*rbd_alloc_coll(int num_reqs
)
1492 struct rbd_req_coll
*coll
=
1493 kzalloc(sizeof(struct rbd_req_coll
) +
1494 sizeof(struct rbd_req_status
) * num_reqs
,
1499 coll
->total
= num_reqs
;
1500 kref_init(&coll
->kref
);
1505 * block device queue callback
1507 static void rbd_rq_fn(struct request_queue
*q
)
1509 struct rbd_device
*rbd_dev
= q
->queuedata
;
1511 struct bio_pair
*bp
= NULL
;
1513 while ((rq
= blk_fetch_request(q
))) {
1515 struct bio
*rq_bio
, *next_bio
= NULL
;
1520 int num_segs
, cur_seg
= 0;
1521 struct rbd_req_coll
*coll
;
1522 struct ceph_snap_context
*snapc
;
1524 dout("fetched request\n");
1526 /* filter out block requests we don't understand */
1527 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1528 __blk_end_request_all(rq
, 0);
1532 /* deduce our operation (read, write) */
1533 do_write
= (rq_data_dir(rq
) == WRITE
);
1535 size
= blk_rq_bytes(rq
);
1536 ofs
= blk_rq_pos(rq
) * SECTOR_SIZE
;
1538 if (do_write
&& rbd_dev
->mapping
.read_only
) {
1539 __blk_end_request_all(rq
, -EROFS
);
1543 spin_unlock_irq(q
->queue_lock
);
1545 down_read(&rbd_dev
->header_rwsem
);
1547 if (rbd_dev
->mapping
.snap_id
!= CEPH_NOSNAP
&&
1548 !rbd_dev
->mapping
.snap_exists
) {
1549 up_read(&rbd_dev
->header_rwsem
);
1550 dout("request for non-existent snapshot");
1551 spin_lock_irq(q
->queue_lock
);
1552 __blk_end_request_all(rq
, -ENXIO
);
1556 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1558 up_read(&rbd_dev
->header_rwsem
);
1560 dout("%s 0x%x bytes at 0x%llx\n",
1561 do_write
? "write" : "read",
1562 size
, (unsigned long long) blk_rq_pos(rq
) * SECTOR_SIZE
);
1564 num_segs
= rbd_get_num_segments(&rbd_dev
->header
, ofs
, size
);
1565 if (num_segs
<= 0) {
1566 spin_lock_irq(q
->queue_lock
);
1567 __blk_end_request_all(rq
, num_segs
);
1568 ceph_put_snap_context(snapc
);
1571 coll
= rbd_alloc_coll(num_segs
);
1573 spin_lock_irq(q
->queue_lock
);
1574 __blk_end_request_all(rq
, -ENOMEM
);
1575 ceph_put_snap_context(snapc
);
1580 /* a bio clone to be passed down to OSD req */
1581 dout("rq->bio->bi_vcnt=%hu\n", rq
->bio
->bi_vcnt
);
1582 op_size
= rbd_segment_length(rbd_dev
, ofs
, size
);
1583 kref_get(&coll
->kref
);
1584 bio
= bio_chain_clone(&rq_bio
, &next_bio
, &bp
,
1585 op_size
, GFP_ATOMIC
);
1587 rbd_coll_end_req_index(rq
, coll
, cur_seg
,
1593 /* init OSD command: write or read */
1595 rbd_req_write(rq
, rbd_dev
,
1601 rbd_req_read(rq
, rbd_dev
,
1602 rbd_dev
->mapping
.snap_id
,
1614 kref_put(&coll
->kref
, rbd_coll_release
);
1617 bio_pair_release(bp
);
1618 spin_lock_irq(q
->queue_lock
);
1620 ceph_put_snap_context(snapc
);
1625 * a queue callback. Makes sure that we don't create a bio that spans across
1626 * multiple osd objects. One exception would be with a single page bios,
1627 * which we handle later at bio_chain_clone
1629 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1630 struct bio_vec
*bvec
)
1632 struct rbd_device
*rbd_dev
= q
->queuedata
;
1633 unsigned int chunk_sectors
;
1635 unsigned int bio_sectors
;
1638 chunk_sectors
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
1639 sector
= bmd
->bi_sector
+ get_start_sect(bmd
->bi_bdev
);
1640 bio_sectors
= bmd
->bi_size
>> SECTOR_SHIFT
;
1642 max
= (chunk_sectors
- ((sector
& (chunk_sectors
- 1))
1643 + bio_sectors
)) << SECTOR_SHIFT
;
1645 max
= 0; /* bio_add cannot handle a negative return */
1646 if (max
<= bvec
->bv_len
&& bio_sectors
== 0)
1647 return bvec
->bv_len
;
1651 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1653 struct gendisk
*disk
= rbd_dev
->disk
;
1658 rbd_header_free(&rbd_dev
->header
);
1660 if (disk
->flags
& GENHD_FL_UP
)
1663 blk_cleanup_queue(disk
->queue
);
1668 * Read the complete header for the given rbd device.
1670 * Returns a pointer to a dynamically-allocated buffer containing
1671 * the complete and validated header. Caller can pass the address
1672 * of a variable that will be filled in with the version of the
1673 * header object at the time it was read.
1675 * Returns a pointer-coded errno if a failure occurs.
1677 static struct rbd_image_header_ondisk
*
1678 rbd_dev_v1_header_read(struct rbd_device
*rbd_dev
, u64
*version
)
1680 struct rbd_image_header_ondisk
*ondisk
= NULL
;
1687 * The complete header will include an array of its 64-bit
1688 * snapshot ids, followed by the names of those snapshots as
1689 * a contiguous block of NUL-terminated strings. Note that
1690 * the number of snapshots could change by the time we read
1691 * it in, in which case we re-read it.
1698 size
= sizeof (*ondisk
);
1699 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
1701 ondisk
= kmalloc(size
, GFP_KERNEL
);
1703 return ERR_PTR(-ENOMEM
);
1705 ret
= rbd_req_sync_read(rbd_dev
, CEPH_NOSNAP
,
1706 rbd_dev
->header_name
,
1708 (char *) ondisk
, version
);
1712 if (WARN_ON((size_t) ret
< size
)) {
1714 pr_warning("short header read for image %s"
1715 " (want %zd got %d)\n",
1716 rbd_dev
->image_name
, size
, ret
);
1719 if (!rbd_dev_ondisk_valid(ondisk
)) {
1721 pr_warning("invalid header for image %s\n",
1722 rbd_dev
->image_name
);
1726 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
1727 want_count
= snap_count
;
1728 snap_count
= le32_to_cpu(ondisk
->snap_count
);
1729 } while (snap_count
!= want_count
);
1736 return ERR_PTR(ret
);
1740 * reload the ondisk the header
1742 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1743 struct rbd_image_header
*header
)
1745 struct rbd_image_header_ondisk
*ondisk
;
1749 ondisk
= rbd_dev_v1_header_read(rbd_dev
, &ver
);
1751 return PTR_ERR(ondisk
);
1752 ret
= rbd_header_from_disk(header
, ondisk
);
1754 header
->obj_version
= ver
;
1763 static int rbd_header_add_snap(struct rbd_device
*rbd_dev
,
1764 const char *snap_name
,
1767 int name_len
= strlen(snap_name
);
1771 struct ceph_mon_client
*monc
;
1773 /* we should create a snapshot only if we're pointing at the head */
1774 if (rbd_dev
->mapping
.snap_id
!= CEPH_NOSNAP
)
1777 monc
= &rbd_dev
->rbd_client
->client
->monc
;
1778 ret
= ceph_monc_create_snapid(monc
, rbd_dev
->pool_id
, &new_snapid
);
1779 dout("created snapid=%llu\n", (unsigned long long) new_snapid
);
1783 data
= kmalloc(name_len
+ 16, gfp_flags
);
1788 e
= data
+ name_len
+ 16;
1790 ceph_encode_string_safe(&p
, e
, snap_name
, name_len
, bad
);
1791 ceph_encode_64_safe(&p
, e
, new_snapid
, bad
);
1793 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
1795 data
, p
- data
, NULL
);
1799 return ret
< 0 ? ret
: 0;
1804 static void __rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1806 struct rbd_snap
*snap
;
1807 struct rbd_snap
*next
;
1809 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
1810 __rbd_remove_snap_dev(snap
);
1814 * only read the first part of the ondisk header, without the snaps info
1816 static int __rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
)
1819 struct rbd_image_header h
;
1821 ret
= rbd_read_header(rbd_dev
, &h
);
1825 down_write(&rbd_dev
->header_rwsem
);
1828 if (rbd_dev
->mapping
.snap_id
== CEPH_NOSNAP
) {
1829 sector_t size
= (sector_t
) h
.image_size
/ SECTOR_SIZE
;
1831 if (size
!= (sector_t
) rbd_dev
->mapping
.size
) {
1832 dout("setting size to %llu sectors",
1833 (unsigned long long) size
);
1834 rbd_dev
->mapping
.size
= (u64
) size
;
1835 set_capacity(rbd_dev
->disk
, size
);
1839 /* rbd_dev->header.object_prefix shouldn't change */
1840 kfree(rbd_dev
->header
.snap_sizes
);
1841 kfree(rbd_dev
->header
.snap_names
);
1842 /* osd requests may still refer to snapc */
1843 ceph_put_snap_context(rbd_dev
->header
.snapc
);
1846 *hver
= h
.obj_version
;
1847 rbd_dev
->header
.obj_version
= h
.obj_version
;
1848 rbd_dev
->header
.image_size
= h
.image_size
;
1849 rbd_dev
->header
.snapc
= h
.snapc
;
1850 rbd_dev
->header
.snap_names
= h
.snap_names
;
1851 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
1852 /* Free the extra copy of the object prefix */
1853 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
1854 kfree(h
.object_prefix
);
1856 ret
= rbd_dev_snap_devs_update(rbd_dev
);
1858 up_write(&rbd_dev
->header_rwsem
);
1863 static int rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
)
1867 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1868 ret
= __rbd_refresh_header(rbd_dev
, hver
);
1869 mutex_unlock(&ctl_mutex
);
1874 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
1876 struct gendisk
*disk
;
1877 struct request_queue
*q
;
1881 /* contact OSD, request size info about the object being mapped */
1882 rc
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
1886 /* no need to lock here, as rbd_dev is not registered yet */
1887 rc
= rbd_dev_snap_devs_update(rbd_dev
);
1891 rc
= rbd_header_set_snap(rbd_dev
, snap_name
);
1895 /* create gendisk info */
1897 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
1901 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
1903 disk
->major
= rbd_dev
->major
;
1904 disk
->first_minor
= 0;
1905 disk
->fops
= &rbd_bd_ops
;
1906 disk
->private_data
= rbd_dev
;
1910 q
= blk_init_queue(rbd_rq_fn
, &rbd_dev
->lock
);
1914 /* We use the default size, but let's be explicit about it. */
1915 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
1917 /* set io sizes to object size */
1918 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
1919 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
1920 blk_queue_max_segment_size(q
, segment_size
);
1921 blk_queue_io_min(q
, segment_size
);
1922 blk_queue_io_opt(q
, segment_size
);
1924 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
1927 q
->queuedata
= rbd_dev
;
1929 rbd_dev
->disk
= disk
;
1931 /* finally, announce the disk to the world */
1932 set_capacity(disk
, (sector_t
) rbd_dev
->mapping
.size
/ SECTOR_SIZE
);
1935 pr_info("%s: added with size 0x%llx\n",
1936 disk
->disk_name
, (unsigned long long) rbd_dev
->mapping
.size
);
1949 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
1951 return container_of(dev
, struct rbd_device
, dev
);
1954 static ssize_t
rbd_size_show(struct device
*dev
,
1955 struct device_attribute
*attr
, char *buf
)
1957 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1960 down_read(&rbd_dev
->header_rwsem
);
1961 size
= get_capacity(rbd_dev
->disk
);
1962 up_read(&rbd_dev
->header_rwsem
);
1964 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
1967 static ssize_t
rbd_major_show(struct device
*dev
,
1968 struct device_attribute
*attr
, char *buf
)
1970 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1972 return sprintf(buf
, "%d\n", rbd_dev
->major
);
1975 static ssize_t
rbd_client_id_show(struct device
*dev
,
1976 struct device_attribute
*attr
, char *buf
)
1978 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1980 return sprintf(buf
, "client%lld\n",
1981 ceph_client_id(rbd_dev
->rbd_client
->client
));
1984 static ssize_t
rbd_pool_show(struct device
*dev
,
1985 struct device_attribute
*attr
, char *buf
)
1987 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1989 return sprintf(buf
, "%s\n", rbd_dev
->pool_name
);
1992 static ssize_t
rbd_pool_id_show(struct device
*dev
,
1993 struct device_attribute
*attr
, char *buf
)
1995 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1997 return sprintf(buf
, "%d\n", rbd_dev
->pool_id
);
2000 static ssize_t
rbd_name_show(struct device
*dev
,
2001 struct device_attribute
*attr
, char *buf
)
2003 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2005 return sprintf(buf
, "%s\n", rbd_dev
->image_name
);
2008 static ssize_t
rbd_snap_show(struct device
*dev
,
2009 struct device_attribute
*attr
,
2012 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2014 return sprintf(buf
, "%s\n", rbd_dev
->mapping
.snap_name
);
2017 static ssize_t
rbd_image_refresh(struct device
*dev
,
2018 struct device_attribute
*attr
,
2022 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2025 ret
= rbd_refresh_header(rbd_dev
, NULL
);
2027 return ret
< 0 ? ret
: size
;
/* Per-device sysfs attributes; read-only except refresh/create_snap. */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
2040 static struct attribute
*rbd_attrs
[] = {
2041 &dev_attr_size
.attr
,
2042 &dev_attr_major
.attr
,
2043 &dev_attr_client_id
.attr
,
2044 &dev_attr_pool
.attr
,
2045 &dev_attr_pool_id
.attr
,
2046 &dev_attr_name
.attr
,
2047 &dev_attr_current_snap
.attr
,
2048 &dev_attr_refresh
.attr
,
2049 &dev_attr_create_snap
.attr
,
2053 static struct attribute_group rbd_attr_group
= {
2057 static const struct attribute_group
*rbd_attr_groups
[] = {
/* No-op release; rbd_dev teardown happens in rbd_dev_release(). */
static void rbd_sysfs_dev_release(struct device *dev)
{
}
2066 static struct device_type rbd_device_type
= {
2068 .groups
= rbd_attr_groups
,
2069 .release
= rbd_sysfs_dev_release
,
2077 static ssize_t
rbd_snap_size_show(struct device
*dev
,
2078 struct device_attribute
*attr
,
2081 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2083 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
2086 static ssize_t
rbd_snap_id_show(struct device
*dev
,
2087 struct device_attribute
*attr
,
2090 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2092 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
/* Read-only per-snapshot sysfs attributes. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
2098 static struct attribute
*rbd_snap_attrs
[] = {
2099 &dev_attr_snap_size
.attr
,
2100 &dev_attr_snap_id
.attr
,
2104 static struct attribute_group rbd_snap_attr_group
= {
2105 .attrs
= rbd_snap_attrs
,
2108 static void rbd_snap_dev_release(struct device
*dev
)
2110 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2115 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
2116 &rbd_snap_attr_group
,
2120 static struct device_type rbd_snap_device_type
= {
2121 .groups
= rbd_snap_attr_groups
,
2122 .release
= rbd_snap_dev_release
,
2125 static void __rbd_remove_snap_dev(struct rbd_snap
*snap
)
2127 list_del(&snap
->node
);
2128 device_unregister(&snap
->dev
);
2131 static int rbd_register_snap_dev(struct rbd_snap
*snap
,
2132 struct device
*parent
)
2134 struct device
*dev
= &snap
->dev
;
2137 dev
->type
= &rbd_snap_device_type
;
2138 dev
->parent
= parent
;
2139 dev
->release
= rbd_snap_dev_release
;
2140 dev_set_name(dev
, "snap_%s", snap
->name
);
2141 ret
= device_register(dev
);
2146 static struct rbd_snap
*__rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
2147 int i
, const char *name
)
2149 struct rbd_snap
*snap
;
2152 snap
= kzalloc(sizeof (*snap
), GFP_KERNEL
);
2154 return ERR_PTR(-ENOMEM
);
2157 snap
->name
= kstrdup(name
, GFP_KERNEL
);
2161 snap
->size
= rbd_dev
->header
.snap_sizes
[i
];
2162 snap
->id
= rbd_dev
->header
.snapc
->snaps
[i
];
2163 if (device_is_registered(&rbd_dev
->dev
)) {
2164 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
2175 return ERR_PTR(ret
);
2179 * Scan the rbd device's current snapshot list and compare it to the
2180 * newly-received snapshot context. Remove any existing snapshots
2181 * not present in the new snapshot context. Add a new snapshot for
2182 * any snaphots in the snapshot context not in the current list.
2183 * And verify there are no changes to snapshots we already know
2186 * Assumes the snapshots in the snapshot context are sorted by
2187 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2188 * are also maintained in that order.)
2190 static int rbd_dev_snap_devs_update(struct rbd_device
*rbd_dev
)
2192 struct ceph_snap_context
*snapc
= rbd_dev
->header
.snapc
;
2193 const u32 snap_count
= snapc
->num_snaps
;
2194 char *snap_name
= rbd_dev
->header
.snap_names
;
2195 struct list_head
*head
= &rbd_dev
->snaps
;
2196 struct list_head
*links
= head
->next
;
2199 dout("%s: snap count is %u\n", __func__
, (unsigned int) snap_count
);
2200 while (index
< snap_count
|| links
!= head
) {
2202 struct rbd_snap
*snap
;
2204 snap_id
= index
< snap_count
? snapc
->snaps
[index
]
2206 snap
= links
!= head
? list_entry(links
, struct rbd_snap
, node
)
2208 rbd_assert(!snap
|| snap
->id
!= CEPH_NOSNAP
);
2210 if (snap_id
== CEPH_NOSNAP
|| (snap
&& snap
->id
> snap_id
)) {
2211 struct list_head
*next
= links
->next
;
2213 /* Existing snapshot not in the new snap context */
2215 if (rbd_dev
->mapping
.snap_id
== snap
->id
)
2216 rbd_dev
->mapping
.snap_exists
= false;
2217 __rbd_remove_snap_dev(snap
);
2218 dout("%ssnap id %llu has been removed\n",
2219 rbd_dev
->mapping
.snap_id
== snap
->id
?
2221 (unsigned long long) snap
->id
);
2223 /* Done with this list entry; advance */
2229 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count
,
2230 (unsigned long long) snap_id
);
2231 if (!snap
|| (snap_id
!= CEPH_NOSNAP
&& snap
->id
< snap_id
)) {
2232 struct rbd_snap
*new_snap
;
2234 /* We haven't seen this snapshot before */
2236 new_snap
= __rbd_add_snap_dev(rbd_dev
, index
,
2238 if (IS_ERR(new_snap
)) {
2239 int err
= PTR_ERR(new_snap
);
2241 dout(" failed to add dev, error %d\n", err
);
2246 /* New goes before existing, or at end of list */
2248 dout(" added dev%s\n", snap
? "" : " at end\n");
2250 list_add_tail(&new_snap
->node
, &snap
->node
);
2252 list_add_tail(&new_snap
->node
, head
);
2254 /* Already have this one */
2256 dout(" already present\n");
2258 rbd_assert(snap
->size
==
2259 rbd_dev
->header
.snap_sizes
[index
]);
2260 rbd_assert(!strcmp(snap
->name
, snap_name
));
2262 /* Done with this list entry; advance */
2264 links
= links
->next
;
2267 /* Advance to the next entry in the snapshot context */
2270 snap_name
+= strlen(snap_name
) + 1;
2272 dout("%s: done\n", __func__
);
2277 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
2281 struct rbd_snap
*snap
;
2283 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2284 dev
= &rbd_dev
->dev
;
2286 dev
->bus
= &rbd_bus_type
;
2287 dev
->type
= &rbd_device_type
;
2288 dev
->parent
= &rbd_root_dev
;
2289 dev
->release
= rbd_dev_release
;
2290 dev_set_name(dev
, "%d", rbd_dev
->dev_id
);
2291 ret
= device_register(dev
);
2295 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
2296 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
2301 mutex_unlock(&ctl_mutex
);
2305 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
2307 device_unregister(&rbd_dev
->dev
);
2310 static int rbd_init_watch_dev(struct rbd_device
*rbd_dev
)
2315 ret
= rbd_req_sync_watch(rbd_dev
);
2316 if (ret
== -ERANGE
) {
2317 rc
= rbd_refresh_header(rbd_dev
, NULL
);
2321 } while (ret
== -ERANGE
);
2326 static atomic64_t rbd_dev_id_max
= ATOMIC64_INIT(0);
2329 * Get a unique rbd identifier for the given new rbd_dev, and add
2330 * the rbd_dev to the global list. The minimum rbd id is 1.
2332 static void rbd_dev_id_get(struct rbd_device
*rbd_dev
)
2334 rbd_dev
->dev_id
= atomic64_inc_return(&rbd_dev_id_max
);
2336 spin_lock(&rbd_dev_list_lock
);
2337 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
2338 spin_unlock(&rbd_dev_list_lock
);
2339 dout("rbd_dev %p given dev id %llu\n", rbd_dev
,
2340 (unsigned long long) rbd_dev
->dev_id
);
2344 * Remove an rbd_dev from the global list, and record that its
2345 * identifier is no longer in use.
2347 static void rbd_dev_id_put(struct rbd_device
*rbd_dev
)
2349 struct list_head
*tmp
;
2350 int rbd_id
= rbd_dev
->dev_id
;
2353 rbd_assert(rbd_id
> 0);
2355 dout("rbd_dev %p released dev id %llu\n", rbd_dev
,
2356 (unsigned long long) rbd_dev
->dev_id
);
2357 spin_lock(&rbd_dev_list_lock
);
2358 list_del_init(&rbd_dev
->node
);
2361 * If the id being "put" is not the current maximum, there
2362 * is nothing special we need to do.
2364 if (rbd_id
!= atomic64_read(&rbd_dev_id_max
)) {
2365 spin_unlock(&rbd_dev_list_lock
);
2370 * We need to update the current maximum id. Search the
2371 * list to find out what it is. We're more likely to find
2372 * the maximum at the end, so search the list backward.
2375 list_for_each_prev(tmp
, &rbd_dev_list
) {
2376 struct rbd_device
*rbd_dev
;
2378 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2379 if (rbd_id
> max_id
)
2382 spin_unlock(&rbd_dev_list_lock
);
2385 * The max id could have been updated by rbd_dev_id_get(), in
2386 * which case it now accurately reflects the new maximum.
2387 * Be careful not to overwrite the maximum value in that
2390 atomic64_cmpxchg(&rbd_dev_id_max
, rbd_id
, max_id
);
2391 dout(" max dev id has been reset\n");
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
2444 * Finds the next token in *buf, dynamically allocates a buffer big
2445 * enough to hold a copy of it, and copies the token into the new
2446 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2447 * that a duplicate buffer is created even for a zero-length token.
2449 * Returns a pointer to the newly-allocated duplicate, or a null
2450 * pointer if memory for the duplicate was not available. If
2451 * the lenp argument is a non-null pointer, the length of the token
2452 * (not including the '\0') is returned in *lenp.
2454 * If successful, the *buf pointer will be updated to point beyond
2455 * the end of the found token.
2457 * Note: uses GFP_KERNEL for allocation.
2459 static inline char *dup_token(const char **buf
, size_t *lenp
)
2464 len
= next_token(buf
);
2465 dup
= kmalloc(len
+ 1, GFP_KERNEL
);
2469 memcpy(dup
, *buf
, len
);
2470 *(dup
+ len
) = '\0';
2480 * This fills in the pool_name, image_name, image_name_len, rbd_dev,
2481 * rbd_md_name, and name fields of the given rbd_dev, based on the
2482 * list of monitor addresses and other options provided via
2483 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2484 * copy of the snapshot name to map if successful, or a
2485 * pointer-coded error otherwise.
2487 * Note: rbd_dev is assumed to have been initially zero-filled.
2489 static char *rbd_add_parse_args(struct rbd_device
*rbd_dev
,
2491 const char **mon_addrs
,
2492 size_t *mon_addrs_size
,
2494 size_t options_size
)
2497 char *err_ptr
= ERR_PTR(-EINVAL
);
2500 /* The first four tokens are required */
2502 len
= next_token(&buf
);
2505 *mon_addrs_size
= len
+ 1;
2510 len
= copy_token(&buf
, options
, options_size
);
2511 if (!len
|| len
>= options_size
)
2514 err_ptr
= ERR_PTR(-ENOMEM
);
2515 rbd_dev
->pool_name
= dup_token(&buf
, NULL
);
2516 if (!rbd_dev
->pool_name
)
2519 rbd_dev
->image_name
= dup_token(&buf
, &rbd_dev
->image_name_len
);
2520 if (!rbd_dev
->image_name
)
2523 /* Create the name of the header object */
2525 rbd_dev
->header_name
= kmalloc(rbd_dev
->image_name_len
2526 + sizeof (RBD_SUFFIX
),
2528 if (!rbd_dev
->header_name
)
2530 sprintf(rbd_dev
->header_name
, "%s%s", rbd_dev
->image_name
, RBD_SUFFIX
);
2532 /* Snapshot name is optional */
2533 len
= next_token(&buf
);
2535 buf
= RBD_SNAP_HEAD_NAME
; /* No snapshot supplied */
2536 len
= sizeof (RBD_SNAP_HEAD_NAME
) - 1;
2538 snap_name
= kmalloc(len
+ 1, GFP_KERNEL
);
2541 memcpy(snap_name
, buf
, len
);
2542 *(snap_name
+ len
) = '\0';
2544 dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name
, len
);
2549 kfree(rbd_dev
->header_name
);
2550 rbd_dev
->header_name
= NULL
;
2551 kfree(rbd_dev
->image_name
);
2552 rbd_dev
->image_name
= NULL
;
2553 rbd_dev
->image_name_len
= 0;
2554 kfree(rbd_dev
->pool_name
);
2555 rbd_dev
->pool_name
= NULL
;
2560 static ssize_t
rbd_add(struct bus_type
*bus
,
2565 struct rbd_device
*rbd_dev
= NULL
;
2566 const char *mon_addrs
= NULL
;
2567 size_t mon_addrs_size
= 0;
2568 struct ceph_osd_client
*osdc
;
2572 if (!try_module_get(THIS_MODULE
))
2575 options
= kmalloc(count
, GFP_KERNEL
);
2578 rbd_dev
= kzalloc(sizeof(*rbd_dev
), GFP_KERNEL
);
2582 /* static rbd_device initialization */
2583 spin_lock_init(&rbd_dev
->lock
);
2584 INIT_LIST_HEAD(&rbd_dev
->node
);
2585 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2586 init_rwsem(&rbd_dev
->header_rwsem
);
2588 /* generate unique id: find highest unique id, add one */
2589 rbd_dev_id_get(rbd_dev
);
2591 /* Fill in the device name, now that we have its id. */
2592 BUILD_BUG_ON(DEV_NAME_LEN
2593 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
2594 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->dev_id
);
2596 /* parse add command */
2597 snap_name
= rbd_add_parse_args(rbd_dev
, buf
,
2598 &mon_addrs
, &mon_addrs_size
, options
, count
);
2599 if (IS_ERR(snap_name
)) {
2600 rc
= PTR_ERR(snap_name
);
2604 rc
= rbd_get_client(rbd_dev
, mon_addrs
, mon_addrs_size
- 1, options
);
2609 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2610 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, rbd_dev
->pool_name
);
2612 goto err_out_client
;
2613 rbd_dev
->pool_id
= rc
;
2615 /* register our block device */
2616 rc
= register_blkdev(0, rbd_dev
->name
);
2618 goto err_out_client
;
2619 rbd_dev
->major
= rc
;
2621 rc
= rbd_bus_add_dev(rbd_dev
);
2623 goto err_out_blkdev
;
2626 * At this point cleanup in the event of an error is the job
2627 * of the sysfs code (initiated by rbd_bus_del_dev()).
2629 * Set up and announce blkdev mapping.
2631 rc
= rbd_init_disk(rbd_dev
);
2635 rc
= rbd_init_watch_dev(rbd_dev
);
2642 /* this will also clean up rest of rbd_dev stuff */
2644 rbd_bus_del_dev(rbd_dev
);
2649 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2651 rbd_put_client(rbd_dev
);
2653 if (rbd_dev
->pool_name
) {
2654 kfree(rbd_dev
->mapping
.snap_name
);
2655 kfree(rbd_dev
->header_name
);
2656 kfree(rbd_dev
->image_name
);
2657 kfree(rbd_dev
->pool_name
);
2659 rbd_dev_id_put(rbd_dev
);
2664 dout("Error adding device %s\n", buf
);
2665 module_put(THIS_MODULE
);
2667 return (ssize_t
) rc
;
2670 static struct rbd_device
*__rbd_get_dev(unsigned long dev_id
)
2672 struct list_head
*tmp
;
2673 struct rbd_device
*rbd_dev
;
2675 spin_lock(&rbd_dev_list_lock
);
2676 list_for_each(tmp
, &rbd_dev_list
) {
2677 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2678 if (rbd_dev
->dev_id
== dev_id
) {
2679 spin_unlock(&rbd_dev_list_lock
);
2683 spin_unlock(&rbd_dev_list_lock
);
2687 static void rbd_dev_release(struct device
*dev
)
2689 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2691 if (rbd_dev
->watch_request
) {
2692 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
2694 ceph_osdc_unregister_linger_request(&client
->osdc
,
2695 rbd_dev
->watch_request
);
2697 if (rbd_dev
->watch_event
)
2698 rbd_req_sync_unwatch(rbd_dev
);
2700 rbd_put_client(rbd_dev
);
2702 /* clean up and free blkdev */
2703 rbd_free_disk(rbd_dev
);
2704 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2706 /* done with the id, and with the rbd_dev */
2707 kfree(rbd_dev
->mapping
.snap_name
);
2708 kfree(rbd_dev
->header_name
);
2709 kfree(rbd_dev
->pool_name
);
2710 kfree(rbd_dev
->image_name
);
2711 rbd_dev_id_put(rbd_dev
);
2714 /* release module ref */
2715 module_put(THIS_MODULE
);
2718 static ssize_t
rbd_remove(struct bus_type
*bus
,
2722 struct rbd_device
*rbd_dev
= NULL
;
2727 rc
= strict_strtoul(buf
, 10, &ul
);
2731 /* convert to int; abort if we lost anything in the conversion */
2732 target_id
= (int) ul
;
2733 if (target_id
!= ul
)
2736 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2738 rbd_dev
= __rbd_get_dev(target_id
);
2744 __rbd_remove_all_snaps(rbd_dev
);
2745 rbd_bus_del_dev(rbd_dev
);
2748 mutex_unlock(&ctl_mutex
);
2753 static ssize_t
rbd_snap_add(struct device
*dev
,
2754 struct device_attribute
*attr
,
2758 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2760 char *name
= kmalloc(count
+ 1, GFP_KERNEL
);
2764 snprintf(name
, count
, "%s", buf
);
2766 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2768 ret
= rbd_header_add_snap(rbd_dev
,
2773 ret
= __rbd_refresh_header(rbd_dev
, NULL
);
2777 /* shouldn't hold ctl_mutex when notifying.. notify might
2778 trigger a watch callback that would need to get that mutex */
2779 mutex_unlock(&ctl_mutex
);
2781 /* make a best effort, don't error if failed */
2782 rbd_req_sync_notify(rbd_dev
);
2789 mutex_unlock(&ctl_mutex
);
2795 * create control files in sysfs
2798 static int rbd_sysfs_init(void)
2802 ret
= device_register(&rbd_root_dev
);
2806 ret
= bus_register(&rbd_bus_type
);
2808 device_unregister(&rbd_root_dev
);
2813 static void rbd_sysfs_cleanup(void)
2815 bus_unregister(&rbd_bus_type
);
2816 device_unregister(&rbd_root_dev
);
2819 int __init
rbd_init(void)
2823 rc
= rbd_sysfs_init();
2826 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
2830 void __exit
rbd_exit(void)
2832 rbd_sysfs_cleanup();
2835 module_init(rbd_init
);
2836 module_exit(rbd_exit
);
2838 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2839 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2840 MODULE_DESCRIPTION("rados block device");
2842 /* following authorship retained from original osdblk.c */
2843 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2845 MODULE_LICENSE("GPL");