/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT            9
#define SECTOR_SIZE             (1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

#define RBD_SNAP_HEAD_NAME      "-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
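/*
 * Illustrative note (not from the original source): the expression
 * above is a conservative bound on the number of decimal digits an
 * int can need.  Each byte contributes at most log10(256) ~= 2.41
 * digits, which 5/2 = 2.5 covers, and the "+ 1" leaves room for a
 * minus sign.  For a 4-byte int this yields 11, enough to hold
 * "-2147483648".
 */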

#define RBD_READ_ONLY_DEFAULT           false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        u64 image_size;
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        struct ceph_snap_context *snapc;
        u32 total_snaps;

        char *snap_names;
        u64 *snap_sizes;

        u64 obj_version;
};

struct rbd_options {
        bool read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client      *client;
        struct kref             kref;
        struct list_head        node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
        int done;
        int rc;
        u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
        int                     total;
        int                     num_done;
        struct kref             kref;
        struct rbd_req_status   status[0];
};

/*
 * a single io request
 */
struct rbd_request {
        struct request          *rq;            /* blk layer request */
        struct bio              *bio;           /* cloned bio */
        struct page             **pages;        /* list of used pages */
        u64                     len;
        int                     coll_index;
        struct rbd_req_coll     *coll;
};

struct rbd_snap {
        struct device           dev;
        const char              *name;
        u64                     size;
        struct list_head        node;
        u64                     id;
};

/*
 * a single device
 */
struct rbd_device {
        int                     dev_id;         /* blkdev unique id */

        int                     major;          /* blkdev assigned major */
        struct gendisk          *disk;          /* blkdev's gendisk and rq */
        struct request_queue    *q;

        struct rbd_options      rbd_opts;
        struct rbd_client       *rbd_client;

        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t              lock;           /* queue lock */

        struct rbd_image_header header;
        char                    *image_name;
        size_t                  image_name_len;
        char                    *header_name;
        char                    *pool_name;
        int                     pool_id;

        struct ceph_osd_event   *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore     header_rwsem;
        /* name of the snapshot this device reads from */
        char                    *snap_name;
        /* id of the snapshot this device reads from */
        u64                     snap_id;        /* current snapshot id */
        /* whether the snap_id this device reads from still exists */
        bool                    snap_exists;
        bool                    read_only;

        struct list_head        node;

        /* list of snapshots */
        struct list_head        snaps;

        /* sysfs related */
        struct device           dev;
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
                            struct device_attribute *attr,
                            const char *buf,
                            size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};


static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
        return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
        put_device(&rbd_dev->dev);
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

        if ((mode & FMODE_WRITE) && rbd_dev->read_only)
                return -EROFS;

        rbd_get_dev(rbd_dev);
        set_device_ro(bdev, rbd_dev->read_only);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;

        rbd_put_dev(rbd_dev);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("rbd_client_create\n");
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL;       /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);

        dout("rbd_client_create created %p\n", rbdc);
        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        return ERR_PTR(ret);
}
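/*
 * Illustrative note (not from the original source): ownership of
 * ceph_opts changes hands partway through the function above.  Until
 * ceph_create_client() succeeds we must destroy the options ourselves
 * (the out_opt label); after it succeeds, ceph_opts is set to NULL
 * because ceph_destroy_client() frees the options along with the
 * client, so the error paths must not free them a second time.
 */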

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        kref_get(&client_node->kref);
                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                BUG_ON(token);
        }
        return 0;
}
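/*
 * Illustrative example (not from the original source): an options
 * string such as "ro" or "read_only" is tokenized by
 * ceph_parse_options(), and each token it does not recognize itself
 * is handed to parse_rbd_opts_token() above.  "ro" matches the
 * Opt_read_only entry in rbd_opts_tokens, so rbd_opts->read_only is
 * set to true; "rw" maps to Opt_read_write and clears it.
 */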

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
                          size_t mon_addr_len, char *options)
{
        struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
        struct ceph_options *ceph_opts;
        struct rbd_client *rbdc;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        ceph_opts = ceph_parse_options(options, mon_addr,
                                       mon_addr + mon_addr_len,
                                       parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(ceph_opts))
                return PTR_ERR(ceph_opts);

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
                ceph_destroy_options(ceph_opts);
        } else {
                rbdc = rbd_client_create(ceph_opts);
                if (IS_ERR(rbdc))
                        return PTR_ERR(rbdc);
        }
        rbd_dev->rbd_client = rbdc;

        return 0;
}

/*
 * Destroy ceph client
 *
 * The list lock is taken here, so the caller must not already hold
 * rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
        rbd_dev->rbd_client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
        struct rbd_req_coll *coll =
                container_of(kref, struct rbd_req_coll, kref);

        dout("rbd_coll_release %p\n", coll);
        kfree(coll);
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}
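/*
 * Illustrative note (not from the original source): taken together,
 * the two checks above guarantee that the total
 *
 *      sizeof (struct ceph_snap_context)
 *              + snap_count * sizeof (__le64)     (snapshot ids)
 *              + snap_names_len                   (NUL-terminated names)
 *
 * does not overflow a size_t, so the allocation sizes computed by
 * rbd_header_from_disk() below cannot wrap around.
 */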

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                        snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;
        header->total_snaps = snap_count;

        /* Allocate and fill in the snapshot context */

        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] =
                        le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
                        u64 *seq, u64 *size)
{
        int i;
        char *p = header->snap_names;

        for (i = 0; i < header->total_snaps; i++) {
                if (!strcmp(snap_name, p)) {

                        /* Found it.  Pass back its id and/or size */

                        if (seq)
                                *seq = header->snapc->snaps[i];
                        if (size)
                                *size = header->snap_sizes[i];
                        return i;
                }
                p += strlen(p) + 1;     /* Skip ahead to the next name */
        }
        return -ENOENT;
}
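/*
 * Illustrative example (not from the original source): snap_names is
 * a packed block of NUL-terminated strings, parallel to the id and
 * size arrays.  With two snapshots named "mon" and "tue":
 *
 *      snap_names:             "mon\0tue\0"
 *      snapc->snaps[]:         { id_mon, id_tue }
 *      snap_sizes[]:           { size_mon, size_tue }
 *
 * so index i in the loop above selects the i-th entry of all three.
 */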

static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
        int ret;

        down_write(&rbd_dev->header_rwsem);

        if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->snap_id = CEPH_NOSNAP;
                rbd_dev->snap_exists = false;
                rbd_dev->read_only = rbd_dev->rbd_opts.read_only;
                if (size)
                        *size = rbd_dev->header.image_size;
        } else {
                u64 snap_id = 0;

                ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
                                   &snap_id, size);
                if (ret < 0)
                        goto done;
                rbd_dev->snap_id = snap_id;
                rbd_dev->snap_exists = true;
                rbd_dev->read_only = true;      /* No choice for snapshots */
        }

        ret = 0;
done:
        up_write(&rbd_dev->header_rwsem);
        return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}

/*
 * get the actual striped segment name, offset and length
 */
static u64 rbd_get_segment(struct rbd_image_header *header,
                           const char *object_prefix,
                           u64 ofs, u64 len,
                           char *seg_name, u64 *segofs)
{
        u64 seg = ofs >> header->obj_order;

        if (seg_name)
                snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
                         "%s.%012llx", object_prefix, seg);

        ofs = ofs & ((1 << header->obj_order) - 1);
        len = min_t(u64, len, (1 << header->obj_order) - ofs);

        if (segofs)
                *segofs = ofs;

        return len;
}
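/*
 * Illustrative example (not from the original source): with the
 * common obj_order of 22 (4 MiB objects), a request at ofs = 10 MiB
 * for len = 4 MiB maps to segment 10 MiB >> 22 = 2, so seg_name
 * becomes "<prefix>.000000000002", *segofs = 2 MiB (the offset within
 * that object), and the returned length is clamped to the 2 MiB that
 * remain in the object; the caller must issue the rest separately.
 */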

static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
{
        u64 start_seg = ofs >> header->obj_order;
        u64 end_seg = (ofs + len - 1) >> header->obj_order;
        return end_seg - start_seg + 1;
}
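/*
 * Illustrative example (not from the original source): for
 * obj_order = 22, a 2 MiB request at ofs = 3 MiB touches bytes
 * 3 MiB .. 5 MiB - 1, i.e. segments 0 and 1, and this returns 2.
 */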

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}
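/*
 * Illustrative example (not from the original source): given a chain
 * whose bvecs cover bytes 0..4095 and start_ofs = 1000, the first
 * bvec that straddles the boundary is zeroed from byte 1000 onward
 * (remainder = 1000 - pos) and every later bvec is zeroed in full
 * (remainder clamps to 0).  rbd_req_cb() uses this to zero-fill the
 * tail of a short read.
 */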

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                                   struct bio_pair **bp,
                                   int len, gfp_t gfpmask)
{
        struct bio *old_chain = *old;
        struct bio *new_chain = NULL;
        struct bio *tail;
        int total = 0;

        if (*bp) {
                bio_pair_release(*bp);
                *bp = NULL;
        }

        while (old_chain && (total < len)) {
                struct bio *tmp;

                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
                if (!tmp)
                        goto err_out;
                gfpmask &= ~__GFP_WAIT; /* can't wait after the first */

                if (total + old_chain->bi_size > len) {
                        struct bio_pair *bp;

                        /*
                         * this split can only happen with a single paged bio,
                         * bio_split() will BUG_ON if this is not the case
                         */
                        dout("bio_chain_clone split! total=%d remaining=%d "
                             "bi_size=%u\n",
                             total, len - total, old_chain->bi_size);

                        /* split the bio.  We'll release it either in the next
                           call, or it will have to be released outside */
                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
                        if (!bp)
                                goto err_out;

                        __bio_clone(tmp, &bp->bio1);

                        *next = &bp->bio2;
                } else {
                        __bio_clone(tmp, old_chain);
                        *next = old_chain->bi_next;
                }

                tmp->bi_bdev = NULL;
                tmp->bi_next = NULL;
                if (new_chain)
                        tail->bi_next = tmp;
                else
                        new_chain = tmp;
                tail = tmp;
                old_chain = old_chain->bi_next;

                total += tmp->bi_size;
        }

        BUG_ON(total < len);

        *old = old_chain;

        return new_chain;

err_out:
        dout("bio_chain_clone with err\n");
        bio_chain_put(new_chain);
        return NULL;
}
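/*
 * Illustrative note (not from the original source): on return the
 * cloned chain covers exactly len bytes.  If len ended mid-bio, the
 * original bio was split with bio_split(): bio1 went into the clone
 * and *next points at bio2, the untransferred remainder.  The
 * resulting bio_pair is released either on the next call (the *bp
 * check at the top) or by the caller once the whole request has been
 * carved up, as rbd_rq_fn() does.
 */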

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
                                                 int opcode, u32 payload_len)
{
        struct ceph_osd_req_op *ops;

        ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
        if (!ops)
                return NULL;

        ops[0].op = opcode;

        /*
         * op extent offset and length will be set later on
         * in calc_raw_layout()
         */
        ops[0].payload_len = payload_len;

        return ops;
}
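/*
 * Illustrative note (not from the original source): num_ops + 1
 * entries are allocated and kzalloc() zeroes them all, so the array
 * ends with an op whose opcode is 0.  That zeroed sentinel appears to
 * be how downstream code can find the end of the vector without a
 * separate count.
 */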

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}

static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        max = min = coll->num_done;
        while (max < coll->total && coll->status[max].done)
                max++;

        for (i = min; i < max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}
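/*
 * Illustrative example (not from the original source): segment
 * completions can arrive out of order, but the block layer must see
 * them in order.  With total = 3 and num_done = 0, if segment 2
 * completes first only its status is recorded (the while loop stops
 * at the not-yet-done index 0).  When segment 0 later completes, the
 * loop advances past every consecutive done entry, so segments 0 and,
 * if already done, 1 and 2 are reported to blk in one pass.
 */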

static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
                          struct rbd_device *rbd_dev,
                          struct ceph_snap_context *snapc,
                          u64 snapid,
                          const char *object_name, u64 ofs, u64 len,
                          struct bio *bio,
                          struct page **pages,
                          int num_pages,
                          int flags,
                          struct ceph_osd_req_op *ops,
                          struct rbd_req_coll *coll,
                          int coll_index,
                          void (*rbd_cb)(struct ceph_osd_request *req,
                                         struct ceph_msg *msg),
                          struct ceph_osd_request **linger_req,
                          u64 *ver)
{
        struct ceph_osd_request *req;
        struct ceph_file_layout *layout;
        int ret;
        u64 bno;
        struct timespec mtime = CURRENT_TIME;
        struct rbd_request *req_data;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_client *osdc;

        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
        if (!req_data) {
                if (coll)
                        rbd_coll_end_req_index(rq, coll, coll_index,
                                               -ENOMEM, len);
                return -ENOMEM;
        }

        if (coll) {
                req_data->coll = coll;
                req_data->coll_index = coll_index;
        }

        dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
             (unsigned long long) ofs, (unsigned long long) len);

        osdc = &rbd_dev->rbd_client->client->osdc;
        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
                                      false, GFP_NOIO, pages, bio);
        if (!req) {
                ret = -ENOMEM;
                goto done_pages;
        }

        req->r_callback = rbd_cb;

        req_data->rq = rq;
        req_data->bio = bio;
        req_data->pages = pages;
        req_data->len = len;

        req->r_priv = req_data;

        reqhead = req->r_request->front.iov_base;
        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

        strncpy(req->r_oid, object_name, sizeof(req->r_oid));
        req->r_oid_len = strlen(req->r_oid);

        layout = &req->r_file_layout;
        memset(layout, 0, sizeof(*layout));
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
        ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
                             req, ops);

        ceph_osdc_build_request(req, ofs, &len,
                                ops,
                                snapc,
                                &mtime,
                                req->r_oid, req->r_oid_len);

        if (linger_req) {
                ceph_osdc_set_request_linger(osdc, req);
                *linger_req = req;
        }

        ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto done_err;

        if (!rbd_cb) {
                ret = ceph_osdc_wait_request(osdc, req);
                if (ver)
                        *ver = le64_to_cpu(req->r_reassert_version.version);
                dout("reassert_ver=%llu\n",
                     (unsigned long long)
                     le64_to_cpu(req->r_reassert_version.version));
                ceph_osdc_put_request(req);
        }
        return ret;

done_err:
        bio_chain_put(req_data->bio);
        ceph_osdc_put_request(req);
done_pages:
        rbd_coll_end_req(req_data, ret, len);
        kfree(req_data);
        return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        struct rbd_request *req_data = req->r_priv;
        struct ceph_osd_reply_head *replyhead;
        struct ceph_osd_op *op;
        __s32 rc;
        u64 bytes;
        int read_op;

        /* parse reply */
        replyhead = msg->front.iov_base;
        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
        op = (void *)(replyhead + 1);
        rc = le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

        dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
             (unsigned long long) bytes, read_op, (int) rc);

        if (rc == -ENOENT && read_op) {
                zero_bio_chain(req_data->bio, 0);
                rc = 0;
        } else if (rc == 0 && read_op && bytes < req_data->len) {
                zero_bio_chain(req_data->bio, bytes);
                bytes = req_data->len;
        }

        rbd_coll_end_req(req_data, rc, bytes);

        if (req_data->bio)
                bio_chain_put(req_data->bio);

        ceph_osdc_put_request(req);
        kfree(req_data);
}
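/*
 * Illustrative note (not from the original source): the two zero-fill
 * cases above make sparse images behave like zeroed block devices.  A
 * read of an object that was never written returns -ENOENT, which is
 * turned into a buffer of zeroes; a short read (bytes < req_data->len)
 * has its tail zeroed from offset "bytes" onward and is then reported
 * at full length to the block layer.
 */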

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           struct ceph_snap_context *snapc,
                           u64 snapid,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
                           u64 ofs, u64 len,
                           char *buf,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
{
        int ret;
        struct page **pages;
        int num_pages;

        BUG_ON(ops == NULL);

        num_pages = calc_pages_for(ofs, len);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
                             object_name, ofs, len, NULL,
                             pages, num_pages,
                             flags,
                             ops,
                             NULL, 0,
                             NULL,
                             linger_req, ver);
        if (ret < 0)
                goto done;

        if ((flags & CEPH_OSD_FLAG_READ) && buf)
                ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
        ceph_release_page_vector(pages, num_pages);
        return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
                     struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
                     u64 snapid,
                     int opcode, int flags,
                     u64 ofs, u64 len,
                     struct bio *bio,
                     struct rbd_req_coll *coll,
                     int coll_index)
{
        char *seg_name;
        u64 seg_ofs;
        u64 seg_len;
        int ret;
        struct ceph_osd_req_op *ops;
        u32 payload_len;

        seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
        if (!seg_name)
                return -ENOMEM;

        seg_len = rbd_get_segment(&rbd_dev->header,
                                  rbd_dev->header.object_prefix,
                                  ofs, len,
                                  seg_name, &seg_ofs);

        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

        ret = -ENOMEM;
        ops = rbd_create_rw_ops(1, opcode, payload_len);
        if (!ops)
                goto done;

        /* we've taken care of segment sizes earlier when we
           cloned the bios. We should never have a segment
           truncated at this point */
        BUG_ON(seg_len < len);

        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                             seg_name, seg_ofs, seg_len,
                             bio,
                             NULL, 0,
                             flags,
                             ops,
                             coll, coll_index,
                             rbd_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
done:
        kfree(seg_name);
        return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
                         struct rbd_device *rbd_dev,
                         struct ceph_snap_context *snapc,
                         u64 ofs, u64 len,
                         struct bio *bio,
                         struct rbd_req_coll *coll,
                         int coll_index)
{
        return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
                         CEPH_OSD_OP_WRITE,
                         CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                         ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
                        struct rbd_device *rbd_dev,
                        u64 snapid,
                        u64 ofs, u64 len,
                        struct bio *bio,
                        struct rbd_req_coll *coll,
                        int coll_index)
{
        return rbd_do_op(rq, rbd_dev, NULL,
                         snapid,
                         CEPH_OSD_OP_READ,
                         CEPH_OSD_FLAG_READ,
                         ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
                             u64 snapid,
                             const char *object_name,
                             u64 ofs, u64 len,
                             char *buf,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
        if (!ops)
                return -ENOMEM;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              snapid,
                              CEPH_OSD_FLAG_READ,
                              ops, object_name, ofs, len, buf, NULL, ver);
        rbd_destroy_ops(ops);

        return ret;
}

/*
 * Request sync osd notify ack
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = cpu_to_le64(ver);
        ops[0].watch.cookie = notify_id;
        ops[0].watch.flag = 0;

        ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                             rbd_dev->header_name, 0, 0, NULL,
                             NULL, 0,
                             CEPH_OSD_FLAG_READ,
                             ops,
                             NULL, 0,
                             rbd_simple_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
        return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
        u64 hver;
        int rc;

        if (!rbd_dev)
                return;

        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
             rbd_dev->header_name, (unsigned long long) notify_id,
             (unsigned int) opcode);
        rc = rbd_refresh_header(rbd_dev, &hver);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           "update snaps: %d\n", rbd_dev->major, rc);

        rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = 0;
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);


        rbd_destroy_ops(ops);
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        return ret;
}

struct rbd_notify_info {
        struct rbd_device *rbd_dev;
};

static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        /*
         * rbd_req_sync_notify() registers the event with a pointer to
         * a struct rbd_notify_info, not to the device itself, so look
         * the device up through it.
         */
        struct rbd_notify_info *info = data;

        if (!info || !info->rbd_dev)
                return;

        dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
             info->rbd_dev->header_name, (unsigned long long) notify_id,
             (unsigned int) opcode);
}

/*
 * Request sync osd notify
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_event *event;
        struct rbd_notify_info info;
        int payload_len = sizeof(u32) + sizeof(u32);
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
        if (!ops)
                return -ENOMEM;

        info.rbd_dev = rbd_dev;

        ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
                                     (void *)&info, &event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = 1;
        ops[0].watch.flag = 1;
        ops[0].watch.cookie = event->cookie;
        ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
        ops[0].watch.timeout = 12;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);
        if (ret < 0)
                goto fail_event;

        ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
        dout("ceph_osdc_wait_event returned %d\n", ret);
        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(event);
fail:
        rbd_destroy_ops(ops);
        return ret;
}

/*
 * Request sync osd call (execute a class method on an object)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *data,
                             int len,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
                                class_name_len + method_name_len + len);
        if (!ops)
                return -ENOMEM;

        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = data;
        ops[0].cls.indata_len = len;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              object_name, 0, 0, NULL, NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
        struct rbd_req_coll *coll =
                        kzalloc(sizeof(struct rbd_req_coll) +
                                sizeof(struct rbd_req_status) * num_reqs,
                                GFP_ATOMIC);

        if (!coll)
                return NULL;
        coll->total = num_reqs;
        kref_init(&coll->kref);
        return coll;
}
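/*
 * Illustrative note (not from the original source): status[0] at the
 * end of struct rbd_req_coll is a flexible array, so a collection for
 * three segments is a single allocation of
 * sizeof(struct rbd_req_coll) + 3 * sizeof(struct rbd_req_status),
 * and status[0..2] are addressable directly after the header fields.
 */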

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        struct bio_pair *bp = NULL;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
                u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;

                /* peek at request from block layer */
                if (!rq)
                        break;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
                        op_size = rbd_get_segment(&rbd_dev->header,
                                                  rbd_dev->header.object_prefix,
                                                  ofs, size,
                                                  NULL, NULL);
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, op_size);
                                goto next_seg;
                        }


                        /* init OSD command: write or read */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              snapc,
                                              ofs,
                                              op_size, bio,
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             rbd_dev->snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);

next_seg:
                        size -= op_size;
                        ofs += op_size;

                        cur_seg++;
                        rq_bio = next_bio;
                } while (size > 0);
                kref_put(&coll->kref, rbd_coll_release);

                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with single-page bios,
 * which we handle later at bio_chain_clone()
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        unsigned int chunk_sectors;
        sector_t sector;
        unsigned int bio_sectors;
        int max;

        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

        max = (chunk_sectors - ((sector & (chunk_sectors - 1))
                                + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0;        /* bio_add cannot handle a negative return */
        if (max <= bvec->bv_len && bio_sectors == 0)
                return bvec->bv_len;
        return max;
}
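/*
 * Illustrative example (not from the original source): with
 * obj_order = 22, chunk_sectors = 1 << (22 - 9) = 8192.  For a bio
 * starting at sector 8000 that already holds 128 sectors, the room
 * left before the object boundary is 8192 - (8000 + 128) = 64
 * sectors, so max is 64 << 9 = 32768 bytes and the block layer will
 * not grow the bio past the boundary.
 */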

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk = rbd_dev->disk;

        if (!disk)
                return;

        rbd_header_free(&rbd_dev->header);

        if (disk->flags & GENHD_FL_UP)
                del_gendisk(disk);
        if (disk->queue)
                blk_cleanup_queue(disk->queue);
        put_disk(disk);
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
        u64 names_size = 0;
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                        rbd_dev->header_name,
                                        0, size,
                                        (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                   " (want %zd got %d)\n",
                                   rbd_dev->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                   rbd_dev->image_name);
                        goto out_err;
                }

                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}

/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
                           struct rbd_image_header *header)
{
        struct rbd_image_header_ondisk *ondisk;
        u64 ver = 0;
        int ret;

        ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
        if (IS_ERR(ondisk))
                return PTR_ERR(ondisk);
        ret = rbd_header_from_disk(header, ondisk);
        if (ret >= 0)
                header->obj_version = ver;
        kfree(ondisk);

        return ret;
}

/*
 * create a snapshot
 */
static int rbd_header_add_snap(struct rbd_device *rbd_dev,
                               const char *snap_name,
                               gfp_t gfp_flags)
{
        int name_len = strlen(snap_name);
        u64 new_snapid;
        int ret;
        void *data, *p, *e;
        struct ceph_mon_client *monc;

        /* we should create a snapshot only if we're pointing at the head */
        if (rbd_dev->snap_id != CEPH_NOSNAP)
                return -EINVAL;

        monc = &rbd_dev->rbd_client->client->monc;
        ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
        dout("created snapid=%llu\n", (unsigned long long) new_snapid);
        if (ret < 0)
                return ret;

        data = kmalloc(name_len + 16, gfp_flags);
        if (!data)
                return -ENOMEM;

        p = data;
        e = data + name_len + 16;

        ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
        ceph_encode_64_safe(&p, e, new_snapid, bad);

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "snap_add",
                                data, p - data, NULL);

        kfree(data);

        return ret < 0 ? ret : 0;
bad:
        return -ERANGE;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        struct rbd_snap *next;

        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
                __rbd_remove_snap_dev(snap);
}

/*
 * re-read the on-disk header and apply any changes (size, snapshot
 * context) to the in-memory copy
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* resized? */
        if (rbd_dev->snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

                dout("setting size to %llu sectors", (unsigned long long) size);
                set_capacity(rbd_dev->disk, size);
        }

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.total_snaps = h.total_snaps;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = __rbd_init_snaps_header(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        ret = __rbd_refresh_header(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);

        return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        int rc;
        u64 segment_size;
        u64 total_size = 0;

        /* contact OSD, request size info about the object being mapped */
        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (rc)
                return rc;

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = __rbd_init_snaps_header(rbd_dev);
        if (rc)
                return rc;

        rc = rbd_header_set_snap(rbd_dev, &total_size);
        if (rc)
                return rc;

        /* create gendisk info */
        rc = -ENOMEM;
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                goto out;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        rc = -ENOMEM;
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;
        rbd_dev->q = q;

        /* finally, announce the disk to the world */
        set_capacity(disk, total_size / SECTOR_SIZE);
        add_disk(disk);

        pr_info("%s: added with size 0x%llx\n",
                disk->disk_name, (unsigned long long)total_size);
        return 0;

out_disk:
        put_disk(disk);
out:
        return rc;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        sector_t size;

        down_read(&rbd_dev->header_rwsem);
        size = get_capacity(rbd_dev->disk);
        up_read(&rbd_dev->header_rwsem);

        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "client%lld\n",
                       ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
                                struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->image_name);
}

static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
                                 struct device_attribute *attr,
                                 const char *buf,
                                 size_t size)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;

        ret = rbd_refresh_header(rbd_dev, NULL);

        return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};


/*
  sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
                                  struct device_attribute *attr,
                                  char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
                                struct device_attribute *attr,
                                char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};

static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
                                 struct device *parent)
{
        struct device *dev = &snap->dev;
        int ret;

        dev->type = &rbd_snap_device_type;
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
        dev_set_name(dev, "snap_%s", snap->name);
        ret = device_register(dev);

        return ret;
}

static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
                                           int i, const char *name)
{
        struct rbd_snap *snap;
        int ret;

        snap = kzalloc(sizeof (*snap), GFP_KERNEL);
        if (!snap)
                return ERR_PTR(-ENOMEM);

        ret = -ENOMEM;
        snap->name = kstrdup(name, GFP_KERNEL);
        if (!snap->name)
                goto err;

        snap->size = rbd_dev->header.snap_sizes[i];
        snap->id = rbd_dev->header.snapc->snaps[i];
        if (device_is_registered(&rbd_dev->dev)) {
                ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
                if (ret < 0)
                        goto err;
        }

        return snap;

err:
        kfree(snap->name);
        kfree(snap);

        return ERR_PTR(ret);
}

/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
2143 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2144 {
2145 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2146 const u32 snap_count = snapc->num_snaps;
2147 char *snap_name = rbd_dev->header.snap_names;
2148 struct list_head *head = &rbd_dev->snaps;
2149 struct list_head *links = head->next;
2150 u32 index = 0;
2151
2152 while (index < snap_count || links != head) {
2153 u64 snap_id;
2154 struct rbd_snap *snap;
2155
2156 snap_id = index < snap_count ? snapc->snaps[index]
2157 : CEPH_NOSNAP;
2158 snap = links != head ? list_entry(links, struct rbd_snap, node)
2159 : NULL;
2160 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2161
2162 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2163 struct list_head *next = links->next;
2164
2165 /* Existing snapshot not in the new snap context */
2166
2167 if (rbd_dev->snap_id == snap->id)
2168 rbd_dev->snap_exists = false;
2169 __rbd_remove_snap_dev(snap);
2170
2171 /* Done with this list entry; advance */
2172
2173 links = next;
2174 continue;
2175 }
2176
2177 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2178 struct rbd_snap *new_snap;
2179
2180 /* We haven't seen this snapshot before */
2181
2182 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2183 snap_name);
2184 if (IS_ERR(new_snap))
2185 return PTR_ERR(new_snap);
2186
2187 /* New goes before existing, or at end of list */
2188
2189 if (snap)
2190 list_add_tail(&new_snap->node, &snap->node);
2191 else
2192 list_add_tail(&new_snap->node, head);
2193 } else {
2194 /* Already have this one */
2195
2196 BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2197 BUG_ON(strcmp(snap->name, snap_name));
2198
2199 /* Done with this list entry; advance */
2200
2201 links = links->next;
2202 }
2203
2204 /* Advance to the next entry in the snapshot context */
2205
2206 index++;
2207 snap_name += strlen(snap_name) + 1;
2208 }
2209
2210 return 0;
2211 }
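/*
 * Illustrative walk-through (editor's sketch, not from the original
 * source): suppose the device's list holds snapshots with ids 8, 5, 2
 * (highest first) and the new snapshot context contains ids 8, 4, 2.
 * The merge above first matches 8 (verified unchanged), then sees
 * 5 > 4 and removes snapshot 5, then sees no entry for 4 and adds a
 * new snapshot device before 2, and finally matches 2.  A single pass
 * suffices because both sequences are sorted by descending snapshot id.
 */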
2212
2213 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2214 {
2215 int ret;
2216 struct device *dev;
2217 struct rbd_snap *snap;
2218
2219 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2220 dev = &rbd_dev->dev;
2221
2222 dev->bus = &rbd_bus_type;
2223 dev->type = &rbd_device_type;
2224 dev->parent = &rbd_root_dev;
2225 dev->release = rbd_dev_release;
2226 dev_set_name(dev, "%d", rbd_dev->dev_id);
2227 ret = device_register(dev);
2228 if (ret < 0)
2229 goto out;
2230
2231 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2232 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2233 if (ret < 0)
2234 break;
2235 }
2236 out:
2237 mutex_unlock(&ctl_mutex);
2238 return ret;
2239 }
2240
2241 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2242 {
2243 device_unregister(&rbd_dev->dev);
2244 }
2245
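/*
 * Register a watch on the header object so this device learns of
 * changes made by other clients.  Editor's reading of the retry loop
 * below (an assumption, not stated in the original): rbd_req_sync_watch()
 * is taken to return -ERANGE when the header version it supplies has
 * gone stale, in which case the header is refreshed and the watch
 * registration retried.
 */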
2246 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2247 {
2248 int ret, rc;
2249
2250 do {
2251 ret = rbd_req_sync_watch(rbd_dev);
2252 if (ret == -ERANGE) {
2253 rc = rbd_refresh_header(rbd_dev, NULL);
2254 if (rc < 0)
2255 return rc;
2256 }
2257 } while (ret == -ERANGE);
2258
2259 return ret;
2260 }
2261
2262 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2263
2264 /*
2265 * Get a unique rbd identifier for the given new rbd_dev, and add
2266 * the rbd_dev to the global list. The minimum rbd id is 1.
2267 */
2268 static void rbd_id_get(struct rbd_device *rbd_dev)
2269 {
2270 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2271
2272 spin_lock(&rbd_dev_list_lock);
2273 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2274 spin_unlock(&rbd_dev_list_lock);
2275 }
2276
2277 /*
2278 * Remove an rbd_dev from the global list, and record that its
2279 * identifier is no longer in use.
2280 */
2281 static void rbd_id_put(struct rbd_device *rbd_dev)
2282 {
2283 struct list_head *tmp;
2284 int rbd_id = rbd_dev->dev_id;
2285 int max_id;
2286
2287 BUG_ON(rbd_id < 1);
2288
2289 spin_lock(&rbd_dev_list_lock);
2290 list_del_init(&rbd_dev->node);
2291
2292 /*
2293 * If the id being "put" is not the current maximum, there
2294 * is nothing special we need to do.
2295 */
2296 if (rbd_id != atomic64_read(&rbd_id_max)) {
2297 spin_unlock(&rbd_dev_list_lock);
2298 return;
2299 }
2300
2301 /*
2302 * We need to update the current maximum id. Search the
2303 * list to find out what it is. We're more likely to find
2304 * the maximum at the end, so search the list backward.
2305 */
2306 max_id = 0;
2307 list_for_each_prev(tmp, &rbd_dev_list) {
2308 struct rbd_device *rbd_dev;
2309
2310 rbd_dev = list_entry(tmp, struct rbd_device, node);
2311 if (rbd_dev->dev_id > max_id)
2312 max_id = rbd_dev->dev_id;
2313 }
2314 spin_unlock(&rbd_dev_list_lock);
2315
2316 /*
2317 * The max id could have been updated by rbd_id_get(), in
2318 * which case it now accurately reflects the new maximum.
2319 * Be careful not to overwrite the maximum value in that
2320 * case.
2321 */
2322 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2323 }
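/*
 * Example of the race the cmpxchg above guards against (editor's
 * sketch): devices 1 and 2 exist, so rbd_id_max is 2.  If id 2 is put
 * while another thread concurrently calls rbd_id_get() and bumps
 * rbd_id_max to 3, the backward scan here computes max_id = 1, but the
 * cmpxchg fails because rbd_id_max no longer holds 2 -- so the new
 * maximum of 3 is correctly preserved.
 */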
2324
2325 /*
2326 * Skips over white space at *buf, and updates *buf to point to the
2327 * first found non-space character (if any). Returns the length of
2328 * the token (string of non-white space characters) found. Note
2329 * that *buf must be terminated with '\0'.
2330 */
2331 static inline size_t next_token(const char **buf)
2332 {
2333 /*
2334 * These are the characters that produce nonzero for
2335 * isspace() in the "C" and "POSIX" locales.
2336 */
2337 const char *spaces = " \f\n\r\t\v";
2338
2339 *buf += strspn(*buf, spaces); /* Find start of token */
2340
2341 return strcspn(*buf, spaces); /* Return token length */
2342 }
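/*
 * For example, with *buf == "  foo bar", next_token() advances *buf to
 * point at "foo bar" and returns 3, the length of "foo".
 */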
2343
2344 /*
2345 * Finds the next token in *buf, and if the provided token buffer is
2346 * big enough, copies the found token into it. The result, if
2347 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2348 * must be terminated with '\0' on entry.
2349 *
2350 * Returns the length of the token found (not including the '\0').
2351 * Return value will be 0 if no token is found, and it will be >=
2352 * token_size if the token would not fit.
2353 *
2354 * The *buf pointer will be updated to point beyond the end of the
2355 * found token. Note that this occurs even if the token buffer is
2356 * too small to hold it.
2357 */
2358 static inline size_t copy_token(const char **buf,
2359 char *token,
2360 size_t token_size)
2361 {
2362 size_t len;
2363
2364 len = next_token(buf);
2365 if (len < token_size) {
2366 memcpy(token, *buf, len);
2367 *(token + len) = '\0';
2368 }
2369 *buf += len;
2370
2371 return len;
2372 }
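/*
 * For example, copying from *buf == "foo bar" into a 3-byte token
 * buffer returns 3 (not less than token_size, so nothing is copied),
 * and *buf is left pointing at " bar" in either case.
 */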
2373
2374 /*
2375 * Finds the next token in *buf, dynamically allocates a buffer big
2376 * enough to hold a copy of it, and copies the token into the new
2377 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2378 * that a duplicate buffer is created even for a zero-length token.
2379 *
2380 * Returns a pointer to the newly-allocated duplicate, or a null
2381 * pointer if memory for the duplicate was not available. If
2382 * the lenp argument is a non-null pointer, the length of the token
2383 * (not including the '\0') is returned in *lenp.
2384 *
2385 * If successful, the *buf pointer will be updated to point beyond
2386 * the end of the found token.
2387 *
2388 * Note: uses GFP_KERNEL for allocation.
2389 */
2390 static inline char *dup_token(const char **buf, size_t *lenp)
2391 {
2392 char *dup;
2393 size_t len;
2394
2395 len = next_token(buf);
2396 dup = kmalloc(len + 1, GFP_KERNEL);
2397 if (!dup)
2398 return NULL;
2399
2400 memcpy(dup, *buf, len);
2401 *(dup + len) = '\0';
2402 *buf += len;
2403
2404 if (lenp)
2405 *lenp = len;
2406
2407 return dup;
2408 }
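/*
 * For example, with *buf == "rbd foo", dup_token() returns a
 * newly-allocated copy of "rbd", advances *buf to " foo", and stores 3
 * in *lenp if lenp is non-null.  An empty input yields a duplicate of
 * "" rather than a null pointer.
 */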
2409
2410 /*
2411 * This fills in the pool_name, image_name, image_name_len, snap_name,
2412 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2413 * on the list of monitor addresses and other options provided via
2414 * /sys/bus/rbd/add.
2415 *
2416 * Note: rbd_dev is assumed to have been initially zero-filled.
2417 */
2418 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2419 const char *buf,
2420 const char **mon_addrs,
2421 size_t *mon_addrs_size,
2422 char *options,
2423 size_t options_size)
2424 {
2425 size_t len;
2426 int ret;
2427
2428 /* The first four tokens are required */
2429
2430 len = next_token(&buf);
2431 if (!len)
2432 return -EINVAL;
2433 *mon_addrs_size = len + 1;
2434 *mon_addrs = buf;
2435
2436 buf += len;
2437
2438 len = copy_token(&buf, options, options_size);
2439 if (!len || len >= options_size)
2440 return -EINVAL;
2441
2442 ret = -ENOMEM;
2443 rbd_dev->pool_name = dup_token(&buf, NULL);
2444 if (!rbd_dev->pool_name)
2445 goto out_err;
2446
2447 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2448 if (!rbd_dev->image_name)
2449 goto out_err;
2450
2451 /* Create the name of the header object */
2452
2453 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2454 + sizeof (RBD_SUFFIX),
2455 GFP_KERNEL);
2456 if (!rbd_dev->header_name)
2457 goto out_err;
2458 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2459
2460 /*
2461 * The snapshot name is optional. If none is supplied,
2462 * we use the default value.
2463 */
2464 rbd_dev->snap_name = dup_token(&buf, &len);
2465 if (!rbd_dev->snap_name)
2466 goto out_err;
2467 if (!len) {
2468 /* Replace the empty name with the default */
2469 kfree(rbd_dev->snap_name);
2470 rbd_dev->snap_name
2471 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2472 if (!rbd_dev->snap_name)
2473 goto out_err;
2474
2475 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2476 sizeof (RBD_SNAP_HEAD_NAME));
2477 }
2478
2479 return 0;
2480
2481 out_err:
2482 kfree(rbd_dev->header_name);
2483 rbd_dev->header_name = NULL;
2484 kfree(rbd_dev->image_name);
2485 rbd_dev->image_name = NULL;
2486 rbd_dev->image_name_len = 0;
2487 kfree(rbd_dev->pool_name);
2488 rbd_dev->pool_name = NULL;
2489
2490 return ret;
2491 }
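/*
 * Putting it together, an add request is a single line of the form
 *
 *   <mon_addrs> <options> <pool> <image> [<snapshot>]
 *
 * so something like this (all values illustrative only):
 *
 *   echo "1.2.3.4:6789 name=admin rbd myimage" > /sys/bus/rbd/add
 *
 * maps the image "myimage" from pool "rbd", with the snapshot name
 * defaulting to RBD_SNAP_HEAD_NAME when omitted.
 */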
2492
2493 static ssize_t rbd_add(struct bus_type *bus,
2494 const char *buf,
2495 size_t count)
2496 {
2497 char *options;
2498 struct rbd_device *rbd_dev = NULL;
2499 const char *mon_addrs = NULL;
2500 size_t mon_addrs_size = 0;
2501 struct ceph_osd_client *osdc;
2502 int rc = -ENOMEM;
2503
2504 if (!try_module_get(THIS_MODULE))
2505 return -ENODEV;
2506
2507 options = kmalloc(count, GFP_KERNEL);
2508 if (!options)
2509 goto err_nomem;
2510 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2511 if (!rbd_dev)
2512 goto err_nomem;
2513
2514 /* static rbd_device initialization */
2515 spin_lock_init(&rbd_dev->lock);
2516 INIT_LIST_HEAD(&rbd_dev->node);
2517 INIT_LIST_HEAD(&rbd_dev->snaps);
2518 init_rwsem(&rbd_dev->header_rwsem);
2519
2520 /* generate unique id: find highest unique id, add one */
2521 rbd_id_get(rbd_dev);
2522
2523 /* Fill in the device name, now that we have its id. */
2524 BUILD_BUG_ON(DEV_NAME_LEN
2525 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2526 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2527
2528 /* parse add command */
2529 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2530 options, count);
2531 if (rc)
2532 goto err_put_id;
2533
2534 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2535 if (rc < 0)
2536 goto err_put_id;
2537
2538 /* pick the pool */
2539 osdc = &rbd_dev->rbd_client->client->osdc;
2540 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2541 if (rc < 0)
2542 goto err_out_client;
2543 rbd_dev->pool_id = rc;
2544
2545 /* register our block device */
2546 rc = register_blkdev(0, rbd_dev->name);
2547 if (rc < 0)
2548 goto err_out_client;
2549 rbd_dev->major = rc;
2550
2551 rc = rbd_bus_add_dev(rbd_dev);
2552 if (rc)
2553 goto err_out_blkdev;
2554
2555 /*
2556 * At this point cleanup in the event of an error is the job
2557 * of the sysfs code (initiated by rbd_bus_del_dev()).
2558 *
2559 * Set up and announce blkdev mapping.
2560 */
2561 rc = rbd_init_disk(rbd_dev);
2562 if (rc)
2563 goto err_out_bus;
2564
2565 rc = rbd_init_watch_dev(rbd_dev);
2566 if (rc)
2567 goto err_out_bus;
2568
kfree(options);	/* fully parsed by rbd_get_client(); free to avoid leaking on success */
2569 return count;
2570
2571 err_out_bus:
2572 /* this will also clean up the rest of the rbd_dev state */
2573
2574 rbd_bus_del_dev(rbd_dev);
2575 kfree(options);
2576 return rc;
2577
2578 err_out_blkdev:
2579 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2580 err_out_client:
2581 rbd_put_client(rbd_dev);
2582 err_put_id:
2583 if (rbd_dev->pool_name) {
2584 kfree(rbd_dev->snap_name);
2585 kfree(rbd_dev->header_name);
2586 kfree(rbd_dev->image_name);
2587 kfree(rbd_dev->pool_name);
2588 }
2589 rbd_id_put(rbd_dev);
2590 err_nomem:
2591 kfree(rbd_dev);
2592 kfree(options);
2593
2594 dout("Error adding device %s\n", buf);
2595 module_put(THIS_MODULE);
2596
2597 return (ssize_t) rc;
2598 }
2599
2600 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2601 {
2602 struct list_head *tmp;
2603 struct rbd_device *rbd_dev;
2604
2605 spin_lock(&rbd_dev_list_lock);
2606 list_for_each(tmp, &rbd_dev_list) {
2607 rbd_dev = list_entry(tmp, struct rbd_device, node);
2608 if (rbd_dev->dev_id == dev_id) {
2609 spin_unlock(&rbd_dev_list_lock);
2610 return rbd_dev;
2611 }
2612 }
2613 spin_unlock(&rbd_dev_list_lock);
2614 return NULL;
2615 }
2616
2617 static void rbd_dev_release(struct device *dev)
2618 {
2619 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2620
2621 if (rbd_dev->watch_request) {
2622 struct ceph_client *client = rbd_dev->rbd_client->client;
2623
2624 ceph_osdc_unregister_linger_request(&client->osdc,
2625 rbd_dev->watch_request);
2626 }
2627 if (rbd_dev->watch_event)
2628 rbd_req_sync_unwatch(rbd_dev);
2629
2630 rbd_put_client(rbd_dev);
2631
2632 /* clean up and free blkdev */
2633 rbd_free_disk(rbd_dev);
2634 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2635
2636 /* done with the id, and with the rbd_dev */
2637 kfree(rbd_dev->snap_name);
2638 kfree(rbd_dev->header_name);
2639 kfree(rbd_dev->pool_name);
2640 kfree(rbd_dev->image_name);
2641 rbd_id_put(rbd_dev);
2642 kfree(rbd_dev);
2643
2644 /* release module ref */
2645 module_put(THIS_MODULE);
2646 }
2647
2648 static ssize_t rbd_remove(struct bus_type *bus,
2649 const char *buf,
2650 size_t count)
2651 {
2652 struct rbd_device *rbd_dev = NULL;
2653 int target_id, rc;
2654 unsigned long ul;
2655 int ret = count;
2656
2657 rc = strict_strtoul(buf, 10, &ul);
2658 if (rc)
2659 return rc;
2660
2661 /* convert to int; abort if we lost anything in the conversion */
2662 target_id = (int) ul;
2663 if (target_id != ul)
2664 return -EINVAL;
2665
2666 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2667
2668 rbd_dev = __rbd_get_dev(target_id);
2669 if (!rbd_dev) {
2670 ret = -ENOENT;
2671 goto done;
2672 }
2673
2674 __rbd_remove_all_snaps(rbd_dev);
2675 rbd_bus_del_dev(rbd_dev);
2676
2677 done:
2678 mutex_unlock(&ctl_mutex);
2679 return ret;
2680 }
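/*
 * For example (illustrative): writing a device's id to the bus control
 * file tears its mapping down:
 *
 *   echo 1 > /sys/bus/rbd/remove
 */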
2681
2682 static ssize_t rbd_snap_add(struct device *dev,
2683 struct device_attribute *attr,
2684 const char *buf,
2685 size_t count)
2686 {
2687 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2688 int ret;
2689 char *name = kmalloc(count + 1, GFP_KERNEL);
2690 if (!name)
2691 return -ENOMEM;
2692
2693 snprintf(name, count, "%s", buf);
2694
2695 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2696
2697 ret = rbd_header_add_snap(rbd_dev,
2698 name, GFP_KERNEL);
2699 if (ret < 0)
2700 goto err_unlock;
2701
2702 ret = __rbd_refresh_header(rbd_dev, NULL);
2703 if (ret < 0)
2704 goto err_unlock;
2705
2706 /* Don't hold ctl_mutex while notifying: the notify might
2707 trigger a watch callback that would need to take that mutex */
2708 mutex_unlock(&ctl_mutex);
2709
2710 /* make a best effort, don't error if failed */
2711 rbd_req_sync_notify(rbd_dev);
2712
2713 ret = count;
2714 kfree(name);
2715 return ret;
2716
2717 err_unlock:
2718 mutex_unlock(&ctl_mutex);
2719 kfree(name);
2720 return ret;
2721 }
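/*
 * Usage sketch (editor's note): this store method backs the per-device
 * snapshot-creation attribute (assumed here to be named create_snap,
 * defined elsewhere in this file), so a snapshot is taken with
 * something like
 *
 *   echo mysnap > /sys/bus/rbd/devices/<id>/create_snap
 *
 * Note that the snprintf() above copies at most count - 1 bytes, which
 * appears to rely on the trailing newline from echo being the byte
 * that is dropped.
 */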
2722
2723 /*
2724 * create control files in sysfs
2725 * /sys/bus/rbd/...
2726 */
2727 static int rbd_sysfs_init(void)
2728 {
2729 int ret;
2730
2731 ret = device_register(&rbd_root_dev);
2732 if (ret < 0)
2733 return ret;
2734
2735 ret = bus_register(&rbd_bus_type);
2736 if (ret < 0)
2737 device_unregister(&rbd_root_dev);
2738
2739 return ret;
2740 }
2741
2742 static void rbd_sysfs_cleanup(void)
2743 {
2744 bus_unregister(&rbd_bus_type);
2745 device_unregister(&rbd_root_dev);
2746 }
2747
2748 int __init rbd_init(void)
2749 {
2750 int rc;
2751
2752 rc = rbd_sysfs_init();
2753 if (rc)
2754 return rc;
2755 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2756 return 0;
2757 }
2758
2759 void __exit rbd_exit(void)
2760 {
2761 rbd_sysfs_cleanup();
2762 }
2763
2764 module_init(rbd_init);
2765 module_exit(rbd_exit);
2766
2767 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2768 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2769 MODULE_DESCRIPTION("rados block device");
2770
2771 /* following authorship retained from original osdblk.c */
2772 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2773
2774 MODULE_LICENSE("GPL");