/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define	U64_MAX	((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
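/*
 * The 5/2 factor above over-estimates the log10(256) ~= 2.41 decimal
 * digits encoded per byte, plus one for a sign.  Worked example (for
 * illustration; not part of the original source): with 4-byte ints
 * this yields (5 * 4) / 2 + 1 = 11 characters, exactly enough for
 * "-2147483648".
 */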

#define RBD_READ_ONLY_DEFAULT		false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These four fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

struct rbd_options {
	bool	read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
};

struct rbd_mapping {
	char			*snap_name;
	u64			snap_id;
	u64			size;
	bool			snap_exists;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	struct rbd_options	rbd_opts;
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;
	char			*pool_name;
	int			pool_id;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_dev_snap_devs_update(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
# define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			kref_get(&client_node->kref);
			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  size_t mon_addr_len, char *options)
{
	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
	struct ceph_options *ceph_opts;
	struct rbd_client *rbdc;

	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
				       mon_addr + mon_addr_len,
				       parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts))
		return PTR_ERR(ceph_opts);

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		ceph_destroy_options(ceph_opts);
	} else {
		rbdc = rbd_client_create(ceph_opts);
		if (IS_ERR(rbdc))
			return PTR_ERR(rbdc);
	}
	rbd_dev->rbd_client = rbdc;

	return 0;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to remove the client from the
 * list, so the caller must not already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}
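/*
 * On-disk header layout, as relied on throughout this file: the fixed
 * rbd_image_header_ondisk fields are followed by an array of
 * snap_count rbd_image_snap_ondisk entries (a little-endian snapshot
 * id and image size each), followed by snap_names_len bytes of
 * NUL-terminated snapshot names.
 */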

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX) {
			/* don't leak object_prefix on this error path */
			kfree(header->object_prefix);
			header->object_prefix = NULL;
			return -EIO;
		}
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
		       snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);
	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] =
			le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
			u64 *seq, u64 *size)
{
	int i;
	char *p = header->snap_names;

	rbd_assert(header->snapc != NULL);
	for (i = 0; i < header->snapc->num_snaps; i++) {
		if (!strcmp(snap_name, p)) {

			/* Found it.  Pass back its id and/or size */

			if (seq)
				*seq = header->snapc->snaps[i];
			if (size)
				*size = header->snap_sizes[i];
			return i;
		}
		p += strlen(p) + 1;	/* Skip ahead to the next name */
	}
	return -ENOENT;
}

static int rbd_header_set_snap(struct rbd_device *rbd_dev, char *snap_name)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->mapping.snap_id = CEPH_NOSNAP;
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.snap_exists = false;
		rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
	} else {
		ret = snap_by_name(&rbd_dev->header, snap_name,
				   &rbd_dev->mapping.snap_id,
				   &rbd_dev->mapping.size);
		if (ret < 0)
			goto done;
		rbd_dev->mapping.snap_exists = true;
		rbd_dev->mapping.read_only = true;
	}
	rbd_dev->mapping.snap_name = snap_name;

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
		       rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
		pr_err("error formatting segment name for #%llu (%d)\n",
		       segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
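/*
 * Worked example (for illustration; the names are assumptions, not
 * from the original source): with object_prefix "rb.0.1234" and
 * obj_order 22 (4 MiB objects), byte offset 20 MiB falls in segment
 * 20 MiB >> 22 = 5, which maps to the object "rb.0.1234.000000000005".
 */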

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
			      u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg;
	u64 end_seg;

	if (!len)
		return 0;
	if (len - 1 > U64_MAX - ofs)
		return -ERANGE;

	start_seg = ofs >> header->obj_order;
	end_seg = (ofs + len - 1) >> header->obj_order;

	return end_seg - start_seg + 1;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *old_chain = *old;
	struct bio *new_chain = NULL;
	struct bio *tail;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		struct bio *tmp;

		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;
		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single-page
			 * bio; bio_split() will BUG_ON if this is not
			 * the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d "
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio.  We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		tmp->bi_next = NULL;
		if (new_chain)
			tail->bi_next = tmp;
		else
			new_chain = tmp;
		tail = tmp;
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	rbd_assert(total == len);

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
						 int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
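/*
 * Sub-requests of a collection may complete out of order, but the
 * block layer consumes a request strictly front to back.  The helper
 * below therefore records each sub-request's result in coll->status[]
 * and reports only the longest contiguous run of completed entries
 * back via __blk_end_request().
 */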

static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
	     (unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
		     (unsigned long long)
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
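/*
 * Note on short reads: the objects backing an rbd image are created
 * lazily, so a read may hit a hole and come back -ENOENT, or return
 * fewer bytes than requested.  The callback below papers over both
 * cases by zero-filling the rest of the bio chain and reporting the
 * full requested length to the block layer.
 */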

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
	     (unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	rbd_assert(ops != NULL);

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = rbd_segment_name(rbd_dev, ofs);
	if (!seg_name)
		return -ENOMEM;
	seg_len = rbd_segment_length(rbd_dev, ofs, len);
	seg_ofs = rbd_segment_offset(rbd_dev, ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	rbd_assert(seg_len == len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			     u64 snapid,
			     const char *object_name,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      snapid,
			      CEPH_OSD_FLAG_READ,
			      ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Request sync osd notify-ack
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			     rbd_dev->header_name, 0, 0, NULL,
			     NULL, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
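/*
 * rbd registers a watch on its image header object.  When another
 * client changes the header (a snapshot is created, the image is
 * resized), the OSD sends a notification; the callback below refreshes
 * the in-memory header and then acknowledges the notify so the sender
 * can stop waiting.
 */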

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
	     rbd_dev->header_name, (unsigned long long) notify_id,
	     (unsigned int) opcode);
	rc = rbd_refresh_header(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   "update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}

struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};

static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	if (!rbd_dev)
		return;

	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
	     rbd_dev->header_name, (unsigned long long) notify_id,
	     (unsigned int) opcode);
}

/*
 * Request sync osd notify
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Synchronously invoke an osd class method (the comment here used to
 * say "Request sync osd read", which described a different helper)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
				sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}
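/*
 * Each fetched block request is carved up along object (segment)
 * boundaries: one OSD request is issued per affected segment, and the
 * rbd_req_coll from rbd_alloc_coll() above ties the pieces together so
 * the block request can be completed in order as sub-requests finish.
 */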

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->mapping.read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
				!rbd_dev->mapping.snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot\n");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_segment_length(rbd_dev, ofs, size);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->mapping.snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with single-page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				+ bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
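/*
 * Worked example (illustrative): with the default obj_order of 22
 * (4 MiB objects), chunk_sectors is 1 << (22 - 9) = 8192 sectors, so a
 * non-empty bio already ending on a 4 MiB boundary is offered 0
 * additional bytes and cannot grow across the object boundary.
 */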

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
					rbd_dev->header_name,
					0, size,
					(char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
				   " (want %zd got %d)\n",
				   rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				   rbd_dev->image_name);
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}

/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	u64 ver = 0;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	if (ret >= 0)
		header->obj_version = ver;
	kfree(ondisk);

	return ret;
}

/*
 * create a snapshot
 */
static int rbd_header_add_snap(struct rbd_device *rbd_dev,
			       const char *snap_name,
			       gfp_t gfp_flags)
{
	int name_len = strlen(snap_name);
	u64 new_snapid;
	int ret;
	void *data, *p, *e;
	struct ceph_mon_client *monc;

	/* we should create a snapshot only if we're pointing at the head */
	if (rbd_dev->mapping.snap_id != CEPH_NOSNAP)
		return -EINVAL;

	monc = &rbd_dev->rbd_client->client->monc;
	ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
	dout("created snapid=%llu\n", (unsigned long long) new_snapid);
	if (ret < 0)
		return ret;

	data = kmalloc(name_len + 16, gfp_flags);
	if (!data)
		return -ENOMEM;

	p = data;
	e = data + name_len + 16;

	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
	ceph_encode_64_safe(&p, e, new_snapid, bad);

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "snap_add",
				data, p - data, NULL);

	kfree(data);

	return ret < 0 ? ret : 0;
bad:
	kfree(data);	/* don't leak the encode buffer on overflow */
	return -ERANGE;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}

/*
 * Re-read the on-disk header and update the in-memory one: refresh the
 * mapped size if the image was resized, swap in the new snapshot
 * context and names, and update the snapshot device list.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		if (size != (sector_t) rbd_dev->mapping.size) {
			dout("setting size to %llu sectors",
			     (unsigned long long) size);
			rbd_dev->mapping.size = (u64) size;
			set_capacity(rbd_dev->disk, size);
		}
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = rbd_dev_snap_devs_update(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev, char *snap_name)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = rbd_dev_snap_devs_update(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, snap_name);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	/* finally, announce the disk to the world */
	set_capacity(disk, (sector_t) rbd_dev->mapping.size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long) rbd_dev->mapping.size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}

/*
   sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
		       ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}

static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_refresh_header(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};


/*
   sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};

static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
				 struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}

static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					   int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}
2177
2178 /*
2179 * Scan the rbd device's current snapshot list and compare it to the
2180 * newly-received snapshot context. Remove any existing snapshots
2181 * not present in the new snapshot context. Add a new snapshot for
2182 * any snapshots in the snapshot context not in the current list.
2183 * And verify there are no changes to snapshots we already know
2184 * about.
2185 *
2186 * Assumes the snapshots in the snapshot context are sorted by
2187 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2188 * are also maintained in that order.)
2189 */
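/*
 * For illustration (ids made up): if the device's list holds snapshots
 * (8, 5, 3) and the new snapshot context holds (8, 6, 3), the walk
 * below keeps 8, inserts 6 ahead of 5, then drops 5 (absent from the
 * new context), and keeps 3, leaving the list as (8, 6, 3).
 */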
2190 static int rbd_dev_snap_devs_update(struct rbd_device *rbd_dev)
2191 {
2192 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2193 const u32 snap_count = snapc->num_snaps;
2194 char *snap_name = rbd_dev->header.snap_names;
2195 struct list_head *head = &rbd_dev->snaps;
2196 struct list_head *links = head->next;
2197 u32 index = 0;
2198
2199 dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
2200 while (index < snap_count || links != head) {
2201 u64 snap_id;
2202 struct rbd_snap *snap;
2203
2204 snap_id = index < snap_count ? snapc->snaps[index]
2205 : CEPH_NOSNAP;
2206 snap = links != head ? list_entry(links, struct rbd_snap, node)
2207 : NULL;
2208 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2209
2210 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2211 struct list_head *next = links->next;
2212
2213 /* Existing snapshot not in the new snap context */
2214
2215 if (rbd_dev->mapping.snap_id == snap->id)
2216 rbd_dev->mapping.snap_exists = false;
2217 __rbd_remove_snap_dev(snap);
2218 dout("%ssnap id %llu has been removed\n",
2219 rbd_dev->mapping.snap_id == snap->id ?
2220 "mapped " : "",
2221 (unsigned long long) snap->id);
2222
2223 /* Done with this list entry; advance */
2224
2225 links = next;
2226 continue;
2227 }
2228
2229 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
2230 (unsigned long long) snap_id);
2231 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2232 struct rbd_snap *new_snap;
2233
2234 /* We haven't seen this snapshot before */
2235
2236 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2237 snap_name);
2238 if (IS_ERR(new_snap)) {
2239 int err = PTR_ERR(new_snap);
2240
2241 dout(" failed to add dev, error %d\n", err);
2242
2243 return err;
2244 }
2245
2246 /* New goes before existing, or at end of list */
2247
2248 dout(" added dev%s\n", snap ? "" : " at end\n");
2249 if (snap)
2250 list_add_tail(&new_snap->node, &snap->node);
2251 else
2252 list_add_tail(&new_snap->node, head);
2253 } else {
2254 /* Already have this one */
2255
2256 dout(" already present\n");
2257
2258 rbd_assert(snap->size ==
2259 rbd_dev->header.snap_sizes[index]);
2260 rbd_assert(!strcmp(snap->name, snap_name));
2261
2262 /* Done with this list entry; advance */
2263
2264 links = links->next;
2265 }
2266
2267 /* Advance to the next entry in the snapshot context */
2268
2269 index++;
2270 snap_name += strlen(snap_name) + 1;
2271 }
2272 dout("%s: done\n", __func__);
2273
2274 return 0;
2275 }
2276
2277 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2278 {
2279 int ret;
2280 struct device *dev;
2281 struct rbd_snap *snap;
2282
2283 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2284 dev = &rbd_dev->dev;
2285
2286 dev->bus = &rbd_bus_type;
2287 dev->type = &rbd_device_type;
2288 dev->parent = &rbd_root_dev;
2289 dev->release = rbd_dev_release;
2290 dev_set_name(dev, "%d", rbd_dev->dev_id);
2291 ret = device_register(dev);
2292 if (ret < 0)
2293 goto out;
2294
2295 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2296 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2297 if (ret < 0)
2298 break;
2299 }
2300 out:
2301 mutex_unlock(&ctl_mutex);
2302 return ret;
2303 }
2304
2305 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2306 {
2307 device_unregister(&rbd_dev->dev);
2308 }
2309
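/*
 * Register a watch request on the header object, refreshing our cached
 * header and retrying whenever the registration fails with -ERANGE.
 */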
2310 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2311 {
2312 int ret, rc;
2313
2314 do {
2315 ret = rbd_req_sync_watch(rbd_dev);
2316 if (ret == -ERANGE) {
2317 rc = rbd_refresh_header(rbd_dev, NULL);
2318 if (rc < 0)
2319 return rc;
2320 }
2321 } while (ret == -ERANGE);
2322
2323 return ret;
2324 }
2325
2326 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2327
2328 /*
2329 * Get a unique rbd identifier for the given new rbd_dev, and add
2330 * the rbd_dev to the global list. The minimum rbd id is 1.
2331 */
2332 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2333 {
2334 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2335
2336 spin_lock(&rbd_dev_list_lock);
2337 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2338 spin_unlock(&rbd_dev_list_lock);
2339 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2340 (unsigned long long) rbd_dev->dev_id);
2341 }
2342
2343 /*
2344 * Remove an rbd_dev from the global list, and record that its
2345 * identifier is no longer in use.
2346 */
2347 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2348 {
2349 struct list_head *tmp;
2350 int rbd_id = rbd_dev->dev_id;
2351 int max_id;
2352
2353 rbd_assert(rbd_id > 0);
2354
2355 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2356 (unsigned long long) rbd_dev->dev_id);
2357 spin_lock(&rbd_dev_list_lock);
2358 list_del_init(&rbd_dev->node);
2359
2360 /*
2361 * If the id being "put" is not the current maximum, there
2362 * is nothing special we need to do.
2363 */
2364 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2365 spin_unlock(&rbd_dev_list_lock);
2366 return;
2367 }
2368
2369 /*
2370 * We need to update the current maximum id. Search the
2371 * list to find out what it is. We're more likely to find
2372 * the maximum at the end, so search the list backward.
2373 */
2374 max_id = 0;
2375 list_for_each_prev(tmp, &rbd_dev_list) {
2376 struct rbd_device *rbd_dev;
2377
2378 rbd_dev = list_entry(tmp, struct rbd_device, node);
2379 if (rbd_dev->dev_id > max_id)
2380 max_id = rbd_dev->dev_id;
2381 }
2382 spin_unlock(&rbd_dev_list_lock);
2383
2384 /*
2385 * The max id could have been updated by rbd_dev_id_get(), in
2386 * which case it now accurately reflects the new maximum.
2387 * Be careful not to overwrite the maximum value in that
2388 * case.
2389 */
2390 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2391 dout(" max dev id has been reset\n");
2392 }
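
/*
 * Illustration: with devices 1, 2 and 3 mapped, putting id 2 leaves the
 * maximum at 3 (the early return above).  Putting id 3 instead rescans
 * the list, resets the maximum to 2, and a later rbd_dev_id_get() can
 * then hand out 3 again.
 */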
2393
2394 /*
2395 * Skips over white space at *buf, and updates *buf to point to the
2396 * first found non-space character (if any). Returns the length of
2397 * the token (string of non-white space characters) found. Note
2398 * that *buf must be terminated with '\0'.
2399 */
2400 static inline size_t next_token(const char **buf)
2401 {
2402 /*
2403 * These are the characters that produce nonzero for
2404 * isspace() in the "C" and "POSIX" locales.
2405 */
2406 const char *spaces = " \f\n\r\t\v";
2407
2408 *buf += strspn(*buf, spaces); /* Find start of token */
2409
2410 return strcspn(*buf, spaces); /* Return token length */
2411 }
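
/*
 * Example (illustrative): with *buf pointing at "  pool image\n",
 * next_token() advances *buf to "pool image\n" and returns 4, the
 * length of "pool".
 */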
2412
2413 /*
2414 * Finds the next token in *buf, and if the provided token buffer is
2415 * big enough, copies the found token into it. The result, if
2416 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2417 * must be terminated with '\0' on entry.
2418 *
2419 * Returns the length of the token found (not including the '\0').
2420 * Return value will be 0 if no token is found, and it will be >=
2421 * token_size if the token would not fit.
2422 *
2423 * The *buf pointer will be updated to point beyond the end of the
2424 * found token. Note that this occurs even if the token buffer is
2425 * too small to hold it.
2426 */
2427 static inline size_t copy_token(const char **buf,
2428 char *token,
2429 size_t token_size)
2430 {
2431 size_t len;
2432
2433 len = next_token(buf);
2434 if (len < token_size) {
2435 memcpy(token, *buf, len);
2436 *(token + len) = '\0';
2437 }
2438 *buf += len;
2439
2440 return len;
2441 }
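
/*
 * Example (illustrative): with *buf pointing at "rbd foo" and an
 * 8-byte token buffer, copy_token() stores "rbd" (plus the '\0'),
 * advances *buf past "rbd", and returns 3.  With a 3-byte buffer it
 * still returns 3, but copies nothing, since 3 >= token_size.
 */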
2442
2443 /*
2444 * Finds the next token in *buf, dynamically allocates a buffer big
2445 * enough to hold a copy of it, and copies the token into the new
2446 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2447 * that a duplicate buffer is created even for a zero-length token.
2448 *
2449 * Returns a pointer to the newly-allocated duplicate, or a null
2450 * pointer if memory for the duplicate was not available. If
2451 * the lenp argument is a non-null pointer, the length of the token
2452 * (not including the '\0') is returned in *lenp.
2453 *
2454 * If successful, the *buf pointer will be updated to point beyond
2455 * the end of the found token.
2456 *
2457 * Note: uses GFP_KERNEL for allocation.
2458 */
2459 static inline char *dup_token(const char **buf, size_t *lenp)
2460 {
2461 char *dup;
2462 size_t len;
2463
2464 len = next_token(buf);
2465 dup = kmalloc(len + 1, GFP_KERNEL);
2466 if (!dup)
2467 return NULL;
2468
2469 memcpy(dup, *buf, len);
2470 *(dup + len) = '\0';
2471 *buf += len;
2472
2473 if (lenp)
2474 *lenp = len;
2475
2476 return dup;
2477 }
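
/*
 * Example (illustrative): with *buf pointing at "mypool rest",
 * dup_token() returns a kmalloc()'d copy "mypool", stores 6 in *lenp
 * if lenp is non-null, and leaves *buf pointing at " rest".
 */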
2478
2479 /*
2480 * This fills in the pool_name, image_name, image_name_len, and
2481 * header_name fields of the given rbd_dev, based on the
2482 * list of monitor addresses and other options provided via
2483 * /sys/bus/rbd/add. Returns a pointer to a dynamically-allocated
2484 * copy of the snapshot name to map if successful, or a
2485 * pointer-coded error otherwise.
2486 *
2487 * Note: rbd_dev is assumed to have been initially zero-filled.
2488 */
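/*
 * For example (values illustrative only), a line written to
 * /sys/bus/rbd/add such as:
 *
 *	1.2.3.4:6789 name=admin rbd myimage mysnap
 *
 * is parsed as <mon_addrs> <options> <pool> <image> [<snapshot>]; the
 * trailing snapshot token is optional and defaults to mapping the
 * image head.
 */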
2489 static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
2490 const char *buf,
2491 const char **mon_addrs,
2492 size_t *mon_addrs_size,
2493 char *options,
2494 size_t options_size)
2495 {
2496 size_t len;
2497 char *err_ptr = ERR_PTR(-EINVAL);
2498 char *snap_name;
2499
2500 /* The first four tokens are required */
2501
2502 len = next_token(&buf);
2503 if (!len)
2504 return err_ptr;
2505 *mon_addrs_size = len + 1;
2506 *mon_addrs = buf;
2507
2508 buf += len;
2509
2510 len = copy_token(&buf, options, options_size);
2511 if (!len || len >= options_size)
2512 return err_ptr;
2513
2514 err_ptr = ERR_PTR(-ENOMEM);
2515 rbd_dev->pool_name = dup_token(&buf, NULL);
2516 if (!rbd_dev->pool_name)
2517 goto out_err;
2518
2519 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2520 if (!rbd_dev->image_name)
2521 goto out_err;
2522
2523 /* Create the name of the header object */
2524
2525 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2526 + sizeof (RBD_SUFFIX),
2527 GFP_KERNEL);
2528 if (!rbd_dev->header_name)
2529 goto out_err;
2530 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2531
2532 /* Snapshot name is optional */
2533 len = next_token(&buf);
2534 if (!len) {
2535 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
2536 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
2537 }
2538 snap_name = kmalloc(len + 1, GFP_KERNEL);
2539 if (!snap_name)
2540 goto out_err;
2541 memcpy(snap_name, buf, len);
2542 *(snap_name + len) = '\0';
2543
2544 dout(" SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
2545
2546 return snap_name;
2547
2548 out_err:
2549 kfree(rbd_dev->header_name);
2550 rbd_dev->header_name = NULL;
2551 kfree(rbd_dev->image_name);
2552 rbd_dev->image_name = NULL;
2553 rbd_dev->image_name_len = 0;
2554 kfree(rbd_dev->pool_name);
2555 rbd_dev->pool_name = NULL;
2556
2557 return err_ptr;
2558 }
2559
2560 static ssize_t rbd_add(struct bus_type *bus,
2561 const char *buf,
2562 size_t count)
2563 {
2564 char *options;
2565 struct rbd_device *rbd_dev = NULL;
2566 const char *mon_addrs = NULL;
2567 size_t mon_addrs_size = 0;
2568 struct ceph_osd_client *osdc;
2569 int rc = -ENOMEM;
2570 char *snap_name;
2571
2572 if (!try_module_get(THIS_MODULE))
2573 return -ENODEV;
2574
2575 options = kmalloc(count, GFP_KERNEL);
2576 if (!options)
2577 goto err_nomem;
2578 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2579 if (!rbd_dev)
2580 goto err_nomem;
2581
2582 /* static rbd_device initialization */
2583 spin_lock_init(&rbd_dev->lock);
2584 INIT_LIST_HEAD(&rbd_dev->node);
2585 INIT_LIST_HEAD(&rbd_dev->snaps);
2586 init_rwsem(&rbd_dev->header_rwsem);
2587
2588 /* generate unique id: find highest unique id, add one */
2589 rbd_dev_id_get(rbd_dev);
2590
2591 /* Fill in the device name, now that we have its id. */
2592 BUILD_BUG_ON(DEV_NAME_LEN
2593 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2594 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2595
2596 /* parse add command */
2597 snap_name = rbd_add_parse_args(rbd_dev, buf,
2598 &mon_addrs, &mon_addrs_size, options, count);
2599 if (IS_ERR(snap_name)) {
2600 rc = PTR_ERR(snap_name);
2601 goto err_put_id;
2602 }
2603
2604 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2605 if (rc < 0)
2606 goto err_put_id;
2607
2608 /* pick the pool */
2609 osdc = &rbd_dev->rbd_client->client->osdc;
2610 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2611 if (rc < 0)
2612 goto err_out_client;
2613 rbd_dev->pool_id = rc;
2614
2615 /* register our block device */
2616 rc = register_blkdev(0, rbd_dev->name);
2617 if (rc < 0)
2618 goto err_out_client;
2619 rbd_dev->major = rc;
2620
2621 rc = rbd_bus_add_dev(rbd_dev);
2622 if (rc)
2623 goto err_out_blkdev;
2624
2625 /*
2626 * At this point cleanup in the event of an error is the job
2627 * of the sysfs code (initiated by rbd_bus_del_dev()).
2628 *
2629 * Set up and announce blkdev mapping.
2630 */
2631 rc = rbd_init_disk(rbd_dev);
2632 if (rc)
2633 goto err_out_bus;
2634
2635 rc = rbd_init_watch_dev(rbd_dev);
2636 if (rc)
2637 goto err_out_bus;
2638
kfree(options);	/* fully parsed by now; freeing avoids a leak on success */
2639 return count;
2640
2641 err_out_bus:
2642 /* this will also clean up rest of rbd_dev stuff */
2643
2644 rbd_bus_del_dev(rbd_dev);
2645 kfree(options);
2646 return rc;
2647
2648 err_out_blkdev:
2649 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2650 err_out_client:
2651 rbd_put_client(rbd_dev);
2652 err_put_id:
2653 if (rbd_dev->pool_name) {
2654 kfree(rbd_dev->mapping.snap_name);
2655 kfree(rbd_dev->header_name);
2656 kfree(rbd_dev->image_name);
2657 kfree(rbd_dev->pool_name);
2658 }
2659 rbd_dev_id_put(rbd_dev);
2660 err_nomem:
2661 kfree(rbd_dev);
2662 kfree(options);
2663
2664 dout("Error adding device %s\n", buf);
2665 module_put(THIS_MODULE);
2666
2667 return (ssize_t) rc;
2668 }
2669
2670 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2671 {
2672 struct list_head *tmp;
2673 struct rbd_device *rbd_dev;
2674
2675 spin_lock(&rbd_dev_list_lock);
2676 list_for_each(tmp, &rbd_dev_list) {
2677 rbd_dev = list_entry(tmp, struct rbd_device, node);
2678 if (rbd_dev->dev_id == dev_id) {
2679 spin_unlock(&rbd_dev_list_lock);
2680 return rbd_dev;
2681 }
2682 }
2683 spin_unlock(&rbd_dev_list_lock);
2684 return NULL;
2685 }
2686
2687 static void rbd_dev_release(struct device *dev)
2688 {
2689 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2690
2691 if (rbd_dev->watch_request) {
2692 struct ceph_client *client = rbd_dev->rbd_client->client;
2693
2694 ceph_osdc_unregister_linger_request(&client->osdc,
2695 rbd_dev->watch_request);
2696 }
2697 if (rbd_dev->watch_event)
2698 rbd_req_sync_unwatch(rbd_dev);
2699
2700 rbd_put_client(rbd_dev);
2701
2702 /* clean up and free blkdev */
2703 rbd_free_disk(rbd_dev);
2704 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2705
2706 /* done with the id, and with the rbd_dev */
2707 kfree(rbd_dev->mapping.snap_name);
2708 kfree(rbd_dev->header_name);
2709 kfree(rbd_dev->pool_name);
2710 kfree(rbd_dev->image_name);
2711 rbd_dev_id_put(rbd_dev);
2712 kfree(rbd_dev);
2713
2714 /* release module ref */
2715 module_put(THIS_MODULE);
2716 }
2717
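/*
 * Removal is requested by writing a device id to /sys/bus/rbd/remove,
 * e.g. (illustrative) "echo 0 > /sys/bus/rbd/remove" to unmap rbd0.
 */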
2718 static ssize_t rbd_remove(struct bus_type *bus,
2719 const char *buf,
2720 size_t count)
2721 {
2722 struct rbd_device *rbd_dev = NULL;
2723 int target_id, rc;
2724 unsigned long ul;
2725 int ret = count;
2726
2727 rc = strict_strtoul(buf, 10, &ul);
2728 if (rc)
2729 return rc;
2730
2731 /* convert to int; abort if we lost anything in the conversion */
2732 target_id = (int) ul;
2733 if (target_id != ul)
2734 return -EINVAL;
2735
2736 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2737
2738 rbd_dev = __rbd_get_dev(target_id);
2739 if (!rbd_dev) {
2740 ret = -ENOENT;
2741 goto done;
2742 }
2743
2744 __rbd_remove_all_snaps(rbd_dev);
2745 rbd_bus_del_dev(rbd_dev);
2746
2747 done:
2748 mutex_unlock(&ctl_mutex);
2749
2750 return ret;
2751 }
2752
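/*
 * Snapshot creation is requested through the mapped device's
 * create_snap attribute, e.g. (illustrative)
 * "echo mysnap > /sys/bus/rbd/devices/0/create_snap".
 */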
2753 static ssize_t rbd_snap_add(struct device *dev,
2754 struct device_attribute *attr,
2755 const char *buf,
2756 size_t count)
2757 {
2758 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2759 int ret;
2760 char *name = kmalloc(count + 1, GFP_KERNEL);
2761 if (!name)
2762 return -ENOMEM;
2763
2764 snprintf(name, count, "%s", buf);	/* at most count - 1 bytes; drops echo's trailing newline */
2765
2766 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2767
2768 ret = rbd_header_add_snap(rbd_dev,
2769 name, GFP_KERNEL);
2770 if (ret < 0)
2771 goto err_unlock;
2772
2773 ret = __rbd_refresh_header(rbd_dev, NULL);
2774 if (ret < 0)
2775 goto err_unlock;
2776
2777 /* Shouldn't hold ctl_mutex when notifying: the notify might trigger
2778 a watch callback that would need to acquire that mutex. */
2779 mutex_unlock(&ctl_mutex);
2780
2781 /* make a best effort; don't report an error if this fails */
2782 rbd_req_sync_notify(rbd_dev);
2783
2784 ret = count;
2785 kfree(name);
2786 return ret;
2787
2788 err_unlock:
2789 mutex_unlock(&ctl_mutex);
2790 kfree(name);
2791 return ret;
2792 }
2793
2794 /*
2795 * create control files in sysfs
2796 * /sys/bus/rbd/...
2797 */
2798 static int rbd_sysfs_init(void)
2799 {
2800 int ret;
2801
2802 ret = device_register(&rbd_root_dev);
2803 if (ret < 0)
2804 return ret;
2805
2806 ret = bus_register(&rbd_bus_type);
2807 if (ret < 0)
2808 device_unregister(&rbd_root_dev);
2809
2810 return ret;
2811 }
2812
2813 static void rbd_sysfs_cleanup(void)
2814 {
2815 bus_unregister(&rbd_bus_type);
2816 device_unregister(&rbd_root_dev);
2817 }
2818
2819 int __init rbd_init(void)
2820 {
2821 int rc;
2822
2823 rc = rbd_sysfs_init();
2824 if (rc)
2825 return rc;
2826 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2827 return 0;
2828 }
2829
2830 void __exit rbd_exit(void)
2831 {
2832 rbd_sysfs_cleanup();
2833 }
2834
2835 module_init(rbd_init);
2836 module_exit(rbd_exit);
2837
2838 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2839 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2840 MODULE_DESCRIPTION("rados block device");
2841
2842 /* following authorship retained from original osdblk.c */
2843 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2844
2845 MODULE_LICENSE("GPL");