]> git.proxmox.com Git - mirror_ubuntu-bionic-kernel.git/blame_incremental - drivers/block/rbd.c
rbd: simplify __rbd_remove_all_snaps()
[mirror_ubuntu-bionic-kernel.git] / drivers / block / rbd.c
... / ...
CommitLineData
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 For usage instructions, please refer to:
25
26 Documentation/ABI/testing/sysfs-bus-rbd
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
34#include <linux/parser.h>
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
58#define RBD_MAX_SNAP_NAME_LEN 32
59#define RBD_MAX_OPT_LEN 1024
60
61#define RBD_SNAP_HEAD_NAME "-"
62
63/*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
69#define DEV_NAME_LEN 32
70#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
71
72#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;		/* image size (bytes per on-disk header) */
	char *object_prefix;	/* NUL-terminated data object name prefix */
	__u8 obj_order;		/* log2 of the object (segment) size */
	__u8 crypt_type;	/* on-disk encryption type */
	__u8 comp_type;		/* on-disk compression type */
	struct ceph_snap_context *snapc;	/* refcounted snapshot context */
	size_t snap_names_len;	/* total bytes in the snap_names buffer */
	u32 total_snaps;	/* number of snapshots in this header */

	char *snap_names;	/* NUL-separated names, one per snapshot */
	u64 *snap_sizes;	/* image size at each snapshot */

	u64 obj_version;	/* version of the header object on the osd */
};
92
/* rbd-specific mount options (everything else is handled by libceph) */
struct rbd_options {
	int notify_timeout;	/* "notify_timeout=%d" option; defaults to
				 * RBD_NOTIFY_TIMEOUT_DEFAULT */
};
96
/*
 * an instance of the client. multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client *client;	/* libceph client handle */
	struct rbd_options *rbd_opts;	/* owned; freed in rbd_client_release() */
	struct kref kref;		/* released via rbd_client_release() */
	struct list_head node;		/* entry on the global rbd_client_list */
};
106
/*
 * a request completion status
 */
struct rbd_req_status {
	int done;	/* nonzero once this sub-request has completed */
	int rc;		/* completion status code */
	u64 bytes;	/* bytes transferred */
};
115
/*
 * a collection of requests that together make up one block-layer request
 */
struct rbd_req_coll {
	int total;		/* number of sub-requests in the collection */
	int num_done;		/* sub-requests completed and reported so far */
	struct kref kref;	/* released via rbd_coll_release() */
	struct rbd_req_status status[0];	/* pre-C99 flexible array:
						 * one slot per sub-request */
};
125
/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;			/* request length in bytes */
	int coll_index;			/* this request's slot in the collection */
	struct rbd_req_coll *coll;	/* owning collection (may be NULL) */
};
137
/* in-memory record of one snapshot of an image */
struct rbd_snap {
	struct device dev;	/* embedded sysfs device */
	const char *name;	/* snapshot name */
	u64 size;		/* image size at this snapshot */
	struct list_head node;	/* entry on rbd_device->snaps */
	u64 id;			/* snapshot id */
};
145
/*
 * a single device
 */
struct rbd_device {
	int id;			/* blkdev unique id */

	int major;		/* blkdev assigned major */
	struct gendisk *disk;	/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* shared, refcounted ceph client */

	char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t lock;	/* queue lock */

	struct rbd_image_header header;	/* in-memory copy of image metadata */
	char *image_name;
	size_t image_name_len;
	char *header_name;	/* name of the image's header object */
	char *pool_name;
	int pool_id;

	struct ceph_osd_event *watch_event;	/* header watch registration */
	struct ceph_osd_request *watch_request;

	/* protects updating the header */
	struct rw_semaphore header_rwsem;
	/* name of the snapshot this device reads from */
	char *snap_name;
	/* id of the snapshot this device reads from */
	u64 snap_id;		/* current snapshot id */
	/* whether the snap_id this device reads from still exists */
	bool snap_exists;
	int read_only;		/* nonzero when mapping a snapshot */

	struct list_head node;	/* entry on the global rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
190
191static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
192
193static LIST_HEAD(rbd_dev_list); /* devices */
194static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196static LIST_HEAD(rbd_client_list); /* clients */
197static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200static void rbd_dev_release(struct device *dev);
201static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
203 const char *buf,
204 size_t count);
205static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
206 struct rbd_snap *snap);
207
208static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 size_t count);
210static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
211 size_t count);
212
/* sysfs bus attributes: /sys/bus/rbd/{add,remove}, write-only (root) */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name = "rbd",
	.bus_attrs = rbd_bus_attrs,
};
223
/* No-op release: rbd_root_dev is static, there is nothing to free */
static void rbd_root_dev_release(struct device *dev)
{
}
227
/* parent device for all rbd devices in sysfs */
static struct device rbd_root_dev = {
	.init_name = "rbd",
	.release = rbd_root_dev_release,
};
232
233
/* Take a reference on the rbd device's embedded struct device */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
238
/* Drop a reference taken with rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
243
244static int __rbd_refresh_header(struct rbd_device *rbd_dev);
245
246static int rbd_open(struct block_device *bdev, fmode_t mode)
247{
248 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249
250 rbd_get_dev(rbd_dev);
251
252 set_device_ro(bdev, rbd_dev->read_only);
253
254 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
255 return -EROFS;
256
257 return 0;
258}
259
/* Block device release: drop the reference taken in rbd_open() */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
268
/* block device operations for /dev/rbd* nodes */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
274
275/*
276 * Initialize an rbd client instance.
277 * We own *ceph_opts.
278 */
279static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
280 struct rbd_options *rbd_opts)
281{
282 struct rbd_client *rbdc;
283 int ret = -ENOMEM;
284
285 dout("rbd_client_create\n");
286 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287 if (!rbdc)
288 goto out_opt;
289
290 kref_init(&rbdc->kref);
291 INIT_LIST_HEAD(&rbdc->node);
292
293 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294
295 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
296 if (IS_ERR(rbdc->client))
297 goto out_mutex;
298 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
299
300 ret = ceph_open_session(rbdc->client);
301 if (ret < 0)
302 goto out_err;
303
304 rbdc->rbd_opts = rbd_opts;
305
306 spin_lock(&rbd_client_list_lock);
307 list_add_tail(&rbdc->node, &rbd_client_list);
308 spin_unlock(&rbd_client_list_lock);
309
310 mutex_unlock(&ctl_mutex);
311
312 dout("rbd_client_create created %p\n", rbdc);
313 return rbdc;
314
315out_err:
316 ceph_destroy_client(rbdc->client);
317out_mutex:
318 mutex_unlock(&ctl_mutex);
319 kfree(rbdc);
320out_opt:
321 if (ceph_opts)
322 ceph_destroy_options(ceph_opts);
323 return ERR_PTR(ret);
324}
325
326/*
327 * Find a ceph client with specific addr and configuration.
328 */
329static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
330{
331 struct rbd_client *client_node;
332
333 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
334 return NULL;
335
336 list_for_each_entry(client_node, &rbd_client_list, node)
337 if (!ceph_compare_options(ceph_opts, client_node->client))
338 return client_node;
339 return NULL;
340}
341
/*
 * mount options: tokens below Opt_last_int take an integer argument,
 * tokens between Opt_last_int and Opt_last_string take a string.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbd_opts_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}	/* table terminator */
};
359
/*
 * Parse a single "key[=value]" mount option into *private (a struct
 * rbd_options).  Returns 0 on success, -EINVAL for an unrecognized
 * token, or the match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		/* no string-valued options are defined yet */
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		/* every token in rbd_opts_tokens must be handled above */
		BUG_ON(token);
	}
	return 0;
}
394
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.  Returns a referenced client or an ERR_PTR.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	/* parse_rbd_opts_token() fills rbd_opts as a side effect */
	ceph_opts = ceph_parse_options(options, mon_addr,
				       mon_addr + mon_addr_len,
				       parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* our copies are redundant with the shared client's */
		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	/* rbd_client_create() consumes ceph_opts on success and failure */
	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
442
/*
 * Destroy ceph client.  kref release callback: unlinks the client
 * from the global list and frees everything it owns.
 *
 * Note: this function takes rbd_client_list_lock itself — the caller
 * must NOT hold it (the previous comment saying otherwise was stale
 * and would imply a self-deadlock).
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
461
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;	/* guard against use after put */
}
471
/*
 * Destroy requests collection (kref release callback)
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
483
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * Allocates header->snapc, ->snap_names, ->snap_sizes and
 * ->object_prefix; on failure everything allocated here is freed and
 * a negative errno is returned (-ENXIO for a bad magic, -EINVAL for
 * an implausible snapshot count, -ENOMEM on allocation failure).
 * Snapshot ids/sizes/names are only copied when the caller allocated
 * room for exactly snap_count snapshots (allocated_snaps).
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps,
				 gfp_t gfp_flags)
{
	u32 i, snap_count;

	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/*
	 * NOTE(review): overflow guard divides by sizeof(*ondisk) but
	 * the allocation below scales by sizeof(u64) — looks overly
	 * conservative rather than wrong; confirm intended bound.
	 */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;

	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     gfp_flags);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     gfp_flags);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	/* +1 for the NUL terminator appended below */
	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					gfp_flags);
	if (!header->object_prefix)
		goto err_sizes;

	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names (they follow the snapshot array) */
		memcpy(header->snap_names, &ondisk->snaps[i],
		       header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);	/* kfree(NULL) is a no-op */
err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
565
566static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
567 u64 *seq, u64 *size)
568{
569 int i;
570 char *p = header->snap_names;
571
572 for (i = 0; i < header->total_snaps; i++) {
573 if (!strcmp(snap_name, p)) {
574
575 /* Found it. Pass back its id and/or size */
576
577 if (seq)
578 *seq = header->snapc->snaps[i];
579 if (size)
580 *size = header->snap_sizes[i];
581 return i;
582 }
583 p += strlen(p) + 1; /* Skip ahead to the next name */
584 }
585 return -ENOENT;
586}
587
/*
 * Set the device's snapshot state from rbd_dev->snap_name.  The
 * special name RBD_SNAP_HEAD_NAME maps the live image (writable);
 * any other name maps that snapshot read-only.  On success the
 * mapped size is optionally returned in *size.  Takes header_rwsem
 * for writing.  Returns 0 or the snap_by_name() error.
 */
static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
{
	int ret;

	down_write(&rbd_dev->header_rwsem);

	if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* mapping the base image */
		rbd_dev->snap_id = CEPH_NOSNAP;
		rbd_dev->snap_exists = false;
		rbd_dev->read_only = 0;
		if (size)
			*size = rbd_dev->header.image_size;
	} else {
		u64 snap_id = 0;

		ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
				   &snap_id, size);
		if (ret < 0)
			goto done;
		rbd_dev->snap_id = snap_id;
		rbd_dev->snap_exists = true;
		rbd_dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&rbd_dev->header_rwsem);
	return ret;
}
618
/*
 * Free all memory owned by an in-memory image header.  The snap
 * context is refcounted, so drop our reference instead of kfree().
 */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	ceph_put_snap_context(header->snapc);
}
626
627/*
628 * get the actual striped segment name, offset and length
629 */
630static u64 rbd_get_segment(struct rbd_image_header *header,
631 const char *object_prefix,
632 u64 ofs, u64 len,
633 char *seg_name, u64 *segofs)
634{
635 u64 seg = ofs >> header->obj_order;
636
637 if (seg_name)
638 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
639 "%s.%012llx", object_prefix, seg);
640
641 ofs = ofs & ((1 << header->obj_order) - 1);
642 len = min_t(u64, len, (1 << header->obj_order) - ofs);
643
644 if (segofs)
645 *segofs = ofs;
646
647 return len;
648}
649
650static int rbd_get_num_segments(struct rbd_image_header *header,
651 u64 ofs, u64 len)
652{
653 u64 start_seg = ofs >> header->obj_order;
654 u64 end_seg = (ofs + len - 1) >> header->obj_order;
655 return end_seg - start_seg + 1;
656}
657
658/*
659 * returns the size of an object in the image
660 */
661static u64 rbd_obj_bytes(struct rbd_image_header *header)
662{
663 return 1 << header->obj_order;
664}
665
666/*
667 * bio helpers
668 */
669
670static void bio_chain_put(struct bio *chain)
671{
672 struct bio *tmp;
673
674 while (chain) {
675 tmp = chain;
676 chain = chain->bi_next;
677 bio_put(tmp);
678 }
679}
680
/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero this segment's bytes past start_ofs */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
707
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old until `len` bytes are covered.  On return,
 * *old points at the first unconsumed original bio, *next at the
 * continuation point (either the next original bio or the second
 * half of a split), and *bp at any bio_pair created by splitting a
 * bio that straddles the boundary.  Returns the new chain, or NULL
 * on allocation/split failure (partially built chain is released).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release any leftover split from a previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* after the first allocation, never block for memory */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
784
/*
 * helpers for osd request op vectors.
 *
 * Allocates a zeroed vector of num_ops + 1 ops (the extra zeroed op
 * terminates the vector) and initializes the first op's opcode and
 * payload length.  Returns 0 or -ENOMEM.
 */
static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
			     int num_ops,
			     int opcode,
			     u32 payload_len)
{
	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
		       GFP_NOIO);
	if (!*ops)
		return -ENOMEM;
	(*ops)[0].op = opcode;
	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	(*ops)[0].payload_len = payload_len;
	return 0;
}
805
/* Free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
810
/*
 * Complete one sub-request of a collection.  Results are reported to
 * the block layer strictly in index order: record this sub-request's
 * status, then end every contiguous run of finished sub-requests
 * starting at coll->num_done, dropping one collection reference per
 * reported sub-request.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		/* not part of a collection: complete the whole request */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* the queue lock serializes updates to the collection state */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
848
/* Complete the collection slot belonging to a single rbd_request */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
854
855/*
856 * Send ceph osd request
857 */
858static int rbd_do_request(struct request *rq,
859 struct rbd_device *rbd_dev,
860 struct ceph_snap_context *snapc,
861 u64 snapid,
862 const char *object_name, u64 ofs, u64 len,
863 struct bio *bio,
864 struct page **pages,
865 int num_pages,
866 int flags,
867 struct ceph_osd_req_op *ops,
868 struct rbd_req_coll *coll,
869 int coll_index,
870 void (*rbd_cb)(struct ceph_osd_request *req,
871 struct ceph_msg *msg),
872 struct ceph_osd_request **linger_req,
873 u64 *ver)
874{
875 struct ceph_osd_request *req;
876 struct ceph_file_layout *layout;
877 int ret;
878 u64 bno;
879 struct timespec mtime = CURRENT_TIME;
880 struct rbd_request *req_data;
881 struct ceph_osd_request_head *reqhead;
882 struct ceph_osd_client *osdc;
883
884 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
885 if (!req_data) {
886 if (coll)
887 rbd_coll_end_req_index(rq, coll, coll_index,
888 -ENOMEM, len);
889 return -ENOMEM;
890 }
891
892 if (coll) {
893 req_data->coll = coll;
894 req_data->coll_index = coll_index;
895 }
896
897 dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
898 object_name, len, ofs);
899
900 osdc = &rbd_dev->rbd_client->client->osdc;
901 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
902 false, GFP_NOIO, pages, bio);
903 if (!req) {
904 ret = -ENOMEM;
905 goto done_pages;
906 }
907
908 req->r_callback = rbd_cb;
909
910 req_data->rq = rq;
911 req_data->bio = bio;
912 req_data->pages = pages;
913 req_data->len = len;
914
915 req->r_priv = req_data;
916
917 reqhead = req->r_request->front.iov_base;
918 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
919
920 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
921 req->r_oid_len = strlen(req->r_oid);
922
923 layout = &req->r_file_layout;
924 memset(layout, 0, sizeof(*layout));
925 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
926 layout->fl_stripe_count = cpu_to_le32(1);
927 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
928 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
929 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
930 req, ops);
931
932 ceph_osdc_build_request(req, ofs, &len,
933 ops,
934 snapc,
935 &mtime,
936 req->r_oid, req->r_oid_len);
937
938 if (linger_req) {
939 ceph_osdc_set_request_linger(osdc, req);
940 *linger_req = req;
941 }
942
943 ret = ceph_osdc_start_request(osdc, req, false);
944 if (ret < 0)
945 goto done_err;
946
947 if (!rbd_cb) {
948 ret = ceph_osdc_wait_request(osdc, req);
949 if (ver)
950 *ver = le64_to_cpu(req->r_reassert_version.version);
951 dout("reassert_ver=%lld\n",
952 le64_to_cpu(req->r_reassert_version.version));
953 ceph_osdc_put_request(req);
954 }
955 return ret;
956
957done_err:
958 bio_chain_put(req_data->bio);
959 ceph_osdc_put_request(req);
960done_pages:
961 rbd_coll_end_req(req_data, ret, len);
962 kfree(req_data);
963 return ret;
964}
965
/*
 * Ceph osd op callback: parse the reply, fix up short/missing reads,
 * report completion, and release the request and its private data.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);	/* ops immediately follow the head */
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* nonexistent object: the whole extent reads as zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the remainder */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1004
/* Minimal osd completion callback: just drop the request reference */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1009
/*
 * Do a synchronous ceph osd operation
 *
 * Data is staged through a temporary page vector: for writes, buf is
 * copied into the pages before submission; for reads, the result is
 * copied back into buf afterwards.  If orig_ops is NULL a single-op
 * vector is built from opcode/flags and destroyed again here.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* NULL callback => rbd_do_request() waits for completion */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* ret is the number of bytes read back */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1070
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image extent onto a single object (segment) and submits
 * an async request for it; completion is reported via rbd_req_cb().
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1125
/*
 * Request async osd write (always against the live image: CEPH_NOSNAP)
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}
1142
/*
 * Request async osd read (reads need no snap context, only a snapid)
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}
1160
/*
 * Request sync osd read.
 *
 * Note: the snapc parameter is accepted but unused — NULL is passed
 * down and reads are governed by snapid alone; candidate for removal.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			     struct ceph_snap_context *snapc,
			     u64 snapid,
			     const char *object_name,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	return rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       object_name, ofs, len, buf, NULL, ver);
}
1179
/*
 * Request sync osd notify-ack: acknowledge a notification previously
 * delivered through rbd_watch_cb().
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id,
				   const char *object_name)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(ver);
	/*
	 * NOTE(review): cookie is not byte-swapped while ver is —
	 * inconsistent; confirm the expected wire byte order.
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	/* async with a trivial callback: fire and forget */
	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			     object_name, 0, 0, NULL,
			     NULL, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1210
/*
 * Watch callback: invoked by the osd client when the watched header
 * object changes.  Refreshes the in-memory header under ctl_mutex,
 * then acknowledges the notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
	     rbd_dev->header_name, notify_id, (int) opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_refresh_header(rbd_dev);
	hver = rbd_dev->header.obj_version;
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   " update snaps: %d\n", rbd_dev->major, rc);

	/* ack even if the refresh failed so the osd doesn't retry forever */
	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id, rbd_dev->header_name);
}
1232
/*
 * Request sync osd watch: register a watch on the header object so
 * the osd notifies us (via rbd_watch_cb) when another client changes
 * it.  Stores the event and lingering request on the rbd device.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
			      const char *object_name,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 = register the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1277
/*
 * Request sync osd unwatch: unregister the header-object watch set up
 * by rbd_req_sync_watch() and cancel the associated event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
				const char *object_name)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 = unregister the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1306
/* Context handed to rbd_notify_cb() via the notify completion event. */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1310
1311static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1312{
1313 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1314 if (!rbd_dev)
1315 return;
1316
1317 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
1318 rbd_dev->header_name,
1319 notify_id, (int)opcode);
1320}
1321
1322/*
1323 * Request sync osd notify
1324 */
1325static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
1326 const char *object_name)
1327{
1328 struct ceph_osd_req_op *ops;
1329 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1330 struct ceph_osd_event *event;
1331 struct rbd_notify_info info;
1332 int payload_len = sizeof(u32) + sizeof(u32);
1333 int ret;
1334
1335 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1336 if (ret < 0)
1337 return ret;
1338
1339 info.rbd_dev = rbd_dev;
1340
1341 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1342 (void *)&info, &event);
1343 if (ret < 0)
1344 goto fail;
1345
1346 ops[0].watch.ver = 1;
1347 ops[0].watch.flag = 1;
1348 ops[0].watch.cookie = event->cookie;
1349 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1350 ops[0].watch.timeout = 12;
1351
1352 ret = rbd_req_sync_op(rbd_dev, NULL,
1353 CEPH_NOSNAP,
1354 0,
1355 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356 ops,
1357 object_name, 0, 0, NULL, NULL, NULL);
1358 if (ret < 0)
1359 goto fail_event;
1360
1361 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1362 dout("ceph_osdc_wait_event returned %d\n", ret);
1363 rbd_destroy_ops(ops);
1364 return 0;
1365
1366fail_event:
1367 ceph_osdc_cancel_event(event);
1368fail:
1369 rbd_destroy_ops(ops);
1370 return ret;
1371}
1372
1373/*
1374 * Request sync osd read
1375 */
1376static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1377 const char *object_name,
1378 const char *class_name,
1379 const char *method_name,
1380 const char *data,
1381 int len,
1382 u64 *ver)
1383{
1384 struct ceph_osd_req_op *ops;
1385 int class_name_len = strlen(class_name);
1386 int method_name_len = strlen(method_name);
1387 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1388 class_name_len + method_name_len + len);
1389 if (ret < 0)
1390 return ret;
1391
1392 ops[0].cls.class_name = class_name;
1393 ops[0].cls.class_len = (__u8) class_name_len;
1394 ops[0].cls.method_name = method_name;
1395 ops[0].cls.method_len = (__u8) method_name_len;
1396 ops[0].cls.argc = 0;
1397 ops[0].cls.indata = data;
1398 ops[0].cls.indata_len = len;
1399
1400 ret = rbd_req_sync_op(rbd_dev, NULL,
1401 CEPH_NOSNAP,
1402 0,
1403 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1404 ops,
1405 object_name, 0, 0, NULL, NULL, ver);
1406
1407 rbd_destroy_ops(ops);
1408
1409 dout("cls_exec returned %d\n", ret);
1410 return ret;
1411}
1412
1413static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1414{
1415 struct rbd_req_coll *coll =
1416 kzalloc(sizeof(struct rbd_req_coll) +
1417 sizeof(struct rbd_req_status) * num_reqs,
1418 GFP_ATOMIC);
1419
1420 if (!coll)
1421 return NULL;
1422 coll->total = num_reqs;
1423 kref_init(&coll->kref);
1424 return coll;
1425}
1426
1427/*
1428 * block device queue callback
1429 */
1430static void rbd_rq_fn(struct request_queue *q)
1431{
1432 struct rbd_device *rbd_dev = q->queuedata;
1433 struct request *rq;
1434 struct bio_pair *bp = NULL;
1435
1436 while ((rq = blk_fetch_request(q))) {
1437 struct bio *bio;
1438 struct bio *rq_bio, *next_bio = NULL;
1439 bool do_write;
1440 int size, op_size = 0;
1441 u64 ofs;
1442 int num_segs, cur_seg = 0;
1443 struct rbd_req_coll *coll;
1444 struct ceph_snap_context *snapc;
1445
1446 /* peek at request from block layer */
1447 if (!rq)
1448 break;
1449
1450 dout("fetched request\n");
1451
1452 /* filter out block requests we don't understand */
1453 if ((rq->cmd_type != REQ_TYPE_FS)) {
1454 __blk_end_request_all(rq, 0);
1455 continue;
1456 }
1457
1458 /* deduce our operation (read, write) */
1459 do_write = (rq_data_dir(rq) == WRITE);
1460
1461 size = blk_rq_bytes(rq);
1462 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1463 rq_bio = rq->bio;
1464 if (do_write && rbd_dev->read_only) {
1465 __blk_end_request_all(rq, -EROFS);
1466 continue;
1467 }
1468
1469 spin_unlock_irq(q->queue_lock);
1470
1471 down_read(&rbd_dev->header_rwsem);
1472
1473 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1474 up_read(&rbd_dev->header_rwsem);
1475 dout("request for non-existent snapshot");
1476 spin_lock_irq(q->queue_lock);
1477 __blk_end_request_all(rq, -ENXIO);
1478 continue;
1479 }
1480
1481 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1482
1483 up_read(&rbd_dev->header_rwsem);
1484
1485 dout("%s 0x%x bytes at 0x%llx\n",
1486 do_write ? "write" : "read",
1487 size, blk_rq_pos(rq) * SECTOR_SIZE);
1488
1489 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1490 coll = rbd_alloc_coll(num_segs);
1491 if (!coll) {
1492 spin_lock_irq(q->queue_lock);
1493 __blk_end_request_all(rq, -ENOMEM);
1494 ceph_put_snap_context(snapc);
1495 continue;
1496 }
1497
1498 do {
1499 /* a bio clone to be passed down to OSD req */
1500 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1501 op_size = rbd_get_segment(&rbd_dev->header,
1502 rbd_dev->header.object_prefix,
1503 ofs, size,
1504 NULL, NULL);
1505 kref_get(&coll->kref);
1506 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1507 op_size, GFP_ATOMIC);
1508 if (!bio) {
1509 rbd_coll_end_req_index(rq, coll, cur_seg,
1510 -ENOMEM, op_size);
1511 goto next_seg;
1512 }
1513
1514
1515 /* init OSD command: write or read */
1516 if (do_write)
1517 rbd_req_write(rq, rbd_dev,
1518 snapc,
1519 ofs,
1520 op_size, bio,
1521 coll, cur_seg);
1522 else
1523 rbd_req_read(rq, rbd_dev,
1524 rbd_dev->snap_id,
1525 ofs,
1526 op_size, bio,
1527 coll, cur_seg);
1528
1529next_seg:
1530 size -= op_size;
1531 ofs += op_size;
1532
1533 cur_seg++;
1534 rq_bio = next_bio;
1535 } while (size > 0);
1536 kref_put(&coll->kref, rbd_coll_release);
1537
1538 if (bp)
1539 bio_pair_release(bp);
1540 spin_lock_irq(q->queue_lock);
1541
1542 ceph_put_snap_context(snapc);
1543 }
1544}
1545
1546/*
1547 * a queue callback. Makes sure that we don't create a bio that spans across
1548 * multiple osd objects. One exception would be with a single page bios,
1549 * which we handle later at bio_chain_clone
1550 */
1551static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1552 struct bio_vec *bvec)
1553{
1554 struct rbd_device *rbd_dev = q->queuedata;
1555 unsigned int chunk_sectors;
1556 sector_t sector;
1557 unsigned int bio_sectors;
1558 int max;
1559
1560 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1561 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1562 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1563
1564 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1565 + bio_sectors)) << SECTOR_SHIFT;
1566 if (max < 0)
1567 max = 0; /* bio_add cannot handle a negative return */
1568 if (max <= bvec->bv_len && bio_sectors == 0)
1569 return bvec->bv_len;
1570 return max;
1571}
1572
1573static void rbd_free_disk(struct rbd_device *rbd_dev)
1574{
1575 struct gendisk *disk = rbd_dev->disk;
1576
1577 if (!disk)
1578 return;
1579
1580 rbd_header_free(&rbd_dev->header);
1581
1582 if (disk->flags & GENHD_FL_UP)
1583 del_gendisk(disk);
1584 if (disk->queue)
1585 blk_cleanup_queue(disk->queue);
1586 put_disk(disk);
1587}
1588
1589/*
1590 * reload the ondisk the header
1591 */
1592static int rbd_read_header(struct rbd_device *rbd_dev,
1593 struct rbd_image_header *header)
1594{
1595 ssize_t rc;
1596 struct rbd_image_header_ondisk *dh;
1597 u32 snap_count = 0;
1598 u64 ver;
1599 size_t len;
1600
1601 /*
1602 * First reads the fixed-size header to determine the number
1603 * of snapshots, then re-reads it, along with all snapshot
1604 * records as well as their stored names.
1605 */
1606 len = sizeof (*dh);
1607 while (1) {
1608 dh = kmalloc(len, GFP_KERNEL);
1609 if (!dh)
1610 return -ENOMEM;
1611
1612 rc = rbd_req_sync_read(rbd_dev,
1613 NULL, CEPH_NOSNAP,
1614 rbd_dev->header_name,
1615 0, len,
1616 (char *)dh, &ver);
1617 if (rc < 0)
1618 goto out_dh;
1619
1620 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1621 if (rc < 0) {
1622 if (rc == -ENXIO)
1623 pr_warning("unrecognized header format"
1624 " for image %s\n",
1625 rbd_dev->image_name);
1626 goto out_dh;
1627 }
1628
1629 if (snap_count == header->total_snaps)
1630 break;
1631
1632 snap_count = header->total_snaps;
1633 len = sizeof (*dh) +
1634 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1635 header->snap_names_len;
1636
1637 rbd_header_free(header);
1638 kfree(dh);
1639 }
1640 header->obj_version = ver;
1641
1642out_dh:
1643 kfree(dh);
1644 return rc;
1645}
1646
1647/*
1648 * create a snapshot
1649 */
1650static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1651 const char *snap_name,
1652 gfp_t gfp_flags)
1653{
1654 int name_len = strlen(snap_name);
1655 u64 new_snapid;
1656 int ret;
1657 void *data, *p, *e;
1658 u64 ver;
1659 struct ceph_mon_client *monc;
1660
1661 /* we should create a snapshot only if we're pointing at the head */
1662 if (rbd_dev->snap_id != CEPH_NOSNAP)
1663 return -EINVAL;
1664
1665 monc = &rbd_dev->rbd_client->client->monc;
1666 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1667 dout("created snapid=%lld\n", new_snapid);
1668 if (ret < 0)
1669 return ret;
1670
1671 data = kmalloc(name_len + 16, gfp_flags);
1672 if (!data)
1673 return -ENOMEM;
1674
1675 p = data;
1676 e = data + name_len + 16;
1677
1678 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1679 ceph_encode_64_safe(&p, e, new_snapid, bad);
1680
1681 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1682 "rbd", "snap_add",
1683 data, p - data, &ver);
1684
1685 kfree(data);
1686
1687 return ret < 0 ? ret : 0;
1688bad:
1689 return -ERANGE;
1690}
1691
1692static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1693{
1694 struct rbd_snap *snap;
1695 struct rbd_snap *next;
1696
1697 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1698 __rbd_remove_snap_dev(rbd_dev, snap);
1699}
1700
1701/*
1702 * only read the first part of the ondisk header, without the snaps info
1703 */
1704static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1705{
1706 int ret;
1707 struct rbd_image_header h;
1708
1709 ret = rbd_read_header(rbd_dev, &h);
1710 if (ret < 0)
1711 return ret;
1712
1713 down_write(&rbd_dev->header_rwsem);
1714
1715 /* resized? */
1716 if (rbd_dev->snap_id == CEPH_NOSNAP) {
1717 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1718
1719 dout("setting size to %llu sectors", (unsigned long long) size);
1720 set_capacity(rbd_dev->disk, size);
1721 }
1722
1723 /* rbd_dev->header.object_prefix shouldn't change */
1724 kfree(rbd_dev->header.snap_sizes);
1725 kfree(rbd_dev->header.snap_names);
1726 /* osd requests may still refer to snapc */
1727 ceph_put_snap_context(rbd_dev->header.snapc);
1728
1729 rbd_dev->header.obj_version = h.obj_version;
1730 rbd_dev->header.image_size = h.image_size;
1731 rbd_dev->header.total_snaps = h.total_snaps;
1732 rbd_dev->header.snapc = h.snapc;
1733 rbd_dev->header.snap_names = h.snap_names;
1734 rbd_dev->header.snap_names_len = h.snap_names_len;
1735 rbd_dev->header.snap_sizes = h.snap_sizes;
1736 /* Free the extra copy of the object prefix */
1737 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1738 kfree(h.object_prefix);
1739
1740 ret = __rbd_init_snaps_header(rbd_dev);
1741
1742 up_write(&rbd_dev->header_rwsem);
1743
1744 return ret;
1745}
1746
/*
 * Read the image header, resolve the mapped snapshot, and create the
 * gendisk and request queue for the device, announcing it to the
 * block layer.  On success rbd_dev->disk and rbd_dev->q are set.
 * Returns 0 or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* select the mapped snapshot; total_size receives the mapped size */
	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	/* keep bios within a single rbd object */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1819
1820/*
1821 sysfs
1822*/
1823
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1828
1829static ssize_t rbd_size_show(struct device *dev,
1830 struct device_attribute *attr, char *buf)
1831{
1832 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1833 sector_t size;
1834
1835 down_read(&rbd_dev->header_rwsem);
1836 size = get_capacity(rbd_dev->disk);
1837 up_read(&rbd_dev->header_rwsem);
1838
1839 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1840}
1841
1842static ssize_t rbd_major_show(struct device *dev,
1843 struct device_attribute *attr, char *buf)
1844{
1845 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1846
1847 return sprintf(buf, "%d\n", rbd_dev->major);
1848}
1849
1850static ssize_t rbd_client_id_show(struct device *dev,
1851 struct device_attribute *attr, char *buf)
1852{
1853 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1854
1855 return sprintf(buf, "client%lld\n",
1856 ceph_client_id(rbd_dev->rbd_client->client));
1857}
1858
1859static ssize_t rbd_pool_show(struct device *dev,
1860 struct device_attribute *attr, char *buf)
1861{
1862 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1863
1864 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1865}
1866
1867static ssize_t rbd_pool_id_show(struct device *dev,
1868 struct device_attribute *attr, char *buf)
1869{
1870 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1871
1872 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1873}
1874
1875static ssize_t rbd_name_show(struct device *dev,
1876 struct device_attribute *attr, char *buf)
1877{
1878 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879
1880 return sprintf(buf, "%s\n", rbd_dev->image_name);
1881}
1882
1883static ssize_t rbd_snap_show(struct device *dev,
1884 struct device_attribute *attr,
1885 char *buf)
1886{
1887 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1888
1889 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1890}
1891
1892static ssize_t rbd_image_refresh(struct device *dev,
1893 struct device_attribute *attr,
1894 const char *buf,
1895 size_t size)
1896{
1897 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1898 int rc;
1899 int ret = size;
1900
1901 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1902
1903 rc = __rbd_refresh_header(rbd_dev);
1904 if (rc < 0)
1905 ret = rc;
1906
1907 mutex_unlock(&ctl_mutex);
1908 return ret;
1909}
1910
/*
 * Per-device sysfs attributes.  All read-only except "refresh"
 * (re-reads the header) and "create_snap" (takes a snapshot).
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1920
/* Attribute set attached to each mapped rbd device node. */
static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};
1937
static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Empty type-level release.  NOTE(review): rbd_bus_add_dev() also
 * sets dev->release = rbd_dev_release, which the driver core prefers
 * over the type's release, so this stub likely never runs -- confirm.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1952
1953
1954/*
1955 sysfs - snapshots
1956*/
1957
1958static ssize_t rbd_snap_size_show(struct device *dev,
1959 struct device_attribute *attr,
1960 char *buf)
1961{
1962 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1963
1964 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1965}
1966
1967static ssize_t rbd_snap_id_show(struct device *dev,
1968 struct device_attribute *attr,
1969 char *buf)
1970{
1971 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1972
1973 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1974}
1975
/* Read-only attributes exposed for each snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};
1988
1989static void rbd_snap_dev_release(struct device *dev)
1990{
1991 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1992 kfree(snap->name);
1993 kfree(snap);
1994}
1995
static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

/* Snapshot devices free their rbd_snap via rbd_snap_dev_release(). */
static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2005
/*
 * Unlink @snap from the device's snap list and unregister its sysfs
 * device; the release callback then frees the rbd_snap itself.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2012
2013static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2014 struct rbd_snap *snap,
2015 struct device *parent)
2016{
2017 struct device *dev = &snap->dev;
2018 int ret;
2019
2020 dev->type = &rbd_snap_device_type;
2021 dev->parent = parent;
2022 dev->release = rbd_snap_dev_release;
2023 dev_set_name(dev, "snap_%s", snap->name);
2024 ret = device_register(dev);
2025
2026 return ret;
2027}
2028
2029static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2030 int i, const char *name,
2031 struct rbd_snap **snapp)
2032{
2033 int ret;
2034 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2035 if (!snap)
2036 return -ENOMEM;
2037 snap->name = kstrdup(name, GFP_KERNEL);
2038 snap->size = rbd_dev->header.snap_sizes[i];
2039 snap->id = rbd_dev->header.snapc->snaps[i];
2040 if (device_is_registered(&rbd_dev->dev)) {
2041 ret = rbd_register_snap_dev(rbd_dev, snap,
2042 &rbd_dev->dev);
2043 if (ret < 0)
2044 goto err;
2045 }
2046 *snapp = snap;
2047 return 0;
2048err:
2049 kfree(snap->name);
2050 kfree(snap);
2051 return ret;
2052}
2053
2054/*
2055 * search for the previous snap in a null delimited string list
2056 */
2057const char *rbd_prev_snap_name(const char *name, const char *start)
2058{
2059 if (name < start + 2)
2060 return NULL;
2061
2062 name -= 2;
2063 while (*name) {
2064 if (name == start)
2065 return start;
2066 name--;
2067 }
2068 return name + 1;
2069}
2070
2071/*
2072 * compare the old list of snapshots that we have to what's in the header
2073 * and update it accordingly. Note that the header holds the snapshots
2074 * in a reverse order (from newest to oldest) and we need to go from
2075 * older to new so that we don't get a duplicate snap name when
2076 * doing the process (e.g., removed snapshot and recreated a new
2077 * one with the same name.
2078 */
2079static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2080{
2081 const char *name, *first_name;
2082 int i = rbd_dev->header.total_snaps;
2083 struct rbd_snap *snap, *old_snap = NULL;
2084 int ret;
2085 struct list_head *p, *n;
2086
2087 first_name = rbd_dev->header.snap_names;
2088 name = first_name + rbd_dev->header.snap_names_len;
2089
2090 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2091 u64 cur_id;
2092
2093 old_snap = list_entry(p, struct rbd_snap, node);
2094
2095 if (i)
2096 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2097
2098 if (!i || old_snap->id < cur_id) {
2099 /*
2100 * old_snap->id was skipped, thus was
2101 * removed. If this rbd_dev is mapped to
2102 * the removed snapshot, record that it no
2103 * longer exists, to prevent further I/O.
2104 */
2105 if (rbd_dev->snap_id == old_snap->id)
2106 rbd_dev->snap_exists = false;
2107 __rbd_remove_snap_dev(rbd_dev, old_snap);
2108 continue;
2109 }
2110 if (old_snap->id == cur_id) {
2111 /* we have this snapshot already */
2112 i--;
2113 name = rbd_prev_snap_name(name, first_name);
2114 continue;
2115 }
2116 for (; i > 0;
2117 i--, name = rbd_prev_snap_name(name, first_name)) {
2118 if (!name) {
2119 WARN_ON(1);
2120 return -EINVAL;
2121 }
2122 cur_id = rbd_dev->header.snapc->snaps[i];
2123 /* snapshot removal? handle it above */
2124 if (cur_id >= old_snap->id)
2125 break;
2126 /* a new snapshot */
2127 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2128 if (ret < 0)
2129 return ret;
2130
2131 /* note that we add it backward so using n and not p */
2132 list_add(&snap->node, n);
2133 p = &snap->node;
2134 }
2135 }
2136 /* we're done going over the old snap list, just add what's left */
2137 for (; i > 0; i--) {
2138 name = rbd_prev_snap_name(name, first_name);
2139 if (!name) {
2140 WARN_ON(1);
2141 return -EINVAL;
2142 }
2143 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2144 if (ret < 0)
2145 return ret;
2146 list_add(&snap->node, &rbd_dev->snaps);
2147 }
2148
2149 return 0;
2150}
2151
/*
 * Register rbd_dev (and every snapshot already on its list) with the
 * driver core under the rbd bus.  Takes ctl_mutex for the duration.
 * Returns 0 or the first registration error.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	/* expose each known snapshot as a child device of the image */
	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2180
/*
 * Unregister the image's sysfs device; its release callback
 * (rbd_dev_release) runs once the last reference is dropped.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2185
/*
 * Establish the header watch, retrying after a header refresh while
 * the OSD keeps returning -ERANGE.  NOTE(review): -ERANGE appears to
 * signal a stale header version -- confirm against the osd protocol.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			/* refresh our cached header and try again */
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_refresh_header(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2204
/* Highest device id handed out so far; ids start at 1. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list. The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
2219
2220/*
2221 * Remove an rbd_dev from the global list, and record that its
2222 * identifier is no longer in use.
2223 */
2224static void rbd_id_put(struct rbd_device *rbd_dev)
2225{
2226 struct list_head *tmp;
2227 int rbd_id = rbd_dev->id;
2228 int max_id;
2229
2230 BUG_ON(rbd_id < 1);
2231
2232 spin_lock(&rbd_dev_list_lock);
2233 list_del_init(&rbd_dev->node);
2234
2235 /*
2236 * If the id being "put" is not the current maximum, there
2237 * is nothing special we need to do.
2238 */
2239 if (rbd_id != atomic64_read(&rbd_id_max)) {
2240 spin_unlock(&rbd_dev_list_lock);
2241 return;
2242 }
2243
2244 /*
2245 * We need to update the current maximum id. Search the
2246 * list to find out what it is. We're more likely to find
2247 * the maximum at the end, so search the list backward.
2248 */
2249 max_id = 0;
2250 list_for_each_prev(tmp, &rbd_dev_list) {
2251 struct rbd_device *rbd_dev;
2252
2253 rbd_dev = list_entry(tmp, struct rbd_device, node);
2254 if (rbd_id > max_id)
2255 max_id = rbd_id;
2256 }
2257 spin_unlock(&rbd_dev_list_lock);
2258
2259 /*
2260 * The max id could have been updated by rbd_id_get(), in
2261 * which case it now accurately reflects the new maximum.
2262 * Be careful not to overwrite the maximum value in that
2263 * case.
2264 */
2265 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2266}
2267
2268/*
2269 * Skips over white space at *buf, and updates *buf to point to the
2270 * first found non-space character (if any). Returns the length of
2271 * the token (string of non-white space characters) found. Note
2272 * that *buf must be terminated with '\0'.
2273 */
2274static inline size_t next_token(const char **buf)
2275{
2276 /*
2277 * These are the characters that produce nonzero for
2278 * isspace() in the "C" and "POSIX" locales.
2279 */
2280 const char *spaces = " \f\n\r\t\v";
2281
2282 *buf += strspn(*buf, spaces); /* Find start of token */
2283
2284 return strcspn(*buf, spaces); /* Return token length */
2285}
2286
2287/*
2288 * Finds the next token in *buf, and if the provided token buffer is
2289 * big enough, copies the found token into it. The result, if
2290 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2291 * must be terminated with '\0' on entry.
2292 *
2293 * Returns the length of the token found (not including the '\0').
2294 * Return value will be 0 if no token is found, and it will be >=
2295 * token_size if the token would not fit.
2296 *
2297 * The *buf pointer will be updated to point beyond the end of the
2298 * found token. Note that this occurs even if the token buffer is
2299 * too small to hold it.
2300 */
2301static inline size_t copy_token(const char **buf,
2302 char *token,
2303 size_t token_size)
2304{
2305 size_t len;
2306
2307 len = next_token(buf);
2308 if (len < token_size) {
2309 memcpy(token, *buf, len);
2310 *(token + len) = '\0';
2311 }
2312 *buf += len;
2313
2314 return len;
2315}
2316
2317/*
2318 * Finds the next token in *buf, dynamically allocates a buffer big
2319 * enough to hold a copy of it, and copies the token into the new
2320 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2321 * that a duplicate buffer is created even for a zero-length token.
2322 *
2323 * Returns a pointer to the newly-allocated duplicate, or a null
2324 * pointer if memory for the duplicate was not available. If
2325 * the lenp argument is a non-null pointer, the length of the token
2326 * (not including the '\0') is returned in *lenp.
2327 *
2328 * If successful, the *buf pointer will be updated to point beyond
2329 * the end of the found token.
2330 *
2331 * Note: uses GFP_KERNEL for allocation.
2332 */
2333static inline char *dup_token(const char **buf, size_t *lenp)
2334{
2335 char *dup;
2336 size_t len;
2337
2338 len = next_token(buf);
2339 dup = kmalloc(len + 1, GFP_KERNEL);
2340 if (!dup)
2341 return NULL;
2342
2343 memcpy(dup, *buf, len);
2344 *(dup + len) = '\0';
2345 *buf += len;
2346
2347 if (lenp)
2348 *lenp = len;
2349
2350 return dup;
2351}
2352
2353/*
2354 * This fills in the pool_name, image_name, image_name_len, snap_name,
2355 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2356 * on the list of monitor addresses and other options provided via
2357 * /sys/bus/rbd/add.
2358 *
2359 * Note: rbd_dev is assumed to have been initially zero-filled.
2360 */
2361static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2362 const char *buf,
2363 const char **mon_addrs,
2364 size_t *mon_addrs_size,
2365 char *options,
2366 size_t options_size)
2367{
2368 size_t len;
2369 int ret;
2370
2371 /* The first four tokens are required */
2372
2373 len = next_token(&buf);
2374 if (!len)
2375 return -EINVAL;
2376 *mon_addrs_size = len + 1;
2377 *mon_addrs = buf;
2378
2379 buf += len;
2380
2381 len = copy_token(&buf, options, options_size);
2382 if (!len || len >= options_size)
2383 return -EINVAL;
2384
2385 ret = -ENOMEM;
2386 rbd_dev->pool_name = dup_token(&buf, NULL);
2387 if (!rbd_dev->pool_name)
2388 goto out_err;
2389
2390 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2391 if (!rbd_dev->image_name)
2392 goto out_err;
2393
2394 /* Create the name of the header object */
2395
2396 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2397 + sizeof (RBD_SUFFIX),
2398 GFP_KERNEL);
2399 if (!rbd_dev->header_name)
2400 goto out_err;
2401 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2402
2403 /*
2404 * The snapshot name is optional. If none is is supplied,
2405 * we use the default value.
2406 */
2407 rbd_dev->snap_name = dup_token(&buf, &len);
2408 if (!rbd_dev->snap_name)
2409 goto out_err;
2410 if (!len) {
2411 /* Replace the empty name with the default */
2412 kfree(rbd_dev->snap_name);
2413 rbd_dev->snap_name
2414 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2415 if (!rbd_dev->snap_name)
2416 goto out_err;
2417
2418 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2419 sizeof (RBD_SNAP_HEAD_NAME));
2420 }
2421
2422 return 0;
2423
2424out_err:
2425 kfree(rbd_dev->header_name);
2426 kfree(rbd_dev->image_name);
2427 kfree(rbd_dev->pool_name);
2428 rbd_dev->pool_name = NULL;
2429
2430 return ret;
2431}
2432
/*
 * Handle a write to /sys/bus/rbd/add: parse the "<mon_addrs>
 * <options> <pool> <image> [<snap>]" spec in @buf, connect to the
 * cluster, and create and announce the block device.  Returns
 * @count on success or a negative errno.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* scratch buffer for the options token parsed out of buf */
	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
						options);
	if (IS_ERR(rbd_dev->rbd_client)) {
		rc = PTR_ERR(rbd_dev->rbd_client);
		goto err_put_id;
	}

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	/*
	 * NOTE(review): the options buffer does not appear to be
	 * freed on this success path (every error path frees it) --
	 * looks like a leak unless rbd_get_client() keeps a
	 * reference to it; verify.
	 */
	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* pool_name is only set once parse_args succeeded partway */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2542
2543static struct rbd_device *__rbd_get_dev(unsigned long id)
2544{
2545 struct list_head *tmp;
2546 struct rbd_device *rbd_dev;
2547
2548 spin_lock(&rbd_dev_list_lock);
2549 list_for_each(tmp, &rbd_dev_list) {
2550 rbd_dev = list_entry(tmp, struct rbd_device, node);
2551 if (rbd_dev->id == id) {
2552 spin_unlock(&rbd_dev_list_lock);
2553 return rbd_dev;
2554 }
2555 }
2556 spin_unlock(&rbd_dev_list_lock);
2557 return NULL;
2558}
2559
/*
 * Device-model release callback: final teardown of an rbd_device once
 * its embedded struct device's refcount drops to zero.  Teardown order
 * matters: stop the watch traffic first (it uses the ceph client and
 * header_name), then drop the client, then the block device, and only
 * then free the strings and the rbd_dev itself.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Cancel the lingering watch request before tearing anything down */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2590
2591static ssize_t rbd_remove(struct bus_type *bus,
2592 const char *buf,
2593 size_t count)
2594{
2595 struct rbd_device *rbd_dev = NULL;
2596 int target_id, rc;
2597 unsigned long ul;
2598 int ret = count;
2599
2600 rc = strict_strtoul(buf, 10, &ul);
2601 if (rc)
2602 return rc;
2603
2604 /* convert to int; abort if we lost anything in the conversion */
2605 target_id = (int) ul;
2606 if (target_id != ul)
2607 return -EINVAL;
2608
2609 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2610
2611 rbd_dev = __rbd_get_dev(target_id);
2612 if (!rbd_dev) {
2613 ret = -ENOENT;
2614 goto done;
2615 }
2616
2617 __rbd_remove_all_snaps(rbd_dev);
2618 rbd_bus_del_dev(rbd_dev);
2619
2620done:
2621 mutex_unlock(&ctl_mutex);
2622 return ret;
2623}
2624
/*
 * rbd_snap_add - per-device sysfs attribute: create a new snapshot
 * named by @buf, refresh the header, and notify other watchers.
 * Returns @count on success or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* NOTE(review): snprintf with size "count" copies at most
	 * count-1 bytes, silently dropping the last byte of buf.  This
	 * presumably relies on sysfs input carrying a trailing newline;
	 * a write without one (echo -n) loses the name's final
	 * character -- confirm before changing. */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2665
2666/*
2667 * create control files in sysfs
2668 * /sys/bus/rbd/...
2669 */
2670static int rbd_sysfs_init(void)
2671{
2672 int ret;
2673
2674 ret = device_register(&rbd_root_dev);
2675 if (ret < 0)
2676 return ret;
2677
2678 ret = bus_register(&rbd_bus_type);
2679 if (ret < 0)
2680 device_unregister(&rbd_root_dev);
2681
2682 return ret;
2683}
2684
/*
 * Tear down the sysfs control files: unregister the bus first, then
 * the root device -- reverse order of rbd_sysfs_init().
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2690
2691int __init rbd_init(void)
2692{
2693 int rc;
2694
2695 rc = rbd_sysfs_init();
2696 if (rc)
2697 return rc;
2698 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2699 return 0;
2700}
2701
/* Module exit: remove the sysfs control interface. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2706
/* Module entry/exit hooks and metadata */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");