2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header
{
83 struct ceph_snap_context
*snapc
;
98 * an instance of the client. multiple devices may share an rbd client.
101 struct ceph_client
*client
;
102 struct rbd_options
*rbd_opts
;
104 struct list_head node
;
108 * a request completion status
110 struct rbd_req_status
{
117 * a collection of requests
119 struct rbd_req_coll
{
123 struct rbd_req_status status
[0];
127 * a single io request
130 struct request
*rq
; /* blk layer request */
131 struct bio
*bio
; /* cloned bio */
132 struct page
**pages
; /* list of used pages */
135 struct rbd_req_coll
*coll
;
142 struct list_head node
;
150 int dev_id
; /* blkdev unique id */
152 int major
; /* blkdev assigned major */
153 struct gendisk
*disk
; /* blkdev's gendisk and rq */
154 struct request_queue
*q
;
156 struct rbd_client
*rbd_client
;
158 char name
[DEV_NAME_LEN
]; /* blkdev name, e.g. rbd3 */
160 spinlock_t lock
; /* queue lock */
162 struct rbd_image_header header
;
164 size_t image_name_len
;
169 struct ceph_osd_event
*watch_event
;
170 struct ceph_osd_request
*watch_request
;
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem
;
174 /* name of the snapshot this device reads from */
176 /* id of the snapshot this device reads from */
177 u64 snap_id
; /* current snapshot id */
178 /* whether the snap_id this device reads from still exists */
182 struct list_head node
;
184 /* list of snapshots */
185 struct list_head snaps
;
191 static DEFINE_MUTEX(ctl_mutex
); /* Serialize open/close/setup/teardown */
193 static LIST_HEAD(rbd_dev_list
); /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock
);
196 static LIST_HEAD(rbd_client_list
); /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock
);
199 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
);
200 static void rbd_dev_release(struct device
*dev
);
201 static ssize_t
rbd_snap_add(struct device
*dev
,
202 struct device_attribute
*attr
,
205 static void __rbd_remove_snap_dev(struct rbd_snap
*snap
);
207 static ssize_t
rbd_add(struct bus_type
*bus
, const char *buf
,
209 static ssize_t
rbd_remove(struct bus_type
*bus
, const char *buf
,
212 static struct bus_attribute rbd_bus_attrs
[] = {
213 __ATTR(add
, S_IWUSR
, NULL
, rbd_add
),
214 __ATTR(remove
, S_IWUSR
, NULL
, rbd_remove
),
218 static struct bus_type rbd_bus_type
= {
220 .bus_attrs
= rbd_bus_attrs
,
/*
 * Release callback for the static root device; nothing to free since
 * rbd_root_dev is not dynamically allocated.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
227 static struct device rbd_root_dev
= {
229 .release
= rbd_root_dev_release
,
233 static struct device
*rbd_get_dev(struct rbd_device
*rbd_dev
)
235 return get_device(&rbd_dev
->dev
);
238 static void rbd_put_dev(struct rbd_device
*rbd_dev
)
240 put_device(&rbd_dev
->dev
);
243 static int rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
);
245 static int rbd_open(struct block_device
*bdev
, fmode_t mode
)
247 struct rbd_device
*rbd_dev
= bdev
->bd_disk
->private_data
;
249 if ((mode
& FMODE_WRITE
) && rbd_dev
->read_only
)
252 rbd_get_dev(rbd_dev
);
253 set_device_ro(bdev
, rbd_dev
->read_only
);
258 static int rbd_release(struct gendisk
*disk
, fmode_t mode
)
260 struct rbd_device
*rbd_dev
= disk
->private_data
;
262 rbd_put_dev(rbd_dev
);
267 static const struct block_device_operations rbd_bd_ops
= {
268 .owner
= THIS_MODULE
,
270 .release
= rbd_release
,
274 * Initialize an rbd client instance.
277 static struct rbd_client
*rbd_client_create(struct ceph_options
*ceph_opts
,
278 struct rbd_options
*rbd_opts
)
280 struct rbd_client
*rbdc
;
283 dout("rbd_client_create\n");
284 rbdc
= kmalloc(sizeof(struct rbd_client
), GFP_KERNEL
);
288 kref_init(&rbdc
->kref
);
289 INIT_LIST_HEAD(&rbdc
->node
);
291 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
293 rbdc
->client
= ceph_create_client(ceph_opts
, rbdc
, 0, 0);
294 if (IS_ERR(rbdc
->client
))
296 ceph_opts
= NULL
; /* Now rbdc->client is responsible for ceph_opts */
298 ret
= ceph_open_session(rbdc
->client
);
302 rbdc
->rbd_opts
= rbd_opts
;
304 spin_lock(&rbd_client_list_lock
);
305 list_add_tail(&rbdc
->node
, &rbd_client_list
);
306 spin_unlock(&rbd_client_list_lock
);
308 mutex_unlock(&ctl_mutex
);
310 dout("rbd_client_create created %p\n", rbdc
);
314 ceph_destroy_client(rbdc
->client
);
316 mutex_unlock(&ctl_mutex
);
320 ceph_destroy_options(ceph_opts
);
325 * Find a ceph client with specific addr and configuration.
327 static struct rbd_client
*__rbd_client_find(struct ceph_options
*ceph_opts
)
329 struct rbd_client
*client_node
;
331 if (ceph_opts
->flags
& CEPH_OPT_NOSHARE
)
334 list_for_each_entry(client_node
, &rbd_client_list
, node
)
335 if (!ceph_compare_options(ceph_opts
, client_node
->client
))
348 /* string args above */
351 static match_table_t rbd_opts_tokens
= {
352 {Opt_notify_timeout
, "notify_timeout=%d"},
354 /* string args above */
358 static int parse_rbd_opts_token(char *c
, void *private)
360 struct rbd_options
*rbd_opts
= private;
361 substring_t argstr
[MAX_OPT_ARGS
];
362 int token
, intval
, ret
;
364 token
= match_token(c
, rbd_opts_tokens
, argstr
);
368 if (token
< Opt_last_int
) {
369 ret
= match_int(&argstr
[0], &intval
);
371 pr_err("bad mount option arg (not int) "
375 dout("got int token %d val %d\n", token
, intval
);
376 } else if (token
> Opt_last_int
&& token
< Opt_last_string
) {
377 dout("got string token %d val %s\n", token
,
380 dout("got token %d\n", token
);
384 case Opt_notify_timeout
:
385 rbd_opts
->notify_timeout
= intval
;
394 * Get a ceph client with specific addr and configuration, if one does
395 * not exist create it.
397 static struct rbd_client
*rbd_get_client(const char *mon_addr
,
401 struct rbd_client
*rbdc
;
402 struct ceph_options
*ceph_opts
;
403 struct rbd_options
*rbd_opts
;
405 rbd_opts
= kzalloc(sizeof(*rbd_opts
), GFP_KERNEL
);
407 return ERR_PTR(-ENOMEM
);
409 rbd_opts
->notify_timeout
= RBD_NOTIFY_TIMEOUT_DEFAULT
;
411 ceph_opts
= ceph_parse_options(options
, mon_addr
,
412 mon_addr
+ mon_addr_len
,
413 parse_rbd_opts_token
, rbd_opts
);
414 if (IS_ERR(ceph_opts
)) {
416 return ERR_CAST(ceph_opts
);
419 spin_lock(&rbd_client_list_lock
);
420 rbdc
= __rbd_client_find(ceph_opts
);
422 /* using an existing client */
423 kref_get(&rbdc
->kref
);
424 spin_unlock(&rbd_client_list_lock
);
426 ceph_destroy_options(ceph_opts
);
431 spin_unlock(&rbd_client_list_lock
);
433 rbdc
= rbd_client_create(ceph_opts
, rbd_opts
);
442 * Destroy ceph client
444 * Caller must hold rbd_client_list_lock.
446 static void rbd_client_release(struct kref
*kref
)
448 struct rbd_client
*rbdc
= container_of(kref
, struct rbd_client
, kref
);
450 dout("rbd_release_client %p\n", rbdc
);
451 spin_lock(&rbd_client_list_lock
);
452 list_del(&rbdc
->node
);
453 spin_unlock(&rbd_client_list_lock
);
455 ceph_destroy_client(rbdc
->client
);
456 kfree(rbdc
->rbd_opts
);
461 * Drop reference to ceph client node. If it's not referenced anymore, release
464 static void rbd_put_client(struct rbd_device
*rbd_dev
)
466 kref_put(&rbd_dev
->rbd_client
->kref
, rbd_client_release
);
467 rbd_dev
->rbd_client
= NULL
;
471 * Destroy requests collection
473 static void rbd_coll_release(struct kref
*kref
)
475 struct rbd_req_coll
*coll
=
476 container_of(kref
, struct rbd_req_coll
, kref
);
478 dout("rbd_coll_release %p\n", coll
);
482 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk
*ondisk
)
484 return !memcmp(&ondisk
->text
,
485 RBD_HEADER_TEXT
, sizeof (RBD_HEADER_TEXT
));
489 * Create a new header structure, translate header format from the on-disk
492 static int rbd_header_from_disk(struct rbd_image_header
*header
,
493 struct rbd_image_header_ondisk
*ondisk
,
499 if (!rbd_dev_ondisk_valid(ondisk
))
502 snap_count
= le32_to_cpu(ondisk
->snap_count
);
504 /* Make sure we don't overflow below */
505 size
= SIZE_MAX
- sizeof (struct ceph_snap_context
);
506 if (snap_count
> size
/ sizeof (header
->snapc
->snaps
[0]))
509 memset(header
, 0, sizeof (*header
));
511 size
= sizeof (ondisk
->block_name
) + 1;
512 header
->object_prefix
= kmalloc(size
, GFP_KERNEL
);
513 if (!header
->object_prefix
)
515 memcpy(header
->object_prefix
, ondisk
->block_name
, size
- 1);
516 header
->object_prefix
[size
- 1] = '\0';
519 header
->snap_names_len
= le64_to_cpu(ondisk
->snap_names_len
);
520 BUG_ON(header
->snap_names_len
> (u64
) SIZE_MAX
);
521 header
->snap_names
= kmalloc(header
->snap_names_len
,
523 if (!header
->snap_names
)
526 size
= snap_count
* sizeof (*header
->snap_sizes
);
527 header
->snap_sizes
= kmalloc(size
, GFP_KERNEL
);
528 if (!header
->snap_sizes
)
531 WARN_ON(ondisk
->snap_names_len
);
532 header
->snap_names_len
= 0;
533 header
->snap_names
= NULL
;
534 header
->snap_sizes
= NULL
;
537 header
->image_size
= le64_to_cpu(ondisk
->image_size
);
538 header
->obj_order
= ondisk
->options
.order
;
539 header
->crypt_type
= ondisk
->options
.crypt_type
;
540 header
->comp_type
= ondisk
->options
.comp_type
;
541 header
->total_snaps
= snap_count
;
544 * If the number of snapshot ids provided by the caller
545 * doesn't match the number in the entire context there's
546 * no point in going further. Caller will try again after
547 * getting an updated snapshot context from the server.
549 if (allocated_snaps
!= snap_count
)
552 size
= sizeof (struct ceph_snap_context
);
553 size
+= snap_count
* sizeof (header
->snapc
->snaps
[0]);
554 header
->snapc
= kzalloc(size
, GFP_KERNEL
);
558 atomic_set(&header
->snapc
->nref
, 1);
559 header
->snapc
->seq
= le64_to_cpu(ondisk
->snap_seq
);
560 header
->snapc
->num_snaps
= snap_count
;
562 /* Fill in the snapshot information */
567 for (i
= 0; i
< snap_count
; i
++) {
568 header
->snapc
->snaps
[i
] =
569 le64_to_cpu(ondisk
->snaps
[i
].id
);
570 header
->snap_sizes
[i
] =
571 le64_to_cpu(ondisk
->snaps
[i
].image_size
);
574 /* copy snapshot names */
575 memcpy(header
->snap_names
, &ondisk
->snaps
[snap_count
],
576 header
->snap_names_len
);
582 kfree(header
->snap_sizes
);
583 header
->snap_sizes
= NULL
;
584 kfree(header
->snap_names
);
585 header
->snap_names
= NULL
;
586 header
->snap_names_len
= 0;
587 kfree(header
->object_prefix
);
588 header
->object_prefix
= NULL
;
593 static int snap_by_name(struct rbd_image_header
*header
, const char *snap_name
,
597 char *p
= header
->snap_names
;
599 for (i
= 0; i
< header
->total_snaps
; i
++) {
600 if (!strcmp(snap_name
, p
)) {
602 /* Found it. Pass back its id and/or size */
605 *seq
= header
->snapc
->snaps
[i
];
607 *size
= header
->snap_sizes
[i
];
610 p
+= strlen(p
) + 1; /* Skip ahead to the next name */
615 static int rbd_header_set_snap(struct rbd_device
*rbd_dev
, u64
*size
)
619 down_write(&rbd_dev
->header_rwsem
);
621 if (!memcmp(rbd_dev
->snap_name
, RBD_SNAP_HEAD_NAME
,
622 sizeof (RBD_SNAP_HEAD_NAME
))) {
623 rbd_dev
->snap_id
= CEPH_NOSNAP
;
624 rbd_dev
->snap_exists
= false;
625 rbd_dev
->read_only
= 0;
627 *size
= rbd_dev
->header
.image_size
;
631 ret
= snap_by_name(&rbd_dev
->header
, rbd_dev
->snap_name
,
635 rbd_dev
->snap_id
= snap_id
;
636 rbd_dev
->snap_exists
= true;
637 rbd_dev
->read_only
= 1;
642 up_write(&rbd_dev
->header_rwsem
);
646 static void rbd_header_free(struct rbd_image_header
*header
)
648 kfree(header
->object_prefix
);
649 header
->object_prefix
= NULL
;
650 kfree(header
->snap_sizes
);
651 header
->snap_sizes
= NULL
;
652 kfree(header
->snap_names
);
653 header
->snap_names
= NULL
;
654 header
->snap_names_len
= 0;
655 ceph_put_snap_context(header
->snapc
);
656 header
->snapc
= NULL
;
660 * get the actual striped segment name, offset and length
662 static u64
rbd_get_segment(struct rbd_image_header
*header
,
663 const char *object_prefix
,
665 char *seg_name
, u64
*segofs
)
667 u64 seg
= ofs
>> header
->obj_order
;
670 snprintf(seg_name
, RBD_MAX_SEG_NAME_LEN
,
671 "%s.%012llx", object_prefix
, seg
);
673 ofs
= ofs
& ((1 << header
->obj_order
) - 1);
674 len
= min_t(u64
, len
, (1 << header
->obj_order
) - ofs
);
682 static int rbd_get_num_segments(struct rbd_image_header
*header
,
685 u64 start_seg
= ofs
>> header
->obj_order
;
686 u64 end_seg
= (ofs
+ len
- 1) >> header
->obj_order
;
687 return end_seg
- start_seg
+ 1;
691 * returns the size of an object in the image
693 static u64
rbd_obj_bytes(struct rbd_image_header
*header
)
695 return 1 << header
->obj_order
;
702 static void bio_chain_put(struct bio
*chain
)
708 chain
= chain
->bi_next
;
714 * zeros a bio chain, starting at specific offset
716 static void zero_bio_chain(struct bio
*chain
, int start_ofs
)
725 bio_for_each_segment(bv
, chain
, i
) {
726 if (pos
+ bv
->bv_len
> start_ofs
) {
727 int remainder
= max(start_ofs
- pos
, 0);
728 buf
= bvec_kmap_irq(bv
, &flags
);
729 memset(buf
+ remainder
, 0,
730 bv
->bv_len
- remainder
);
731 bvec_kunmap_irq(buf
, &flags
);
736 chain
= chain
->bi_next
;
741 * bio_chain_clone - clone a chain of bios up to a certain length.
742 * might return a bio_pair that will need to be released.
744 static struct bio
*bio_chain_clone(struct bio
**old
, struct bio
**next
,
745 struct bio_pair
**bp
,
746 int len
, gfp_t gfpmask
)
748 struct bio
*tmp
, *old_chain
= *old
, *new_chain
= NULL
, *tail
= NULL
;
752 bio_pair_release(*bp
);
756 while (old_chain
&& (total
< len
)) {
757 tmp
= bio_kmalloc(gfpmask
, old_chain
->bi_max_vecs
);
761 if (total
+ old_chain
->bi_size
> len
) {
765 * this split can only happen with a single paged bio,
766 * split_bio will BUG_ON if this is not the case
768 dout("bio_chain_clone split! total=%d remaining=%d"
770 total
, len
- total
, old_chain
->bi_size
);
772 /* split the bio. We'll release it either in the next
773 call, or it will have to be released outside */
774 bp
= bio_split(old_chain
, (len
- total
) / SECTOR_SIZE
);
778 __bio_clone(tmp
, &bp
->bio1
);
782 __bio_clone(tmp
, old_chain
);
783 *next
= old_chain
->bi_next
;
787 gfpmask
&= ~__GFP_WAIT
;
791 new_chain
= tail
= tmp
;
796 old_chain
= old_chain
->bi_next
;
798 total
+= tmp
->bi_size
;
804 tail
->bi_next
= NULL
;
811 dout("bio_chain_clone with err\n");
812 bio_chain_put(new_chain
);
817 * helpers for osd request op vectors.
819 static struct ceph_osd_req_op
*rbd_create_rw_ops(int num_ops
,
820 int opcode
, u32 payload_len
)
822 struct ceph_osd_req_op
*ops
;
824 ops
= kzalloc(sizeof (*ops
) * (num_ops
+ 1), GFP_NOIO
);
831 * op extent offset and length will be set later on
832 * in calc_raw_layout()
834 ops
[0].payload_len
= payload_len
;
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
844 static void rbd_coll_end_req_index(struct request
*rq
,
845 struct rbd_req_coll
*coll
,
849 struct request_queue
*q
;
852 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
853 coll
, index
, ret
, (unsigned long long) len
);
859 blk_end_request(rq
, ret
, len
);
865 spin_lock_irq(q
->queue_lock
);
866 coll
->status
[index
].done
= 1;
867 coll
->status
[index
].rc
= ret
;
868 coll
->status
[index
].bytes
= len
;
869 max
= min
= coll
->num_done
;
870 while (max
< coll
->total
&& coll
->status
[max
].done
)
873 for (i
= min
; i
<max
; i
++) {
874 __blk_end_request(rq
, coll
->status
[i
].rc
,
875 coll
->status
[i
].bytes
);
877 kref_put(&coll
->kref
, rbd_coll_release
);
879 spin_unlock_irq(q
->queue_lock
);
882 static void rbd_coll_end_req(struct rbd_request
*req
,
885 rbd_coll_end_req_index(req
->rq
, req
->coll
, req
->coll_index
, ret
, len
);
889 * Send ceph osd request
891 static int rbd_do_request(struct request
*rq
,
892 struct rbd_device
*rbd_dev
,
893 struct ceph_snap_context
*snapc
,
895 const char *object_name
, u64 ofs
, u64 len
,
900 struct ceph_osd_req_op
*ops
,
901 struct rbd_req_coll
*coll
,
903 void (*rbd_cb
)(struct ceph_osd_request
*req
,
904 struct ceph_msg
*msg
),
905 struct ceph_osd_request
**linger_req
,
908 struct ceph_osd_request
*req
;
909 struct ceph_file_layout
*layout
;
912 struct timespec mtime
= CURRENT_TIME
;
913 struct rbd_request
*req_data
;
914 struct ceph_osd_request_head
*reqhead
;
915 struct ceph_osd_client
*osdc
;
917 req_data
= kzalloc(sizeof(*req_data
), GFP_NOIO
);
920 rbd_coll_end_req_index(rq
, coll
, coll_index
,
926 req_data
->coll
= coll
;
927 req_data
->coll_index
= coll_index
;
930 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name
,
931 (unsigned long long) ofs
, (unsigned long long) len
);
933 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
934 req
= ceph_osdc_alloc_request(osdc
, flags
, snapc
, ops
,
935 false, GFP_NOIO
, pages
, bio
);
941 req
->r_callback
= rbd_cb
;
945 req_data
->pages
= pages
;
948 req
->r_priv
= req_data
;
950 reqhead
= req
->r_request
->front
.iov_base
;
951 reqhead
->snapid
= cpu_to_le64(CEPH_NOSNAP
);
953 strncpy(req
->r_oid
, object_name
, sizeof(req
->r_oid
));
954 req
->r_oid_len
= strlen(req
->r_oid
);
956 layout
= &req
->r_file_layout
;
957 memset(layout
, 0, sizeof(*layout
));
958 layout
->fl_stripe_unit
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
959 layout
->fl_stripe_count
= cpu_to_le32(1);
960 layout
->fl_object_size
= cpu_to_le32(1 << RBD_MAX_OBJ_ORDER
);
961 layout
->fl_pg_pool
= cpu_to_le32(rbd_dev
->pool_id
);
962 ceph_calc_raw_layout(osdc
, layout
, snapid
, ofs
, &len
, &bno
,
965 ceph_osdc_build_request(req
, ofs
, &len
,
969 req
->r_oid
, req
->r_oid_len
);
972 ceph_osdc_set_request_linger(osdc
, req
);
976 ret
= ceph_osdc_start_request(osdc
, req
, false);
981 ret
= ceph_osdc_wait_request(osdc
, req
);
983 *ver
= le64_to_cpu(req
->r_reassert_version
.version
);
984 dout("reassert_ver=%llu\n",
986 le64_to_cpu(req
->r_reassert_version
.version
));
987 ceph_osdc_put_request(req
);
992 bio_chain_put(req_data
->bio
);
993 ceph_osdc_put_request(req
);
995 rbd_coll_end_req(req_data
, ret
, len
);
1001 * Ceph osd op callback
1003 static void rbd_req_cb(struct ceph_osd_request
*req
, struct ceph_msg
*msg
)
1005 struct rbd_request
*req_data
= req
->r_priv
;
1006 struct ceph_osd_reply_head
*replyhead
;
1007 struct ceph_osd_op
*op
;
1013 replyhead
= msg
->front
.iov_base
;
1014 WARN_ON(le32_to_cpu(replyhead
->num_ops
) == 0);
1015 op
= (void *)(replyhead
+ 1);
1016 rc
= le32_to_cpu(replyhead
->result
);
1017 bytes
= le64_to_cpu(op
->extent
.length
);
1018 read_op
= (le16_to_cpu(op
->op
) == CEPH_OSD_OP_READ
);
1020 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1021 (unsigned long long) bytes
, read_op
, (int) rc
);
1023 if (rc
== -ENOENT
&& read_op
) {
1024 zero_bio_chain(req_data
->bio
, 0);
1026 } else if (rc
== 0 && read_op
&& bytes
< req_data
->len
) {
1027 zero_bio_chain(req_data
->bio
, bytes
);
1028 bytes
= req_data
->len
;
1031 rbd_coll_end_req(req_data
, rc
, bytes
);
1034 bio_chain_put(req_data
->bio
);
1036 ceph_osdc_put_request(req
);
/* Minimal completion callback: just release the request. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1046 * Do a synchronous ceph osd operation
1048 static int rbd_req_sync_op(struct rbd_device
*rbd_dev
,
1049 struct ceph_snap_context
*snapc
,
1052 struct ceph_osd_req_op
*ops
,
1053 const char *object_name
,
1056 struct ceph_osd_request
**linger_req
,
1060 struct page
**pages
;
1063 BUG_ON(ops
== NULL
);
1065 num_pages
= calc_pages_for(ofs
, len
);
1066 pages
= ceph_alloc_page_vector(num_pages
, GFP_KERNEL
);
1068 return PTR_ERR(pages
);
1070 ret
= rbd_do_request(NULL
, rbd_dev
, snapc
, snapid
,
1071 object_name
, ofs
, len
, NULL
,
1081 if ((flags
& CEPH_OSD_FLAG_READ
) && buf
)
1082 ret
= ceph_copy_from_page_vector(pages
, buf
, ofs
, ret
);
1085 ceph_release_page_vector(pages
, num_pages
);
1090 * Do an asynchronous ceph osd operation
1092 static int rbd_do_op(struct request
*rq
,
1093 struct rbd_device
*rbd_dev
,
1094 struct ceph_snap_context
*snapc
,
1096 int opcode
, int flags
,
1099 struct rbd_req_coll
*coll
,
1106 struct ceph_osd_req_op
*ops
;
1109 seg_name
= kmalloc(RBD_MAX_SEG_NAME_LEN
+ 1, GFP_NOIO
);
1113 seg_len
= rbd_get_segment(&rbd_dev
->header
,
1114 rbd_dev
->header
.object_prefix
,
1116 seg_name
, &seg_ofs
);
1118 payload_len
= (flags
& CEPH_OSD_FLAG_WRITE
? seg_len
: 0);
1121 ops
= rbd_create_rw_ops(1, opcode
, payload_len
);
1125 /* we've taken care of segment sizes earlier when we
1126 cloned the bios. We should never have a segment
1127 truncated at this point */
1128 BUG_ON(seg_len
< len
);
1130 ret
= rbd_do_request(rq
, rbd_dev
, snapc
, snapid
,
1131 seg_name
, seg_ofs
, seg_len
,
1137 rbd_req_cb
, 0, NULL
);
1139 rbd_destroy_ops(ops
);
1146 * Request async osd write
1148 static int rbd_req_write(struct request
*rq
,
1149 struct rbd_device
*rbd_dev
,
1150 struct ceph_snap_context
*snapc
,
1153 struct rbd_req_coll
*coll
,
1156 return rbd_do_op(rq
, rbd_dev
, snapc
, CEPH_NOSNAP
,
1158 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1159 ofs
, len
, bio
, coll
, coll_index
);
1163 * Request async osd read
1165 static int rbd_req_read(struct request
*rq
,
1166 struct rbd_device
*rbd_dev
,
1170 struct rbd_req_coll
*coll
,
1173 return rbd_do_op(rq
, rbd_dev
, NULL
,
1177 ofs
, len
, bio
, coll
, coll_index
);
1181 * Request sync osd read
1183 static int rbd_req_sync_read(struct rbd_device
*rbd_dev
,
1185 const char *object_name
,
1190 struct ceph_osd_req_op
*ops
;
1193 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_READ
, 0);
1197 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1200 ops
, object_name
, ofs
, len
, buf
, NULL
, ver
);
1201 rbd_destroy_ops(ops
);
1207 * Request sync osd watch
1209 static int rbd_req_sync_notify_ack(struct rbd_device
*rbd_dev
,
1213 struct ceph_osd_req_op
*ops
;
1216 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK
, 0);
1220 ops
[0].watch
.ver
= cpu_to_le64(ver
);
1221 ops
[0].watch
.cookie
= notify_id
;
1222 ops
[0].watch
.flag
= 0;
1224 ret
= rbd_do_request(NULL
, rbd_dev
, NULL
, CEPH_NOSNAP
,
1225 rbd_dev
->header_name
, 0, 0, NULL
,
1230 rbd_simple_req_cb
, 0, NULL
);
1232 rbd_destroy_ops(ops
);
1236 static void rbd_watch_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1238 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1245 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1246 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1247 (unsigned int) opcode
);
1248 rc
= rbd_refresh_header(rbd_dev
, &hver
);
1250 pr_warning(RBD_DRV_NAME
"%d got notification but failed to "
1251 " update snaps: %d\n", rbd_dev
->major
, rc
);
1253 rbd_req_sync_notify_ack(rbd_dev
, hver
, notify_id
);
1257 * Request sync osd watch
1259 static int rbd_req_sync_watch(struct rbd_device
*rbd_dev
)
1261 struct ceph_osd_req_op
*ops
;
1262 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1265 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1269 ret
= ceph_osdc_create_event(osdc
, rbd_watch_cb
, 0,
1270 (void *)rbd_dev
, &rbd_dev
->watch_event
);
1274 ops
[0].watch
.ver
= cpu_to_le64(rbd_dev
->header
.obj_version
);
1275 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1276 ops
[0].watch
.flag
= 1;
1278 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1280 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1282 rbd_dev
->header_name
,
1284 &rbd_dev
->watch_request
, NULL
);
1289 rbd_destroy_ops(ops
);
1293 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1294 rbd_dev
->watch_event
= NULL
;
1296 rbd_destroy_ops(ops
);
1301 * Request sync osd unwatch
1303 static int rbd_req_sync_unwatch(struct rbd_device
*rbd_dev
)
1305 struct ceph_osd_req_op
*ops
;
1308 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH
, 0);
1312 ops
[0].watch
.ver
= 0;
1313 ops
[0].watch
.cookie
= cpu_to_le64(rbd_dev
->watch_event
->cookie
);
1314 ops
[0].watch
.flag
= 0;
1316 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1318 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1320 rbd_dev
->header_name
,
1321 0, 0, NULL
, NULL
, NULL
);
1324 rbd_destroy_ops(ops
);
1325 ceph_osdc_cancel_event(rbd_dev
->watch_event
);
1326 rbd_dev
->watch_event
= NULL
;
/* Context handed to rbd_notify_cb for a one-shot notify. */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1334 static void rbd_notify_cb(u64 ver
, u64 notify_id
, u8 opcode
, void *data
)
1336 struct rbd_device
*rbd_dev
= (struct rbd_device
*)data
;
1340 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1341 rbd_dev
->header_name
, (unsigned long long) notify_id
,
1342 (unsigned int) opcode
);
1346 * Request sync osd notify
1348 static int rbd_req_sync_notify(struct rbd_device
*rbd_dev
)
1350 struct ceph_osd_req_op
*ops
;
1351 struct ceph_osd_client
*osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
1352 struct ceph_osd_event
*event
;
1353 struct rbd_notify_info info
;
1354 int payload_len
= sizeof(u32
) + sizeof(u32
);
1357 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY
, payload_len
);
1361 info
.rbd_dev
= rbd_dev
;
1363 ret
= ceph_osdc_create_event(osdc
, rbd_notify_cb
, 1,
1364 (void *)&info
, &event
);
1368 ops
[0].watch
.ver
= 1;
1369 ops
[0].watch
.flag
= 1;
1370 ops
[0].watch
.cookie
= event
->cookie
;
1371 ops
[0].watch
.prot_ver
= RADOS_NOTIFY_VER
;
1372 ops
[0].watch
.timeout
= 12;
1374 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1376 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1378 rbd_dev
->header_name
,
1379 0, 0, NULL
, NULL
, NULL
);
1383 ret
= ceph_osdc_wait_event(event
, CEPH_OSD_TIMEOUT_DEFAULT
);
1384 dout("ceph_osdc_wait_event returned %d\n", ret
);
1385 rbd_destroy_ops(ops
);
1389 ceph_osdc_cancel_event(event
);
1391 rbd_destroy_ops(ops
);
1396 * Request sync osd read
1398 static int rbd_req_sync_exec(struct rbd_device
*rbd_dev
,
1399 const char *object_name
,
1400 const char *class_name
,
1401 const char *method_name
,
1406 struct ceph_osd_req_op
*ops
;
1407 int class_name_len
= strlen(class_name
);
1408 int method_name_len
= strlen(method_name
);
1411 ops
= rbd_create_rw_ops(1, CEPH_OSD_OP_CALL
,
1412 class_name_len
+ method_name_len
+ len
);
1416 ops
[0].cls
.class_name
= class_name
;
1417 ops
[0].cls
.class_len
= (__u8
) class_name_len
;
1418 ops
[0].cls
.method_name
= method_name
;
1419 ops
[0].cls
.method_len
= (__u8
) method_name_len
;
1420 ops
[0].cls
.argc
= 0;
1421 ops
[0].cls
.indata
= data
;
1422 ops
[0].cls
.indata_len
= len
;
1424 ret
= rbd_req_sync_op(rbd_dev
, NULL
,
1426 CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_ONDISK
,
1428 object_name
, 0, 0, NULL
, NULL
, ver
);
1430 rbd_destroy_ops(ops
);
1432 dout("cls_exec returned %d\n", ret
);
1436 static struct rbd_req_coll
*rbd_alloc_coll(int num_reqs
)
1438 struct rbd_req_coll
*coll
=
1439 kzalloc(sizeof(struct rbd_req_coll
) +
1440 sizeof(struct rbd_req_status
) * num_reqs
,
1445 coll
->total
= num_reqs
;
1446 kref_init(&coll
->kref
);
1451 * block device queue callback
1453 static void rbd_rq_fn(struct request_queue
*q
)
1455 struct rbd_device
*rbd_dev
= q
->queuedata
;
1457 struct bio_pair
*bp
= NULL
;
1459 while ((rq
= blk_fetch_request(q
))) {
1461 struct bio
*rq_bio
, *next_bio
= NULL
;
1466 int num_segs
, cur_seg
= 0;
1467 struct rbd_req_coll
*coll
;
1468 struct ceph_snap_context
*snapc
;
1470 /* peek at request from block layer */
1474 dout("fetched request\n");
1476 /* filter out block requests we don't understand */
1477 if ((rq
->cmd_type
!= REQ_TYPE_FS
)) {
1478 __blk_end_request_all(rq
, 0);
1482 /* deduce our operation (read, write) */
1483 do_write
= (rq_data_dir(rq
) == WRITE
);
1485 size
= blk_rq_bytes(rq
);
1486 ofs
= blk_rq_pos(rq
) * SECTOR_SIZE
;
1488 if (do_write
&& rbd_dev
->read_only
) {
1489 __blk_end_request_all(rq
, -EROFS
);
1493 spin_unlock_irq(q
->queue_lock
);
1495 down_read(&rbd_dev
->header_rwsem
);
1497 if (rbd_dev
->snap_id
!= CEPH_NOSNAP
&& !rbd_dev
->snap_exists
) {
1498 up_read(&rbd_dev
->header_rwsem
);
1499 dout("request for non-existent snapshot");
1500 spin_lock_irq(q
->queue_lock
);
1501 __blk_end_request_all(rq
, -ENXIO
);
1505 snapc
= ceph_get_snap_context(rbd_dev
->header
.snapc
);
1507 up_read(&rbd_dev
->header_rwsem
);
1509 dout("%s 0x%x bytes at 0x%llx\n",
1510 do_write
? "write" : "read",
1511 size
, (unsigned long long) blk_rq_pos(rq
) * SECTOR_SIZE
);
1513 num_segs
= rbd_get_num_segments(&rbd_dev
->header
, ofs
, size
);
1514 coll
= rbd_alloc_coll(num_segs
);
1516 spin_lock_irq(q
->queue_lock
);
1517 __blk_end_request_all(rq
, -ENOMEM
);
1518 ceph_put_snap_context(snapc
);
1523 /* a bio clone to be passed down to OSD req */
1524 dout("rq->bio->bi_vcnt=%hu\n", rq
->bio
->bi_vcnt
);
1525 op_size
= rbd_get_segment(&rbd_dev
->header
,
1526 rbd_dev
->header
.object_prefix
,
1529 kref_get(&coll
->kref
);
1530 bio
= bio_chain_clone(&rq_bio
, &next_bio
, &bp
,
1531 op_size
, GFP_ATOMIC
);
1533 rbd_coll_end_req_index(rq
, coll
, cur_seg
,
1539 /* init OSD command: write or read */
1541 rbd_req_write(rq
, rbd_dev
,
1547 rbd_req_read(rq
, rbd_dev
,
1560 kref_put(&coll
->kref
, rbd_coll_release
);
1563 bio_pair_release(bp
);
1564 spin_lock_irq(q
->queue_lock
);
1566 ceph_put_snap_context(snapc
);
1571 * a queue callback. Makes sure that we don't create a bio that spans across
1572 * multiple osd objects. One exception would be with a single page bios,
1573 * which we handle later at bio_chain_clone
1575 static int rbd_merge_bvec(struct request_queue
*q
, struct bvec_merge_data
*bmd
,
1576 struct bio_vec
*bvec
)
1578 struct rbd_device
*rbd_dev
= q
->queuedata
;
1579 unsigned int chunk_sectors
;
1581 unsigned int bio_sectors
;
1584 chunk_sectors
= 1 << (rbd_dev
->header
.obj_order
- SECTOR_SHIFT
);
1585 sector
= bmd
->bi_sector
+ get_start_sect(bmd
->bi_bdev
);
1586 bio_sectors
= bmd
->bi_size
>> SECTOR_SHIFT
;
1588 max
= (chunk_sectors
- ((sector
& (chunk_sectors
- 1))
1589 + bio_sectors
)) << SECTOR_SHIFT
;
1591 max
= 0; /* bio_add cannot handle a negative return */
1592 if (max
<= bvec
->bv_len
&& bio_sectors
== 0)
1593 return bvec
->bv_len
;
1597 static void rbd_free_disk(struct rbd_device
*rbd_dev
)
1599 struct gendisk
*disk
= rbd_dev
->disk
;
1604 rbd_header_free(&rbd_dev
->header
);
1606 if (disk
->flags
& GENHD_FL_UP
)
1609 blk_cleanup_queue(disk
->queue
);
1614 * reload the ondisk header
/*
 * rbd_read_header() -- read the on-disk image header object from the OSDs
 * and decode it into *header.  Loops because the snapshot count can change
 * between the sizing read and the full read.  NOTE(review): partial
 * extraction -- loop construct, error paths and frees are not all visible.
 */
1616 static int rbd_read_header(struct rbd_device
*rbd_dev
,
1617 struct rbd_image_header
*header
)
1620 struct rbd_image_header_ondisk
*dh
;
1626 * First reads the fixed-size header to determine the number
1627 * of snapshots, then re-reads it, along with all snapshot
1628 * records as well as their stored names.
1632 dh
= kmalloc(len
, GFP_KERNEL
);
/* Synchronous read of the header object named rbd_dev->header_name. */
1636 rc
= rbd_req_sync_read(rbd_dev
,
1638 rbd_dev
->header_name
,
1644 rc
= rbd_header_from_disk(header
, dh
, snap_count
);
1647 pr_warning("unrecognized header format"
1649 rbd_dev
->image_name
);
/* Done when the snap count we sized for matches what we decoded. */
1653 if (snap_count
== header
->total_snaps
)
/* Otherwise resize the raw buffer for the new snapshot count and retry. */
1656 snap_count
= header
->total_snaps
;
1657 len
= sizeof (*dh
) +
1658 snap_count
* sizeof(struct rbd_image_snap_ondisk
) +
1659 header
->snap_names_len
;
1661 rbd_header_free(header
);
1664 header
->obj_version
= ver
;
/*
 * rbd_header_add_snap() -- create a new snapshot of the image.
 * Allocates a snapshot id from the monitors, then asks the OSD (via a
 * class-method exec on the header object) to record the named snapshot.
 * Returns 0 on success or a negative errno.  NOTE(review): partial
 * extraction -- error-return lines and the "bad" label are not visible.
 */
1674 static int rbd_header_add_snap(struct rbd_device
*rbd_dev
,
1675 const char *snap_name
,
1678 int name_len
= strlen(snap_name
);
1682 struct ceph_mon_client
*monc
;
1684 /* we should create a snapshot only if we're pointing at the head */
1685 if (rbd_dev
->snap_id
!= CEPH_NOSNAP
)
/* Ask the monitor cluster for a fresh snapshot id in our pool. */
1688 monc
= &rbd_dev
->rbd_client
->client
->monc
;
1689 ret
= ceph_monc_create_snapid(monc
, rbd_dev
->pool_id
, &new_snapid
);
1690 dout("created snapid=%llu\n", (unsigned long long) new_snapid
);
/* Encode (name, id) payload; +16 leaves room for the length/id fields. */
1694 data
= kmalloc(name_len
+ 16, gfp_flags
);
1699 e
= data
+ name_len
+ 16;
1701 ceph_encode_string_safe(&p
, e
, snap_name
, name_len
, bad
);
1702 ceph_encode_64_safe(&p
, e
, new_snapid
, bad
);
/* Execute the snapshot-add method against the header object. */
1704 ret
= rbd_req_sync_exec(rbd_dev
, rbd_dev
->header_name
,
1706 data
, p
- data
, NULL
);
1710 return ret
< 0 ? ret
: 0;
/*
 * __rbd_remove_all_snaps() -- unregister and drop every snapshot device
 * on the rbd_dev's snaps list.  Uses the _safe iterator because each
 * entry is removed from the list while walking it.
 * Caller is presumably expected to hold ctl_mutex -- confirm at call sites.
 */
1715 static void __rbd_remove_all_snaps(struct rbd_device
*rbd_dev
)
1717 struct rbd_snap
*snap
;
1718 struct rbd_snap
*next
;
1720 list_for_each_entry_safe(snap
, next
, &rbd_dev
->snaps
, node
)
1721 __rbd_remove_snap_dev(snap
);
1725 * only read the first part of the ondisk header, without the snaps info
/*
 * __rbd_refresh_header() -- re-read the on-disk header and fold the fresh
 * copy into rbd_dev->header under header_rwsem (write-held).  Ownership of
 * the new snap_names/snap_sizes/snapc moves from the local copy 'h' into
 * rbd_dev->header; the old ones are freed/put here.  Optionally reports
 * the new object version via *hver.  NOTE(review): partial extraction --
 * error checks and the hver NULL test are not visible in this chunk.
 */
1727 static int __rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
)
1730 struct rbd_image_header h
;
1732 ret
= rbd_read_header(rbd_dev
, &h
);
1736 down_write(&rbd_dev
->header_rwsem
);
/* Resize the block device only when mapped at head (not a snapshot). */
1739 if (rbd_dev
->snap_id
== CEPH_NOSNAP
) {
1740 sector_t size
= (sector_t
) h
.image_size
/ SECTOR_SIZE
;
1742 dout("setting size to %llu sectors", (unsigned long long) size
);
1743 set_capacity(rbd_dev
->disk
, size
);
1746 /* rbd_dev->header.object_prefix shouldn't change */
1747 kfree(rbd_dev
->header
.snap_sizes
);
1748 kfree(rbd_dev
->header
.snap_names
);
1749 /* osd requests may still refer to snapc */
1750 ceph_put_snap_context(rbd_dev
->header
.snapc
);
1753 *hver
= h
.obj_version
;
1754 rbd_dev
->header
.obj_version
= h
.obj_version
;
1755 rbd_dev
->header
.image_size
= h
.image_size
;
1756 rbd_dev
->header
.total_snaps
= h
.total_snaps
;
1757 rbd_dev
->header
.snapc
= h
.snapc
;
1758 rbd_dev
->header
.snap_names
= h
.snap_names
;
1759 rbd_dev
->header
.snap_names_len
= h
.snap_names_len
;
1760 rbd_dev
->header
.snap_sizes
= h
.snap_sizes
;
1761 /* Free the extra copy of the object prefix */
1762 WARN_ON(strcmp(rbd_dev
->header
.object_prefix
, h
.object_prefix
));
1763 kfree(h
.object_prefix
);
/* Reconcile the snapshot device list with the new snap context. */
1765 ret
= __rbd_init_snaps_header(rbd_dev
);
1767 up_write(&rbd_dev
->header_rwsem
);
/*
 * rbd_refresh_header() -- locked wrapper: takes ctl_mutex around
 * __rbd_refresh_header().  SINGLE_DEPTH_NESTING because it may be
 * called with another ctl_mutex-class lock context active.
 */
1772 static int rbd_refresh_header(struct rbd_device
*rbd_dev
, u64
*hver
)
1776 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
1777 ret
= __rbd_refresh_header(rbd_dev
, hver
);
1778 mutex_unlock(&ctl_mutex
);
/*
 * rbd_init_disk() -- fetch the image header, set the mapped snapshot,
 * allocate the gendisk + request queue, size the queue limits to the rbd
 * object size, and announce the disk.  NOTE(review): partial extraction --
 * error-handling branches, add_disk(), and cleanup labels are not visible.
 */
1783 static int rbd_init_disk(struct rbd_device
*rbd_dev
)
1785 struct gendisk
*disk
;
1786 struct request_queue
*q
;
1791 /* contact OSD, request size info about the object being mapped */
1792 rc
= rbd_read_header(rbd_dev
, &rbd_dev
->header
);
1796 /* no need to lock here, as rbd_dev is not registered yet */
1797 rc
= __rbd_init_snaps_header(rbd_dev
);
/* Resolve the mapped snapshot and get the device's total size. */
1801 rc
= rbd_header_set_snap(rbd_dev
, &total_size
);
1805 /* create gendisk info */
1807 disk
= alloc_disk(RBD_MINORS_PER_MAJOR
);
1811 snprintf(disk
->disk_name
, sizeof(disk
->disk_name
), RBD_DRV_NAME
"%d",
1813 disk
->major
= rbd_dev
->major
;
1814 disk
->first_minor
= 0;
1815 disk
->fops
= &rbd_bd_ops
;
1816 disk
->private_data
= rbd_dev
;
/* Request-mode queue; rbd_rq_fn services requests under rbd_dev->lock. */
1820 q
= blk_init_queue(rbd_rq_fn
, &rbd_dev
->lock
);
1824 /* We use the default size, but let's be explicit about it. */
1825 blk_queue_physical_block_size(q
, SECTOR_SIZE
);
1827 /* set io sizes to object size */
1828 segment_size
= rbd_obj_bytes(&rbd_dev
->header
);
1829 blk_queue_max_hw_sectors(q
, segment_size
/ SECTOR_SIZE
);
1830 blk_queue_max_segment_size(q
, segment_size
);
1831 blk_queue_io_min(q
, segment_size
);
1832 blk_queue_io_opt(q
, segment_size
);
/* Keep bios from spanning object boundaries (see rbd_merge_bvec). */
1834 blk_queue_merge_bvec(q
, rbd_merge_bvec
);
1837 q
->queuedata
= rbd_dev
;
1839 rbd_dev
->disk
= disk
;
1842 /* finally, announce the disk to the world */
1843 set_capacity(disk
, total_size
/ SECTOR_SIZE
);
1846 pr_info("%s: added with size 0x%llx\n",
1847 disk
->disk_name
, (unsigned long long)total_size
);
/* dev_to_rbd_dev() -- map the embedded struct device back to its rbd_device. */
1860 static struct rbd_device
*dev_to_rbd_dev(struct device
*dev
)
1862 return container_of(dev
, struct rbd_device
, dev
);
/*
 * sysfs "size" attribute: device capacity in bytes.  Capacity is read
 * under header_rwsem so it is consistent with a concurrent refresh.
 */
1865 static ssize_t
rbd_size_show(struct device
*dev
,
1866 struct device_attribute
*attr
, char *buf
)
1868 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1871 down_read(&rbd_dev
->header_rwsem
);
1872 size
= get_capacity(rbd_dev
->disk
);
1873 up_read(&rbd_dev
->header_rwsem
);
/* get_capacity() returns sectors; convert to bytes for sysfs. */
1875 return sprintf(buf
, "%llu\n", (unsigned long long) size
* SECTOR_SIZE
);
/* sysfs "major": the block device major number assigned at add time. */
1878 static ssize_t
rbd_major_show(struct device
*dev
,
1879 struct device_attribute
*attr
, char *buf
)
1881 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1883 return sprintf(buf
, "%d\n", rbd_dev
->major
);
/* sysfs "client_id": ceph client instance id, formatted "client<NN>". */
1886 static ssize_t
rbd_client_id_show(struct device
*dev
,
1887 struct device_attribute
*attr
, char *buf
)
1889 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1891 return sprintf(buf
, "client%lld\n",
1892 ceph_client_id(rbd_dev
->rbd_client
->client
));
/* sysfs "pool": name of the rados pool holding the image. */
1895 static ssize_t
rbd_pool_show(struct device
*dev
,
1896 struct device_attribute
*attr
, char *buf
)
1898 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1900 return sprintf(buf
, "%s\n", rbd_dev
->pool_name
);
/* sysfs "pool_id": numeric id of that pool. */
1903 static ssize_t
rbd_pool_id_show(struct device
*dev
,
1904 struct device_attribute
*attr
, char *buf
)
1906 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1908 return sprintf(buf
, "%d\n", rbd_dev
->pool_id
);
/* sysfs "name": the rbd image name. */
1911 static ssize_t
rbd_name_show(struct device
*dev
,
1912 struct device_attribute
*attr
, char *buf
)
1914 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1916 return sprintf(buf
, "%s\n", rbd_dev
->image_name
);
/* sysfs "current_snap": mapped snapshot name ("-" when mapped at head). */
1919 static ssize_t
rbd_snap_show(struct device
*dev
,
1920 struct device_attribute
*attr
,
1923 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1925 return sprintf(buf
, "%s\n", rbd_dev
->snap_name
);
/*
 * sysfs "refresh" (write-only): re-read the image header from the OSDs.
 * Returns the write size on success so the write is consumed.
 */
1928 static ssize_t
rbd_image_refresh(struct device
*dev
,
1929 struct device_attribute
*attr
,
1933 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
1936 ret
= rbd_refresh_header(rbd_dev
, NULL
);
1938 return ret
< 0 ? ret
: size
;
/* Per-device sysfs attributes; read-only except refresh/create_snap. */
1941 static DEVICE_ATTR(size
, S_IRUGO
, rbd_size_show
, NULL
);
1942 static DEVICE_ATTR(major
, S_IRUGO
, rbd_major_show
, NULL
);
1943 static DEVICE_ATTR(client_id
, S_IRUGO
, rbd_client_id_show
, NULL
);
1944 static DEVICE_ATTR(pool
, S_IRUGO
, rbd_pool_show
, NULL
);
1945 static DEVICE_ATTR(pool_id
, S_IRUGO
, rbd_pool_id_show
, NULL
);
1946 static DEVICE_ATTR(name
, S_IRUGO
, rbd_name_show
, NULL
);
1947 static DEVICE_ATTR(refresh
, S_IWUSR
, NULL
, rbd_image_refresh
);
1948 static DEVICE_ATTR(current_snap
, S_IRUGO
, rbd_snap_show
, NULL
);
1949 static DEVICE_ATTR(create_snap
, S_IWUSR
, NULL
, rbd_snap_add
);
/* NULL-terminated attribute list wired into rbd_device_type below. */
1951 static struct attribute
*rbd_attrs
[] = {
1952 &dev_attr_size
.attr
,
1953 &dev_attr_major
.attr
,
1954 &dev_attr_client_id
.attr
,
1955 &dev_attr_pool
.attr
,
1956 &dev_attr_pool_id
.attr
,
1957 &dev_attr_name
.attr
,
1958 &dev_attr_current_snap
.attr
,
1959 &dev_attr_refresh
.attr
,
1960 &dev_attr_create_snap
.attr
,
1964 static struct attribute_group rbd_attr_group
= {
1968 static const struct attribute_group
*rbd_attr_groups
[] = {
/* Release hook for the embedded device; real teardown is rbd_dev_release. */
1973 static void rbd_sysfs_dev_release(struct device
*dev
)
1977 static struct device_type rbd_device_type
= {
1979 .groups
= rbd_attr_groups
,
1980 .release
= rbd_sysfs_dev_release
,
/* sysfs "snap_size": size in bytes of the image at this snapshot. */
1988 static ssize_t
rbd_snap_size_show(struct device
*dev
,
1989 struct device_attribute
*attr
,
1992 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
1994 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->size
);
/* sysfs "snap_id": the ceph snapshot id of this snapshot. */
1997 static ssize_t
rbd_snap_id_show(struct device
*dev
,
1998 struct device_attribute
*attr
,
2001 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2003 return sprintf(buf
, "%llu\n", (unsigned long long)snap
->id
);
/* Read-only per-snapshot attributes and their group wiring. */
2006 static DEVICE_ATTR(snap_size
, S_IRUGO
, rbd_snap_size_show
, NULL
);
2007 static DEVICE_ATTR(snap_id
, S_IRUGO
, rbd_snap_id_show
, NULL
);
2009 static struct attribute
*rbd_snap_attrs
[] = {
2010 &dev_attr_snap_size
.attr
,
2011 &dev_attr_snap_id
.attr
,
2015 static struct attribute_group rbd_snap_attr_group
= {
2016 .attrs
= rbd_snap_attrs
,
/*
 * Device-model release for a snapshot: runs when the last reference to
 * snap->dev is dropped (after device_unregister).  NOTE(review): the
 * kfree of name/snap itself is presumably here but not visible.
 */
2019 static void rbd_snap_dev_release(struct device
*dev
)
2021 struct rbd_snap
*snap
= container_of(dev
, struct rbd_snap
, dev
);
2026 static const struct attribute_group
*rbd_snap_attr_groups
[] = {
2027 &rbd_snap_attr_group
,
2031 static struct device_type rbd_snap_device_type
= {
2032 .groups
= rbd_snap_attr_groups
,
2033 .release
= rbd_snap_dev_release
,
/*
 * __rbd_remove_snap_dev() -- unlink a snapshot from the device's list and
 * unregister its sysfs device; final freeing happens in the device
 * release callback (rbd_snap_dev_release).
 */
2036 static void __rbd_remove_snap_dev(struct rbd_snap
*snap
)
2038 list_del(&snap
->node
);
2039 device_unregister(&snap
->dev
);
/*
 * rbd_register_snap_dev() -- initialize and register the sysfs device
 * ("snap_<name>") for a snapshot under the given parent rbd device.
 * Returns device_register()'s result; presumably negative errno on failure.
 */
2042 static int rbd_register_snap_dev(struct rbd_snap
*snap
,
2043 struct device
*parent
)
2045 struct device
*dev
= &snap
->dev
;
2048 dev
->type
= &rbd_snap_device_type
;
2049 dev
->parent
= parent
;
2050 dev
->release
= rbd_snap_dev_release
;
2051 dev_set_name(dev
, "snap_%s", snap
->name
);
2052 ret
= device_register(dev
);
/*
 * __rbd_add_snap_dev() -- allocate an rbd_snap for snapshot index i of
 * the current header (copying the name, size and id), and register its
 * sysfs device if the parent rbd device is already registered.
 * Returns the new snap or ERR_PTR on failure.  NOTE(review): partial
 * extraction -- error unwinding (kfree of name/snap) is not visible.
 */
2057 static struct rbd_snap
*__rbd_add_snap_dev(struct rbd_device
*rbd_dev
,
2058 int i
, const char *name
)
2060 struct rbd_snap
*snap
;
2063 snap
= kzalloc(sizeof (*snap
), GFP_KERNEL
);
2065 return ERR_PTR(-ENOMEM
);
/* Own a copy of the name; header snap_names may be reallocated later. */
2068 snap
->name
= kstrdup(name
, GFP_KERNEL
);
2072 snap
->size
= rbd_dev
->header
.snap_sizes
[i
];
2073 snap
->id
= rbd_dev
->header
.snapc
->snaps
[i
];
/* During initial probe the parent isn't registered yet; skip sysfs then. */
2074 if (device_is_registered(&rbd_dev
->dev
)) {
2075 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
2086 return ERR_PTR(ret
);
2090 * Scan the rbd device's current snapshot list and compare it to the
2091 * newly-received snapshot context. Remove any existing snapshots
2092 * not present in the new snapshot context. Add a new snapshot for
2093 * any snapshots in the snapshot context not in the current list.
2094 * And verify there are no changes to snapshots we already know
2097 * Assumes the snapshots in the snapshot context are sorted by
2098 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2099 * are also maintained in that order.)
/*
 * Merge-walk of two sorted sequences (header snap context vs. existing
 * rbd_snap list).  Caller presumably holds header_rwsem for write --
 * see __rbd_refresh_header().  NOTE(review): partial extraction -- the
 * CEPH_NOSNAP sentinel assignments and index advancement lines are not
 * all visible in this chunk.
 */
2101 static int __rbd_init_snaps_header(struct rbd_device
*rbd_dev
)
2103 struct ceph_snap_context
*snapc
= rbd_dev
->header
.snapc
;
2104 const u32 snap_count
= snapc
->num_snaps
;
/* snap_names is a packed sequence of NUL-terminated strings. */
2105 char *snap_name
= rbd_dev
->header
.snap_names
;
2106 struct list_head
*head
= &rbd_dev
->snaps
;
2107 struct list_head
*links
= head
->next
;
/* Continue until both the context and the existing list are exhausted. */
2110 while (index
< snap_count
|| links
!= head
) {
2112 struct rbd_snap
*snap
;
2114 snap_id
= index
< snap_count
? snapc
->snaps
[index
]
2116 snap
= links
!= head
? list_entry(links
, struct rbd_snap
, node
)
2118 BUG_ON(snap
&& snap
->id
== CEPH_NOSNAP
);
/* Existing entry has an id above the context cursor: it was deleted. */
2120 if (snap_id
== CEPH_NOSNAP
|| (snap
&& snap
->id
> snap_id
)) {
2121 struct list_head
*next
= links
->next
;
2123 /* Existing snapshot not in the new snap context */
/* If the mapping points at the removed snapshot, mark it gone. */
2125 if (rbd_dev
->snap_id
== snap
->id
)
2126 rbd_dev
->snap_exists
= false;
2127 __rbd_remove_snap_dev(snap
);
2129 /* Done with this list entry; advance */
2135 if (!snap
|| (snap_id
!= CEPH_NOSNAP
&& snap
->id
< snap_id
)) {
2136 struct rbd_snap
*new_snap
;
2138 /* We haven't seen this snapshot before */
2140 new_snap
= __rbd_add_snap_dev(rbd_dev
, index
,
2142 if (IS_ERR(new_snap
))
2143 return PTR_ERR(new_snap
);
2145 /* New goes before existing, or at end of list */
2148 list_add_tail(&new_snap
->node
, &snap
->node
);
2150 list_add(&new_snap
->node
, head
);
2152 /* Already have this one */
/* Sanity: size and name of a known snapshot must not change. */
2154 BUG_ON(snap
->size
!= rbd_dev
->header
.snap_sizes
[index
]);
2155 BUG_ON(strcmp(snap
->name
, snap_name
));
2157 /* Done with this list entry; advance */
2159 links
= links
->next
;
2162 /* Advance to the next entry in the snapshot context */
2165 snap_name
+= strlen(snap_name
) + 1;
/*
 * rbd_bus_add_dev() -- register the rbd device on the rbd bus under
 * rbd_root_dev, then register a sysfs device for each already-known
 * snapshot.  Runs under ctl_mutex.  NOTE(review): partial extraction --
 * error unwinding after a failed snapshot registration is not visible.
 */
2171 static int rbd_bus_add_dev(struct rbd_device
*rbd_dev
)
2175 struct rbd_snap
*snap
;
2177 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2178 dev
= &rbd_dev
->dev
;
2180 dev
->bus
= &rbd_bus_type
;
2181 dev
->type
= &rbd_device_type
;
2182 dev
->parent
= &rbd_root_dev
;
2183 dev
->release
= rbd_dev_release
;
/* Device name is the numeric id, i.e. /sys/bus/rbd/devices/<id>. */
2184 dev_set_name(dev
, "%d", rbd_dev
->dev_id
);
2185 ret
= device_register(dev
);
2189 list_for_each_entry(snap
, &rbd_dev
->snaps
, node
) {
2190 ret
= rbd_register_snap_dev(snap
, &rbd_dev
->dev
);
2195 mutex_unlock(&ctl_mutex
);
/* rbd_bus_del_dev() -- unregister; teardown continues in rbd_dev_release. */
2199 static void rbd_bus_del_dev(struct rbd_device
*rbd_dev
)
2201 device_unregister(&rbd_dev
->dev
);
/*
 * rbd_init_watch_dev() -- establish a watch on the header object so the
 * OSDs notify us of header changes.  -ERANGE presumably means our cached
 * header version is stale: refresh the header and retry the watch.
 */
2204 static int rbd_init_watch_dev(struct rbd_device
*rbd_dev
)
2209 ret
= rbd_req_sync_watch(rbd_dev
);
2210 if (ret
== -ERANGE
) {
2211 rc
= rbd_refresh_header(rbd_dev
, NULL
);
2215 } while (ret
== -ERANGE
);
/* Highest rbd device id ever handed out; ids start at 1 (see below). */
2220 static atomic64_t rbd_id_max
= ATOMIC64_INIT(0);
2223 * Get a unique rbd identifier for the given new rbd_dev, and add
2224 * the rbd_dev to the global list. The minimum rbd id is 1.
2226 static void rbd_id_get(struct rbd_device
*rbd_dev
)
2228 rbd_dev
->dev_id
= atomic64_inc_return(&rbd_id_max
);
2230 spin_lock(&rbd_dev_list_lock
);
2231 list_add_tail(&rbd_dev
->node
, &rbd_dev_list
);
2232 spin_unlock(&rbd_dev_list_lock
);
2236 * Remove an rbd_dev from the global list, and record that its
2237 * identifier is no longer in use.
/*
 * NOTE(review): partial extraction -- the max_id initialization and the
 * break once a larger id is found are not visible in this chunk.
 */
2239 static void rbd_id_put(struct rbd_device
*rbd_dev
)
2241 struct list_head
*tmp
;
2242 int rbd_id
= rbd_dev
->dev_id
;
2247 spin_lock(&rbd_dev_list_lock
);
2248 list_del_init(&rbd_dev
->node
);
2251 * If the id being "put" is not the current maximum, there
2252 * is nothing special we need to do.
2254 if (rbd_id
!= atomic64_read(&rbd_id_max
)) {
2255 spin_unlock(&rbd_dev_list_lock
);
2260 * We need to update the current maximum id. Search the
2261 * list to find out what it is. We're more likely to find
2262 * the maximum at the end, so search the list backward.
2265 list_for_each_prev(tmp
, &rbd_dev_list
) {
2266 struct rbd_device
*rbd_dev
;
2268 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2269 if (rbd_id
> max_id
)
2272 spin_unlock(&rbd_dev_list_lock
);
2275 * The max id could have been updated by rbd_id_get(), in
2276 * which case it now accurately reflects the new maximum.
2277 * Be careful not to overwrite the maximum value in that
/* cmpxchg only lowers rbd_id_max if it still equals the id we put. */
2280 atomic64_cmpxchg(&rbd_id_max
, rbd_id
, max_id
);
2284 * Skips over white space at *buf, and updates *buf to point to the
2285 * first found non-space character (if any). Returns the length of
2286 * the token (string of non-white space characters) found. Note
2287 * that *buf must be terminated with '\0'.
2289 static inline size_t next_token(const char **buf
)
2292 * These are the characters that produce nonzero for
2293 * isspace() in the "C" and "POSIX" locales.
2295 const char *spaces
= " \f\n\r\t\v";
2297 *buf
+= strspn(*buf
, spaces
); /* Find start of token */
2299 return strcspn(*buf
, spaces
); /* Return token length */
2303 * Finds the next token in *buf, and if the provided token buffer is
2304 * big enough, copies the found token into it. The result, if
2305 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2306 * must be terminated with '\0' on entry.
2308 * Returns the length of the token found (not including the '\0').
2309 * Return value will be 0 if no token is found, and it will be >=
2310 * token_size if the token would not fit.
2312 * The *buf pointer will be updated to point beyond the end of the
2313 * found token. Note that this occurs even if the token buffer is
2314 * too small to hold it.
2316 static inline size_t copy_token(const char **buf
,
2322 len
= next_token(buf
);
/* Copy only when the token (plus its terminator) fits the buffer. */
2323 if (len
< token_size
) {
2324 memcpy(token
, *buf
, len
);
2325 *(token
+ len
) = '\0';
2333 * Finds the next token in *buf, dynamically allocates a buffer big
2334 * enough to hold a copy of it, and copies the token into the new
2335 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2336 * that a duplicate buffer is created even for a zero-length token.
2338 * Returns a pointer to the newly-allocated duplicate, or a null
2339 * pointer if memory for the duplicate was not available. If
2340 * the lenp argument is a non-null pointer, the length of the token
2341 * (not including the '\0') is returned in *lenp.
2343 * If successful, the *buf pointer will be updated to point beyond
2344 * the end of the found token.
2346 * Note: uses GFP_KERNEL for allocation.
2348 static inline char *dup_token(const char **buf
, size_t *lenp
)
2353 len
= next_token(buf
);
/* +1 for the NUL terminator; caller owns the returned buffer. */
2354 dup
= kmalloc(len
+ 1, GFP_KERNEL
);
2358 memcpy(dup
, *buf
, len
);
2359 *(dup
+ len
) = '\0';
2369 * This fills in the pool_name, image_name, image_name_len, snap_name,
2370 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2371 * on the list of monitor addresses and other options provided via
2374 * Note: rbd_dev is assumed to have been initially zero-filled.
/*
 * Expected token order in buf: <mon_addrs> <options> <pool> <image>
 * [<snapshot>].  On any failure every allocated field is freed and
 * reset (see the unwinding at the end).  NOTE(review): partial
 * extraction -- error-label lines and some returns are not visible.
 */
2376 static int rbd_add_parse_args(struct rbd_device
*rbd_dev
,
2378 const char **mon_addrs
,
2379 size_t *mon_addrs_size
,
2381 size_t options_size
)
2386 /* The first four tokens are required */
2388 len
= next_token(&buf
);
/* Report the monitor-address token length including its terminator. */
2391 *mon_addrs_size
= len
+ 1;
2396 len
= copy_token(&buf
, options
, options_size
);
2397 if (!len
|| len
>= options_size
)
/* Caller frees pool_name/image_name/... via the error unwinding below. */
2401 rbd_dev
->pool_name
= dup_token(&buf
, NULL
);
2402 if (!rbd_dev
->pool_name
)
2405 rbd_dev
->image_name
= dup_token(&buf
, &rbd_dev
->image_name_len
);
2406 if (!rbd_dev
->image_name
)
2409 /* Create the name of the header object */
2411 rbd_dev
->header_name
= kmalloc(rbd_dev
->image_name_len
2412 + sizeof (RBD_SUFFIX
),
2414 if (!rbd_dev
->header_name
)
2416 sprintf(rbd_dev
->header_name
, "%s%s", rbd_dev
->image_name
, RBD_SUFFIX
);
2419 * The snapshot name is optional. If none is supplied,
2420 * we use the default value.
2422 rbd_dev
->snap_name
= dup_token(&buf
, &len
);
2423 if (!rbd_dev
->snap_name
)
2426 /* Replace the empty name with the default */
2427 kfree(rbd_dev
->snap_name
)
;
2429 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME
), GFP_KERNEL
);
2430 if (!rbd_dev
->snap_name
)
2433 memcpy(rbd_dev
->snap_name
, RBD_SNAP_HEAD_NAME
,
2434 sizeof (RBD_SNAP_HEAD_NAME
));
/* Error unwinding: free in reverse order and NULL what callers may see. */
2440 kfree(rbd_dev
->header_name
);
2441 rbd_dev
->header_name
= NULL
;
2442 kfree(rbd_dev
->image_name
);
2443 rbd_dev
->image_name
= NULL
;
2444 rbd_dev
->image_name_len
= 0;
2445 kfree(rbd_dev
->pool_name
);
2446 rbd_dev
->pool_name
= NULL
;
/*
 * rbd_add() -- bus "add" store handler: map a new rbd image.
 * Sequence: allocate rbd_dev, take an id, parse arguments, get a ceph
 * client, resolve the pool, register a blkdev major, register on the
 * rbd bus, initialize the disk, and set up the header watch.  Holds a
 * module reference for the lifetime of the device (dropped on error
 * here, or in rbd_dev_release on removal).  NOTE(review): partial
 * extraction -- several error checks, labels and the success return
 * are not visible in this chunk.
 */
2451 static ssize_t
rbd_add(struct bus_type
*bus
,
2456 struct rbd_device
*rbd_dev
= NULL
;
2457 const char *mon_addrs
= NULL
;
2458 size_t mon_addrs_size
= 0;
2459 struct ceph_osd_client
*osdc
;
2462 if (!try_module_get(THIS_MODULE
))
2465 options
= kmalloc(count
, GFP_KERNEL
);
2468 rbd_dev
= kzalloc(sizeof(*rbd_dev
), GFP_KERNEL
);
2472 /* static rbd_device initialization */
2473 spin_lock_init(&rbd_dev
->lock
);
2474 INIT_LIST_HEAD(&rbd_dev
->node
);
2475 INIT_LIST_HEAD(&rbd_dev
->snaps
);
2476 init_rwsem(&rbd_dev
->header_rwsem
);
2478 /* generate unique id: find highest unique id, add one */
2479 rbd_id_get(rbd_dev
);
2481 /* Fill in the device name, now that we have its id. */
2482 BUILD_BUG_ON(DEV_NAME_LEN
2483 < sizeof (RBD_DRV_NAME
) + MAX_INT_FORMAT_WIDTH
);
2484 sprintf(rbd_dev
->name
, "%s%d", RBD_DRV_NAME
, rbd_dev
->dev_id
);
2486 /* parse add command */
2487 rc
= rbd_add_parse_args(rbd_dev
, buf
, &mon_addrs
, &mon_addrs_size
,
/* mon_addrs_size includes the terminator; pass the bare length. */
2492 rbd_dev
->rbd_client
= rbd_get_client(mon_addrs
, mon_addrs_size
- 1,
2494 if (IS_ERR(rbd_dev
->rbd_client
)) {
2495 rc
= PTR_ERR(rbd_dev
->rbd_client
);
2496 rbd_dev
->rbd_client
= NULL
;
2501 osdc
= &rbd_dev
->rbd_client
->client
->osdc
;
2502 rc
= ceph_pg_poolid_by_name(osdc
->osdmap
, rbd_dev
->pool_name
);
2504 goto err_out_client
;
2505 rbd_dev
->pool_id
= rc
;
2507 /* register our block device */
/* major 0 asks the block layer to pick a free major number. */
2508 rc
= register_blkdev(0, rbd_dev
->name
);
2510 goto err_out_client
;
2511 rbd_dev
->major
= rc
;
2513 rc
= rbd_bus_add_dev(rbd_dev
);
2515 goto err_out_blkdev
;
2518 * At this point cleanup in the event of an error is the job
2519 * of the sysfs code (initiated by rbd_bus_del_dev()).
2521 * Set up and announce blkdev mapping.
2523 rc
= rbd_init_disk(rbd_dev
);
2527 rc
= rbd_init_watch_dev(rbd_dev
);
2534 /* this will also clean up rest of rbd_dev stuff */
2536 rbd_bus_del_dev(rbd_dev
);
/* Error unwinding for failures before bus registration. */
2541 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2543 rbd_put_client(rbd_dev
);
/* pool_name non-NULL means parse succeeded; free the parsed strings. */
2545 if (rbd_dev
->pool_name
) {
2546 kfree(rbd_dev
->snap_name
);
2547 kfree(rbd_dev
->header_name
);
2548 kfree(rbd_dev
->image_name
);
2549 kfree(rbd_dev
->pool_name
);
2551 rbd_id_put(rbd_dev
);
2556 dout("Error adding device %s\n", buf
);
2557 module_put(THIS_MODULE
);
2559 return (ssize_t
) rc
;
/*
 * __rbd_get_dev() -- look up a device by id in the global list.
 * Returns under rbd_dev_list_lock protection for the search only;
 * NOTE(review): the found/NULL return lines are not visible here.
 */
2562 static struct rbd_device
*__rbd_get_dev(unsigned long dev_id
)
2564 struct list_head
*tmp
;
2565 struct rbd_device
*rbd_dev
;
2567 spin_lock(&rbd_dev_list_lock
);
2568 list_for_each(tmp
, &rbd_dev_list
) {
2569 rbd_dev
= list_entry(tmp
, struct rbd_device
, node
);
2570 if (rbd_dev
->dev_id
== dev_id
) {
2571 spin_unlock(&rbd_dev_list_lock
);
2575 spin_unlock(&rbd_dev_list_lock
);
/*
 * rbd_dev_release() -- device-model release callback: final teardown
 * once the last reference to rbd_dev->dev is dropped.  Cancels the
 * header watch, drops the ceph client, frees the disk, unregisters the
 * blkdev major, frees owned strings, returns the id, and drops the
 * module reference taken in rbd_add().
 */
2579 static void rbd_dev_release(struct device
*dev
)
2581 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
2583 if (rbd_dev
->watch_request
) {
2584 struct ceph_client
*client
= rbd_dev
->rbd_client
->client
;
2586 ceph_osdc_unregister_linger_request(&client
->osdc
,
2587 rbd_dev
->watch_request
);
2589 if (rbd_dev
->watch_event
)
2590 rbd_req_sync_unwatch(rbd_dev
);
2592 rbd_put_client(rbd_dev
);
2594 /* clean up and free blkdev */
2595 rbd_free_disk(rbd_dev
);
2596 unregister_blkdev(rbd_dev
->major
, rbd_dev
->name
);
2598 /* done with the id, and with the rbd_dev */
2599 kfree(rbd_dev
->snap_name
);
2600 kfree(rbd_dev
->header_name
);
2601 kfree(rbd_dev
->pool_name
);
2602 kfree(rbd_dev
->image_name
);
2603 rbd_id_put(rbd_dev
);
2606 /* release module ref */
2607 module_put(THIS_MODULE
);
/*
 * rbd_remove() -- bus "remove" store handler: unmap the device whose
 * numeric id was written.  Removal itself is delegated to the device
 * model via rbd_bus_del_dev(); rbd_dev_release() does the real work.
 * NOTE(review): partial extraction -- the not-found path and the final
 * return are not visible in this chunk.
 */
2610 static ssize_t
rbd_remove(struct bus_type
*bus
,
2614 struct rbd_device
*rbd_dev
= NULL
;
2619 rc
= strict_strtoul(buf
, 10, &ul
);
2623 /* convert to int; abort if we lost anything in the conversion */
2624 target_id
= (int) ul
;
2625 if (target_id
!= ul
)
2628 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2630 rbd_dev
= __rbd_get_dev(target_id
);
2636 __rbd_remove_all_snaps(rbd_dev
);
2637 rbd_bus_del_dev(rbd_dev
);
2640 mutex_unlock(&ctl_mutex
);
/*
 * rbd_snap_add() -- sysfs "create_snap" store handler: snapshot the
 * image under the given name, refresh the header to pick the snapshot
 * up, then notify other watchers outside ctl_mutex.
 */
2644 static ssize_t
rbd_snap_add(struct device
*dev
,
2645 struct device_attribute
*attr
,
2649 struct rbd_device
*rbd_dev
= dev_to_rbd_dev(dev
);
/* +1 for the terminator snprintf always writes. */
2651 char *name
= kmalloc(count
+ 1, GFP_KERNEL
);
2655 snprintf(name
, count
, "%s", buf
);
2657 mutex_lock_nested(&ctl_mutex
, SINGLE_DEPTH_NESTING
);
2659 ret
= rbd_header_add_snap(rbd_dev
,
2664 ret
= __rbd_refresh_header(rbd_dev
, NULL
);
2668 /* shouldn't hold ctl_mutex when notifying.. notify might
2669 trigger a watch callback that would need to get that mutex */
2670 mutex_unlock(&ctl_mutex
);
2672 /* make a best effort, don't error if failed */
2673 rbd_req_sync_notify(rbd_dev
);
2680 mutex_unlock(&ctl_mutex
);
2686 * create control files in sysfs
/* Register the rbd root device and bus; undo the device on bus failure. */
2689 static int rbd_sysfs_init(void)
2693 ret
= device_register(&rbd_root_dev
);
2697 ret
= bus_register(&rbd_bus_type
);
2699 device_unregister(&rbd_root_dev
);
/* Inverse of rbd_sysfs_init(), in reverse order. */
2704 static void rbd_sysfs_cleanup(void)
2706 bus_unregister(&rbd_bus_type
);
2707 device_unregister(&rbd_root_dev
);
/* Module entry point: sysfs setup is all the init the driver needs. */
2710 int __init
rbd_init(void)
2714 rc
= rbd_sysfs_init();
2717 pr_info("loaded " RBD_DRV_NAME_LONG
"\n");
2721 void __exit
rbd_exit(void)
2723 rbd_sysfs_cleanup();
2726 module_init(rbd_init
);
2727 module_exit(rbd_exit
);
2729 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2730 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2731 MODULE_DESCRIPTION("rados block device");
2733 /* following authorship retained from original osdblk.c */
2734 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2736 MODULE_LICENSE("GPL");