/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
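/*
 * Rationale for the bound above: each byte of an int contributes
 * log10(256) < 2.5 decimal digits, so (5 * sizeof (int)) / 2
 * over-approximates the digit count; the +1 leaves room for a sign.
 */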

#define RBD_NOTIFY_TIMEOUT_DEFAULT 10

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	struct ceph_snap_context *snapc;
	u64 snap_names_len;
	u32 total_snaps;

	char *snap_names;
	u64 *snap_sizes;

	u64 obj_version;
};

struct rbd_options {
	int	notify_timeout;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct rbd_options	*rbd_opts;
	struct kref		kref;
	struct list_head	node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
	int done;
	int rc;
	u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;
	int			num_done;
	struct kref		kref;
	struct rbd_req_status	status[0];
};
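
/*
 * status[] is a flexible array member: rbd_alloc_coll() below allocates
 * the struct together with one rbd_req_status slot per request in a
 * single kzalloc() call.
 */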

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;
	struct rbd_req_coll	*coll;
};

struct rbd_snap {
	struct device		dev;
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			*image_name;
	size_t			image_name_len;
	char			*header_name;
	char			*pool_name;
	int			pool_id;

	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request	*watch_request;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;
	/* name of the snapshot this device reads from */
	char			*snap_name;
	/* id of the snapshot this device reads from */
	u64			snap_id;	/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool			snap_exists;
	int			read_only;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};


static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

	if ((mode & FMODE_WRITE) && rbd_dev->read_only)
		return -EROFS;

	rbd_get_dev(rbd_dev);
	set_device_ro(bdev, rbd_dev->read_only);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner		= THIS_MODULE,
	.open		= rbd_open,
	.release	= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
					    struct rbd_options *rbd_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	rbdc->rbd_opts = rbd_opts;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (!ceph_compare_options(ceph_opts, client_node->client))
			return client_node;
	return NULL;
}

/*
 * mount options
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
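
/*
 * For example, an options string such as "notify_timeout=30" would be
 * matched by the table above and parsed by parse_rbd_opts_token()
 * below.
 */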

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		BUG_ON(token);
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock to unlink the client from the list, so
 * the caller must not already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	return !memcmp(&ondisk->text,
			RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
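/*
 * On-disk layout as consumed below: a fixed-size header, then
 * snap_count struct rbd_image_snap_ondisk entries, then the
 * '\0'-separated snapshot names (snap_names_len bytes in total).
 */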
static int rbd_header_from_disk(struct rbd_image_header *header,
				struct rbd_image_header_ondisk *ondisk,
				u32 allocated_snaps)
{
	u32 snap_count;
	size_t size;

	if (!rbd_dev_ondisk_valid(ondisk))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);

	/* Make sure we don't overflow below */
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (header->snapc->snaps[0]))
		return -EINVAL;

	memset(header, 0, sizeof (*header));

	size = sizeof (ondisk->block_name) + 1;
	header->object_prefix = kmalloc(size, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->block_name, size - 1);
	header->object_prefix[size - 1] = '\0';

	if (snap_count) {
		header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
		BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
		header->snap_names = kmalloc(header->snap_names_len,
					     GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
	} else {
		WARN_ON(ondisk->snap_names_len);
		header->snap_names_len = 0;
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;
	header->total_snaps = snap_count;

	/*
	 * If the number of snapshot ids provided by the caller
	 * doesn't match the number in the entire context there's
	 * no point in going further.  Caller will try again after
	 * getting an updated snapshot context from the server.
	 */
	if (allocated_snaps != snap_count)
		return 0;

	size = sizeof (struct ceph_snap_context);
	size += snap_count * sizeof (header->snapc->snaps[0]);
	header->snapc = kzalloc(size, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;

	/* Fill in the snapshot information */

	if (snap_count) {
		u32 i;

		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			header->snap_names_len);
	}

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	header->snap_names_len = 0;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
			u64 *seq, u64 *size)
{
	int i;
	char *p = header->snap_names;

	for (i = 0; i < header->total_snaps; i++) {
		if (!strcmp(snap_name, p)) {

			/* Found it.  Pass back its id and/or size */

			if (seq)
				*seq = header->snapc->snaps[i];
			if (size)
				*size = header->snap_sizes[i];
			return i;
		}
		p += strlen(p) + 1;	/* Skip ahead to the next name */
	}
	return -ENOENT;
}

static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
					&snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	header->object_prefix = NULL;
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	header->snap_names_len = 0;
	ceph_put_snap_context(header->snapc);
	header->snapc = NULL;
}

/*
 * get the actual striped segment name, offset and length
 */
static u64 rbd_get_segment(struct rbd_image_header *header,
			   const char *object_prefix,
			   u64 ofs, u64 len,
			   char *seg_name, u64 *segofs)
{
	u64 seg = ofs >> header->obj_order;

	if (seg_name)
		snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
			 "%s.%012llx", object_prefix, seg);

	ofs = ofs & ((1 << header->obj_order) - 1);
	len = min_t(u64, len, (1 << header->obj_order) - ofs);

	if (segofs)
		*segofs = ofs;

	return len;
}
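
/*
 * Worked example (a sketch, assuming the common object order of 22,
 * i.e. 4 MiB objects): for ofs = 5 MiB, seg = 1, so the object name is
 * "<object_prefix>.000000000001", the in-object offset is 1 MiB, and
 * the length is clipped to at most the 3 MiB remaining in that object.
 */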

static int rbd_get_num_segments(struct rbd_image_header *header,
				u64 ofs, u64 len)
{
	u64 start_seg = ofs >> header->obj_order;
	u64 end_seg = (ofs + len - 1) >> header->obj_order;
	return end_seg - start_seg + 1;
}
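
/*
 * E.g. (again assuming 4 MiB objects): a 6 MiB request at offset 3 MiB
 * touches bytes 3 MiB .. 9 MiB - 1, i.e. segments 0 through 2, so
 * rbd_get_num_segments() returns 3.
 */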

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d "
			     "bi_size=%u\n",
			     total, len - total, old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
					int opcode, u32 payload_len)
{
	struct ceph_osd_req_op *ops;

	ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
	if (!ops)
		return NULL;

	ops[0].op = opcode;

	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	ops[0].payload_len = payload_len;

	return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}

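/*
 * Complete collected requests back to the block layer in index order:
 * mark this index done, then end every contiguous run of finished
 * requests starting at num_done.
 */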
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
	     coll, index, ret, (unsigned long long) len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
	     (unsigned long long) ofs, (unsigned long long) len);

	osdc = &rbd_dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, object_name, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%llu\n",
		     (unsigned long long)
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
	     (unsigned long long) bytes, read_op, (int) rc);

	if (rc == -ENOENT && read_op) {
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int flags,
			   struct ceph_osd_req_op *ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;

	BUG_ON(ops == NULL);

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			     u64 snapid,
			     const char *object_name,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      snapid,
			      CEPH_OSD_FLAG_READ,
			      ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}

/*
 * Request sync osd notify ack
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			     rbd_dev->header_name, 0, 0, NULL,
			     NULL, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
	     rbd_dev->header_name, (unsigned long long) notify_id,
	     (unsigned int) opcode);
	rc = rbd_refresh_header(rbd_dev, &hver);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to"
			   " update snaps: %d\n", rbd_dev->major, rc);

	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}

struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};

static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	if (!rbd_dev)
		return;

	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
	     rbd_dev->header_name, (unsigned long long) notify_id,
	     (unsigned int) opcode);
}

/*
 * Request sync osd notify
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}

/*
 * Request sync osd class method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}

static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
				sizeof(struct rbd_req_status) * num_reqs,
				GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with single-page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
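
/*
 * E.g. (a sketch, assuming 4 MiB objects, i.e. obj_order 22):
 * chunk_sectors is 8192; for a bio starting at sector 8191 within an
 * object with no data queued yet (bio_sectors == 0), max is
 * (8192 - 8191) << 9 = 512 bytes, so only one more sector fits before
 * the object boundary.
 */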

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}

/*
 * reload the on-disk header
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		if (snap_count == header->total_snaps)
			break;

		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}

/*
 * create a snapshot
 */
static int rbd_header_add_snap(struct rbd_device *rbd_dev,
			       const char *snap_name,
			       gfp_t gfp_flags)
{
	int name_len = strlen(snap_name);
	u64 new_snapid;
	int ret;
	void *data, *p, *e;
	struct ceph_mon_client *monc;

	/* we should create a snapshot only if we're pointing at the head */
	if (rbd_dev->snap_id != CEPH_NOSNAP)
		return -EINVAL;

	monc = &rbd_dev->rbd_client->client->monc;
	ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
	dout("created snapid=%llu\n", (unsigned long long) new_snapid);
	if (ret < 0)
		return ret;

	data = kmalloc(name_len + 16, gfp_flags);
	if (!data)
		return -ENOMEM;

	p = data;
	e = data + name_len + 16;

	ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
	ceph_encode_64_safe(&p, e, new_snapid, bad);

	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
				"rbd", "snap_add",
				data, p - data, NULL);

	kfree(data);

	return ret < 0 ? ret : 0;
bad:
	kfree(data);
	return -ERANGE;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;
	struct rbd_snap *next;

	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
		__rbd_remove_snap_dev(snap);
}
/*
 * Re-read the header and update the in-memory copy to match; this also
 * updates the device's capacity and its list of snapshots.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}

/*
   sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
		       ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
				struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}

static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_refresh_header(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};


/*
   sysfs - snapshots
*/

static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};

static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
				 struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}

static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					   int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}

/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	char *snap_name = rbd_dev->header.snap_names;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		BUG_ON(snap && snap->id == CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/* Existing snapshot not in the new snap context */

			if (rbd_dev->snap_id == snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(snap);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, index,
						      snap_name);
			if (IS_ERR(new_snap))
				return PTR_ERR(new_snap);

			/* New goes before existing, or at end of list */

			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add(&new_snap->node, head);
		} else {
			/* Already have this one */

			BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
			BUG_ON(strcmp(snap->name, snap_name));

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
		snap_name += strlen(snap_name) + 1;
	}

	return 0;
}
2170
2171 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2172 {
2173 int ret;
2174 struct device *dev;
2175 struct rbd_snap *snap;
2176
2177 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2178 dev = &rbd_dev->dev;
2179
2180 dev->bus = &rbd_bus_type;
2181 dev->type = &rbd_device_type;
2182 dev->parent = &rbd_root_dev;
2183 dev->release = rbd_dev_release;
2184 dev_set_name(dev, "%d", rbd_dev->dev_id);
2185 ret = device_register(dev);
2186 if (ret < 0)
2187 goto out;
2188
2189 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2190 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2191 if (ret < 0)
2192 break;
2193 }
2194 out:
2195 mutex_unlock(&ctl_mutex);
2196 return ret;
2197 }
2198
2199 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2200 {
2201 device_unregister(&rbd_dev->dev);
2202 }
2203
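/*
 * Register a watch on the header object.  -ERANGE from
 * rbd_req_sync_watch() is treated as "our cached header is stale":
 * refresh the copy and retry; any other result is returned as-is.
 */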
2204 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2205 {
2206 int ret, rc;
2207
2208 do {
2209 ret = rbd_req_sync_watch(rbd_dev);
2210 if (ret == -ERANGE) {
2211 rc = rbd_refresh_header(rbd_dev, NULL);
2212 if (rc < 0)
2213 return rc;
2214 }
2215 } while (ret == -ERANGE);
2216
2217 return ret;
2218 }
2219
2220 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2221
2222 /*
2223 * Get a unique rbd identifier for the given new rbd_dev, and add
2224 * the rbd_dev to the global list. The minimum rbd id is 1.
2225 */
2226 static void rbd_id_get(struct rbd_device *rbd_dev)
2227 {
2228 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2229
2230 spin_lock(&rbd_dev_list_lock);
2231 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2232 spin_unlock(&rbd_dev_list_lock);
2233 }
2234
2235 /*
2236 * Remove an rbd_dev from the global list, and record that its
2237 * identifier is no longer in use.
2238 */
2239 static void rbd_id_put(struct rbd_device *rbd_dev)
2240 {
2241 struct list_head *tmp;
2242 int rbd_id = rbd_dev->dev_id;
2243 int max_id;
2244
2245 BUG_ON(rbd_id < 1);
2246
2247 spin_lock(&rbd_dev_list_lock);
2248 list_del_init(&rbd_dev->node);
2249
2250 /*
2251 * If the id being "put" is not the current maximum, there
2252 * is nothing special we need to do.
2253 */
2254 if (rbd_id != atomic64_read(&rbd_id_max)) {
2255 spin_unlock(&rbd_dev_list_lock);
2256 return;
2257 }
2258
2259 /*
2260 * We need to update the current maximum id. Search the
2261 * list to find out what it is. We're more likely to find
2262 * the maximum at the end, so search the list backward.
2263 */
2264 max_id = 0;
2265 list_for_each_prev(tmp, &rbd_dev_list) {
2266 struct rbd_device *rbd_dev;
2267
2268 rbd_dev = list_entry(tmp, struct rbd_device, node);
2269 if (rbd_dev->dev_id > max_id)
2270 max_id = rbd_dev->dev_id;
2271 }
2272 spin_unlock(&rbd_dev_list_lock);
2273
2274 /*
2275 * The max id could have been updated by rbd_id_get(), in
2276 * which case it now accurately reflects the new maximum.
2277 * Be careful not to overwrite the maximum value in that
2278 * case.
2279 */
2280 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2281 }
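/*
 * Userspace illustration of the atomic64_cmpxchg() idiom above, using
 * C11 <stdatomic.h> (not kernel API): install the recomputed maximum
 * only if rbd_id_max still holds the id being released.
 */
#include <stdatomic.h>
#include <stdio.h>

static _Atomic long long id_max = 5;	/* stands in for rbd_id_max */

static void id_put(long long id, long long recomputed_max)
{
	long long expected = id;

	/* Compare-and-swap: succeeds only if id_max == id right now */
	if (atomic_compare_exchange_strong(&id_max, &expected,
					   recomputed_max))
		printf("released max %lld; max now %lld\n",
		       id, recomputed_max);
	else
		printf("id_max is %lld, not %lld; left alone\n",
		       expected, id);
}

int main(void)
{
	id_put(5, 3);	/* we held the max: lower it to 3 */
	id_put(9, 1);	/* 9 is no longer the max: no change */
	return 0;
}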
2282
2283 /*
2284 * Skips over white space at *buf, and updates *buf to point to the
2285 * first found non-space character (if any). Returns the length of
2286 * the token (string of non-white space characters) found. Note
2287 * that *buf must be terminated with '\0'.
2288 */
2289 static inline size_t next_token(const char **buf)
2290 {
2291 /*
2292 * These are the characters that produce nonzero for
2293 * isspace() in the "C" and "POSIX" locales.
2294 */
2295 const char *spaces = " \f\n\r\t\v";
2296
2297 *buf += strspn(*buf, spaces); /* Find start of token */
2298
2299 return strcspn(*buf, spaces); /* Return token length */
2300 }
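/*
 * next_token() relies only on strspn()/strcspn(), so its behavior is
 * easy to demonstrate in a standalone userspace program (the helper
 * is repeated below so the example compiles on its own):
 */
#include <stdio.h>
#include <string.h>

static size_t next_token(const char **buf)
{
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);	/* Find start of token */
	return strcspn(*buf, spaces);	/* Return token length */
}

int main(void)
{
	const char *p = "  foo\tbar\n";
	size_t len;

	while ((len = next_token(&p)) != 0) {
		printf("token: \"%.*s\"\n", (int)len, p);
		p += len;	/* the caller advances past the token */
	}
	return 0;	/* prints "foo" then "bar" */
}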
2301
2302 /*
2303 * Finds the next token in *buf, and if the provided token buffer is
2304 * big enough, copies the found token into it. The result, if
2305 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2306 * must be terminated with '\0' on entry.
2307 *
2308 * Returns the length of the token found (not including the '\0').
2309 * Return value will be 0 if no token is found, and it will be >=
2310 * token_size if the token would not fit.
2311 *
2312 * The *buf pointer will be updated to point beyond the end of the
2313 * found token. Note that this occurs even if the token buffer is
2314 * too small to hold it.
2315 */
2316 static inline size_t copy_token(const char **buf,
2317 char *token,
2318 size_t token_size)
2319 {
2320 size_t len;
2321
2322 len = next_token(buf);
2323 if (len < token_size) {
2324 memcpy(token, *buf, len);
2325 *(token + len) = '\0';
2326 }
2327 *buf += len;
2328
2329 return len;
2330 }
2331
2332 /*
2333 * Finds the next token in *buf, dynamically allocates a buffer big
2334 * enough to hold a copy of it, and copies the token into the new
2335 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2336 * that a duplicate buffer is created even for a zero-length token.
2337 *
2338 * Returns a pointer to the newly-allocated duplicate, or a null
2339 * pointer if memory for the duplicate was not available. If
2340 * the lenp argument is a non-null pointer, the length of the token
2341 * (not including the '\0') is returned in *lenp.
2342 *
2343 * If successful, the *buf pointer will be updated to point beyond
2344 * the end of the found token.
2345 *
2346 * Note: uses GFP_KERNEL for allocation.
2347 */
2348 static inline char *dup_token(const char **buf, size_t *lenp)
2349 {
2350 char *dup;
2351 size_t len;
2352
2353 len = next_token(buf);
2354 dup = kmalloc(len + 1, GFP_KERNEL);
2355 if (!dup)
2356 return NULL;
2357
2358 memcpy(dup, *buf, len);
2359 *(dup + len) = '\0';
2360 *buf += len;
2361
2362 if (lenp)
2363 *lenp = len;
2364
2365 return dup;
2366 }
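/*
 * Userspace sketch of the copy_token()/dup_token() contracts above
 * (kmalloc() swapped for malloc(); next_token() repeated so the
 * program stands alone).  Note the truncation signal: copy_token()
 * returns a length >= token_size when the token does not fit, yet
 * still advances *buf past the token.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static size_t next_token(const char **buf)
{
	const char *spaces = " \f\n\r\t\v";

	*buf += strspn(*buf, spaces);
	return strcspn(*buf, spaces);
}

static size_t copy_token(const char **buf, char *token, size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;	/* advance even when the token didn't fit */
	return len;
}

static char *dup_token(const char **buf, size_t *lenp)
{
	size_t len = next_token(buf);
	char *dup = malloc(len + 1);

	if (!dup)
		return NULL;
	memcpy(dup, *buf, len);
	dup[len] = '\0';
	*buf += len;
	if (lenp)
		*lenp = len;
	return dup;
}

int main(void)
{
	const char *p = "mypool verylongimagename";
	char small[4];
	size_t len;

	len = copy_token(&p, small, sizeof(small));	/* "mypool": 6 >= 4 */
	printf("copy_token: len=%zu (truncated: %s)\n",
	       len, len >= sizeof(small) ? "yes" : "no");

	char *dup = dup_token(&p, &len);	/* allocation always fits */
	if (dup) {
		printf("dup_token: \"%s\" (len=%zu)\n", dup, len);
		free(dup);
	}
	return 0;
}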
2367
2368 /*
2369 * This fills in the pool_name, image_name, image_name_len, snap_name,
2370 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2371 * on the list of monitor addresses and other options provided via
2372 * /sys/bus/rbd/add.
2373 *
2374 * Note: rbd_dev is assumed to have been initially zero-filled.
2375 */
2376 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2377 const char *buf,
2378 const char **mon_addrs,
2379 size_t *mon_addrs_size,
2380 char *options,
2381 size_t options_size)
2382 {
2383 size_t len;
2384 int ret;
2385
2386 /* The first four tokens are required */
2387
2388 len = next_token(&buf);
2389 if (!len)
2390 return -EINVAL;
2391 *mon_addrs_size = len + 1;
2392 *mon_addrs = buf;
2393
2394 buf += len;
2395
2396 len = copy_token(&buf, options, options_size);
2397 if (!len || len >= options_size)
2398 return -EINVAL;
2399
2400 ret = -ENOMEM;
2401 rbd_dev->pool_name = dup_token(&buf, NULL);
2402 if (!rbd_dev->pool_name)
2403 goto out_err;
2404
2405 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2406 if (!rbd_dev->image_name)
2407 goto out_err;
2408
2409 /* Create the name of the header object */
2410
2411 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2412 + sizeof (RBD_SUFFIX),
2413 GFP_KERNEL);
2414 if (!rbd_dev->header_name)
2415 goto out_err;
2416 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2417
2418 /*
2419 * The snapshot name is optional. If none is supplied,
2420 * we use the default value.
2421 */
2422 rbd_dev->snap_name = dup_token(&buf, &len);
2423 if (!rbd_dev->snap_name)
2424 goto out_err;
2425 if (!len) {
2426 /* Replace the empty name with the default */
2427 kfree(rbd_dev->snap_name);
2428 rbd_dev->snap_name
2429 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2430 if (!rbd_dev->snap_name)
2431 goto out_err;
2432
2433 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2434 sizeof (RBD_SNAP_HEAD_NAME));
2435 }
2436
2437 return 0;
2438
2439 out_err:
2440 kfree(rbd_dev->header_name);
2441 rbd_dev->header_name = NULL;
2442 kfree(rbd_dev->image_name);
2443 rbd_dev->image_name = NULL;
2444 rbd_dev->image_name_len = 0;
2445 kfree(rbd_dev->pool_name);
2446 rbd_dev->pool_name = NULL;
2447
2448 return ret;
2449 }
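/*
 * Illustrative add string, matching the token order enforced above
 * (mon_addrs, options, pool, image, then an optional snapshot name;
 * the option keys shown are libceph mount options and appear here
 * only as an example):
 *
 *	$ echo "1.2.3.4:6789 name=admin,secret=AQB... rbd myimage" \
 *		> /sys/bus/rbd/add
 *
 * With the snapshot token omitted, snap_name falls back to
 * RBD_SNAP_HEAD_NAME ("-"), i.e. the image head is mapped.
 */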
2450
2451 static ssize_t rbd_add(struct bus_type *bus,
2452 const char *buf,
2453 size_t count)
2454 {
2455 char *options;
2456 struct rbd_device *rbd_dev = NULL;
2457 const char *mon_addrs = NULL;
2458 size_t mon_addrs_size = 0;
2459 struct ceph_osd_client *osdc;
2460 int rc = -ENOMEM;
2461
2462 if (!try_module_get(THIS_MODULE))
2463 return -ENODEV;
2464
2465 options = kmalloc(count, GFP_KERNEL);
2466 if (!options)
2467 goto err_nomem;
2468 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2469 if (!rbd_dev)
2470 goto err_nomem;
2471
2472 /* static rbd_device initialization */
2473 spin_lock_init(&rbd_dev->lock);
2474 INIT_LIST_HEAD(&rbd_dev->node);
2475 INIT_LIST_HEAD(&rbd_dev->snaps);
2476 init_rwsem(&rbd_dev->header_rwsem);
2477
2478 /* generate a unique id: take the current maximum and add one */
2479 rbd_id_get(rbd_dev);
2480
2481 /* Fill in the device name, now that we have its id. */
2482 BUILD_BUG_ON(DEV_NAME_LEN
2483 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2484 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2485
2486 /* parse add command */
2487 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2488 options, count);
2489 if (rc)
2490 goto err_put_id;
2491
2492 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2493 options);
2494 if (IS_ERR(rbd_dev->rbd_client)) {
2495 rc = PTR_ERR(rbd_dev->rbd_client);
2496 rbd_dev->rbd_client = NULL;
2497 goto err_put_id;
2498 }
2499
2500 /* pick the pool */
2501 osdc = &rbd_dev->rbd_client->client->osdc;
2502 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2503 if (rc < 0)
2504 goto err_out_client;
2505 rbd_dev->pool_id = rc;
2506
2507 /* register our block device */
2508 rc = register_blkdev(0, rbd_dev->name);
2509 if (rc < 0)
2510 goto err_out_client;
2511 rbd_dev->major = rc;
2512
2513 rc = rbd_bus_add_dev(rbd_dev);
2514 if (rc)
2515 goto err_out_blkdev;
2516
2517 /*
2518 * At this point cleanup in the event of an error is the job
2519 * of the sysfs code (initiated by rbd_bus_del_dev()).
2520 *
2521 * Set up and announce blkdev mapping.
2522 */
2523 rc = rbd_init_disk(rbd_dev);
2524 if (rc)
2525 goto err_out_bus;
2526
2527 rc = rbd_init_watch_dev(rbd_dev);
2528 if (rc)
2529 goto err_out_bus;
2530
2531 return count;
2532
2533 err_out_bus:
2534 /* this will also clean up the rest of the rbd_dev state */
2535
2536 rbd_bus_del_dev(rbd_dev);
2537 kfree(options);
2538 return rc;
2539
2540 err_out_blkdev:
2541 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2542 err_out_client:
2543 rbd_put_client(rbd_dev);
2544 err_put_id:
2545 if (rbd_dev->pool_name) {
2546 kfree(rbd_dev->snap_name);
2547 kfree(rbd_dev->header_name);
2548 kfree(rbd_dev->image_name);
2549 kfree(rbd_dev->pool_name);
2550 }
2551 rbd_id_put(rbd_dev);
2552 err_nomem:
2553 kfree(rbd_dev);
2554 kfree(options);
2555
2556 dout("Error adding device %s\n", buf);
2557 module_put(THIS_MODULE);
2558
2559 return (ssize_t) rc;
2560 }
2561
2562 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2563 {
2564 struct list_head *tmp;
2565 struct rbd_device *rbd_dev;
2566
2567 spin_lock(&rbd_dev_list_lock);
2568 list_for_each(tmp, &rbd_dev_list) {
2569 rbd_dev = list_entry(tmp, struct rbd_device, node);
2570 if (rbd_dev->dev_id == dev_id) {
2571 spin_unlock(&rbd_dev_list_lock);
2572 return rbd_dev;
2573 }
2574 }
2575 spin_unlock(&rbd_dev_list_lock);
2576 return NULL;
2577 }
2578
2579 static void rbd_dev_release(struct device *dev)
2580 {
2581 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2582
2583 if (rbd_dev->watch_request) {
2584 struct ceph_client *client = rbd_dev->rbd_client->client;
2585
2586 ceph_osdc_unregister_linger_request(&client->osdc,
2587 rbd_dev->watch_request);
2588 }
2589 if (rbd_dev->watch_event)
2590 rbd_req_sync_unwatch(rbd_dev);
2591
2592 rbd_put_client(rbd_dev);
2593
2594 /* clean up and free blkdev */
2595 rbd_free_disk(rbd_dev);
2596 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2597
2598 /* done with the id, and with the rbd_dev */
2599 kfree(rbd_dev->snap_name);
2600 kfree(rbd_dev->header_name);
2601 kfree(rbd_dev->pool_name);
2602 kfree(rbd_dev->image_name);
2603 rbd_id_put(rbd_dev);
2604 kfree(rbd_dev);
2605
2606 /* release module ref */
2607 module_put(THIS_MODULE);
2608 }
2609
2610 static ssize_t rbd_remove(struct bus_type *bus,
2611 const char *buf,
2612 size_t count)
2613 {
2614 struct rbd_device *rbd_dev = NULL;
2615 int target_id, rc;
2616 unsigned long ul;
2617 int ret = count;
2618
2619 rc = strict_strtoul(buf, 10, &ul);
2620 if (rc)
2621 return rc;
2622
2623 /* convert to int; abort if we lost anything in the conversion */
2624 target_id = (int) ul;
2625 if (target_id != ul)
2626 return -EINVAL;
2627
2628 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2629
2630 rbd_dev = __rbd_get_dev(target_id);
2631 if (!rbd_dev) {
2632 ret = -ENOENT;
2633 goto done;
2634 }
2635
2636 __rbd_remove_all_snaps(rbd_dev);
2637 rbd_bus_del_dev(rbd_dev);
2638
2639 done:
2640 mutex_unlock(&ctl_mutex);
2641 return ret;
2642 }
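/*
 * Usage: the single token written to /sys/bus/rbd/remove is the
 * decimal device id assigned by rbd_id_get(), e.g.:
 *
 *	$ echo 1 > /sys/bus/rbd/remove
 */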
2643
2644 static ssize_t rbd_snap_add(struct device *dev,
2645 struct device_attribute *attr,
2646 const char *buf,
2647 size_t count)
2648 {
2649 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2650 int ret;
2651 char *name = kmalloc(count + 1, GFP_KERNEL);
2652 if (!name)
2653 return -ENOMEM;
2654
2655 snprintf(name, count, "%s", buf); /* copies at most count - 1 bytes, dropping buf's last byte (normally the trailing newline) */
2656
2657 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2658
2659 ret = rbd_header_add_snap(rbd_dev,
2660 name, GFP_KERNEL);
2661 if (ret < 0)
2662 goto err_unlock;
2663
2664 ret = __rbd_refresh_header(rbd_dev, NULL);
2665 if (ret < 0)
2666 goto err_unlock;
2667
2668 /* We shouldn't hold ctl_mutex when notifying: the notify might
2669 trigger a watch callback that would need to acquire that mutex */
2670 mutex_unlock(&ctl_mutex);
2671
2672 /* make a best effort; don't fail the operation if the notify fails */
2673 rbd_req_sync_notify(rbd_dev);
2674
2675 ret = count;
2676 kfree(name);
2677 return ret;
2678
2679 err_unlock:
2680 mutex_unlock(&ctl_mutex);
2681 kfree(name);
2682 return ret;
2683 }
2684
2685 /*
2686 * create control files in sysfs
2687 * /sys/bus/rbd/...
2688 */
2689 static int rbd_sysfs_init(void)
2690 {
2691 int ret;
2692
2693 ret = device_register(&rbd_root_dev);
2694 if (ret < 0)
2695 return ret;
2696
2697 ret = bus_register(&rbd_bus_type);
2698 if (ret < 0)
2699 device_unregister(&rbd_root_dev);
2700
2701 return ret;
2702 }
2703
2704 static void rbd_sysfs_cleanup(void)
2705 {
2706 bus_unregister(&rbd_bus_type);
2707 device_unregister(&rbd_root_dev);
2708 }
2709
2710 int __init rbd_init(void)
2711 {
2712 int rc;
2713
2714 rc = rbd_sysfs_init();
2715 if (rc)
2716 return rc;
2717 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2718 return 0;
2719 }
2720
2721 void __exit rbd_exit(void)
2722 {
2723 rbd_sysfs_cleanup();
2724 }
2725
2726 module_init(rbd_init);
2727 module_exit(rbd_exit);
2728
2729 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2730 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2731 MODULE_DESCRIPTION("rados block device");
2732
2733 /* following authorship retained from original osdblk.c */
2734 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2735
2736 MODULE_LICENSE("GPL");