/*
 * rbd.c -- Export ceph rados objects as a Linux block device
 *
 * based on drivers/block/osdblk.c:
 * Copyright 2009 Red Hat, Inc.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; see the file COPYING.  If not, write to
 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 * For usage instructions, please refer to:
 *
 *                 Documentation/ABI/testing/sysfs-bus-rbd
 */
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_READ_ONLY_DEFAULT false
75 * block device image metadata (in-memory version)
77 struct rbd_image_header
{
83 struct ceph_snap_context
*snapc
;
97 * an instance of the client. multiple devices may share an rbd client.
100 struct ceph_client
*client
;
102 struct list_head node
;
106 * a request completion status
108 struct rbd_req_status
{
115 * a collection of requests
117 struct rbd_req_coll
{
121 struct rbd_req_status status
[0];
125 * a single io request
128 struct request
*rq
; /* blk layer request */
129 struct bio
*bio
; /* cloned bio */
130 struct page
**pages
; /* list of used pages */
133 struct rbd_req_coll
*coll
;
140 struct list_head node
;
148 int dev_id
; /* blkdev unique id */
150 int major
; /* blkdev assigned major */
151 struct gendisk
*disk
; /* blkdev's gendisk and rq */
152 struct request_queue
*q
;
154 struct rbd_options rbd_opts
;
155 struct rbd_client
*rbd_client
;
157 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
159 spinlock_t lock
; /* queue lock */
161 struct rbd_image_header header
;
163 size_t image_name_len
;
168 struct ceph_osd_event
*watch_event
;
169 struct ceph_osd_request
*watch_request
;
171 /* protects updating the header */
172 struct rw_semaphore header_rwsem
;
173 /* name of the snapshot this device reads from */
175 /* id of the snapshot this device reads from */
176 u64 snap_id
; /* current snapshot id */
177 /* whether the snap_id this device reads from still exists */
181 struct list_head node
;
183 /* list of snapshots */
184 struct list_head snaps
;
190 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
192 static LIST_HEAD(rbd_dev_list
); /* devices */
193 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
195 static LIST_HEAD(rbd_client_list
); /* clients */
196 static DEFINE_SPINLOCK(rbd_client_list_lock
);
198 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
);
199 static void rbd_dev_release(struct device
*dev
);
200 static ssize_t
rbd_snap_add(struct device
*dev
,
201 struct device_attribute
*attr
,
204 static void __rbd_remove_snap_dev(struct rbd_snap
*snap
);
206 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
208 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
211 static struct bus_attribute rbd_bus_attrs
[] = {
212 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
213 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
217 static struct bus_type rbd_bus_type
= {
219 .bus_attrs
= rbd_bus_attrs
,
/*
 * Release callback for the static root "rbd" device (rbd_root_dev).
 * The device is not dynamically allocated, so there is nothing to free;
 * the empty callback exists only because the driver core warns when a
 * registered device has no release method.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
226 static struct device rbd_root_dev
= {
228 .release
= rbd_root_dev_release
,
232 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
234 return get_device(&rbd_dev
->dev
);
237 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
239 put_device(&rbd_dev
->dev
);
242 static int rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
);
244 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
246 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
248 if ((mode
& FMODE_WRITE
) && rbd_dev
->read_only
)
251 rbd_get_dev(rbd_dev
);
252 set_device_ro(bdev
, rbd_dev
->read_only
);
257 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
259 struct rbd_device
*rbd_dev
= disk
->private_data
;
261 rbd_put_dev(rbd_dev
);
266 static const struct block_device_operations rbd_bd_ops
= {
267 .owner
= THIS_MODULE
,
269 .release
= rbd_release
,
273 * Initialize an rbd client instance.
276 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
)
278 struct rbd_client
*rbdc
;
281 dout("rbd_client_create\n");
282 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
286 kref_init(&rbdc
->kref
);
287 INIT_LIST_HEAD(&rbdc
->node
);
289 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
291 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
292 if (IS_ERR(rbdc
->client
))
294 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
296 ret
= ceph_open_session(rbdc
->client
);
300 spin_lock(&rbd_client_list_lock
);
301 list_add_tail(&rbdc
->node
, &rbd_client_list
);
302 spin_unlock(&rbd_client_list_lock
);
304 mutex_unlock(&ctl_mutex
);
306 dout("rbd_client_create created %p\n", rbdc
);
310 ceph_destroy_client(rbdc
->client
);
312 mutex_unlock(&ctl_mutex
);
316 ceph_destroy_options(ceph_opts
);
321 * Find a ceph client with specific addr and configuration. If
322 * found, bump its reference count.
324 static struct rbd_client
*rbd_client_find(struct ceph_options
*ceph_opts
)
326 struct rbd_client
*client_node
;
329 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
332 spin_lock(&rbd_client_list_lock
);
333 list_for_each_entry(client_node
, &rbd_client_list
, node
) {
334 if (!ceph_compare_options(ceph_opts
, client_node
->client
)) {
335 kref_get(&client_node
->kref
);
340 spin_unlock(&rbd_client_list_lock
);
342 return found
? client_node
: NULL
;
352 /* string args above */
355 /* Boolean args above */
359 static match_table_t rbd_opts_tokens
= {
361 /* string args above */
362 {Opt_read_only
, "read_only"},
363 {Opt_read_only
, "ro"}, /* Alternate spelling */
364 {Opt_read_write
, "read_write"},
365 {Opt_read_write
, "rw"}, /* Alternate spelling */
366 /* Boolean args above */
370 static int parse_rbd_opts_token(char *c
, void *private)
372 struct rbd_options
*rbd_opts
= private;
373 substring_t argstr
[MAX_OPT_ARGS
];
374 int token
, intval
, ret
;
376 token
= match_token(c
, rbd_opts_tokens
, argstr
);
380 if (token
< Opt_last_int
) {
381 ret
= match_int(&argstr
[0], &intval
);
383 pr_err("bad mount option arg (not int) "
387 dout("got int token %d val %d\n", token
, intval
);
388 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
389 dout("got string token %d val %s\n", token
,
391 } else if (token
> Opt_last_string
&& token
< Opt_last_bool
) {
392 dout("got Boolean token %d\n", token
);
394 dout("got token %d\n", token
);
399 rbd_opts
->read_only
= true;
402 rbd_opts
->read_only
= false;
411 * Get a ceph client with specific addr and configuration, if one does
412 * not exist create it.
414 static int rbd_get_client(struct rbd_device
*rbd_dev
, const char *mon_addr
,
415 size_t mon_addr_len
, char *options
)
417 struct rbd_options
*rbd_opts
= &rbd_dev
->rbd_opts
;
418 struct ceph_options
*ceph_opts
;
419 struct rbd_client
*rbdc
;
421 rbd_opts
->read_only
= RBD_READ_ONLY_DEFAULT
;
423 ceph_opts
= ceph_parse_options(options
, mon_addr
,
424 mon_addr
+ mon_addr_len
,
425 parse_rbd_opts_token
, rbd_opts
);
426 if (IS_ERR(ceph_opts
))
427 return PTR_ERR(ceph_opts
);
429 rbdc
= rbd_client_find(ceph_opts
);
431 /* using an existing client */
432 ceph_destroy_options(ceph_opts
);
434 rbdc
= rbd_client_create(ceph_opts
);
436 return PTR_ERR(rbdc
);
438 rbd_dev
->rbd_client
= rbdc
;
444 * Destroy ceph client
446 * Caller must hold rbd_client_list_lock.
448 static void rbd_client_release(struct kref
*kref
)
450 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
452 dout("rbd_release_client %p\n", rbdc
);
453 spin_lock(&rbd_client_list_lock
);
454 list_del(&rbdc
->node
);
455 spin_unlock(&rbd_client_list_lock
);
457 ceph_destroy_client(rbdc
->client
);
462 * Drop reference to ceph client node. If it's not referenced anymore, release
465 static void rbd_put_client(struct rbd_device
*rbd_dev
)
467 kref_put(&rbd_dev
->rbd_client
->kref
, rbd_client_release
);
468 rbd_dev
->rbd_client
= NULL
;
472 * Destroy requests collection
474 static void rbd_coll_release(struct kref
*kref
)
476 struct rbd_req_coll
*coll
=
477 container_of(kref
, struct rbd_req_coll
, kref
);
479 dout("rbd_coll_release %p\n", coll
);
483 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
488 /* The header has to start with the magic rbd header text */
489 if (memcmp(&ondisk
->text
, RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
)))
493 * The size of a snapshot header has to fit in a size_t, and
494 * that limits the number of snapshots.
496 snap_count
= le32_to_cpu(ondisk
->snap_count
);
497 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
498 if (snap_count
> size
/ sizeof (__le64
))
502 * Not only that, but the size of the entire the snapshot
503 * header must also be representable in a size_t.
505 size
-= snap_count
* sizeof (__le64
);
506 if ((u64
) size
< le64_to_cpu(ondisk
->snap_names_len
))
513 * Create a new header structure, translate header format from the on-disk
516 static int rbd_header_from_disk(struct rbd_image_header
*header
,
517 struct rbd_image_header_ondisk
*ondisk
)
524 memset(header
, 0, sizeof (*header
));
526 snap_count
= le32_to_cpu(ondisk
->snap_count
);
528 len
= strnlen(ondisk
->object_prefix
, sizeof (ondisk
->object_prefix
));
529 header
->object_prefix
= kmalloc(len
+ 1, GFP_KERNEL
);
530 if (!header
->object_prefix
)
532 memcpy(header
->object_prefix
, ondisk
->object_prefix
, len
);
533 header
->object_prefix
[len
] = '\0';
536 u64 snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
538 /* Save a copy of the snapshot names */
540 if (snap_names_len
> (u64
) SIZE_MAX
)
542 header
->snap_names
= kmalloc(snap_names_len
, GFP_KERNEL
);
543 if (!header
->snap_names
)
546 * Note that rbd_dev_v1_header_read() guarantees
547 * the ondisk buffer we're working with has
548 * snap_names_len bytes beyond the end of the
549 * snapshot id array, this memcpy() is safe.
551 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
554 /* Record each snapshot's size */
556 size
= snap_count
* sizeof (*header
->snap_sizes
);
557 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
558 if (!header
->snap_sizes
)
560 for (i
= 0; i
< snap_count
; i
++)
561 header
->snap_sizes
[i
] =
562 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
564 WARN_ON(ondisk
->snap_names_len
);
565 header
->snap_names
= NULL
;
566 header
->snap_sizes
= NULL
;
569 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
570 header
->obj_order
= ondisk
->options
.order
;
571 header
->crypt_type
= ondisk
->options
.crypt_type
;
572 header
->comp_type
= ondisk
->options
.comp_type
;
573 header
->total_snaps
= snap_count
;
575 /* Allocate and fill in the snapshot context */
577 size
= sizeof (struct ceph_snap_context
);
578 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
579 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
583 atomic_set(&header
->snapc
->nref
, 1);
584 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
585 header
->snapc
->num_snaps
= snap_count
;
586 for (i
= 0; i
< snap_count
; i
++)
587 header
->snapc
->snaps
[i
] =
588 le64_to_cpu(ondisk
->snaps
[i
].id
);
593 kfree(header
->snap_sizes
);
594 header
->snap_sizes
= NULL
;
595 kfree(header
->snap_names
);
596 header
->snap_names
= NULL
;
597 kfree(header
->object_prefix
);
598 header
->object_prefix
= NULL
;
603 static int snap_by_name(struct rbd_image_header
*header
, const char *snap_name
,
607 char *p
= header
->snap_names
;
609 for (i
= 0; i
< header
->total_snaps
; i
++) {
610 if (!strcmp(snap_name
, p
)) {
612 /* Found it. Pass back its id and/or size */
615 *seq
= header
->snapc
->snaps
[i
];
617 *size
= header
->snap_sizes
[i
];
620 p
+= strlen(p
) + 1; /* Skip ahead to the next name */
625 static int rbd_header_set_snap(struct rbd_device
*rbd_dev
, u64
*size
)
629 down_write(&rbd_dev
->header_rwsem
);
631 if (!memcmp(rbd_dev
->snap_name
, RBD_SNAP_HEAD_NAME
,
632 sizeof (RBD_SNAP_HEAD_NAME
))) {
633 rbd_dev
->snap_id
= CEPH_NOSNAP
;
634 rbd_dev
->snap_exists
= false;
635 rbd_dev
->read_only
= rbd_dev
->rbd_opts
.read_only
;
637 *size
= rbd_dev
->header
.image_size
;
641 ret
= snap_by_name(&rbd_dev
->header
, rbd_dev
->snap_name
,
645 rbd_dev
->snap_id
= snap_id
;
646 rbd_dev
->snap_exists
= true;
647 rbd_dev
->read_only
= true; /* No choice for snapshots */
652 up_write(&rbd_dev
->header_rwsem
);
656 static void rbd_header_free(struct rbd_image_header
*header
)
658 kfree(header
->object_prefix
);
659 header
->object_prefix
= NULL
;
660 kfree(header
->snap_sizes
);
661 header
->snap_sizes
= NULL
;
662 kfree(header
->snap_names
);
663 header
->snap_names
= NULL
;
664 ceph_put_snap_context(header
->snapc
);
665 header
->snapc
= NULL
;
669 * get the actual striped segment name, offset and length
671 static u64
rbd_get_segment(struct rbd_image_header
*header
,
672 const char *object_prefix
,
674 char *seg_name
, u64
*segofs
)
676 u64 seg
= ofs
>> header
->obj_order
;
679 snprintf(seg_name
, RBD_MAX_SEG_NAME_LEN
,
680 "%s.%012llx", object_prefix
, seg
);
682 ofs
= ofs
& ((1 << header
->obj_order
) - 1);
683 len
= min_t(u64
, len
, (1 << header
->obj_order
) - ofs
);
691 static int rbd_get_num_segments(struct rbd_image_header
*header
,
694 u64 start_seg
= ofs
>> header
->obj_order
;
695 u64 end_seg
= (ofs
+ len
- 1) >> header
->obj_order
;
696 return end_seg
- start_seg
+ 1;
700 * returns the size of an object in the image
702 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
704 return 1 << header
->obj_order
;
711 static void bio_chain_put(struct bio
*chain
)
717 chain
= chain
->bi_next
;
723 * zeros a bio chain, starting at specific offset
725 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
734 bio_for_each_segment(bv
, chain
, i
) {
735 if (pos
+ bv
->bv_len
> start_ofs
) {
736 int remainder
= max(start_ofs
- pos
, 0);
737 buf
= bvec_kmap_irq(bv
, &flags
);
738 memset(buf
+ remainder
, 0,
739 bv
->bv_len
- remainder
);
740 bvec_kunmap_irq(buf
, &flags
);
745 chain
= chain
->bi_next
;
750 * bio_chain_clone - clone a chain of bios up to a certain length.
751 * might return a bio_pair that will need to be released.
753 static struct bio
*bio_chain_clone(struct bio
**old
, struct bio
**next
,
754 struct bio_pair
**bp
,
755 int len
, gfp_t gfpmask
)
757 struct bio
*old_chain
= *old
;
758 struct bio
*new_chain
= NULL
;
763 bio_pair_release(*bp
);
767 while (old_chain
&& (total
< len
)) {
770 tmp
= bio_kmalloc(gfpmask
, old_chain
->bi_max_vecs
);
773 gfpmask
&= ~__GFP_WAIT
; /* can't wait after the first */
775 if (total
+ old_chain
->bi_size
> len
) {
779 * this split can only happen with a single paged bio,
780 * split_bio will BUG_ON if this is not the case
782 dout("bio_chain_clone split! total=%d remaining=%d"
784 total
, len
- total
, old_chain
->bi_size
);
786 /* split the bio. We'll release it either in the next
787 call, or it will have to be released outside */
788 bp
= bio_split(old_chain
, (len
- total
) / SECTOR_SIZE
);
792 __bio_clone(tmp
, &bp
->bio1
);
796 __bio_clone(tmp
, old_chain
);
797 *next
= old_chain
->bi_next
;
807 old_chain
= old_chain
->bi_next
;
809 total
+= tmp
->bi_size
;
819 dout("bio_chain_clone with err\n");
820 bio_chain_put(new_chain
);
825 * helpers for osd request op vectors.
827 static struct ceph_osd_req_op
*rbd_create_rw_ops(int num_ops
,
828 int opcode
, u32 payload_len
)
830 struct ceph_osd_req_op
*ops
;
832 ops
= kzalloc(sizeof (*ops
) * (num_ops
+ 1), GFP_NOIO
);
839 * op extent offset and length will be set later on
840 * in calc_raw_layout()
842 ops
[0].payload_len
= payload_len
;
847 static void rbd_destroy_ops(struct ceph_osd_req_op
*ops
)
852 static void rbd_coll_end_req_index(struct request
*rq
,
853 struct rbd_req_coll
*coll
,
857 struct request_queue
*q
;
860 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
861 coll
, index
, ret
, (unsigned long long) len
);
867 blk_end_request(rq
, ret
, len
);
873 spin_lock_irq(q
->queue_lock
);
874 coll
->status
[index
].done
= 1;
875 coll
->status
[index
].rc
= ret
;
876 coll
->status
[index
].bytes
= len
;
877 max
= min
= coll
->num_done
;
878 while (max
< coll
->total
&& coll
->status
[max
].done
)
881 for (i
= min
; i
<max
; i
++) {
882 __blk_end_request(rq
, coll
->status
[i
].rc
,
883 coll
->status
[i
].bytes
);
885 kref_put(&coll
->kref
, rbd_coll_release
);
887 spin_unlock_irq(q
->queue_lock
);
890 static void rbd_coll_end_req(struct rbd_request
*req
,
893 rbd_coll_end_req_index(req
->rq
, req
->coll
, req
->coll_index
, ret
, len
);
897 * Send ceph osd request
899 static int rbd_do_request(struct request
*rq
,
900 struct rbd_device
*rbd_dev
,
901 struct ceph_snap_context
*snapc
,
903 const char *object_name
, u64 ofs
, u64 len
,
908 struct ceph_osd_req_op
*ops
,
909 struct rbd_req_coll
*coll
,
911 void (*rbd_cb
)(struct ceph_osd_request
*req
,
912 struct ceph_msg
*msg
),
913 struct ceph_osd_request
**linger_req
,
916 struct ceph_osd_request
*req
;
917 struct ceph_file_layout
*layout
;
920 struct timespec mtime
= CURRENT_TIME
;
921 struct rbd_request
*req_data
;
922 struct ceph_osd_request_head
*reqhead
;
923 struct ceph_osd_client
*osdc
;
925 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
928 rbd_coll_end_req_index(rq
, coll
, coll_index
,
934 req_data
->coll
= coll
;
935 req_data
->coll_index
= coll_index
;
938 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name
,
939 (unsigned long long) ofs
, (unsigned long long) len
);
941 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
942 req
= ceph_osdc_alloc_request(osdc
, flags
, snapc
, ops
,
943 false, GFP_NOIO
, pages
, bio
);
949 req
->r_callback
= rbd_cb
;
953 req_data
->pages
= pages
;
956 req
->r_priv
= req_data
;
958 reqhead
= req
->r_request
->front
.iov_base
;
959 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
961 strncpy(req
->r_oid
, object_name
, sizeof(req
->r_oid
));
962 req
->r_oid_len
= strlen(req
->r_oid
);
964 layout
= &req
->r_file_layout
;
965 memset(layout
, 0, sizeof(*layout
));
966 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
967 layout
->fl_stripe_count
= cpu_to_le32(1);
968 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
969 layout
->fl_pg_pool
= cpu_to_le32(rbd_dev
->pool_id
);
970 ceph_calc_raw_layout(osdc
, layout
, snapid
, ofs
, &len
, &bno
,
973 ceph_osdc_build_request(req
, ofs
, &len
,
977 req
->r_oid
, req
->r_oid_len
);
980 ceph_osdc_set_request_linger(osdc
, req
);
984 ret
= ceph_osdc_start_request(osdc
, req
, false);
989 ret
= ceph_osdc_wait_request(osdc
, req
);
991 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
992 dout("reassert_ver=%llu\n",
994 le64_to_cpu(req
->r_reassert_version
.version
));
995 ceph_osdc_put_request(req
);
1000 bio_chain_put(req_data
->bio
);
1001 ceph_osdc_put_request(req
);
1003 rbd_coll_end_req(req_data
, ret
, len
);
1009 * Ceph osd op callback
1011 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
1013 struct rbd_request
*req_data
= req
->r_priv
;
1014 struct ceph_osd_reply_head
*replyhead
;
1015 struct ceph_osd_op
*op
;
1021 replyhead
= msg
->front
.iov_base
;
1022 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
1023 op
= (void *)(replyhead
+ 1);
1024 rc
= le32_to_cpu(replyhead
->result
);
1025 bytes
= le64_to_cpu(op
->extent
.length
);
1026 read_op
= (le16_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
1028 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1029 (unsigned long long) bytes
, read_op
, (int) rc
);
1031 if (rc
== -ENOENT
&& read_op
) {
1032 zero_bio_chain(req_data
->bio
, 0);
1034 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
1035 zero_bio_chain(req_data
->bio
, bytes
);
1036 bytes
= req_data
->len
;
1039 rbd_coll_end_req(req_data
, rc
, bytes
);
1042 bio_chain_put(req_data
->bio
);
1044 ceph_osdc_put_request(req
);
/*
 * Minimal OSD completion callback: just drops the request reference.
 * Used for fire-and-forget operations (e.g. notify acks) that carry no
 * per-request state to complete.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1054 * Do a synchronous ceph osd operation
1056 static int rbd_req_sync_op(struct rbd_device
*rbd_dev
,
1057 struct ceph_snap_context
*snapc
,
1060 struct ceph_osd_req_op
*ops
,
1061 const char *object_name
,
1064 struct ceph_osd_request
**linger_req
,
1068 struct page
**pages
;
1071 BUG_ON(ops
== NULL
);
1073 num_pages
= calc_pages_for(ofs
, len
);
1074 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1076 return PTR_ERR(pages
);
1078 ret
= rbd_do_request(NULL
, rbd_dev
, snapc
, snapid
,
1079 object_name
, ofs
, len
, NULL
,
1089 if ((flags
& CEPH_OSD_FLAG_READ
) && buf
)
1090 ret
= ceph_copy_from_page_vector(pages
, buf
, ofs
, ret
);
1093 ceph_release_page_vector(pages
, num_pages
);
1098 * Do an asynchronous ceph osd operation
1100 static int rbd_do_op(struct request
*rq
,
1101 struct rbd_device
*rbd_dev
,
1102 struct ceph_snap_context
*snapc
,
1104 int opcode
, int flags
,
1107 struct rbd_req_coll
*coll
,
1114 struct ceph_osd_req_op
*ops
;
1117 seg_name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
1121 seg_len
= rbd_get_segment(&rbd_dev
->header
,
1122 rbd_dev
->header
.object_prefix
,
1124 seg_name
, &seg_ofs
);
1126 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? seg_len
: 0);
1129 ops
= rbd_create_rw_ops(1, opcode
, payload_len
);
1133 /* we've taken care of segment sizes earlier when we
1134 cloned the bios. We should never have a segment
1135 truncated at this point */
1136 BUG_ON(seg_len
< len
);
1138 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
1139 seg_name
, seg_ofs
, seg_len
,
1145 rbd_req_cb
, 0, NULL
);
1147 rbd_destroy_ops(ops
);
1154 * Request async osd write
1156 static int rbd_req_write(struct request
*rq
,
1157 struct rbd_device
*rbd_dev
,
1158 struct ceph_snap_context
*snapc
,
1161 struct rbd_req_coll
*coll
,
1164 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1166 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1167 ofs
, len
, bio
, coll
, coll_index
);
1171 * Request async osd read
1173 static int rbd_req_read(struct request
*rq
,
1174 struct rbd_device
*rbd_dev
,
1178 struct rbd_req_coll
*coll
,
1181 return rbd_do_op(rq
, rbd_dev
, NULL
,
1185 ofs
, len
, bio
, coll
, coll_index
);
1189 * Request sync osd read
1191 static int rbd_req_sync_read(struct rbd_device
*rbd_dev
,
1193 const char *object_name
,
1198 struct ceph_osd_req_op
*ops
;
1201 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_READ
, 0);
1205 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1208 ops
, object_name
, ofs
, len
, buf
, NULL
, ver
);
1209 rbd_destroy_ops(ops
);
1215 * Request sync osd watch
1217 static int rbd_req_sync_notify_ack(struct rbd_device
*rbd_dev
,
1221 struct ceph_osd_req_op
*ops
;
1224 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1228 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1229 ops
[0].watch
.cookie
= notify_id
;
1230 ops
[0].watch
.flag
= 0;
1232 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1233 rbd_dev
->header_name
, 0, 0, NULL
,
1238 rbd_simple_req_cb
, 0, NULL
);
1240 rbd_destroy_ops(ops
);
1244 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1246 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1253 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1254 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1255 (unsigned int) opcode
);
1256 rc
= rbd_refresh_header(rbd_dev
, &hver
);
1258 pr_warning(RBD_DRV_NAME
"%d got notification but failed to "
1259 " update snaps: %d\n", rbd_dev
->major
, rc
);
1261 rbd_req_sync_notify_ack(rbd_dev
, hver
, notify_id
);
1265 * Request sync osd watch
1267 static int rbd_req_sync_watch(struct rbd_device
*rbd_dev
)
1269 struct ceph_osd_req_op
*ops
;
1270 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1273 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1277 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1278 (void *)rbd_dev
, &rbd_dev
->watch_event
);
1282 ops
[0].watch
.ver
= cpu_to_le64(rbd_dev
->header
.obj_version
);
1283 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1284 ops
[0].watch
.flag
= 1;
1286 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1288 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1290 rbd_dev
->header_name
,
1292 &rbd_dev
->watch_request
, NULL
);
1297 rbd_destroy_ops(ops
);
1301 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1302 rbd_dev
->watch_event
= NULL
;
1304 rbd_destroy_ops(ops
);
1309 * Request sync osd unwatch
1311 static int rbd_req_sync_unwatch(struct rbd_device
*rbd_dev
)
1313 struct ceph_osd_req_op
*ops
;
1316 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1320 ops
[0].watch
.ver
= 0;
1321 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1322 ops
[0].watch
.flag
= 0;
1324 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1326 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1328 rbd_dev
->header_name
,
1329 0, 0, NULL
, NULL
, NULL
);
1332 rbd_destroy_ops(ops
);
1333 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1334 rbd_dev
->watch_event
= NULL
;
1338 struct rbd_notify_info
{
1339 struct rbd_device
*rbd_dev
;
1342 static void rbd_notify_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1344 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1348 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1349 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1350 (unsigned int) opcode
);
1354 * Request sync osd notify
1356 static int rbd_req_sync_notify(struct rbd_device
*rbd_dev
)
1358 struct ceph_osd_req_op
*ops
;
1359 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1360 struct ceph_osd_event
*event
;
1361 struct rbd_notify_info info
;
1362 int payload_len
= sizeof(u32
) + sizeof(u32
);
1365 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY
, payload_len
);
1369 info
.rbd_dev
= rbd_dev
;
1371 ret
= ceph_osdc_create_event(osdc
, rbd_notify_cb
, 1,
1372 (void *)&info
, &event
);
1376 ops
[0].watch
.ver
= 1;
1377 ops
[0].watch
.flag
= 1;
1378 ops
[0].watch
.cookie
= event
->cookie
;
1379 ops
[0].watch
.prot_ver
= RADOS_NOTIFY_VER
;
1380 ops
[0].watch
.timeout
= 12;
1382 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1384 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1386 rbd_dev
->header_name
,
1387 0, 0, NULL
, NULL
, NULL
);
1391 ret
= ceph_osdc_wait_event(event
, CEPH_OSD_TIMEOUT_DEFAULT
);
1392 dout("ceph_osdc_wait_event returned %d\n", ret
);
1393 rbd_destroy_ops(ops
);
1397 ceph_osdc_cancel_event(event
);
1399 rbd_destroy_ops(ops
);
1404 * Request sync osd read
1406 static int rbd_req_sync_exec(struct rbd_device
*rbd_dev
,
1407 const char *object_name
,
1408 const char *class_name
,
1409 const char *method_name
,
1414 struct ceph_osd_req_op
*ops
;
1415 int class_name_len
= strlen(class_name
);
1416 int method_name_len
= strlen(method_name
);
1419 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_CALL
,
1420 class_name_len
+ method_name_len
+ len
);
1424 ops
[0].cls
.class_name
= class_name
;
1425 ops
[0].cls
.class_len
= (__u8
) class_name_len
;
1426 ops
[0].cls
.method_name
= method_name
;
1427 ops
[0].cls
.method_len
= (__u8
) method_name_len
;
1428 ops
[0].cls
.argc
= 0;
1429 ops
[0].cls
.indata
= data
;
1430 ops
[0].cls
.indata_len
= len
;
1432 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1434 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1436 object_name
, 0, 0, NULL
, NULL
, ver
);
1438 rbd_destroy_ops(ops
);
1440 dout("cls_exec returned %d\n", ret
);
1444 static struct rbd_req_coll
*rbd_alloc_coll(int num_reqs
)
1446 struct rbd_req_coll
*coll
=
1447 kzalloc(sizeof(struct rbd_req_coll
) +
1448 sizeof(struct rbd_req_status
) * num_reqs
,
1453 coll
->total
= num_reqs
;
1454 kref_init(&coll
->kref
);
1459 * block device queue callback
1461 static void rbd_rq_fn(struct request_queue
*q
)
1463 struct rbd_device
*rbd_dev
= q
->queuedata
;
1465 struct bio_pair
*bp
= NULL
;
1467 while ((rq
= blk_fetch_request(q
))) {
1469 struct bio
*rq_bio
, *next_bio
= NULL
;
1474 int num_segs
, cur_seg
= 0;
1475 struct rbd_req_coll
*coll
;
1476 struct ceph_snap_context
*snapc
;
1478 /* peek at request from block layer */
1482 dout("fetched request\n");
1484 /* filter out block requests we don't understand */
1485 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1486 __blk_end_request_all(rq
, 0);
1490 /* deduce our operation (read, write) */
1491 do_write
= (rq_data_dir(rq
) == WRITE
);
1493 size
= blk_rq_bytes(rq
);
1494 ofs
= blk_rq_pos(rq
) * SECTOR_SIZE
;
1496 if (do_write
&& rbd_dev
->read_only
) {
1497 __blk_end_request_all(rq
, -EROFS
);
1501 spin_unlock_irq(q
->queue_lock
);
1503 down_read(&rbd_dev
->header_rwsem
);
1505 if (rbd_dev
->snap_id
!= CEPH_NOSNAP
&& !rbd_dev
->snap_exists
) {
1506 up_read(&rbd_dev
->header_rwsem
);
1507 dout("request for non-existent snapshot");
1508 spin_lock_irq(q
->queue_lock
);
1509 __blk_end_request_all(rq
, -ENXIO
);
1513 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1515 up_read(&rbd_dev
->header_rwsem
);
1517 dout("%s 0x%x bytes at 0x%llx\n",
1518 do_write
? "write" : "read",
1519 size
, (unsigned long long) blk_rq_pos(rq
) * SECTOR_SIZE
);
1521 num_segs
= rbd_get_num_segments(&rbd_dev
->header
, ofs
, size
);
1522 coll
= rbd_alloc_coll(num_segs
);
1524 spin_lock_irq(q
->queue_lock
);
1525 __blk_end_request_all(rq
, -ENOMEM
);
1526 ceph_put_snap_context(snapc
);
1531 /* a bio clone to be passed down to OSD req */
1532 dout("rq->bio->bi_vcnt=%hu\n", rq
->bio
->bi_vcnt
);
1533 op_size
= rbd_get_segment(&rbd_dev
->header
,
1534 rbd_dev
->header
.object_prefix
,
1537 kref_get(&coll
->kref
);
1538 bio
= bio_chain_clone(&rq_bio
, &next_bio
, &bp
,
1539 op_size
, GFP_ATOMIC
);
1541 rbd_coll_end_req_index(rq
, coll
, cur_seg
,
1547 /* init OSD command: write or read */
1549 rbd_req_write(rq
, rbd_dev
,
1555 rbd_req_read(rq
, rbd_dev
,
1568 kref_put(&coll
->kref
, rbd_coll_release
);
1571 bio_pair_release(bp
);
1572 spin_lock_irq(q
->queue_lock
);
1574 ceph_put_snap_context(snapc
);
1579 * a queue callback. Makes sure that we don't create a bio that spans across
1580 * multiple osd objects. One exception would be with a single page bios,
1581 * which we handle later at bio_chain_clone
1583 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1584 struct bio_vec
*bvec
)
1586 struct rbd_device
*rbd_dev
= q
->queuedata
;
1587 unsigned int chunk_sectors
;
1589 unsigned int bio_sectors
;
1592 chunk_sectors
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
1593 sector
= bmd
->bi_sector
+ get_start_sect(bmd
->bi_bdev
);
1594 bio_sectors
= bmd
->bi_size
>> SECTOR_SHIFT
;
1596 max
= (chunk_sectors
- ((sector
& (chunk_sectors
- 1))
1597 + bio_sectors
)) << SECTOR_SHIFT
;
1599 max
= 0; /* bio_add cannot handle a negative return */
1600 if (max
<= bvec
->bv_len
&& bio_sectors
== 0)
1601 return bvec
->bv_len
;
1605 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1607 struct gendisk
*disk
= rbd_dev
->disk
;
1612 rbd_header_free(&rbd_dev
->header
);
1614 if (disk
->flags
& GENHD_FL_UP
)
1617 blk_cleanup_queue(disk
->queue
);
1622 * Read the complete header for the given rbd device.
1624 * Returns a pointer to a dynamically-allocated buffer containing
1625 * the complete and validated header. Caller can pass the address
1626 * of a variable that will be filled in with the version of the
1627 * header object at the time it was read.
1629 * Returns a pointer-coded errno if a failure occurs.
1631 static struct rbd_image_header_ondisk
*
1632 rbd_dev_v1_header_read(struct rbd_device
*rbd_dev
, u64
*version
)
1634 struct rbd_image_header_ondisk
*ondisk
= NULL
;
1641 * The complete header will include an array of its 64-bit
1642 * snapshot ids, followed by the names of those snapshots as
1643 * a contiguous block of NUL-terminated strings. Note that
1644 * the number of snapshots could change by the time we read
1645 * it in, in which case we re-read it.
1652 size
= sizeof (*ondisk
);
1653 size
+= snap_count
* sizeof (struct rbd_image_snap_ondisk
);
1655 ondisk
= kmalloc(size
, GFP_KERNEL
);
1657 return ERR_PTR(-ENOMEM
);
1659 ret
= rbd_req_sync_read(rbd_dev
, CEPH_NOSNAP
,
1660 rbd_dev
->header_name
,
1662 (char *) ondisk
, version
);
1666 if (WARN_ON((size_t) ret
< size
)) {
1668 pr_warning("short header read for image %s"
1669 " (want %zd got %d)\n",
1670 rbd_dev
->image_name
, size
, ret
);
1673 if (!rbd_dev_ondisk_valid(ondisk
)) {
1675 pr_warning("invalid header for image %s\n",
1676 rbd_dev
->image_name
);
1680 names_size
= le64_to_cpu(ondisk
->snap_names_len
);
1681 want_count
= snap_count
;
1682 snap_count
= le32_to_cpu(ondisk
->snap_count
);
1683 } while (snap_count
!= want_count
);
1690 return ERR_PTR(ret
);
1694 * reload the ondisk the header
1696 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1697 struct rbd_image_header
*header
)
1699 struct rbd_image_header_ondisk
*ondisk
;
1703 ondisk
= rbd_dev_v1_header_read(rbd_dev
, &ver
);
1705 return PTR_ERR(ondisk
);
1706 ret
= rbd_header_from_disk(header
, ondisk
);
1708 header
->obj_version
= ver
;
1717 static int rbd_header_add_snap(struct rbd_device
*rbd_dev
,
1718 const char *snap_name
,
1721 int name_len
= strlen(snap_name
);
1725 struct ceph_mon_client
*monc
;
1727 /* we should create a snapshot only if we're pointing at the head */
1728 if (rbd_dev
->snap_id
!= CEPH_NOSNAP
)
1731 monc
= &rbd_dev
->rbd_client
->client
->monc
;
1732 ret
= ceph_monc_create_snapid(monc
, rbd_dev
->pool_id
, &new_snapid
);
1733 dout("created snapid=%llu\n", (unsigned long long) new_snapid
);
1737 data
= kmalloc(name_len
+ 16, gfp_flags
);
1742 e
= data
+ name_len
+ 16;
1744 ceph_encode_string_safe(&p
, e
, snap_name
, name_len
, bad
);
1745 ceph_encode_64_safe(&p
, e
, new_snapid
, bad
);
1747 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
1749 data
, p
- data
, NULL
);
1753 return ret
< 0 ? ret
: 0;
1758 static void __rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1760 struct rbd_snap
*snap
;
1761 struct rbd_snap
*next
;
1763 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
1764 __rbd_remove_snap_dev(snap
);
1768 * only read the first part of the ondisk header, without the snaps info
1770 static int __rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
)
1773 struct rbd_image_header h
;
1775 ret
= rbd_read_header(rbd_dev
, &h
);
1779 down_write(&rbd_dev
->header_rwsem
);
1782 if (rbd_dev
->snap_id
== CEPH_NOSNAP
) {
1783 sector_t size
= (sector_t
) h
.image_size
/ SECTOR_SIZE
;
1785 dout("setting size to %llu sectors", (unsigned long long) size
);
1786 set_capacity(rbd_dev
->disk
, size
);
1789 /* rbd_dev->header.object_prefix shouldn't change */
1790 kfree(rbd_dev
->header
.snap_sizes
);
1791 kfree(rbd_dev
->header
.snap_names
);
1792 /* osd requests may still refer to snapc */
1793 ceph_put_snap_context(rbd_dev
->header
.snapc
);
1796 *hver
= h
.obj_version
;
1797 rbd_dev
->header
.obj_version
= h
.obj_version
;
1798 rbd_dev
->header
.image_size
= h
.image_size
;
1799 rbd_dev
->header
.total_snaps
= h
.total_snaps
;
1800 rbd_dev
->header
.snapc
= h
.snapc
;
1801 rbd_dev
->header
.snap_names
= h
.snap_names
;
1802 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
1803 /* Free the extra copy of the object prefix */
1804 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
1805 kfree(h
.object_prefix
);
1807 ret
= __rbd_init_snaps_header(rbd_dev
);
1809 up_write(&rbd_dev
->header_rwsem
);
1814 static int rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
)
1818 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1819 ret
= __rbd_refresh_header(rbd_dev
, hver
);
1820 mutex_unlock(&ctl_mutex
);
1825 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
1827 struct gendisk
*disk
;
1828 struct request_queue
*q
;
1833 /* contact OSD, request size info about the object being mapped */
1834 rc
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
1838 /* no need to lock here, as rbd_dev is not registered yet */
1839 rc
= __rbd_init_snaps_header(rbd_dev
);
1843 rc
= rbd_header_set_snap(rbd_dev
, &total_size
);
1847 /* create gendisk info */
1849 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
1853 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
1855 disk
->major
= rbd_dev
->major
;
1856 disk
->first_minor
= 0;
1857 disk
->fops
= &rbd_bd_ops
;
1858 disk
->private_data
= rbd_dev
;
1862 q
= blk_init_queue(rbd_rq_fn
, &rbd_dev
->lock
);
1866 /* We use the default size, but let's be explicit about it. */
1867 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
1869 /* set io sizes to object size */
1870 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
1871 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
1872 blk_queue_max_segment_size(q
, segment_size
);
1873 blk_queue_io_min(q
, segment_size
);
1874 blk_queue_io_opt(q
, segment_size
);
1876 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
1879 q
->queuedata
= rbd_dev
;
1881 rbd_dev
->disk
= disk
;
1884 /* finally, announce the disk to the world */
1885 set_capacity(disk
, total_size
/ SECTOR_SIZE
);
1888 pr_info("%s: added with size 0x%llx\n",
1889 disk
->disk_name
, (unsigned long long)total_size
);
1902 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
1904 return container_of(dev
, struct rbd_device
, dev
);
1907 static ssize_t
rbd_size_show(struct device
*dev
,
1908 struct device_attribute
*attr
, char *buf
)
1910 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1913 down_read(&rbd_dev
->header_rwsem
);
1914 size
= get_capacity(rbd_dev
->disk
);
1915 up_read(&rbd_dev
->header_rwsem
);
1917 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
1920 static ssize_t
rbd_major_show(struct device
*dev
,
1921 struct device_attribute
*attr
, char *buf
)
1923 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1925 return sprintf(buf
, "%d\n", rbd_dev
->major
);
1928 static ssize_t
rbd_client_id_show(struct device
*dev
,
1929 struct device_attribute
*attr
, char *buf
)
1931 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1933 return sprintf(buf
, "client%lld\n",
1934 ceph_client_id(rbd_dev
->rbd_client
->client
));
1937 static ssize_t
rbd_pool_show(struct device
*dev
,
1938 struct device_attribute
*attr
, char *buf
)
1940 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1942 return sprintf(buf
, "%s\n", rbd_dev
->pool_name
);
1945 static ssize_t
rbd_pool_id_show(struct device
*dev
,
1946 struct device_attribute
*attr
, char *buf
)
1948 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1950 return sprintf(buf
, "%d\n", rbd_dev
->pool_id
);
1953 static ssize_t
rbd_name_show(struct device
*dev
,
1954 struct device_attribute
*attr
, char *buf
)
1956 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1958 return sprintf(buf
, "%s\n", rbd_dev
->image_name
);
1961 static ssize_t
rbd_snap_show(struct device
*dev
,
1962 struct device_attribute
*attr
,
1965 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1967 return sprintf(buf
, "%s\n", rbd_dev
->snap_name
);
1970 static ssize_t
rbd_image_refresh(struct device
*dev
,
1971 struct device_attribute
*attr
,
1975 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1978 ret
= rbd_refresh_header(rbd_dev
, NULL
);
1980 return ret
< 0 ? ret
: size
;
1983 static DEVICE_ATTR(size
, S_IRUGO
, rbd_size_show
, NULL
);
1984 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
1985 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
1986 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
1987 static DEVICE_ATTR(pool_id
, S_IRUGO
, rbd_pool_id_show
, NULL
);
1988 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
1989 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
1990 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
1991 static DEVICE_ATTR(create_snap
, S_IWUSR
, NULL
, rbd_snap_add
);
1993 static struct attribute
*rbd_attrs
[] = {
1994 &dev_attr_size
.attr
,
1995 &dev_attr_major
.attr
,
1996 &dev_attr_client_id
.attr
,
1997 &dev_attr_pool
.attr
,
1998 &dev_attr_pool_id
.attr
,
1999 &dev_attr_name
.attr
,
2000 &dev_attr_current_snap
.attr
,
2001 &dev_attr_refresh
.attr
,
2002 &dev_attr_create_snap
.attr
,
2006 static struct attribute_group rbd_attr_group
= {
2010 static const struct attribute_group
*rbd_attr_groups
[] = {
2015 static void rbd_sysfs_dev_release(struct device
*dev
)
2019 static struct device_type rbd_device_type
= {
2021 .groups
= rbd_attr_groups
,
2022 .release
= rbd_sysfs_dev_release
,
2030 static ssize_t
rbd_snap_size_show(struct device
*dev
,
2031 struct device_attribute
*attr
,
2034 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2036 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
2039 static ssize_t
rbd_snap_id_show(struct device
*dev
,
2040 struct device_attribute
*attr
,
2043 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2045 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
2048 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
2049 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
2051 static struct attribute
*rbd_snap_attrs
[] = {
2052 &dev_attr_snap_size
.attr
,
2053 &dev_attr_snap_id
.attr
,
2057 static struct attribute_group rbd_snap_attr_group
= {
2058 .attrs
= rbd_snap_attrs
,
2061 static void rbd_snap_dev_release(struct device
*dev
)
2063 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2068 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
2069 &rbd_snap_attr_group
,
2073 static struct device_type rbd_snap_device_type
= {
2074 .groups
= rbd_snap_attr_groups
,
2075 .release
= rbd_snap_dev_release
,
2078 static void __rbd_remove_snap_dev(struct rbd_snap
*snap
)
2080 list_del(&snap
->node
);
2081 device_unregister(&snap
->dev
);
2084 static int rbd_register_snap_dev(struct rbd_snap
*snap
,
2085 struct device
*parent
)
2087 struct device
*dev
= &snap
->dev
;
2090 dev
->type
= &rbd_snap_device_type
;
2091 dev
->parent
= parent
;
2092 dev
->release
= rbd_snap_dev_release
;
2093 dev_set_name(dev
, "snap_%s", snap
->name
);
2094 ret
= device_register(dev
);
2099 static struct rbd_snap
*__rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
2100 int i
, const char *name
)
2102 struct rbd_snap
*snap
;
2105 snap
= kzalloc(sizeof (*snap
), GFP_KERNEL
);
2107 return ERR_PTR(-ENOMEM
);
2110 snap
->name
= kstrdup(name
, GFP_KERNEL
);
2114 snap
->size
= rbd_dev
->header
.snap_sizes
[i
];
2115 snap
->id
= rbd_dev
->header
.snapc
->snaps
[i
];
2116 if (device_is_registered(&rbd_dev
->dev
)) {
2117 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
2128 return ERR_PTR(ret
);
2132 * Scan the rbd device's current snapshot list and compare it to the
2133 * newly-received snapshot context. Remove any existing snapshots
2134 * not present in the new snapshot context. Add a new snapshot for
2135 * any snaphots in the snapshot context not in the current list.
2136 * And verify there are no changes to snapshots we already know
2139 * Assumes the snapshots in the snapshot context are sorted by
2140 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2141 * are also maintained in that order.)
2143 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
)
2145 struct ceph_snap_context
*snapc
= rbd_dev
->header
.snapc
;
2146 const u32 snap_count
= snapc
->num_snaps
;
2147 char *snap_name
= rbd_dev
->header
.snap_names
;
2148 struct list_head
*head
= &rbd_dev
->snaps
;
2149 struct list_head
*links
= head
->next
;
2152 while (index
< snap_count
|| links
!= head
) {
2154 struct rbd_snap
*snap
;
2156 snap_id
= index
< snap_count
? snapc
->snaps
[index
]
2158 snap
= links
!= head
? list_entry(links
, struct rbd_snap
, node
)
2160 BUG_ON(snap
&& snap
->id
== CEPH_NOSNAP
);
2162 if (snap_id
== CEPH_NOSNAP
|| (snap
&& snap
->id
> snap_id
)) {
2163 struct list_head
*next
= links
->next
;
2165 /* Existing snapshot not in the new snap context */
2167 if (rbd_dev
->snap_id
== snap
->id
)
2168 rbd_dev
->snap_exists
= false;
2169 __rbd_remove_snap_dev(snap
);
2171 /* Done with this list entry; advance */
2177 if (!snap
|| (snap_id
!= CEPH_NOSNAP
&& snap
->id
< snap_id
)) {
2178 struct rbd_snap
*new_snap
;
2180 /* We haven't seen this snapshot before */
2182 new_snap
= __rbd_add_snap_dev(rbd_dev
, index
,
2184 if (IS_ERR(new_snap
))
2185 return PTR_ERR(new_snap
);
2187 /* New goes before existing, or at end of list */
2190 list_add_tail(&new_snap
->node
, &snap
->node
);
2192 list_add_tail(&new_snap
->node
, head
);
2194 /* Already have this one */
2196 BUG_ON(snap
->size
!= rbd_dev
->header
.snap_sizes
[index
]);
2197 BUG_ON(strcmp(snap
->name
, snap_name
));
2199 /* Done with this list entry; advance */
2201 links
= links
->next
;
2204 /* Advance to the next entry in the snapshot context */
2207 snap_name
+= strlen(snap_name
) + 1;
2213 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
2217 struct rbd_snap
*snap
;
2219 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2220 dev
= &rbd_dev
->dev
;
2222 dev
->bus
= &rbd_bus_type
;
2223 dev
->type
= &rbd_device_type
;
2224 dev
->parent
= &rbd_root_dev
;
2225 dev
->release
= rbd_dev_release
;
2226 dev_set_name(dev
, "%d", rbd_dev
->dev_id
);
2227 ret
= device_register(dev
);
2231 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
2232 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
2237 mutex_unlock(&ctl_mutex
);
2241 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
2243 device_unregister(&rbd_dev
->dev
);
2246 static int rbd_init_watch_dev(struct rbd_device
*rbd_dev
)
2251 ret
= rbd_req_sync_watch(rbd_dev
);
2252 if (ret
== -ERANGE
) {
2253 rc
= rbd_refresh_header(rbd_dev
, NULL
);
2257 } while (ret
== -ERANGE
);
2262 static atomic64_t rbd_id_max
= ATOMIC64_INIT(0);
2265 * Get a unique rbd identifier for the given new rbd_dev, and add
2266 * the rbd_dev to the global list. The minimum rbd id is 1.
2268 static void rbd_id_get(struct rbd_device
*rbd_dev
)
2270 rbd_dev
->dev_id
= atomic64_inc_return(&rbd_id_max
);
2272 spin_lock(&rbd_dev_list_lock
);
2273 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
2274 spin_unlock(&rbd_dev_list_lock
);
2278 * Remove an rbd_dev from the global list, and record that its
2279 * identifier is no longer in use.
2281 static void rbd_id_put(struct rbd_device
*rbd_dev
)
2283 struct list_head
*tmp
;
2284 int rbd_id
= rbd_dev
->dev_id
;
2289 spin_lock(&rbd_dev_list_lock
);
2290 list_del_init(&rbd_dev
->node
);
2293 * If the id being "put" is not the current maximum, there
2294 * is nothing special we need to do.
2296 if (rbd_id
!= atomic64_read(&rbd_id_max
)) {
2297 spin_unlock(&rbd_dev_list_lock
);
2302 * We need to update the current maximum id. Search the
2303 * list to find out what it is. We're more likely to find
2304 * the maximum at the end, so search the list backward.
2307 list_for_each_prev(tmp
, &rbd_dev_list
) {
2308 struct rbd_device
*rbd_dev
;
2310 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2311 if (rbd_id
> max_id
)
2314 spin_unlock(&rbd_dev_list_lock
);
2317 * The max id could have been updated by rbd_id_get(), in
2318 * which case it now accurately reflects the new maximum.
2319 * Be careful not to overwrite the maximum value in that
2322 atomic64_cmpxchg(&rbd_id_max
, rbd_id
, max_id
);
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	* These are the characters that produce nonzero for
	* isspace() in the "C" and "POSIX" locales.
	*/
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);	/* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
2375 * Finds the next token in *buf, dynamically allocates a buffer big
2376 * enough to hold a copy of it, and copies the token into the new
2377 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2378 * that a duplicate buffer is created even for a zero-length token.
2380 * Returns a pointer to the newly-allocated duplicate, or a null
2381 * pointer if memory for the duplicate was not available. If
2382 * the lenp argument is a non-null pointer, the length of the token
2383 * (not including the '\0') is returned in *lenp.
2385 * If successful, the *buf pointer will be updated to point beyond
2386 * the end of the found token.
2388 * Note: uses GFP_KERNEL for allocation.
2390 static inline char *dup_token(const char **buf
, size_t *lenp
)
2395 len
= next_token(buf
);
2396 dup
= kmalloc(len
+ 1, GFP_KERNEL
);
2400 memcpy(dup
, *buf
, len
);
2401 *(dup
+ len
) = '\0';
2411 * This fills in the pool_name, image_name, image_name_len, snap_name,
2412 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2413 * on the list of monitor addresses and other options provided via
2416 * Note: rbd_dev is assumed to have been initially zero-filled.
2418 static int rbd_add_parse_args(struct rbd_device
*rbd_dev
,
2420 const char **mon_addrs
,
2421 size_t *mon_addrs_size
,
2423 size_t options_size
)
2428 /* The first four tokens are required */
2430 len
= next_token(&buf
);
2433 *mon_addrs_size
= len
+ 1;
2438 len
= copy_token(&buf
, options
, options_size
);
2439 if (!len
|| len
>= options_size
)
2443 rbd_dev
->pool_name
= dup_token(&buf
, NULL
);
2444 if (!rbd_dev
->pool_name
)
2447 rbd_dev
->image_name
= dup_token(&buf
, &rbd_dev
->image_name_len
);
2448 if (!rbd_dev
->image_name
)
2451 /* Create the name of the header object */
2453 rbd_dev
->header_name
= kmalloc(rbd_dev
->image_name_len
2454 + sizeof (RBD_SUFFIX
),
2456 if (!rbd_dev
->header_name
)
2458 sprintf(rbd_dev
->header_name
, "%s%s", rbd_dev
->image_name
, RBD_SUFFIX
);
2461 * The snapshot name is optional. If none is is supplied,
2462 * we use the default value.
2464 rbd_dev
->snap_name
= dup_token(&buf
, &len
);
2465 if (!rbd_dev
->snap_name
)
2468 /* Replace the empty name with the default */
2469 kfree(rbd_dev
->snap_name
);
2471 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME
), GFP_KERNEL
);
2472 if (!rbd_dev
->snap_name
)
2475 memcpy(rbd_dev
->snap_name
, RBD_SNAP_HEAD_NAME
,
2476 sizeof (RBD_SNAP_HEAD_NAME
));
2482 kfree(rbd_dev
->header_name
);
2483 rbd_dev
->header_name
= NULL
;
2484 kfree(rbd_dev
->image_name
);
2485 rbd_dev
->image_name
= NULL
;
2486 rbd_dev
->image_name_len
= 0;
2487 kfree(rbd_dev
->pool_name
);
2488 rbd_dev
->pool_name
= NULL
;
2493 static ssize_t
rbd_add(struct bus_type
*bus
,
2498 struct rbd_device
*rbd_dev
= NULL
;
2499 const char *mon_addrs
= NULL
;
2500 size_t mon_addrs_size
= 0;
2501 struct ceph_osd_client
*osdc
;
2504 if (!try_module_get(THIS_MODULE
))
2507 options
= kmalloc(count
, GFP_KERNEL
);
2510 rbd_dev
= kzalloc(sizeof(*rbd_dev
), GFP_KERNEL
);
2514 /* static rbd_device initialization */
2515 spin_lock_init(&rbd_dev
->lock
);
2516 INIT_LIST_HEAD(&rbd_dev
->node
);
2517 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2518 init_rwsem(&rbd_dev
->header_rwsem
);
2520 /* generate unique id: find highest unique id, add one */
2521 rbd_id_get(rbd_dev
);
2523 /* Fill in the device name, now that we have its id. */
2524 BUILD_BUG_ON(DEV_NAME_LEN
2525 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
2526 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->dev_id
);
2528 /* parse add command */
2529 rc
= rbd_add_parse_args(rbd_dev
, buf
, &mon_addrs
, &mon_addrs_size
,
2534 rc
= rbd_get_client(rbd_dev
, mon_addrs
, mon_addrs_size
- 1, options
);
2539 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2540 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, rbd_dev
->pool_name
);
2542 goto err_out_client
;
2543 rbd_dev
->pool_id
= rc
;
2545 /* register our block device */
2546 rc
= register_blkdev(0, rbd_dev
->name
);
2548 goto err_out_client
;
2549 rbd_dev
->major
= rc
;
2551 rc
= rbd_bus_add_dev(rbd_dev
);
2553 goto err_out_blkdev
;
2556 * At this point cleanup in the event of an error is the job
2557 * of the sysfs code (initiated by rbd_bus_del_dev()).
2559 * Set up and announce blkdev mapping.
2561 rc
= rbd_init_disk(rbd_dev
);
2565 rc
= rbd_init_watch_dev(rbd_dev
);
2572 /* this will also clean up rest of rbd_dev stuff */
2574 rbd_bus_del_dev(rbd_dev
);
2579 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2581 rbd_put_client(rbd_dev
);
2583 if (rbd_dev
->pool_name
) {
2584 kfree(rbd_dev
->snap_name
);
2585 kfree(rbd_dev
->header_name
);
2586 kfree(rbd_dev
->image_name
);
2587 kfree(rbd_dev
->pool_name
);
2589 rbd_id_put(rbd_dev
);
2594 dout("Error adding device %s\n", buf
);
2595 module_put(THIS_MODULE
);
2597 return (ssize_t
) rc
;
2600 static struct rbd_device
*__rbd_get_dev(unsigned long dev_id
)
2602 struct list_head
*tmp
;
2603 struct rbd_device
*rbd_dev
;
2605 spin_lock(&rbd_dev_list_lock
);
2606 list_for_each(tmp
, &rbd_dev_list
) {
2607 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2608 if (rbd_dev
->dev_id
== dev_id
) {
2609 spin_unlock(&rbd_dev_list_lock
);
2613 spin_unlock(&rbd_dev_list_lock
);
2617 static void rbd_dev_release(struct device
*dev
)
2619 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2621 if (rbd_dev
->watch_request
) {
2622 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
2624 ceph_osdc_unregister_linger_request(&client
->osdc
,
2625 rbd_dev
->watch_request
);
2627 if (rbd_dev
->watch_event
)
2628 rbd_req_sync_unwatch(rbd_dev
);
2630 rbd_put_client(rbd_dev
);
2632 /* clean up and free blkdev */
2633 rbd_free_disk(rbd_dev
);
2634 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2636 /* done with the id, and with the rbd_dev */
2637 kfree(rbd_dev
->snap_name
);
2638 kfree(rbd_dev
->header_name
);
2639 kfree(rbd_dev
->pool_name
);
2640 kfree(rbd_dev
->image_name
);
2641 rbd_id_put(rbd_dev
);
2644 /* release module ref */
2645 module_put(THIS_MODULE
);
2648 static ssize_t
rbd_remove(struct bus_type
*bus
,
2652 struct rbd_device
*rbd_dev
= NULL
;
2657 rc
= strict_strtoul(buf
, 10, &ul
);
2661 /* convert to int; abort if we lost anything in the conversion */
2662 target_id
= (int) ul
;
2663 if (target_id
!= ul
)
2666 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2668 rbd_dev
= __rbd_get_dev(target_id
);
2674 __rbd_remove_all_snaps(rbd_dev
);
2675 rbd_bus_del_dev(rbd_dev
);
2678 mutex_unlock(&ctl_mutex
);
2682 static ssize_t
rbd_snap_add(struct device
*dev
,
2683 struct device_attribute
*attr
,
2687 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2689 char *name
= kmalloc(count
+ 1, GFP_KERNEL
);
2693 snprintf(name
, count
, "%s", buf
);
2695 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2697 ret
= rbd_header_add_snap(rbd_dev
,
2702 ret
= __rbd_refresh_header(rbd_dev
, NULL
);
2706 /* shouldn't hold ctl_mutex when notifying.. notify might
2707 trigger a watch callback that would need to get that mutex */
2708 mutex_unlock(&ctl_mutex
);
2710 /* make a best effort, don't error if failed */
2711 rbd_req_sync_notify(rbd_dev
);
2718 mutex_unlock(&ctl_mutex
);
2724 * create control files in sysfs
2727 static int rbd_sysfs_init(void)
2731 ret
= device_register(&rbd_root_dev
);
2735 ret
= bus_register(&rbd_bus_type
);
2737 device_unregister(&rbd_root_dev
);
2742 static void rbd_sysfs_cleanup(void)
2744 bus_unregister(&rbd_bus_type
);
2745 device_unregister(&rbd_root_dev
);
2748 int __init
rbd_init(void)
2752 rc
= rbd_sysfs_init();
2755 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
2759 void __exit
rbd_exit(void)
2761 rbd_sysfs_cleanup();
2764 module_init(rbd_init
);
2765 module_exit(rbd_exit
);
2767 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2768 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2769 MODULE_DESCRIPTION("rados block device");
2771 /* following authorship retained from original osdblk.c */
2772 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2774 MODULE_LICENSE("GPL");