/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define U64_MAX		((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	1

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_ALL	(0)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
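/*
 * Two and a half decimal digits per byte slightly overestimates the
 * digits needed to print an int (each byte contributes log10(256),
 * about 2.41 decimal digits); the "+ 1" leaves room for a sign.
 */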

#define RBD_READ_ONLY_DEFAULT		false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.
 */
struct rbd_spec {
	u64		pool_id;
	char		*pool_name;

	char		*image_id;
	size_t		image_id_len;
	char		*image_name;
	size_t		image_name_len;

	u64		snap_id;
	char		*snap_name;

	struct kref	kref;
};

struct rbd_options {
	bool	read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	bool			exists;
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);

static void rbd_dev_release(struct device *dev);
static void rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};
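
/*
 * Images are mapped and unmapped by writing these bus attributes.
 * The exact format is described in
 * Documentation/ABI/testing/sysfs-bus-rbd; for illustration only
 * (monitor address, key, and names below are made up):
 *
 *	# echo "1.2.3.4:6789 name=admin,secret=AQB... rbd foo -" \
 *		> /sys/bus/rbd/add
 *	# echo 0 > /sys/bus/rbd/remove
 */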

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		do {							\
			if (unlikely(!(expr))) {			\
				printk(KERN_ERR "\nAssertion failure in %s() " \
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
				BUG();					\
			}						\
		} while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	rbd_dev->open_count++;
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rbd_assert(rbd_dev->open_count > 0);
	rbd_dev->open_count--;
	rbd_put_dev(rbd_dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
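
/*
 * These tokens appear in the comma-separated options field of the
 * "add" string, after whatever libceph itself consumes; e.g. an
 * options field of "name=admin,ro" (values illustrative) requests a
 * read-only mapping.
 */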

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client; takes and releases rbd_client_list_lock
 * itself, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX) {
			/* Don't leak the prefix; nothing else is allocated yet */
			kfree(header->object_prefix);
			header->object_prefix = NULL;
			return -EIO;
		}
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		if (!strcmp(snap_name, snap->name)) {
			rbd_dev->spec->snap_id = snap->id;
			rbd_dev->mapping.size = snap->size;
			rbd_dev->mapping.features = snap->features;

			return 0;
		}
	}

	return -ENOENT;
}

static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
{
	int ret;

	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->spec->snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
		ret = 0;
	} else {
		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->exists = true;
done:
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
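
/*
 * For example, with the default 4 MB objects (obj_order 22) an I/O
 * at image offset 12 MB lands in segment 3, producing an object
 * name like "<object_prefix>.000000000003" (prefix value illustrative).
 */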

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}
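
/*
 * Worked example, again with obj_order 22: 8 KB at offset 0x3ff000
 * ends at byte 0x400fff, so start_seg = 0 and end_seg = 1; the
 * request straddles an object boundary and needs two segments.
 */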

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}
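
/*
 * For instance, cloning len 3000 at offset 2000 from a bio with two
 * 4096-byte bvecs: idx ends up 0 (voff 2000), end_idx 1, vcnt 2;
 * the first entry is advanced 2000 bytes and the last is trimmed to
 * resid = 5000 - 4096 = 904, and 2096 + 904 == 3000 as required.
 */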

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi)
			goto out_err;	/* EINVAL; ran out of bio's */
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
					int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

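	/*
	 * One extra op is allocated beyond num_ops; the zero-filled
	 * entry terminates the array for code that walks it later.
	 */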
	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}

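/*
 * Complete one segment of a collected request.  Results are handed
 * back to the block layer strictly in submission order: this index's
 * status is recorded, then every contiguous completed segment
 * starting at num_done is ended on the original request.
 */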
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
		object_name, (unsigned long long) ofs,
		(unsigned long long) len, coll, coll_index);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
					false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
				   req, ops);
	rbd_assert(ret == 0);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
			(unsigned long long)
				le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
		(unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 inbound_size,
			   char *inbound,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, inbound_size);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, inbound_size, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && inbound)
		ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;
	int opcode;
	int flags;
	u64 snapid;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	if (rq_data_dir(rq) == WRITE) {
		opcode = CEPH_OSD_OP_WRITE;
		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
		snapid = CEPH_NOSNAP;
		payload_len = seg_len;
	} else {
		opcode = CEPH_OSD_OP_READ;
		flags = CEPH_OSD_FLAG_READ;
		snapc = NULL;
		snapid = rbd_dev->spec->snap_id;
		payload_len = 0;
	}

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, NULL, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_FLAG_READ,
			       ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Request sync osd notify ack
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, NULL, NULL);

	rbd_destroy_ops(ops);
	return ret;
}

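/*
 * Callback for notifies on the image header object.  Another client
 * changing the image (resizing it or creating a snapshot, say)
 * triggers a notify; we refresh our cached header and then ack so
 * the notifier does not have to wait out the timeout.
 */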
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
		rbd_dev->header_name, (unsigned long long) notify_id,
		(unsigned int) opcode);
	rc = rbd_dev_refresh(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to"
			   " update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}

/*
 * Synchronous osd object method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *outbound,
			     size_t outbound_size,
			     char *inbound,
			     size_t inbound_size,
			     int flags,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int payload_size;
	int ret;

	/*
	 * Any input parameters required by the method we're calling
	 * will be sent along with the class and method names as
	 * part of the message payload.  That data and its size are
	 * supplied via the indata and indata_len fields (named from
	 * the perspective of the server side) in the OSD request
	 * operation.
	 */
	payload_size = class_name_len + method_name_len + outbound_size;
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL, payload_size);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = outbound;
	ops[0].cls.indata_len = outbound_size;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       flags, ops,
			       object_name, 0, inbound_size, inbound,
			       NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		bool do_write;
		unsigned int size;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;
		unsigned int bio_offset;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (!rbd_dev->exists) {
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		bio = rq->bio;

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		bio_offset = 0;
		do {
			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
			unsigned int chain_size;
			struct bio *bio_chain;

			BUG_ON(limit > (u64) UINT_MAX);
			chain_size = (unsigned int) limit;
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);

			kref_get(&coll->kref);

			/* Pass a cloned bio chain via an osd request */

			bio_chain = bio_chain_clone_range(&bio,
						&bio_offset, chain_size,
						GFP_ATOMIC);
			if (bio_chain)
				(void) rbd_do_op(rq, rbd_dev, snapc,
						ofs, chain_size,
						bio_chain, coll, cur_seg);
			else
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, chain_size);
			size -= chain_size;
			ofs += chain_size;

			cur_seg++;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone_range()
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the partition-relative
	 * bio start sector is to offset relative to the enclosing
	 * device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
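
/*
 * Example with obj_order 22: an object covers 8192 sectors, so a bio
 * beginning 8190 sectors into an object has only 1024 bytes left
 * before the boundary; a bvec that would push past it (beyond what
 * the bio already holds) is trimmed to fit.
 */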

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
					rbd_dev->header_name,
					0, size,
					(char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->spec->image_name, size, ret);
			ret = -ENXIO;
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->spec->image_name);
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}

/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}

static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		rbd_remove_snap_dev(snap);
}

static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
{
	sector_t size;

	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
		return;

	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
	dout("setting size to %llu sectors", (unsigned long long) size);
	rbd_dev->mapping.size = (u64) size;
	set_capacity(rbd_dev->disk, size);
}

/*
 * only read the first part of the ondisk header, without the snaps info
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	rbd_update_mapping_size(rbd_dev);

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snaps_update(rbd_dev);
	if (!ret)
		ret = rbd_dev_snaps_register(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev, hver);
	else
		ret = rbd_dev_v2_refresh(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_features.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_image_id.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_parent.attr,
	&dev_attr_refresh.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};

/*
  sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static ssize_t rbd_snap_features_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long) snap->features);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	&dev_attr_snap_features.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};

static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
{
	kref_get(&spec->kref);

	return spec;
}

static void rbd_spec_free(struct kref *kref);
static void rbd_spec_put(struct rbd_spec *spec)
{
	if (spec)
		kref_put(&spec->kref, rbd_spec_free);
}

static struct rbd_spec *rbd_spec_alloc(void)
{
	struct rbd_spec *spec;

	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
	if (!spec)
		return NULL;
	kref_init(&spec->kref);

	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */

	return spec;
}

static void rbd_spec_free(struct kref *kref)
{
	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);

	kfree(spec->pool_name);
	kfree(spec->image_id);
	kfree(spec->image_name);
	kfree(spec->snap_name);
	kfree(spec);
}

struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
				struct rbd_spec *spec)
{
	struct rbd_device *rbd_dev;

	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		return NULL;

	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	rbd_dev->spec = spec;
	rbd_dev->rbd_client = rbdc;

	return rbd_dev;
}

static void rbd_dev_destroy(struct rbd_device *rbd_dev)
{
	rbd_spec_put(rbd_dev->parent_spec);
	kfree(rbd_dev->header_name);
	rbd_put_client(rbd_dev->rbd_client);
	rbd_spec_put(rbd_dev->spec);
	kfree(rbd_dev);
}
2273 }
2274
2275 static bool rbd_snap_registered(struct rbd_snap *snap)
2276 {
2277 bool ret = snap->dev.type == &rbd_snap_device_type;
2278 bool reg = device_is_registered(&snap->dev);
2279
2280 rbd_assert(!ret ^ reg);
2281
2282 return ret;
2283 }
2284
2285 static void rbd_remove_snap_dev(struct rbd_snap *snap)
2286 {
2287 list_del(&snap->node);
2288 if (device_is_registered(&snap->dev))
2289 device_unregister(&snap->dev);
2290 }
2291
2292 static int rbd_register_snap_dev(struct rbd_snap *snap,
2293 struct device *parent)
2294 {
2295 struct device *dev = &snap->dev;
2296 int ret;
2297
2298 dev->type = &rbd_snap_device_type;
2299 dev->parent = parent;
2300 dev->release = rbd_snap_dev_release;
2301 dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
2302 dout("%s: registering device for snapshot %s\n", __func__, snap->name);
2303
2304 ret = device_register(dev);
2305
2306 return ret;
2307 }
2308
2309 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2310 const char *snap_name,
2311 u64 snap_id, u64 snap_size,
2312 u64 snap_features)
2313 {
2314 struct rbd_snap *snap;
2315 int ret;
2316
2317 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2318 if (!snap)
2319 return ERR_PTR(-ENOMEM);
2320
2321 ret = -ENOMEM;
2322 snap->name = kstrdup(snap_name, GFP_KERNEL);
2323 if (!snap->name)
2324 goto err;
2325
2326 snap->id = snap_id;
2327 snap->size = snap_size;
2328 snap->features = snap_features;
2329
2330 return snap;
2331
2332 err:
2333 kfree(snap->name);
2334 kfree(snap);
2335
2336 return ERR_PTR(ret);
2337 }
2338
2339 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
2340 u64 *snap_size, u64 *snap_features)
2341 {
2342 char *snap_name;
2343
2344 rbd_assert(which < rbd_dev->header.snapc->num_snaps);
2345
2346 *snap_size = rbd_dev->header.snap_sizes[which];
2347 *snap_features = 0; /* No features for v1 */
2348
2349 /* Skip over names until we find the one we are looking for */
2350
2351 snap_name = rbd_dev->header.snap_names;
2352 while (which--)
2353 snap_name += strlen(snap_name) + 1;
2354
2355 return snap_name;
2356 }
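/*
 * Layout example (names hypothetical): snap_names packs all snapshot
 * names back to back, each terminated with '\0':
 *
 *	snap_names = "one\0two\0three\0"
 *
 * With which == 2, the skip loop above steps past "one" and "two"
 * and returns a pointer to "three".
 */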
2357
2358 /*
2359 * Get the size and object order for an image snapshot, or if
2360 * snap_id is CEPH_NOSNAP, gets this information for the base
2361 * image.
2362 */
2363 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
2364 u8 *order, u64 *snap_size)
2365 {
2366 __le64 snapid = cpu_to_le64(snap_id);
2367 int ret;
2368 struct {
2369 u8 order;
2370 __le64 size;
2371 } __attribute__ ((packed)) size_buf = { 0 };
2372
2373 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2374 "rbd", "get_size",
2375 (char *) &snapid, sizeof (snapid),
2376 (char *) &size_buf, sizeof (size_buf),
2377 CEPH_OSD_FLAG_READ, NULL);
2378 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2379 if (ret < 0)
2380 return ret;
2381
2382 *order = size_buf.order;
2383 *snap_size = le64_to_cpu(size_buf.size);
2384
2385 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
2386 (unsigned long long) snap_id, (unsigned int) *order,
2387 (unsigned long long) *snap_size);
2388
2389 return 0;
2390 }
2391
2392 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
2393 {
2394 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
2395 &rbd_dev->header.obj_order,
2396 &rbd_dev->header.image_size);
2397 }
2398
2399 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
2400 {
2401 void *reply_buf;
2402 int ret;
2403 void *p;
2404
2405 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
2406 if (!reply_buf)
2407 return -ENOMEM;
2408
2409 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2410 "rbd", "get_object_prefix",
2411 NULL, 0,
2412 reply_buf, RBD_OBJ_PREFIX_LEN_MAX,
2413 CEPH_OSD_FLAG_READ, NULL);
2414 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2415 if (ret < 0)
2416 goto out;
2417 ret = 0; /* rbd_req_sync_exec() can return positive */
2418
2419 p = reply_buf;
2420 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
2421 p + RBD_OBJ_PREFIX_LEN_MAX,
2422 NULL, GFP_NOIO);
2423
2424 if (IS_ERR(rbd_dev->header.object_prefix)) {
2425 ret = PTR_ERR(rbd_dev->header.object_prefix);
2426 rbd_dev->header.object_prefix = NULL;
2427 } else {
2428 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
2429 }
2430
2431 out:
2432 kfree(reply_buf);
2433
2434 return ret;
2435 }
2436
2437 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
2438 u64 *snap_features)
2439 {
2440 __le64 snapid = cpu_to_le64(snap_id);
2441 struct {
2442 __le64 features;
2443 __le64 incompat;
2444 } features_buf = { 0 };
2445 u64 incompat;
2446 int ret;
2447
2448 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2449 "rbd", "get_features",
2450 (char *) &snapid, sizeof (snapid),
2451 (char *) &features_buf, sizeof (features_buf),
2452 CEPH_OSD_FLAG_READ, NULL);
2453 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2454 if (ret < 0)
2455 return ret;
2456
2457 incompat = le64_to_cpu(features_buf.incompat);
2458 if (incompat & ~RBD_FEATURES_ALL)
2459 return -ENOTSUPP;
2460
2461 *snap_features = le64_to_cpu(features_buf.features);
2462
2463 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
2464 (unsigned long long) snap_id,
2465 (unsigned long long) *snap_features,
2466 (unsigned long long) le64_to_cpu(features_buf.incompat));
2467
2468 return 0;
2469 }
2470
2471 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
2472 {
2473 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
2474 &rbd_dev->header.features);
2475 }
2476
2477 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
2478 {
2479 struct rbd_spec *parent_spec;
2480 size_t size;
2481 void *reply_buf = NULL;
2482 __le64 snapid;
2483 void *p;
2484 void *end;
2485 char *image_id;
2486 u64 overlap;
2487 size_t len = 0;
2488 int ret;
2489
2490 parent_spec = rbd_spec_alloc();
2491 if (!parent_spec)
2492 return -ENOMEM;
2493
2494 size = sizeof (__le64) + /* pool_id */
2495 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
2496 sizeof (__le64) + /* snap_id */
2497 sizeof (__le64); /* overlap */
2498 reply_buf = kmalloc(size, GFP_KERNEL);
2499 if (!reply_buf) {
2500 ret = -ENOMEM;
2501 goto out_err;
2502 }
2503
2504 snapid = cpu_to_le64(CEPH_NOSNAP);
2505 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2506 "rbd", "get_parent",
2507 (char *) &snapid, sizeof (snapid),
2508 (char *) reply_buf, size,
2509 CEPH_OSD_FLAG_READ, NULL);
2510 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2511 if (ret < 0)
2512 goto out_err;
2513
2514 ret = -ERANGE;
2515 p = reply_buf;
2516 end = (char *) reply_buf + size;
2517 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
2518 if (parent_spec->pool_id == CEPH_NOPOOL)
2519 goto out; /* No parent? No problem. */
2520
2521 image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2522 if (IS_ERR(image_id)) {
2523 ret = PTR_ERR(image_id);
2524 goto out_err;
2525 }
2526 parent_spec->image_id = image_id;
2527 parent_spec->image_id_len = len;
2528 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
2529 ceph_decode_64_safe(&p, end, overlap, out_err);
2530
2531 rbd_dev->parent_overlap = overlap;
2532 rbd_dev->parent_spec = parent_spec;
2533 parent_spec = NULL; /* rbd_dev now owns this */
2534 out:
2535 ret = 0;
2536 out_err:
2537 kfree(reply_buf);
2538 rbd_spec_put(parent_spec);
2539
2540 return ret;
2541 }
2542
2543 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
2544 {
2545 size_t image_id_size;
2546 char *image_id;
2547 void *p;
2548 void *end;
2549 size_t size;
2550 void *reply_buf = NULL;
2551 size_t len = 0;
2552 char *image_name = NULL;
2553 int ret;
2554
2555 rbd_assert(!rbd_dev->spec->image_name);
2556
2557 image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
2558 image_id = kmalloc(image_id_size, GFP_KERNEL);
2559 if (!image_id)
2560 return NULL;
2561
2562 p = image_id;
2563 end = (char *) image_id + image_id_size;
2564 ceph_encode_string(&p, end, rbd_dev->spec->image_id,
2565 (u32) rbd_dev->spec->image_id_len);
2566
2567 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
2568 reply_buf = kmalloc(size, GFP_KERNEL);
2569 if (!reply_buf)
2570 goto out;
2571
2572 ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
2573 "rbd", "dir_get_name",
2574 image_id, image_id_size,
2575 (char *) reply_buf, size,
2576 CEPH_OSD_FLAG_READ, NULL);
2577 if (ret < 0)
2578 goto out;
2579 p = reply_buf;
2580 end = (char *) reply_buf + size;
2581 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
2582 if (IS_ERR(image_name))
2583 image_name = NULL;
2584 else
2585 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
2586 out:
2587 kfree(reply_buf);
2588 kfree(image_id);
2589
2590 return image_name;
2591 }
2592
2593 /*
2594 * When a parent image gets probed, we only have the pool, image,
2595 * and snapshot ids but not the names of any of them. This call
2596 * is made later to fill in those names. It has to be done after
2597 * rbd_dev_snaps_update() has completed because some of the
2598 * information (in particular, snapshot name) is not available
2599 * until then.
2600 */
2601 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
2602 {
2603 struct ceph_osd_client *osdc;
2604 const char *name;
2605 void *reply_buf = NULL;
2606 int ret;
2607
2608 if (rbd_dev->spec->pool_name)
2609 return 0; /* Already have the names */
2610
2611 /* Look up the pool name */
2612
2613 osdc = &rbd_dev->rbd_client->client->osdc;
2614 name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
2615 if (!name)
2616 return -EIO; /* pool id too large (>= 2^31) */
2617
2618 rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
2619 if (!rbd_dev->spec->pool_name)
2620 return -ENOMEM;
2621
2622 /* Fetch the image name; tolerate failure here */
2623
2624 name = rbd_dev_image_name(rbd_dev);
2625 if (name) {
2626 rbd_dev->spec->image_name_len = strlen(name);
2627 rbd_dev->spec->image_name = (char *) name;
2628 } else {
2629 pr_warning(RBD_DRV_NAME "%d "
2630 "unable to get image name for image id %s\n",
2631 rbd_dev->major, rbd_dev->spec->image_id);
2632 }
2633
2634 /* Look up the snapshot name. */
2635
2636 name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
2637 if (!name) {
2638 ret = -EIO;
2639 goto out_err;
2640 }
2641 rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
2642 ret = -ENOMEM; /* for the kstrdup() failure case */
2643 if (!rbd_dev->spec->snap_name)
2644 goto out_err;
2645 return 0;
2646 out_err:
2647 kfree(reply_buf);
2648 kfree(rbd_dev->spec->pool_name);
2649 rbd_dev->spec->pool_name = NULL;
2650
2651 return ret;
2652 }
2653
2654 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
2655 {
2656 size_t size;
2657 int ret;
2658 void *reply_buf;
2659 void *p;
2660 void *end;
2661 u64 seq;
2662 u32 snap_count;
2663 struct ceph_snap_context *snapc;
2664 u32 i;
2665
2666 /*
2667 * We'll need room for the seq value (maximum snapshot id),
2668 * snapshot count, and array of that many snapshot ids.
2669 * For now we have a fixed upper limit on the number we're
2670 * prepared to receive.
2671 */
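/*
 * Worked example of that limit: with RBD_MAX_SNAP_COUNT of 510,
 * the reply buffer below is 8 (seq) + 4 (count) + 510 * 8 (ids)
 * = 4092 bytes, which fits within a single 4 KiB page.
 */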
2672 size = sizeof (__le64) + sizeof (__le32) +
2673 RBD_MAX_SNAP_COUNT * sizeof (__le64);
2674 reply_buf = kzalloc(size, GFP_KERNEL);
2675 if (!reply_buf)
2676 return -ENOMEM;
2677
2678 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2679 "rbd", "get_snapcontext",
2680 NULL, 0,
2681 reply_buf, size,
2682 CEPH_OSD_FLAG_READ, ver);
2683 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2684 if (ret < 0)
2685 goto out;
2686
2687 ret = -ERANGE;
2688 p = reply_buf;
2689 end = (char *) reply_buf + size;
2690 ceph_decode_64_safe(&p, end, seq, out);
2691 ceph_decode_32_safe(&p, end, snap_count, out);
2692
2693 /*
2694 * Make sure the reported number of snapshot ids wouldn't go
2695 * beyond the end of our buffer. But before checking that,
2696 * make sure the computed size of the snapshot context we
2697 * allocate is representable in a size_t.
2698 */
2699 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
2700 / sizeof (u64)) {
2701 ret = -EINVAL;
2702 goto out;
2703 }
2704 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
2705 goto out;
2706
2707 size = sizeof (struct ceph_snap_context) +
2708 snap_count * sizeof (snapc->snaps[0]);
2709 snapc = kmalloc(size, GFP_KERNEL);
2710 if (!snapc) {
2711 ret = -ENOMEM;
2712 goto out;
2713 }
2714
2715 atomic_set(&snapc->nref, 1);
2716 snapc->seq = seq;
2717 snapc->num_snaps = snap_count;
2718 for (i = 0; i < snap_count; i++)
2719 snapc->snaps[i] = ceph_decode_64(&p);
2720
2721 rbd_dev->header.snapc = snapc;
2722
2723 dout(" snap context seq = %llu, snap_count = %u\n",
2724 (unsigned long long) seq, (unsigned int) snap_count);
2725 ret = 0;
2726 out:
2727 kfree(reply_buf);
2728 
2729 return ret;
2730 }
2731
2732 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
2733 {
2734 size_t size;
2735 void *reply_buf;
2736 __le64 snap_id;
2737 int ret;
2738 void *p;
2739 void *end;
2740 char *snap_name;
2741
2742 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
2743 reply_buf = kmalloc(size, GFP_KERNEL);
2744 if (!reply_buf)
2745 return ERR_PTR(-ENOMEM);
2746
2747 snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
2748 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
2749 "rbd", "get_snapshot_name",
2750 (char *) &snap_id, sizeof (snap_id),
2751 reply_buf, size,
2752 CEPH_OSD_FLAG_READ, NULL);
2753 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
2754 if (ret < 0)
2755 goto out;
2756
2757 p = reply_buf;
2758 end = (char *) reply_buf + size;
2759 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
2760 if (IS_ERR(snap_name)) {
2761 ret = PTR_ERR(snap_name);
2762 goto out;
2763 } else {
2764 dout(" snap_id 0x%016llx snap_name = %s\n",
2765 (unsigned long long) le64_to_cpu(snap_id), snap_name);
2766 }
2767 kfree(reply_buf);
2768
2769 return snap_name;
2770 out:
2771 kfree(reply_buf);
2772
2773 return ERR_PTR(ret);
2774 }
2775
2776 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
2777 u64 *snap_size, u64 *snap_features)
2778 {
2779 u64 snap_id;
2780 u8 order;
2781 int ret;
2782
2783 snap_id = rbd_dev->header.snapc->snaps[which];
2784 ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
2785 if (ret)
2786 return ERR_PTR(ret);
2787 ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
2788 if (ret)
2789 return ERR_PTR(ret);
2790
2791 return rbd_dev_v2_snap_name(rbd_dev, which);
2792 }
2793
2794 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
2795 u64 *snap_size, u64 *snap_features)
2796 {
2797 if (rbd_dev->image_format == 1)
2798 return rbd_dev_v1_snap_info(rbd_dev, which,
2799 snap_size, snap_features);
2800 if (rbd_dev->image_format == 2)
2801 return rbd_dev_v2_snap_info(rbd_dev, which,
2802 snap_size, snap_features);
2803 return ERR_PTR(-EINVAL);
2804 }
2805
2806 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
2807 {
2808 int ret;
2809 __u8 obj_order;
2810
2811 down_write(&rbd_dev->header_rwsem);
2812
2813 /* Grab old order first, to see if it changes */
2814
2815 obj_order = rbd_dev->header.obj_order;
2816 ret = rbd_dev_v2_image_size(rbd_dev);
2817 if (ret)
2818 goto out;
2819 if (rbd_dev->header.obj_order != obj_order) {
2820 ret = -EIO;
2821 goto out;
2822 }
2823 rbd_update_mapping_size(rbd_dev);
2824
2825 ret = rbd_dev_v2_snap_context(rbd_dev, hver);
2826 dout("rbd_dev_v2_snap_context returned %d\n", ret);
2827 if (ret)
2828 goto out;
2829 ret = rbd_dev_snaps_update(rbd_dev);
2830 dout("rbd_dev_snaps_update returned %d\n", ret);
2831 if (ret)
2832 goto out;
2833 ret = rbd_dev_snaps_register(rbd_dev);
2834 dout("rbd_dev_snaps_register returned %d\n", ret);
2835 out:
2836 up_write(&rbd_dev->header_rwsem);
2837
2838 return ret;
2839 }
2840
2841 /*
2842 * Scan the rbd device's current snapshot list and compare it to the
2843 * newly-received snapshot context. Remove any existing snapshots
2847 * not present in the new snapshot context, add a new snapshot for
2848 * any snapshots in the snapshot context not in the current list,
2849 * and verify there are no changes to snapshots we already know
2850 * about.
2848 *
2849 * Assumes the snapshots in the snapshot context are sorted by
2850 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2851 * are also maintained in that order.)
2852 */
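/*
 * Worked example (ids hypothetical): with an existing snapshot list
 * (12, 8, 3) and a new snapshot context (12, 10, 3), the loop below
 * matches 12, inserts a new snapshot 10 ahead of 8, then removes 8
 * (absent from the new context), and finally matches 3, leaving the
 * list as (12, 10, 3).
 */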
2853 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
2854 {
2855 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2856 const u32 snap_count = snapc->num_snaps;
2857 struct list_head *head = &rbd_dev->snaps;
2858 struct list_head *links = head->next;
2859 u32 index = 0;
2860
2861 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2862 while (index < snap_count || links != head) {
2863 u64 snap_id;
2864 struct rbd_snap *snap;
2865 char *snap_name;
2866 u64 snap_size = 0;
2867 u64 snap_features = 0;
2868
2869 snap_id = index < snap_count ? snapc->snaps[index]
2870 : CEPH_NOSNAP;
2871 snap = links != head ? list_entry(links, struct rbd_snap, node)
2872 : NULL;
2873 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2874
2875 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2876 struct list_head *next = links->next;
2877
2878 /* Existing snapshot not in the new snap context */
2879
2880 if (rbd_dev->spec->snap_id == snap->id)
2881 rbd_dev->exists = false;
2882 rbd_remove_snap_dev(snap);
2883 dout("%ssnap id %llu has been removed\n",
2884 rbd_dev->spec->snap_id == snap->id ?
2885 "mapped " : "",
2886 (unsigned long long) snap->id);
2887
2888 /* Done with this list entry; advance */
2889
2890 links = next;
2891 continue;
2892 }
2893
2894 snap_name = rbd_dev_snap_info(rbd_dev, index,
2895 &snap_size, &snap_features);
2896 if (IS_ERR(snap_name))
2897 return PTR_ERR(snap_name);
2898
2899 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2900 (unsigned long long) snap_id);
2901 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2902 struct rbd_snap *new_snap;
2903
2904 /* We haven't seen this snapshot before */
2905
2906 new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
2907 snap_id, snap_size, snap_features);
2908 if (IS_ERR(new_snap)) {
2909 int err = PTR_ERR(new_snap);
2910
2911 dout(" failed to add dev, error %d\n", err);
2912
2913 return err;
2914 }
2915
2916 /* New goes before existing, or at end of list */
2917
2918 dout(" added dev%s\n", snap ? "" : " at end\n");
2919 if (snap)
2920 list_add_tail(&new_snap->node, &snap->node);
2921 else
2922 list_add_tail(&new_snap->node, head);
2923 } else {
2924 /* Already have this one */
2925
2926 dout(" already present\n");
2927
2928 rbd_assert(snap->size == snap_size);
2929 rbd_assert(!strcmp(snap->name, snap_name));
2930 rbd_assert(snap->features == snap_features);
2931
2932 /* Done with this list entry; advance */
2933
2934 links = links->next;
2935 }
2936
2937 /* Advance to the next entry in the snapshot context */
2938
2939 index++;
2940 }
2941 dout("%s: done\n", __func__);
2942
2943 return 0;
2944 }
2945
2946 /*
2947 * Scan the list of snapshots and register the devices for any that
2948 * have not already been registered.
2949 */
2950 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
2951 {
2952 struct rbd_snap *snap;
2953 int ret = 0;
2954
2955 dout("%s called\n", __func__);
2956 if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
2957 return -EIO;
2958
2959 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2960 if (!rbd_snap_registered(snap)) {
2961 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2962 if (ret < 0)
2963 break;
2964 }
2965 }
2966 dout("%s: returning %d\n", __func__, ret);
2967
2968 return ret;
2969 }
2970
2971 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2972 {
2973 struct device *dev;
2974 int ret;
2975
2976 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2977
2978 dev = &rbd_dev->dev;
2979 dev->bus = &rbd_bus_type;
2980 dev->type = &rbd_device_type;
2981 dev->parent = &rbd_root_dev;
2982 dev->release = rbd_dev_release;
2983 dev_set_name(dev, "%d", rbd_dev->dev_id);
2984 ret = device_register(dev);
2985
2986 mutex_unlock(&ctl_mutex);
2987
2988 return ret;
2989 }
2990
2991 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2992 {
2993 device_unregister(&rbd_dev->dev);
2994 }
2995
2996 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2997 {
2998 int ret, rc;
2999
3000 do {
3001 ret = rbd_req_sync_watch(rbd_dev);
3002 if (ret == -ERANGE) {
3003 rc = rbd_dev_refresh(rbd_dev, NULL);
3004 if (rc < 0)
3005 return rc;
3006 }
3007 } while (ret == -ERANGE);
3008
3009 return ret;
3010 }
3011
3012 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
3013
3014 /*
3015 * Get a unique rbd identifier for the given new rbd_dev, and add
3016 * the rbd_dev to the global list. The minimum rbd id is 1.
3017 */
3018 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
3019 {
3020 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
3021
3022 spin_lock(&rbd_dev_list_lock);
3023 list_add_tail(&rbd_dev->node, &rbd_dev_list);
3024 spin_unlock(&rbd_dev_list_lock);
3025 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
3026 (unsigned long long) rbd_dev->dev_id);
3027 }
3028
3029 /*
3030 * Remove an rbd_dev from the global list, and record that its
3031 * identifier is no longer in use.
3032 */
3033 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
3034 {
3035 struct list_head *tmp;
3036 int rbd_id = rbd_dev->dev_id;
3037 int max_id;
3038
3039 rbd_assert(rbd_id > 0);
3040
3041 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
3042 (unsigned long long) rbd_dev->dev_id);
3043 spin_lock(&rbd_dev_list_lock);
3044 list_del_init(&rbd_dev->node);
3045
3046 /*
3047 * If the id being "put" is not the current maximum, there
3048 * is nothing special we need to do.
3049 */
3050 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
3051 spin_unlock(&rbd_dev_list_lock);
3052 return;
3053 }
3054
3055 /*
3056 * We need to update the current maximum id. Search the
3057 * list to find out what it is. We're more likely to find
3058 * the maximum at the end, so search the list backward.
3059 */
3060 max_id = 0;
3061 list_for_each_prev(tmp, &rbd_dev_list) {
3062 struct rbd_device *rbd_dev;
3063
3064 rbd_dev = list_entry(tmp, struct rbd_device, node);
3065 if (rbd_dev->dev_id > max_id)
3066 max_id = rbd_dev->dev_id;
3067 }
3068 spin_unlock(&rbd_dev_list_lock);
3069
3070 /*
3071 * The max id could have been updated by rbd_dev_id_get(), in
3072 * which case it now accurately reflects the new maximum.
3073 * Be careful not to overwrite the maximum value in that
3074 * case.
3075 */
3076 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
3077 dout(" max dev id has been reset\n");
3078 }
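/*
 * Example of the race the cmpxchg above guards against (ids
 * hypothetical): suppose id 3 is the maximum and is being put, and
 * max_id is computed as 2 from the remaining list. If another thread
 * concurrently calls rbd_dev_id_get() and bumps rbd_dev_id_max to 4,
 * the cmpxchg finds 4 rather than 3 and leaves the new maximum in
 * place.
 */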
3079
3080 /*
3081 * Skips over white space at *buf, and updates *buf to point to the
3082 * first found non-space character (if any). Returns the length of
3083 * the token (string of non-white space characters) found. Note
3084 * that *buf must be terminated with '\0'.
3085 */
3086 static inline size_t next_token(const char **buf)
3087 {
3088 /*
3089 * These are the characters that produce nonzero for
3090 * isspace() in the "C" and "POSIX" locales.
3091 */
3092 const char *spaces = " \f\n\r\t\v";
3093
3094 *buf += strspn(*buf, spaces); /* Find start of token */
3095
3096 return strcspn(*buf, spaces); /* Return token length */
3097 }
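/*
 * Illustrative call (not part of the driver): given
 *
 *	const char *p = "  pool image";
 *
 * next_token(&p) returns 4 and leaves p pointing at "pool image",
 * i.e. at the start of the token rather than past it.
 */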
3098
3099 /*
3100 * Finds the next token in *buf, and if the provided token buffer is
3101 * big enough, copies the found token into it. The result, if
3102 * copied, is guaranteed to be terminated with '\0'. Note that *buf
3103 * must be terminated with '\0' on entry.
3104 *
3105 * Returns the length of the token found (not including the '\0').
3106 * Return value will be 0 if no token is found, and it will be >=
3107 * token_size if the token would not fit.
3108 *
3109 * The *buf pointer will be updated to point beyond the end of the
3110 * found token. Note that this occurs even if the token buffer is
3111 * too small to hold it.
3112 */
3113 static inline size_t copy_token(const char **buf,
3114 char *token,
3115 size_t token_size)
3116 {
3117 size_t len;
3118
3119 len = next_token(buf);
3120 if (len < token_size) {
3121 memcpy(token, *buf, len);
3122 *(token + len) = '\0';
3123 }
3124 *buf += len;
3125
3126 return len;
3127 }
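/*
 * Illustrative call (not part of the driver): with a 4-byte token
 * buffer and *buf at "monitors ...", copy_token() returns 8 without
 * copying anything, because the 8-character token plus its '\0'
 * would not fit; *buf is still advanced past "monitors".
 */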
3128
3129 /*
3130 * Finds the next token in *buf, dynamically allocates a buffer big
3131 * enough to hold a copy of it, and copies the token into the new
3132 * buffer. The copy is guaranteed to be terminated with '\0'. Note
3133 * that a duplicate buffer is created even for a zero-length token.
3134 *
3135 * Returns a pointer to the newly-allocated duplicate, or a null
3136 * pointer if memory for the duplicate was not available. If
3137 * the lenp argument is a non-null pointer, the length of the token
3138 * (not including the '\0') is returned in *lenp.
3139 *
3140 * If successful, the *buf pointer will be updated to point beyond
3141 * the end of the found token.
3142 *
3143 * Note: uses GFP_KERNEL for allocation.
3144 */
3145 static inline char *dup_token(const char **buf, size_t *lenp)
3146 {
3147 char *dup;
3148 size_t len;
3149
3150 len = next_token(buf);
3151 dup = kmalloc(len + 1, GFP_KERNEL);
3152 if (!dup)
3153 return NULL;
3154
3155 memcpy(dup, *buf, len);
3156 *(dup + len) = '\0';
3157 *buf += len;
3158
3159 if (lenp)
3160 *lenp = len;
3161
3162 return dup;
3163 }
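/*
 * Illustrative call (not part of the driver): with *buf at
 * " rbd foo", dup_token(&buf, &len) returns a freshly allocated
 * "rbd", sets len to 3, and leaves *buf at " foo", ready for the
 * next call.
 */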
3164
3165 /*
3166 * Parse the options provided for an "rbd add" (i.e., rbd image
3167 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
3168 * and the data written is passed here via a NUL-terminated buffer.
3169 * Returns 0 if successful or an error code otherwise.
3170 *
3171 * The information extracted from these options is recorded in
3172 * the other parameters which return dynamically-allocated
3173 * structures:
3174 * ceph_opts
3175 * The address of a pointer that will refer to a ceph options
3176 * structure. Caller must release the returned pointer using
3177 * ceph_destroy_options() when it is no longer needed.
3178 * rbd_opts
3179 * Address of an rbd options pointer. Fully initialized by
3180 * this function; caller must release with kfree().
3181 * spec
3182 * Address of an rbd image specification pointer. Fully
3183 * initialized by this function based on parsed options.
3184 * Caller must release with rbd_spec_put().
3185 *
3186 * The options passed take this form:
3190 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
3188 * where:
3189 * <mon_addrs>
3190 * A comma-separated list of one or more monitor addresses.
3191 * A monitor address is an ip address, optionally followed
3192 * by a port number (separated by a colon).
3193 * I.e.: ip1[:port1][,ip2[:port2]...]
3194 * <options>
3195 * A comma-separated list of ceph and/or rbd options.
3196 * <pool_name>
3197 * The name of the rados pool containing the rbd image.
3198 * <image_name>
3199 * The name of the image in that pool to map.
3203 * <snap_name>
3204 * An optional snapshot name. If provided, the mapping will
3205 * present data from the image at the time that snapshot was
3206 * created. The image head is used if no snapshot name is
3207 * provided. Snapshot mappings are always read-only.
3205 */
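/*
 * Example "add" request matching the syntax above (all values
 * hypothetical):
 *
 *	# echo "1.2.3.4:6789 name=admin rbd foo snap1" \
 *		> /sys/bus/rbd/add
 *
 * maps snapshot "snap1" of image "foo" in pool "rbd" via the monitor
 * at 1.2.3.4:6789. Omitting the final token maps the image head
 * instead.
 */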
3206 static int rbd_add_parse_args(const char *buf,
3207 struct ceph_options **ceph_opts,
3208 struct rbd_options **opts,
3209 struct rbd_spec **rbd_spec)
3210 {
3211 size_t len;
3212 char *options;
3213 const char *mon_addrs;
3214 size_t mon_addrs_size;
3215 struct rbd_spec *spec = NULL;
3216 struct rbd_options *rbd_opts = NULL;
3217 struct ceph_options *copts;
3218 int ret;
3219
3220 /* The first four tokens are required */
3221
3222 len = next_token(&buf);
3223 if (!len)
3224 return -EINVAL; /* Missing monitor address(es) */
3225 mon_addrs = buf;
3226 mon_addrs_size = len + 1;
3227 buf += len;
3228
3229 ret = -EINVAL;
3230 options = dup_token(&buf, NULL);
3231 if (!options)
3232 return -ENOMEM;
3233 if (!*options)
3234 goto out_err; /* Missing options */
3235
3236 spec = rbd_spec_alloc();
3237 if (!spec)
3238 goto out_mem;
3239
3240 spec->pool_name = dup_token(&buf, NULL);
3241 if (!spec->pool_name)
3242 goto out_mem;
3243 if (!*spec->pool_name)
3244 goto out_err; /* Missing pool name */
3245
3246 spec->image_name = dup_token(&buf, &spec->image_name_len);
3247 if (!spec->image_name)
3248 goto out_mem;
3249 if (!*spec->image_name)
3250 goto out_err; /* Missing image name */
3251
3252 /*
3253 * Snapshot name is optional; default is to use "-"
3254 * (indicating the head/no snapshot).
3255 */
3256 len = next_token(&buf);
3257 if (!len) {
3258 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
3259 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
3260 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
3261 ret = -ENAMETOOLONG;
3262 goto out_err;
3263 }
3264 spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
3265 if (!spec->snap_name)
3266 goto out_mem;
3267 memcpy(spec->snap_name, buf, len);
3268 *(spec->snap_name + len) = '\0';
3269
3270 /* Initialize all rbd options to the defaults */
3271
3272 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
3273 if (!rbd_opts)
3274 goto out_mem;
3275
3276 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
3277
3278 copts = ceph_parse_options(options, mon_addrs,
3279 mon_addrs + mon_addrs_size - 1,
3280 parse_rbd_opts_token, rbd_opts);
3281 if (IS_ERR(copts)) {
3282 ret = PTR_ERR(copts);
3283 goto out_err;
3284 }
3285 kfree(options);
3286
3287 *ceph_opts = copts;
3288 *opts = rbd_opts;
3289 *rbd_spec = spec;
3290
3291 return 0;
3292 out_mem:
3293 ret = -ENOMEM;
3294 out_err:
3295 kfree(rbd_opts);
3296 rbd_spec_put(spec);
3297 kfree(options);
3298
3299 return ret;
3300 }
3301
3302 /*
3303 * An rbd format 2 image has a unique identifier, distinct from the
3304 * name given to it by the user. Internally, that identifier is
3305 * what's used to specify the names of objects related to the image.
3306 *
3307 * A special "rbd id" object is used to map an rbd image name to its
3308 * id. If that object doesn't exist, then there is no v2 rbd image
3309 * with the supplied name.
3310 *
3311 * This function will record the given rbd_dev's image_id field if
3312 * it can be determined, and in that case will return 0. If any
3313 * errors occur a negative errno will be returned and the rbd_dev's
3314 * image_id field will be unchanged (and should be NULL).
3315 */
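/*
 * Naming sketch, assuming the usual prefixes from rbd_types.h
 * (RBD_ID_PREFIX "rbd_id." and RBD_HEADER_PREFIX "rbd_header."):
 * for a format 2 image named "foo", the id is read from the object
 * "rbd_id.foo"; if that yields image id "abc123", the image header
 * lives in object "rbd_header.abc123" (see rbd_dev_v2_probe() below).
 */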
3316 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
3317 {
3318 int ret;
3319 size_t size;
3320 char *object_name;
3321 void *response;
3322 void *p;
3323
3324 /*
3325 * When probing a parent image, the image id is already
3326 * known (and the image name likely is not). There's no
3327 * need to fetch the image id again in this case.
3328 */
3329 if (rbd_dev->spec->image_id)
3330 return 0;
3331
3332 /*
3333 * First, see if the format 2 image id file exists, and if
3334 * so, get the image's persistent id from it.
3335 */
3336 size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
3337 object_name = kmalloc(size, GFP_NOIO);
3338 if (!object_name)
3339 return -ENOMEM;
3340 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
3341 dout("rbd id object name is %s\n", object_name);
3342
3343 /* Response will be an encoded string, which includes a length */
3344
3345 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
3346 response = kzalloc(size, GFP_NOIO);
3347 if (!response) {
3348 ret = -ENOMEM;
3349 goto out;
3350 }
3351
3352 ret = rbd_req_sync_exec(rbd_dev, object_name,
3353 "rbd", "get_id",
3354 NULL, 0,
3355 response, RBD_IMAGE_ID_LEN_MAX,
3356 CEPH_OSD_FLAG_READ, NULL);
3357 dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
3358 if (ret < 0)
3359 goto out;
3360 ret = 0; /* rbd_req_sync_exec() can return positive */
3361
3362 p = response;
3363 rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
3364 p + RBD_IMAGE_ID_LEN_MAX,
3365 &rbd_dev->spec->image_id_len,
3366 GFP_NOIO);
3367 if (IS_ERR(rbd_dev->spec->image_id)) {
3368 ret = PTR_ERR(rbd_dev->spec->image_id);
3369 rbd_dev->spec->image_id = NULL;
3370 } else {
3371 dout("image_id is %s\n", rbd_dev->spec->image_id);
3372 }
3373 out:
3374 kfree(response);
3375 kfree(object_name);
3376
3377 return ret;
3378 }
3379
3380 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
3381 {
3382 int ret;
3383 size_t size;
3384
3385 /* Version 1 images have no id; empty string is used */
3386
3387 rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
3388 if (!rbd_dev->spec->image_id)
3389 return -ENOMEM;
3390 rbd_dev->spec->image_id_len = 0;
3391
3392 /* Record the header object name for this rbd image. */
3393
3394 size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
3395 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3396 if (!rbd_dev->header_name) {
3397 ret = -ENOMEM;
3398 goto out_err;
3399 }
3400 sprintf(rbd_dev->header_name, "%s%s",
3401 rbd_dev->spec->image_name, RBD_SUFFIX);
3402
3403 /* Populate rbd image metadata */
3404
3405 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
3406 if (ret < 0)
3407 goto out_err;
3408
3409 /* Version 1 images have no parent (no layering) */
3410
3411 rbd_dev->parent_spec = NULL;
3412 rbd_dev->parent_overlap = 0;
3413
3414 rbd_dev->image_format = 1;
3415
3416 dout("discovered version 1 image, header name is %s\n",
3417 rbd_dev->header_name);
3418
3419 return 0;
3420
3421 out_err:
3422 kfree(rbd_dev->header_name);
3423 rbd_dev->header_name = NULL;
3424 kfree(rbd_dev->spec->image_id);
3425 rbd_dev->spec->image_id = NULL;
3426
3427 return ret;
3428 }
3429
3430 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
3431 {
3432 size_t size;
3433 int ret;
3434 u64 ver = 0;
3435
3436 /*
3437 * Image id was filled in by the caller. Record the header
3438 * object name for this rbd image.
3439 */
3440 size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
3441 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
3442 if (!rbd_dev->header_name)
3443 return -ENOMEM;
3444 sprintf(rbd_dev->header_name, "%s%s",
3445 RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
3446
3447 /* Get the size and object order for the image */
3448
3449 ret = rbd_dev_v2_image_size(rbd_dev);
3450 if (ret < 0)
3451 goto out_err;
3452
3453 /* Get the object prefix (a.k.a. block_name) for the image */
3454
3455 ret = rbd_dev_v2_object_prefix(rbd_dev);
3456 if (ret < 0)
3457 goto out_err;
3458
3459 /* Get and check the features for the image */
3460
3461 ret = rbd_dev_v2_features(rbd_dev);
3462 if (ret < 0)
3463 goto out_err;
3464
3465 /* If the image supports layering, get the parent info */
3466
3467 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
3468 ret = rbd_dev_v2_parent_info(rbd_dev);
3469 if (ret < 0)
3470 goto out_err;
3471 }
3472
3473 /* crypto and compression type aren't (yet) supported for v2 images */
3474
3475 rbd_dev->header.crypt_type = 0;
3476 rbd_dev->header.comp_type = 0;
3477
3478 /* Get the snapshot context, plus the header version */
3479
3480 ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
3481 if (ret)
3482 goto out_err;
3483 rbd_dev->header.obj_version = ver;
3484
3485 rbd_dev->image_format = 2;
3486
3487 dout("discovered version 2 image, header name is %s\n",
3488 rbd_dev->header_name);
3489
3490 return 0;
3491 out_err:
3492 rbd_dev->parent_overlap = 0;
3493 rbd_spec_put(rbd_dev->parent_spec);
3494 rbd_dev->parent_spec = NULL;
3495 kfree(rbd_dev->header_name);
3496 rbd_dev->header_name = NULL;
3497 kfree(rbd_dev->header.object_prefix);
3498 rbd_dev->header.object_prefix = NULL;
3499
3500 return ret;
3501 }
3502
3503 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
3504 {
3505 int ret;
3506
3507 /* no need to lock here, as rbd_dev is not registered yet */
3508 ret = rbd_dev_snaps_update(rbd_dev);
3509 if (ret)
3510 return ret;
3511
3512 ret = rbd_dev_probe_update_spec(rbd_dev);
3513 if (ret)
3514 goto err_out_snaps;
3515
3516 ret = rbd_dev_set_mapping(rbd_dev);
3517 if (ret)
3518 goto err_out_snaps;
3519
3520 /* generate unique id: find highest unique id, add one */
3521 rbd_dev_id_get(rbd_dev);
3522
3523 /* Fill in the device name, now that we have its id. */
3524 BUILD_BUG_ON(DEV_NAME_LEN
3525 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
3526 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
3527
3528 /* Get our block major device number. */
3529
3530 ret = register_blkdev(0, rbd_dev->name);
3531 if (ret < 0)
3532 goto err_out_id;
3533 rbd_dev->major = ret;
3534
3535 /* Set up the blkdev mapping. */
3536
3537 ret = rbd_init_disk(rbd_dev);
3538 if (ret)
3539 goto err_out_blkdev;
3540
3541 ret = rbd_bus_add_dev(rbd_dev);
3542 if (ret)
3543 goto err_out_disk;
3544
3545 /*
3546 * At this point cleanup in the event of an error is the job
3547 * of the sysfs code (initiated by rbd_bus_del_dev()).
3548 */
3549 down_write(&rbd_dev->header_rwsem);
3550 ret = rbd_dev_snaps_register(rbd_dev);
3551 up_write(&rbd_dev->header_rwsem);
3552 if (ret)
3553 goto err_out_bus;
3554
3555 ret = rbd_init_watch_dev(rbd_dev);
3556 if (ret)
3557 goto err_out_bus;
3558
3559 /* Everything's ready. Announce the disk to the world. */
3560
3561 add_disk(rbd_dev->disk);
3562
3563 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
3564 (unsigned long long) rbd_dev->mapping.size);
3565
3566 return ret;
3567 err_out_bus:
3568 /* this will also clean up rest of rbd_dev stuff */
3569
3570 rbd_bus_del_dev(rbd_dev);
3571
3572 return ret;
3573 err_out_disk:
3574 rbd_free_disk(rbd_dev);
3575 err_out_blkdev:
3576 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3577 err_out_id:
3578 rbd_dev_id_put(rbd_dev);
3579 err_out_snaps:
3580 rbd_remove_all_snaps(rbd_dev);
3581
3582 return ret;
3583 }
3584
3585 /*
3586 * Probe for the existence of the header object for the given rbd
3587 * device. For format 2 images this includes determining the image
3588 * id.
3589 */
3590 static int rbd_dev_probe(struct rbd_device *rbd_dev)
3591 {
3592 int ret;
3593
3594 /*
3595 * Get the id from the image id object. If it's not a
3596 * format 2 image, we'll get ENOENT back, and we'll assume
3597 * it's a format 1 image.
3598 */
3599 ret = rbd_dev_image_id(rbd_dev);
3600 if (ret)
3601 ret = rbd_dev_v1_probe(rbd_dev);
3602 else
3603 ret = rbd_dev_v2_probe(rbd_dev);
3604 if (ret) {
3605 dout("probe failed, returning %d\n", ret);
3606
3607 return ret;
3608 }
3609
3610 ret = rbd_dev_probe_finish(rbd_dev);
3611 if (ret)
3612 rbd_header_free(&rbd_dev->header);
3613
3614 return ret;
3615 }
3616
3617 static ssize_t rbd_add(struct bus_type *bus,
3618 const char *buf,
3619 size_t count)
3620 {
3621 struct rbd_device *rbd_dev = NULL;
3622 struct ceph_options *ceph_opts = NULL;
3623 struct rbd_options *rbd_opts = NULL;
3624 struct rbd_spec *spec = NULL;
3625 struct rbd_client *rbdc;
3626 struct ceph_osd_client *osdc;
3627 int rc = -ENOMEM;
3628
3629 if (!try_module_get(THIS_MODULE))
3630 return -ENODEV;
3631
3632 /* parse add command */
3633 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
3634 if (rc < 0)
3635 goto err_out_module;
3636
3637 rbdc = rbd_get_client(ceph_opts);
3638 if (IS_ERR(rbdc)) {
3639 rc = PTR_ERR(rbdc);
3640 goto err_out_args;
3641 }
3642 ceph_opts = NULL; /* rbd_dev client now owns this */
3643
3644 /* pick the pool */
3645 osdc = &rbdc->client->osdc;
3646 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
3647 if (rc < 0)
3648 goto err_out_client;
3649 spec->pool_id = (u64) rc;
3650 rc = -ENOMEM; /* rc still holds the non-negative pool id */
3651 rbd_dev = rbd_dev_create(rbdc, spec);
3652 if (!rbd_dev)
3653 goto err_out_client;
3654 rbdc = NULL; /* rbd_dev now owns this */
3655 spec = NULL; /* rbd_dev now owns this */
3656
3657 rbd_dev->mapping.read_only = rbd_opts->read_only;
3658 kfree(rbd_opts);
3659 rbd_opts = NULL; /* done with this */
3660
3661 rc = rbd_dev_probe(rbd_dev);
3662 if (rc < 0)
3663 goto err_out_rbd_dev;
3664
3665 return count;
3666 err_out_rbd_dev:
3667 rbd_dev_destroy(rbd_dev);
3668 err_out_client:
3669 rbd_put_client(rbdc);
3670 err_out_args:
3671 if (ceph_opts)
3672 ceph_destroy_options(ceph_opts);
3673 kfree(rbd_opts);
3674 rbd_spec_put(spec);
3675 err_out_module:
3676 module_put(THIS_MODULE);
3677
3678 dout("Error adding device %s\n", buf);
3679
3680 return (ssize_t) rc;
3681 }
3682
3683 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
3684 {
3685 struct list_head *tmp;
3686 struct rbd_device *rbd_dev;
3687
3688 spin_lock(&rbd_dev_list_lock);
3689 list_for_each(tmp, &rbd_dev_list) {
3690 rbd_dev = list_entry(tmp, struct rbd_device, node);
3691 if (rbd_dev->dev_id == dev_id) {
3692 spin_unlock(&rbd_dev_list_lock);
3693 return rbd_dev;
3694 }
3695 }
3696 spin_unlock(&rbd_dev_list_lock);
3697 return NULL;
3698 }
3699
3700 static void rbd_dev_release(struct device *dev)
3701 {
3702 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3703
3704 if (rbd_dev->watch_request) {
3705 struct ceph_client *client = rbd_dev->rbd_client->client;
3706
3707 ceph_osdc_unregister_linger_request(&client->osdc,
3708 rbd_dev->watch_request);
3709 }
3710 if (rbd_dev->watch_event)
3711 rbd_req_sync_unwatch(rbd_dev);
3712
3713
3714 /* clean up and free blkdev */
3715 rbd_free_disk(rbd_dev);
3716 unregister_blkdev(rbd_dev->major, rbd_dev->name);
3717
3718 /* release allocated disk header fields */
3719 rbd_header_free(&rbd_dev->header);
3720
3721 /* done with the id, and with the rbd_dev */
3722 rbd_dev_id_put(rbd_dev);
3723 rbd_assert(rbd_dev->rbd_client != NULL);
3724 rbd_dev_destroy(rbd_dev);
3725
3726 /* release module ref */
3727 module_put(THIS_MODULE);
3728 }
3729
3730 static ssize_t rbd_remove(struct bus_type *bus,
3731 const char *buf,
3732 size_t count)
3733 {
3734 struct rbd_device *rbd_dev = NULL;
3735 int target_id, rc;
3736 unsigned long ul;
3737 int ret = count;
3738
3739 rc = strict_strtoul(buf, 10, &ul);
3740 if (rc)
3741 return rc;
3742
3743 /* convert to int; abort if we lost anything in the conversion */
3744 target_id = (int) ul;
3745 if (target_id != ul)
3746 return -EINVAL;
3747
3748 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3749
3750 rbd_dev = __rbd_get_dev(target_id);
3751 if (!rbd_dev) {
3752 ret = -ENOENT;
3753 goto done;
3754 }
3755
3756 if (rbd_dev->open_count) {
3757 ret = -EBUSY;
3758 goto done;
3759 }
3760
3761 rbd_remove_all_snaps(rbd_dev);
3762 rbd_bus_del_dev(rbd_dev);
3763
3764 done:
3765 mutex_unlock(&ctl_mutex);
3766
3767 return ret;
3768 }
3769
3770 /*
3771 * create control files in sysfs
3772 * /sys/bus/rbd/...
3773 */
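/*
 * Usage sketch for the control files (wired up to rbd_add() and
 * rbd_remove() above; paths per the sysfs-bus-rbd ABI document):
 *
 *	# echo "<mon_addrs> <options> <pool> <image> [<snap>]" \
 *		> /sys/bus/rbd/add
 *	# echo <dev-id> > /sys/bus/rbd/remove
 *
 * Per-device attributes then appear under
 * /sys/bus/rbd/devices/<dev-id>/.
 */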
3774 static int rbd_sysfs_init(void)
3775 {
3776 int ret;
3777
3778 ret = device_register(&rbd_root_dev);
3779 if (ret < 0)
3780 return ret;
3781
3782 ret = bus_register(&rbd_bus_type);
3783 if (ret < 0)
3784 device_unregister(&rbd_root_dev);
3785
3786 return ret;
3787 }
3788
3789 static void rbd_sysfs_cleanup(void)
3790 {
3791 bus_unregister(&rbd_bus_type);
3792 device_unregister(&rbd_root_dev);
3793 }
3794
3795 int __init rbd_init(void)
3796 {
3797 int rc;
3798
3799 rc = rbd_sysfs_init();
3800 if (rc)
3801 return rc;
3802 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
3803 return 0;
3804 }
3805
3806 void __exit rbd_exit(void)
3807 {
3808 rbd_sysfs_cleanup();
3809 }
3810
3811 module_init(rbd_init);
3812 module_exit(rbd_exit);
3813
3814 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
3815 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
3816 MODULE_DESCRIPTION("rados block device");
3817
3818 /* following authorship retained from original osdblk.c */
3819 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
3820
3821 MODULE_LICENSE("GPL");