/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

                 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG       /* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* It might be useful to have this defined elsewhere too */

#define U64_MAX         ((u64) (~0ULL))

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

#define RBD_SNAP_HEAD_NAME      "-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)

#define RBD_READ_ONLY_DEFAULT   false

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
        u64 image_size;
        char *object_prefix;
        __u8 obj_order;
        __u8 crypt_type;
        __u8 comp_type;
        struct ceph_snap_context *snapc;
        u32 total_snaps;

        char *snap_names;
        u64 *snap_sizes;

        u64 obj_version;
};

struct rbd_options {
        bool read_only;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
        struct ceph_client *client;
        struct kref kref;
        struct list_head node;
};

/*
 * a request completion status
 */
struct rbd_req_status {
        int done;
        int rc;
        u64 bytes;
};

/*
 * a collection of requests
 */
struct rbd_req_coll {
        int total;
        int num_done;
        struct kref kref;
        struct rbd_req_status status[0];
};

/*
 * a single io request
 */
struct rbd_request {
        struct request *rq;             /* blk layer request */
        struct bio *bio;                /* cloned bio */
        struct page **pages;            /* list of used pages */
        u64 len;
        int coll_index;
        struct rbd_req_coll *coll;
};

struct rbd_snap {
        struct device dev;
        const char *name;
        u64 size;
        struct list_head node;
        u64 id;
};

/*
 * a single device
 */
struct rbd_device {
        int dev_id;             /* blkdev unique id */

        int major;              /* blkdev assigned major */
        struct gendisk *disk;   /* blkdev's gendisk and rq */
        struct request_queue *q;

        struct rbd_options rbd_opts;
        struct rbd_client *rbd_client;

        char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

        spinlock_t lock;        /* queue lock */

        struct rbd_image_header header;
        char *image_name;
        size_t image_name_len;
        char *header_name;
        char *pool_name;
        int pool_id;

        struct ceph_osd_event *watch_event;
        struct ceph_osd_request *watch_request;

        /* protects updating the header */
        struct rw_semaphore header_rwsem;
        /* name of the snapshot this device reads from */
        char *snap_name;
        /* id of the snapshot this device reads from */
        u64 snap_id;            /* current snapshot id */
        /* whether the snap_id this device reads from still exists */
        bool snap_exists;
        bool read_only;

        struct list_head node;

        /* list of snapshots */
        struct list_head snaps;

        /* sysfs related */
        struct device dev;
};

static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);              /* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
                            struct device_attribute *attr,
                            const char *buf,
                            size_t count);
static void __rbd_remove_snap_dev(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
                       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
                          size_t count);

static struct bus_attribute rbd_bus_attrs[] = {
        __ATTR(add, S_IWUSR, NULL, rbd_add),
        __ATTR(remove, S_IWUSR, NULL, rbd_remove),
        __ATTR_NULL
};

static struct bus_type rbd_bus_type = {
        .name           = "rbd",
        .bus_attrs      = rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
        .init_name =    "rbd",
        .release =      rbd_root_dev_release,
};

#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                        "at line %d:\n\n"               \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */

static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
        return get_device(&rbd_dev->dev);
}

static void rbd_put_dev(struct rbd_device *rbd_dev)
{
        put_device(&rbd_dev->dev);
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);

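/*
 * Block device open/release: take and drop a reference on the rbd
 * device so it can't be torn down while the blkdev is in use.  A
 * write open of a read-only mapping fails with -EROFS.
 */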
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;

        if ((mode & FMODE_WRITE) && rbd_dev->read_only)
                return -EROFS;

        rbd_get_dev(rbd_dev);
        set_device_ro(bdev, rbd_dev->read_only);

        return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
        struct rbd_device *rbd_dev = disk->private_data;

        rbd_put_dev(rbd_dev);

        return 0;
}

static const struct block_device_operations rbd_bd_ops = {
        .owner                  = THIS_MODULE,
        .open                   = rbd_open,
        .release                = rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc;
        int ret = -ENOMEM;

        dout("rbd_client_create\n");
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;

        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
                goto out_mutex;
        ceph_opts = NULL;       /* Now rbdc->client is responsible for ceph_opts */

        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
                goto out_err;

        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);

        mutex_unlock(&ctl_mutex);

        dout("rbd_client_create created %p\n", rbdc);
        return rbdc;

out_err:
        ceph_destroy_client(rbdc->client);
out_mutex:
        mutex_unlock(&ctl_mutex);
        kfree(rbdc);
out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
        return ERR_PTR(ret);
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
        struct rbd_client *client_node;
        bool found = false;

        if (ceph_opts->flags & CEPH_OPT_NOSHARE)
                return NULL;

        spin_lock(&rbd_client_list_lock);
        list_for_each_entry(client_node, &rbd_client_list, node) {
                if (!ceph_compare_options(ceph_opts, client_node->client)) {
                        kref_get(&client_node->kref);
                        found = true;
                        break;
                }
        }
        spin_unlock(&rbd_client_list_lock);

        return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
        /* int args above */
        /* string args above */
        {Opt_read_only, "read_only"},
        {Opt_read_only, "ro"},          /* Alternate spelling */
        {Opt_read_write, "read_write"},
        {Opt_read_write, "rw"},         /* Alternate spelling */
        /* Boolean args above */
        {-1, NULL}
};

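/*
 * Parse a single token from the rbd options string and record it in
 * the rbd_options structure passed in as "private".
 */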
static int parse_rbd_opts_token(char *c, void *private)
{
        struct rbd_options *rbd_opts = private;
        substring_t argstr[MAX_OPT_ARGS];
        int token, intval, ret;

        token = match_token(c, rbd_opts_tokens, argstr);
        if (token < 0)
                return -EINVAL;

        if (token < Opt_last_int) {
                ret = match_int(&argstr[0], &intval);
                if (ret < 0) {
                        pr_err("bad mount option arg (not int) "
                               "at '%s'\n", c);
                        return ret;
                }
                dout("got int token %d val %d\n", token, intval);
        } else if (token > Opt_last_int && token < Opt_last_string) {
                dout("got string token %d val %s\n", token,
                     argstr[0].from);
        } else if (token > Opt_last_string && token < Opt_last_bool) {
                dout("got Boolean token %d\n", token);
        } else {
                dout("got token %d\n", token);
        }

        switch (token) {
        case Opt_read_only:
                rbd_opts->read_only = true;
                break;
        case Opt_read_write:
                rbd_opts->read_only = false;
                break;
        default:
                rbd_assert(false);
                break;
        }
        return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
                          size_t mon_addr_len, char *options)
{
        struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
        struct ceph_options *ceph_opts;
        struct rbd_client *rbdc;

        rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

        ceph_opts = ceph_parse_options(options, mon_addr,
                                       mon_addr + mon_addr_len,
                                       parse_rbd_opts_token, rbd_opts);
        if (IS_ERR(ceph_opts))
                return PTR_ERR(ceph_opts);

        rbdc = rbd_client_find(ceph_opts);
        if (rbdc) {
                /* using an existing client */
                ceph_destroy_options(ceph_opts);
        } else {
                rbdc = rbd_client_create(ceph_opts);
                if (IS_ERR(rbdc))
                        return PTR_ERR(rbdc);
        }
        rbd_dev->rbd_client = rbdc;

        return 0;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to remove the client from the
 * global list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

        dout("rbd_release_client %p\n", rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);

        ceph_destroy_client(rbdc->client);
        kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
        kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
        rbd_dev->rbd_client = NULL;
}

/*
 * Destroy requests collection
 */
static void rbd_coll_release(struct kref *kref)
{
        struct rbd_req_coll *coll =
                container_of(kref, struct rbd_req_coll, kref);

        dout("rbd_coll_release %p\n", coll);
        kfree(coll);
}

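/*
 * Sanity-check an on-disk image header: verify the magic text and
 * make sure the snapshot count and snapshot name lengths can't
 * overflow size_t arithmetic later on.
 */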
static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
        size_t size;
        u32 snap_count;

        /* The header has to start with the magic rbd header text */
        if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
                return false;

        /*
         * The size of a snapshot header has to fit in a size_t, and
         * that limits the number of snapshots.
         */
        snap_count = le32_to_cpu(ondisk->snap_count);
        size = SIZE_MAX - sizeof (struct ceph_snap_context);
        if (snap_count > size / sizeof (__le64))
                return false;

        /*
         * Not only that, but the size of the entire snapshot
         * header must also be representable in a size_t.
         */
        size -= snap_count * sizeof (__le64);
        if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
                return false;

        return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
                                struct rbd_image_header_ondisk *ondisk)
{
        u32 snap_count;
        size_t len;
        size_t size;
        u32 i;

        memset(header, 0, sizeof (*header));

        snap_count = le32_to_cpu(ondisk->snap_count);

        len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
        header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
        if (!header->object_prefix)
                return -ENOMEM;
        memcpy(header->object_prefix, ondisk->object_prefix, len);
        header->object_prefix[len] = '\0';

        if (snap_count) {
                u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

                /* Save a copy of the snapshot names */

                if (snap_names_len > (u64) SIZE_MAX)
                        return -EIO;
                header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
                if (!header->snap_names)
                        goto out_err;
                /*
                 * Note that rbd_dev_v1_header_read() guarantees
                 * the ondisk buffer we're working with has
                 * snap_names_len bytes beyond the end of the
                 * snapshot id array, so this memcpy() is safe.
                 */
                memcpy(header->snap_names, &ondisk->snaps[snap_count],
                       snap_names_len);

                /* Record each snapshot's size */

                size = snap_count * sizeof (*header->snap_sizes);
                header->snap_sizes = kmalloc(size, GFP_KERNEL);
                if (!header->snap_sizes)
                        goto out_err;
                for (i = 0; i < snap_count; i++)
                        header->snap_sizes[i] =
                                le64_to_cpu(ondisk->snaps[i].image_size);
        } else {
                WARN_ON(ondisk->snap_names_len);
                header->snap_names = NULL;
                header->snap_sizes = NULL;
        }

        header->image_size = le64_to_cpu(ondisk->image_size);
        header->obj_order = ondisk->options.order;
        header->crypt_type = ondisk->options.crypt_type;
        header->comp_type = ondisk->options.comp_type;
        header->total_snaps = snap_count;

        /* Allocate and fill in the snapshot context */

        size = sizeof (struct ceph_snap_context);
        size += snap_count * sizeof (header->snapc->snaps[0]);
        header->snapc = kzalloc(size, GFP_KERNEL);
        if (!header->snapc)
                goto out_err;

        atomic_set(&header->snapc->nref, 1);
        header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
        header->snapc->num_snaps = snap_count;
        for (i = 0; i < snap_count; i++)
                header->snapc->snaps[i] =
                        le64_to_cpu(ondisk->snaps[i].id);

        return 0;

out_err:
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        kfree(header->object_prefix);
        header->object_prefix = NULL;

        return -ENOMEM;
}

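/*
 * Search the header's snapshot name list for the given name; on a
 * match, pass back the snapshot's id and/or size and return its index.
 */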
static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
                        u64 *seq, u64 *size)
{
        int i;
        char *p = header->snap_names;

        for (i = 0; i < header->total_snaps; i++) {
                if (!strcmp(snap_name, p)) {

                        /* Found it.  Pass back its id and/or size */

                        if (seq)
                                *seq = header->snapc->snaps[i];
                        if (size)
                                *size = header->snap_sizes[i];
                        return i;
                }
                p += strlen(p) + 1;     /* Skip ahead to the next name */
        }
        return -ENOENT;
}

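/*
 * Point the mapping at the snapshot named in rbd_dev->snap_name (or
 * at the image head), setting snap_id, read_only and the mapped size.
 */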
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
        int ret;

        down_write(&rbd_dev->header_rwsem);

        if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
                    sizeof (RBD_SNAP_HEAD_NAME))) {
                rbd_dev->snap_id = CEPH_NOSNAP;
                rbd_dev->snap_exists = false;
                rbd_dev->read_only = rbd_dev->rbd_opts.read_only;
                if (size)
                        *size = rbd_dev->header.image_size;
        } else {
                u64 snap_id = 0;

                ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
                                   &snap_id, size);
                if (ret < 0)
                        goto done;
                rbd_dev->snap_id = snap_id;
                rbd_dev->snap_exists = true;
                rbd_dev->read_only = true;      /* No choice for snapshots */
        }

        ret = 0;
done:
        up_write(&rbd_dev->header_rwsem);
        return ret;
}

static void rbd_header_free(struct rbd_image_header *header)
{
        kfree(header->object_prefix);
        header->object_prefix = NULL;
        kfree(header->snap_sizes);
        header->snap_sizes = NULL;
        kfree(header->snap_names);
        header->snap_names = NULL;
        ceph_put_snap_context(header->snapc);
        header->snapc = NULL;
}

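/*
 * Image data is striped over objects ("segments") of 2^obj_order
 * bytes.  These helpers map an image offset to the name of the object
 * holding it, to the offset within that object, and clip an I/O
 * length to the segment boundary.
 */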
static char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
        char *name;
        u64 segment;
        int ret;

        name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
        if (!name)
                return NULL;
        segment = offset >> rbd_dev->header.obj_order;
        ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
                       rbd_dev->header.object_prefix, segment);
        if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
                pr_err("error formatting segment name for #%llu (%d)\n",
                       segment, ret);
                kfree(name);
                name = NULL;
        }

        return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
                              u64 offset, u64 length)
{
        u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

        offset &= segment_size - 1;

        rbd_assert(length <= U64_MAX - offset);
        if (offset + length > segment_size)
                length = segment_size - offset;

        return length;
}

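/*
 * Count how many distinct segments the byte range [ofs, ofs + len)
 * touches; returns -ERANGE if the range would wrap past U64_MAX.
 */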
static int rbd_get_num_segments(struct rbd_image_header *header,
                                u64 ofs, u64 len)
{
        u64 start_seg;
        u64 end_seg;

        if (!len)
                return 0;
        if (len - 1 > U64_MAX - ofs)
                return -ERANGE;

        start_seg = ofs >> header->obj_order;
        end_seg = (ofs + len - 1) >> header->obj_order;

        return end_seg - start_seg + 1;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
        return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
        struct bio *tmp;

        while (chain) {
                tmp = chain;
                chain = chain->bi_next;
                bio_put(tmp);
        }
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
        struct bio_vec *bv;
        unsigned long flags;
        void *buf;
        int i;
        int pos = 0;

        while (chain) {
                bio_for_each_segment(bv, chain, i) {
                        if (pos + bv->bv_len > start_ofs) {
                                int remainder = max(start_ofs - pos, 0);
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
                }

                chain = chain->bi_next;
        }
}

/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
                                   struct bio_pair **bp,
                                   int len, gfp_t gfpmask)
{
        struct bio *old_chain = *old;
        struct bio *new_chain = NULL;
        struct bio *tail;
        int total = 0;

        if (*bp) {
                bio_pair_release(*bp);
                *bp = NULL;
        }

        while (old_chain && (total < len)) {
                struct bio *tmp;

                tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
                if (!tmp)
                        goto err_out;
                gfpmask &= ~__GFP_WAIT; /* can't wait after the first */

                if (total + old_chain->bi_size > len) {
                        struct bio_pair *bp;

                        /*
                         * this split can only happen with a single paged bio,
                         * split_bio will BUG_ON if this is not the case
                         */
                        dout("bio_chain_clone split! total=%d remaining=%d "
                             "bi_size=%u\n",
                             total, len - total, old_chain->bi_size);

                        /* split the bio.  We'll release it either in the next
                           call, or it will have to be released outside */
                        bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
                        if (!bp)
                                goto err_out;

                        __bio_clone(tmp, &bp->bio1);

                        *next = &bp->bio2;
                } else {
                        __bio_clone(tmp, old_chain);
                        *next = old_chain->bi_next;
                }

                tmp->bi_bdev = NULL;
                tmp->bi_next = NULL;
                if (new_chain)
                        tail->bi_next = tmp;
                else
                        new_chain = tmp;
                tail = tmp;
                old_chain = old_chain->bi_next;

                total += tmp->bi_size;
        }

        rbd_assert(total == len);

        *old = old_chain;

        return new_chain;

err_out:
        dout("bio_chain_clone with err\n");
        bio_chain_put(new_chain);
        return NULL;
}

/*
 * helpers for osd request op vectors.
 */
static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
                                                 int opcode, u32 payload_len)
{
        struct ceph_osd_req_op *ops;

        ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
        if (!ops)
                return NULL;

        ops[0].op = opcode;

        /*
         * op extent offset and length will be set later on
         * in calc_raw_layout()
         */
        ops[0].payload_len = payload_len;

        return ops;
}

static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}

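/*
 * Record completion status for one request in a collection and, under
 * the queue lock, end the block-layer request for every contiguous
 * run of finished segments.
 */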
static void rbd_coll_end_req_index(struct request *rq,
                                   struct rbd_req_coll *coll,
                                   int index,
                                   int ret, u64 len)
{
        struct request_queue *q;
        int min, max, i;

        dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
             coll, index, ret, (unsigned long long) len);

        if (!rq)
                return;

        if (!coll) {
                blk_end_request(rq, ret, len);
                return;
        }

        q = rq->q;

        spin_lock_irq(q->queue_lock);
        coll->status[index].done = 1;
        coll->status[index].rc = ret;
        coll->status[index].bytes = len;
        max = min = coll->num_done;
        while (max < coll->total && coll->status[max].done)
                max++;

        for (i = min; i < max; i++) {
                __blk_end_request(rq, coll->status[i].rc,
                                  coll->status[i].bytes);
                coll->num_done++;
                kref_put(&coll->kref, rbd_coll_release);
        }
        spin_unlock_irq(q->queue_lock);
}

static void rbd_coll_end_req(struct rbd_request *req,
                             int ret, u64 len)
{
        rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}

/*
 * Send ceph osd request
 */
static int rbd_do_request(struct request *rq,
                          struct rbd_device *rbd_dev,
                          struct ceph_snap_context *snapc,
                          u64 snapid,
                          const char *object_name, u64 ofs, u64 len,
                          struct bio *bio,
                          struct page **pages,
                          int num_pages,
                          int flags,
                          struct ceph_osd_req_op *ops,
                          struct rbd_req_coll *coll,
                          int coll_index,
                          void (*rbd_cb)(struct ceph_osd_request *req,
                                         struct ceph_msg *msg),
                          struct ceph_osd_request **linger_req,
                          u64 *ver)
{
        struct ceph_osd_request *req;
        struct ceph_file_layout *layout;
        int ret;
        u64 bno;
        struct timespec mtime = CURRENT_TIME;
        struct rbd_request *req_data;
        struct ceph_osd_request_head *reqhead;
        struct ceph_osd_client *osdc;

        req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
        if (!req_data) {
                if (coll)
                        rbd_coll_end_req_index(rq, coll, coll_index,
                                               -ENOMEM, len);
                return -ENOMEM;
        }

        if (coll) {
                req_data->coll = coll;
                req_data->coll_index = coll_index;
        }

        dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
             (unsigned long long) ofs, (unsigned long long) len);

        osdc = &rbd_dev->rbd_client->client->osdc;
        req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
                                      false, GFP_NOIO, pages, bio);
        if (!req) {
                ret = -ENOMEM;
                goto done_pages;
        }

        req->r_callback = rbd_cb;

        req_data->rq = rq;
        req_data->bio = bio;
        req_data->pages = pages;
        req_data->len = len;

        req->r_priv = req_data;

        reqhead = req->r_request->front.iov_base;
        reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

        strncpy(req->r_oid, object_name, sizeof(req->r_oid));
        req->r_oid_len = strlen(req->r_oid);

        layout = &req->r_file_layout;
        memset(layout, 0, sizeof(*layout));
        layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_stripe_count = cpu_to_le32(1);
        layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
        layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
        ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
                             req, ops);

        ceph_osdc_build_request(req, ofs, &len,
                                ops,
                                snapc,
                                &mtime,
                                req->r_oid, req->r_oid_len);

        if (linger_req) {
                ceph_osdc_set_request_linger(osdc, req);
                *linger_req = req;
        }

        ret = ceph_osdc_start_request(osdc, req, false);
        if (ret < 0)
                goto done_err;

        if (!rbd_cb) {
                ret = ceph_osdc_wait_request(osdc, req);
                if (ver)
                        *ver = le64_to_cpu(req->r_reassert_version.version);
                dout("reassert_ver=%llu\n",
                     (unsigned long long)
                     le64_to_cpu(req->r_reassert_version.version));
                ceph_osdc_put_request(req);
        }
        return ret;

done_err:
        bio_chain_put(req_data->bio);
        ceph_osdc_put_request(req);
done_pages:
        rbd_coll_end_req(req_data, ret, len);
        kfree(req_data);
        return ret;
}

/*
 * Ceph osd op callback
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        struct rbd_request *req_data = req->r_priv;
        struct ceph_osd_reply_head *replyhead;
        struct ceph_osd_op *op;
        __s32 rc;
        u64 bytes;
        int read_op;

        /* parse reply */
        replyhead = msg->front.iov_base;
        WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
        op = (void *)(replyhead + 1);
        rc = le32_to_cpu(replyhead->result);
        bytes = le64_to_cpu(op->extent.length);
        read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

        dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
             (unsigned long long) bytes, read_op, (int) rc);

        if (rc == -ENOENT && read_op) {
                zero_bio_chain(req_data->bio, 0);
                rc = 0;
        } else if (rc == 0 && read_op && bytes < req_data->len) {
                zero_bio_chain(req_data->bio, bytes);
                bytes = req_data->len;
        }

        rbd_coll_end_req(req_data, rc, bytes);

        if (req_data->bio)
                bio_chain_put(req_data->bio);

        ceph_osdc_put_request(req);
        kfree(req_data);
}

static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}

/*
 * Do a synchronous ceph osd operation
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
                           struct ceph_snap_context *snapc,
                           u64 snapid,
                           int flags,
                           struct ceph_osd_req_op *ops,
                           const char *object_name,
                           u64 ofs, u64 len,
                           char *buf,
                           struct ceph_osd_request **linger_req,
                           u64 *ver)
{
        int ret;
        struct page **pages;
        int num_pages;

        rbd_assert(ops != NULL);

        num_pages = calc_pages_for(ofs, len);
        pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
        if (IS_ERR(pages))
                return PTR_ERR(pages);

        ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
                             object_name, ofs, len, NULL,
                             pages, num_pages,
                             flags,
                             ops,
                             NULL, 0,
                             NULL,
                             linger_req, ver);
        if (ret < 0)
                goto done;

        if ((flags & CEPH_OSD_FLAG_READ) && buf)
                ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done:
        ceph_release_page_vector(pages, num_pages);
        return ret;
}

/*
 * Do an asynchronous ceph osd operation
 */
static int rbd_do_op(struct request *rq,
                     struct rbd_device *rbd_dev,
                     struct ceph_snap_context *snapc,
                     u64 snapid,
                     int opcode, int flags,
                     u64 ofs, u64 len,
                     struct bio *bio,
                     struct rbd_req_coll *coll,
                     int coll_index)
{
        char *seg_name;
        u64 seg_ofs;
        u64 seg_len;
        int ret;
        struct ceph_osd_req_op *ops;
        u32 payload_len;

        seg_name = rbd_segment_name(rbd_dev, ofs);
        if (!seg_name)
                return -ENOMEM;
        seg_len = rbd_segment_length(rbd_dev, ofs, len);
        seg_ofs = rbd_segment_offset(rbd_dev, ofs);

        payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

        ret = -ENOMEM;
        ops = rbd_create_rw_ops(1, opcode, payload_len);
        if (!ops)
                goto done;

        /* we've taken care of segment sizes earlier when we
           cloned the bios. We should never have a segment
           truncated at this point */
        rbd_assert(seg_len == len);

        ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
                             seg_name, seg_ofs, seg_len,
                             bio,
                             NULL, 0,
                             flags,
                             ops,
                             coll, coll_index,
                             rbd_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
done:
        kfree(seg_name);
        return ret;
}

/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
                         struct rbd_device *rbd_dev,
                         struct ceph_snap_context *snapc,
                         u64 ofs, u64 len,
                         struct bio *bio,
                         struct rbd_req_coll *coll,
                         int coll_index)
{
        return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
                         CEPH_OSD_OP_WRITE,
                         CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                         ofs, len, bio, coll, coll_index);
}

/*
 * Request async osd read
 */
static int rbd_req_read(struct request *rq,
                        struct rbd_device *rbd_dev,
                        u64 snapid,
                        u64 ofs, u64 len,
                        struct bio *bio,
                        struct rbd_req_coll *coll,
                        int coll_index)
{
        return rbd_do_op(rq, rbd_dev, NULL,
                         snapid,
                         CEPH_OSD_OP_READ,
                         CEPH_OSD_FLAG_READ,
                         ofs, len, bio, coll, coll_index);
}

/*
 * Request sync osd read
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
                             u64 snapid,
                             const char *object_name,
                             u64 ofs, u64 len,
                             char *buf,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
        if (!ops)
                return -ENOMEM;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              snapid,
                              CEPH_OSD_FLAG_READ,
                              ops, object_name, ofs, len, buf, NULL, ver);
        rbd_destroy_ops(ops);

        return ret;
}

/*
 * Acknowledge a notification received on the header object
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
                                   u64 ver,
                                   u64 notify_id)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = cpu_to_le64(ver);
        ops[0].watch.cookie = notify_id;
        ops[0].watch.flag = 0;

        ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
                             rbd_dev->header_name, 0, 0, NULL,
                             NULL, 0,
                             CEPH_OSD_FLAG_READ,
                             ops,
                             NULL, 0,
                             rbd_simple_req_cb, 0, NULL);

        rbd_destroy_ops(ops);
        return ret;
}

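/*
 * Watch callback: the header object changed (e.g. a snapshot was
 * created), so refresh the in-memory header and acknowledge the
 * notification.
 */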
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
        u64 hver;
        int rc;

        if (!rbd_dev)
                return;

        dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
             rbd_dev->header_name, (unsigned long long) notify_id,
             (unsigned int) opcode);
        rc = rbd_refresh_header(rbd_dev, &hver);
        if (rc)
                pr_warning(RBD_DRV_NAME "%d got notification but failed to "
                           "update snaps: %d\n", rbd_dev->major, rc);

        rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
}

/*
 * Request sync osd watch
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
                                     (void *)rbd_dev, &rbd_dev->watch_event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 1;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL,
                              &rbd_dev->watch_request, NULL);

        if (ret < 0)
                goto fail_event;

        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
fail:
        rbd_destroy_ops(ops);
        return ret;
}

/*
 * Request sync osd unwatch
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
        if (!ops)
                return -ENOMEM;

        ops[0].watch.ver = 0;
        ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
        ops[0].watch.flag = 0;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);


        rbd_destroy_ops(ops);
        ceph_osdc_cancel_event(rbd_dev->watch_event);
        rbd_dev->watch_event = NULL;
        return ret;
}

struct rbd_notify_info {
        struct rbd_device *rbd_dev;
};

static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
        struct rbd_device *rbd_dev = (struct rbd_device *)data;
        if (!rbd_dev)
                return;

        dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
             rbd_dev->header_name, (unsigned long long) notify_id,
             (unsigned int) opcode);
}

/*
 * Request sync osd notify
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
        struct ceph_osd_req_op *ops;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct ceph_osd_event *event;
        struct rbd_notify_info info;
        int payload_len = sizeof(u32) + sizeof(u32);
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
        if (!ops)
                return -ENOMEM;

        info.rbd_dev = rbd_dev;

        ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
                                     (void *)&info, &event);
        if (ret < 0)
                goto fail;

        ops[0].watch.ver = 1;
        ops[0].watch.flag = 1;
        ops[0].watch.cookie = event->cookie;
        ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
        ops[0].watch.timeout = 12;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              rbd_dev->header_name,
                              0, 0, NULL, NULL, NULL);
        if (ret < 0)
                goto fail_event;

        ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
        dout("ceph_osdc_wait_event returned %d\n", ret);
        rbd_destroy_ops(ops);
        return 0;

fail_event:
        ceph_osdc_cancel_event(event);
fail:
        rbd_destroy_ops(ops);
        return ret;
}

/*
 * Request sync osd class method call
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
                             const char *object_name,
                             const char *class_name,
                             const char *method_name,
                             const char *data,
                             int len,
                             u64 *ver)
{
        struct ceph_osd_req_op *ops;
        int class_name_len = strlen(class_name);
        int method_name_len = strlen(method_name);
        int ret;

        ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
                                class_name_len + method_name_len + len);
        if (!ops)
                return -ENOMEM;

        ops[0].cls.class_name = class_name;
        ops[0].cls.class_len = (__u8) class_name_len;
        ops[0].cls.method_name = method_name;
        ops[0].cls.method_len = (__u8) method_name_len;
        ops[0].cls.argc = 0;
        ops[0].cls.indata = data;
        ops[0].cls.indata_len = len;

        ret = rbd_req_sync_op(rbd_dev, NULL,
                              CEPH_NOSNAP,
                              CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
                              ops,
                              object_name, 0, 0, NULL, NULL, ver);

        rbd_destroy_ops(ops);

        dout("cls_exec returned %d\n", ret);
        return ret;
}

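/*
 * Allocate a request collection big enough to track the completion
 * status of num_reqs segment requests.
 */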
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
        struct rbd_req_coll *coll =
                        kzalloc(sizeof(struct rbd_req_coll) +
                                sizeof(struct rbd_req_status) * num_reqs,
                                GFP_ATOMIC);

        if (!coll)
                return NULL;
        coll->total = num_reqs;
        kref_init(&coll->kref);
        return coll;
}

/*
 * block device queue callback
 */
static void rbd_rq_fn(struct request_queue *q)
{
        struct rbd_device *rbd_dev = q->queuedata;
        struct request *rq;
        struct bio_pair *bp = NULL;

        while ((rq = blk_fetch_request(q))) {
                struct bio *bio;
                struct bio *rq_bio, *next_bio = NULL;
                bool do_write;
                unsigned int size;
                u64 op_size = 0;
                u64 ofs;
                int num_segs, cur_seg = 0;
                struct rbd_req_coll *coll;
                struct ceph_snap_context *snapc;

                dout("fetched request\n");

                /* filter out block requests we don't understand */
                if ((rq->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(rq, 0);
                        continue;
                }

                /* deduce our operation (read, write) */
                do_write = (rq_data_dir(rq) == WRITE);

                size = blk_rq_bytes(rq);
                ofs = blk_rq_pos(rq) * SECTOR_SIZE;
                rq_bio = rq->bio;
                if (do_write && rbd_dev->read_only) {
                        __blk_end_request_all(rq, -EROFS);
                        continue;
                }

                spin_unlock_irq(q->queue_lock);

                down_read(&rbd_dev->header_rwsem);

                if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
                        up_read(&rbd_dev->header_rwsem);
                        dout("request for non-existent snapshot");
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENXIO);
                        continue;
                }

                snapc = ceph_get_snap_context(rbd_dev->header.snapc);

                up_read(&rbd_dev->header_rwsem);

                dout("%s 0x%x bytes at 0x%llx\n",
                     do_write ? "write" : "read",
                     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

                num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
                if (num_segs <= 0) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, num_segs);
                        ceph_put_snap_context(snapc);
                        continue;
                }
                coll = rbd_alloc_coll(num_segs);
                if (!coll) {
                        spin_lock_irq(q->queue_lock);
                        __blk_end_request_all(rq, -ENOMEM);
                        ceph_put_snap_context(snapc);
                        continue;
                }

                do {
                        /* a bio clone to be passed down to OSD req */
                        dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
                        op_size = rbd_segment_length(rbd_dev, ofs, size);
                        kref_get(&coll->kref);
                        bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
                                              op_size, GFP_ATOMIC);
                        if (!bio) {
                                rbd_coll_end_req_index(rq, coll, cur_seg,
                                                       -ENOMEM, op_size);
                                goto next_seg;
                        }


                        /* init OSD command: write or read */
                        if (do_write)
                                rbd_req_write(rq, rbd_dev,
                                              snapc,
                                              ofs,
                                              op_size, bio,
                                              coll, cur_seg);
                        else
                                rbd_req_read(rq, rbd_dev,
                                             rbd_dev->snap_id,
                                             ofs,
                                             op_size, bio,
                                             coll, cur_seg);

next_seg:
                        size -= op_size;
                        ofs += op_size;

                        cur_seg++;
                        rq_bio = next_bio;
                } while (size > 0);
                kref_put(&coll->kref, rbd_coll_release);

                if (bp)
                        bio_pair_release(bp);
                spin_lock_irq(q->queue_lock);

                ceph_put_snap_context(snapc);
        }
}

/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be a single-page bio, which
 * we handle later in bio_chain_clone().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
                          struct bio_vec *bvec)
{
        struct rbd_device *rbd_dev = q->queuedata;
        unsigned int chunk_sectors;
        sector_t sector;
        unsigned int bio_sectors;
        int max;

        chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
        sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
        bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

        max = (chunk_sectors - ((sector & (chunk_sectors - 1))
                                + bio_sectors)) << SECTOR_SHIFT;
        if (max < 0)
                max = 0; /* bio_add cannot handle a negative return */
        if (max <= bvec->bv_len && bio_sectors == 0)
                return bvec->bv_len;
        return max;
}

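/*
 * Tear down the gendisk and request queue for an rbd device, freeing
 * the in-memory image header along the way.
 */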
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk = rbd_dev->disk;

        if (!disk)
                return;

        rbd_header_free(&rbd_dev->header);

        if (disk->flags & GENHD_FL_UP)
                del_gendisk(disk);
        if (disk->queue)
                blk_cleanup_queue(disk->queue);
        put_disk(disk);
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.  Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
        struct rbd_image_header_ondisk *ondisk = NULL;
        u32 snap_count = 0;
        u64 names_size = 0;
        u32 want_count;
        int ret;

        /*
         * The complete header will include an array of its 64-bit
         * snapshot ids, followed by the names of those snapshots as
         * a contiguous block of NUL-terminated strings.  Note that
         * the number of snapshots could change by the time we read
         * it in, in which case we re-read it.
         */
        do {
                size_t size;

                kfree(ondisk);

                size = sizeof (*ondisk);
                size += snap_count * sizeof (struct rbd_image_snap_ondisk);
                size += names_size;
                ondisk = kmalloc(size, GFP_KERNEL);
                if (!ondisk)
                        return ERR_PTR(-ENOMEM);

                ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
                                        rbd_dev->header_name,
                                        0, size,
                                        (char *) ondisk, version);

                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
                        ret = -ENXIO;
                        pr_warning("short header read for image %s"
                                   " (want %zd got %d)\n",
                                   rbd_dev->image_name, size, ret);
                        goto out_err;
                }
                if (!rbd_dev_ondisk_valid(ondisk)) {
                        ret = -ENXIO;
                        pr_warning("invalid header for image %s\n",
                                   rbd_dev->image_name);
                        goto out_err;
                }

                names_size = le64_to_cpu(ondisk->snap_names_len);
                want_count = snap_count;
                snap_count = le32_to_cpu(ondisk->snap_count);
        } while (snap_count != want_count);

        return ondisk;

out_err:
        kfree(ondisk);

        return ERR_PTR(ret);
}

/*
 * Re-read the on-disk header and translate it to the in-memory format
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
                           struct rbd_image_header *header)
{
        struct rbd_image_header_ondisk *ondisk;
        u64 ver = 0;
        int ret;

        ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
        if (IS_ERR(ondisk))
                return PTR_ERR(ondisk);
        ret = rbd_header_from_disk(header, ondisk);
        if (ret >= 0)
                header->obj_version = ver;
        kfree(ondisk);

        return ret;
}

/*
 * create a snapshot
 */
static int rbd_header_add_snap(struct rbd_device *rbd_dev,
                               const char *snap_name,
                               gfp_t gfp_flags)
{
        int name_len = strlen(snap_name);
        u64 new_snapid;
        int ret;
        void *data, *p, *e;
        struct ceph_mon_client *monc;

        /* we should create a snapshot only if we're pointing at the head */
        if (rbd_dev->snap_id != CEPH_NOSNAP)
                return -EINVAL;

        monc = &rbd_dev->rbd_client->client->monc;
        ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
        dout("created snapid=%llu\n", (unsigned long long) new_snapid);
        if (ret < 0)
                return ret;

        data = kmalloc(name_len + 16, gfp_flags);
        if (!data)
                return -ENOMEM;

        p = data;
        e = data + name_len + 16;

        ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
        ceph_encode_64_safe(&p, e, new_snapid, bad);

        ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
                                "rbd", "snap_add",
                                data, p - data, NULL);

        kfree(data);

        return ret < 0 ? ret : 0;
bad:
        return -ERANGE;
}

static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
        struct rbd_snap *snap;
        struct rbd_snap *next;

        list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
                __rbd_remove_snap_dev(snap);
}

/*
 * Re-read the image header and bring the device's in-memory copy
 * (size, snapshot context, snapshot list) up to date.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;
        struct rbd_image_header h;

        ret = rbd_read_header(rbd_dev, &h);
        if (ret < 0)
                return ret;

        down_write(&rbd_dev->header_rwsem);

        /* resized? */
        if (rbd_dev->snap_id == CEPH_NOSNAP) {
                sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

                dout("setting size to %llu sectors", (unsigned long long) size);
                set_capacity(rbd_dev->disk, size);
        }

        /* rbd_dev->header.object_prefix shouldn't change */
        kfree(rbd_dev->header.snap_sizes);
        kfree(rbd_dev->header.snap_names);
        /* osd requests may still refer to snapc */
        ceph_put_snap_context(rbd_dev->header.snapc);

        if (hver)
                *hver = h.obj_version;
        rbd_dev->header.obj_version = h.obj_version;
        rbd_dev->header.image_size = h.image_size;
        rbd_dev->header.total_snaps = h.total_snaps;
        rbd_dev->header.snapc = h.snapc;
        rbd_dev->header.snap_names = h.snap_names;
        rbd_dev->header.snap_sizes = h.snap_sizes;
        /* Free the extra copy of the object prefix */
        WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
        kfree(h.object_prefix);

        ret = __rbd_init_snaps_header(rbd_dev);

        up_write(&rbd_dev->header_rwsem);

        return ret;
}

static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
        int ret;

        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        ret = __rbd_refresh_header(rbd_dev, hver);
        mutex_unlock(&ctl_mutex);

        return ret;
}

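/*
 * Read the image header from the OSDs and set up the gendisk and
 * request queue for the mapped device.
 */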
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
        struct gendisk *disk;
        struct request_queue *q;
        int rc;
        u64 segment_size;
        u64 total_size = 0;

        /* contact OSD, request size info about the object being mapped */
        rc = rbd_read_header(rbd_dev, &rbd_dev->header);
        if (rc)
                return rc;

        /* no need to lock here, as rbd_dev is not registered yet */
        rc = __rbd_init_snaps_header(rbd_dev);
        if (rc)
                return rc;

        rc = rbd_header_set_snap(rbd_dev, &total_size);
        if (rc)
                return rc;

        /* create gendisk info */
        rc = -ENOMEM;
        disk = alloc_disk(RBD_MINORS_PER_MAJOR);
        if (!disk)
                goto out;

        snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
                 rbd_dev->dev_id);
        disk->major = rbd_dev->major;
        disk->first_minor = 0;
        disk->fops = &rbd_bd_ops;
        disk->private_data = rbd_dev;

        /* init rq */
        rc = -ENOMEM;
        q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
        if (!q)
                goto out_disk;

        /* We use the default size, but let's be explicit about it. */
        blk_queue_physical_block_size(q, SECTOR_SIZE);

        /* set io sizes to object size */
        segment_size = rbd_obj_bytes(&rbd_dev->header);
        blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
        blk_queue_max_segment_size(q, segment_size);
        blk_queue_io_min(q, segment_size);
        blk_queue_io_opt(q, segment_size);

        blk_queue_merge_bvec(q, rbd_merge_bvec);
        disk->queue = q;

        q->queuedata = rbd_dev;

        rbd_dev->disk = disk;
        rbd_dev->q = q;

        /* finally, announce the disk to the world */
        set_capacity(disk, total_size / SECTOR_SIZE);
        add_disk(disk);

        pr_info("%s: added with size 0x%llx\n",
                disk->disk_name, (unsigned long long)total_size);
        return 0;

out_disk:
        put_disk(disk);
out:
        return rc;
}

/*
   sysfs
 */

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
        return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        sector_t size;

        down_read(&rbd_dev->header_rwsem);
        size = get_capacity(rbd_dev->disk);
        up_read(&rbd_dev->header_rwsem);

        return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}

static ssize_t rbd_major_show(struct device *dev,
                              struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->major);
}

static ssize_t rbd_client_id_show(struct device *dev,
                                  struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "client%lld\n",
                       ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
                                struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%d\n", rbd_dev->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
                             struct device_attribute *attr, char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->image_name);
}

static ssize_t rbd_snap_show(struct device *dev,
                             struct device_attribute *attr,
                             char *buf)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

        return sprintf(buf, "%s\n", rbd_dev->snap_name);
}

static ssize_t rbd_image_refresh(struct device *dev,
                                 struct device_attribute *attr,
                                 const char *buf,
                                 size_t size)
{
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
        int ret;

        ret = rbd_refresh_header(rbd_dev, NULL);

        return ret < 0 ? ret : size;
}

static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        NULL
};

static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};

static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};


/*
   sysfs - snapshots
 */

static ssize_t rbd_snap_size_show(struct device *dev,
                                  struct device_attribute *attr,
                                  char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}

static ssize_t rbd_snap_id_show(struct device *dev,
                                struct device_attribute *attr,
                                char *buf)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

        return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}

static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};

static void rbd_snap_dev_release(struct device *dev)
{
        struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
        kfree(snap->name);
        kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};

static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
        list_del(&snap->node);
        device_unregister(&snap->dev);
}

static int rbd_register_snap_dev(struct rbd_snap *snap,
                                 struct device *parent)
{
        struct device *dev = &snap->dev;
        int ret;

        dev->type = &rbd_snap_device_type;
        dev->parent = parent;
        dev->release = rbd_snap_dev_release;
        dev_set_name(dev, "snap_%s", snap->name);
        ret = device_register(dev);

        return ret;
}

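/*
 * Create an rbd_snap for entry i of the snapshot context and register
 * its sysfs device (if the parent rbd device is already registered).
 */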
2143 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2144 int i, const char *name)
2145 {
2146 struct rbd_snap *snap;
2147 int ret;
2148
2149 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2150 if (!snap)
2151 return ERR_PTR(-ENOMEM);
2152
2153 ret = -ENOMEM;
2154 snap->name = kstrdup(name, GFP_KERNEL);
2155 if (!snap->name)
2156 goto err;
2157
2158 snap->size = rbd_dev->header.snap_sizes[i];
2159 snap->id = rbd_dev->header.snapc->snaps[i];
2160 if (device_is_registered(&rbd_dev->dev)) {
2161 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2162 if (ret < 0)
2163 goto err;
2164 }
2165
2166 return snap;
2167
2168 err:
2169 kfree(snap->name);
2170 kfree(snap);
2171
2172 return ERR_PTR(ret);
2173 }
2174
2175 /*
2176 * Scan the rbd device's current snapshot list and compare it to the
2177 * newly-received snapshot context. Remove any existing snapshots
2178 * not present in the new snapshot context. Add a new snapshot for
2179 * any snaphots in the snapshot context not in the current list.
2180 * And verify there are no changes to snapshots we already know
2181 * about.
2182 *
2183 * Assumes the snapshots in the snapshot context are sorted by
2184 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2185 * are also maintained in that order.)
2186 */
2187 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2188 {
2189 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2190 const u32 snap_count = snapc->num_snaps;
2191 char *snap_name = rbd_dev->header.snap_names;
2192 struct list_head *head = &rbd_dev->snaps;
2193 struct list_head *links = head->next;
2194 u32 index = 0;
2195
2196 while (index < snap_count || links != head) {
2197 u64 snap_id;
2198 struct rbd_snap *snap;
2199
2200 snap_id = index < snap_count ? snapc->snaps[index]
2201 : CEPH_NOSNAP;
2202 snap = links != head ? list_entry(links, struct rbd_snap, node)
2203 : NULL;
2204 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
2205
2206 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2207 struct list_head *next = links->next;
2208
2209 /* Existing snapshot not in the new snap context */
2210
2211 if (rbd_dev->snap_id == snap->id)
2212 rbd_dev->snap_exists = false;
2213 __rbd_remove_snap_dev(snap);
2214
2215 /* Done with this list entry; advance */
2216
2217 links = next;
2218 continue;
2219 }
2220
2221 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2222 struct rbd_snap *new_snap;
2223
2224 /* We haven't seen this snapshot before */
2225
2226 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2227 snap_name);
2228 if (IS_ERR(new_snap))
2229 return PTR_ERR(new_snap);
2230
2231 /* New goes before existing, or at end of list */
2232
2233 if (snap)
2234 list_add_tail(&new_snap->node, &snap->node);
2235 else
2236 list_add_tail(&new_snap->node, head);
2237 } else {
2238 /* Already have this one */
2239
2240 rbd_assert(snap->size ==
2241 rbd_dev->header.snap_sizes[index]);
2242 rbd_assert(!strcmp(snap->name, snap_name));
2243
2244 /* Done with this list entry; advance */
2245
2246 links = links->next;
2247 }
2248
2249 /* Advance to the next entry in the snapshot context */
2250
2251 index++;
2252 snap_name += strlen(snap_name) + 1;
2253 }
2254
2255 return 0;
2256 }
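
/*
 * Worked example (hypothetical snapshot ids): if the device's list
 * currently holds snapshots {8, 5, 2} and the new context holds
 * {8, 6, 2}, the single pass above keeps 8, adds a device for 6
 * (inserted ahead of 5), unregisters 5, and keeps 2.  This works
 * because both the snapshot context and the device's list are
 * ordered with the highest snapshot id first.
 */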
2257
2258 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2259 {
2260 int ret;
2261 struct device *dev;
2262 struct rbd_snap *snap;
2263
2264 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2265 dev = &rbd_dev->dev;
2266
2267 dev->bus = &rbd_bus_type;
2268 dev->type = &rbd_device_type;
2269 dev->parent = &rbd_root_dev;
2270 dev->release = rbd_dev_release;
2271 dev_set_name(dev, "%d", rbd_dev->dev_id);
2272 ret = device_register(dev);
2273 if (ret < 0)
2274 goto out;
2275
2276 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2277 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2278 if (ret < 0)
2279 break;
2280 }
2281 out:
2282 mutex_unlock(&ctl_mutex);
2283 return ret;
2284 }
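
/*
 * The hierarchy this builds: the device shows up as
 * /sys/bus/rbd/devices/<dev-id>/ (parented to the shared
 * rbd_root_dev), with one snap_<name>/ child for each snapshot
 * already present in rbd_dev->snaps.
 */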
2285
2286 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2287 {
2288 device_unregister(&rbd_dev->dev);
2289 }
2290
2291 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2292 {
2293 int ret, rc;
2294
2295 do {
2296 ret = rbd_req_sync_watch(rbd_dev);
2297 if (ret == -ERANGE) {
2298 rc = rbd_refresh_header(rbd_dev, NULL);
2299 if (rc < 0)
2300 return rc;
2301 }
2302 } while (ret == -ERANGE);
2303
2304 return ret;
2305 }
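
/*
 * Note on the retry loop above: the assumption is that -ERANGE from
 * rbd_req_sync_watch() means the header object changed underneath
 * us, so we refresh our copy of the header and retry registering
 * the watch against the updated version.
 */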
2306
2307 static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
2308
2309 /*
2310 * Get a unique rbd identifier for the given new rbd_dev, and add
2311 * the rbd_dev to the global list. The minimum rbd id is 1.
2312 */
2313 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
2314 {
2315 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
2316
2317 spin_lock(&rbd_dev_list_lock);
2318 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2319 spin_unlock(&rbd_dev_list_lock);
2320 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
2321 (unsigned long long) rbd_dev->dev_id);
2322 }
2323
2324 /*
2325 * Remove an rbd_dev from the global list, and record that its
2326 * identifier is no longer in use.
2327 */
2328 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
2329 {
2330 struct list_head *tmp;
2331 int rbd_id = rbd_dev->dev_id;
2332 int max_id;
2333
2334 rbd_assert(rbd_id > 0);
2335
2336 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
2337 (unsigned long long) rbd_dev->dev_id);
2338 spin_lock(&rbd_dev_list_lock);
2339 list_del_init(&rbd_dev->node);
2340
2341 /*
2342 * If the id being "put" is not the current maximum, there
2343 * is nothing special we need to do.
2344 */
2345 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
2346 spin_unlock(&rbd_dev_list_lock);
2347 return;
2348 }
2349
2350 /*
2351 * We need to update the current maximum id. Search the
2352 * list to find out what it is. We're more likely to find
2353 * the maximum at the end, so search the list backward.
2354 */
2355 max_id = 0;
2356 list_for_each_prev(tmp, &rbd_dev_list) {
2357 struct rbd_device *rbd_dev;
2358
2359 rbd_dev = list_entry(tmp, struct rbd_device, node);
2360 if (rbd_dev->dev_id > max_id)
2361 max_id = rbd_dev->dev_id;
2362 }
2363 spin_unlock(&rbd_dev_list_lock);
2364
2365 /*
2366 * The max id could have been updated by rbd_dev_id_get(), in
2367 * which case it now accurately reflects the new maximum.
2368 * Be careful not to overwrite the maximum value in that
2369 * case.
2370 */
2371 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
2372 dout(" max dev id has been reset\n");
2373 }
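
/*
 * Example of the race the cmpxchg above tolerates: if another thread
 * calls rbd_dev_id_get() after we drop rbd_dev_list_lock but before
 * the cmpxchg executes, rbd_dev_id_max no longer equals rbd_id, so
 * the exchange fails and the newer (larger) maximum is left intact.
 */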
2374
2375 /*
2376 * Skips over white space at *buf, and updates *buf to point to the
2377 * first found non-space character (if any). Returns the length of
2378 * the token (string of non-white space characters) found. Note
2379 * that *buf must be terminated with '\0'.
2380 */
2381 static inline size_t next_token(const char **buf)
2382 {
2383 /*
2384 * These are the characters that produce nonzero for
2385 * isspace() in the "C" and "POSIX" locales.
2386 */
2387 const char *spaces = " \f\n\r\t\v";
2388
2389 *buf += strspn(*buf, spaces); /* Find start of token */
2390
2391 return strcspn(*buf, spaces); /* Return token length */
2392 }
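
/*
 * For example (hypothetical input): with *buf pointing at
 * "  pool image", next_token() advances *buf past the two leading
 * spaces and returns 4, the length of "pool"; *buf is left pointing
 * at "pool image" so the caller can consume the token.
 */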
2393
2394 /*
2395 * Finds the next token in *buf, and if the provided token buffer is
2396 * big enough, copies the found token into it. The result, if
2397 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2398 * must be terminated with '\0' on entry.
2399 *
2400 * Returns the length of the token found (not including the '\0').
2401 * Return value will be 0 if no token is found, and it will be >=
2402 * token_size if the token would not fit.
2403 *
2404 * The *buf pointer will be updated to point beyond the end of the
2405 * found token. Note that this occurs even if the token buffer is
2406 * too small to hold it.
2407 */
2408 static inline size_t copy_token(const char **buf,
2409 char *token,
2410 size_t token_size)
2411 {
2412 size_t len;
2413
2414 len = next_token(buf);
2415 if (len < token_size) {
2416 memcpy(token, *buf, len);
2417 *(token + len) = '\0';
2418 }
2419 *buf += len;
2420
2421 return len;
2422 }
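
/*
 * For example (hypothetical sizes): copy_token(&buf, token, 8) on a
 * buffer starting with "verylongtoken ..." returns 13 and leaves
 * token[] untouched, since 13 bytes plus a '\0' don't fit in 8;
 * callers detect this by checking len >= token_size, as
 * rbd_add_parse_args() does below.
 */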
2423
2424 /*
2425 * Finds the next token in *buf, dynamically allocates a buffer big
2426 * enough to hold a copy of it, and copies the token into the new
2427 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2428 * that a duplicate buffer is created even for a zero-length token.
2429 *
2430 * Returns a pointer to the newly-allocated duplicate, or a null
2431 * pointer if memory for the duplicate was not available. If
2432 * the lenp argument is a non-null pointer, the length of the token
2433 * (not including the '\0') is returned in *lenp.
2434 *
2435 * If successful, the *buf pointer will be updated to point beyond
2436 * the end of the found token.
2437 *
2438 * Note: uses GFP_KERNEL for allocation.
2439 */
2440 static inline char *dup_token(const char **buf, size_t *lenp)
2441 {
2442 char *dup;
2443 size_t len;
2444
2445 len = next_token(buf);
2446 dup = kmalloc(len + 1, GFP_KERNEL);
2447 if (!dup)
2448 return NULL;
2449
2450 memcpy(dup, *buf, len);
2451 *(dup + len) = '\0';
2452 *buf += len;
2453
2454 if (lenp)
2455 *lenp = len;
2456
2457 return dup;
2458 }
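
/*
 * Usage sketch (hypothetical buffer):
 *
 *	const char *p = "rbd myimage";
 *	size_t len;
 *	char *pool = dup_token(&p, NULL);	(-> "rbd")
 *	char *image = dup_token(&p, &len);	(-> "myimage", len == 7)
 *
 * Both results must be checked for NULL and eventually kfree()d.
 */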
2459
2460 /*
2461 * This fills in the pool_name, image_name, image_name_len,
2462 * header_name, and snap_name fields of the given rbd_dev, based
2463 * on the list of monitor addresses and other options provided via
2464 * /sys/bus/rbd/add.
2465 *
2466 * Note: rbd_dev is assumed to have been initially zero-filled.
2467 */
2468 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2469 const char *buf,
2470 const char **mon_addrs,
2471 size_t *mon_addrs_size,
2472 char *options,
2473 size_t options_size)
2474 {
2475 size_t len;
2476 int ret;
2477
2478 /* The first four tokens are required */
2479
2480 len = next_token(&buf);
2481 if (!len)
2482 return -EINVAL;
2483 *mon_addrs_size = len + 1;
2484 *mon_addrs = buf;
2485
2486 buf += len;
2487
2488 len = copy_token(&buf, options, options_size);
2489 if (!len || len >= options_size)
2490 return -EINVAL;
2491
2492 ret = -ENOMEM;
2493 rbd_dev->pool_name = dup_token(&buf, NULL);
2494 if (!rbd_dev->pool_name)
2495 goto out_err;
2496
2497 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2498 if (!rbd_dev->image_name)
2499 goto out_err;
2500
2501 /* Create the name of the header object */
2502
2503 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2504 + sizeof (RBD_SUFFIX),
2505 GFP_KERNEL);
2506 if (!rbd_dev->header_name)
2507 goto out_err;
2508 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2509
2510 /*
2511 * The snapshot name is optional. If none is supplied,
2512 * we use the default value.
2513 */
2514 rbd_dev->snap_name = dup_token(&buf, &len);
2515 if (!rbd_dev->snap_name)
2516 goto out_err;
2517 if (!len) {
2518 /* Replace the empty name with the default */
2519 kfree(rbd_dev->snap_name);
2520 rbd_dev->snap_name
2521 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2522 if (!rbd_dev->snap_name)
2523 goto out_err;
2524
2525 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2526 sizeof (RBD_SNAP_HEAD_NAME));
2527 }
2528
2529 return 0;
2530
2531 out_err:
2532 kfree(rbd_dev->header_name);
2533 rbd_dev->header_name = NULL;
2534 kfree(rbd_dev->image_name);
2535 rbd_dev->image_name = NULL;
2536 rbd_dev->image_name_len = 0;
2537 kfree(rbd_dev->pool_name);
2538 rbd_dev->pool_name = NULL;
2539
2540 return ret;
2541 }
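
/*
 * Example (hypothetical monitor address and names): writing
 *
 *	1.2.3.4:6789 name=admin rbd myimage mysnap
 *
 * to /sys/bus/rbd/add parses as mon_addrs "1.2.3.4:6789", options
 * "name=admin", pool_name "rbd", image_name "myimage" (header
 * object "myimage" RBD_SUFFIX), and snap_name "mysnap".  With the
 * final token omitted, snap_name defaults to RBD_SNAP_HEAD_NAME
 * and the image head is mapped rather than a snapshot.
 */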
2542
2543 static ssize_t rbd_add(struct bus_type *bus,
2544 const char *buf,
2545 size_t count)
2546 {
2547 char *options;
2548 struct rbd_device *rbd_dev = NULL;
2549 const char *mon_addrs = NULL;
2550 size_t mon_addrs_size = 0;
2551 struct ceph_osd_client *osdc;
2552 int rc = -ENOMEM;
2553
2554 if (!try_module_get(THIS_MODULE))
2555 return -ENODEV;
2556
2557 options = kmalloc(count, GFP_KERNEL);
2558 if (!options)
2559 goto err_nomem;
2560 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2561 if (!rbd_dev)
2562 goto err_nomem;
2563
2564 /* static rbd_device initialization */
2565 spin_lock_init(&rbd_dev->lock);
2566 INIT_LIST_HEAD(&rbd_dev->node);
2567 INIT_LIST_HEAD(&rbd_dev->snaps);
2568 init_rwsem(&rbd_dev->header_rwsem);
2569
2570 /* generate unique id: find highest unique id, add one */
2571 rbd_dev_id_get(rbd_dev);
2572
2573 /* Fill in the device name, now that we have its id. */
2574 BUILD_BUG_ON(DEV_NAME_LEN
2575 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2576 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2577
2578 /* parse add command */
2579 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2580 options, count);
2581 if (rc)
2582 goto err_put_id;
2583
2584 rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
2585 if (rc < 0)
2586 goto err_put_id;
2587
2588 /* pick the pool */
2589 osdc = &rbd_dev->rbd_client->client->osdc;
2590 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2591 if (rc < 0)
2592 goto err_out_client;
2593 rbd_dev->pool_id = rc;
2594
2595 /* register our block device */
2596 rc = register_blkdev(0, rbd_dev->name);
2597 if (rc < 0)
2598 goto err_out_client;
2599 rbd_dev->major = rc;
2600
2601 rc = rbd_bus_add_dev(rbd_dev);
2602 if (rc)
2603 goto err_out_blkdev;
2604
2605 /*
2606 * At this point cleanup in the event of an error is the job
2607 * of the sysfs code (initiated by rbd_bus_del_dev()).
2608 *
2609 * Set up and announce blkdev mapping.
2610 */
2611 rc = rbd_init_disk(rbd_dev);
2612 if (rc)
2613 goto err_out_bus;
2614
2615 rc = rbd_init_watch_dev(rbd_dev);
2616 if (rc)
2617 goto err_out_bus;
2618
2619 return count;
2620
2621 err_out_bus:
2622 /* this also cleans up the rest of rbd_dev, via rbd_dev_release() */
2623
2624 rbd_bus_del_dev(rbd_dev);
2625 kfree(options);
2626 return rc;
2627
2628 err_out_blkdev:
2629 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2630 err_out_client:
2631 rbd_put_client(rbd_dev);
2632 err_put_id:
2633 if (rbd_dev->pool_name) {
2634 kfree(rbd_dev->snap_name);
2635 kfree(rbd_dev->header_name);
2636 kfree(rbd_dev->image_name);
2637 kfree(rbd_dev->pool_name);
2638 }
2639 rbd_dev_id_put(rbd_dev);
2640 err_nomem:
2641 kfree(rbd_dev);
2642 kfree(options);
2643
2644 dout("Error adding device %s\n", buf);
2645 module_put(THIS_MODULE);
2646
2647 return (ssize_t) rc;
2648 }
2649
2650 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2651 {
2652 struct list_head *tmp;
2653 struct rbd_device *rbd_dev;
2654
2655 spin_lock(&rbd_dev_list_lock);
2656 list_for_each(tmp, &rbd_dev_list) {
2657 rbd_dev = list_entry(tmp, struct rbd_device, node);
2658 if (rbd_dev->dev_id == dev_id) {
2659 spin_unlock(&rbd_dev_list_lock);
2660 return rbd_dev;
2661 }
2662 }
2663 spin_unlock(&rbd_dev_list_lock);
2664 return NULL;
2665 }
2666
2667 static void rbd_dev_release(struct device *dev)
2668 {
2669 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2670
2671 if (rbd_dev->watch_request) {
2672 struct ceph_client *client = rbd_dev->rbd_client->client;
2673
2674 ceph_osdc_unregister_linger_request(&client->osdc,
2675 rbd_dev->watch_request);
2676 }
2677 if (rbd_dev->watch_event)
2678 rbd_req_sync_unwatch(rbd_dev);
2679
2680 rbd_put_client(rbd_dev);
2681
2682 /* clean up and free blkdev */
2683 rbd_free_disk(rbd_dev);
2684 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2685
2686 /* done with the id, and with the rbd_dev */
2687 kfree(rbd_dev->snap_name);
2688 kfree(rbd_dev->header_name);
2689 kfree(rbd_dev->pool_name);
2690 kfree(rbd_dev->image_name);
2691 rbd_dev_id_put(rbd_dev);
2692 kfree(rbd_dev);
2693
2694 /* release module ref */
2695 module_put(THIS_MODULE);
2696 }
2697
2698 static ssize_t rbd_remove(struct bus_type *bus,
2699 const char *buf,
2700 size_t count)
2701 {
2702 struct rbd_device *rbd_dev = NULL;
2703 int target_id, rc;
2704 unsigned long ul;
2705 int ret = count;
2706
2707 rc = strict_strtoul(buf, 10, &ul);
2708 if (rc)
2709 return rc;
2710
2711 /* convert to int; abort if we lost anything in the conversion */
2712 target_id = (int) ul;
2713 if (target_id != ul)
2714 return -EINVAL;
2715
2716 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2717
2718 rbd_dev = __rbd_get_dev(target_id);
2719 if (!rbd_dev) {
2720 ret = -ENOENT;
2721 goto done;
2722 }
2723
2724 __rbd_remove_all_snaps(rbd_dev);
2725 rbd_bus_del_dev(rbd_dev);
2726
2727 done:
2728 mutex_unlock(&ctl_mutex);
2729
2730 return ret;
2731 }
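
/*
 * Usage sketch: writing a device's id (the name of its directory
 * under /sys/bus/rbd/devices/) to /sys/bus/rbd/remove, e.g.
 *
 *	echo 0 > /sys/bus/rbd/remove
 *
 * unregisters the device's snapshots and then the device itself.
 */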
2732
2733 static ssize_t rbd_snap_add(struct device *dev,
2734 struct device_attribute *attr,
2735 const char *buf,
2736 size_t count)
2737 {
2738 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2739 int ret;
2740 char *name = kmalloc(count + 1, GFP_KERNEL);
2741 if (!name)
2742 return -ENOMEM;
2743
/*
 * NUL-terminate the copied name, and drop the trailing newline
 * that a write via sysfs normally carries so it can't become
 * part of the snapshot name.
 */
snprintf(name, count + 1, "%s", buf);
name[strcspn(name, "\n")] = '\0';
2745
2746 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2747
2748 ret = rbd_header_add_snap(rbd_dev,
2749 name, GFP_KERNEL);
2750 if (ret < 0)
2751 goto err_unlock;
2752
2753 ret = __rbd_refresh_header(rbd_dev, NULL);
2754 if (ret < 0)
2755 goto err_unlock;
2756
2757 /* Don't hold ctl_mutex while notifying: the notify might
2758 trigger a watch callback that itself needs to take that mutex. */
2759 mutex_unlock(&ctl_mutex);
2760
2761 /* make a best effort, don't error if failed */
2762 rbd_req_sync_notify(rbd_dev);
2763
2764 ret = count;
2765 kfree(name);
2766 return ret;
2767
2768 err_unlock:
2769 mutex_unlock(&ctl_mutex);
2770 kfree(name);
2771 return ret;
2772 }
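
/*
 * Usage sketch (hypothetical snapshot name):
 *
 *	echo mysnap > /sys/bus/rbd/devices/0/create_snap
 *
 * creates the snapshot on the server, refreshes the local header,
 * and finally sends a best-effort notify to other watchers with
 * ctl_mutex released.
 */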
2773
2774 /*
2775 * create control files in sysfs
2776 * /sys/bus/rbd/...
2777 */
2778 static int rbd_sysfs_init(void)
2779 {
2780 int ret;
2781
2782 ret = device_register(&rbd_root_dev);
2783 if (ret < 0)
2784 return ret;
2785
2786 ret = bus_register(&rbd_bus_type);
2787 if (ret < 0)
2788 device_unregister(&rbd_root_dev);
2789
2790 return ret;
2791 }
2792
2793 static void rbd_sysfs_cleanup(void)
2794 {
2795 bus_unregister(&rbd_bus_type);
2796 device_unregister(&rbd_root_dev);
2797 }
2798
2799 int __init rbd_init(void)
2800 {
2801 int rc;
2802
2803 rc = rbd_sysfs_init();
2804 if (rc)
2805 return rc;
2806 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2807 return 0;
2808 }
2809
2810 void __exit rbd_exit(void)
2811 {
2812 rbd_sysfs_cleanup();
2813 }
2814
2815 module_init(rbd_init);
2816 module_exit(rbd_exit);
2817
2818 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2819 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2820 MODULE_DESCRIPTION("rados block device");
2821
2822 /* following authorship retained from original osdblk.c */
2823 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2824
2825 MODULE_LICENSE("GPL");