]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blame - drivers/block/rbd.c
rbd: do not duplicate ceph_client pointer in rbd_device
[mirror_ubuntu-artful-kernel.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
44#define DRV_NAME "rbd"
45#define DRV_NAME_LONG "rbd (rados block device)"
46
47#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
48
21079786 49#define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
602adf40
YS
50#define RBD_MAX_POOL_NAME_LEN 64
51#define RBD_MAX_SNAP_NAME_LEN 32
52#define RBD_MAX_OPT_LEN 1024
53
54#define RBD_SNAP_HEAD_NAME "-"
55
56#define DEV_NAME_LEN 32
57
59c2be1e
YS
58#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
602adf40
YS
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;			/* image size in bytes */
	char block_name[32];		/* prefix for data object names */
	__u8 obj_order;			/* object size is 1 << obj_order */
	__u8 crypt_type;		/* on-disk crypt type (copied verbatim) */
	__u8 comp_type;			/* on-disk compression type */
	struct rw_semaphore snap_rwsem;	/* protects the snapshot fields below */
	struct ceph_snap_context *snapc;
	size_t snap_names_len;		/* total bytes in snap_names */
	u64 snap_seq;			/* on-disk snapshot sequence number */
	u32 total_snaps;		/* number of snapshots */

	char *snap_names;		/* packed, NUL-separated names */
	u64 *snap_sizes;		/* one image size per snapshot */

	u64 obj_version;		/* version of the header object on the OSD */
};
80
/* Options parsed from the "rbd map" option string (see rbdopt_tokens). */
struct rbd_options {
	int notify_timeout;	/* seconds; defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
84
/*
 * an instance of the client. multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client *client;
	struct rbd_options *rbd_opts;	/* owned; freed in rbd_client_release() */
	struct kref kref;		/* shared-client refcount */
	struct list_head node;		/* entry in rbd_client_list */
};
94
1fec7093
YS
95struct rbd_req_coll;
96
602adf40
YS
/*
 * a single io request
 */
struct rbd_request {
	struct request *rq;		/* blk layer request */
	struct bio *bio;		/* cloned bio */
	struct page **pages;		/* list of used pages */
	u64 len;			/* request length in bytes */
	int coll_index;			/* this request's slot in coll->status[] */
	struct rbd_req_coll *coll;	/* parent collection, or NULL */
};
108
/* Completion status of one request within a collection. */
struct rbd_req_status {
	int done;	/* nonzero once this slot has completed */
	int rc;		/* completion result code */
	u64 bytes;	/* bytes completed */
};
114
/*
 * a collection of requests
 */
struct rbd_req_coll {
	int total;			/* number of slots in status[] */
	int num_done;			/* slots retired (in order) so far */
	struct kref kref;		/* one ref per outstanding request */
	struct rbd_req_status status[0];	/* trailing variable-length array */
};
124
dfc5606d
YS
/* In-memory snapshot record, exported through sysfs. */
struct rbd_snap {
	struct device dev;		/* embedded sysfs device */
	const char *name;		/* snapshot name */
	size_t size;			/* image size at snapshot time */
	struct list_head node;		/* entry in rbd_device->snaps */
	u64 id;				/* snapshot id */
};
132
602adf40
YS
/*
 * a single device
 */
struct rbd_device {
	int id;				/* blkdev unique id */

	int major;			/* blkdev assigned major */
	struct gendisk *disk;		/* blkdev's gendisk and rq */
	struct request_queue *q;

	struct rbd_client *rbd_client;	/* possibly shared ceph client */

	char name[DEV_NAME_LEN];	/* blkdev name, e.g. rbd3 */

	spinlock_t lock;		/* queue lock */

	struct rbd_image_header header;
	char obj[RBD_MAX_OBJ_NAME_LEN];	/* rbd image name */
	int obj_len;
	char obj_md_name[RBD_MAX_MD_NAME_LEN];	/* hdr nm. */
	char pool_name[RBD_MAX_POOL_NAME_LEN];
	int poolid;

	/* watch on the header object, for snapshot-change notifications */
	struct ceph_osd_event *watch_event;
	struct ceph_osd_request *watch_request;

	char snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;

	struct list_head node;		/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head snaps;

	/* sysfs related */
	struct device dev;
};
172
/* sysfs bus (/sys/bus/rbd) under which rbd devices are registered. */
static struct bus_type rbd_bus_type = {
	.name = "rbd",
};
176
static DEFINE_SPINLOCK(node_lock);	/* protects client get/put */

static DEFINE_MUTEX(ctl_mutex);		/* Serialize open/close/setup/teardown */
static LIST_HEAD(rbd_dev_list);		/* devices */
static LIST_HEAD(rbd_client_list);	/* clients */
182
dfc5606d
YS
183static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
184static void rbd_dev_release(struct device *dev);
dfc5606d
YS
185static ssize_t rbd_snap_add(struct device *dev,
186 struct device_attribute *attr,
187 const char *buf,
188 size_t count);
189static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 190 struct rbd_snap *snap);
dfc5606d
YS
191
192
193static struct rbd_device *dev_to_rbd(struct device *dev)
194{
195 return container_of(dev, struct rbd_device, dev);
196}
197
198static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
199{
200 return get_device(&rbd_dev->dev);
201}
202
203static void rbd_put_dev(struct rbd_device *rbd_dev)
204{
205 put_device(&rbd_dev->dev);
206}
602adf40 207
59c2be1e
YS
208static int __rbd_update_snaps(struct rbd_device *rbd_dev);
209
602adf40
YS
210static int rbd_open(struct block_device *bdev, fmode_t mode)
211{
212 struct gendisk *disk = bdev->bd_disk;
213 struct rbd_device *rbd_dev = disk->private_data;
214
dfc5606d
YS
215 rbd_get_dev(rbd_dev);
216
602adf40
YS
217 set_device_ro(bdev, rbd_dev->read_only);
218
219 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
220 return -EROFS;
221
222 return 0;
223}
224
dfc5606d
YS
225static int rbd_release(struct gendisk *disk, fmode_t mode)
226{
227 struct rbd_device *rbd_dev = disk->private_data;
228
229 rbd_put_dev(rbd_dev);
230
231 return 0;
232}
233
602adf40
YS
/* Block device operations; open/release manage the sysfs dev refcount. */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
239
240/*
241 * Initialize an rbd client instance.
242 * We own *opt.
243 */
59c2be1e
YS
244static struct rbd_client *rbd_client_create(struct ceph_options *opt,
245 struct rbd_options *rbd_opts)
602adf40
YS
246{
247 struct rbd_client *rbdc;
248 int ret = -ENOMEM;
249
250 dout("rbd_client_create\n");
251 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
252 if (!rbdc)
253 goto out_opt;
254
255 kref_init(&rbdc->kref);
256 INIT_LIST_HEAD(&rbdc->node);
257
6ab00d46 258 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
602adf40
YS
259 if (IS_ERR(rbdc->client))
260 goto out_rbdc;
28f259b7 261 opt = NULL; /* Now rbdc->client is responsible for opt */
602adf40
YS
262
263 ret = ceph_open_session(rbdc->client);
264 if (ret < 0)
265 goto out_err;
266
59c2be1e
YS
267 rbdc->rbd_opts = rbd_opts;
268
602adf40
YS
269 spin_lock(&node_lock);
270 list_add_tail(&rbdc->node, &rbd_client_list);
271 spin_unlock(&node_lock);
272
273 dout("rbd_client_create created %p\n", rbdc);
274 return rbdc;
275
276out_err:
277 ceph_destroy_client(rbdc->client);
602adf40
YS
278out_rbdc:
279 kfree(rbdc);
280out_opt:
28f259b7
VK
281 if (opt)
282 ceph_destroy_options(opt);
283 return ERR_PTR(ret);
602adf40
YS
284}
285
286/*
287 * Find a ceph client with specific addr and configuration.
288 */
289static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
290{
291 struct rbd_client *client_node;
292
293 if (opt->flags & CEPH_OPT_NOSHARE)
294 return NULL;
295
296 list_for_each_entry(client_node, &rbd_client_list, node)
297 if (ceph_compare_options(opt, client_node->client) == 0)
298 return client_node;
299 return NULL;
300}
301
59c2be1e
YS
/*
 * mount options
 *
 * Token values below Opt_last_int take an integer argument; values
 * between Opt_last_int and Opt_last_string take a string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
319
320static int parse_rbd_opts_token(char *c, void *private)
321{
322 struct rbd_options *rbdopt = private;
323 substring_t argstr[MAX_OPT_ARGS];
324 int token, intval, ret;
325
21079786 326 token = match_token(c, rbdopt_tokens, argstr);
59c2be1e
YS
327 if (token < 0)
328 return -EINVAL;
329
330 if (token < Opt_last_int) {
331 ret = match_int(&argstr[0], &intval);
332 if (ret < 0) {
333 pr_err("bad mount option arg (not int) "
334 "at '%s'\n", c);
335 return ret;
336 }
337 dout("got int token %d val %d\n", token, intval);
338 } else if (token > Opt_last_int && token < Opt_last_string) {
339 dout("got string token %d val %s\n", token,
340 argstr[0].from);
341 } else {
342 dout("got token %d\n", token);
343 }
344
345 switch (token) {
346 case Opt_notify_timeout:
347 rbdopt->notify_timeout = intval;
348 break;
349 default:
350 BUG_ON(token);
351 }
352 return 0;
353}
354
602adf40
YS
355/*
356 * Get a ceph client with specific addr and configuration, if one does
357 * not exist create it.
358 */
359static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
360 char *options)
361{
362 struct rbd_client *rbdc;
363 struct ceph_options *opt;
364 int ret;
59c2be1e
YS
365 struct rbd_options *rbd_opts;
366
367 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
368 if (!rbd_opts)
369 return -ENOMEM;
370
371 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
602adf40 372
ee57741c 373 opt = ceph_parse_options(options, mon_addr,
21079786
AE
374 mon_addr + strlen(mon_addr),
375 parse_rbd_opts_token, rbd_opts);
ee57741c
AE
376 if (IS_ERR(opt)) {
377 ret = PTR_ERR(opt);
59c2be1e 378 goto done_err;
ee57741c 379 }
602adf40
YS
380
381 spin_lock(&node_lock);
382 rbdc = __rbd_client_find(opt);
383 if (rbdc) {
384 ceph_destroy_options(opt);
97bb59a0 385 kfree(rbd_opts);
602adf40
YS
386
387 /* using an existing client */
388 kref_get(&rbdc->kref);
389 rbd_dev->rbd_client = rbdc;
602adf40
YS
390 spin_unlock(&node_lock);
391 return 0;
392 }
393 spin_unlock(&node_lock);
394
59c2be1e
YS
395 rbdc = rbd_client_create(opt, rbd_opts);
396 if (IS_ERR(rbdc)) {
397 ret = PTR_ERR(rbdc);
398 goto done_err;
399 }
602adf40
YS
400
401 rbd_dev->rbd_client = rbdc;
602adf40 402 return 0;
59c2be1e
YS
403done_err:
404 kfree(rbd_opts);
405 return ret;
602adf40
YS
406}
407
408/*
409 * Destroy ceph client
d23a4b3f
AE
410 *
411 * Caller must hold node_lock.
602adf40
YS
412 */
413static void rbd_client_release(struct kref *kref)
414{
415 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
416
417 dout("rbd_release_client %p\n", rbdc);
602adf40 418 list_del(&rbdc->node);
602adf40
YS
419
420 ceph_destroy_client(rbdc->client);
59c2be1e 421 kfree(rbdc->rbd_opts);
602adf40
YS
422 kfree(rbdc);
423}
424
425/*
426 * Drop reference to ceph client node. If it's not referenced anymore, release
427 * it.
428 */
429static void rbd_put_client(struct rbd_device *rbd_dev)
430{
d23a4b3f 431 spin_lock(&node_lock);
602adf40 432 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
d23a4b3f 433 spin_unlock(&node_lock);
602adf40 434 rbd_dev->rbd_client = NULL;
602adf40
YS
435}
436
1fec7093
YS
437/*
438 * Destroy requests collection
439 */
440static void rbd_coll_release(struct kref *kref)
441{
442 struct rbd_req_coll *coll =
443 container_of(kref, struct rbd_req_coll, kref);
444
445 dout("rbd_coll_release %p\n", coll);
446 kfree(coll);
447}
602adf40
YS
448
449/*
450 * Create a new header structure, translate header format from the on-disk
451 * header.
452 */
453static int rbd_header_from_disk(struct rbd_image_header *header,
454 struct rbd_image_header_ondisk *ondisk,
455 int allocated_snaps,
456 gfp_t gfp_flags)
457{
458 int i;
459 u32 snap_count = le32_to_cpu(ondisk->snap_count);
460 int ret = -ENOMEM;
461
21079786 462 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
81e759fb 463 return -ENXIO;
81e759fb 464
602adf40 465 init_rwsem(&header->snap_rwsem);
602adf40
YS
466 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
467 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
21079786 468 snap_count * sizeof (*ondisk),
602adf40
YS
469 gfp_flags);
470 if (!header->snapc)
471 return -ENOMEM;
472 if (snap_count) {
473 header->snap_names = kmalloc(header->snap_names_len,
474 GFP_KERNEL);
475 if (!header->snap_names)
476 goto err_snapc;
477 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
478 GFP_KERNEL);
479 if (!header->snap_sizes)
480 goto err_names;
481 } else {
482 header->snap_names = NULL;
483 header->snap_sizes = NULL;
484 }
485 memcpy(header->block_name, ondisk->block_name,
486 sizeof(ondisk->block_name));
487
488 header->image_size = le64_to_cpu(ondisk->image_size);
489 header->obj_order = ondisk->options.order;
490 header->crypt_type = ondisk->options.crypt_type;
491 header->comp_type = ondisk->options.comp_type;
492
493 atomic_set(&header->snapc->nref, 1);
494 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
495 header->snapc->num_snaps = snap_count;
496 header->total_snaps = snap_count;
497
21079786 498 if (snap_count && allocated_snaps == snap_count) {
602adf40
YS
499 for (i = 0; i < snap_count; i++) {
500 header->snapc->snaps[i] =
501 le64_to_cpu(ondisk->snaps[i].id);
502 header->snap_sizes[i] =
503 le64_to_cpu(ondisk->snaps[i].image_size);
504 }
505
506 /* copy snapshot names */
507 memcpy(header->snap_names, &ondisk->snaps[i],
508 header->snap_names_len);
509 }
510
511 return 0;
512
513err_names:
514 kfree(header->snap_names);
515err_snapc:
516 kfree(header->snapc);
517 return ret;
518}
519
520static int snap_index(struct rbd_image_header *header, int snap_num)
521{
522 return header->total_snaps - snap_num;
523}
524
525static u64 cur_snap_id(struct rbd_device *rbd_dev)
526{
527 struct rbd_image_header *header = &rbd_dev->header;
528
529 if (!rbd_dev->cur_snap)
530 return 0;
531
532 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
533}
534
535static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
536 u64 *seq, u64 *size)
537{
538 int i;
539 char *p = header->snap_names;
540
541 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
542 if (strcmp(snap_name, p) == 0)
543 break;
544 }
545 if (i == header->total_snaps)
546 return -ENOENT;
547 if (seq)
548 *seq = header->snapc->snaps[i];
549
550 if (size)
551 *size = header->snap_sizes[i];
552
553 return i;
554}
555
556static int rbd_header_set_snap(struct rbd_device *dev,
557 const char *snap_name,
558 u64 *size)
559{
560 struct rbd_image_header *header = &dev->header;
561 struct ceph_snap_context *snapc = header->snapc;
562 int ret = -ENOENT;
563
564 down_write(&header->snap_rwsem);
565
566 if (!snap_name ||
567 !*snap_name ||
568 strcmp(snap_name, "-") == 0 ||
569 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
570 if (header->total_snaps)
571 snapc->seq = header->snap_seq;
572 else
573 snapc->seq = 0;
574 dev->cur_snap = 0;
575 dev->read_only = 0;
576 if (size)
577 *size = header->image_size;
578 } else {
579 ret = snap_by_name(header, snap_name, &snapc->seq, size);
580 if (ret < 0)
581 goto done;
582
583 dev->cur_snap = header->total_snaps - ret;
584 dev->read_only = 1;
585 }
586
587 ret = 0;
588done:
589 up_write(&header->snap_rwsem);
590 return ret;
591}
592
593static void rbd_header_free(struct rbd_image_header *header)
594{
595 kfree(header->snapc);
596 kfree(header->snap_names);
597 kfree(header->snap_sizes);
598}
599
600/*
601 * get the actual striped segment name, offset and length
602 */
603static u64 rbd_get_segment(struct rbd_image_header *header,
604 const char *block_name,
605 u64 ofs, u64 len,
606 char *seg_name, u64 *segofs)
607{
608 u64 seg = ofs >> header->obj_order;
609
610 if (seg_name)
611 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
612 "%s.%012llx", block_name, seg);
613
614 ofs = ofs & ((1 << header->obj_order) - 1);
615 len = min_t(u64, len, (1 << header->obj_order) - ofs);
616
617 if (segofs)
618 *segofs = ofs;
619
620 return len;
621}
622
1fec7093
YS
623static int rbd_get_num_segments(struct rbd_image_header *header,
624 u64 ofs, u64 len)
625{
626 u64 start_seg = ofs >> header->obj_order;
627 u64 end_seg = (ofs + len - 1) >> header->obj_order;
628 return end_seg - start_seg + 1;
629}
630
029bcbd8
JD
631/*
632 * returns the size of an object in the image
633 */
634static u64 rbd_obj_bytes(struct rbd_image_header *header)
635{
636 return 1 << header->obj_order;
637}
638
602adf40
YS
639/*
640 * bio helpers
641 */
642
643static void bio_chain_put(struct bio *chain)
644{
645 struct bio *tmp;
646
647 while (chain) {
648 tmp = chain;
649 chain = chain->bi_next;
650 bio_put(tmp);
651 }
652}
653
654/*
655 * zeros a bio chain, starting at specific offset
656 */
657static void zero_bio_chain(struct bio *chain, int start_ofs)
658{
659 struct bio_vec *bv;
660 unsigned long flags;
661 void *buf;
662 int i;
663 int pos = 0;
664
665 while (chain) {
666 bio_for_each_segment(bv, chain, i) {
667 if (pos + bv->bv_len > start_ofs) {
668 int remainder = max(start_ofs - pos, 0);
669 buf = bvec_kmap_irq(bv, &flags);
670 memset(buf + remainder, 0,
671 bv->bv_len - remainder);
85b5aaa6 672 bvec_kunmap_irq(buf, &flags);
602adf40
YS
673 }
674 pos += bv->bv_len;
675 }
676
677 chain = chain->bi_next;
678 }
679}
680
681/*
682 * bio_chain_clone - clone a chain of bios up to a certain length.
683 * might return a bio_pair that will need to be released.
684 */
685static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
686 struct bio_pair **bp,
687 int len, gfp_t gfpmask)
688{
689 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
690 int total = 0;
691
692 if (*bp) {
693 bio_pair_release(*bp);
694 *bp = NULL;
695 }
696
697 while (old_chain && (total < len)) {
698 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
699 if (!tmp)
700 goto err_out;
701
702 if (total + old_chain->bi_size > len) {
703 struct bio_pair *bp;
704
705 /*
706 * this split can only happen with a single paged bio,
707 * split_bio will BUG_ON if this is not the case
708 */
709 dout("bio_chain_clone split! total=%d remaining=%d"
710 "bi_size=%d\n",
711 (int)total, (int)len-total,
712 (int)old_chain->bi_size);
713
714 /* split the bio. We'll release it either in the next
715 call, or it will have to be released outside */
716 bp = bio_split(old_chain, (len - total) / 512ULL);
717 if (!bp)
718 goto err_out;
719
720 __bio_clone(tmp, &bp->bio1);
721
722 *next = &bp->bio2;
723 } else {
724 __bio_clone(tmp, old_chain);
725 *next = old_chain->bi_next;
726 }
727
728 tmp->bi_bdev = NULL;
729 gfpmask &= ~__GFP_WAIT;
730 tmp->bi_next = NULL;
731
732 if (!new_chain) {
733 new_chain = tail = tmp;
734 } else {
735 tail->bi_next = tmp;
736 tail = tmp;
737 }
738 old_chain = old_chain->bi_next;
739
740 total += tmp->bi_size;
741 }
742
743 BUG_ON(total < len);
744
745 if (tail)
746 tail->bi_next = NULL;
747
748 *old = old_chain;
749
750 return new_chain;
751
752err_out:
753 dout("bio_chain_clone with err\n");
754 bio_chain_put(new_chain);
755 return NULL;
756}
757
758/*
759 * helpers for osd request op vectors.
760 */
761static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
762 int num_ops,
763 int opcode,
764 u32 payload_len)
765{
766 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
767 GFP_NOIO);
768 if (!*ops)
769 return -ENOMEM;
770 (*ops)[0].op = opcode;
771 /*
772 * op extent offset and length will be set later on
773 * in calc_raw_layout()
774 */
775 (*ops)[0].payload_len = payload_len;
776 return 0;
777}
778
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
783
1fec7093
YS
784static void rbd_coll_end_req_index(struct request *rq,
785 struct rbd_req_coll *coll,
786 int index,
787 int ret, u64 len)
788{
789 struct request_queue *q;
790 int min, max, i;
791
792 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
793 coll, index, ret, len);
794
795 if (!rq)
796 return;
797
798 if (!coll) {
799 blk_end_request(rq, ret, len);
800 return;
801 }
802
803 q = rq->q;
804
805 spin_lock_irq(q->queue_lock);
806 coll->status[index].done = 1;
807 coll->status[index].rc = ret;
808 coll->status[index].bytes = len;
809 max = min = coll->num_done;
810 while (max < coll->total && coll->status[max].done)
811 max++;
812
813 for (i = min; i<max; i++) {
814 __blk_end_request(rq, coll->status[i].rc,
815 coll->status[i].bytes);
816 coll->num_done++;
817 kref_put(&coll->kref, rbd_coll_release);
818 }
819 spin_unlock_irq(q->queue_lock);
820}
821
822static void rbd_coll_end_req(struct rbd_request *req,
823 int ret, u64 len)
824{
825 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
826}
827
602adf40
YS
/*
 * Send ceph osd request
 *
 * Builds and submits one OSD request against object @obj.  With a
 * callback (@rbd_cb) the request is asynchronous; without one this
 * waits for completion and optionally returns the reassert version in
 * *ver.  @linger_req, when non-NULL, registers the request to linger
 * (resent across OSD map changes) and returns it to the caller.
 */
static int rbd_do_request(struct request *rq,
			  struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj, u64 ofs, u64 len,
			  struct bio *bio,
			  struct page **pages,
			  int num_pages,
			  int flags,
			  struct ceph_osd_req_op *ops,
			  int num_reply,
			  struct rbd_req_coll *coll,
			  int coll_index,
			  void (*rbd_cb)(struct ceph_osd_request *req,
					 struct ceph_msg *msg),
			  struct ceph_osd_request **linger_req,
			  u64 *ver)
{
	struct ceph_osd_request *req;
	struct ceph_file_layout *layout;
	int ret;
	u64 bno;
	struct timespec mtime = CURRENT_TIME;
	struct rbd_request *req_data;
	struct ceph_osd_request_head *reqhead;
	struct rbd_image_header *header = &dev->header;
	struct ceph_osd_client *osdc;

	req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
	if (!req_data) {
		/* Still report completion of this slot to the collection. */
		if (coll)
			rbd_coll_end_req_index(rq, coll, coll_index,
					       -ENOMEM, len);
		return -ENOMEM;
	}

	if (coll) {
		req_data->coll = coll;
		req_data->coll_index = coll_index;
	}

	dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);

	/* Snapshot state must not change while the request is built. */
	down_read(&header->snap_rwsem);

	osdc = &dev->rbd_client->client->osdc;
	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
				      false, GFP_NOIO, pages, bio);
	if (!req) {
		up_read(&header->snap_rwsem);
		ret = -ENOMEM;
		goto done_pages;
	}

	req->r_callback = rbd_cb;

	req_data->rq = rq;
	req_data->bio = bio;
	req_data->pages = pages;
	req_data->len = len;

	req->r_priv = req_data;

	reqhead = req->r_request->front.iov_base;
	reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);

	strncpy(req->r_oid, obj, sizeof(req->r_oid));
	req->r_oid_len = strlen(req->r_oid);

	/* Trivial layout: one stripe covering the whole (max-order) object. */
	layout = &req->r_file_layout;
	memset(layout, 0, sizeof(*layout));
	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_stripe_count = cpu_to_le32(1);
	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
	layout->fl_pg_preferred = cpu_to_le32(-1);
	layout->fl_pg_pool = cpu_to_le32(dev->poolid);
	ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
			     req, ops);

	ceph_osdc_build_request(req, ofs, &len,
				ops,
				snapc,
				&mtime,
				req->r_oid, req->r_oid_len);
	up_read(&header->snap_rwsem);

	if (linger_req) {
		/* Keep the request alive/resendable across map changes. */
		ceph_osdc_set_request_linger(osdc, req);
		*linger_req = req;
	}

	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto done_err;

	if (!rbd_cb) {
		/* No callback supplied: run synchronously. */
		ret = ceph_osdc_wait_request(osdc, req);
		if (ver)
			*ver = le64_to_cpu(req->r_reassert_version.version);
		dout("reassert_ver=%lld\n",
		     le64_to_cpu(req->r_reassert_version.version));
		ceph_osdc_put_request(req);
	}
	return ret;

done_err:
	bio_chain_put(req_data->bio);
	ceph_osdc_put_request(req);
done_pages:
	rbd_coll_end_req(req_data, ret, len);
	kfree(req_data);
	return ret;
}
944
/*
 * Ceph osd op callback
 *
 * Completes an async request: reads a hole (-ENOENT) is turned into
 * zeroes, and short reads are zero-filled up to the requested length,
 * before the result is handed to the request collection.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* Reading a nonexistent object: report zeroes, not an error. */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* Short read: zero-fill the tail. */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
983
59c2be1e
YS
/* Minimal callback for fire-and-forget requests: just drop the request. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
988
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * When @orig_ops is NULL a single op of @opcode is built here (and for
 * writes, @buf is copied into the page vector first).  For reads, up
 * to the returned length is copied back into @buf.  Returns bytes
 * transferred or a negative errno.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs, len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		/* Build a single read/write op of the requested opcode. */
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	ret = rbd_do_request(NULL, dev, snapc, snapid,
			     obj, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     2,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1051
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image byte range onto its backing object (segment) and
 * submits an async request for it; completion is reported via
 * rbd_req_cb into the request collection.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1107
/*
 * Request async osd write
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1125
/*
 * Request async osd read (snapid 0 means read the head).
 */
static int rbd_req_read(struct request *rq,
			struct rbd_device *rbd_dev,
			u64 snapid,
			u64 ofs, u64 len,
			struct bio *bio,
			struct rbd_req_coll *coll,
			int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1144
/*
 * Request sync osd read (snapid 0 means read the head).
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			     struct ceph_snap_context *snapc,
			     u64 snapid,
			     const char *obj,
			     u64 ofs, u64 len,
			     char *buf,
			     u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       (snapid ? snapid : CEPH_NOSNAP),
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}
1163
/*
 * Acknowledge a watch notification (CEPH_OSD_OP_NOTIFY_ACK).
 * (The original header comment said "watch"; this sends a notify ack.)
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	/* NOTE(review): cookie is stored raw while ver is byte-swapped —
	 * presumably notify_id is already wire-order; confirm. */
	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			     obj, 0, 0, NULL,
			     pages, 0,
			     CEPH_OSD_FLAG_READ,
			     ops,
			     1,
			     NULL, 0,
			     rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1196
/*
 * Watch event callback: refresh the snapshot list from the header
 * object and acknowledge the notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
	     notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	/* Always ack, even if the snapshot refresh failed. */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1216
/*
 * Request sync osd watch
 *
 * Registers a watch on the header object @obj so that changes made by
 * other clients invoke rbd_watch_cb().  The watch request lingers in
 * dev->watch_request; its event handle is kept in dev->watch_event.
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == register the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1261
/*
 * Tear down the header watch established by rbd_req_sync_watch() and
 * release the associated osd event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *dev,
				const char *obj)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 = unregister this cookie */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, NULL);

	/* cancel the event regardless of whether the unwatch op succeeded */
	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
	return ret;
}
1290
59c2be1e
YS
1291struct rbd_notify_info {
1292 struct rbd_device *dev;
1293};
1294
1295static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1296{
1297 struct rbd_device *dev = (struct rbd_device *)data;
1298 if (!dev)
1299 return;
1300
1301 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1302 notify_id, (int)opcode);
1303}
1304
/*
 * Send a notify on the header object and wait (bounded) for watchers to
 * acknowledge.  Used after local header changes so that other clients
 * re-read the header.
 */
static int rbd_req_sync_notify(struct rbd_device *dev,
			       const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);	/* ver + timeout */
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.dev = dev;

	/* one_shot event: fires once when the notify completes */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;	/* NOTE(review): magic constant -- units/origin unclear */

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/* wait errors are only logged; the notify itself already succeeded */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1355
/*
 * Synchronously invoke a server-side object-class method ("cls" call,
 * e.g. rbd.snap_add).  The previous comment said "Request sync osd
 * read", which was wrong -- this issues a CEPH_OSD_OP_CALL.
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
			     const char *obj,
			     const char *cls,
			     const char *method,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = cls;
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1395
1fec7093
YS
1396static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1397{
1398 struct rbd_req_coll *coll =
1399 kzalloc(sizeof(struct rbd_req_coll) +
1400 sizeof(struct rbd_req_status) * num_reqs,
1401 GFP_ATOMIC);
1402
1403 if (!coll)
1404 return NULL;
1405 coll->total = num_reqs;
1406 kref_init(&coll->kref);
1407 return coll;
1408}
1409
/*
 * Request-queue callback: pull block-layer requests, split each into
 * per-object segments, clone the bio chain per segment, and submit the
 * resulting osd reads/writes.  Completion is tracked per request via an
 * rbd_req_coll so the request finishes only when every segment has.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);

	while (1) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			goto next;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			goto next;
		}

		/* drop the queue lock while we talk to the osd layer */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
			do_write ? "write" : "read",
			size, blk_rq_pos(rq) * 512ULL);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			goto next;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			/* one ref per in-flight segment, dropped on completion */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* fail just this segment; keep going */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the initial reference taken by rbd_alloc_coll() */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);
	}
}
1514
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	/* object size expressed in 512-byte sectors */
	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	unsigned int bio_sectors = bmd->bi_size >> 9;
	int max;

	/* bytes remaining in the current object after this bio */
	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << 9;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1537
1538static void rbd_free_disk(struct rbd_device *rbd_dev)
1539{
1540 struct gendisk *disk = rbd_dev->disk;
1541
1542 if (!disk)
1543 return;
1544
1545 rbd_header_free(&rbd_dev->header);
1546
1547 if (disk->flags & GENHD_FL_UP)
1548 del_gendisk(disk);
1549 if (disk->queue)
1550 blk_cleanup_queue(disk->queue);
1551 put_disk(disk);
1552}
1553
/*
 * (Re)read the on-disk image header.  The snapshot count can change
 * between reads, so retry with a recomputed buffer size until the
 * header we read fits the buffer we allocated for it.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	int snap_count = 0;
	u64 snap_names_len = 0;
	u64 ver;

	while (1) {
		int len = sizeof(*dh) +
			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
			  snap_names_len;

		rc = -ENOMEM;
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->obj_md_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO) {
				pr_warning("unrecognized header format"
					   " for image %s", rbd_dev->obj);
			}
			goto out_dh;
		}

		/* snapshots changed under us: resize the buffer and retry */
		if (snap_count != header->total_snaps) {
			snap_count = header->total_snaps;
			snap_names_len = header->snap_names_len;
			rbd_header_free(header);
			kfree(dh);
			continue;
		}
		break;
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1608
1609/*
1610 * create a snapshot
1611 */
1612static int rbd_header_add_snap(struct rbd_device *dev,
1613 const char *snap_name,
1614 gfp_t gfp_flags)
1615{
1616 int name_len = strlen(snap_name);
1617 u64 new_snapid;
1618 int ret;
916d4d67 1619 void *data, *p, *e;
59c2be1e 1620 u64 ver;
1dbb4399 1621 struct ceph_mon_client *monc;
602adf40
YS
1622
1623 /* we should create a snapshot only if we're pointing at the head */
1624 if (dev->cur_snap)
1625 return -EINVAL;
1626
1dbb4399
AE
1627 monc = &dev->rbd_client->client->monc;
1628 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
602adf40
YS
1629 dout("created snapid=%lld\n", new_snapid);
1630 if (ret < 0)
1631 return ret;
1632
1633 data = kmalloc(name_len + 16, gfp_flags);
1634 if (!data)
1635 return -ENOMEM;
1636
916d4d67
SW
1637 p = data;
1638 e = data + name_len + 16;
602adf40 1639
916d4d67
SW
1640 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1641 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40
YS
1642
1643 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
916d4d67 1644 data, p - data, &ver);
602adf40 1645
916d4d67 1646 kfree(data);
602adf40
YS
1647
1648 if (ret < 0)
1649 return ret;
1650
1651 dev->header.snapc->seq = new_snapid;
1652
1653 return 0;
1654bad:
1655 return -ERANGE;
1656}
1657
dfc5606d
YS
1658static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1659{
1660 struct rbd_snap *snap;
1661
1662 while (!list_empty(&rbd_dev->snaps)) {
1663 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1664 __rbd_remove_snap_dev(rbd_dev, snap);
1665 }
1666}
1667
/*
 * Re-read the header and splice the fresh snapshot state into the
 * device.  Preserves what we were tracking: if we followed the head we
 * keep following the (possibly moved) head, otherwise we keep the old
 * seq.  Caller holds ctl_mutex.
 */
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	/* resized? */
	set_capacity(rbd_dev->disk, h.image_size / 512ULL);

	down_write(&rbd_dev->header.snap_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	/* free the old header fields and adopt the freshly read ones */
	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	/* resync the sysfs snapshot devices with the new list */
	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header.snap_rwsem);

	return ret;
}
1714
/*
 * Read the image header, select the mapped snapshot, then create and
 * announce the gendisk + request queue for this device.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* set io sizes to object size */
	blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
	blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1782
dfc5606d
YS
1783/*
1784 sysfs
1785*/
1786
1787static ssize_t rbd_size_show(struct device *dev,
1788 struct device_attribute *attr, char *buf)
1789{
1790 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1791
1792 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1793}
1794
1795static ssize_t rbd_major_show(struct device *dev,
1796 struct device_attribute *attr, char *buf)
1797{
1798 struct rbd_device *rbd_dev = dev_to_rbd(dev);
602adf40 1799
dfc5606d
YS
1800 return sprintf(buf, "%d\n", rbd_dev->major);
1801}
1802
1803static ssize_t rbd_client_id_show(struct device *dev,
1804 struct device_attribute *attr, char *buf)
602adf40 1805{
dfc5606d
YS
1806 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1807
1dbb4399
AE
1808 return sprintf(buf, "client%lld\n",
1809 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1810}
1811
dfc5606d
YS
1812static ssize_t rbd_pool_show(struct device *dev,
1813 struct device_attribute *attr, char *buf)
602adf40 1814{
dfc5606d
YS
1815 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1816
1817 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1818}
1819
1820static ssize_t rbd_name_show(struct device *dev,
1821 struct device_attribute *attr, char *buf)
1822{
1823 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1824
1825 return sprintf(buf, "%s\n", rbd_dev->obj);
1826}
1827
1828static ssize_t rbd_snap_show(struct device *dev,
1829 struct device_attribute *attr,
1830 char *buf)
1831{
1832 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1833
1834 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1835}
1836
/*
 * <dev>/refresh (write-only): force a header re-read and snapshot-list
 * resync.  Returns the write size on success, a negative errno on
 * failure.
 */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int rc;
	int ret = size;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rc = __rbd_update_snaps(rbd_dev);
	if (rc < 0)
		ret = rc;

	mutex_unlock(&ctl_mutex);
	return ret;
}
602adf40 1855
/* per-image sysfs attributes; rbd_snap_add (create_snap) is defined below */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/*
 * Empty type-level release.  NOTE(review): rbd_bus_add_dev() also sets
 * dev->release = rbd_dev_release, which is where the rbd_device is
 * actually freed.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1895
1896
1897/*
1898 sysfs - snapshots
1899*/
1900
1901static ssize_t rbd_snap_size_show(struct device *dev,
1902 struct device_attribute *attr,
1903 char *buf)
1904{
1905 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1906
1907 return sprintf(buf, "%lld\n", (long long)snap->size);
1908}
1909
1910static ssize_t rbd_snap_id_show(struct device *dev,
1911 struct device_attribute *attr,
1912 char *buf)
1913{
1914 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1915
1916 return sprintf(buf, "%lld\n", (long long)snap->id);
1917}
1918
/* per-snapshot sysfs attributes */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* frees the rbd_snap (and its name) when its sysfs device goes away */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1948
/*
 * Unlink a snapshot from the device list and unregister its sysfs
 * device; rbd_snap_dev_release() then frees the rbd_snap itself.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1955
1956static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1957 struct rbd_snap *snap,
1958 struct device *parent)
1959{
1960 struct device *dev = &snap->dev;
1961 int ret;
1962
1963 dev->type = &rbd_snap_device_type;
1964 dev->parent = parent;
1965 dev->release = rbd_snap_dev_release;
1966 dev_set_name(dev, "snap_%s", snap->name);
1967 ret = device_register(dev);
1968
1969 return ret;
1970}
1971
1972static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1973 int i, const char *name,
1974 struct rbd_snap **snapp)
1975{
1976 int ret;
1977 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1978 if (!snap)
1979 return -ENOMEM;
1980 snap->name = kstrdup(name, GFP_KERNEL);
1981 snap->size = rbd_dev->header.snap_sizes[i];
1982 snap->id = rbd_dev->header.snapc->snaps[i];
1983 if (device_is_registered(&rbd_dev->dev)) {
1984 ret = rbd_register_snap_dev(rbd_dev, snap,
1985 &rbd_dev->dev);
1986 if (ret < 0)
1987 goto err;
1988 }
1989 *snapp = snap;
1990 return 0;
1991err:
1992 kfree(snap->name);
1993 kfree(snap);
1994 return ret;
1995}
1996
/*
 * Walk backwards through a NUL-delimited list of snapshot names and
 * return the entry preceding @name, or NULL when there is no room for
 * a predecessor (name is at, or within two bytes of, @start).
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	if (name < start + 2)
		return NULL;

	/* skip the previous entry's terminating NUL, then scan back */
	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	return p + 1;
}
2013
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* start one past the end of the name blob and walk backwards */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		/* safe: when i == 0, short-circuit avoids reading cur_id */
		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/*
			 * NOTE(review): this reads snaps[i] while the outer
			 * loop reads snaps[i - 1] -- verify the asymmetric
			 * indexing is intentional before modifying.
			 */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2087
2088
/* no-op release: rbd_root_dev is static, nothing to free */
static void rbd_root_dev_release(struct device *dev)
{
}

/* parent device that all mapped images hang off in sysfs */
static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};
2097
/*
 * Register the rbd device, plus one child device per existing snapshot,
 * on the rbd bus.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret = -ENOMEM;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		/* NOTE(review): label name is misleading, it only unlocks */
		goto done_free;

	/*
	 * NOTE(review): a failed snap registration breaks out of the loop
	 * but the function still returns 0 -- verify that is intended.
	 */
	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					    &rbd_dev->dev);
		if (ret < 0)
			break;
	}

	mutex_unlock(&ctl_mutex);
	return 0;
done_free:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2129
/*
 * Remove the device from sysfs; rbd_dev_release() runs once the last
 * reference is dropped.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2134
/*
 * Establish the header watch, retrying after a header refresh whenever
 * the OSD returns -ERANGE (our cached header version went stale).
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_update_snaps(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2153
/*
 * /sys/bus/rbd/add: parse "<mon_addrs> <options> <pool> <image> [snap]",
 * connect to the cluster, register the block device and sysfs nodes,
 * and start watching the image header.  Returns the write size on
 * success.
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	ssize_t rc = -ENOMEM;
	int irc, new_id = 0;
	struct list_head *tmp;
	char *mon_dev_name;
	char *options;

	/* hold a module ref for the lifetime of the mapping */
	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
	if (!mon_dev_name)
		goto err_out_mod;

	options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
	if (!options)
		goto err_mon_dev;

	/* new rbd_device object */
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_out_opt;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);

	init_rwsem(&rbd_dev->header.snap_rwsem);

	/* generate unique id: find highest unique id, add one */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	list_for_each(tmp, &rbd_dev_list) {
		struct rbd_device *rbd_dev;

		rbd_dev = list_entry(tmp, struct rbd_device, node);
		if (rbd_dev->id >= new_id)
			new_id = rbd_dev->id + 1;
	}

	rbd_dev->id = new_id;

	/* add to global list */
	list_add_tail(&rbd_dev->node, &rbd_dev_list);

	/*
	 * parse add command
	 * NOTE(review): the %Ns field widths equal the destination buffer
	 * sizes; sscanf may store N chars plus a NUL, i.e. one byte past
	 * the buffer -- verify the buffer sizing.
	 */
	if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_OPT_LEN) "s "
		   "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
		   "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
		   "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
		   mon_dev_name, options, rbd_dev->pool_name,
		   rbd_dev->obj, rbd_dev->snap_name) < 4) {
		rc = -EINVAL;
		goto err_out_slot;
	}

	/* an omitted snapshot name means "map the head" */
	if (rbd_dev->snap_name[0] == 0)
		rbd_dev->snap_name[0] = '-';

	rbd_dev->obj_len = strlen(rbd_dev->obj);
	snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
		 rbd_dev->obj, RBD_SUFFIX);

	/* initialize rest of new object */
	snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
	rc = rbd_get_client(rbd_dev, mon_dev_name, options);
	if (rc < 0)
		goto err_out_slot;

	mutex_unlock(&ctl_mutex);

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->poolid = rc;

	/* register our block device */
	irc = register_blkdev(0, rbd_dev->name);
	if (irc < 0) {
		rc = irc;
		goto err_out_client;
	}
	rbd_dev->major = irc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/* set up and announce blkdev mapping */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	list_del_init(&rbd_dev->node);
	mutex_unlock(&ctl_mutex);

	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	kfree(mon_dev_name);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
err_out_slot:
	list_del_init(&rbd_dev->node);
	mutex_unlock(&ctl_mutex);

	kfree(rbd_dev);
err_out_opt:
	kfree(options);
err_mon_dev:
	kfree(mon_dev_name);
err_out_mod:
	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);
	return rc;
}
2293
2294static struct rbd_device *__rbd_get_dev(unsigned long id)
2295{
2296 struct list_head *tmp;
2297 struct rbd_device *rbd_dev;
2298
2299 list_for_each(tmp, &rbd_dev_list) {
2300 rbd_dev = list_entry(tmp, struct rbd_device, node);
2301 if (rbd_dev->id == id)
2302 return rbd_dev;
2303 }
2304 return NULL;
2305}
2306
/*
 * Final release for the rbd device, invoked when its sysfs device is
 * unregistered and the last reference drops: stop the lingering watch,
 * drop the ceph client, free the disk, and free the device itself.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2331
/*
 * /sys/bus/rbd/remove: parse a device id and tear that mapping down.
 * Returns the write size on success.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* detach first so no new lookups can find the dying device */
	list_del_init(&rbd_dev->node);

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2367
dfc5606d
YS
2368static ssize_t rbd_snap_add(struct device *dev,
2369 struct device_attribute *attr,
2370 const char *buf,
2371 size_t count)
602adf40 2372{
dfc5606d
YS
2373 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2374 int ret;
2375 char *name = kmalloc(count + 1, GFP_KERNEL);
602adf40
YS
2376 if (!name)
2377 return -ENOMEM;
2378
dfc5606d 2379 snprintf(name, count, "%s", buf);
602adf40
YS
2380
2381 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2382
602adf40
YS
2383 ret = rbd_header_add_snap(rbd_dev,
2384 name, GFP_KERNEL);
2385 if (ret < 0)
59c2be1e 2386 goto err_unlock;
602adf40 2387
dfc5606d 2388 ret = __rbd_update_snaps(rbd_dev);
602adf40 2389 if (ret < 0)
59c2be1e
YS
2390 goto err_unlock;
2391
2392 /* shouldn't hold ctl_mutex when notifying.. notify might
2393 trigger a watch callback that would need to get that mutex */
2394 mutex_unlock(&ctl_mutex);
2395
2396 /* make a best effort, don't error if failed */
2397 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
602adf40
YS
2398
2399 ret = count;
59c2be1e
YS
2400 kfree(name);
2401 return ret;
2402
2403err_unlock:
602adf40 2404 mutex_unlock(&ctl_mutex);
602adf40
YS
2405 kfree(name);
2406 return ret;
2407}
2408
/* bus-level control files: /sys/bus/rbd/{add,remove} */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2414
2415/*
2416 * create control files in sysfs
dfc5606d 2417 * /sys/bus/rbd/...
602adf40
YS
2418 */
2419static int rbd_sysfs_init(void)
2420{
dfc5606d 2421 int ret;
602adf40 2422
dfc5606d 2423 rbd_bus_type.bus_attrs = rbd_bus_attrs;
602adf40 2424
dfc5606d 2425 ret = bus_register(&rbd_bus_type);
21079786 2426 if (ret < 0)
dfc5606d 2427 return ret;
602adf40 2428
dfc5606d 2429 ret = device_register(&rbd_root_dev);
602adf40 2430
602adf40
YS
2431 return ret;
2432}
2433
/* Tear sysfs down in reverse order of rbd_sysfs_init(). */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2439
2440int __init rbd_init(void)
2441{
2442 int rc;
2443
2444 rc = rbd_sysfs_init();
2445 if (rc)
2446 return rc;
602adf40
YS
2447 pr_info("loaded " DRV_NAME_LONG "\n");
2448 return 0;
2449}
2450
/* Module exit: tear down the sysfs interface created in rbd_init(). */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2455
module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");