/*
   rbd.c -- Export ceph rados objects as a Linux block device

   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>

#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define	U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
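
/*
 * Worked example (illustrative, not part of the original source): for a
 * 4-byte int, MAX_INT_FORMAT_WIDTH evaluates to (5 * 4) / 2 + 1 = 11,
 * which covers the 10 decimal digits of 2147483647 plus a sign
 * character, so DEV_NAME_LEN comfortably holds any "rbd%d" name.
 */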
#define RBD_READ_ONLY_DEFAULT		false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	struct ceph_snap_context *snapc;
	u32 total_snaps;

	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

struct rbd_options {
	bool	read_only;
};
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};
/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
};
/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct rbd_options	rbd_opts;
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;
	char			*pool_name;
	int			pool_id;
	char			*snap_name;

	struct ceph_osd_event   *watch_event;
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;
	/* name of the snapshot this device reads from */
	/* id of the snapshot this device reads from */
	u64                     snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool                    snap_exists;
	bool			read_only;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->read_only);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
/*
 * Initialize an rbd client instance.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}
/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}
/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
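
/*
 * Example (illustrative only): an option string such as "ro" or
 * "read_only" supplied with the add command reaches this parser one
 * token at a time; match_token() maps it to Opt_read_only and the
 * switch above sets rbd_opts->read_only = true, while "rw" or
 * "read_write" clears it again.
 */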
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  size_t mon_addr_len, char *options)
{
	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
	struct ceph_options *ceph_opts;
	struct rbd_client *rbdc;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts))
		return PTR_ERR(ceph_opts);

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		ceph_destroy_options(ceph_opts);
	} else {
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}
/*
 * Destroy ceph client
 *
 * Caller must hold rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}
/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire the snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			goto out_err;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;
	header->total_snaps = snap_count;

	/* Allocate and fill in the snapshot context */

	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
			u64 *seq, u64 *size)
{
	int i;
	char *p = header->snap_names;

	for (i = 0; i < header->total_snaps; i++) {
		if (!strcmp(snap_name, p)) {

			/* Found it.  Pass back its id and/or size */

			if (seq)
				*seq = header->snapc->snaps[i];
			if (size)
				*size = header->snap_sizes[i];
			return i;
		}
		p += strlen(p) + 1;	/* Skip ahead to the next name */
	}
	return -ENOENT;
}
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = rbd_dev->rbd_opts.read_only;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
					&snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = true;	/* No choice for snapshots */
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}
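
/*
 * Worked example (illustrative only, not part of the original code):
 * with an object order of 22 (4 MiB objects), an 8192-byte I/O that
 * starts at image offset 0x3ff000 touches two segments:
 * rbd_segment_offset() returns 0x3ff000, rbd_segment_length() clamps
 * the first piece to 0x1000 bytes at the object boundary, and
 * rbd_get_num_segments() reports 2 (start_seg 0, end_seg 1).
 */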
/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *old_chain = *old;
	struct bio *new_chain = NULL;
	struct bio *tail = NULL;
	struct bio *tmp;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;
		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */

		if (total + old_chain->bi_size > len) {
			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			*bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!*bp)
				goto err_out;

			__bio_clone(tmp, &(*bp)->bio1);

			*next = &(*bp)->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_next = NULL;
		if (tail)
			tail->bi_next = tmp;
		else
			new_chain = tmp;
		tail = tmp;
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	rbd_assert(total == len);

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
					int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
		(unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			  object_name, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1201 static int rbd_req_write(struct request
*rq
,
1202 struct rbd_device
*rbd_dev
,
1203 struct ceph_snap_context
*snapc
,
1206 struct rbd_req_coll
*coll
,
1209 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1211 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1212 ofs
, len
, bio
, coll
, coll_index
);
1216 * Request async osd read
1218 static int rbd_req_read(struct request
*rq
,
1219 struct rbd_device
*rbd_dev
,
1223 struct rbd_req_coll
*coll
,
1226 return rbd_do_op(rq
, rbd_dev
, NULL
,
1230 ofs
, len
, bio
, coll
, coll_index
);
/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_FLAG_READ,
			       ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}
/*
 * Request sync osd watch
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_refresh_header(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   " update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}
/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};

static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	if (!rbd_dev)
		return;

	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
			rbd_dev->header_name, (unsigned long long) notify_id,
			(unsigned int) opcode);
}
/*
 * Request sync osd notify
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       rbd_dev->header_name,
			       0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
/*
 * Request sync osd exec of a class method
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}
/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_segment_length(rbd_dev, ofs, size);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}

			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
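
/*
 * Worked example (illustrative only): with 4 MiB objects (obj_order 22)
 * chunk_sectors is 8192.  A bio whose data currently ends at sector
 * 8190 of an object gets max = (8192 - 8190) << 9 = 1024, so a 4 KiB
 * bvec cannot be merged here and the block layer starts a new bio
 * rather than spanning the object boundary.
 */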
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
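
/*
 * Layout sketch of the v1 on-disk header read above (informational,
 * reconstructed from the comments and code in this file):
 *
 *   struct rbd_image_header_ondisk
 *   snap_count entries of struct rbd_image_snap_ondisk (id and size)
 *   NUL-terminated snapshot names, snap_names_len bytes in total
 *
 * If a snapshot is created between two reads, snap_count changes and
 * the do/while loop retries with a buffer sized for the new count.
 */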
/*
 * reload the ondisk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}
/*
 * create a snapshot
 */
static int rbd_header_add_snap(struct rbd_device *rbd_dev,
			       const char *snap_name,
			       gfp_t gfp_flags)
{
	int name_len = strlen(snap_name);
	u64 new_snapid;
	int ret;
	void *data, *p, *e;
	struct ceph_mon_client *monc;

	/* we should create a snapshot only if we're pointing at the head */
	if (rbd_dev->snap_id != CEPH_NOSNAP)
		return -EINVAL;

	monc = &rbd_dev->rbd_client->client->monc;
	ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
	dout("created snapid=%llu\n", (unsigned long long) new_snapid);
	if (ret < 0)
		return ret;

	data = kmalloc(name_len + 16, gfp_flags);
	if (!data)
		return -ENOMEM;

	p = data;
	e = data + name_len + 16;

	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
	ceph_encode_64_safe(&p, e, new_snapid, bad);

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "snap_add",
				data, p - data, NULL);

	kfree(data);

	return ret < 0 ? ret : 0;
bad:
	kfree(data);
	return -ERANGE;
}
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
/*
 * only read the first part of the ondisk header, without the snaps info
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
		       ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}

static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_refresh_header(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}
, S_IRUGO
, rbd_size_show
, NULL
);
2028 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
2029 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
2030 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
2031 static DEVICE_ATTR(pool_id
, S_IRUGO
, rbd_pool_id_show
, NULL
);
2032 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
2033 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
2034 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
2035 static DEVICE_ATTR(create_snap
, S_IWUSR
, NULL
, rbd_snap_add
);
2037 static struct attribute
*rbd_attrs
[] = {
2038 &dev_attr_size
.attr
,
2039 &dev_attr_major
.attr
,
2040 &dev_attr_client_id
.attr
,
2041 &dev_attr_pool
.attr
,
2042 &dev_attr_pool_id
.attr
,
2043 &dev_attr_name
.attr
,
2044 &dev_attr_current_snap
.attr
,
2045 &dev_attr_refresh
.attr
,
2046 &dev_attr_create_snap
.attr
,
2050 static struct attribute_group rbd_attr_group
= {
2054 static const struct attribute_group
*rbd_attr_groups
[] = {
2059 static void rbd_sysfs_dev_release(struct device
*dev
)
2063 static struct device_type rbd_device_type
= {
2065 .groups
= rbd_attr_groups
,
2066 .release
= rbd_sysfs_dev_release
,
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					      int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snaphots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	char *snap_name = rbd_dev->header.snap_names;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		rbd_assert(!snap || snap->id != CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/* Existing snapshot not in the new snap context */

			if (rbd_dev->snap_id == snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(snap);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, index,
							snap_name);
			if (IS_ERR(new_snap))
				return PTR_ERR(new_snap);

			/* New goes before existing, or at end of list */

			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			rbd_assert(snap->size ==
					rbd_dev->header.snap_sizes[index]);
			rbd_assert(!strcmp(snap->name, snap_name));

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
		snap_name += strlen(snap_name) + 1;
	}

	return 0;
}
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev);
		if (ret == -ERANGE) {
			rc = rbd_refresh_header(rbd_dev, NULL);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
= ATOMIC64_INIT(0);
2310 * Get a unique rbd identifier for the given new rbd_dev, and add
2311 * the rbd_dev to the global list. The minimum rbd id is 1.
2313 static void rbd_dev_id_get(struct rbd_device
*rbd_dev
)
2315 rbd_dev
->dev_id
= atomic64_inc_return(&rbd_dev_id_max
);
2317 spin_lock(&rbd_dev_list_lock
);
2318 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
2319 spin_unlock(&rbd_dev_list_lock
);
2320 dout("rbd_dev %p given dev id %llu\n", rbd_dev
,
2321 (unsigned long long) rbd_dev
->dev_id
);
/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
	struct list_head *tmp;
	int rbd_id = rbd_dev->dev_id;
	int max_id = 0;

	rbd_assert(rbd_id > 0);

	dout("rbd_dev %p released dev id %llu\n", rbd_dev,
		(unsigned long long) rbd_dev->dev_id);
	spin_lock(&rbd_dev_list_lock);
	list_del_init(&rbd_dev->node);

	/*
	 * If the id being "put" is not the current maximum, there
	 * is nothing special we need to do.
	 */
	if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
		spin_unlock(&rbd_dev_list_lock);
		return;
	}

	/*
	 * We need to update the current maximum id.  Search the
	 * list to find out what it is.  We're more likely to find
	 * the maximum at the end, so search the list backward.
	 */
	list_for_each_prev(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id > max_id)
			max_id = rbd_dev->dev_id;
	}
	spin_unlock(&rbd_dev_list_lock);

	/*
	 * The max id could have been updated by rbd_dev_id_get(), in
	 * which case it now accurately reflects the new maximum.
	 * Be careful not to overwrite the maximum value in that
	 * case.
	 */
	atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
	dout("  max dev id has been reset\n");
}
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any). Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	* These are the characters that produce nonzero for
	* isspace() in the "C" and "POSIX" locales.
	*/
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */

	return strcspn(*buf, spaces);   /* Return token length */
}
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len;

	len = next_token(buf);
	if (len < token_size) {
		memcpy(token, *buf, len);
		*(token + len) = '\0';
	}
	*buf += len;

	return len;
}
2426 * enough to hold a copy of it, and copies the token into the new
2427 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2428 * that a duplicate buffer is created even for a zero-length token.
2430 * Returns a pointer to the newly-allocated duplicate, or a null
2431 * pointer if memory for the duplicate was not available. If
2432 * the lenp argument is a non-null pointer, the length of the token
2433 * (not including the '\0') is returned in *lenp.
2435 * If successful, the *buf pointer will be updated to point beyond
2436 * the end of the found token.
2438 * Note: uses GFP_KERNEL for allocation.
2440 static inline char *dup_token(const char **buf
, size_t *lenp
)
2445 len
= next_token(buf
);
2446 dup
= kmalloc(len
+ 1, GFP_KERNEL
);
2450 memcpy(dup
, *buf
, len
);
2451 *(dup
+ len
) = '\0';
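
/*
 * Usage sketch (illustrative only): given buf = "  rbd myimage",
 * next_token(&buf) skips the leading spaces and returns 3, and
 * dup_token(&buf, &len) then returns a newly allocated copy "rbd"
 * with len == 3 and buf advanced to " myimage", ready for the next
 * token.
 */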
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
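
/*
 * Example of the string parsed above (illustrative only; see
 * Documentation/ABI/testing/sysfs-bus-rbd for the authoritative
 * format):
 *
 *   echo "1.2.3.4:6789 name=admin,secret=XYZ rbd myimage mysnap" \
 *       > /sys/bus/rbd/add
 *
 * Here "1.2.3.4:6789" is the monitor address list, "name=admin,..."
 * the option string, "rbd" the pool name, "myimage" the image name,
 * and "mysnap" an optional snapshot name.
 */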
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_out_mem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_out_mem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_dev_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
	if (rc < 0)
		goto err_put_id;

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_dev_id_put(rbd_dev);
err_out_mem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
	struct list_head *tmp;
	struct rbd_device *rbd_dev;

	spin_lock(&rbd_dev_list_lock);
	list_for_each(tmp, &rbd_dev_list) {
		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->dev_id == dev_id) {
			spin_unlock(&rbd_dev_list_lock);
			return rbd_dev;
		}
	}
	spin_unlock(&rbd_dev_list_lock);
	return NULL;
}
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_dev_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
rbd_snap_add(struct device
*dev
,
2734 struct device_attribute
*attr
,
2738 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2740 char *name
= kmalloc(count
+ 1, GFP_KERNEL
);
2744 snprintf(name
, count
, "%s", buf
);
2746 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2748 ret
= rbd_header_add_snap(rbd_dev
,
2753 ret
= __rbd_refresh_header(rbd_dev
, NULL
);
2757 /* shouldn't hold ctl_mutex when notifying.. notify might
2758 trigger a watch callback that would need to get that mutex */
2759 mutex_unlock(&ctl_mutex
);
2761 /* make a best effort, don't error if failed */
2762 rbd_req_sync_notify(rbd_dev
);
2769 mutex_unlock(&ctl_mutex
);
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}

static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}

int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}

void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");