]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blame - drivers/block/rbd.c
rbd: check for overflow in rbd_get_num_segments()
[mirror_ubuntu-artful-kernel.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
df111be6
AE
53/* It might be useful to have this defined elsewhere too */
54
55#define U64_MAX ((u64) (~0ULL))
56
f0f8cef5
AE
57#define RBD_DRV_NAME "rbd"
58#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
59
60#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
61
602adf40
YS
62#define RBD_MAX_SNAP_NAME_LEN 32
63#define RBD_MAX_OPT_LEN 1024
64
65#define RBD_SNAP_HEAD_NAME "-"
66
81a89793
AE
67/*
68 * An RBD device name will be "rbd#", where the "rbd" comes from
69 * RBD_DRV_NAME above, and # is a unique integer identifier.
70 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
71 * enough to hold all possible device names.
72 */
602adf40 73#define DEV_NAME_LEN 32
81a89793 74#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 75
cc0538b6 76#define RBD_READ_ONLY_DEFAULT false
59c2be1e 77
602adf40
YS
78/*
79 * block device image metadata (in-memory version)
80 */
81struct rbd_image_header {
82 u64 image_size;
849b4260 83 char *object_prefix;
602adf40
YS
84 __u8 obj_order;
85 __u8 crypt_type;
86 __u8 comp_type;
602adf40 87 struct ceph_snap_context *snapc;
602adf40
YS
88 u32 total_snaps;
89
90 char *snap_names;
91 u64 *snap_sizes;
59c2be1e
YS
92
93 u64 obj_version;
94};
95
96struct rbd_options {
cc0538b6 97 bool read_only;
602adf40
YS
98};
99
100/*
f0f8cef5 101 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
102 */
103struct rbd_client {
104 struct ceph_client *client;
105 struct kref kref;
106 struct list_head node;
107};
108
109/*
f0f8cef5 110 * a request completion status
602adf40 111 */
1fec7093
YS
112struct rbd_req_status {
113 int done;
114 int rc;
115 u64 bytes;
116};
117
118/*
119 * a collection of requests
120 */
121struct rbd_req_coll {
122 int total;
123 int num_done;
124 struct kref kref;
125 struct rbd_req_status status[0];
602adf40
YS
126};
127
f0f8cef5
AE
128/*
129 * a single io request
130 */
131struct rbd_request {
132 struct request *rq; /* blk layer request */
133 struct bio *bio; /* cloned bio */
134 struct page **pages; /* list of used pages */
135 u64 len;
136 int coll_index;
137 struct rbd_req_coll *coll;
138};
139
dfc5606d
YS
140struct rbd_snap {
141 struct device dev;
142 const char *name;
3591538f 143 u64 size;
dfc5606d
YS
144 struct list_head node;
145 u64 id;
146};
147
602adf40
YS
148/*
149 * a single device
150 */
151struct rbd_device {
de71a297 152 int dev_id; /* blkdev unique id */
602adf40
YS
153
154 int major; /* blkdev assigned major */
155 struct gendisk *disk; /* blkdev's gendisk and rq */
156 struct request_queue *q;
157
f8c38929 158 struct rbd_options rbd_opts;
602adf40
YS
159 struct rbd_client *rbd_client;
160
161 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
162
163 spinlock_t lock; /* queue lock */
164
165 struct rbd_image_header header;
0bed54dc
AE
166 char *image_name;
167 size_t image_name_len;
168 char *header_name;
d22f76e7 169 char *pool_name;
9bb2f334 170 int pool_id;
602adf40 171
59c2be1e
YS
172 struct ceph_osd_event *watch_event;
173 struct ceph_osd_request *watch_request;
174
c666601a
JD
175 /* protects updating the header */
176 struct rw_semaphore header_rwsem;
e88a36ec 177 /* name of the snapshot this device reads from */
820a5f3e 178 char *snap_name;
e88a36ec 179 /* id of the snapshot this device reads from */
77dfe99f 180 u64 snap_id; /* current snapshot id */
e88a36ec
JD
181 /* whether the snap_id this device reads from still exists */
182 bool snap_exists;
cc0538b6 183 bool read_only;
602adf40
YS
184
185 struct list_head node;
dfc5606d
YS
186
187 /* list of snapshots */
188 struct list_head snaps;
189
190 /* sysfs related */
191 struct device dev;
192};
193
602adf40 194static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 195
602adf40 196static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
197static DEFINE_SPINLOCK(rbd_dev_list_lock);
198
432b8587
AE
199static LIST_HEAD(rbd_client_list); /* clients */
200static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 201
dfc5606d
YS
202static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
203static void rbd_dev_release(struct device *dev);
dfc5606d
YS
204static ssize_t rbd_snap_add(struct device *dev,
205 struct device_attribute *attr,
206 const char *buf,
207 size_t count);
14e7085d 208static void __rbd_remove_snap_dev(struct rbd_snap *snap);
dfc5606d 209
f0f8cef5
AE
210static ssize_t rbd_add(struct bus_type *bus, const char *buf,
211 size_t count);
212static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
213 size_t count);
214
215static struct bus_attribute rbd_bus_attrs[] = {
216 __ATTR(add, S_IWUSR, NULL, rbd_add),
217 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
218 __ATTR_NULL
219};
220
221static struct bus_type rbd_bus_type = {
222 .name = "rbd",
223 .bus_attrs = rbd_bus_attrs,
224};
225
/* No-op release: rbd_root_dev is statically allocated, nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}
229
230static struct device rbd_root_dev = {
231 .init_name = "rbd",
232 .release = rbd_root_dev_release,
233};
234
dfc5606d 235
dfc5606d
YS
236static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
237{
238 return get_device(&rbd_dev->dev);
239}
240
241static void rbd_put_dev(struct rbd_device *rbd_dev)
242{
243 put_device(&rbd_dev->dev);
244}
602adf40 245
1fe5e993 246static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
59c2be1e 247
602adf40
YS
248static int rbd_open(struct block_device *bdev, fmode_t mode)
249{
f0f8cef5 250 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 251
602adf40
YS
252 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
253 return -EROFS;
254
340c7a2b
AE
255 rbd_get_dev(rbd_dev);
256 set_device_ro(bdev, rbd_dev->read_only);
257
602adf40
YS
258 return 0;
259}
260
dfc5606d
YS
261static int rbd_release(struct gendisk *disk, fmode_t mode)
262{
263 struct rbd_device *rbd_dev = disk->private_data;
264
265 rbd_put_dev(rbd_dev);
266
267 return 0;
268}
269
602adf40
YS
270static const struct block_device_operations rbd_bd_ops = {
271 .owner = THIS_MODULE,
272 .open = rbd_open,
dfc5606d 273 .release = rbd_release,
602adf40
YS
274};
275
276/*
277 * Initialize an rbd client instance.
43ae4701 278 * We own *ceph_opts.
602adf40 279 */
f8c38929 280static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
602adf40
YS
281{
282 struct rbd_client *rbdc;
283 int ret = -ENOMEM;
284
285 dout("rbd_client_create\n");
286 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287 if (!rbdc)
288 goto out_opt;
289
290 kref_init(&rbdc->kref);
291 INIT_LIST_HEAD(&rbdc->node);
292
bc534d86
AE
293 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
294
43ae4701 295 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 296 if (IS_ERR(rbdc->client))
bc534d86 297 goto out_mutex;
43ae4701 298 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
299
300 ret = ceph_open_session(rbdc->client);
301 if (ret < 0)
302 goto out_err;
303
432b8587 304 spin_lock(&rbd_client_list_lock);
602adf40 305 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 306 spin_unlock(&rbd_client_list_lock);
602adf40 307
bc534d86
AE
308 mutex_unlock(&ctl_mutex);
309
602adf40
YS
310 dout("rbd_client_create created %p\n", rbdc);
311 return rbdc;
312
313out_err:
314 ceph_destroy_client(rbdc->client);
bc534d86
AE
315out_mutex:
316 mutex_unlock(&ctl_mutex);
602adf40
YS
317 kfree(rbdc);
318out_opt:
43ae4701
AE
319 if (ceph_opts)
320 ceph_destroy_options(ceph_opts);
28f259b7 321 return ERR_PTR(ret);
602adf40
YS
322}
323
324/*
1f7ba331
AE
325 * Find a ceph client with specific addr and configuration. If
326 * found, bump its reference count.
602adf40 327 */
1f7ba331 328static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
329{
330 struct rbd_client *client_node;
1f7ba331 331 bool found = false;
602adf40 332
43ae4701 333 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
334 return NULL;
335
1f7ba331
AE
336 spin_lock(&rbd_client_list_lock);
337 list_for_each_entry(client_node, &rbd_client_list, node) {
338 if (!ceph_compare_options(ceph_opts, client_node->client)) {
339 kref_get(&client_node->kref);
340 found = true;
341 break;
342 }
343 }
344 spin_unlock(&rbd_client_list_lock);
345
346 return found ? client_node : NULL;
602adf40
YS
347}
348
59c2be1e
YS
349/*
350 * mount options
351 */
enum {
        Opt_last_int,
        /* int args above */
        Opt_last_string,
        /* string args above */
        Opt_read_only,
        Opt_read_write,
        /* Boolean args above */
        Opt_last_bool,
};
362
43ae4701 363static match_table_t rbd_opts_tokens = {
59c2be1e
YS
364 /* int args above */
365 /* string args above */
cc0538b6
AE
366 {Opt_read_only, "read_only"},
367 {Opt_read_only, "ro"}, /* Alternate spelling */
368 {Opt_read_write, "read_write"},
369 {Opt_read_write, "rw"}, /* Alternate spelling */
370 /* Boolean args above */
59c2be1e
YS
371 {-1, NULL}
372};
373
374static int parse_rbd_opts_token(char *c, void *private)
375{
43ae4701 376 struct rbd_options *rbd_opts = private;
59c2be1e
YS
377 substring_t argstr[MAX_OPT_ARGS];
378 int token, intval, ret;
379
43ae4701 380 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
381 if (token < 0)
382 return -EINVAL;
383
384 if (token < Opt_last_int) {
385 ret = match_int(&argstr[0], &intval);
386 if (ret < 0) {
387 pr_err("bad mount option arg (not int) "
388 "at '%s'\n", c);
389 return ret;
390 }
391 dout("got int token %d val %d\n", token, intval);
392 } else if (token > Opt_last_int && token < Opt_last_string) {
393 dout("got string token %d val %s\n", token,
394 argstr[0].from);
cc0538b6
AE
395 } else if (token > Opt_last_string && token < Opt_last_bool) {
396 dout("got Boolean token %d\n", token);
59c2be1e
YS
397 } else {
398 dout("got token %d\n", token);
399 }
400
401 switch (token) {
cc0538b6
AE
402 case Opt_read_only:
403 rbd_opts->read_only = true;
404 break;
405 case Opt_read_write:
406 rbd_opts->read_only = false;
407 break;
59c2be1e
YS
408 default:
409 BUG_ON(token);
410 }
411 return 0;
412}
413
602adf40
YS
414/*
415 * Get a ceph client with specific addr and configuration, if one does
416 * not exist create it.
417 */
f8c38929
AE
418static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
419 size_t mon_addr_len, char *options)
602adf40 420{
f8c38929 421 struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
43ae4701 422 struct ceph_options *ceph_opts;
f8c38929 423 struct rbd_client *rbdc;
59c2be1e 424
cc0538b6 425 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
602adf40 426
43ae4701
AE
427 ceph_opts = ceph_parse_options(options, mon_addr,
428 mon_addr + mon_addr_len,
429 parse_rbd_opts_token, rbd_opts);
f8c38929
AE
430 if (IS_ERR(ceph_opts))
431 return PTR_ERR(ceph_opts);
602adf40 432
1f7ba331 433 rbdc = rbd_client_find(ceph_opts);
602adf40 434 if (rbdc) {
602adf40 435 /* using an existing client */
43ae4701 436 ceph_destroy_options(ceph_opts);
f8c38929
AE
437 } else {
438 rbdc = rbd_client_create(ceph_opts);
439 if (IS_ERR(rbdc))
440 return PTR_ERR(rbdc);
602adf40 441 }
f8c38929 442 rbd_dev->rbd_client = rbdc;
602adf40 443
f8c38929 444 return 0;
602adf40
YS
445}
446
447/*
448 * Destroy ceph client
d23a4b3f 449 *
432b8587 450 * Acquires rbd_client_list_lock internally to unlink the client.
602adf40
YS
451 */
452static void rbd_client_release(struct kref *kref)
453{
454 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
455
456 dout("rbd_release_client %p\n", rbdc);
cd9d9f5d 457 spin_lock(&rbd_client_list_lock);
602adf40 458 list_del(&rbdc->node);
cd9d9f5d 459 spin_unlock(&rbd_client_list_lock);
602adf40
YS
460
461 ceph_destroy_client(rbdc->client);
462 kfree(rbdc);
463}
464
465/*
466 * Drop reference to ceph client node. If it's not referenced anymore, release
467 * it.
468 */
469static void rbd_put_client(struct rbd_device *rbd_dev)
470{
471 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
472 rbd_dev->rbd_client = NULL;
602adf40
YS
473}
474
1fec7093
YS
475/*
476 * Destroy requests collection
477 */
478static void rbd_coll_release(struct kref *kref)
479{
480 struct rbd_req_coll *coll =
481 container_of(kref, struct rbd_req_coll, kref);
482
483 dout("rbd_coll_release %p\n", coll);
484 kfree(coll);
485}
602adf40 486
8e94af8e
AE
487static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
488{
103a150f
AE
489 size_t size;
490 u32 snap_count;
491
492 /* The header has to start with the magic rbd header text */
493 if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
494 return false;
495
496 /*
497 * The size of a snapshot header has to fit in a size_t, and
498 * that limits the number of snapshots.
499 */
500 snap_count = le32_to_cpu(ondisk->snap_count);
501 size = SIZE_MAX - sizeof (struct ceph_snap_context);
502 if (snap_count > size / sizeof (__le64))
503 return false;
504
505 /*
506 * Not only that, but the size of the entire the snapshot
507 * header must also be representable in a size_t.
508 */
509 size -= snap_count * sizeof (__le64);
510 if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
511 return false;
512
513 return true;
8e94af8e
AE
514}
515
602adf40
YS
516/*
517 * Create a new header structure, translate header format from the on-disk
518 * header.
519 */
520static int rbd_header_from_disk(struct rbd_image_header *header,
4156d998 521 struct rbd_image_header_ondisk *ondisk)
602adf40 522{
ccece235 523 u32 snap_count;
58c17b0e 524 size_t len;
d2bb24e5 525 size_t size;
621901d6 526 u32 i;
602adf40 527
6a52325f
AE
528 memset(header, 0, sizeof (*header));
529
103a150f
AE
530 snap_count = le32_to_cpu(ondisk->snap_count);
531
58c17b0e
AE
532 len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
533 header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
6a52325f 534 if (!header->object_prefix)
602adf40 535 return -ENOMEM;
58c17b0e
AE
536 memcpy(header->object_prefix, ondisk->object_prefix, len);
537 header->object_prefix[len] = '\0';
00f1f36f 538
602adf40 539 if (snap_count) {
f785cc1d
AE
540 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
541
621901d6
AE
542 /* Save a copy of the snapshot names */
543
f785cc1d
AE
544 if (snap_names_len > (u64) SIZE_MAX)
545 return -EIO;
546 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
602adf40 547 if (!header->snap_names)
6a52325f 548 goto out_err;
f785cc1d
AE
549 /*
550 * Note that rbd_dev_v1_header_read() guarantees
551 * the ondisk buffer we're working with has
552 * snap_names_len bytes beyond the end of the
553 * snapshot id array, this memcpy() is safe.
554 */
555 memcpy(header->snap_names, &ondisk->snaps[snap_count],
556 snap_names_len);
6a52325f 557
621901d6
AE
558 /* Record each snapshot's size */
559
d2bb24e5
AE
560 size = snap_count * sizeof (*header->snap_sizes);
561 header->snap_sizes = kmalloc(size, GFP_KERNEL);
602adf40 562 if (!header->snap_sizes)
6a52325f 563 goto out_err;
621901d6
AE
564 for (i = 0; i < snap_count; i++)
565 header->snap_sizes[i] =
566 le64_to_cpu(ondisk->snaps[i].image_size);
602adf40 567 } else {
ccece235 568 WARN_ON(ondisk->snap_names_len);
602adf40
YS
569 header->snap_names = NULL;
570 header->snap_sizes = NULL;
571 }
849b4260 572
602adf40
YS
573 header->image_size = le64_to_cpu(ondisk->image_size);
574 header->obj_order = ondisk->options.order;
575 header->crypt_type = ondisk->options.crypt_type;
576 header->comp_type = ondisk->options.comp_type;
6a52325f
AE
577 header->total_snaps = snap_count;
578
621901d6
AE
579 /* Allocate and fill in the snapshot context */
580
6a52325f
AE
581 size = sizeof (struct ceph_snap_context);
582 size += snap_count * sizeof (header->snapc->snaps[0]);
583 header->snapc = kzalloc(size, GFP_KERNEL);
584 if (!header->snapc)
585 goto out_err;
602adf40
YS
586
587 atomic_set(&header->snapc->nref, 1);
505cbb9b 588 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
602adf40 589 header->snapc->num_snaps = snap_count;
621901d6
AE
590 for (i = 0; i < snap_count; i++)
591 header->snapc->snaps[i] =
592 le64_to_cpu(ondisk->snaps[i].id);
602adf40
YS
593
594 return 0;
595
6a52325f 596out_err:
849b4260 597 kfree(header->snap_sizes);
ccece235 598 header->snap_sizes = NULL;
602adf40 599 kfree(header->snap_names);
ccece235 600 header->snap_names = NULL;
6a52325f
AE
601 kfree(header->object_prefix);
602 header->object_prefix = NULL;
ccece235 603
00f1f36f 604 return -ENOMEM;
602adf40
YS
605}
606
602adf40
YS
607static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
608 u64 *seq, u64 *size)
609{
610 int i;
611 char *p = header->snap_names;
612
00f1f36f
AE
613 for (i = 0; i < header->total_snaps; i++) {
614 if (!strcmp(snap_name, p)) {
602adf40 615
00f1f36f 616 /* Found it. Pass back its id and/or size */
602adf40 617
00f1f36f
AE
618 if (seq)
619 *seq = header->snapc->snaps[i];
620 if (size)
621 *size = header->snap_sizes[i];
622 return i;
623 }
624 p += strlen(p) + 1; /* Skip ahead to the next name */
625 }
626 return -ENOENT;
602adf40
YS
627}
628
0ce1a794 629static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
602adf40 630{
78dc447d 631 int ret;
602adf40 632
0ce1a794 633 down_write(&rbd_dev->header_rwsem);
602adf40 634
0ce1a794 635 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 636 sizeof (RBD_SNAP_HEAD_NAME))) {
0ce1a794 637 rbd_dev->snap_id = CEPH_NOSNAP;
e88a36ec 638 rbd_dev->snap_exists = false;
cc0538b6 639 rbd_dev->read_only = rbd_dev->rbd_opts.read_only;
602adf40 640 if (size)
78dc447d 641 *size = rbd_dev->header.image_size;
602adf40 642 } else {
78dc447d
AE
643 u64 snap_id = 0;
644
645 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
646 &snap_id, size);
602adf40
YS
647 if (ret < 0)
648 goto done;
78dc447d 649 rbd_dev->snap_id = snap_id;
e88a36ec 650 rbd_dev->snap_exists = true;
cc0538b6 651 rbd_dev->read_only = true; /* No choice for snapshots */
602adf40
YS
652 }
653
654 ret = 0;
655done:
0ce1a794 656 up_write(&rbd_dev->header_rwsem);
602adf40
YS
657 return ret;
658}
659
660static void rbd_header_free(struct rbd_image_header *header)
661{
849b4260 662 kfree(header->object_prefix);
d78fd7ae 663 header->object_prefix = NULL;
602adf40 664 kfree(header->snap_sizes);
d78fd7ae 665 header->snap_sizes = NULL;
849b4260 666 kfree(header->snap_names);
d78fd7ae 667 header->snap_names = NULL;
d1d25646 668 ceph_put_snap_context(header->snapc);
d78fd7ae 669 header->snapc = NULL;
602adf40
YS
670}
671
672/*
673 * get the actual striped segment name, offset and length
674 */
675static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 676 const char *object_prefix,
602adf40
YS
677 u64 ofs, u64 len,
678 char *seg_name, u64 *segofs)
679{
680 u64 seg = ofs >> header->obj_order;
681
682 if (seg_name)
683 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 684 "%s.%012llx", object_prefix, seg);
602adf40
YS
685
686 ofs = ofs & ((1 << header->obj_order) - 1);
687 len = min_t(u64, len, (1 << header->obj_order) - ofs);
688
689 if (segofs)
690 *segofs = ofs;
691
692 return len;
693}
694
1fec7093
YS
695static int rbd_get_num_segments(struct rbd_image_header *header,
696 u64 ofs, u64 len)
697{
df111be6
AE
698 u64 start_seg;
699 u64 end_seg;
700
701 if (!len)
702 return 0;
703 if (len - 1 > U64_MAX - ofs)
704 return -ERANGE;
705
706 start_seg = ofs >> header->obj_order;
707 end_seg = (ofs + len - 1) >> header->obj_order;
708
1fec7093
YS
709 return end_seg - start_seg + 1;
710}
711
029bcbd8
JD
712/*
713 * returns the size of an object in the image
714 */
715static u64 rbd_obj_bytes(struct rbd_image_header *header)
716{
717 return 1 << header->obj_order;
718}
719
602adf40
YS
720/*
721 * bio helpers
722 */
723
724static void bio_chain_put(struct bio *chain)
725{
726 struct bio *tmp;
727
728 while (chain) {
729 tmp = chain;
730 chain = chain->bi_next;
731 bio_put(tmp);
732 }
733}
734
735/*
736 * zeros a bio chain, starting at specific offset
737 */
738static void zero_bio_chain(struct bio *chain, int start_ofs)
739{
740 struct bio_vec *bv;
741 unsigned long flags;
742 void *buf;
743 int i;
744 int pos = 0;
745
746 while (chain) {
747 bio_for_each_segment(bv, chain, i) {
748 if (pos + bv->bv_len > start_ofs) {
749 int remainder = max(start_ofs - pos, 0);
750 buf = bvec_kmap_irq(bv, &flags);
751 memset(buf + remainder, 0,
752 bv->bv_len - remainder);
85b5aaa6 753 bvec_kunmap_irq(buf, &flags);
602adf40
YS
754 }
755 pos += bv->bv_len;
756 }
757
758 chain = chain->bi_next;
759 }
760}
761
762/*
763 * bio_chain_clone - clone a chain of bios up to a certain length.
764 * might return a bio_pair that will need to be released.
765 */
766static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
767 struct bio_pair **bp,
768 int len, gfp_t gfpmask)
769{
542582fc
AE
770 struct bio *old_chain = *old;
771 struct bio *new_chain = NULL;
772 struct bio *tail;
602adf40
YS
773 int total = 0;
774
775 if (*bp) {
776 bio_pair_release(*bp);
777 *bp = NULL;
778 }
779
780 while (old_chain && (total < len)) {
542582fc
AE
781 struct bio *tmp;
782
602adf40
YS
783 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
784 if (!tmp)
785 goto err_out;
542582fc 786 gfpmask &= ~__GFP_WAIT; /* can't wait after the first */
602adf40
YS
787
788 if (total + old_chain->bi_size > len) {
789 struct bio_pair *bp;
790
791 /*
792 * this split can only happen with a single paged bio,
793 * split_bio will BUG_ON if this is not the case
794 */
795 dout("bio_chain_clone split! total=%d remaining=%d"
bd919d45
AE
796 "bi_size=%u\n",
797 total, len - total, old_chain->bi_size);
602adf40
YS
798
799 /* split the bio. We'll release it either in the next
800 call, or it will have to be released outside */
593a9e7b 801 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
602adf40
YS
802 if (!bp)
803 goto err_out;
804
805 __bio_clone(tmp, &bp->bio1);
806
807 *next = &bp->bio2;
808 } else {
809 __bio_clone(tmp, old_chain);
810 *next = old_chain->bi_next;
811 }
812
813 tmp->bi_bdev = NULL;
602adf40 814 tmp->bi_next = NULL;
542582fc 815 if (new_chain)
602adf40 816 tail->bi_next = tmp;
542582fc
AE
817 else
818 new_chain = tmp;
819 tail = tmp;
602adf40
YS
820 old_chain = old_chain->bi_next;
821
822 total += tmp->bi_size;
823 }
824
825 BUG_ON(total < len);
826
602adf40
YS
827 *old = old_chain;
828
829 return new_chain;
830
831err_out:
832 dout("bio_chain_clone with err\n");
833 bio_chain_put(new_chain);
834 return NULL;
835}
836
837/*
838 * helpers for osd request op vectors.
839 */
57cfc106
AE
840static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
841 int opcode, u32 payload_len)
602adf40 842{
57cfc106
AE
843 struct ceph_osd_req_op *ops;
844
845 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
846 if (!ops)
847 return NULL;
848
849 ops[0].op = opcode;
850
602adf40
YS
851 /*
852 * op extent offset and length will be set later on
853 * in calc_raw_layout()
854 */
57cfc106
AE
855 ops[0].payload_len = payload_len;
856
857 return ops;
602adf40
YS
858}
859
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
        kfree(ops);
}
864
1fec7093
YS
865static void rbd_coll_end_req_index(struct request *rq,
866 struct rbd_req_coll *coll,
867 int index,
868 int ret, u64 len)
869{
870 struct request_queue *q;
871 int min, max, i;
872
bd919d45
AE
873 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
874 coll, index, ret, (unsigned long long) len);
1fec7093
YS
875
876 if (!rq)
877 return;
878
879 if (!coll) {
880 blk_end_request(rq, ret, len);
881 return;
882 }
883
884 q = rq->q;
885
886 spin_lock_irq(q->queue_lock);
887 coll->status[index].done = 1;
888 coll->status[index].rc = ret;
889 coll->status[index].bytes = len;
890 max = min = coll->num_done;
891 while (max < coll->total && coll->status[max].done)
892 max++;
893
894 for (i = min; i<max; i++) {
895 __blk_end_request(rq, coll->status[i].rc,
896 coll->status[i].bytes);
897 coll->num_done++;
898 kref_put(&coll->kref, rbd_coll_release);
899 }
900 spin_unlock_irq(q->queue_lock);
901}
902
903static void rbd_coll_end_req(struct rbd_request *req,
904 int ret, u64 len)
905{
906 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
907}
908
602adf40
YS
909/*
910 * Send ceph osd request
911 */
912static int rbd_do_request(struct request *rq,
0ce1a794 913 struct rbd_device *rbd_dev,
602adf40
YS
914 struct ceph_snap_context *snapc,
915 u64 snapid,
aded07ea 916 const char *object_name, u64 ofs, u64 len,
602adf40
YS
917 struct bio *bio,
918 struct page **pages,
919 int num_pages,
920 int flags,
921 struct ceph_osd_req_op *ops,
1fec7093
YS
922 struct rbd_req_coll *coll,
923 int coll_index,
602adf40 924 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
925 struct ceph_msg *msg),
926 struct ceph_osd_request **linger_req,
927 u64 *ver)
602adf40
YS
928{
929 struct ceph_osd_request *req;
930 struct ceph_file_layout *layout;
931 int ret;
932 u64 bno;
933 struct timespec mtime = CURRENT_TIME;
934 struct rbd_request *req_data;
935 struct ceph_osd_request_head *reqhead;
1dbb4399 936 struct ceph_osd_client *osdc;
602adf40 937
602adf40 938 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
939 if (!req_data) {
940 if (coll)
941 rbd_coll_end_req_index(rq, coll, coll_index,
942 -ENOMEM, len);
943 return -ENOMEM;
944 }
945
946 if (coll) {
947 req_data->coll = coll;
948 req_data->coll_index = coll_index;
949 }
602adf40 950
bd919d45
AE
951 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
952 (unsigned long long) ofs, (unsigned long long) len);
602adf40 953
0ce1a794 954 osdc = &rbd_dev->rbd_client->client->osdc;
1dbb4399
AE
955 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
956 false, GFP_NOIO, pages, bio);
4ad12621 957 if (!req) {
4ad12621 958 ret = -ENOMEM;
602adf40
YS
959 goto done_pages;
960 }
961
962 req->r_callback = rbd_cb;
963
964 req_data->rq = rq;
965 req_data->bio = bio;
966 req_data->pages = pages;
967 req_data->len = len;
968
969 req->r_priv = req_data;
970
971 reqhead = req->r_request->front.iov_base;
972 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
973
aded07ea 974 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
602adf40
YS
975 req->r_oid_len = strlen(req->r_oid);
976
977 layout = &req->r_file_layout;
978 memset(layout, 0, sizeof(*layout));
979 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
980 layout->fl_stripe_count = cpu_to_le32(1);
981 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
0ce1a794 982 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1dbb4399
AE
983 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
984 req, ops);
602adf40
YS
985
986 ceph_osdc_build_request(req, ofs, &len,
987 ops,
988 snapc,
989 &mtime,
990 req->r_oid, req->r_oid_len);
602adf40 991
59c2be1e 992 if (linger_req) {
1dbb4399 993 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
994 *linger_req = req;
995 }
996
1dbb4399 997 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
998 if (ret < 0)
999 goto done_err;
1000
1001 if (!rbd_cb) {
1dbb4399 1002 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
1003 if (ver)
1004 *ver = le64_to_cpu(req->r_reassert_version.version);
bd919d45
AE
1005 dout("reassert_ver=%llu\n",
1006 (unsigned long long)
1007 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
1008 ceph_osdc_put_request(req);
1009 }
1010 return ret;
1011
1012done_err:
1013 bio_chain_put(req_data->bio);
1014 ceph_osdc_put_request(req);
1015done_pages:
1fec7093 1016 rbd_coll_end_req(req_data, ret, len);
602adf40 1017 kfree(req_data);
602adf40
YS
1018 return ret;
1019}
1020
1021/*
1022 * Ceph osd op callback
1023 */
1024static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1025{
1026 struct rbd_request *req_data = req->r_priv;
1027 struct ceph_osd_reply_head *replyhead;
1028 struct ceph_osd_op *op;
1029 __s32 rc;
1030 u64 bytes;
1031 int read_op;
1032
1033 /* parse reply */
1034 replyhead = msg->front.iov_base;
1035 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1036 op = (void *)(replyhead + 1);
1037 rc = le32_to_cpu(replyhead->result);
1038 bytes = le64_to_cpu(op->extent.length);
895cfcc8 1039 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
602adf40 1040
bd919d45
AE
1041 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1042 (unsigned long long) bytes, read_op, (int) rc);
602adf40
YS
1043
1044 if (rc == -ENOENT && read_op) {
1045 zero_bio_chain(req_data->bio, 0);
1046 rc = 0;
1047 } else if (rc == 0 && read_op && bytes < req_data->len) {
1048 zero_bio_chain(req_data->bio, bytes);
1049 bytes = req_data->len;
1050 }
1051
1fec7093 1052 rbd_coll_end_req(req_data, rc, bytes);
602adf40
YS
1053
1054 if (req_data->bio)
1055 bio_chain_put(req_data->bio);
1056
1057 ceph_osdc_put_request(req);
1058 kfree(req_data);
1059}
1060
59c2be1e
YS
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
        ceph_osdc_put_request(req);
}
1065
602adf40
YS
1066/*
1067 * Do a synchronous ceph osd operation
1068 */
0ce1a794 1069static int rbd_req_sync_op(struct rbd_device *rbd_dev,
602adf40
YS
1070 struct ceph_snap_context *snapc,
1071 u64 snapid,
602adf40 1072 int flags,
913d2fdc 1073 struct ceph_osd_req_op *ops,
aded07ea 1074 const char *object_name,
602adf40 1075 u64 ofs, u64 len,
59c2be1e
YS
1076 char *buf,
1077 struct ceph_osd_request **linger_req,
1078 u64 *ver)
602adf40
YS
1079{
1080 int ret;
1081 struct page **pages;
1082 int num_pages;
913d2fdc
AE
1083
1084 BUG_ON(ops == NULL);
602adf40
YS
1085
1086 num_pages = calc_pages_for(ofs , len);
1087 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1088 if (IS_ERR(pages))
1089 return PTR_ERR(pages);
602adf40 1090
0ce1a794 1091 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
aded07ea 1092 object_name, ofs, len, NULL,
602adf40
YS
1093 pages, num_pages,
1094 flags,
1095 ops,
1fec7093 1096 NULL, 0,
59c2be1e
YS
1097 NULL,
1098 linger_req, ver);
602adf40 1099 if (ret < 0)
913d2fdc 1100 goto done;
602adf40
YS
1101
1102 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1103 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1104
602adf40
YS
1105done:
1106 ceph_release_page_vector(pages, num_pages);
1107 return ret;
1108}
1109
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the device-relative range [ofs, ofs+len) onto a single object
 * segment and issues one asynchronous OSD op (read or write) for it.
 * Completion is reported into @coll slot @coll_index via rbd_req_cb().
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	/* GFP_NOIO: we're on the block I/O submission path */
	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	/* only writes carry a data payload */
	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = -ENOMEM;
	ops = rbd_create_rw_ops(1, opcode, payload_len);
	if (!ops)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1165
/*
 * Request async osd write
 *
 * Writes always target the head object (CEPH_NOSNAP); @snapc supplies
 * the snapshot context the write is tagged with.
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 ofs, len, bio, coll, coll_index);
}
1182
/*
 * Request async osd read
 *
 * Reads may target a snapshot, selected by @snapid; no snapshot
 * context is needed for reads.
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}
1200
/*
 * Request sync osd read
 *
 * Synchronously reads @len bytes at @ofs from @object_name into @buf.
 * Returns the byte count read, or a negative errno.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
	if (!ops)
		return -ENOMEM;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_FLAG_READ,
			       ops, object_name, ofs, len, buf, NULL, ver);
	rbd_destroy_ops(ops);

	return ret;
}
1226
/*
 * Acknowledge a notification on the header object.
 *
 * (The historical comment here said "Request sync osd watch", but the
 * op issued is CEPH_OSD_OP_NOTIFY_ACK, completed asynchronously via
 * rbd_simple_req_cb.)
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  rbd_dev->header_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1256
1257static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1258{
0ce1a794 1259 struct rbd_device *rbd_dev = (struct rbd_device *)data;
a71b891b 1260 u64 hver;
13143d2d
SW
1261 int rc;
1262
0ce1a794 1263 if (!rbd_dev)
59c2be1e
YS
1264 return;
1265
bd919d45
AE
1266 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1267 rbd_dev->header_name, (unsigned long long) notify_id,
1268 (unsigned int) opcode);
1fe5e993 1269 rc = rbd_refresh_header(rbd_dev, &hver);
13143d2d 1270 if (rc)
f0f8cef5 1271 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
0ce1a794 1272 " update snaps: %d\n", rbd_dev->major, rc);
59c2be1e 1273
7f0a24d8 1274 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
59c2be1e
YS
1275}
1276
/*
 * Request sync osd watch
 *
 * Registers a watch on the header object: creates an osd event whose
 * callback is rbd_watch_cb(), then issues a lingering WATCH op so the
 * OSD keeps notifying us.  The lingering request is stored in
 * rbd_dev->watch_request.  On failure the event (if created) is
 * cancelled and cleared.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == register the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1320
79e3057c
YS
/*
 * Request sync osd unwatch
 *
 * Tears down the watch registered by rbd_req_sync_watch(): issues a
 * WATCH op with flag == 0 (unregister) and cancels the osd event
 * unconditionally, even if the unwatch op itself failed.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
	if (!ops)
		return -ENOMEM;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 == unregister the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      rbd_dev->header_name,
			      0, 0, NULL, NULL, NULL);


	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1350
/* Context handed to rbd_notify_cb() via the osd event's data pointer */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};

/*
 * Callback for the self-notify event created in rbd_req_sync_notify();
 * only logs the notification, no state is updated here.
 */
static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	if (!rbd_dev)
		return;

	dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
			rbd_dev->header_name, (unsigned long long) notify_id,
			(unsigned int) opcode);
}
1365
/*
 * Request sync osd notify
 *
 * Sends a NOTIFY on the header object (e.g. after a snapshot is
 * created) and waits for other watchers to acknowledge it.
 *
 * NOTE(review): the return value of ceph_osdc_wait_event() is logged
 * but discarded, so a timeout still results in a 0 (success) return —
 * confirm this best-effort behavior is intended.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload: notify version + timeout, two 32-bit values */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (!ops)
		return -ENOMEM;

	info.rbd_dev = rbd_dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       rbd_dev->header_name,
			       0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1415
602adf40
YS
/*
 * Synchronously execute an OSD class method (CEPH_OSD_OP_CALL) on
 * @object_name, passing @len bytes of @data as input.
 *
 * (The historical comment here said "Request sync osd read", which
 * was wrong — this is the class-method execution helper used e.g. by
 * "rbd"/"snap_add".)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret;

	/* payload carries class name, method name and input data */
	ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (!ops)
		return -ENOMEM;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1456
1fec7093
YS
1457static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1458{
1459 struct rbd_req_coll *coll =
1460 kzalloc(sizeof(struct rbd_req_coll) +
1461 sizeof(struct rbd_req_status) * num_reqs,
1462 GFP_ATOMIC);
1463
1464 if (!coll)
1465 return NULL;
1466 coll->total = num_reqs;
1467 kref_init(&coll->kref);
1468 return coll;
1469}
1470
602adf40
YS
/*
 * block device queue callback
 *
 * Pulls requests off the queue, splits each into per-object segments
 * and submits one async OSD op per segment.  Per-request completion
 * is tracked by an rbd_req_coll; the request is ended once all of its
 * segments complete.
 *
 * Locking: q->queue_lock is held on entry to the loop body (blk_fetch
 * contract) and must be dropped around the blocking submission work,
 * then re-taken before the next blk_fetch_request().
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		unsigned int size;
		u64 op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* drop the queue lock: submission below may block */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* mapped snapshot may have been deleted underneath us */
		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* take a snapc ref so writes see a stable snap context */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		if (num_segs <= 0) {
			/* num_segs < 0 is an errno from the overflow check */
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, num_segs);
			ceph_put_snap_context(snapc);
			continue;
		}
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			/* one coll ref per in-flight segment */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the submission reference; I/O refs may remain */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1592
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes of @bvec that may be appended to the
 * bio described by @bmd without crossing an object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* object size in sectors; obj_order is log2 of the object size */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes remaining until the end of the current object */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* always allow the first bvec of an empty bio */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1619
1620static void rbd_free_disk(struct rbd_device *rbd_dev)
1621{
1622 struct gendisk *disk = rbd_dev->disk;
1623
1624 if (!disk)
1625 return;
1626
1627 rbd_header_free(&rbd_dev->header);
1628
1629 if (disk->flags & GENHD_FL_UP)
1630 del_gendisk(disk);
1631 if (disk->queue)
1632 blk_cleanup_queue(disk->queue);
1633 put_disk(disk);
1634}
1635
/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header. Caller can pass the address
 * of a variable that will be filled in with the version of the
 * header object at the time it was read.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings. Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		/* free the previous (too small) attempt, if any */
		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_req_sync_read(rbd_dev, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, size,
				       (char *) ondisk, version);

		if (ret < 0)
			goto out_err;
		if (WARN_ON((size_t) ret < size)) {
			ret = -ENXIO;
			pr_warning("short header read for image %s"
					" (want %zd got %d)\n",
				rbd_dev->image_name, size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			pr_warning("invalid header for image %s\n",
				rbd_dev->image_name);
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}
1707
1708/*
1709 * reload the ondisk the header
1710 */
1711static int rbd_read_header(struct rbd_device *rbd_dev,
1712 struct rbd_image_header *header)
1713{
1714 struct rbd_image_header_ondisk *ondisk;
1715 u64 ver = 0;
1716 int ret;
602adf40 1717
4156d998
AE
1718 ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
1719 if (IS_ERR(ondisk))
1720 return PTR_ERR(ondisk);
1721 ret = rbd_header_from_disk(header, ondisk);
1722 if (ret >= 0)
1723 header->obj_version = ver;
1724 kfree(ondisk);
1725
1726 return ret;
602adf40
YS
1727}
1728
1729/*
1730 * create a snapshot
1731 */
0ce1a794 1732static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1733 const char *snap_name,
1734 gfp_t gfp_flags)
1735{
1736 int name_len = strlen(snap_name);
1737 u64 new_snapid;
1738 int ret;
916d4d67 1739 void *data, *p, *e;
1dbb4399 1740 struct ceph_mon_client *monc;
602adf40
YS
1741
1742 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1743 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1744 return -EINVAL;
1745
0ce1a794
AE
1746 monc = &rbd_dev->rbd_client->client->monc;
1747 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
bd919d45 1748 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
602adf40
YS
1749 if (ret < 0)
1750 return ret;
1751
1752 data = kmalloc(name_len + 16, gfp_flags);
1753 if (!data)
1754 return -ENOMEM;
1755
916d4d67
SW
1756 p = data;
1757 e = data + name_len + 16;
602adf40 1758
916d4d67
SW
1759 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1760 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1761
0bed54dc 1762 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1763 "rbd", "snap_add",
d67d4be5 1764 data, p - data, NULL);
602adf40 1765
916d4d67 1766 kfree(data);
602adf40 1767
505cbb9b 1768 return ret < 0 ? ret : 0;
602adf40
YS
1769bad:
1770 return -ERANGE;
1771}
1772
dfc5606d
YS
1773static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1774{
1775 struct rbd_snap *snap;
a0593290 1776 struct rbd_snap *next;
dfc5606d 1777
a0593290 1778 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
14e7085d 1779 __rbd_remove_snap_dev(snap);
dfc5606d
YS
1780}
1781
602adf40
YS
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the image header and swaps the freshly-read data into
 * rbd_dev->header under header_rwsem.  Ownership of the new snapc,
 * snap_names and snap_sizes buffers transfers to rbd_dev; the old
 * ones are freed here.  Caller must NOT hold ctl_mutex' protected
 * state inconsistently — see rbd_refresh_header() for the locked
 * wrapper.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	if (hver)
		*hver = h.obj_version;
	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	/* re-sync the sysfs snapshot devices with the new context */
	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1828
1fe5e993
AE
/*
 * Locked wrapper around __rbd_refresh_header(): serializes refreshes
 * under ctl_mutex.  SINGLE_DEPTH_NESTING is a lockdep annotation for
 * the nested acquisition class.
 */
static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
{
	int ret;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	ret = __rbd_refresh_header(rbd_dev, hver);
	mutex_unlock(&ctl_mutex);

	return ret;
}
1839
602adf40
YS
/*
 * Read the image header, set up the snapshot list and mapped snap,
 * then create and announce the gendisk + request queue for @rbd_dev.
 * Returns 0 on success or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* select the mapped snapshot and learn its size */
	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1912
dfc5606d
YS
1913/*
1914 sysfs
1915*/
1916
593a9e7b
AE
/* Map a sysfs struct device back to its containing rbd_device */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1921
dfc5606d
YS
/* sysfs: mapped image size in bytes, read under header_rwsem */
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
1934
/* sysfs: block device major number */
static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}
1942
/* sysfs: ceph client id of the connection backing this device */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}
1951
dfc5606d
YS
/* sysfs: name of the rados pool holding the image */
static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}
1959
9bb2f334
AE
/* sysfs: numeric id of the rados pool holding the image */
static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%d\n", rbd_dev->pool_id);
}
1967
dfc5606d
YS
/* sysfs: rbd image name */
static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}
1975
/* sysfs: name of the currently mapped snapshot (or head) */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}
1984
/*
 * sysfs store: any write to the "refresh" attribute forces a re-read
 * of the image header.  Returns @size on success, errno on failure.
 */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_refresh_header(rbd_dev, NULL);

	return ret < 0 ? ret : size;
}
602adf40 1997
dfc5606d
YS
/* Per-device attributes exposed under /sys/bus/rbd/devices/<id>/ */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* empty release: rbd_device lifetime is managed elsewhere */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
2039
2040
2041/*
2042 sysfs - snapshots
2043*/
2044
/* sysfs (snapshot device): snapshot size in bytes */
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}
2053
/* sysfs (snapshot device): numeric snapshot id */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
2062
/* Attributes exposed on each snapshot's sysfs device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* release: free the rbd_snap when its device refcount drops to zero */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2092
/*
 * Unlink @snap from the device's snapshot list and unregister its
 * sysfs device; the rbd_snap itself is freed by the device release.
 */
static void __rbd_remove_snap_dev(struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2098
/*
 * Register the sysfs device for @snap underneath @parent as
 * "snap_<name>".  Returns 0 or a negative errno.
 */
static int rbd_register_snap_dev(struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}
2113
4e891e0a
AE
/*
 * Allocate an rbd_snap for snapshot index @i of the current header
 * (name @name) and, if the parent device is already registered,
 * register its sysfs device as well.  Returns the new snap or a
 * pointer-coded errno; the caller links it into rbd_dev->snaps.
 */
static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
					      int i, const char *name)
{
	struct rbd_snap *snap;
	int ret;

	snap = kzalloc(sizeof (*snap), GFP_KERNEL);
	if (!snap)
		return ERR_PTR(-ENOMEM);

	ret = -ENOMEM;
	snap->name = kstrdup(name, GFP_KERNEL);
	if (!snap->name)
		goto err;

	snap->size = rbd_dev->header.snap_sizes[i];
	snap->id = rbd_dev->header.snapc->snaps[i];
	if (device_is_registered(&rbd_dev->dev)) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			goto err;
	}

	return snap;

err:
	kfree(snap->name);
	kfree(snap);

	return ERR_PTR(ret);
}
2145
/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snaphots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	const u32 snap_count = snapc->num_snaps;
	char *snap_name = rbd_dev->header.snap_names;
	struct list_head *head = &rbd_dev->snaps;
	struct list_head *links = head->next;
	u32 index = 0;

	/* merge-walk: @index over the new context, @links over the list */
	while (index < snap_count || links != head) {
		u64 snap_id;
		struct rbd_snap *snap;

		snap_id = index < snap_count ? snapc->snaps[index]
					     : CEPH_NOSNAP;
		snap = links != head ? list_entry(links, struct rbd_snap, node)
				     : NULL;
		BUG_ON(snap && snap->id == CEPH_NOSNAP);

		if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
			struct list_head *next = links->next;

			/* Existing snapshot not in the new snap context */

			if (rbd_dev->snap_id == snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(snap);

			/* Done with this list entry; advance */

			links = next;
			continue;
		}

		if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
			struct rbd_snap *new_snap;

			/* We haven't seen this snapshot before */

			new_snap = __rbd_add_snap_dev(rbd_dev, index,
							snap_name);
			if (IS_ERR(new_snap))
				return PTR_ERR(new_snap);

			/* New goes before existing, or at end of list */

			if (snap)
				list_add_tail(&new_snap->node, &snap->node);
			else
				list_add_tail(&new_snap->node, head);
		} else {
			/* Already have this one */

			BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
			BUG_ON(strcmp(snap->name, snap_name));

			/* Done with this list entry; advance */

			links = links->next;
		}

		/* Advance to the next entry in the snapshot context */

		index++;
		snap_name += strlen(snap_name) + 1;
	}

	return 0;
}
2227
dfc5606d
YS
/*
 * Register the rbd device (and each of its current snapshots) with
 * the driver core, under ctl_mutex.
 *
 * NOTE(review): if rbd_register_snap_dev() fails partway through the
 * loop, snapshots registered earlier are not unregistered here —
 * presumably the caller's teardown path handles that; verify.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;	/* called on final put */
	dev_set_name(dev, "%d", rbd_dev->dev_id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	/* Expose all currently-known snapshots as child devices */
	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2255
dfc5606d
YS
/*
 * Unregister the rbd device from the driver core.  The final
 * reference drop triggers rbd_dev_release(), which frees everything.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2260
59c2be1e
YS
2261static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2262{
2263 int ret, rc;
2264
2265 do {
0e6f322d 2266 ret = rbd_req_sync_watch(rbd_dev);
59c2be1e 2267 if (ret == -ERANGE) {
1fe5e993 2268 rc = rbd_refresh_header(rbd_dev, NULL);
59c2be1e
YS
2269 if (rc < 0)
2270 return rc;
2271 }
2272 } while (ret == -ERANGE);
2273
2274 return ret;
2275}
2276
/* Highest device id handed out so far; see rbd_id_put() for how the
 * maximum can fall back when the current-max device is removed. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	/* atomic64_inc_return makes id allocation lock-free */
	rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
b7f23c36 2291
1ddbe94e 2292/*
499afd5b
AE
2293 * Remove an rbd_dev from the global list, and record that its
2294 * identifier is no longer in use.
1ddbe94e 2295 */
499afd5b 2296static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2297{
d184f6bf 2298 struct list_head *tmp;
de71a297 2299 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
2300 int max_id;
2301
2302 BUG_ON(rbd_id < 1);
499afd5b
AE
2303
2304 spin_lock(&rbd_dev_list_lock);
2305 list_del_init(&rbd_dev->node);
d184f6bf
AE
2306
2307 /*
2308 * If the id being "put" is not the current maximum, there
2309 * is nothing special we need to do.
2310 */
2311 if (rbd_id != atomic64_read(&rbd_id_max)) {
2312 spin_unlock(&rbd_dev_list_lock);
2313 return;
2314 }
2315
2316 /*
2317 * We need to update the current maximum id. Search the
2318 * list to find out what it is. We're more likely to find
2319 * the maximum at the end, so search the list backward.
2320 */
2321 max_id = 0;
2322 list_for_each_prev(tmp, &rbd_dev_list) {
2323 struct rbd_device *rbd_dev;
2324
2325 rbd_dev = list_entry(tmp, struct rbd_device, node);
2326 if (rbd_id > max_id)
2327 max_id = rbd_id;
2328 }
499afd5b 2329 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2330
1ddbe94e 2331 /*
d184f6bf
AE
2332 * The max id could have been updated by rbd_id_get(), in
2333 * which case it now accurately reflects the new maximum.
2334 * Be careful not to overwrite the maximum value in that
2335 * case.
1ddbe94e 2336 */
d184f6bf 2337 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2338}
2339
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";

	/* Step past any leading whitespace... */
	*buf += strspn(*buf, whitespace);

	/* ...and report how long the token that follows is. */
	return strcspn(*buf, whitespace);
}
2358
/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	/* Only copy when the result (plus its '\0') fits. */
	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;	/* advance past the token unconditionally */

	return len;
}
2388
ea3352f4
AE
2389/*
2390 * Finds the next token in *buf, dynamically allocates a buffer big
2391 * enough to hold a copy of it, and copies the token into the new
2392 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2393 * that a duplicate buffer is created even for a zero-length token.
2394 *
2395 * Returns a pointer to the newly-allocated duplicate, or a null
2396 * pointer if memory for the duplicate was not available. If
2397 * the lenp argument is a non-null pointer, the length of the token
2398 * (not including the '\0') is returned in *lenp.
2399 *
2400 * If successful, the *buf pointer will be updated to point beyond
2401 * the end of the found token.
2402 *
2403 * Note: uses GFP_KERNEL for allocation.
2404 */
2405static inline char *dup_token(const char **buf, size_t *lenp)
2406{
2407 char *dup;
2408 size_t len;
2409
2410 len = next_token(buf);
2411 dup = kmalloc(len + 1, GFP_KERNEL);
2412 if (!dup)
2413 return NULL;
2414
2415 memcpy(dup, *buf, len);
2416 *(dup + len) = '\0';
2417 *buf += len;
2418
2419 if (lenp)
2420 *lenp = len;
2421
2422 return dup;
2423}
2424
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * On success, *mon_addrs/*mon_addrs_size describe the first token in
 * @buf (the monitor address list) and @options receives the second;
 * the remaining tokens are duplicated into rbd_dev fields, which the
 * error path frees and re-zeroes.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	/* mon_addrs is not copied — it points into @buf; size includes '\0' */
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;	/* missing, or would not fit */

	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
						+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* Free and re-zero everything we may have filled in */
	kfree(rbd_dev->header_name);
	rbd_dev->header_name = NULL;
	kfree(rbd_dev->image_name);
	rbd_dev->image_name = NULL;
	rbd_dev->image_name_len = 0;
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;

	return ret;
}
2507
/*
 * sysfs /sys/bus/rbd/add handler:  parse the user-supplied mapping
 * description in @buf, connect to the cluster, and set up the block
 * device and its sysfs representation.  Returns @count on success or
 * a negative errno.
 *
 * NOTE(review): @options appears to be leaked on the success path —
 * only the error paths kfree() it.  Confirm whether rbd_get_client()
 * takes ownership; if not, a kfree(options) is missing before the
 * successful "return count".
 */
static ssize_t rbd_add(struct bus_type *bus,
		       const char *buf,
		       size_t count)
{
	char *options;
	struct rbd_device *rbd_dev = NULL;
	const char *mon_addrs = NULL;
	size_t mon_addrs_size = 0;
	struct ceph_osd_client *osdc;
	int rc = -ENOMEM;

	if (!try_module_get(THIS_MODULE))
		return -ENODEV;

	/* sysfs guarantees buf is '\0'-terminated, so count bytes suffice */
	options = kmalloc(count, GFP_KERNEL);
	if (!options)
		goto err_nomem;
	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
	if (!rbd_dev)
		goto err_nomem;

	/* static rbd_device initialization */
	spin_lock_init(&rbd_dev->lock);
	INIT_LIST_HEAD(&rbd_dev->node);
	INIT_LIST_HEAD(&rbd_dev->snaps);
	init_rwsem(&rbd_dev->header_rwsem);

	/* generate unique id: find highest unique id, add one */
	rbd_id_get(rbd_dev);

	/* Fill in the device name, now that we have its id. */
	BUILD_BUG_ON(DEV_NAME_LEN
			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

	/* parse add command */
	rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
				options, count);
	if (rc)
		goto err_put_id;

	/* mon_addrs_size includes the '\0', hence the "- 1" */
	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
	if (rc < 0)
		goto err_put_id;

	/* pick the pool */
	osdc = &rbd_dev->rbd_client->client->osdc;
	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->pool_id = rc;

	/* register our block device; major 0 means "allocate one for us" */
	rc = register_blkdev(0, rbd_dev->name);
	if (rc < 0)
		goto err_out_client;
	rbd_dev->major = rc;

	rc = rbd_bus_add_dev(rbd_dev);
	if (rc)
		goto err_out_blkdev;

	/*
	 * At this point cleanup in the event of an error is the job
	 * of the sysfs code (initiated by rbd_bus_del_dev()).
	 *
	 * Set up and announce blkdev mapping.
	 */
	rc = rbd_init_disk(rbd_dev);
	if (rc)
		goto err_out_bus;

	rc = rbd_init_watch_dev(rbd_dev);
	if (rc)
		goto err_out_bus;

	return count;

err_out_bus:
	/* this will also clean up rest of rbd_dev stuff */

	rbd_bus_del_dev(rbd_dev);
	kfree(options);
	return rc;

err_out_blkdev:
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_client:
	rbd_put_client(rbd_dev);
err_put_id:
	/* pool_name set implies the other parsed strings were attempted */
	if (rbd_dev->pool_name) {
		kfree(rbd_dev->snap_name);
		kfree(rbd_dev->header_name);
		kfree(rbd_dev->image_name);
		kfree(rbd_dev->pool_name);
	}
	rbd_id_put(rbd_dev);
err_nomem:
	kfree(rbd_dev);
	kfree(options);

	dout("Error adding device %s\n", buf);
	module_put(THIS_MODULE);

	return (ssize_t) rc;
}
2614
de71a297 2615static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
2616{
2617 struct list_head *tmp;
2618 struct rbd_device *rbd_dev;
2619
e124a82f 2620 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2621 list_for_each(tmp, &rbd_dev_list) {
2622 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 2623 if (rbd_dev->dev_id == dev_id) {
e124a82f 2624 spin_unlock(&rbd_dev_list_lock);
602adf40 2625 return rbd_dev;
e124a82f 2626 }
602adf40 2627 }
e124a82f 2628 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2629 return NULL;
2630}
2631
/*
 * Driver-core release callback for an rbd device:  runs when the
 * last reference to rbd_dev->dev is dropped (after
 * device_unregister() in rbd_bus_del_dev()).  Tears down the watch,
 * the client connection, and the block device, then frees all the
 * strings and the rbd_dev itself.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop the lingering watch request before unwatching */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
2662
/*
 * sysfs /sys/bus/rbd/remove handler:  @buf holds the decimal id of
 * the device to remove.  Returns @count on success, -ENOENT if no
 * such device, or another negative errno on bad input.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);	/* frees rbd_dev via release cb */

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2696
/*
 * sysfs handler that creates a new snapshot named after @buf, then
 * refreshes the header and notifies watchers.  Returns @count on
 * success or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/* snprintf with size "count" copies at most count-1 bytes, so the
	 * last byte of buf is dropped — presumably this strips the
	 * trailing newline sysfs passes; NOTE(review): a name of exactly
	 * count bytes with no newline would be truncated — confirm. */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev, NULL);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2737
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 *
 * Registers the root device first, then the bus; on bus-registration
 * failure the root device is unregistered again so the module loads
 * all-or-nothing.
 */
static int rbd_sysfs_init(void)
{
	int ret;

	ret = device_register(&rbd_root_dev);
	if (ret < 0)
		return ret;

	ret = bus_register(&rbd_bus_type);
	if (ret < 0)
		device_unregister(&rbd_root_dev);

	return ret;
}
2756
/* Tear down the sysfs bus and root device (reverse of rbd_sysfs_init) */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2762
/* Module init: set up the sysfs interface; devices are added later
 * via /sys/bus/rbd/add. */
int __init rbd_init(void)
{
	int rc;

	rc = rbd_sysfs_init();
	if (rc)
		return rc;
	pr_info("loaded " RBD_DRV_NAME_LONG "\n");
	return 0;
}
2773
/* Module exit: remove the sysfs interface */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2778
2779module_init(rbd_init);
2780module_exit(rbd_exit);
2781
2782MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2783MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2784MODULE_DESCRIPTION("rados block device");
2785
2786/* following authorship retained from original osdblk.c */
2787MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2788
2789MODULE_LICENSE("GPL");