]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blob - drivers/block/rbd.c
Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net
[mirror_ubuntu-artful-kernel.git] / drivers / block / rbd.c
1 /*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 For usage instructions, please refer to:
25
26 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * Block I/O is expressed in sectors.  Several layers (blk, bio, genhd)
 * have their own idea of a sector, but every one of them defaults to
 * 512 bytes.  Naming the value beats sprinkling bare numbers around.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Short and long driver names */
#define RBD_DRV_NAME		"rbd"
#define RBD_DRV_NAME_LONG	"rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * Device names have the form "rbd#": RBD_DRV_NAME followed by a
 * unique integer id.  MAX_INT_FORMAT_WIDTH is used to make sure
 * DEV_NAME_LEN can hold every possible device name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Initial value of rbd_options.notify_timeout */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75 * block device image metadata (in-memory version)
76 */
77 struct rbd_image_header {
78 u64 image_size;
79 char *object_prefix;
80 __u8 obj_order;
81 __u8 crypt_type;
82 __u8 comp_type;
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
85 u32 total_snaps;
86
87 char *snap_names;
88 u64 *snap_sizes;
89
90 u64 obj_version;
91 };
92
/* rbd-specific options parsed by parse_rbd_opts_token() */
struct rbd_options {
	int notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
96
97 /*
98 * an instance of the client. multiple devices may share an rbd client.
99 */
100 struct rbd_client {
101 struct ceph_client *client;
102 struct rbd_options *rbd_opts;
103 struct kref kref;
104 struct list_head node;
105 };
106
107 /*
108 * a request completion status
109 */
110 struct rbd_req_status {
111 int done;
112 int rc;
113 u64 bytes;
114 };
115
116 /*
117 * a collection of requests
118 */
119 struct rbd_req_coll {
120 int total;
121 int num_done;
122 struct kref kref;
123 struct rbd_req_status status[0];
124 };
125
126 /*
127 * a single io request
128 */
129 struct rbd_request {
130 struct request *rq; /* blk layer request */
131 struct bio *bio; /* cloned bio */
132 struct page **pages; /* list of used pages */
133 u64 len;
134 int coll_index;
135 struct rbd_req_coll *coll;
136 };
137
138 struct rbd_snap {
139 struct device dev;
140 const char *name;
141 u64 size;
142 struct list_head node;
143 u64 id;
144 };
145
146 /*
147 * a single device
148 */
149 struct rbd_device {
150 int dev_id; /* blkdev unique id */
151
152 int major; /* blkdev assigned major */
153 struct gendisk *disk; /* blkdev's gendisk and rq */
154 struct request_queue *q;
155
156 struct rbd_client *rbd_client;
157
158 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160 spinlock_t lock; /* queue lock */
161
162 struct rbd_image_header header;
163 char *image_name;
164 size_t image_name_len;
165 char *header_name;
166 char *pool_name;
167 int pool_id;
168
169 struct ceph_osd_event *watch_event;
170 struct ceph_osd_request *watch_request;
171
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem;
174 /* name of the snapshot this device reads from */
175 char *snap_name;
176 /* id of the snapshot this device reads from */
177 u64 snap_id; /* current snapshot id */
178 /* whether the snap_id this device reads from still exists */
179 bool snap_exists;
180 int read_only;
181
182 struct list_head node;
183
184 /* list of snapshots */
185 struct list_head snaps;
186
187 /* sysfs related */
188 struct device dev;
189 };
190
191 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
192
193 static LIST_HEAD(rbd_dev_list); /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196 static LIST_HEAD(rbd_client_list); /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
203 const char *buf,
204 size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208 size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210 size_t count);
211
212 static struct bus_attribute rbd_bus_attrs[] = {
213 __ATTR(add, S_IWUSR, NULL, rbd_add),
214 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
215 __ATTR_NULL
216 };
217
218 static struct bus_type rbd_bus_type = {
219 .name = "rbd",
220 .bus_attrs = rbd_bus_attrs,
221 };
222
/* Release callback of rbd_root_dev; intentionally does nothing. */
static void rbd_root_dev_release(struct device *dev)
{
}
226
227 static struct device rbd_root_dev = {
228 .init_name = "rbd",
229 .release = rbd_root_dev_release,
230 };
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235 return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240 put_device(&rbd_dev->dev);
241 }
242
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250 return -EROFS;
251
252 rbd_get_dev(rbd_dev);
253 set_device_ro(bdev, rbd_dev->read_only);
254
255 return 0;
256 }
257
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 {
260 struct rbd_device *rbd_dev = disk->private_data;
261
262 rbd_put_dev(rbd_dev);
263
264 return 0;
265 }
266
267 static const struct block_device_operations rbd_bd_ops = {
268 .owner = THIS_MODULE,
269 .open = rbd_open,
270 .release = rbd_release,
271 };
272
273 /*
274 * Initialize an rbd client instance.
275 * We own *ceph_opts.
276 */
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278 struct rbd_options *rbd_opts)
279 {
280 struct rbd_client *rbdc;
281 int ret = -ENOMEM;
282
283 dout("rbd_client_create\n");
284 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285 if (!rbdc)
286 goto out_opt;
287
288 kref_init(&rbdc->kref);
289 INIT_LIST_HEAD(&rbdc->node);
290
291 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
293 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294 if (IS_ERR(rbdc->client))
295 goto out_mutex;
296 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297
298 ret = ceph_open_session(rbdc->client);
299 if (ret < 0)
300 goto out_err;
301
302 rbdc->rbd_opts = rbd_opts;
303
304 spin_lock(&rbd_client_list_lock);
305 list_add_tail(&rbdc->node, &rbd_client_list);
306 spin_unlock(&rbd_client_list_lock);
307
308 mutex_unlock(&ctl_mutex);
309
310 dout("rbd_client_create created %p\n", rbdc);
311 return rbdc;
312
313 out_err:
314 ceph_destroy_client(rbdc->client);
315 out_mutex:
316 mutex_unlock(&ctl_mutex);
317 kfree(rbdc);
318 out_opt:
319 if (ceph_opts)
320 ceph_destroy_options(ceph_opts);
321 return ERR_PTR(ret);
322 }
323
324 /*
325 * Find a ceph client with specific addr and configuration.
326 */
327 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
328 {
329 struct rbd_client *client_node;
330
331 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
332 return NULL;
333
334 list_for_each_entry(client_node, &rbd_client_list, node)
335 if (!ceph_compare_options(ceph_opts, client_node->client))
336 return client_node;
337 return NULL;
338 }
339
340 /*
341 * mount options
342 */
343 enum {
344 Opt_notify_timeout,
345 Opt_last_int,
346 /* int args above */
347 Opt_last_string,
348 /* string args above */
349 };
350
351 static match_table_t rbd_opts_tokens = {
352 {Opt_notify_timeout, "notify_timeout=%d"},
353 /* int args above */
354 /* string args above */
355 {-1, NULL}
356 };
357
358 static int parse_rbd_opts_token(char *c, void *private)
359 {
360 struct rbd_options *rbd_opts = private;
361 substring_t argstr[MAX_OPT_ARGS];
362 int token, intval, ret;
363
364 token = match_token(c, rbd_opts_tokens, argstr);
365 if (token < 0)
366 return -EINVAL;
367
368 if (token < Opt_last_int) {
369 ret = match_int(&argstr[0], &intval);
370 if (ret < 0) {
371 pr_err("bad mount option arg (not int) "
372 "at '%s'\n", c);
373 return ret;
374 }
375 dout("got int token %d val %d\n", token, intval);
376 } else if (token > Opt_last_int && token < Opt_last_string) {
377 dout("got string token %d val %s\n", token,
378 argstr[0].from);
379 } else {
380 dout("got token %d\n", token);
381 }
382
383 switch (token) {
384 case Opt_notify_timeout:
385 rbd_opts->notify_timeout = intval;
386 break;
387 default:
388 BUG_ON(token);
389 }
390 return 0;
391 }
392
393 /*
394 * Get a ceph client with specific addr and configuration, if one does
395 * not exist create it.
396 */
397 static struct rbd_client *rbd_get_client(const char *mon_addr,
398 size_t mon_addr_len,
399 char *options)
400 {
401 struct rbd_client *rbdc;
402 struct ceph_options *ceph_opts;
403 struct rbd_options *rbd_opts;
404
405 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
406 if (!rbd_opts)
407 return ERR_PTR(-ENOMEM);
408
409 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
410
411 ceph_opts = ceph_parse_options(options, mon_addr,
412 mon_addr + mon_addr_len,
413 parse_rbd_opts_token, rbd_opts);
414 if (IS_ERR(ceph_opts)) {
415 kfree(rbd_opts);
416 return ERR_CAST(ceph_opts);
417 }
418
419 spin_lock(&rbd_client_list_lock);
420 rbdc = __rbd_client_find(ceph_opts);
421 if (rbdc) {
422 /* using an existing client */
423 kref_get(&rbdc->kref);
424 spin_unlock(&rbd_client_list_lock);
425
426 ceph_destroy_options(ceph_opts);
427 kfree(rbd_opts);
428
429 return rbdc;
430 }
431 spin_unlock(&rbd_client_list_lock);
432
433 rbdc = rbd_client_create(ceph_opts, rbd_opts);
434
435 if (IS_ERR(rbdc))
436 kfree(rbd_opts);
437
438 return rbdc;
439 }
440
441 /*
442 * Destroy ceph client
443 *
444 * Caller must hold rbd_client_list_lock.
445 */
446 static void rbd_client_release(struct kref *kref)
447 {
448 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
449
450 dout("rbd_release_client %p\n", rbdc);
451 spin_lock(&rbd_client_list_lock);
452 list_del(&rbdc->node);
453 spin_unlock(&rbd_client_list_lock);
454
455 ceph_destroy_client(rbdc->client);
456 kfree(rbdc->rbd_opts);
457 kfree(rbdc);
458 }
459
460 /*
461 * Drop reference to ceph client node. If it's not referenced anymore, release
462 * it.
463 */
464 static void rbd_put_client(struct rbd_device *rbd_dev)
465 {
466 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467 rbd_dev->rbd_client = NULL;
468 }
469
470 /*
471 * Destroy requests collection
472 */
473 static void rbd_coll_release(struct kref *kref)
474 {
475 struct rbd_req_coll *coll =
476 container_of(kref, struct rbd_req_coll, kref);
477
478 dout("rbd_coll_release %p\n", coll);
479 kfree(coll);
480 }
481
482 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
483 {
484 return !memcmp(&ondisk->text,
485 RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
486 }
487
488 /*
489 * Create a new header structure, translate header format from the on-disk
490 * header.
491 */
492 static int rbd_header_from_disk(struct rbd_image_header *header,
493 struct rbd_image_header_ondisk *ondisk,
494 u32 allocated_snaps)
495 {
496 u32 snap_count;
497
498 if (!rbd_dev_ondisk_valid(ondisk))
499 return -ENXIO;
500
501 snap_count = le32_to_cpu(ondisk->snap_count);
502 if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
503 / sizeof (u64))
504 return -EINVAL;
505 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
506 snap_count * sizeof(u64),
507 GFP_KERNEL);
508 if (!header->snapc)
509 return -ENOMEM;
510
511 if (snap_count) {
512 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
513 header->snap_names = kmalloc(header->snap_names_len,
514 GFP_KERNEL);
515 if (!header->snap_names)
516 goto err_snapc;
517 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
518 GFP_KERNEL);
519 if (!header->snap_sizes)
520 goto err_names;
521 } else {
522 WARN_ON(ondisk->snap_names_len);
523 header->snap_names_len = 0;
524 header->snap_names = NULL;
525 header->snap_sizes = NULL;
526 }
527
528 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
529 GFP_KERNEL);
530 if (!header->object_prefix)
531 goto err_sizes;
532
533 memcpy(header->object_prefix, ondisk->block_name,
534 sizeof(ondisk->block_name));
535 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
536
537 header->image_size = le64_to_cpu(ondisk->image_size);
538 header->obj_order = ondisk->options.order;
539 header->crypt_type = ondisk->options.crypt_type;
540 header->comp_type = ondisk->options.comp_type;
541
542 atomic_set(&header->snapc->nref, 1);
543 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
544 header->snapc->num_snaps = snap_count;
545 header->total_snaps = snap_count;
546
547 if (snap_count && allocated_snaps == snap_count) {
548 int i;
549
550 for (i = 0; i < snap_count; i++) {
551 header->snapc->snaps[i] =
552 le64_to_cpu(ondisk->snaps[i].id);
553 header->snap_sizes[i] =
554 le64_to_cpu(ondisk->snaps[i].image_size);
555 }
556
557 /* copy snapshot names */
558 memcpy(header->snap_names, &ondisk->snaps[snap_count],
559 header->snap_names_len);
560 }
561
562 return 0;
563
564 err_sizes:
565 kfree(header->snap_sizes);
566 header->snap_sizes = NULL;
567 err_names:
568 kfree(header->snap_names);
569 header->snap_names = NULL;
570 err_snapc:
571 kfree(header->snapc);
572 header->snapc = NULL;
573
574 return -ENOMEM;
575 }
576
577 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
578 u64 *seq, u64 *size)
579 {
580 int i;
581 char *p = header->snap_names;
582
583 for (i = 0; i < header->total_snaps; i++) {
584 if (!strcmp(snap_name, p)) {
585
586 /* Found it. Pass back its id and/or size */
587
588 if (seq)
589 *seq = header->snapc->snaps[i];
590 if (size)
591 *size = header->snap_sizes[i];
592 return i;
593 }
594 p += strlen(p) + 1; /* Skip ahead to the next name */
595 }
596 return -ENOENT;
597 }
598
599 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
600 {
601 int ret;
602
603 down_write(&rbd_dev->header_rwsem);
604
605 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
606 sizeof (RBD_SNAP_HEAD_NAME))) {
607 rbd_dev->snap_id = CEPH_NOSNAP;
608 rbd_dev->snap_exists = false;
609 rbd_dev->read_only = 0;
610 if (size)
611 *size = rbd_dev->header.image_size;
612 } else {
613 u64 snap_id = 0;
614
615 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
616 &snap_id, size);
617 if (ret < 0)
618 goto done;
619 rbd_dev->snap_id = snap_id;
620 rbd_dev->snap_exists = true;
621 rbd_dev->read_only = 1;
622 }
623
624 ret = 0;
625 done:
626 up_write(&rbd_dev->header_rwsem);
627 return ret;
628 }
629
630 static void rbd_header_free(struct rbd_image_header *header)
631 {
632 kfree(header->object_prefix);
633 kfree(header->snap_sizes);
634 kfree(header->snap_names);
635 ceph_put_snap_context(header->snapc);
636 }
637
638 /*
639 * get the actual striped segment name, offset and length
640 */
641 static u64 rbd_get_segment(struct rbd_image_header *header,
642 const char *object_prefix,
643 u64 ofs, u64 len,
644 char *seg_name, u64 *segofs)
645 {
646 u64 seg = ofs >> header->obj_order;
647
648 if (seg_name)
649 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
650 "%s.%012llx", object_prefix, seg);
651
652 ofs = ofs & ((1 << header->obj_order) - 1);
653 len = min_t(u64, len, (1 << header->obj_order) - ofs);
654
655 if (segofs)
656 *segofs = ofs;
657
658 return len;
659 }
660
661 static int rbd_get_num_segments(struct rbd_image_header *header,
662 u64 ofs, u64 len)
663 {
664 u64 start_seg = ofs >> header->obj_order;
665 u64 end_seg = (ofs + len - 1) >> header->obj_order;
666 return end_seg - start_seg + 1;
667 }
668
669 /*
670 * returns the size of an object in the image
671 */
672 static u64 rbd_obj_bytes(struct rbd_image_header *header)
673 {
674 return 1 << header->obj_order;
675 }
676
677 /*
678 * bio helpers
679 */
680
681 static void bio_chain_put(struct bio *chain)
682 {
683 struct bio *tmp;
684
685 while (chain) {
686 tmp = chain;
687 chain = chain->bi_next;
688 bio_put(tmp);
689 }
690 }
691
692 /*
693 * zeros a bio chain, starting at specific offset
694 */
695 static void zero_bio_chain(struct bio *chain, int start_ofs)
696 {
697 struct bio_vec *bv;
698 unsigned long flags;
699 void *buf;
700 int i;
701 int pos = 0;
702
703 while (chain) {
704 bio_for_each_segment(bv, chain, i) {
705 if (pos + bv->bv_len > start_ofs) {
706 int remainder = max(start_ofs - pos, 0);
707 buf = bvec_kmap_irq(bv, &flags);
708 memset(buf + remainder, 0,
709 bv->bv_len - remainder);
710 bvec_kunmap_irq(buf, &flags);
711 }
712 pos += bv->bv_len;
713 }
714
715 chain = chain->bi_next;
716 }
717 }
718
719 /*
720 * bio_chain_clone - clone a chain of bios up to a certain length.
721 * might return a bio_pair that will need to be released.
722 */
723 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
724 struct bio_pair **bp,
725 int len, gfp_t gfpmask)
726 {
727 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
728 int total = 0;
729
730 if (*bp) {
731 bio_pair_release(*bp);
732 *bp = NULL;
733 }
734
735 while (old_chain && (total < len)) {
736 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
737 if (!tmp)
738 goto err_out;
739
740 if (total + old_chain->bi_size > len) {
741 struct bio_pair *bp;
742
743 /*
744 * this split can only happen with a single paged bio,
745 * split_bio will BUG_ON if this is not the case
746 */
747 dout("bio_chain_clone split! total=%d remaining=%d"
748 "bi_size=%u\n",
749 total, len - total, old_chain->bi_size);
750
751 /* split the bio. We'll release it either in the next
752 call, or it will have to be released outside */
753 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
754 if (!bp)
755 goto err_out;
756
757 __bio_clone(tmp, &bp->bio1);
758
759 *next = &bp->bio2;
760 } else {
761 __bio_clone(tmp, old_chain);
762 *next = old_chain->bi_next;
763 }
764
765 tmp->bi_bdev = NULL;
766 gfpmask &= ~__GFP_WAIT;
767 tmp->bi_next = NULL;
768
769 if (!new_chain) {
770 new_chain = tail = tmp;
771 } else {
772 tail->bi_next = tmp;
773 tail = tmp;
774 }
775 old_chain = old_chain->bi_next;
776
777 total += tmp->bi_size;
778 }
779
780 BUG_ON(total < len);
781
782 if (tail)
783 tail->bi_next = NULL;
784
785 *old = old_chain;
786
787 return new_chain;
788
789 err_out:
790 dout("bio_chain_clone with err\n");
791 bio_chain_put(new_chain);
792 return NULL;
793 }
794
795 /*
796 * helpers for osd request op vectors.
797 */
798 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
799 int opcode, u32 payload_len)
800 {
801 struct ceph_osd_req_op *ops;
802
803 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
804 if (!ops)
805 return NULL;
806
807 ops[0].op = opcode;
808
809 /*
810 * op extent offset and length will be set later on
811 * in calc_raw_layout()
812 */
813 ops[0].payload_len = payload_len;
814
815 return ops;
816 }
817
/* Free an op vector obtained from rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
822
823 static void rbd_coll_end_req_index(struct request *rq,
824 struct rbd_req_coll *coll,
825 int index,
826 int ret, u64 len)
827 {
828 struct request_queue *q;
829 int min, max, i;
830
831 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
832 coll, index, ret, (unsigned long long) len);
833
834 if (!rq)
835 return;
836
837 if (!coll) {
838 blk_end_request(rq, ret, len);
839 return;
840 }
841
842 q = rq->q;
843
844 spin_lock_irq(q->queue_lock);
845 coll->status[index].done = 1;
846 coll->status[index].rc = ret;
847 coll->status[index].bytes = len;
848 max = min = coll->num_done;
849 while (max < coll->total && coll->status[max].done)
850 max++;
851
852 for (i = min; i<max; i++) {
853 __blk_end_request(rq, coll->status[i].rc,
854 coll->status[i].bytes);
855 coll->num_done++;
856 kref_put(&coll->kref, rbd_coll_release);
857 }
858 spin_unlock_irq(q->queue_lock);
859 }
860
861 static void rbd_coll_end_req(struct rbd_request *req,
862 int ret, u64 len)
863 {
864 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
865 }
866
867 /*
868 * Send ceph osd request
869 */
870 static int rbd_do_request(struct request *rq,
871 struct rbd_device *rbd_dev,
872 struct ceph_snap_context *snapc,
873 u64 snapid,
874 const char *object_name, u64 ofs, u64 len,
875 struct bio *bio,
876 struct page **pages,
877 int num_pages,
878 int flags,
879 struct ceph_osd_req_op *ops,
880 struct rbd_req_coll *coll,
881 int coll_index,
882 void (*rbd_cb)(struct ceph_osd_request *req,
883 struct ceph_msg *msg),
884 struct ceph_osd_request **linger_req,
885 u64 *ver)
886 {
887 struct ceph_osd_request *req;
888 struct ceph_file_layout *layout;
889 int ret;
890 u64 bno;
891 struct timespec mtime = CURRENT_TIME;
892 struct rbd_request *req_data;
893 struct ceph_osd_request_head *reqhead;
894 struct ceph_osd_client *osdc;
895
896 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
897 if (!req_data) {
898 if (coll)
899 rbd_coll_end_req_index(rq, coll, coll_index,
900 -ENOMEM, len);
901 return -ENOMEM;
902 }
903
904 if (coll) {
905 req_data->coll = coll;
906 req_data->coll_index = coll_index;
907 }
908
909 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
910 (unsigned long long) ofs, (unsigned long long) len);
911
912 osdc = &rbd_dev->rbd_client->client->osdc;
913 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
914 false, GFP_NOIO, pages, bio);
915 if (!req) {
916 ret = -ENOMEM;
917 goto done_pages;
918 }
919
920 req->r_callback = rbd_cb;
921
922 req_data->rq = rq;
923 req_data->bio = bio;
924 req_data->pages = pages;
925 req_data->len = len;
926
927 req->r_priv = req_data;
928
929 reqhead = req->r_request->front.iov_base;
930 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
931
932 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
933 req->r_oid_len = strlen(req->r_oid);
934
935 layout = &req->r_file_layout;
936 memset(layout, 0, sizeof(*layout));
937 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
938 layout->fl_stripe_count = cpu_to_le32(1);
939 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
940 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
941 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
942 req, ops);
943
944 ceph_osdc_build_request(req, ofs, &len,
945 ops,
946 snapc,
947 &mtime,
948 req->r_oid, req->r_oid_len);
949
950 if (linger_req) {
951 ceph_osdc_set_request_linger(osdc, req);
952 *linger_req = req;
953 }
954
955 ret = ceph_osdc_start_request(osdc, req, false);
956 if (ret < 0)
957 goto done_err;
958
959 if (!rbd_cb) {
960 ret = ceph_osdc_wait_request(osdc, req);
961 if (ver)
962 *ver = le64_to_cpu(req->r_reassert_version.version);
963 dout("reassert_ver=%llu\n",
964 (unsigned long long)
965 le64_to_cpu(req->r_reassert_version.version));
966 ceph_osdc_put_request(req);
967 }
968 return ret;
969
970 done_err:
971 bio_chain_put(req_data->bio);
972 ceph_osdc_put_request(req);
973 done_pages:
974 rbd_coll_end_req(req_data, ret, len);
975 kfree(req_data);
976 return ret;
977 }
978
979 /*
980 * Ceph osd op callback
981 */
982 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
983 {
984 struct rbd_request *req_data = req->r_priv;
985 struct ceph_osd_reply_head *replyhead;
986 struct ceph_osd_op *op;
987 __s32 rc;
988 u64 bytes;
989 int read_op;
990
991 /* parse reply */
992 replyhead = msg->front.iov_base;
993 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
994 op = (void *)(replyhead + 1);
995 rc = le32_to_cpu(replyhead->result);
996 bytes = le64_to_cpu(op->extent.length);
997 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
998
999 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1000 (unsigned long long) bytes, read_op, (int) rc);
1001
1002 if (rc == -ENOENT && read_op) {
1003 zero_bio_chain(req_data->bio, 0);
1004 rc = 0;
1005 } else if (rc == 0 && read_op && bytes < req_data->len) {
1006 zero_bio_chain(req_data->bio, bytes);
1007 bytes = req_data->len;
1008 }
1009
1010 rbd_coll_end_req(req_data, rc, bytes);
1011
1012 if (req_data->bio)
1013 bio_chain_put(req_data->bio);
1014
1015 ceph_osdc_put_request(req);
1016 kfree(req_data);
1017 }
1018
/*
 * Minimal completion callback: only drops the reference on the osd
 * request.  The reply message is not examined.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1023
1024 /*
1025 * Do a synchronous ceph osd operation
1026 */
1027 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1028 struct ceph_snap_context *snapc,
1029 u64 snapid,
1030 int flags,
1031 struct ceph_osd_req_op *ops,
1032 const char *object_name,
1033 u64 ofs, u64 len,
1034 char *buf,
1035 struct ceph_osd_request **linger_req,
1036 u64 *ver)
1037 {
1038 int ret;
1039 struct page **pages;
1040 int num_pages;
1041
1042 BUG_ON(ops == NULL);
1043
1044 num_pages = calc_pages_for(ofs , len);
1045 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1046 if (IS_ERR(pages))
1047 return PTR_ERR(pages);
1048
1049 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1050 object_name, ofs, len, NULL,
1051 pages, num_pages,
1052 flags,
1053 ops,
1054 NULL, 0,
1055 NULL,
1056 linger_req, ver);
1057 if (ret < 0)
1058 goto done;
1059
1060 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1061 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1062
1063 done:
1064 ceph_release_page_vector(pages, num_pages);
1065 return ret;
1066 }
1067
1068 /*
1069 * Do an asynchronous ceph osd operation
1070 */
1071 static int rbd_do_op(struct request *rq,
1072 struct rbd_device *rbd_dev,
1073 struct ceph_snap_context *snapc,
1074 u64 snapid,
1075 int opcode, int flags,
1076 u64 ofs, u64 len,
1077 struct bio *bio,
1078 struct rbd_req_coll *coll,
1079 int coll_index)
1080 {
1081 char *seg_name;
1082 u64 seg_ofs;
1083 u64 seg_len;
1084 int ret;
1085 struct ceph_osd_req_op *ops;
1086 u32 payload_len;
1087
1088 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1089 if (!seg_name)
1090 return -ENOMEM;
1091
1092 seg_len = rbd_get_segment(&rbd_dev->header,
1093 rbd_dev->header.object_prefix,
1094 ofs, len,
1095 seg_name, &seg_ofs);
1096
1097 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1098
1099 ret = -ENOMEM;
1100 ops = rbd_create_rw_ops(1, opcode, payload_len);
1101 if (!ops)
1102 goto done;
1103
1104 /* we've taken care of segment sizes earlier when we
1105 cloned the bios. We should never have a segment
1106 truncated at this point */
1107 BUG_ON(seg_len < len);
1108
1109 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1110 seg_name, seg_ofs, seg_len,
1111 bio,
1112 NULL, 0,
1113 flags,
1114 ops,
1115 coll, coll_index,
1116 rbd_req_cb, 0, NULL);
1117
1118 rbd_destroy_ops(ops);
1119 done:
1120 kfree(seg_name);
1121 return ret;
1122 }
1123
1124 /*
1125 * Request async osd write
1126 */
1127 static int rbd_req_write(struct request *rq,
1128 struct rbd_device *rbd_dev,
1129 struct ceph_snap_context *snapc,
1130 u64 ofs, u64 len,
1131 struct bio *bio,
1132 struct rbd_req_coll *coll,
1133 int coll_index)
1134 {
1135 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1136 CEPH_OSD_OP_WRITE,
1137 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1138 ofs, len, bio, coll, coll_index);
1139 }
1140
1141 /*
1142 * Request async osd read
1143 */
1144 static int rbd_req_read(struct request *rq,
1145 struct rbd_device *rbd_dev,
1146 u64 snapid,
1147 u64 ofs, u64 len,
1148 struct bio *bio,
1149 struct rbd_req_coll *coll,
1150 int coll_index)
1151 {
1152 return rbd_do_op(rq, rbd_dev, NULL,
1153 snapid,
1154 CEPH_OSD_OP_READ,
1155 CEPH_OSD_FLAG_READ,
1156 ofs, len, bio, coll, coll_index);
1157 }
1158
1159 /*
1160 * Request sync osd read
1161 */
1162 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1163 u64 snapid,
1164 const char *object_name,
1165 u64 ofs, u64 len,
1166 char *buf,
1167 u64 *ver)
1168 {
1169 struct ceph_osd_req_op *ops;
1170 int ret;
1171
1172 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1173 if (!ops)
1174 return -ENOMEM;
1175
1176 ret = rbd_req_sync_op(rbd_dev, NULL,
1177 snapid,
1178 CEPH_OSD_FLAG_READ,
1179 ops, object_name, ofs, len, buf, NULL, ver);
1180 rbd_destroy_ops(ops);
1181
1182 return ret;
1183 }
1184
1185 /*
1186 * Request sync osd watch
1187 */
1188 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1189 u64 ver,
1190 u64 notify_id)
1191 {
1192 struct ceph_osd_req_op *ops;
1193 int ret;
1194
1195 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1196 if (!ops)
1197 return -ENOMEM;
1198
1199 ops[0].watch.ver = cpu_to_le64(ver);
1200 ops[0].watch.cookie = notify_id;
1201 ops[0].watch.flag = 0;
1202
1203 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1204 rbd_dev->header_name, 0, 0, NULL,
1205 NULL, 0,
1206 CEPH_OSD_FLAG_READ,
1207 ops,
1208 NULL, 0,
1209 rbd_simple_req_cb, 0, NULL);
1210
1211 rbd_destroy_ops(ops);
1212 return ret;
1213 }
1214
1215 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1216 {
1217 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1218 u64 hver;
1219 int rc;
1220
1221 if (!rbd_dev)
1222 return;
1223
1224 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1225 rbd_dev->header_name, (unsigned long long) notify_id,
1226 (unsigned int) opcode);
1227 rc = rbd_refresh_header(rbd_dev, &hver);
1228 if (rc)
1229 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1230 " update snaps: %d\n", rbd_dev->major, rc);
1231
1232 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1233 }
1234
1235 /*
1236 * Request sync osd watch
1237 */
1238 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1239 {
1240 struct ceph_osd_req_op *ops;
1241 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1242 int ret;
1243
1244 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1245 if (!ops)
1246 return -ENOMEM;
1247
1248 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249 (void *)rbd_dev, &rbd_dev->watch_event);
1250 if (ret < 0)
1251 goto fail;
1252
1253 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1254 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1255 ops[0].watch.flag = 1;
1256
1257 ret = rbd_req_sync_op(rbd_dev, NULL,
1258 CEPH_NOSNAP,
1259 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1260 ops,
1261 rbd_dev->header_name,
1262 0, 0, NULL,
1263 &rbd_dev->watch_request, NULL);
1264
1265 if (ret < 0)
1266 goto fail_event;
1267
1268 rbd_destroy_ops(ops);
1269 return 0;
1270
1271 fail_event:
1272 ceph_osdc_cancel_event(rbd_dev->watch_event);
1273 rbd_dev->watch_event = NULL;
1274 fail:
1275 rbd_destroy_ops(ops);
1276 return ret;
1277 }
1278
1279 /*
1280 * Request sync osd unwatch
1281 */
1282 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1283 {
1284 struct ceph_osd_req_op *ops;
1285 int ret;
1286
1287 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1288 if (!ops)
1289 return -ENOMEM;
1290
1291 ops[0].watch.ver = 0;
1292 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1293 ops[0].watch.flag = 0;
1294
1295 ret = rbd_req_sync_op(rbd_dev, NULL,
1296 CEPH_NOSNAP,
1297 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1298 ops,
1299 rbd_dev->header_name,
1300 0, 0, NULL, NULL, NULL);
1301
1302
1303 rbd_destroy_ops(ops);
1304 ceph_osdc_cancel_event(rbd_dev->watch_event);
1305 rbd_dev->watch_event = NULL;
1306 return ret;
1307 }
1308
1309 struct rbd_notify_info {
1310 struct rbd_device *rbd_dev;
1311 };
1312
1313 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1314 {
1315 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1316 if (!rbd_dev)
1317 return;
1318
1319 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1320 rbd_dev->header_name, (unsigned long long) notify_id,
1321 (unsigned int) opcode);
1322 }
1323
1324 /*
1325 * Request sync osd notify
1326 */
1327 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1328 {
1329 struct ceph_osd_req_op *ops;
1330 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1331 struct ceph_osd_event *event;
1332 struct rbd_notify_info info;
1333 int payload_len = sizeof(u32) + sizeof(u32);
1334 int ret;
1335
1336 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1337 if (!ops)
1338 return -ENOMEM;
1339
1340 info.rbd_dev = rbd_dev;
1341
1342 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1343 (void *)&info, &event);
1344 if (ret < 0)
1345 goto fail;
1346
1347 ops[0].watch.ver = 1;
1348 ops[0].watch.flag = 1;
1349 ops[0].watch.cookie = event->cookie;
1350 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1351 ops[0].watch.timeout = 12;
1352
1353 ret = rbd_req_sync_op(rbd_dev, NULL,
1354 CEPH_NOSNAP,
1355 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356 ops,
1357 rbd_dev->header_name,
1358 0, 0, NULL, NULL, NULL);
1359 if (ret < 0)
1360 goto fail_event;
1361
1362 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1363 dout("ceph_osdc_wait_event returned %d\n", ret);
1364 rbd_destroy_ops(ops);
1365 return 0;
1366
1367 fail_event:
1368 ceph_osdc_cancel_event(event);
1369 fail:
1370 rbd_destroy_ops(ops);
1371 return ret;
1372 }
1373
1374 /*
1375 * Request sync osd read
1376 */
1377 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1378 const char *object_name,
1379 const char *class_name,
1380 const char *method_name,
1381 const char *data,
1382 int len,
1383 u64 *ver)
1384 {
1385 struct ceph_osd_req_op *ops;
1386 int class_name_len = strlen(class_name);
1387 int method_name_len = strlen(method_name);
1388 int ret;
1389
1390 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1391 class_name_len + method_name_len + len);
1392 if (!ops)
1393 return -ENOMEM;
1394
1395 ops[0].cls.class_name = class_name;
1396 ops[0].cls.class_len = (__u8) class_name_len;
1397 ops[0].cls.method_name = method_name;
1398 ops[0].cls.method_len = (__u8) method_name_len;
1399 ops[0].cls.argc = 0;
1400 ops[0].cls.indata = data;
1401 ops[0].cls.indata_len = len;
1402
1403 ret = rbd_req_sync_op(rbd_dev, NULL,
1404 CEPH_NOSNAP,
1405 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1406 ops,
1407 object_name, 0, 0, NULL, NULL, ver);
1408
1409 rbd_destroy_ops(ops);
1410
1411 dout("cls_exec returned %d\n", ret);
1412 return ret;
1413 }
1414
1415 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1416 {
1417 struct rbd_req_coll *coll =
1418 kzalloc(sizeof(struct rbd_req_coll) +
1419 sizeof(struct rbd_req_status) * num_reqs,
1420 GFP_ATOMIC);
1421
1422 if (!coll)
1423 return NULL;
1424 coll->total = num_reqs;
1425 kref_init(&coll->kref);
1426 return coll;
1427 }
1428
1429 /*
1430 * block device queue callback
1431 */
1432 static void rbd_rq_fn(struct request_queue *q)
1433 {
1434 struct rbd_device *rbd_dev = q->queuedata;
1435 struct request *rq;
1436 struct bio_pair *bp = NULL;
1437
1438 while ((rq = blk_fetch_request(q))) {
1439 struct bio *bio;
1440 struct bio *rq_bio, *next_bio = NULL;
1441 bool do_write;
1442 unsigned int size;
1443 u64 op_size = 0;
1444 u64 ofs;
1445 int num_segs, cur_seg = 0;
1446 struct rbd_req_coll *coll;
1447 struct ceph_snap_context *snapc;
1448
1449 /* peek at request from block layer */
1450 if (!rq)
1451 break;
1452
1453 dout("fetched request\n");
1454
1455 /* filter out block requests we don't understand */
1456 if ((rq->cmd_type != REQ_TYPE_FS)) {
1457 __blk_end_request_all(rq, 0);
1458 continue;
1459 }
1460
1461 /* deduce our operation (read, write) */
1462 do_write = (rq_data_dir(rq) == WRITE);
1463
1464 size = blk_rq_bytes(rq);
1465 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1466 rq_bio = rq->bio;
1467 if (do_write && rbd_dev->read_only) {
1468 __blk_end_request_all(rq, -EROFS);
1469 continue;
1470 }
1471
1472 spin_unlock_irq(q->queue_lock);
1473
1474 down_read(&rbd_dev->header_rwsem);
1475
1476 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1477 up_read(&rbd_dev->header_rwsem);
1478 dout("request for non-existent snapshot");
1479 spin_lock_irq(q->queue_lock);
1480 __blk_end_request_all(rq, -ENXIO);
1481 continue;
1482 }
1483
1484 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1485
1486 up_read(&rbd_dev->header_rwsem);
1487
1488 dout("%s 0x%x bytes at 0x%llx\n",
1489 do_write ? "write" : "read",
1490 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1491
1492 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1493 coll = rbd_alloc_coll(num_segs);
1494 if (!coll) {
1495 spin_lock_irq(q->queue_lock);
1496 __blk_end_request_all(rq, -ENOMEM);
1497 ceph_put_snap_context(snapc);
1498 continue;
1499 }
1500
1501 do {
1502 /* a bio clone to be passed down to OSD req */
1503 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1504 op_size = rbd_get_segment(&rbd_dev->header,
1505 rbd_dev->header.object_prefix,
1506 ofs, size,
1507 NULL, NULL);
1508 kref_get(&coll->kref);
1509 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1510 op_size, GFP_ATOMIC);
1511 if (!bio) {
1512 rbd_coll_end_req_index(rq, coll, cur_seg,
1513 -ENOMEM, op_size);
1514 goto next_seg;
1515 }
1516
1517
1518 /* init OSD command: write or read */
1519 if (do_write)
1520 rbd_req_write(rq, rbd_dev,
1521 snapc,
1522 ofs,
1523 op_size, bio,
1524 coll, cur_seg);
1525 else
1526 rbd_req_read(rq, rbd_dev,
1527 rbd_dev->snap_id,
1528 ofs,
1529 op_size, bio,
1530 coll, cur_seg);
1531
1532 next_seg:
1533 size -= op_size;
1534 ofs += op_size;
1535
1536 cur_seg++;
1537 rq_bio = next_bio;
1538 } while (size > 0);
1539 kref_put(&coll->kref, rbd_coll_release);
1540
1541 if (bp)
1542 bio_pair_release(bp);
1543 spin_lock_irq(q->queue_lock);
1544
1545 ceph_put_snap_context(snapc);
1546 }
1547 }
1548
1549 /*
1550 * a queue callback. Makes sure that we don't create a bio that spans across
1551 * multiple osd objects. One exception would be with a single page bios,
1552 * which we handle later at bio_chain_clone
1553 */
1554 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1555 struct bio_vec *bvec)
1556 {
1557 struct rbd_device *rbd_dev = q->queuedata;
1558 unsigned int chunk_sectors;
1559 sector_t sector;
1560 unsigned int bio_sectors;
1561 int max;
1562
1563 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1564 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1565 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1566
1567 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1568 + bio_sectors)) << SECTOR_SHIFT;
1569 if (max < 0)
1570 max = 0; /* bio_add cannot handle a negative return */
1571 if (max <= bvec->bv_len && bio_sectors == 0)
1572 return bvec->bv_len;
1573 return max;
1574 }
1575
1576 static void rbd_free_disk(struct rbd_device *rbd_dev)
1577 {
1578 struct gendisk *disk = rbd_dev->disk;
1579
1580 if (!disk)
1581 return;
1582
1583 rbd_header_free(&rbd_dev->header);
1584
1585 if (disk->flags & GENHD_FL_UP)
1586 del_gendisk(disk);
1587 if (disk->queue)
1588 blk_cleanup_queue(disk->queue);
1589 put_disk(disk);
1590 }
1591
1592 /*
1593 * reload the ondisk the header
1594 */
1595 static int rbd_read_header(struct rbd_device *rbd_dev,
1596 struct rbd_image_header *header)
1597 {
1598 ssize_t rc;
1599 struct rbd_image_header_ondisk *dh;
1600 u32 snap_count = 0;
1601 u64 ver;
1602 size_t len;
1603
1604 /*
1605 * First reads the fixed-size header to determine the number
1606 * of snapshots, then re-reads it, along with all snapshot
1607 * records as well as their stored names.
1608 */
1609 len = sizeof (*dh);
1610 while (1) {
1611 dh = kmalloc(len, GFP_KERNEL);
1612 if (!dh)
1613 return -ENOMEM;
1614
1615 rc = rbd_req_sync_read(rbd_dev,
1616 CEPH_NOSNAP,
1617 rbd_dev->header_name,
1618 0, len,
1619 (char *)dh, &ver);
1620 if (rc < 0)
1621 goto out_dh;
1622
1623 rc = rbd_header_from_disk(header, dh, snap_count);
1624 if (rc < 0) {
1625 if (rc == -ENXIO)
1626 pr_warning("unrecognized header format"
1627 " for image %s\n",
1628 rbd_dev->image_name);
1629 goto out_dh;
1630 }
1631
1632 if (snap_count == header->total_snaps)
1633 break;
1634
1635 snap_count = header->total_snaps;
1636 len = sizeof (*dh) +
1637 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1638 header->snap_names_len;
1639
1640 rbd_header_free(header);
1641 kfree(dh);
1642 }
1643 header->obj_version = ver;
1644
1645 out_dh:
1646 kfree(dh);
1647 return rc;
1648 }
1649
1650 /*
1651 * create a snapshot
1652 */
1653 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1654 const char *snap_name,
1655 gfp_t gfp_flags)
1656 {
1657 int name_len = strlen(snap_name);
1658 u64 new_snapid;
1659 int ret;
1660 void *data, *p, *e;
1661 struct ceph_mon_client *monc;
1662
1663 /* we should create a snapshot only if we're pointing at the head */
1664 if (rbd_dev->snap_id != CEPH_NOSNAP)
1665 return -EINVAL;
1666
1667 monc = &rbd_dev->rbd_client->client->monc;
1668 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1669 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1670 if (ret < 0)
1671 return ret;
1672
1673 data = kmalloc(name_len + 16, gfp_flags);
1674 if (!data)
1675 return -ENOMEM;
1676
1677 p = data;
1678 e = data + name_len + 16;
1679
1680 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1681 ceph_encode_64_safe(&p, e, new_snapid, bad);
1682
1683 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1684 "rbd", "snap_add",
1685 data, p - data, NULL);
1686
1687 kfree(data);
1688
1689 return ret < 0 ? ret : 0;
1690 bad:
1691 return -ERANGE;
1692 }
1693
1694 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1695 {
1696 struct rbd_snap *snap;
1697 struct rbd_snap *next;
1698
1699 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1700 __rbd_remove_snap_dev(snap);
1701 }
1702
1703 /*
1704 * only read the first part of the ondisk header, without the snaps info
1705 */
1706 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1707 {
1708 int ret;
1709 struct rbd_image_header h;
1710
1711 ret = rbd_read_header(rbd_dev, &h);
1712 if (ret < 0)
1713 return ret;
1714
1715 down_write(&rbd_dev->header_rwsem);
1716
1717 /* resized? */
1718 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1719 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1720
1721 dout("setting size to %llu sectors", (unsigned long long) size);
1722 set_capacity(rbd_dev->disk, size);
1723 }
1724
1725 /* rbd_dev->header.object_prefix shouldn't change */
1726 kfree(rbd_dev->header.snap_sizes);
1727 kfree(rbd_dev->header.snap_names);
1728 /* osd requests may still refer to snapc */
1729 ceph_put_snap_context(rbd_dev->header.snapc);
1730
1731 if (hver)
1732 *hver = h.obj_version;
1733 rbd_dev->header.obj_version = h.obj_version;
1734 rbd_dev->header.image_size = h.image_size;
1735 rbd_dev->header.total_snaps = h.total_snaps;
1736 rbd_dev->header.snapc = h.snapc;
1737 rbd_dev->header.snap_names = h.snap_names;
1738 rbd_dev->header.snap_names_len = h.snap_names_len;
1739 rbd_dev->header.snap_sizes = h.snap_sizes;
1740 /* Free the extra copy of the object prefix */
1741 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1742 kfree(h.object_prefix);
1743
1744 ret = __rbd_init_snaps_header(rbd_dev);
1745
1746 up_write(&rbd_dev->header_rwsem);
1747
1748 return ret;
1749 }
1750
1751 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1752 {
1753 int ret;
1754
1755 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1756 ret = __rbd_refresh_header(rbd_dev, hver);
1757 mutex_unlock(&ctl_mutex);
1758
1759 return ret;
1760 }
1761
1762 static int rbd_init_disk(struct rbd_device *rbd_dev)
1763 {
1764 struct gendisk *disk;
1765 struct request_queue *q;
1766 int rc;
1767 u64 segment_size;
1768 u64 total_size = 0;
1769
1770 /* contact OSD, request size info about the object being mapped */
1771 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1772 if (rc)
1773 return rc;
1774
1775 /* no need to lock here, as rbd_dev is not registered yet */
1776 rc = __rbd_init_snaps_header(rbd_dev);
1777 if (rc)
1778 return rc;
1779
1780 rc = rbd_header_set_snap(rbd_dev, &total_size);
1781 if (rc)
1782 return rc;
1783
1784 /* create gendisk info */
1785 rc = -ENOMEM;
1786 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1787 if (!disk)
1788 goto out;
1789
1790 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1791 rbd_dev->dev_id);
1792 disk->major = rbd_dev->major;
1793 disk->first_minor = 0;
1794 disk->fops = &rbd_bd_ops;
1795 disk->private_data = rbd_dev;
1796
1797 /* init rq */
1798 rc = -ENOMEM;
1799 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1800 if (!q)
1801 goto out_disk;
1802
1803 /* We use the default size, but let's be explicit about it. */
1804 blk_queue_physical_block_size(q, SECTOR_SIZE);
1805
1806 /* set io sizes to object size */
1807 segment_size = rbd_obj_bytes(&rbd_dev->header);
1808 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1809 blk_queue_max_segment_size(q, segment_size);
1810 blk_queue_io_min(q, segment_size);
1811 blk_queue_io_opt(q, segment_size);
1812
1813 blk_queue_merge_bvec(q, rbd_merge_bvec);
1814 disk->queue = q;
1815
1816 q->queuedata = rbd_dev;
1817
1818 rbd_dev->disk = disk;
1819 rbd_dev->q = q;
1820
1821 /* finally, announce the disk to the world */
1822 set_capacity(disk, total_size / SECTOR_SIZE);
1823 add_disk(disk);
1824
1825 pr_info("%s: added with size 0x%llx\n",
1826 disk->disk_name, (unsigned long long)total_size);
1827 return 0;
1828
1829 out_disk:
1830 put_disk(disk);
1831 out:
1832 return rc;
1833 }
1834
1835 /*
1836 sysfs
1837 */
1838
1839 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1840 {
1841 return container_of(dev, struct rbd_device, dev);
1842 }
1843
1844 static ssize_t rbd_size_show(struct device *dev,
1845 struct device_attribute *attr, char *buf)
1846 {
1847 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1848 sector_t size;
1849
1850 down_read(&rbd_dev->header_rwsem);
1851 size = get_capacity(rbd_dev->disk);
1852 up_read(&rbd_dev->header_rwsem);
1853
1854 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1855 }
1856
1857 static ssize_t rbd_major_show(struct device *dev,
1858 struct device_attribute *attr, char *buf)
1859 {
1860 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861
1862 return sprintf(buf, "%d\n", rbd_dev->major);
1863 }
1864
1865 static ssize_t rbd_client_id_show(struct device *dev,
1866 struct device_attribute *attr, char *buf)
1867 {
1868 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869
1870 return sprintf(buf, "client%lld\n",
1871 ceph_client_id(rbd_dev->rbd_client->client));
1872 }
1873
1874 static ssize_t rbd_pool_show(struct device *dev,
1875 struct device_attribute *attr, char *buf)
1876 {
1877 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1880 }
1881
1882 static ssize_t rbd_pool_id_show(struct device *dev,
1883 struct device_attribute *attr, char *buf)
1884 {
1885 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1886
1887 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1888 }
1889
1890 static ssize_t rbd_name_show(struct device *dev,
1891 struct device_attribute *attr, char *buf)
1892 {
1893 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1894
1895 return sprintf(buf, "%s\n", rbd_dev->image_name);
1896 }
1897
1898 static ssize_t rbd_snap_show(struct device *dev,
1899 struct device_attribute *attr,
1900 char *buf)
1901 {
1902 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903
1904 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1905 }
1906
1907 static ssize_t rbd_image_refresh(struct device *dev,
1908 struct device_attribute *attr,
1909 const char *buf,
1910 size_t size)
1911 {
1912 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913 int ret;
1914
1915 ret = rbd_refresh_header(rbd_dev, NULL);
1916
1917 return ret < 0 ? ret : size;
1918 }
1919
1920 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1921 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1922 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1923 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1924 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1925 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1926 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1927 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1928 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1929
1930 static struct attribute *rbd_attrs[] = {
1931 &dev_attr_size.attr,
1932 &dev_attr_major.attr,
1933 &dev_attr_client_id.attr,
1934 &dev_attr_pool.attr,
1935 &dev_attr_pool_id.attr,
1936 &dev_attr_name.attr,
1937 &dev_attr_current_snap.attr,
1938 &dev_attr_refresh.attr,
1939 &dev_attr_create_snap.attr,
1940 NULL
1941 };
1942
1943 static struct attribute_group rbd_attr_group = {
1944 .attrs = rbd_attrs,
1945 };
1946
1947 static const struct attribute_group *rbd_attr_groups[] = {
1948 &rbd_attr_group,
1949 NULL
1950 };
1951
/*
 * Release hook of rbd_device_type; deliberately does nothing.
 * NOTE(review): rbd_bus_add_dev() also sets dev->release to
 * rbd_dev_release, so the real cleanup presumably happens there - confirm.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

/* Device type of an rbd device: supplies its name and attribute groups. */
static struct device_type rbd_device_type = {
	.name = "rbd",
	.groups = rbd_attr_groups,
	.release = rbd_sysfs_dev_release,
};
1961
1962
1963 /*
1964 sysfs - snapshots
1965 */
1966
1967 static ssize_t rbd_snap_size_show(struct device *dev,
1968 struct device_attribute *attr,
1969 char *buf)
1970 {
1971 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1972
1973 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1974 }
1975
1976 static ssize_t rbd_snap_id_show(struct device *dev,
1977 struct device_attribute *attr,
1978 char *buf)
1979 {
1980 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1981
1982 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1983 }
1984
1985 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1986 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1987
1988 static struct attribute *rbd_snap_attrs[] = {
1989 &dev_attr_snap_size.attr,
1990 &dev_attr_snap_id.attr,
1991 NULL,
1992 };
1993
1994 static struct attribute_group rbd_snap_attr_group = {
1995 .attrs = rbd_snap_attrs,
1996 };
1997
1998 static void rbd_snap_dev_release(struct device *dev)
1999 {
2000 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2001 kfree(snap->name);
2002 kfree(snap);
2003 }
2004
/* NULL-terminated group list for snapshot devices. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/*
 * Device type of a snapshot device: attribute groups plus the release
 * hook that frees the rbd_snap.
 */
static struct device_type rbd_snap_device_type = {
	.groups = rbd_snap_attr_groups,
	.release = rbd_snap_dev_release,
};
2014
2015 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2016 {
2017 list_del(&snap->node);
2018 device_unregister(&snap->dev);
2019 }
2020
2021 static int rbd_register_snap_dev(struct rbd_snap *snap,
2022 struct device *parent)
2023 {
2024 struct device *dev = &snap->dev;
2025 int ret;
2026
2027 dev->type = &rbd_snap_device_type;
2028 dev->parent = parent;
2029 dev->release = rbd_snap_dev_release;
2030 dev_set_name(dev, "snap_%s", snap->name);
2031 ret = device_register(dev);
2032
2033 return ret;
2034 }
2035
2036 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2037 int i, const char *name)
2038 {
2039 struct rbd_snap *snap;
2040 int ret;
2041
2042 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2043 if (!snap)
2044 return ERR_PTR(-ENOMEM);
2045
2046 ret = -ENOMEM;
2047 snap->name = kstrdup(name, GFP_KERNEL);
2048 if (!snap->name)
2049 goto err;
2050
2051 snap->size = rbd_dev->header.snap_sizes[i];
2052 snap->id = rbd_dev->header.snapc->snaps[i];
2053 if (device_is_registered(&rbd_dev->dev)) {
2054 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2055 if (ret < 0)
2056 goto err;
2057 }
2058
2059 return snap;
2060
2061 err:
2062 kfree(snap->name);
2063 kfree(snap);
2064
2065 return ERR_PTR(ret);
2066 }
2067
/*
 * Step backward through a list of NUL-terminated names stored back to
 * back.
 *
 * @name:  start of the current name, or one past the end of the list
 * @start: start of the first name in the list
 *
 * Returns the start of the name preceding @name, or NULL when @name is
 * fewer than two bytes past @start (there is no room for a preceding
 * name).
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cur;

	if (name < start + 2)
		return NULL;

	/* name[-1] is the previous terminator; begin on the byte before it */
	for (cur = name - 2; *cur != '\0'; cur--) {
		if (cur == start)
			return start;
	}

	/* cur sits on the terminator of the name before the previous one */
	return cur + 1;
}
2084
2085 /*
2086 * compare the old list of snapshots that we have to what's in the header
2087 * and update it accordingly. Note that the header holds the snapshots
2088 * in a reverse order (from newest to oldest) and we need to go from
2089 * older to new so that we don't get a duplicate snap name when
2090 * doing the process (e.g., removed snapshot and recreated a new
2091 * one with the same name.
2092 */
2093 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2094 {
2095 const char *name, *first_name;
2096 int i = rbd_dev->header.total_snaps;
2097 struct rbd_snap *snap, *old_snap = NULL;
2098 struct list_head *p, *n;
2099
2100 first_name = rbd_dev->header.snap_names;
2101 name = first_name + rbd_dev->header.snap_names_len;
2102
2103 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2104 u64 cur_id;
2105
2106 old_snap = list_entry(p, struct rbd_snap, node);
2107
2108 if (i)
2109 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2110
2111 if (!i || old_snap->id < cur_id) {
2112 /*
2113 * old_snap->id was skipped, thus was
2114 * removed. If this rbd_dev is mapped to
2115 * the removed snapshot, record that it no
2116 * longer exists, to prevent further I/O.
2117 */
2118 if (rbd_dev->snap_id == old_snap->id)
2119 rbd_dev->snap_exists = false;
2120 __rbd_remove_snap_dev(old_snap);
2121 continue;
2122 }
2123 if (old_snap->id == cur_id) {
2124 /* we have this snapshot already */
2125 i--;
2126 name = rbd_prev_snap_name(name, first_name);
2127 continue;
2128 }
2129 for (; i > 0;
2130 i--, name = rbd_prev_snap_name(name, first_name)) {
2131 if (!name) {
2132 WARN_ON(1);
2133 return -EINVAL;
2134 }
2135 cur_id = rbd_dev->header.snapc->snaps[i];
2136 /* snapshot removal? handle it above */
2137 if (cur_id >= old_snap->id)
2138 break;
2139 /* a new snapshot */
2140 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2141 if (IS_ERR(snap))
2142 return PTR_ERR(snap);
2143
2144 /* note that we add it backward so using n and not p */
2145 list_add(&snap->node, n);
2146 p = &snap->node;
2147 }
2148 }
2149 /* we're done going over the old snap list, just add what's left */
2150 for (; i > 0; i--) {
2151 name = rbd_prev_snap_name(name, first_name);
2152 if (!name) {
2153 WARN_ON(1);
2154 return -EINVAL;
2155 }
2156 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2157 if (IS_ERR(snap))
2158 return PTR_ERR(snap);
2159 list_add(&snap->node, &rbd_dev->snaps);
2160 }
2161
2162 return 0;
2163 }
2164
2165 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2166 {
2167 int ret;
2168 struct device *dev;
2169 struct rbd_snap *snap;
2170
2171 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2172 dev = &rbd_dev->dev;
2173
2174 dev->bus = &rbd_bus_type;
2175 dev->type = &rbd_device_type;
2176 dev->parent = &rbd_root_dev;
2177 dev->release = rbd_dev_release;
2178 dev_set_name(dev, "%d", rbd_dev->dev_id);
2179 ret = device_register(dev);
2180 if (ret < 0)
2181 goto out;
2182
2183 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2184 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2185 if (ret < 0)
2186 break;
2187 }
2188 out:
2189 mutex_unlock(&ctl_mutex);
2190 return ret;
2191 }
2192
2193 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2194 {
2195 device_unregister(&rbd_dev->dev);
2196 }
2197
2198 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2199 {
2200 int ret, rc;
2201
2202 do {
2203 ret = rbd_req_sync_watch(rbd_dev);
2204 if (ret == -ERANGE) {
2205 rc = rbd_refresh_header(rbd_dev, NULL);
2206 if (rc < 0)
2207 return rc;
2208 }
2209 } while (ret == -ERANGE);
2210
2211 return ret;
2212 }
2213
2214 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2215
2216 /*
2217 * Get a unique rbd identifier for the given new rbd_dev, and add
2218 * the rbd_dev to the global list. The minimum rbd id is 1.
2219 */
2220 static void rbd_id_get(struct rbd_device *rbd_dev)
2221 {
2222 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2223
2224 spin_lock(&rbd_dev_list_lock);
2225 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2226 spin_unlock(&rbd_dev_list_lock);
2227 }
2228
2229 /*
2230 * Remove an rbd_dev from the global list, and record that its
2231 * identifier is no longer in use.
2232 */
2233 static void rbd_id_put(struct rbd_device *rbd_dev)
2234 {
2235 struct list_head *tmp;
2236 int rbd_id = rbd_dev->dev_id;
2237 int max_id;
2238
2239 BUG_ON(rbd_id < 1);
2240
2241 spin_lock(&rbd_dev_list_lock);
2242 list_del_init(&rbd_dev->node);
2243
2244 /*
2245 * If the id being "put" is not the current maximum, there
2246 * is nothing special we need to do.
2247 */
2248 if (rbd_id != atomic64_read(&rbd_id_max)) {
2249 spin_unlock(&rbd_dev_list_lock);
2250 return;
2251 }
2252
2253 /*
2254 * We need to update the current maximum id. Search the
2255 * list to find out what it is. We're more likely to find
2256 * the maximum at the end, so search the list backward.
2257 */
2258 max_id = 0;
2259 list_for_each_prev(tmp, &rbd_dev_list) {
2260 struct rbd_device *rbd_dev;
2261
2262 rbd_dev = list_entry(tmp, struct rbd_device, node);
2263 if (rbd_id > max_id)
2264 max_id = rbd_id;
2265 }
2266 spin_unlock(&rbd_dev_list_lock);
2267
2268 /*
2269 * The max id could have been updated by rbd_id_get(), in
2270 * which case it now accurately reflects the new maximum.
2271 * Be careful not to overwrite the maximum value in that
2272 * case.
2273 */
2274 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2275 }
2276
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0').
 *
 * Returns the length of the token found there, i.e. the number of
 * consecutive non-white-space characters; 0 if there is none.  *buf must
 * be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *token = *buf + strspn(*buf, whitespace);

	*buf = token;				/* start of the token */

	return strcspn(token, whitespace);	/* length of the token */
}
2295
/*
 * Locate the next token in *buf and copy it, '\0'-terminated, into
 * @token if it fits in @token_size bytes (terminator included).  *buf
 * must be '\0'-terminated on entry.
 *
 * Returns the token length, not counting the '\0': 0 when there is no
 * token, and a value >= @token_size when the token did not fit, in which
 * case @token is left untouched.
 *
 * *buf is always moved to just past the token, even when it was not
 * copied.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);

	if (token_len < token_size) {
		memcpy(token, *buf, token_len);
		token[token_len] = '\0';
	}
	*buf += token_len;

	return token_len;
}
2325
2326 /*
2327 * Finds the next token in *buf, dynamically allocates a buffer big
2328 * enough to hold a copy of it, and copies the token into the new
2329 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2330 * that a duplicate buffer is created even for a zero-length token.
2331 *
2332 * Returns a pointer to the newly-allocated duplicate, or a null
2333 * pointer if memory for the duplicate was not available. If
2334 * the lenp argument is a non-null pointer, the length of the token
2335 * (not including the '\0') is returned in *lenp.
2336 *
2337 * If successful, the *buf pointer will be updated to point beyond
2338 * the end of the found token.
2339 *
2340 * Note: uses GFP_KERNEL for allocation.
2341 */
2342 static inline char *dup_token(const char **buf, size_t *lenp)
2343 {
2344 char *dup;
2345 size_t len;
2346
2347 len = next_token(buf);
2348 dup = kmalloc(len + 1, GFP_KERNEL);
2349 if (!dup)
2350 return NULL;
2351
2352 memcpy(dup, *buf, len);
2353 *(dup + len) = '\0';
2354 *buf += len;
2355
2356 if (lenp)
2357 *lenp = len;
2358
2359 return dup;
2360 }
2361
2362 /*
2363 * This fills in the pool_name, image_name, image_name_len, snap_name,
2364 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2365 * on the list of monitor addresses and other options provided via
2366 * /sys/bus/rbd/add.
2367 *
2368 * Note: rbd_dev is assumed to have been initially zero-filled.
2369 */
2370 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2371 const char *buf,
2372 const char **mon_addrs,
2373 size_t *mon_addrs_size,
2374 char *options,
2375 size_t options_size)
2376 {
2377 size_t len;
2378 int ret;
2379
2380 /* The first four tokens are required */
2381
2382 len = next_token(&buf);
2383 if (!len)
2384 return -EINVAL;
2385 *mon_addrs_size = len + 1;
2386 *mon_addrs = buf;
2387
2388 buf += len;
2389
2390 len = copy_token(&buf, options, options_size);
2391 if (!len || len >= options_size)
2392 return -EINVAL;
2393
2394 ret = -ENOMEM;
2395 rbd_dev->pool_name = dup_token(&buf, NULL);
2396 if (!rbd_dev->pool_name)
2397 goto out_err;
2398
2399 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2400 if (!rbd_dev->image_name)
2401 goto out_err;
2402
2403 /* Create the name of the header object */
2404
2405 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2406 + sizeof (RBD_SUFFIX),
2407 GFP_KERNEL);
2408 if (!rbd_dev->header_name)
2409 goto out_err;
2410 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2411
2412 /*
2413 * The snapshot name is optional. If none is is supplied,
2414 * we use the default value.
2415 */
2416 rbd_dev->snap_name = dup_token(&buf, &len);
2417 if (!rbd_dev->snap_name)
2418 goto out_err;
2419 if (!len) {
2420 /* Replace the empty name with the default */
2421 kfree(rbd_dev->snap_name);
2422 rbd_dev->snap_name
2423 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2424 if (!rbd_dev->snap_name)
2425 goto out_err;
2426
2427 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2428 sizeof (RBD_SNAP_HEAD_NAME));
2429 }
2430
2431 return 0;
2432
2433 out_err:
2434 kfree(rbd_dev->header_name);
2435 kfree(rbd_dev->image_name);
2436 kfree(rbd_dev->pool_name);
2437 rbd_dev->pool_name = NULL;
2438
2439 return ret;
2440 }
2441
2442 static ssize_t rbd_add(struct bus_type *bus,
2443 const char *buf,
2444 size_t count)
2445 {
2446 char *options;
2447 struct rbd_device *rbd_dev = NULL;
2448 const char *mon_addrs = NULL;
2449 size_t mon_addrs_size = 0;
2450 struct ceph_osd_client *osdc;
2451 int rc = -ENOMEM;
2452
2453 if (!try_module_get(THIS_MODULE))
2454 return -ENODEV;
2455
2456 options = kmalloc(count, GFP_KERNEL);
2457 if (!options)
2458 goto err_nomem;
2459 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2460 if (!rbd_dev)
2461 goto err_nomem;
2462
2463 /* static rbd_device initialization */
2464 spin_lock_init(&rbd_dev->lock);
2465 INIT_LIST_HEAD(&rbd_dev->node);
2466 INIT_LIST_HEAD(&rbd_dev->snaps);
2467 init_rwsem(&rbd_dev->header_rwsem);
2468
2469 /* generate unique id: find highest unique id, add one */
2470 rbd_id_get(rbd_dev);
2471
2472 /* Fill in the device name, now that we have its id. */
2473 BUILD_BUG_ON(DEV_NAME_LEN
2474 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2475 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2476
2477 /* parse add command */
2478 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2479 options, count);
2480 if (rc)
2481 goto err_put_id;
2482
2483 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2484 options);
2485 if (IS_ERR(rbd_dev->rbd_client)) {
2486 rc = PTR_ERR(rbd_dev->rbd_client);
2487 goto err_put_id;
2488 }
2489
2490 /* pick the pool */
2491 osdc = &rbd_dev->rbd_client->client->osdc;
2492 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2493 if (rc < 0)
2494 goto err_out_client;
2495 rbd_dev->pool_id = rc;
2496
2497 /* register our block device */
2498 rc = register_blkdev(0, rbd_dev->name);
2499 if (rc < 0)
2500 goto err_out_client;
2501 rbd_dev->major = rc;
2502
2503 rc = rbd_bus_add_dev(rbd_dev);
2504 if (rc)
2505 goto err_out_blkdev;
2506
2507 /*
2508 * At this point cleanup in the event of an error is the job
2509 * of the sysfs code (initiated by rbd_bus_del_dev()).
2510 *
2511 * Set up and announce blkdev mapping.
2512 */
2513 rc = rbd_init_disk(rbd_dev);
2514 if (rc)
2515 goto err_out_bus;
2516
2517 rc = rbd_init_watch_dev(rbd_dev);
2518 if (rc)
2519 goto err_out_bus;
2520
2521 return count;
2522
2523 err_out_bus:
2524 /* this will also clean up rest of rbd_dev stuff */
2525
2526 rbd_bus_del_dev(rbd_dev);
2527 kfree(options);
2528 return rc;
2529
2530 err_out_blkdev:
2531 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2532 err_out_client:
2533 rbd_put_client(rbd_dev);
2534 err_put_id:
2535 if (rbd_dev->pool_name) {
2536 kfree(rbd_dev->snap_name);
2537 kfree(rbd_dev->header_name);
2538 kfree(rbd_dev->image_name);
2539 kfree(rbd_dev->pool_name);
2540 }
2541 rbd_id_put(rbd_dev);
2542 err_nomem:
2543 kfree(rbd_dev);
2544 kfree(options);
2545
2546 dout("Error adding device %s\n", buf);
2547 module_put(THIS_MODULE);
2548
2549 return (ssize_t) rc;
2550 }
2551
2552 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2553 {
2554 struct list_head *tmp;
2555 struct rbd_device *rbd_dev;
2556
2557 spin_lock(&rbd_dev_list_lock);
2558 list_for_each(tmp, &rbd_dev_list) {
2559 rbd_dev = list_entry(tmp, struct rbd_device, node);
2560 if (rbd_dev->dev_id == dev_id) {
2561 spin_unlock(&rbd_dev_list_lock);
2562 return rbd_dev;
2563 }
2564 }
2565 spin_unlock(&rbd_dev_list_lock);
2566 return NULL;
2567 }
2568
/*
 * Tear down an rbd device and free everything it owns.
 *
 * Presumably installed as the release hook of the device embedded in
 * struct rbd_device (the hookup is not visible here -- confirm).
 * The steps run in a fixed order: stop watching the header object,
 * drop the client, tear down the disk and block device registration,
 * free the name strings, give back the id, free the rbd_dev itself
 * and finally drop a module reference.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* The client is still needed here, so this precedes rbd_put_client() */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2599
2600 static ssize_t rbd_remove(struct bus_type *bus,
2601 const char *buf,
2602 size_t count)
2603 {
2604 struct rbd_device *rbd_dev = NULL;
2605 int target_id, rc;
2606 unsigned long ul;
2607 int ret = count;
2608
2609 rc = strict_strtoul(buf, 10, &ul);
2610 if (rc)
2611 return rc;
2612
2613 /* convert to int; abort if we lost anything in the conversion */
2614 target_id = (int) ul;
2615 if (target_id != ul)
2616 return -EINVAL;
2617
2618 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2619
2620 rbd_dev = __rbd_get_dev(target_id);
2621 if (!rbd_dev) {
2622 ret = -ENOENT;
2623 goto done;
2624 }
2625
2626 __rbd_remove_all_snaps(rbd_dev);
2627 rbd_bus_del_dev(rbd_dev);
2628
2629 done:
2630 mutex_unlock(&ctl_mutex);
2631 return ret;
2632 }
2633
2634 static ssize_t rbd_snap_add(struct device *dev,
2635 struct device_attribute *attr,
2636 const char *buf,
2637 size_t count)
2638 {
2639 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2640 int ret;
2641 char *name = kmalloc(count + 1, GFP_KERNEL);
2642 if (!name)
2643 return -ENOMEM;
2644
2645 snprintf(name, count, "%s", buf);
2646
2647 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2648
2649 ret = rbd_header_add_snap(rbd_dev,
2650 name, GFP_KERNEL);
2651 if (ret < 0)
2652 goto err_unlock;
2653
2654 ret = __rbd_refresh_header(rbd_dev, NULL);
2655 if (ret < 0)
2656 goto err_unlock;
2657
2658 /* shouldn't hold ctl_mutex when notifying.. notify might
2659 trigger a watch callback that would need to get that mutex */
2660 mutex_unlock(&ctl_mutex);
2661
2662 /* make a best effort, don't error if failed */
2663 rbd_req_sync_notify(rbd_dev);
2664
2665 ret = count;
2666 kfree(name);
2667 return ret;
2668
2669 err_unlock:
2670 mutex_unlock(&ctl_mutex);
2671 kfree(name);
2672 return ret;
2673 }
2674
2675 /*
2676 * create control files in sysfs
2677 * /sys/bus/rbd/...
2678 */
2679 static int rbd_sysfs_init(void)
2680 {
2681 int ret;
2682
2683 ret = device_register(&rbd_root_dev);
2684 if (ret < 0)
2685 return ret;
2686
2687 ret = bus_register(&rbd_bus_type);
2688 if (ret < 0)
2689 device_unregister(&rbd_root_dev);
2690
2691 return ret;
2692 }
2693
/*
 * Undo rbd_sysfs_init(): unregister the bus type first, then the root
 * device, i.e. in the reverse order of their registration.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2699
2700 int __init rbd_init(void)
2701 {
2702 int rc;
2703
2704 rc = rbd_sysfs_init();
2705 if (rc)
2706 return rc;
2707 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2708 return 0;
2709 }
2710
/* Module exit point: remove the sysfs interface set up by rbd_init(). */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2715
/* Hook rbd_init()/rbd_exit() up as the module's load/unload handlers */
module_init(rbd_init);
module_exit(rbd_exit);

/* Module metadata */
MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");