]> git.proxmox.com Git - mirror_ubuntu-jammy-kernel.git/blame - drivers/block/rbd.c
rbd: protect the rbd_dev_list with a spinlock
[mirror_ubuntu-jammy-kernel.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
44#define DRV_NAME "rbd"
45#define DRV_NAME_LONG "rbd (rados block device)"
46
47#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48
21079786 49#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
602adf40
YS
50#define RBD_MAX_POOL_NAME_LEN 64
51#define RBD_MAX_SNAP_NAME_LEN 32
52#define RBD_MAX_OPT_LEN 1024
53
54#define RBD_SNAP_HEAD_NAME "-"
55
56#define DEV_NAME_LEN 32
57
59c2be1e
YS
58#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
602adf40
YS
60/*
61 * block device image metadata (in-memory version)
62 */
63struct rbd_image_header {
64 u64 image_size;
65 char block_name[32];
66 __u8 obj_order;
67 __u8 crypt_type;
68 __u8 comp_type;
69 struct rw_semaphore snap_rwsem;
70 struct ceph_snap_context *snapc;
71 size_t snap_names_len;
72 u64 snap_seq;
73 u32 total_snaps;
74
75 char *snap_names;
76 u64 *snap_sizes;
59c2be1e
YS
77
78 u64 obj_version;
79};
80
81struct rbd_options {
82 int notify_timeout;
602adf40
YS
83};
84
85/*
86 * an instance of the client. multiple devices may share a client.
87 */
88struct rbd_client {
89 struct ceph_client *client;
59c2be1e 90 struct rbd_options *rbd_opts;
602adf40
YS
91 struct kref kref;
92 struct list_head node;
93};
94
1fec7093
YS
95struct rbd_req_coll;
96
602adf40
YS
97/*
98 * a single io request
99 */
100struct rbd_request {
101 struct request *rq; /* blk layer request */
102 struct bio *bio; /* cloned bio */
103 struct page **pages; /* list of used pages */
104 u64 len;
1fec7093
YS
105 int coll_index;
106 struct rbd_req_coll *coll;
107};
108
109struct rbd_req_status {
110 int done;
111 int rc;
112 u64 bytes;
113};
114
115/*
116 * a collection of requests
117 */
118struct rbd_req_coll {
119 int total;
120 int num_done;
121 struct kref kref;
122 struct rbd_req_status status[0];
602adf40
YS
123};
124
dfc5606d
YS
125struct rbd_snap {
126 struct device dev;
127 const char *name;
128 size_t size;
129 struct list_head node;
130 u64 id;
131};
132
602adf40
YS
133/*
134 * a single device
135 */
136struct rbd_device {
137 int id; /* blkdev unique id */
138
139 int major; /* blkdev assigned major */
140 struct gendisk *disk; /* blkdev's gendisk and rq */
141 struct request_queue *q;
142
602adf40
YS
143 struct rbd_client *rbd_client;
144
145 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
146
147 spinlock_t lock; /* queue lock */
148
149 struct rbd_image_header header;
150 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
151 int obj_len;
152 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
153 char pool_name[RBD_MAX_POOL_NAME_LEN];
154 int poolid;
155
59c2be1e
YS
156 struct ceph_osd_event *watch_event;
157 struct ceph_osd_request *watch_request;
158
602adf40
YS
159 char snap_name[RBD_MAX_SNAP_NAME_LEN];
160 u32 cur_snap; /* index+1 of current snapshot within snap context
161 0 - for the head */
162 int read_only;
163
164 struct list_head node;
dfc5606d
YS
165
166 /* list of snapshots */
167 struct list_head snaps;
168
169 /* sysfs related */
170 struct device dev;
171};
172
173static struct bus_type rbd_bus_type = {
174 .name = "rbd",
602adf40
YS
175};
176
602adf40 177static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 178
602adf40 179static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
180static DEFINE_SPINLOCK(rbd_dev_list_lock);
181
602adf40 182static LIST_HEAD(rbd_client_list); /* clients */
e124a82f 183static DEFINE_SPINLOCK(node_lock); /* protects client get/put */
602adf40 184
dfc5606d
YS
185static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
186static void rbd_dev_release(struct device *dev);
dfc5606d
YS
187static ssize_t rbd_snap_add(struct device *dev,
188 struct device_attribute *attr,
189 const char *buf,
190 size_t count);
191static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 192 struct rbd_snap *snap);
dfc5606d
YS
193
194
/* Map the embedded sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
199
/* Take a reference on the rbd device's embedded struct device. */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
204
/* Drop the reference taken by rbd_get_dev(). */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
602adf40 209
59c2be1e
YS
210static int __rbd_update_snaps(struct rbd_device *rbd_dev);
211
602adf40
YS
212static int rbd_open(struct block_device *bdev, fmode_t mode)
213{
214 struct gendisk *disk = bdev->bd_disk;
215 struct rbd_device *rbd_dev = disk->private_data;
216
dfc5606d
YS
217 rbd_get_dev(rbd_dev);
218
602adf40
YS
219 set_device_ro(bdev, rbd_dev->read_only);
220
221 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
222 return -EROFS;
223
224 return 0;
225}
226
dfc5606d
YS
/* Block device ->release handler: drops the reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
235
602adf40
YS
/*
 * Block device operations table.  Only open/release are provided here;
 * actual I/O is submitted through the request queue, not via an ioctl
 * or direct ops entry point.
 */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
241
242/*
243 * Initialize an rbd client instance.
244 * We own *opt.
245 */
59c2be1e
YS
/*
 * Initialize an rbd client instance and add it to the global client list.
 *
 * Ownership: this function owns *opt.  Once ceph_create_client()
 * succeeds, rbdc->client owns it (opt is set to NULL so the error path
 * below won't free it twice).  On failure *opt is destroyed here, but
 * *rbd_opts is NOT freed — the caller (rbd_get_client) frees it.
 *
 * Returns the new client, or ERR_PTR on failure.
 * NOTE(review): when ceph_create_client() fails, ret is still -ENOMEM
 * rather than the ERR value from rbdc->client — callers see a generic
 * error; confirm whether the precise code matters.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *opt,
					    struct rbd_options *rbd_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("rbd_client_create\n");
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	opt = NULL; /* Now rbdc->client is responsible for opt */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	rbdc->rbd_opts = rbd_opts;

	/* node_lock protects rbd_client_list */
	spin_lock(&node_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&node_lock);

	dout("rbd_client_create created %p\n", rbdc);
	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (opt)
		ceph_destroy_options(opt);
	return ERR_PTR(ret);
}
287
288/*
289 * Find a ceph client with specific addr and configuration.
290 */
/*
 * Find a ceph client with specific addr and configuration.
 *
 * Caller must hold node_lock (double-underscore naming convention).
 * Returns NULL when sharing is disabled (CEPH_OPT_NOSHARE) or no
 * matching client exists; no reference is taken on the result.
 */
static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
{
	struct rbd_client *client_node;

	if (opt->flags & CEPH_OPT_NOSHARE)
		return NULL;

	list_for_each_entry(client_node, &rbd_client_list, node)
		if (ceph_compare_options(opt, client_node->client) == 0)
			return client_node;
	return NULL;
}
303
59c2be1e
YS
304/*
305 * mount options
306 */
307enum {
308 Opt_notify_timeout,
309 Opt_last_int,
310 /* int args above */
311 Opt_last_string,
312 /* string args above */
313};
314
315static match_table_t rbdopt_tokens = {
316 {Opt_notify_timeout, "notify_timeout=%d"},
317 /* int args above */
318 /* string args above */
319 {-1, NULL}
320};
321
/*
 * Parse a single rbd mount option token (callback for ceph_parse_options).
 *
 * @c:       the option text, e.g. "notify_timeout=10"
 * @private: the struct rbd_options being filled in
 *
 * Returns 0 on success, -EINVAL for an unknown token, or the match_int
 * error for a malformed integer argument.  Note intval is only assigned
 * for tokens below Opt_last_int; the switch below only consumes it for
 * such tokens.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbdopt = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbdopt_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbdopt->notify_timeout = intval;
		break;
	default:
		BUG_ON(token);
	}
	return 0;
}
356
602adf40
YS
357/*
358 * Get a ceph client with specific addr and configuration, if one does
359 * not exist create it.
360 */
/*
 * Get a ceph client with specific addr and configuration; if one does
 * not exist, create it.
 *
 * On success rbd_dev->rbd_client holds a reference (either a new client
 * or an extra kref on a shared one) and 0 is returned.  On failure the
 * locally-allocated rbd_opts is freed and a negative errno is returned.
 *
 * NOTE(review): node_lock is dropped between the failed __rbd_client_find()
 * and rbd_client_create(), so two racing mounts could each create a
 * client instead of sharing one — harmless but worth confirming.
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	int ret;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return -ENOMEM;

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	opt = ceph_parse_options(options, mon_addr,
				 mon_addr + strlen(mon_addr),
				 parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(opt)) {
		ret = PTR_ERR(opt);
		goto done_err;
	}

	spin_lock(&node_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		/* sharing an existing client: our copies of opt/rbd_opts
		 * are redundant */
		ceph_destroy_options(opt);
		kfree(rbd_opts);

		/* using an existing client */
		kref_get(&rbdc->kref);
		rbd_dev->rbd_client = rbdc;
		spin_unlock(&node_lock);
		return 0;
	}
	spin_unlock(&node_lock);

	/* rbd_client_create consumes opt; rbd_opts stays ours on failure */
	rbdc = rbd_client_create(opt, rbd_opts);
	if (IS_ERR(rbdc)) {
		ret = PTR_ERR(rbdc);
		goto done_err;
	}

	rbd_dev->rbd_client = rbdc;
	return 0;
done_err:
	kfree(rbd_opts);
	return ret;
}
409
410/*
411 * Destroy ceph client
d23a4b3f
AE
412 *
413 * Caller must hold node_lock.
602adf40
YS
414 */
/*
 * Destroy a ceph client (kref release callback).
 *
 * Caller must hold node_lock: this unlinks the client from
 * rbd_client_list before tearing it down.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	list_del(&rbdc->node);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
426
427/*
428 * Drop reference to ceph client node. If it's not referenced anymore, release
429 * it.
430 */
/*
 * Drop reference to ceph client node.  If it's not referenced anymore,
 * release it.  node_lock is taken around kref_put because the release
 * callback (rbd_client_release) unlinks the node from rbd_client_list.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	spin_lock(&node_lock);
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	spin_unlock(&node_lock);
	rbd_dev->rbd_client = NULL;
}
438
1fec7093
YS
439/*
440 * Destroy requests collection
441 */
/*
 * Destroy requests collection (kref release callback for
 * struct rbd_req_coll).
 */
static void rbd_coll_release(struct kref *kref)
{
	struct rbd_req_coll *coll =
		container_of(kref, struct rbd_req_coll, kref);

	dout("rbd_coll_release %p\n", coll);
	kfree(coll);
}
602adf40
YS
450
451/*
452 * Create a new header structure, translate header format from the on-disk
453 * header.
454 */
455static int rbd_header_from_disk(struct rbd_image_header *header,
456 struct rbd_image_header_ondisk *ondisk,
457 int allocated_snaps,
458 gfp_t gfp_flags)
459{
460 int i;
461 u32 snap_count = le32_to_cpu(ondisk->snap_count);
462 int ret = -ENOMEM;
463
21079786 464 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
81e759fb 465 return -ENXIO;
81e759fb 466
602adf40 467 init_rwsem(&header->snap_rwsem);
602adf40
YS
468 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
469 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
21079786 470 snap_count * sizeof (*ondisk),
602adf40
YS
471 gfp_flags);
472 if (!header->snapc)
473 return -ENOMEM;
474 if (snap_count) {
475 header->snap_names = kmalloc(header->snap_names_len,
476 GFP_KERNEL);
477 if (!header->snap_names)
478 goto err_snapc;
479 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
480 GFP_KERNEL);
481 if (!header->snap_sizes)
482 goto err_names;
483 } else {
484 header->snap_names = NULL;
485 header->snap_sizes = NULL;
486 }
487 memcpy(header->block_name, ondisk->block_name,
488 sizeof(ondisk->block_name));
489
490 header->image_size = le64_to_cpu(ondisk->image_size);
491 header->obj_order = ondisk->options.order;
492 header->crypt_type = ondisk->options.crypt_type;
493 header->comp_type = ondisk->options.comp_type;
494
495 atomic_set(&header->snapc->nref, 1);
496 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
497 header->snapc->num_snaps = snap_count;
498 header->total_snaps = snap_count;
499
21079786 500 if (snap_count && allocated_snaps == snap_count) {
602adf40
YS
501 for (i = 0; i < snap_count; i++) {
502 header->snapc->snaps[i] =
503 le64_to_cpu(ondisk->snaps[i].id);
504 header->snap_sizes[i] =
505 le64_to_cpu(ondisk->snaps[i].image_size);
506 }
507
508 /* copy snapshot names */
509 memcpy(header->snap_names, &ondisk->snaps[i],
510 header->snap_names_len);
511 }
512
513 return 0;
514
515err_names:
516 kfree(header->snap_names);
517err_snapc:
518 kfree(header->snapc);
519 return ret;
520}
521
/*
 * Convert a 1-based "index from the newest snapshot" (dev->cur_snap
 * convention) into an index into the header's snaps arrays.
 * NOTE(review): assumes snaps arrays are ordered such that this
 * subtraction lands on the intended entry — confirm against callers.
 */
static int snap_index(struct rbd_image_header *header, int snap_num)
{
	return header->total_snaps - snap_num;
}
526
/*
 * Return the snapshot id of the currently-mapped snapshot, or 0 when
 * the head (cur_snap == 0) is mapped.
 */
static u64 cur_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header = &rbd_dev->header;

	if (!rbd_dev->cur_snap)
		return 0;

	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
}
536
537static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
538 u64 *seq, u64 *size)
539{
540 int i;
541 char *p = header->snap_names;
542
543 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
544 if (strcmp(snap_name, p) == 0)
545 break;
546 }
547 if (i == header->total_snaps)
548 return -ENOENT;
549 if (seq)
550 *seq = header->snapc->snaps[i];
551
552 if (size)
553 *size = header->snap_sizes[i];
554
555 return i;
556}
557
cc9d734c 558static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
602adf40
YS
559{
560 struct rbd_image_header *header = &dev->header;
561 struct ceph_snap_context *snapc = header->snapc;
562 int ret = -ENOENT;
563
cc9d734c
JD
564 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
565
602adf40
YS
566 down_write(&header->snap_rwsem);
567
cc9d734c
JD
568 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
569 sizeof (RBD_SNAP_HEAD_NAME))) {
602adf40
YS
570 if (header->total_snaps)
571 snapc->seq = header->snap_seq;
572 else
573 snapc->seq = 0;
574 dev->cur_snap = 0;
575 dev->read_only = 0;
576 if (size)
577 *size = header->image_size;
578 } else {
cc9d734c 579 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
602adf40
YS
580 if (ret < 0)
581 goto done;
582
583 dev->cur_snap = header->total_snaps - ret;
584 dev->read_only = 1;
585 }
586
587 ret = 0;
588done:
589 up_write(&header->snap_rwsem);
590 return ret;
591}
592
/* Free the allocations made by rbd_header_from_disk(). */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->snapc);
	kfree(header->snap_names);
	kfree(header->snap_sizes);
}
599
600/*
601 * get the actual striped segment name, offset and length
602 */
603static u64 rbd_get_segment(struct rbd_image_header *header,
604 const char *block_name,
605 u64 ofs, u64 len,
606 char *seg_name, u64 *segofs)
607{
608 u64 seg = ofs >> header->obj_order;
609
610 if (seg_name)
611 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
612 "%s.%012llx", block_name, seg);
613
614 ofs = ofs & ((1 << header->obj_order) - 1);
615 len = min_t(u64, len, (1 << header->obj_order) - ofs);
616
617 if (segofs)
618 *segofs = ofs;
619
620 return len;
621}
622
1fec7093
YS
623static int rbd_get_num_segments(struct rbd_image_header *header,
624 u64 ofs, u64 len)
625{
626 u64 start_seg = ofs >> header->obj_order;
627 u64 end_seg = (ofs + len - 1) >> header->obj_order;
628 return end_seg - start_seg + 1;
629}
630
029bcbd8
JD
631/*
632 * returns the size of an object in the image
633 */
634static u64 rbd_obj_bytes(struct rbd_image_header *header)
635{
636 return 1 << header->obj_order;
637}
638
602adf40
YS
639/*
640 * bio helpers
641 */
642
643static void bio_chain_put(struct bio *chain)
644{
645 struct bio *tmp;
646
647 while (chain) {
648 tmp = chain;
649 chain = chain->bi_next;
650 bio_put(tmp);
651 }
652}
653
654/*
655 * zeros a bio chain, starting at specific offset
656 */
/*
 * zeros a bio chain, starting at specific offset
 *
 * Walks every segment of every bio in the chain; once the running byte
 * position passes start_ofs, the remainder of each segment is zeroed.
 * Used to blank the tail of a short read.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* zero from start_ofs (or the segment
				 * start, whichever is later) to its end */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
680
681/*
682 * bio_chain_clone - clone a chain of bios up to a certain length.
683 * might return a bio_pair that will need to be released.
684 */
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * @old:  in/out — chain to clone; on return points at the first
 *        unconsumed bio
 * @next: out — where the remainder of the chain continues (either the
 *        split second half or the next original bio)
 * @bp:   in/out — any previous split pair is released on entry; a new
 *        one may be stored here for the caller to release later
 * @len:  number of bytes to cover with the cloned chain
 *
 * Returns the new chain, or NULL on allocation/split failure (already-
 * cloned bios are released).
 * NOTE(review): BUG_ON(total < len) fires if the chain is shorter than
 * len — callers are expected to guarantee enough bytes; confirm.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation may block */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
757
758/*
759 * helpers for osd request op vectors.
760 */
/*
 * helpers for osd request op vectors.
 *
 * Allocates num_ops + 1 zeroed entries (the extra zeroed entry
 * terminates the vector) and initializes the first op's code and
 * payload length.  Extent offset/length are filled in later by
 * calc_raw_layout().  Caller frees with rbd_destroy_ops().
 */
static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
			     int num_ops,
			     int opcode,
			     u32 payload_len)
{
	*ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
		       GFP_NOIO);
	if (!*ops)
		return -ENOMEM;
	(*ops)[0].op = opcode;
	/*
	 * op extent offset and length will be set later on
	 * in calc_raw_layout()
	 */
	(*ops)[0].payload_len = payload_len;
	return 0;
}
778
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
783
1fec7093
YS
/*
 * Record completion of one sub-request in a collection and complete as
 * much of the block-layer request as is now contiguously done.
 *
 * Holds the request queue lock while updating coll state: sub-requests
 * may finish out of order, so only the prefix [num_done, max) of
 * consecutively-done entries is reported to blk via __blk_end_request,
 * preserving in-order completion.  Drops one coll kref per entry
 * completed.  Without a collection, completes the request directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
821
/* Convenience wrapper: complete the collection slot owned by @req. */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
827
602adf40
YS
828/*
829 * Send ceph osd request
830 */
/*
 * Send ceph osd request
 *
 * Central request path for both async and sync I/O:
 *  - with rbd_cb set, the request completes asynchronously via the
 *    callback (which owns req_data/bio/req teardown);
 *  - with rbd_cb NULL, waits for completion, optionally reporting the
 *    object version via *ver, and drops the request here.
 * With linger_req set, the request is registered to linger (watch) and
 * handed back to the caller.
 *
 * On allocation failure the collection slot (if any) is completed with
 * -ENOMEM so the block layer still sees the sub-request finish.
 * snap_rwsem is held for reading across layout/request construction so
 * the snap context can't change underneath us.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  int num_reply,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct rbd_image_header *header = &dev->header;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);

	down_read(&header->snap_rwsem);

	osdc = &dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		up_read(&header->snap_rwsem);
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	/* NOTE(review): the header snapid is always CEPH_NOSNAP here even
	 * though a snapid parameter exists (it is used for the layout
	 * calculation below) — confirm this is intended. */
	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, obj, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* one object per "stripe": unit == object size, single stripe */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_preferred = cpu_to_le32(-1);
	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);
	up_read(&header->snap_rwsem);

	if (linger_req) {
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* synchronous mode: wait here and drop the request */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%lld\n",
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
944
945/*
946 * Ceph osd op callback
947 */
/*
 * Ceph osd op callback
 *
 * Completion path for async I/O submitted via rbd_do_request().
 * Translates osd reply semantics to block-layer semantics:
 *  - ENOENT on a read means a hole (object doesn't exist): zero the
 *    whole bio and report success;
 *  - a short read gets its tail zeroed and is reported full-length.
 * Then completes the collection slot and frees request state.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* nonexistent object reads back as zeros */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the tail */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
983
59c2be1e
YS
/* Minimal async completion: just drop the osd request (no rbd state). */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
988
602adf40
YS
989/*
990 * Do a synchronous ceph osd operation
991 */
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector to carry the payload.  If the caller did not
 * supply an op vector (orig_ops == NULL), a single op of @opcode is
 * built, and for writes @buf is copied into the pages first.  For
 * reads, up to the returned byte count is copied back into @buf.
 * Caller-supplied op vectors are not freed here.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		/* build a single default op of the requested opcode */
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* rbd_cb == NULL makes rbd_do_request synchronous */
	ret = rbd_do_request(NULL, dev, snapc, snapid,
			     obj, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     2,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1051
1052/*
1053 * Do an asynchronous ceph osd operation
1054 */
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image byte range onto its object segment (name, offset,
 * length) and submits one async osd op for it, completing via
 * rbd_req_cb into the given collection slot.  The range must already
 * fit in a single segment (bios were split accordingly upstream), so a
 * truncated segment here is a bug.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1107
1108/*
1109 * Request async osd write
1110 */
/*
 * Request async osd write
 *
 * Thin wrapper: async write op against the head (CEPH_NOSNAP) with
 * on-disk acknowledgement.
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1125
1126/*
1127 * Request async osd read
1128 */
/*
 * Request async osd read
 *
 * Thin wrapper: async read op, against the given snapshot or the head
 * (CEPH_NOSNAP) when snapid is 0.
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1144
1145/*
1146 * Request sync osd read
1147 */
/*
 * Request sync osd read
 *
 * Synchronous read into @buf, against the given snapshot or the head
 * (CEPH_NOSNAP) when snapid is 0; optionally reports the object
 * version via *ver.
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			     struct ceph_snap_context *snapc,
			     u64 snapid,
			     const char *obj,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       (snapid ? snapid : CEPH_NOSNAP),
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}
1163
1164/*
59c2be1e
YS
1165 * Request sync osd watch
1166 */
/*
 * Acknowledge a notify we received on the header object (fire-and-
 * forget: completes asynchronously via rbd_simple_req_cb).
 *
 * NOTE(review): ops[0].watch.cookie is assigned notify_id without
 * cpu_to_le64 while .ver gets byte-swapped — confirm notify_id is
 * already in wire byte order at the caller.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			     obj, 0, 0, NULL,
			     pages, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     1,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1196
/*
 * Watch event callback: the header object changed (e.g. a snapshot was
 * taken), so refresh our snapshot state under ctl_mutex, then ack the
 * notification so the osd doesn't time us out.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
	     notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	/* ack even if the update failed, so we keep receiving events */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1216
1217/*
1218 * Request sync osd watch
1219 */
/*
 * Request sync osd watch
 *
 * Registers a watch on the header object: creates the osd event
 * (delivering notifications to rbd_watch_cb), then issues a lingering
 * WATCH op whose request handle is stored in dev->watch_request.
 * On failure the event is torn down and dev->watch_event cleared.
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == establish the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1261
79e3057c
YS
1262/*
1263 * Request sync osd unwatch
1264 */
1265static int rbd_req_sync_unwatch(struct rbd_device *dev,
1266 const char *obj)
1267{
1268 struct ceph_osd_req_op *ops;
1269
1270 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1271 if (ret < 0)
1272 return ret;
1273
1274 ops[0].watch.ver = 0;
1275 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1276 ops[0].watch.flag = 0;
1277
1278 ret = rbd_req_sync_op(dev, NULL,
1279 CEPH_NOSNAP,
1280 0,
1281 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1282 ops,
1283 1, obj, 0, 0, NULL, NULL, NULL);
1284
1285 rbd_destroy_ops(ops);
1286 ceph_osdc_cancel_event(dev->watch_event);
1287 dev->watch_event = NULL;
1288 return ret;
1289}
1290
59c2be1e
YS
1291struct rbd_notify_info {
1292 struct rbd_device *dev;
1293};
1294
1295static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1296{
1297 struct rbd_device *dev = (struct rbd_device *)data;
1298 if (!dev)
1299 return;
1300
1301 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1302 notify_id, (int)opcode);
1303}
1304
1305/*
1306 * Request sync osd notify
1307 */
1308static int rbd_req_sync_notify(struct rbd_device *dev,
1309 const char *obj)
1310{
1311 struct ceph_osd_req_op *ops;
1dbb4399 1312 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
59c2be1e
YS
1313 struct ceph_osd_event *event;
1314 struct rbd_notify_info info;
1315 int payload_len = sizeof(u32) + sizeof(u32);
1316 int ret;
1317
1318 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1319 if (ret < 0)
1320 return ret;
1321
1322 info.dev = dev;
1323
1324 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1325 (void *)&info, &event);
1326 if (ret < 0)
1327 goto fail;
1328
1329 ops[0].watch.ver = 1;
1330 ops[0].watch.flag = 1;
1331 ops[0].watch.cookie = event->cookie;
1332 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1333 ops[0].watch.timeout = 12;
1334
1335 ret = rbd_req_sync_op(dev, NULL,
1336 CEPH_NOSNAP,
1337 0,
1338 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1339 ops,
1340 1, obj, 0, 0, NULL, NULL, NULL);
1341 if (ret < 0)
1342 goto fail_event;
1343
1344 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1345 dout("ceph_osdc_wait_event returned %d\n", ret);
1346 rbd_destroy_ops(ops);
1347 return 0;
1348
1349fail_event:
1350 ceph_osdc_cancel_event(event);
1351fail:
1352 rbd_destroy_ops(ops);
1353 return ret;
1354}
1355
602adf40
YS
1356/*
1357 * Request sync osd read
1358 */
1359static int rbd_req_sync_exec(struct rbd_device *dev,
1360 const char *obj,
1361 const char *cls,
1362 const char *method,
1363 const char *data,
59c2be1e
YS
1364 int len,
1365 u64 *ver)
602adf40
YS
1366{
1367 struct ceph_osd_req_op *ops;
1368 int cls_len = strlen(cls);
1369 int method_len = strlen(method);
1370 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1371 cls_len + method_len + len);
1372 if (ret < 0)
1373 return ret;
1374
1375 ops[0].cls.class_name = cls;
1376 ops[0].cls.class_len = (__u8)cls_len;
1377 ops[0].cls.method_name = method;
1378 ops[0].cls.method_len = (__u8)method_len;
1379 ops[0].cls.argc = 0;
1380 ops[0].cls.indata = data;
1381 ops[0].cls.indata_len = len;
1382
1383 ret = rbd_req_sync_op(dev, NULL,
1384 CEPH_NOSNAP,
1385 0,
1386 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1387 ops,
59c2be1e 1388 1, obj, 0, 0, NULL, NULL, ver);
602adf40
YS
1389
1390 rbd_destroy_ops(ops);
1391
1392 dout("cls_exec returned %d\n", ret);
1393 return ret;
1394}
1395
1fec7093
YS
1396static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1397{
1398 struct rbd_req_coll *coll =
1399 kzalloc(sizeof(struct rbd_req_coll) +
1400 sizeof(struct rbd_req_status) * num_reqs,
1401 GFP_ATOMIC);
1402
1403 if (!coll)
1404 return NULL;
1405 coll->total = num_reqs;
1406 kref_init(&coll->kref);
1407 return coll;
1408}
1409
602adf40
YS
1410/*
1411 * block device queue callback
1412 */
1413static void rbd_rq_fn(struct request_queue *q)
1414{
1415 struct rbd_device *rbd_dev = q->queuedata;
1416 struct request *rq;
1417 struct bio_pair *bp = NULL;
1418
1419 rq = blk_fetch_request(q);
1420
1421 while (1) {
1422 struct bio *bio;
1423 struct bio *rq_bio, *next_bio = NULL;
1424 bool do_write;
1425 int size, op_size = 0;
1426 u64 ofs;
1fec7093
YS
1427 int num_segs, cur_seg = 0;
1428 struct rbd_req_coll *coll;
602adf40
YS
1429
1430 /* peek at request from block layer */
1431 if (!rq)
1432 break;
1433
1434 dout("fetched request\n");
1435
1436 /* filter out block requests we don't understand */
1437 if ((rq->cmd_type != REQ_TYPE_FS)) {
1438 __blk_end_request_all(rq, 0);
1439 goto next;
1440 }
1441
1442 /* deduce our operation (read, write) */
1443 do_write = (rq_data_dir(rq) == WRITE);
1444
1445 size = blk_rq_bytes(rq);
1446 ofs = blk_rq_pos(rq) * 512ULL;
1447 rq_bio = rq->bio;
1448 if (do_write && rbd_dev->read_only) {
1449 __blk_end_request_all(rq, -EROFS);
1450 goto next;
1451 }
1452
1453 spin_unlock_irq(q->queue_lock);
1454
1455 dout("%s 0x%x bytes at 0x%llx\n",
1456 do_write ? "write" : "read",
1457 size, blk_rq_pos(rq) * 512ULL);
1458
1fec7093
YS
1459 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1460 coll = rbd_alloc_coll(num_segs);
1461 if (!coll) {
1462 spin_lock_irq(q->queue_lock);
1463 __blk_end_request_all(rq, -ENOMEM);
1464 goto next;
1465 }
1466
602adf40
YS
1467 do {
1468 /* a bio clone to be passed down to OSD req */
1469 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1470 op_size = rbd_get_segment(&rbd_dev->header,
1471 rbd_dev->header.block_name,
1472 ofs, size,
1473 NULL, NULL);
1fec7093 1474 kref_get(&coll->kref);
602adf40
YS
1475 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1476 op_size, GFP_ATOMIC);
1477 if (!bio) {
1fec7093
YS
1478 rbd_coll_end_req_index(rq, coll, cur_seg,
1479 -ENOMEM, op_size);
1480 goto next_seg;
602adf40
YS
1481 }
1482
1fec7093 1483
602adf40
YS
1484 /* init OSD command: write or read */
1485 if (do_write)
1486 rbd_req_write(rq, rbd_dev,
1487 rbd_dev->header.snapc,
1488 ofs,
1fec7093
YS
1489 op_size, bio,
1490 coll, cur_seg);
602adf40
YS
1491 else
1492 rbd_req_read(rq, rbd_dev,
1493 cur_snap_id(rbd_dev),
1494 ofs,
1fec7093
YS
1495 op_size, bio,
1496 coll, cur_seg);
602adf40 1497
1fec7093 1498next_seg:
602adf40
YS
1499 size -= op_size;
1500 ofs += op_size;
1501
1fec7093 1502 cur_seg++;
602adf40
YS
1503 rq_bio = next_bio;
1504 } while (size > 0);
1fec7093 1505 kref_put(&coll->kref, rbd_coll_release);
602adf40
YS
1506
1507 if (bp)
1508 bio_pair_release(bp);
602adf40
YS
1509 spin_lock_irq(q->queue_lock);
1510next:
1511 rq = blk_fetch_request(q);
1512 }
1513}
1514
1515/*
1516 * a queue callback. Makes sure that we don't create a bio that spans across
1517 * multiple osd objects. One exception would be with a single page bios,
1518 * which we handle later at bio_chain_clone
1519 */
1520static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1521 struct bio_vec *bvec)
1522{
1523 struct rbd_device *rbd_dev = q->queuedata;
1524 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1525 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1526 unsigned int bio_sectors = bmd->bi_size >> 9;
1527 int max;
1528
1529 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1530 + bio_sectors)) << 9;
1531 if (max < 0)
1532 max = 0; /* bio_add cannot handle a negative return */
1533 if (max <= bvec->bv_len && bio_sectors == 0)
1534 return bvec->bv_len;
1535 return max;
1536}
1537
1538static void rbd_free_disk(struct rbd_device *rbd_dev)
1539{
1540 struct gendisk *disk = rbd_dev->disk;
1541
1542 if (!disk)
1543 return;
1544
1545 rbd_header_free(&rbd_dev->header);
1546
1547 if (disk->flags & GENHD_FL_UP)
1548 del_gendisk(disk);
1549 if (disk->queue)
1550 blk_cleanup_queue(disk->queue);
1551 put_disk(disk);
1552}
1553
1554/*
1555 * reload the ondisk the header
1556 */
1557static int rbd_read_header(struct rbd_device *rbd_dev,
1558 struct rbd_image_header *header)
1559{
1560 ssize_t rc;
1561 struct rbd_image_header_ondisk *dh;
1562 int snap_count = 0;
1563 u64 snap_names_len = 0;
59c2be1e 1564 u64 ver;
602adf40
YS
1565
1566 while (1) {
1567 int len = sizeof(*dh) +
1568 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1569 snap_names_len;
1570
1571 rc = -ENOMEM;
1572 dh = kmalloc(len, GFP_KERNEL);
1573 if (!dh)
1574 return -ENOMEM;
1575
1576 rc = rbd_req_sync_read(rbd_dev,
1577 NULL, CEPH_NOSNAP,
1578 rbd_dev->obj_md_name,
1579 0, len,
59c2be1e 1580 (char *)dh, &ver);
602adf40
YS
1581 if (rc < 0)
1582 goto out_dh;
1583
1584 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
81e759fb
JD
1585 if (rc < 0) {
1586 if (rc == -ENXIO) {
1587 pr_warning("unrecognized header format"
1588 " for image %s", rbd_dev->obj);
1589 }
602adf40 1590 goto out_dh;
81e759fb 1591 }
602adf40
YS
1592
1593 if (snap_count != header->total_snaps) {
1594 snap_count = header->total_snaps;
1595 snap_names_len = header->snap_names_len;
1596 rbd_header_free(header);
1597 kfree(dh);
1598 continue;
1599 }
1600 break;
1601 }
59c2be1e 1602 header->obj_version = ver;
602adf40
YS
1603
1604out_dh:
1605 kfree(dh);
1606 return rc;
1607}
1608
1609/*
1610 * create a snapshot
1611 */
1612static int rbd_header_add_snap(struct rbd_device *dev,
1613 const char *snap_name,
1614 gfp_t gfp_flags)
1615{
1616 int name_len = strlen(snap_name);
1617 u64 new_snapid;
1618 int ret;
916d4d67 1619 void *data, *p, *e;
59c2be1e 1620 u64 ver;
1dbb4399 1621 struct ceph_mon_client *monc;
602adf40
YS
1622
1623 /* we should create a snapshot only if we're pointing at the head */
1624 if (dev->cur_snap)
1625 return -EINVAL;
1626
1dbb4399
AE
1627 monc = &dev->rbd_client->client->monc;
1628 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
602adf40
YS
1629 dout("created snapid=%lld\n", new_snapid);
1630 if (ret < 0)
1631 return ret;
1632
1633 data = kmalloc(name_len + 16, gfp_flags);
1634 if (!data)
1635 return -ENOMEM;
1636
916d4d67
SW
1637 p = data;
1638 e = data + name_len + 16;
602adf40 1639
916d4d67
SW
1640 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1641 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40
YS
1642
1643 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
916d4d67 1644 data, p - data, &ver);
602adf40 1645
916d4d67 1646 kfree(data);
602adf40
YS
1647
1648 if (ret < 0)
1649 return ret;
1650
1651 dev->header.snapc->seq = new_snapid;
1652
1653 return 0;
1654bad:
1655 return -ERANGE;
1656}
1657
dfc5606d
YS
1658static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1659{
1660 struct rbd_snap *snap;
1661
1662 while (!list_empty(&rbd_dev->snaps)) {
1663 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1664 __rbd_remove_snap_dev(rbd_dev, snap);
1665 }
1666}
1667
602adf40
YS
1668/*
1669 * only read the first part of the ondisk header, without the snaps info
1670 */
dfc5606d 1671static int __rbd_update_snaps(struct rbd_device *rbd_dev)
602adf40
YS
1672{
1673 int ret;
1674 struct rbd_image_header h;
1675 u64 snap_seq;
59c2be1e 1676 int follow_seq = 0;
602adf40
YS
1677
1678 ret = rbd_read_header(rbd_dev, &h);
1679 if (ret < 0)
1680 return ret;
1681
9db4b3e3
SW
1682 /* resized? */
1683 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1684
602adf40
YS
1685 down_write(&rbd_dev->header.snap_rwsem);
1686
1687 snap_seq = rbd_dev->header.snapc->seq;
59c2be1e
YS
1688 if (rbd_dev->header.total_snaps &&
1689 rbd_dev->header.snapc->snaps[0] == snap_seq)
1690 /* pointing at the head, will need to follow that
1691 if head moves */
1692 follow_seq = 1;
602adf40
YS
1693
1694 kfree(rbd_dev->header.snapc);
1695 kfree(rbd_dev->header.snap_names);
1696 kfree(rbd_dev->header.snap_sizes);
1697
1698 rbd_dev->header.total_snaps = h.total_snaps;
1699 rbd_dev->header.snapc = h.snapc;
1700 rbd_dev->header.snap_names = h.snap_names;
dfc5606d 1701 rbd_dev->header.snap_names_len = h.snap_names_len;
602adf40 1702 rbd_dev->header.snap_sizes = h.snap_sizes;
59c2be1e
YS
1703 if (follow_seq)
1704 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1705 else
1706 rbd_dev->header.snapc->seq = snap_seq;
602adf40 1707
dfc5606d
YS
1708 ret = __rbd_init_snaps_header(rbd_dev);
1709
602adf40
YS
1710 up_write(&rbd_dev->header.snap_rwsem);
1711
dfc5606d 1712 return ret;
602adf40
YS
1713}
1714
1715static int rbd_init_disk(struct rbd_device *rbd_dev)
1716{
1717 struct gendisk *disk;
1718 struct request_queue *q;
1719 int rc;
1720 u64 total_size = 0;
1721
1722 /* contact OSD, request size info about the object being mapped */
1723 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1724 if (rc)
1725 return rc;
1726
dfc5606d
YS
1727 /* no need to lock here, as rbd_dev is not registered yet */
1728 rc = __rbd_init_snaps_header(rbd_dev);
1729 if (rc)
1730 return rc;
1731
cc9d734c 1732 rc = rbd_header_set_snap(rbd_dev, &total_size);
602adf40
YS
1733 if (rc)
1734 return rc;
1735
1736 /* create gendisk info */
1737 rc = -ENOMEM;
1738 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1739 if (!disk)
1740 goto out;
1741
aedfec59
SW
1742 snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1743 rbd_dev->id);
602adf40
YS
1744 disk->major = rbd_dev->major;
1745 disk->first_minor = 0;
1746 disk->fops = &rbd_bd_ops;
1747 disk->private_data = rbd_dev;
1748
1749 /* init rq */
1750 rc = -ENOMEM;
1751 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1752 if (!q)
1753 goto out_disk;
029bcbd8
JD
1754
1755 /* set io sizes to object size */
1756 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1757 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1758 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1759 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1760
602adf40
YS
1761 blk_queue_merge_bvec(q, rbd_merge_bvec);
1762 disk->queue = q;
1763
1764 q->queuedata = rbd_dev;
1765
1766 rbd_dev->disk = disk;
1767 rbd_dev->q = q;
1768
1769 /* finally, announce the disk to the world */
1770 set_capacity(disk, total_size / 512ULL);
1771 add_disk(disk);
1772
1773 pr_info("%s: added with size 0x%llx\n",
1774 disk->disk_name, (unsigned long long)total_size);
1775 return 0;
1776
1777out_disk:
1778 put_disk(disk);
1779out:
1780 return rc;
1781}
1782
dfc5606d
YS
1783/*
1784 sysfs
1785*/
1786
1787static ssize_t rbd_size_show(struct device *dev,
1788 struct device_attribute *attr, char *buf)
1789{
1790 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1791
1792 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1793}
1794
1795static ssize_t rbd_major_show(struct device *dev,
1796 struct device_attribute *attr, char *buf)
1797{
1798 struct rbd_device *rbd_dev = dev_to_rbd(dev);
602adf40 1799
dfc5606d
YS
1800 return sprintf(buf, "%d\n", rbd_dev->major);
1801}
1802
1803static ssize_t rbd_client_id_show(struct device *dev,
1804 struct device_attribute *attr, char *buf)
602adf40 1805{
dfc5606d
YS
1806 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1807
1dbb4399
AE
1808 return sprintf(buf, "client%lld\n",
1809 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1810}
1811
dfc5606d
YS
1812static ssize_t rbd_pool_show(struct device *dev,
1813 struct device_attribute *attr, char *buf)
602adf40 1814{
dfc5606d
YS
1815 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1816
1817 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1818}
1819
1820static ssize_t rbd_name_show(struct device *dev,
1821 struct device_attribute *attr, char *buf)
1822{
1823 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1824
1825 return sprintf(buf, "%s\n", rbd_dev->obj);
1826}
1827
1828static ssize_t rbd_snap_show(struct device *dev,
1829 struct device_attribute *attr,
1830 char *buf)
1831{
1832 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1833
1834 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1835}
1836
1837static ssize_t rbd_image_refresh(struct device *dev,
1838 struct device_attribute *attr,
1839 const char *buf,
1840 size_t size)
1841{
1842 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1843 int rc;
1844 int ret = size;
602adf40
YS
1845
1846 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1847
dfc5606d
YS
1848 rc = __rbd_update_snaps(rbd_dev);
1849 if (rc < 0)
1850 ret = rc;
602adf40 1851
dfc5606d
YS
1852 mutex_unlock(&ctl_mutex);
1853 return ret;
1854}
602adf40 1855
dfc5606d
YS
1856static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1857static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1858static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1859static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1860static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1861static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1862static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1863static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
dfc5606d
YS
1864
1865static struct attribute *rbd_attrs[] = {
1866 &dev_attr_size.attr,
1867 &dev_attr_major.attr,
1868 &dev_attr_client_id.attr,
1869 &dev_attr_pool.attr,
1870 &dev_attr_name.attr,
1871 &dev_attr_current_snap.attr,
1872 &dev_attr_refresh.attr,
1873 &dev_attr_create_snap.attr,
dfc5606d
YS
1874 NULL
1875};
1876
1877static struct attribute_group rbd_attr_group = {
1878 .attrs = rbd_attrs,
1879};
1880
1881static const struct attribute_group *rbd_attr_groups[] = {
1882 &rbd_attr_group,
1883 NULL
1884};
1885
1886static void rbd_sysfs_dev_release(struct device *dev)
1887{
1888}
1889
1890static struct device_type rbd_device_type = {
1891 .name = "rbd",
1892 .groups = rbd_attr_groups,
1893 .release = rbd_sysfs_dev_release,
1894};
1895
1896
1897/*
1898 sysfs - snapshots
1899*/
1900
1901static ssize_t rbd_snap_size_show(struct device *dev,
1902 struct device_attribute *attr,
1903 char *buf)
1904{
1905 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1906
1907 return sprintf(buf, "%lld\n", (long long)snap->size);
1908}
1909
1910static ssize_t rbd_snap_id_show(struct device *dev,
1911 struct device_attribute *attr,
1912 char *buf)
1913{
1914 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1915
1916 return sprintf(buf, "%lld\n", (long long)snap->id);
1917}
1918
1919static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1920static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1921
1922static struct attribute *rbd_snap_attrs[] = {
1923 &dev_attr_snap_size.attr,
1924 &dev_attr_snap_id.attr,
1925 NULL,
1926};
1927
1928static struct attribute_group rbd_snap_attr_group = {
1929 .attrs = rbd_snap_attrs,
1930};
1931
1932static void rbd_snap_dev_release(struct device *dev)
1933{
1934 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1935 kfree(snap->name);
1936 kfree(snap);
1937}
1938
1939static const struct attribute_group *rbd_snap_attr_groups[] = {
1940 &rbd_snap_attr_group,
1941 NULL
1942};
1943
1944static struct device_type rbd_snap_device_type = {
1945 .groups = rbd_snap_attr_groups,
1946 .release = rbd_snap_dev_release,
1947};
1948
1949static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1950 struct rbd_snap *snap)
1951{
1952 list_del(&snap->node);
1953 device_unregister(&snap->dev);
1954}
1955
1956static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1957 struct rbd_snap *snap,
1958 struct device *parent)
1959{
1960 struct device *dev = &snap->dev;
1961 int ret;
1962
1963 dev->type = &rbd_snap_device_type;
1964 dev->parent = parent;
1965 dev->release = rbd_snap_dev_release;
1966 dev_set_name(dev, "snap_%s", snap->name);
1967 ret = device_register(dev);
1968
1969 return ret;
1970}
1971
1972static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1973 int i, const char *name,
1974 struct rbd_snap **snapp)
1975{
1976 int ret;
1977 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1978 if (!snap)
1979 return -ENOMEM;
1980 snap->name = kstrdup(name, GFP_KERNEL);
1981 snap->size = rbd_dev->header.snap_sizes[i];
1982 snap->id = rbd_dev->header.snapc->snaps[i];
1983 if (device_is_registered(&rbd_dev->dev)) {
1984 ret = rbd_register_snap_dev(rbd_dev, snap,
1985 &rbd_dev->dev);
1986 if (ret < 0)
1987 goto err;
1988 }
1989 *snapp = snap;
1990 return 0;
1991err:
1992 kfree(snap->name);
1993 kfree(snap);
1994 return ret;
1995}
1996
/*
 * search for the previous snap in a null delimited string list
 *
 * Steps back over the previous entry's terminating NUL and walks to
 * that entry's first character.  Returns NULL when "name" is already
 * the first entry (no previous one exists).
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	if (name < start + 2)
		return NULL;

	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	return p + 1;
}
2013
2014/*
2015 * compare the old list of snapshots that we have to what's in the header
2016 * and update it accordingly. Note that the header holds the snapshots
2017 * in a reverse order (from newest to oldest) and we need to go from
2018 * older to new so that we don't get a duplicate snap name when
2019 * doing the process (e.g., removed snapshot and recreated a new
2020 * one with the same name.
2021 */
2022static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2023{
2024 const char *name, *first_name;
2025 int i = rbd_dev->header.total_snaps;
2026 struct rbd_snap *snap, *old_snap = NULL;
2027 int ret;
2028 struct list_head *p, *n;
2029
2030 first_name = rbd_dev->header.snap_names;
2031 name = first_name + rbd_dev->header.snap_names_len;
2032
2033 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2034 u64 cur_id;
2035
2036 old_snap = list_entry(p, struct rbd_snap, node);
2037
2038 if (i)
2039 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2040
2041 if (!i || old_snap->id < cur_id) {
2042 /* old_snap->id was skipped, thus was removed */
2043 __rbd_remove_snap_dev(rbd_dev, old_snap);
2044 continue;
2045 }
2046 if (old_snap->id == cur_id) {
2047 /* we have this snapshot already */
2048 i--;
2049 name = rbd_prev_snap_name(name, first_name);
2050 continue;
2051 }
2052 for (; i > 0;
2053 i--, name = rbd_prev_snap_name(name, first_name)) {
2054 if (!name) {
2055 WARN_ON(1);
2056 return -EINVAL;
2057 }
2058 cur_id = rbd_dev->header.snapc->snaps[i];
2059 /* snapshot removal? handle it above */
2060 if (cur_id >= old_snap->id)
2061 break;
2062 /* a new snapshot */
2063 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2064 if (ret < 0)
2065 return ret;
2066
2067 /* note that we add it backward so using n and not p */
2068 list_add(&snap->node, n);
2069 p = &snap->node;
2070 }
2071 }
2072 /* we're done going over the old snap list, just add what's left */
2073 for (; i > 0; i--) {
2074 name = rbd_prev_snap_name(name, first_name);
2075 if (!name) {
2076 WARN_ON(1);
2077 return -EINVAL;
2078 }
2079 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2080 if (ret < 0)
2081 return ret;
2082 list_add(&snap->node, &rbd_dev->snaps);
2083 }
2084
2085 return 0;
2086}
2087
2088
2089static void rbd_root_dev_release(struct device *dev)
2090{
2091}
2092
2093static struct device rbd_root_dev = {
2094 .init_name = "rbd",
2095 .release = rbd_root_dev_release,
2096};
2097
2098static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2099{
2100 int ret = -ENOMEM;
2101 struct device *dev;
2102 struct rbd_snap *snap;
2103
2104 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2105 dev = &rbd_dev->dev;
2106
2107 dev->bus = &rbd_bus_type;
2108 dev->type = &rbd_device_type;
2109 dev->parent = &rbd_root_dev;
2110 dev->release = rbd_dev_release;
2111 dev_set_name(dev, "%d", rbd_dev->id);
2112 ret = device_register(dev);
2113 if (ret < 0)
2114 goto done_free;
2115
2116 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2117 ret = rbd_register_snap_dev(rbd_dev, snap,
2118 &rbd_dev->dev);
2119 if (ret < 0)
602adf40
YS
2120 break;
2121 }
2122
2123 mutex_unlock(&ctl_mutex);
dfc5606d
YS
2124 return 0;
2125done_free:
2126 mutex_unlock(&ctl_mutex);
2127 return ret;
602adf40
YS
2128}
2129
dfc5606d
YS
2130static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2131{
2132 device_unregister(&rbd_dev->dev);
2133}
2134
59c2be1e
YS
2135static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2136{
2137 int ret, rc;
2138
2139 do {
2140 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2141 rbd_dev->header.obj_version);
2142 if (ret == -ERANGE) {
2143 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2144 rc = __rbd_update_snaps(rbd_dev);
2145 mutex_unlock(&ctl_mutex);
2146 if (rc < 0)
2147 return rc;
2148 }
2149 } while (ret == -ERANGE);
2150
2151 return ret;
2152}
2153
1ddbe94e
AE
2154static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2155
2156/*
2157 * Get a unique rbd identifier. The minimum rbd id is 1.
2158 */
b7f23c36
AE
2159static int rbd_id_get(void)
2160{
1ddbe94e
AE
2161 return atomic64_inc_return(&rbd_id_max);
2162}
b7f23c36 2163
1ddbe94e
AE
2164/*
2165 * Record that an rbd identifier is no longer in use.
2166 */
2167static void rbd_id_put(int rbd_id)
2168{
2169 BUG_ON(rbd_id < 1);
b7f23c36 2170
1ddbe94e
AE
2171 /*
2172 * New id's are always one more than the current maximum.
2173 * If the id being "put" *is* that maximum, decrement the
2174 * maximum so the next one requested just reuses this one.
2175 */
2176 atomic64_cmpxchg(&rbd_id_max, rbd_id, rbd_id - 1);
b7f23c36
AE
2177}
2178
59c2be1e
YS
2179static ssize_t rbd_add(struct bus_type *bus,
2180 const char *buf,
2181 size_t count)
602adf40
YS
2182{
2183 struct ceph_osd_client *osdc;
2184 struct rbd_device *rbd_dev;
2185 ssize_t rc = -ENOMEM;
b7f23c36 2186 int irc;
602adf40
YS
2187 char *mon_dev_name;
2188 char *options;
2189
2190 if (!try_module_get(THIS_MODULE))
2191 return -ENODEV;
2192
2193 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2194 if (!mon_dev_name)
2195 goto err_out_mod;
2196
2197 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2198 if (!options)
2199 goto err_mon_dev;
2200
2201 /* new rbd_device object */
2202 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2203 if (!rbd_dev)
2204 goto err_out_opt;
2205
2206 /* static rbd_device initialization */
2207 spin_lock_init(&rbd_dev->lock);
2208 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2209 INIT_LIST_HEAD(&rbd_dev->snaps);
602adf40 2210
0e805a1d
AE
2211 init_rwsem(&rbd_dev->header.snap_rwsem);
2212
1ddbe94e 2213 /* generate unique id: one more than highest used so far */
b7f23c36 2214 rbd_dev->id = rbd_id_get();
602adf40
YS
2215
2216 /* add to global list */
e124a82f 2217 spin_lock(&rbd_dev_list_lock);
602adf40 2218 list_add_tail(&rbd_dev->node, &rbd_dev_list);
e124a82f 2219 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2220
2221 /* parse add command */
2222 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2223 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2224 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2225 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2226 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2227 mon_dev_name, options, rbd_dev->pool_name,
2228 rbd_dev->obj, rbd_dev->snap_name) < 4) {
2229 rc = -EINVAL;
2230 goto err_out_slot;
2231 }
2232
2233 if (rbd_dev->snap_name[0] == 0)
cc9d734c
JD
2234 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2235 sizeof (RBD_SNAP_HEAD_NAME));
602adf40
YS
2236
2237 rbd_dev->obj_len = strlen(rbd_dev->obj);
2238 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2239 rbd_dev->obj, RBD_SUFFIX);
2240
2241 /* initialize rest of new object */
2242 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
e124a82f
AE
2243
2244 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
602adf40 2245 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
e124a82f
AE
2246 mutex_unlock(&ctl_mutex);
2247
602adf40
YS
2248 if (rc < 0)
2249 goto err_out_slot;
2250
602adf40 2251 /* pick the pool */
1dbb4399 2252 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2253 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2254 if (rc < 0)
2255 goto err_out_client;
2256 rbd_dev->poolid = rc;
2257
2258 /* register our block device */
2259 irc = register_blkdev(0, rbd_dev->name);
2260 if (irc < 0) {
2261 rc = irc;
2262 goto err_out_client;
2263 }
2264 rbd_dev->major = irc;
2265
dfc5606d
YS
2266 rc = rbd_bus_add_dev(rbd_dev);
2267 if (rc)
766fc439
YS
2268 goto err_out_blkdev;
2269
602adf40
YS
2270 /* set up and announce blkdev mapping */
2271 rc = rbd_init_disk(rbd_dev);
2272 if (rc)
766fc439 2273 goto err_out_bus;
602adf40 2274
59c2be1e
YS
2275 rc = rbd_init_watch_dev(rbd_dev);
2276 if (rc)
2277 goto err_out_bus;
2278
602adf40
YS
2279 return count;
2280
766fc439 2281err_out_bus:
e124a82f 2282 spin_lock(&rbd_dev_list_lock);
766fc439 2283 list_del_init(&rbd_dev->node);
e124a82f 2284 spin_unlock(&rbd_dev_list_lock);
1ddbe94e 2285 rbd_id_put(target_id);
766fc439
YS
2286
2287 /* this will also clean up rest of rbd_dev stuff */
2288
2289 rbd_bus_del_dev(rbd_dev);
2290 kfree(options);
2291 kfree(mon_dev_name);
2292 return rc;
2293
602adf40
YS
2294err_out_blkdev:
2295 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2296err_out_client:
2297 rbd_put_client(rbd_dev);
602adf40 2298err_out_slot:
e124a82f 2299 spin_lock(&rbd_dev_list_lock);
602adf40 2300 list_del_init(&rbd_dev->node);
e124a82f 2301 spin_unlock(&rbd_dev_list_lock);
1ddbe94e 2302 rbd_id_put(target_id);
602adf40
YS
2303
2304 kfree(rbd_dev);
2305err_out_opt:
2306 kfree(options);
2307err_mon_dev:
2308 kfree(mon_dev_name);
2309err_out_mod:
2310 dout("Error adding device %s\n", buf);
2311 module_put(THIS_MODULE);
2312 return rc;
2313}
2314
2315static struct rbd_device *__rbd_get_dev(unsigned long id)
2316{
2317 struct list_head *tmp;
2318 struct rbd_device *rbd_dev;
2319
e124a82f 2320 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2321 list_for_each(tmp, &rbd_dev_list) {
2322 rbd_dev = list_entry(tmp, struct rbd_device, node);
e124a82f
AE
2323 if (rbd_dev->id == id) {
2324 spin_unlock(&rbd_dev_list_lock);
602adf40 2325 return rbd_dev;
e124a82f 2326 }
602adf40 2327 }
e124a82f 2328 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2329 return NULL;
2330}
2331
dfc5606d 2332static void rbd_dev_release(struct device *dev)
602adf40 2333{
dfc5606d
YS
2334 struct rbd_device *rbd_dev =
2335 container_of(dev, struct rbd_device, dev);
602adf40 2336
1dbb4399
AE
2337 if (rbd_dev->watch_request) {
2338 struct ceph_client *client = rbd_dev->rbd_client->client;
2339
2340 ceph_osdc_unregister_linger_request(&client->osdc,
59c2be1e 2341 rbd_dev->watch_request);
1dbb4399 2342 }
59c2be1e 2343 if (rbd_dev->watch_event)
79e3057c 2344 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
59c2be1e 2345
602adf40
YS
2346 rbd_put_client(rbd_dev);
2347
2348 /* clean up and free blkdev */
2349 rbd_free_disk(rbd_dev);
2350 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2351 kfree(rbd_dev);
2352
2353 /* release module ref */
2354 module_put(THIS_MODULE);
602adf40
YS
2355}
2356
dfc5606d
YS
2357static ssize_t rbd_remove(struct bus_type *bus,
2358 const char *buf,
2359 size_t count)
602adf40
YS
2360{
2361 struct rbd_device *rbd_dev = NULL;
2362 int target_id, rc;
2363 unsigned long ul;
2364 int ret = count;
2365
2366 rc = strict_strtoul(buf, 10, &ul);
2367 if (rc)
2368 return rc;
2369
2370 /* convert to int; abort if we lost anything in the conversion */
2371 target_id = (int) ul;
2372 if (target_id != ul)
2373 return -EINVAL;
2374
2375 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2376
2377 rbd_dev = __rbd_get_dev(target_id);
2378 if (!rbd_dev) {
2379 ret = -ENOENT;
2380 goto done;
2381 }
2382
e124a82f 2383 spin_lock(&rbd_dev_list_lock);
dfc5606d 2384 list_del_init(&rbd_dev->node);
e124a82f
AE
2385 spin_unlock(&rbd_dev_list_lock);
2386
1ddbe94e 2387 rbd_id_put(target_id);
dfc5606d
YS
2388
2389 __rbd_remove_all_snaps(rbd_dev);
2390 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2391
2392done:
2393 mutex_unlock(&ctl_mutex);
2394 return ret;
2395}
2396
dfc5606d
YS
2397static ssize_t rbd_snap_add(struct device *dev,
2398 struct device_attribute *attr,
2399 const char *buf,
2400 size_t count)
602adf40 2401{
dfc5606d
YS
2402 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2403 int ret;
2404 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2405 if (!name)
2406 return -ENOMEM;
2407
dfc5606d 2408 snprintf(name, count, "%s", buf);
602adf40
YS
2409
2410 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2411
602adf40
YS
2412 ret = rbd_header_add_snap(rbd_dev,
2413 name, GFP_KERNEL);
2414 if (ret < 0)
59c2be1e 2415 goto err_unlock;
602adf40 2416
dfc5606d 2417 ret = __rbd_update_snaps(rbd_dev);
602adf40 2418 if (ret < 0)
59c2be1e
YS
2419 goto err_unlock;
2420
2421 /* shouldn't hold ctl_mutex when notifying.. notify might
2422 trigger a watch callback that would need to get that mutex */
2423 mutex_unlock(&ctl_mutex);
2424
2425 /* make a best effort, don't error if failed */
2426 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
602adf40
YS
2427
2428 ret = count;
59c2be1e
YS
2429 kfree(name);
2430 return ret;
2431
2432err_unlock:
602adf40 2433 mutex_unlock(&ctl_mutex);
602adf40
YS
2434 kfree(name);
2435 return ret;
2436}
2437
dfc5606d
YS
/*
 * Control files on the rbd bus: writing to /sys/bus/rbd/add maps an
 * image, writing to /sys/bus/rbd/remove tears one down.  Both are
 * write-only and restricted to root (S_IWUSR).
 */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2443
2444/*
2445 * create control files in sysfs
dfc5606d 2446 * /sys/bus/rbd/...
602adf40
YS
2447 */
2448static int rbd_sysfs_init(void)
2449{
dfc5606d 2450 int ret;
602adf40 2451
dfc5606d 2452 rbd_bus_type.bus_attrs = rbd_bus_attrs;
602adf40 2453
dfc5606d 2454 ret = bus_register(&rbd_bus_type);
21079786 2455 if (ret < 0)
dfc5606d 2456 return ret;
602adf40 2457
dfc5606d 2458 ret = device_register(&rbd_root_dev);
602adf40 2459
602adf40
YS
2460 return ret;
2461}
2462
/*
 * Remove the sysfs control files: undo rbd_sysfs_init() in reverse
 * order (root device first, then the bus it hangs off).
 */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2468
2469int __init rbd_init(void)
2470{
2471 int rc;
2472
2473 rc = rbd_sysfs_init();
2474 if (rc)
2475 return rc;
602adf40
YS
2476 pr_info("loaded " DRV_NAME_LONG "\n");
2477 return 0;
2478}
2479
/* Module unload entry point: remove the sysfs control interface. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2484
/* Module entry/exit hooks and metadata. */
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");