]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blame - drivers/block/rbd.c
rbd: use reference counting for the snap context
[mirror_ubuntu-artful-kernel.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Driver name as it appears in logs and in sysfs. */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Pseudo snapshot name used to map the image head (writable base). */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Default timeout (seconds) for watch/notify operations. */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
602adf40
YS
74/*
75 * block device image metadata (in-memory version)
76 */
77struct rbd_image_header {
78 u64 image_size;
849b4260 79 char *object_prefix;
602adf40
YS
80 __u8 obj_order;
81 __u8 crypt_type;
82 __u8 comp_type;
602adf40
YS
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
85 u64 snap_seq;
86 u32 total_snaps;
87
88 char *snap_names;
89 u64 *snap_sizes;
59c2be1e
YS
90
91 u64 obj_version;
92};
93
/* Per-mapping, rbd-specific mount options (parsed from the add string). */
struct rbd_options {
	int notify_timeout;
};
97
98/*
f0f8cef5 99 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
100 */
101struct rbd_client {
102 struct ceph_client *client;
59c2be1e 103 struct rbd_options *rbd_opts;
602adf40
YS
104 struct kref kref;
105 struct list_head node;
106};
107
108/*
f0f8cef5 109 * a request completion status
602adf40 110 */
1fec7093
YS
111struct rbd_req_status {
112 int done;
113 int rc;
114 u64 bytes;
115};
116
117/*
118 * a collection of requests
119 */
120struct rbd_req_coll {
121 int total;
122 int num_done;
123 struct kref kref;
124 struct rbd_req_status status[0];
602adf40
YS
125};
126
f0f8cef5
AE
127/*
128 * a single io request
129 */
130struct rbd_request {
131 struct request *rq; /* blk layer request */
132 struct bio *bio; /* cloned bio */
133 struct page **pages; /* list of used pages */
134 u64 len;
135 int coll_index;
136 struct rbd_req_coll *coll;
137};
138
dfc5606d
YS
139struct rbd_snap {
140 struct device dev;
141 const char *name;
3591538f 142 u64 size;
dfc5606d
YS
143 struct list_head node;
144 u64 id;
145};
146
602adf40
YS
147/*
148 * a single device
149 */
150struct rbd_device {
151 int id; /* blkdev unique id */
152
153 int major; /* blkdev assigned major */
154 struct gendisk *disk; /* blkdev's gendisk and rq */
155 struct request_queue *q;
156
602adf40
YS
157 struct rbd_client *rbd_client;
158
159 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161 spinlock_t lock; /* queue lock */
162
163 struct rbd_image_header header;
0bed54dc
AE
164 char *image_name;
165 size_t image_name_len;
166 char *header_name;
d22f76e7 167 char *pool_name;
9bb2f334 168 int pool_id;
602adf40 169
59c2be1e
YS
170 struct ceph_osd_event *watch_event;
171 struct ceph_osd_request *watch_request;
172
c666601a
JD
173 /* protects updating the header */
174 struct rw_semaphore header_rwsem;
e88a36ec 175 /* name of the snapshot this device reads from */
820a5f3e 176 char *snap_name;
e88a36ec 177 /* id of the snapshot this device reads from */
77dfe99f 178 u64 snap_id; /* current snapshot id */
e88a36ec
JD
179 /* whether the snap_id this device reads from still exists */
180 bool snap_exists;
181 int read_only;
602adf40
YS
182
183 struct list_head node;
dfc5606d
YS
184
185 /* list of snapshots */
186 struct list_head snaps;
187
188 /* sysfs related */
189 struct device dev;
190};
191
static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 199
dfc5606d
YS
200static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
201static void rbd_dev_release(struct device *dev);
dfc5606d
YS
202static ssize_t rbd_snap_add(struct device *dev,
203 struct device_attribute *attr,
204 const char *buf,
205 size_t count);
206static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 207 struct rbd_snap *snap);
dfc5606d 208
f0f8cef5
AE
209static ssize_t rbd_add(struct bus_type *bus, const char *buf,
210 size_t count);
211static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212 size_t count);
213
214static struct bus_attribute rbd_bus_attrs[] = {
215 __ATTR(add, S_IWUSR, NULL, rbd_add),
216 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
217 __ATTR_NULL
218};
219
220static struct bus_type rbd_bus_type = {
221 .name = "rbd",
222 .bus_attrs = rbd_bus_attrs,
223};
224
/* No-op release; rbd_root_dev is static, nothing to free. */
static void rbd_root_dev_release(struct device *dev)
{
}
228
229static struct device rbd_root_dev = {
230 .init_name = "rbd",
231 .release = rbd_root_dev_release,
232};
233
dfc5606d 234
dfc5606d
YS
235static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
236{
237 return get_device(&rbd_dev->dev);
238}
239
240static void rbd_put_dev(struct rbd_device *rbd_dev)
241{
242 put_device(&rbd_dev->dev);
243}
602adf40 244
263c6ca0 245static int __rbd_refresh_header(struct rbd_device *rbd_dev);
59c2be1e 246
602adf40
YS
247static int rbd_open(struct block_device *bdev, fmode_t mode)
248{
f0f8cef5 249 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 250
dfc5606d
YS
251 rbd_get_dev(rbd_dev);
252
602adf40
YS
253 set_device_ro(bdev, rbd_dev->read_only);
254
255 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
256 return -EROFS;
257
258 return 0;
259}
260
dfc5606d
YS
261static int rbd_release(struct gendisk *disk, fmode_t mode)
262{
263 struct rbd_device *rbd_dev = disk->private_data;
264
265 rbd_put_dev(rbd_dev);
266
267 return 0;
268}
269
602adf40
YS
270static const struct block_device_operations rbd_bd_ops = {
271 .owner = THIS_MODULE,
272 .open = rbd_open,
dfc5606d 273 .release = rbd_release,
602adf40
YS
274};
275
276/*
277 * Initialize an rbd client instance.
43ae4701 278 * We own *ceph_opts.
602adf40 279 */
43ae4701 280static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
59c2be1e 281 struct rbd_options *rbd_opts)
602adf40
YS
282{
283 struct rbd_client *rbdc;
284 int ret = -ENOMEM;
285
286 dout("rbd_client_create\n");
287 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288 if (!rbdc)
289 goto out_opt;
290
291 kref_init(&rbdc->kref);
292 INIT_LIST_HEAD(&rbdc->node);
293
bc534d86
AE
294 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
295
43ae4701 296 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 297 if (IS_ERR(rbdc->client))
bc534d86 298 goto out_mutex;
43ae4701 299 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
300
301 ret = ceph_open_session(rbdc->client);
302 if (ret < 0)
303 goto out_err;
304
59c2be1e
YS
305 rbdc->rbd_opts = rbd_opts;
306
432b8587 307 spin_lock(&rbd_client_list_lock);
602adf40 308 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 309 spin_unlock(&rbd_client_list_lock);
602adf40 310
bc534d86
AE
311 mutex_unlock(&ctl_mutex);
312
602adf40
YS
313 dout("rbd_client_create created %p\n", rbdc);
314 return rbdc;
315
316out_err:
317 ceph_destroy_client(rbdc->client);
bc534d86
AE
318out_mutex:
319 mutex_unlock(&ctl_mutex);
602adf40
YS
320 kfree(rbdc);
321out_opt:
43ae4701
AE
322 if (ceph_opts)
323 ceph_destroy_options(ceph_opts);
28f259b7 324 return ERR_PTR(ret);
602adf40
YS
325}
326
327/*
328 * Find a ceph client with specific addr and configuration.
329 */
43ae4701 330static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
331{
332 struct rbd_client *client_node;
333
43ae4701 334 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
335 return NULL;
336
337 list_for_each_entry(client_node, &rbd_client_list, node)
43ae4701 338 if (!ceph_compare_options(ceph_opts, client_node->client))
602adf40
YS
339 return client_node;
340 return NULL;
341}
342
59c2be1e
YS
343/*
344 * mount options
345 */
/*
 * mount options
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};
353
43ae4701 354static match_table_t rbd_opts_tokens = {
59c2be1e
YS
355 {Opt_notify_timeout, "notify_timeout=%d"},
356 /* int args above */
357 /* string args above */
358 {-1, NULL}
359};
360
361static int parse_rbd_opts_token(char *c, void *private)
362{
43ae4701 363 struct rbd_options *rbd_opts = private;
59c2be1e
YS
364 substring_t argstr[MAX_OPT_ARGS];
365 int token, intval, ret;
366
43ae4701 367 token = match_token(c, rbd_opts_tokens, argstr);
59c2be1e
YS
368 if (token < 0)
369 return -EINVAL;
370
371 if (token < Opt_last_int) {
372 ret = match_int(&argstr[0], &intval);
373 if (ret < 0) {
374 pr_err("bad mount option arg (not int) "
375 "at '%s'\n", c);
376 return ret;
377 }
378 dout("got int token %d val %d\n", token, intval);
379 } else if (token > Opt_last_int && token < Opt_last_string) {
380 dout("got string token %d val %s\n", token,
381 argstr[0].from);
382 } else {
383 dout("got token %d\n", token);
384 }
385
386 switch (token) {
387 case Opt_notify_timeout:
43ae4701 388 rbd_opts->notify_timeout = intval;
59c2be1e
YS
389 break;
390 default:
391 BUG_ON(token);
392 }
393 return 0;
394}
395
602adf40
YS
396/*
397 * Get a ceph client with specific addr and configuration, if one does
398 * not exist create it.
399 */
5214ecc4
AE
400static struct rbd_client *rbd_get_client(const char *mon_addr,
401 size_t mon_addr_len,
402 char *options)
602adf40
YS
403{
404 struct rbd_client *rbdc;
43ae4701 405 struct ceph_options *ceph_opts;
59c2be1e
YS
406 struct rbd_options *rbd_opts;
407
408 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
409 if (!rbd_opts)
d720bcb0 410 return ERR_PTR(-ENOMEM);
59c2be1e
YS
411
412 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
602adf40 413
43ae4701
AE
414 ceph_opts = ceph_parse_options(options, mon_addr,
415 mon_addr + mon_addr_len,
416 parse_rbd_opts_token, rbd_opts);
417 if (IS_ERR(ceph_opts)) {
d720bcb0 418 kfree(rbd_opts);
43ae4701 419 return ERR_CAST(ceph_opts);
ee57741c 420 }
602adf40 421
432b8587 422 spin_lock(&rbd_client_list_lock);
43ae4701 423 rbdc = __rbd_client_find(ceph_opts);
602adf40 424 if (rbdc) {
602adf40
YS
425 /* using an existing client */
426 kref_get(&rbdc->kref);
432b8587 427 spin_unlock(&rbd_client_list_lock);
e6994d3d 428
43ae4701 429 ceph_destroy_options(ceph_opts);
e6994d3d
AE
430 kfree(rbd_opts);
431
d720bcb0 432 return rbdc;
602adf40 433 }
432b8587 434 spin_unlock(&rbd_client_list_lock);
602adf40 435
43ae4701 436 rbdc = rbd_client_create(ceph_opts, rbd_opts);
d97081b0 437
d720bcb0
AE
438 if (IS_ERR(rbdc))
439 kfree(rbd_opts);
602adf40 440
d720bcb0 441 return rbdc;
602adf40
YS
442}
443
444/*
445 * Destroy ceph client
d23a4b3f 446 *
432b8587 447 * Caller must hold rbd_client_list_lock.
602adf40
YS
448 */
449static void rbd_client_release(struct kref *kref)
450{
451 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
452
453 dout("rbd_release_client %p\n", rbdc);
cd9d9f5d 454 spin_lock(&rbd_client_list_lock);
602adf40 455 list_del(&rbdc->node);
cd9d9f5d 456 spin_unlock(&rbd_client_list_lock);
602adf40
YS
457
458 ceph_destroy_client(rbdc->client);
59c2be1e 459 kfree(rbdc->rbd_opts);
602adf40
YS
460 kfree(rbdc);
461}
462
463/*
464 * Drop reference to ceph client node. If it's not referenced anymore, release
465 * it.
466 */
467static void rbd_put_client(struct rbd_device *rbd_dev)
468{
469 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
470 rbd_dev->rbd_client = NULL;
602adf40
YS
471}
472
1fec7093
YS
473/*
474 * Destroy requests collection
475 */
476static void rbd_coll_release(struct kref *kref)
477{
478 struct rbd_req_coll *coll =
479 container_of(kref, struct rbd_req_coll, kref);
480
481 dout("rbd_coll_release %p\n", coll);
482 kfree(coll);
483}
602adf40
YS
484
485/*
486 * Create a new header structure, translate header format from the on-disk
487 * header.
488 */
489static int rbd_header_from_disk(struct rbd_image_header *header,
490 struct rbd_image_header_ondisk *ondisk,
50f7c4c9 491 u32 allocated_snaps,
602adf40
YS
492 gfp_t gfp_flags)
493{
50f7c4c9 494 u32 i, snap_count;
602adf40 495
21079786 496 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
81e759fb 497 return -ENXIO;
81e759fb 498
00f1f36f 499 snap_count = le32_to_cpu(ondisk->snap_count);
50f7c4c9
XW
500 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
501 / sizeof (*ondisk))
502 return -EINVAL;
602adf40 503 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
f9f9a190 504 snap_count * sizeof(u64),
602adf40
YS
505 gfp_flags);
506 if (!header->snapc)
507 return -ENOMEM;
00f1f36f 508
00f1f36f 509 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
602adf40
YS
510 if (snap_count) {
511 header->snap_names = kmalloc(header->snap_names_len,
f8ad495a 512 gfp_flags);
602adf40
YS
513 if (!header->snap_names)
514 goto err_snapc;
515 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
f8ad495a 516 gfp_flags);
602adf40
YS
517 if (!header->snap_sizes)
518 goto err_names;
519 } else {
520 header->snap_names = NULL;
521 header->snap_sizes = NULL;
522 }
849b4260
AE
523
524 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
525 gfp_flags);
526 if (!header->object_prefix)
527 goto err_sizes;
528
ca1e49a6 529 memcpy(header->object_prefix, ondisk->block_name,
602adf40 530 sizeof(ondisk->block_name));
849b4260 531 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
602adf40
YS
532
533 header->image_size = le64_to_cpu(ondisk->image_size);
534 header->obj_order = ondisk->options.order;
535 header->crypt_type = ondisk->options.crypt_type;
536 header->comp_type = ondisk->options.comp_type;
537
538 atomic_set(&header->snapc->nref, 1);
539 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
540 header->snapc->num_snaps = snap_count;
541 header->total_snaps = snap_count;
542
21079786 543 if (snap_count && allocated_snaps == snap_count) {
602adf40
YS
544 for (i = 0; i < snap_count; i++) {
545 header->snapc->snaps[i] =
546 le64_to_cpu(ondisk->snaps[i].id);
547 header->snap_sizes[i] =
548 le64_to_cpu(ondisk->snaps[i].image_size);
549 }
550
551 /* copy snapshot names */
552 memcpy(header->snap_names, &ondisk->snaps[i],
553 header->snap_names_len);
554 }
555
556 return 0;
557
849b4260
AE
558err_sizes:
559 kfree(header->snap_sizes);
602adf40
YS
560err_names:
561 kfree(header->snap_names);
562err_snapc:
563 kfree(header->snapc);
00f1f36f 564 return -ENOMEM;
602adf40
YS
565}
566
602adf40
YS
567static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
568 u64 *seq, u64 *size)
569{
570 int i;
571 char *p = header->snap_names;
572
00f1f36f
AE
573 for (i = 0; i < header->total_snaps; i++) {
574 if (!strcmp(snap_name, p)) {
602adf40 575
00f1f36f 576 /* Found it. Pass back its id and/or size */
602adf40 577
00f1f36f
AE
578 if (seq)
579 *seq = header->snapc->snaps[i];
580 if (size)
581 *size = header->snap_sizes[i];
582 return i;
583 }
584 p += strlen(p) + 1; /* Skip ahead to the next name */
585 }
586 return -ENOENT;
602adf40
YS
587}
588
0ce1a794 589static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
602adf40 590{
0ce1a794 591 struct rbd_image_header *header = &rbd_dev->header;
602adf40
YS
592 struct ceph_snap_context *snapc = header->snapc;
593 int ret = -ENOENT;
594
0ce1a794 595 down_write(&rbd_dev->header_rwsem);
602adf40 596
0ce1a794 597 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 598 sizeof (RBD_SNAP_HEAD_NAME))) {
602adf40
YS
599 if (header->total_snaps)
600 snapc->seq = header->snap_seq;
601 else
602 snapc->seq = 0;
0ce1a794 603 rbd_dev->snap_id = CEPH_NOSNAP;
e88a36ec 604 rbd_dev->snap_exists = false;
0ce1a794 605 rbd_dev->read_only = 0;
602adf40
YS
606 if (size)
607 *size = header->image_size;
608 } else {
0ce1a794
AE
609 ret = snap_by_name(header, rbd_dev->snap_name,
610 &snapc->seq, size);
602adf40
YS
611 if (ret < 0)
612 goto done;
0ce1a794 613 rbd_dev->snap_id = snapc->seq;
e88a36ec 614 rbd_dev->snap_exists = true;
0ce1a794 615 rbd_dev->read_only = 1;
602adf40
YS
616 }
617
618 ret = 0;
619done:
0ce1a794 620 up_write(&rbd_dev->header_rwsem);
602adf40
YS
621 return ret;
622}
623
624static void rbd_header_free(struct rbd_image_header *header)
625{
849b4260 626 kfree(header->object_prefix);
602adf40 627 kfree(header->snap_sizes);
849b4260 628 kfree(header->snap_names);
d1d25646 629 ceph_put_snap_context(header->snapc);
602adf40
YS
630}
631
632/*
633 * get the actual striped segment name, offset and length
634 */
635static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 636 const char *object_prefix,
602adf40
YS
637 u64 ofs, u64 len,
638 char *seg_name, u64 *segofs)
639{
640 u64 seg = ofs >> header->obj_order;
641
642 if (seg_name)
643 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 644 "%s.%012llx", object_prefix, seg);
602adf40
YS
645
646 ofs = ofs & ((1 << header->obj_order) - 1);
647 len = min_t(u64, len, (1 << header->obj_order) - ofs);
648
649 if (segofs)
650 *segofs = ofs;
651
652 return len;
653}
654
1fec7093
YS
655static int rbd_get_num_segments(struct rbd_image_header *header,
656 u64 ofs, u64 len)
657{
658 u64 start_seg = ofs >> header->obj_order;
659 u64 end_seg = (ofs + len - 1) >> header->obj_order;
660 return end_seg - start_seg + 1;
661}
662
029bcbd8
JD
663/*
664 * returns the size of an object in the image
665 */
666static u64 rbd_obj_bytes(struct rbd_image_header *header)
667{
668 return 1 << header->obj_order;
669}
670
602adf40
YS
671/*
672 * bio helpers
673 */
674
675static void bio_chain_put(struct bio *chain)
676{
677 struct bio *tmp;
678
679 while (chain) {
680 tmp = chain;
681 chain = chain->bi_next;
682 bio_put(tmp);
683 }
684}
685
686/*
687 * zeros a bio chain, starting at specific offset
688 */
689static void zero_bio_chain(struct bio *chain, int start_ofs)
690{
691 struct bio_vec *bv;
692 unsigned long flags;
693 void *buf;
694 int i;
695 int pos = 0;
696
697 while (chain) {
698 bio_for_each_segment(bv, chain, i) {
699 if (pos + bv->bv_len > start_ofs) {
700 int remainder = max(start_ofs - pos, 0);
701 buf = bvec_kmap_irq(bv, &flags);
702 memset(buf + remainder, 0,
703 bv->bv_len - remainder);
85b5aaa6 704 bvec_kunmap_irq(buf, &flags);
602adf40
YS
705 }
706 pos += bv->bv_len;
707 }
708
709 chain = chain->bi_next;
710 }
711}
712
713/*
714 * bio_chain_clone - clone a chain of bios up to a certain length.
715 * might return a bio_pair that will need to be released.
716 */
717static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
718 struct bio_pair **bp,
719 int len, gfp_t gfpmask)
720{
721 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
722 int total = 0;
723
724 if (*bp) {
725 bio_pair_release(*bp);
726 *bp = NULL;
727 }
728
729 while (old_chain && (total < len)) {
730 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
731 if (!tmp)
732 goto err_out;
733
734 if (total + old_chain->bi_size > len) {
735 struct bio_pair *bp;
736
737 /*
738 * this split can only happen with a single paged bio,
739 * split_bio will BUG_ON if this is not the case
740 */
741 dout("bio_chain_clone split! total=%d remaining=%d"
742 "bi_size=%d\n",
743 (int)total, (int)len-total,
744 (int)old_chain->bi_size);
745
746 /* split the bio. We'll release it either in the next
747 call, or it will have to be released outside */
593a9e7b 748 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
602adf40
YS
749 if (!bp)
750 goto err_out;
751
752 __bio_clone(tmp, &bp->bio1);
753
754 *next = &bp->bio2;
755 } else {
756 __bio_clone(tmp, old_chain);
757 *next = old_chain->bi_next;
758 }
759
760 tmp->bi_bdev = NULL;
761 gfpmask &= ~__GFP_WAIT;
762 tmp->bi_next = NULL;
763
764 if (!new_chain) {
765 new_chain = tail = tmp;
766 } else {
767 tail->bi_next = tmp;
768 tail = tmp;
769 }
770 old_chain = old_chain->bi_next;
771
772 total += tmp->bi_size;
773 }
774
775 BUG_ON(total < len);
776
777 if (tail)
778 tail->bi_next = NULL;
779
780 *old = old_chain;
781
782 return new_chain;
783
784err_out:
785 dout("bio_chain_clone with err\n");
786 bio_chain_put(new_chain);
787 return NULL;
788}
789
790/*
791 * helpers for osd request op vectors.
792 */
793static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
794 int num_ops,
795 int opcode,
796 u32 payload_len)
797{
798 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
799 GFP_NOIO);
800 if (!*ops)
801 return -ENOMEM;
802 (*ops)[0].op = opcode;
803 /*
804 * op extent offset and length will be set later on
805 * in calc_raw_layout()
806 */
807 (*ops)[0].payload_len = payload_len;
808 return 0;
809}
810
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
815
1fec7093
YS
816static void rbd_coll_end_req_index(struct request *rq,
817 struct rbd_req_coll *coll,
818 int index,
819 int ret, u64 len)
820{
821 struct request_queue *q;
822 int min, max, i;
823
824 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
825 coll, index, ret, len);
826
827 if (!rq)
828 return;
829
830 if (!coll) {
831 blk_end_request(rq, ret, len);
832 return;
833 }
834
835 q = rq->q;
836
837 spin_lock_irq(q->queue_lock);
838 coll->status[index].done = 1;
839 coll->status[index].rc = ret;
840 coll->status[index].bytes = len;
841 max = min = coll->num_done;
842 while (max < coll->total && coll->status[max].done)
843 max++;
844
845 for (i = min; i<max; i++) {
846 __blk_end_request(rq, coll->status[i].rc,
847 coll->status[i].bytes);
848 coll->num_done++;
849 kref_put(&coll->kref, rbd_coll_release);
850 }
851 spin_unlock_irq(q->queue_lock);
852}
853
854static void rbd_coll_end_req(struct rbd_request *req,
855 int ret, u64 len)
856{
857 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
858}
859
602adf40
YS
860/*
861 * Send ceph osd request
862 */
863static int rbd_do_request(struct request *rq,
0ce1a794 864 struct rbd_device *rbd_dev,
602adf40
YS
865 struct ceph_snap_context *snapc,
866 u64 snapid,
aded07ea 867 const char *object_name, u64 ofs, u64 len,
602adf40
YS
868 struct bio *bio,
869 struct page **pages,
870 int num_pages,
871 int flags,
872 struct ceph_osd_req_op *ops,
1fec7093
YS
873 struct rbd_req_coll *coll,
874 int coll_index,
602adf40 875 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
876 struct ceph_msg *msg),
877 struct ceph_osd_request **linger_req,
878 u64 *ver)
602adf40
YS
879{
880 struct ceph_osd_request *req;
881 struct ceph_file_layout *layout;
882 int ret;
883 u64 bno;
884 struct timespec mtime = CURRENT_TIME;
885 struct rbd_request *req_data;
886 struct ceph_osd_request_head *reqhead;
1dbb4399 887 struct ceph_osd_client *osdc;
602adf40 888
602adf40 889 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
890 if (!req_data) {
891 if (coll)
892 rbd_coll_end_req_index(rq, coll, coll_index,
893 -ENOMEM, len);
894 return -ENOMEM;
895 }
896
897 if (coll) {
898 req_data->coll = coll;
899 req_data->coll_index = coll_index;
900 }
602adf40 901
aded07ea
AE
902 dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
903 object_name, len, ofs);
602adf40 904
0ce1a794 905 osdc = &rbd_dev->rbd_client->client->osdc;
1dbb4399
AE
906 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
907 false, GFP_NOIO, pages, bio);
4ad12621 908 if (!req) {
4ad12621 909 ret = -ENOMEM;
602adf40
YS
910 goto done_pages;
911 }
912
913 req->r_callback = rbd_cb;
914
915 req_data->rq = rq;
916 req_data->bio = bio;
917 req_data->pages = pages;
918 req_data->len = len;
919
920 req->r_priv = req_data;
921
922 reqhead = req->r_request->front.iov_base;
923 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
924
aded07ea 925 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
602adf40
YS
926 req->r_oid_len = strlen(req->r_oid);
927
928 layout = &req->r_file_layout;
929 memset(layout, 0, sizeof(*layout));
930 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931 layout->fl_stripe_count = cpu_to_le32(1);
932 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
0ce1a794 933 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1dbb4399
AE
934 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
935 req, ops);
602adf40
YS
936
937 ceph_osdc_build_request(req, ofs, &len,
938 ops,
939 snapc,
940 &mtime,
941 req->r_oid, req->r_oid_len);
602adf40 942
59c2be1e 943 if (linger_req) {
1dbb4399 944 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
945 *linger_req = req;
946 }
947
1dbb4399 948 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
949 if (ret < 0)
950 goto done_err;
951
952 if (!rbd_cb) {
1dbb4399 953 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
954 if (ver)
955 *ver = le64_to_cpu(req->r_reassert_version.version);
1fec7093
YS
956 dout("reassert_ver=%lld\n",
957 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
958 ceph_osdc_put_request(req);
959 }
960 return ret;
961
962done_err:
963 bio_chain_put(req_data->bio);
964 ceph_osdc_put_request(req);
965done_pages:
1fec7093 966 rbd_coll_end_req(req_data, ret, len);
602adf40 967 kfree(req_data);
602adf40
YS
968 return ret;
969}
970
971/*
972 * Ceph osd op callback
973 */
974static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
975{
976 struct rbd_request *req_data = req->r_priv;
977 struct ceph_osd_reply_head *replyhead;
978 struct ceph_osd_op *op;
979 __s32 rc;
980 u64 bytes;
981 int read_op;
982
983 /* parse reply */
984 replyhead = msg->front.iov_base;
985 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
986 op = (void *)(replyhead + 1);
987 rc = le32_to_cpu(replyhead->result);
988 bytes = le64_to_cpu(op->extent.length);
895cfcc8 989 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
602adf40
YS
990
991 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
992
993 if (rc == -ENOENT && read_op) {
994 zero_bio_chain(req_data->bio, 0);
995 rc = 0;
996 } else if (rc == 0 && read_op && bytes < req_data->len) {
997 zero_bio_chain(req_data->bio, bytes);
998 bytes = req_data->len;
999 }
1000
1fec7093 1001 rbd_coll_end_req(req_data, rc, bytes);
602adf40
YS
1002
1003 if (req_data->bio)
1004 bio_chain_put(req_data->bio);
1005
1006 ceph_osdc_put_request(req);
1007 kfree(req_data);
1008}
1009
59c2be1e
YS
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1014
602adf40
YS
1015/*
1016 * Do a synchronous ceph osd operation
1017 */
0ce1a794 1018static int rbd_req_sync_op(struct rbd_device *rbd_dev,
602adf40
YS
1019 struct ceph_snap_context *snapc,
1020 u64 snapid,
1021 int opcode,
1022 int flags,
1023 struct ceph_osd_req_op *orig_ops,
aded07ea 1024 const char *object_name,
602adf40 1025 u64 ofs, u64 len,
59c2be1e
YS
1026 char *buf,
1027 struct ceph_osd_request **linger_req,
1028 u64 *ver)
602adf40
YS
1029{
1030 int ret;
1031 struct page **pages;
1032 int num_pages;
1033 struct ceph_osd_req_op *ops = orig_ops;
1034 u32 payload_len;
1035
1036 num_pages = calc_pages_for(ofs , len);
1037 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
b8d0638a
DC
1038 if (IS_ERR(pages))
1039 return PTR_ERR(pages);
602adf40
YS
1040
1041 if (!orig_ops) {
1042 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1043 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1044 if (ret < 0)
1045 goto done;
1046
1047 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1048 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1049 if (ret < 0)
1050 goto done_ops;
1051 }
1052 }
1053
0ce1a794 1054 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
aded07ea 1055 object_name, ofs, len, NULL,
602adf40
YS
1056 pages, num_pages,
1057 flags,
1058 ops,
1fec7093 1059 NULL, 0,
59c2be1e
YS
1060 NULL,
1061 linger_req, ver);
602adf40
YS
1062 if (ret < 0)
1063 goto done_ops;
1064
1065 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1066 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1067
1068done_ops:
1069 if (!orig_ops)
1070 rbd_destroy_ops(ops);
1071done:
1072 ceph_release_page_vector(pages, num_pages);
1073 return ret;
1074}
1075
1076/*
1077 * Do an asynchronous ceph osd operation
1078 */
1079static int rbd_do_op(struct request *rq,
0ce1a794 1080 struct rbd_device *rbd_dev,
602adf40
YS
1081 struct ceph_snap_context *snapc,
1082 u64 snapid,
d1f57ea6 1083 int opcode, int flags,
602adf40 1084 u64 ofs, u64 len,
1fec7093
YS
1085 struct bio *bio,
1086 struct rbd_req_coll *coll,
1087 int coll_index)
602adf40
YS
1088{
1089 char *seg_name;
1090 u64 seg_ofs;
1091 u64 seg_len;
1092 int ret;
1093 struct ceph_osd_req_op *ops;
1094 u32 payload_len;
1095
1096 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1097 if (!seg_name)
1098 return -ENOMEM;
1099
1100 seg_len = rbd_get_segment(&rbd_dev->header,
ca1e49a6 1101 rbd_dev->header.object_prefix,
602adf40
YS
1102 ofs, len,
1103 seg_name, &seg_ofs);
602adf40
YS
1104
1105 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1106
1107 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1108 if (ret < 0)
1109 goto done;
1110
1111 /* we've taken care of segment sizes earlier when we
1112 cloned the bios. We should never have a segment
1113 truncated at this point */
1114 BUG_ON(seg_len < len);
1115
1116 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1117 seg_name, seg_ofs, seg_len,
1118 bio,
1119 NULL, 0,
1120 flags,
1121 ops,
1fec7093 1122 coll, coll_index,
59c2be1e 1123 rbd_req_cb, 0, NULL);
11f77002
SW
1124
1125 rbd_destroy_ops(ops);
602adf40
YS
1126done:
1127 kfree(seg_name);
1128 return ret;
1129}
1130
1131/*
1132 * Request async osd write
1133 */
1134static int rbd_req_write(struct request *rq,
1135 struct rbd_device *rbd_dev,
1136 struct ceph_snap_context *snapc,
1137 u64 ofs, u64 len,
1fec7093
YS
1138 struct bio *bio,
1139 struct rbd_req_coll *coll,
1140 int coll_index)
602adf40
YS
1141{
1142 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1143 CEPH_OSD_OP_WRITE,
1144 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1145 ofs, len, bio, coll, coll_index);
602adf40
YS
1146}
1147
/*
 * Request an async osd read.
 *
 * Thin wrapper around rbd_do_op(): reads pass no snapshot context
 * (NULL) and read from the given snapshot id, which may be
 * CEPH_NOSNAP for the head.
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 snapid,
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 ofs, len, bio, coll, coll_index);
}
1165
/*
 * Request a synchronous osd read into a plain buffer.
 *
 * NOTE(review): the snapc parameter is accepted but not used — NULL
 * is passed down to rbd_req_sync_op() regardless; reads don't need a
 * snapshot context.  The object's version is returned through *ver
 * when non-NULL.
 */
static int rbd_req_sync_read(struct rbd_device *rbd_dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *object_name,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	return rbd_req_sync_op(rbd_dev, NULL,
			       snapid,
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       object_name, ofs, len, buf, NULL, ver);
}
1184
/*
 * Acknowledge a notification received on a watched object.
 *
 * Issued from the watch callback after the header has been
 * refreshed, so the OSD knows this client has seen the event.
 *
 * NOTE(review): the ver parameter is unused; the op's version field
 * is filled from rbd_dev->header.obj_version instead.  Also the
 * cookie is assigned notify_id without cpu_to_le64() while ver is
 * byte-swapped — verify whether notify_id is already little-endian
 * at this point.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id,
				   const char *object_name)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	/* Fire-and-forget async request; completion via rbd_simple_req_cb. */
	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  object_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1215
/*
 * Watch-event callback: invoked when the header object we watch is
 * updated (e.g. a snapshot was created elsewhere).
 *
 * Refreshes the in-core header under ctl_mutex, then acks the
 * notification so the OSD can stop waiting for this client.  data is
 * the rbd_device pointer registered in rbd_req_sync_watch().
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
		rbd_dev->header_name, notify_id, (int) opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_refresh_header(rbd_dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   " update snaps: %d\n", rbd_dev->major, rc);

	/* Always ack, even if the refresh failed, so the OSD doesn't stall. */
	rbd_req_sync_notify_ack(rbd_dev, ver, notify_id, rbd_dev->header_name);
}
1235
/*
 * Request sync osd watch.
 *
 * Registers a watch on object_name: creates an osd event whose
 * callback is rbd_watch_cb (with the rbd_device as its data), then
 * sends a CEPH_OSD_OP_WATCH with flag=1 (start watching) carrying
 * the event cookie.  On success the event and the outstanding watch
 * request are stored in rbd_dev->watch_event / watch_request.
 *
 * Error handling unwinds in reverse order: the event is canceled
 * only if the watch op itself failed.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
			      const char *object_name,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 = establish the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1280
/*
 * Request sync osd unwatch.
 *
 * Tears down the watch established by rbd_req_sync_watch(): sends a
 * CEPH_OSD_OP_WATCH with flag=0 (stop watching) using the existing
 * event's cookie, then cancels and clears rbd_dev->watch_event.
 *
 * NOTE(review): rbd_dev->watch_event is dereferenced without a NULL
 * check — callers must only invoke this after a successful watch.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
				const char *object_name)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 = tear the watch down */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, NULL);

	/* The event is canceled even if the unwatch op failed. */
	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1309
/* Context handed to the notify event callback (see rbd_req_sync_notify()). */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1313
1314static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1315{
0ce1a794
AE
1316 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1317 if (!rbd_dev)
59c2be1e
YS
1318 return;
1319
0ce1a794 1320 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
0bed54dc 1321 rbd_dev->header_name,
59c2be1e
YS
1322 notify_id, (int)opcode);
1323}
1324
/*
 * Request sync osd notify.
 *
 * Sends a CEPH_OSD_OP_NOTIFY on object_name and blocks (up to
 * CEPH_OSD_TIMEOUT_DEFAULT) until watchers have acknowledged, via
 * ceph_osdc_wait_event().
 *
 * NOTE(review): info lives on this stack frame and is passed as the
 * event callback's data — valid only because we wait for the event
 * before returning.  The payload declares two u32 slots but is never
 * filled in here.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
		          const char *object_name)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.rbd_dev = rbd_dev;

	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1375
/*
 * Request sync osd class-method execution (CEPH_OSD_OP_CALL).
 *
 * Invokes class_name.method_name on object_name with the given input
 * data.  The op payload must hold both names plus the input blob.
 * The object's resulting version is returned through *ver when
 * non-NULL.
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1415
/*
 * Allocate a request collection able to track num_reqs segment
 * statuses.  GFP_ATOMIC because this runs in the request-queue
 * callback with a spinlock context nearby.  Returns NULL on
 * allocation failure.  Released via kref_put(..., rbd_coll_release).
 */
static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
{
	struct rbd_req_coll *coll =
			kzalloc(sizeof(struct rbd_req_coll) +
			        sizeof(struct rbd_req_status) * num_reqs,
			        GFP_ATOMIC);

	if (!coll)
		return NULL;
	coll->total = num_reqs;
	kref_init(&coll->kref);
	return coll;
}
1429
/*
 * block device queue callback
 *
 * Drains the request queue, splitting each block request into
 * per-object-segment OSD requests.  Entered with q->queue_lock held
 * (standard request_fn contract); the lock is dropped while the OSD
 * requests are issued and retaken before fetching the next request.
 * A snapshot context reference is taken under header_rwsem so
 * in-flight writes keep a consistent snapc even if the header is
 * refreshed concurrently.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* Drop the queue lock while we talk to the OSDs. */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* Mapped snapshot may have been deleted under us. */
		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* Pin the snap context for the lifetime of this request. */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			/* One coll reference per segment; dropped by the
			 * per-segment completion path. */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* Drop the allocation reference; segments hold their own. */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		/* Retake the queue lock before the next blk_fetch_request(). */
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1548
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes of bvec that may be added to the bio at
 * this position without crossing an object (chunk) boundary; a bio
 * containing no sectors yet is always allowed at least bv_len.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* Object size in sectors; obj_order is log2 of the object size. */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* Bytes remaining in the current chunk after the bio's sectors. */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1575
/*
 * Tear down the gendisk and its queue, and free the in-core image
 * header.  Safe to call when no disk was ever allocated (disk NULL).
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	/* Only unregister if add_disk() actually ran. */
	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1591
/*
 * Read the on-disk image header into *header.
 *
 * The snapshot count can change between reads, so this loops:
 * read with the previously-seen count's buffer size, and retry with
 * a larger buffer whenever the count reported by the OSD differs
 * from what was assumed.  On success header->obj_version records
 * the header object's version.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		/* Snap count stable across the read: header is complete. */
		if (snap_count == header->total_snaps)
			break;

		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1649
1650/*
1651 * create a snapshot
1652 */
0ce1a794 1653static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1654 const char *snap_name,
1655 gfp_t gfp_flags)
1656{
1657 int name_len = strlen(snap_name);
1658 u64 new_snapid;
1659 int ret;
916d4d67 1660 void *data, *p, *e;
59c2be1e 1661 u64 ver;
1dbb4399 1662 struct ceph_mon_client *monc;
602adf40
YS
1663
1664 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1665 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1666 return -EINVAL;
1667
0ce1a794
AE
1668 monc = &rbd_dev->rbd_client->client->monc;
1669 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
602adf40
YS
1670 dout("created snapid=%lld\n", new_snapid);
1671 if (ret < 0)
1672 return ret;
1673
1674 data = kmalloc(name_len + 16, gfp_flags);
1675 if (!data)
1676 return -ENOMEM;
1677
916d4d67
SW
1678 p = data;
1679 e = data + name_len + 16;
602adf40 1680
916d4d67
SW
1681 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1682 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1683
0bed54dc 1684 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1685 "rbd", "snap_add",
916d4d67 1686 data, p - data, &ver);
602adf40 1687
916d4d67 1688 kfree(data);
602adf40
YS
1689
1690 if (ret < 0)
1691 return ret;
1692
0ce1a794
AE
1693 down_write(&rbd_dev->header_rwsem);
1694 rbd_dev->header.snapc->seq = new_snapid;
1695 up_write(&rbd_dev->header_rwsem);
602adf40
YS
1696
1697 return 0;
1698bad:
1699 return -ERANGE;
1700}
1701
/*
 * Unregister and free every in-core snapshot entry on the device.
 * Caller must hold whatever lock protects rbd_dev->snaps.
 */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;

	while (!list_empty(&rbd_dev->snaps)) {
		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
		__rbd_remove_snap_dev(rbd_dev, snap);
	}
}
1711
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the on-disk header and swaps it into rbd_dev->header
 * under header_rwsem: frees the old snap arrays and snap context
 * (refcounted — in-flight OSD requests may still hold it), installs
 * the new ones, and rebuilds the sysfs snapshot list.  If the device
 * is mapped at the head, the capacity is updated for a resize, and
 * snapc->seq continues to track the newest snapshot.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1770
/*
 * Create and announce the block device for a newly-mapped image:
 * reads the header from the OSDs, builds the snapshot list, selects
 * the mapped snapshot, then allocates the gendisk and request queue
 * (I/O sizes tuned to the RADOS object size) and calls add_disk().
 *
 * Returns 0 on success; on failure everything allocated here is
 * released before returning a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1843
dfc5606d
YS
1844/*
1845 sysfs
1846*/
1847
/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1852
/* sysfs: show the mapped image size in bytes (capacity * sector size). */
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	sector_t size;

	/* Capacity can change on refresh; read it under the header lock. */
	down_read(&rbd_dev->header_rwsem);
	size = get_capacity(rbd_dev->disk);
	up_read(&rbd_dev->header_rwsem);

	return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
}
1865
1866static ssize_t rbd_major_show(struct device *dev,
1867 struct device_attribute *attr, char *buf)
1868{
593a9e7b 1869 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1870
dfc5606d
YS
1871 return sprintf(buf, "%d\n", rbd_dev->major);
1872}
1873
/* sysfs: show the ceph client id ("client<N>") backing this mapping. */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
		       ceph_client_id(rbd_dev->rbd_client->client));
}
1882
/* sysfs: show the name of the rados pool the image lives in. */
static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}
1890
9bb2f334
AE
1891static ssize_t rbd_pool_id_show(struct device *dev,
1892 struct device_attribute *attr, char *buf)
1893{
1894 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895
1896 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1897}
1898
/* sysfs: show the rbd image name. */
static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->image_name);
}
1906
/* sysfs: show the name of the currently mapped snapshot. */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}
1915
/*
 * sysfs: writing anything to "refresh" re-reads the image header
 * under ctl_mutex.  Returns the write size on success, or the
 * refresh error.
 */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int rc;
	int ret = size;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rc = __rbd_refresh_header(rbd_dev);
	if (rc < 0)
		ret = rc;

	mutex_unlock(&ctl_mutex);
	return ret;
}
602adf40 1934
/* Per-device sysfs attributes (exposed under /sys/bus/rbd/devices/<id>/). */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* Device lifetime is managed elsewhere; nothing to free per-device here. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1976
1977
1978/*
1979 sysfs - snapshots
1980*/
1981
/* sysfs (per-snapshot): show the image size at this snapshot, in bytes. */
static ssize_t rbd_snap_size_show(struct device *dev,
				  struct device_attribute *attr,
				  char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
}
1990
/* sysfs (per-snapshot): show the snapshot's rados snap id. */
static ssize_t rbd_snap_id_show(struct device *dev,
				struct device_attribute *attr,
				char *buf)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);

	return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
}
1999
/* Per-snapshot sysfs attributes (under the snap_<name> child device). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Called by the device core when the last reference is dropped:
 * frees the rbd_snap allocated in __rbd_add_snap_dev(). */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2029
/*
 * Drop a snapshot from the device's list and unregister its sysfs
 * device; the rbd_snap itself is freed by rbd_snap_dev_release()
 * when the device reference count hits zero.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2036
/*
 * Register a snapshot's sysfs device ("snap_<name>") under parent.
 * NOTE(review): the rbd_dev parameter is currently unused here.
 */
static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
				   struct rbd_snap *snap,
				   struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}
2052
2053static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2054 int i, const char *name,
2055 struct rbd_snap **snapp)
2056{
2057 int ret;
2058 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2059 if (!snap)
2060 return -ENOMEM;
2061 snap->name = kstrdup(name, GFP_KERNEL);
2062 snap->size = rbd_dev->header.snap_sizes[i];
2063 snap->id = rbd_dev->header.snapc->snaps[i];
2064 if (device_is_registered(&rbd_dev->dev)) {
2065 ret = rbd_register_snap_dev(rbd_dev, snap,
2066 &rbd_dev->dev);
2067 if (ret < 0)
2068 goto err;
2069 }
2070 *snapp = snap;
2071 return 0;
2072err:
2073 kfree(snap->name);
2074 kfree(snap);
2075 return ret;
2076}
2077
/*
 * Search backward for the previous entry in a NUL-delimited list of
 * snapshot names.  "start" is the beginning of the buffer and "name"
 * points at the current entry.  Returns a pointer to the previous
 * entry, or NULL when "name" is already the first one.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	/* No room for an earlier entry: it needs at least one
	 * character plus its terminating NUL before "name". */
	if (name < start + 2)
		return NULL;

	/* Skip the NUL that terminates the previous entry, then walk
	 * back to just past the NUL before it (or to the buffer's
	 * very first entry). */
	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	return p + 1;
}
2094
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 *
 * Walks the existing list from oldest to newest in lockstep with the
 * header's snaps[] array (indexed i-1 downward) and the packed name
 * buffer (walked backward with rbd_prev_snap_name()):
 *  - list entries absent from the header are removed (and if the
 *    mapped snapshot was one of them, snap_exists is cleared);
 *  - matching ids are kept;
 *  - header entries absent from the list get new devices.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		/* cur_id is only read below when i != 0 (short-circuit). */
		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/*
			 * old_snap->id was skipped, thus was
			 * removed. If this rbd_dev is mapped to
			 * the removed snapshot, record that it no
			 * longer exists, to prevent further I/O.
			 */
			if (rbd_dev->snap_id == old_snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/* NOTE(review): this reads snaps[i] while the
			 * checks above and the add below use index
			 * i - 1; verify the intended index here. */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2175
/*
 * Register the rbd device (named by its numeric id) on the rbd bus,
 * then register a sysfs child device for each known snapshot.  All
 * under ctl_mutex.  Returns the first registration error, if any.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2204
/* Unregister the rbd device from sysfs (children go with it). */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2209
/*
 * Establish the watch on the header object, retrying after a header
 * refresh whenever the OSD rejects our header version with -ERANGE
 * (i.e. our cached obj_version is stale).
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_refresh_header(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2228
/* Highest device id handed out so far; 0 means none yet. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list. The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
b7f23c36 2243
1ddbe94e 2244/*
499afd5b
AE
2245 * Remove an rbd_dev from the global list, and record that its
2246 * identifier is no longer in use.
1ddbe94e 2247 */
499afd5b 2248static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2249{
d184f6bf
AE
2250 struct list_head *tmp;
2251 int rbd_id = rbd_dev->id;
2252 int max_id;
2253
2254 BUG_ON(rbd_id < 1);
499afd5b
AE
2255
2256 spin_lock(&rbd_dev_list_lock);
2257 list_del_init(&rbd_dev->node);
d184f6bf
AE
2258
2259 /*
2260 * If the id being "put" is not the current maximum, there
2261 * is nothing special we need to do.
2262 */
2263 if (rbd_id != atomic64_read(&rbd_id_max)) {
2264 spin_unlock(&rbd_dev_list_lock);
2265 return;
2266 }
2267
2268 /*
2269 * We need to update the current maximum id. Search the
2270 * list to find out what it is. We're more likely to find
2271 * the maximum at the end, so search the list backward.
2272 */
2273 max_id = 0;
2274 list_for_each_prev(tmp, &rbd_dev_list) {
2275 struct rbd_device *rbd_dev;
2276
2277 rbd_dev = list_entry(tmp, struct rbd_device, node);
2278 if (rbd_id > max_id)
2279 max_id = rbd_id;
2280 }
499afd5b 2281 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2282
1ddbe94e 2283 /*
d184f6bf
AE
2284 * The max id could have been updated by rbd_id_get(), in
2285 * which case it now accurately reflects the new maximum.
2286 * Be careful not to overwrite the maximum value in that
2287 * case.
1ddbe94e 2288 */
d184f6bf 2289 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2290}
2291
e28fff26
AE
/*
 * Advance *buf past any leading white space and report how long the
 * token (run of non-white-space characters) starting there is.
 * *buf must be '\0'-terminated.  Returns 0 if only white space (or
 * nothing) remains.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * These are the characters that produce nonzero for
	 * isspace() in the "C" and "POSIX" locales.
	 */
	static const char spaces[] = " \f\n\r\t\v";
	size_t lead = strspn(*buf, spaces);	/* leading white space */

	*buf += lead;				/* start of token (or NUL) */

	return strcspn(*buf, spaces);		/* token length */
}
2310
/*
 * Find the next token in *buf and, provided the caller's buffer can
 * hold it, copy it there with a terminating '\0'.  *buf must be
 * '\0'-terminated on entry.
 *
 * Returns the token length (excluding the '\0'): 0 when no token
 * was found, and >= token_size when the token did not fit (nothing
 * is copied in that case).
 *
 * *buf is advanced past the token in every case, even when the
 * token was too large to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';	/* always NUL-terminate the copy */
	}
	*buf += len;

	return len;
}
2340
ea3352f4
AE
2341/*
2342 * Finds the next token in *buf, dynamically allocates a buffer big
2343 * enough to hold a copy of it, and copies the token into the new
2344 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2345 * that a duplicate buffer is created even for a zero-length token.
2346 *
2347 * Returns a pointer to the newly-allocated duplicate, or a null
2348 * pointer if memory for the duplicate was not available. If
2349 * the lenp argument is a non-null pointer, the length of the token
2350 * (not including the '\0') is returned in *lenp.
2351 *
2352 * If successful, the *buf pointer will be updated to point beyond
2353 * the end of the found token.
2354 *
2355 * Note: uses GFP_KERNEL for allocation.
2356 */
2357static inline char *dup_token(const char **buf, size_t *lenp)
2358{
2359 char *dup;
2360 size_t len;
2361
2362 len = next_token(buf);
2363 dup = kmalloc(len + 1, GFP_KERNEL);
2364 if (!dup)
2365 return NULL;
2366
2367 memcpy(dup, *buf, len);
2368 *(dup + len) = '\0';
2369 *buf += len;
2370
2371 if (lenp)
2372 *lenp = len;
2373
2374 return dup;
2375}
2376
/*
 * This fills in the pool_name, image_name, image_name_len, snap_name,
 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
 * on the list of monitor addresses and other options provided via
 * /sys/bus/rbd/add.
 *
 * On success, *mon_addrs points into buf (not a copy) and
 * *mon_addrs_size is the token length + 1; the dup_token() results
 * stored in rbd_dev are owned by rbd_dev and freed on the error
 * path here or later in rbd_dev_release().
 *
 * Returns 0 on success, -EINVAL for a malformed specification, or
 * -ENOMEM on allocation failure.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;	/* points into caller's buf, not copied */

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;	/* missing or over-long options token */

	ret = -ENOMEM;		/* every failure below is an allocation one */
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
					+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	/* Free whatever was allocated; kfree(NULL) is a no-op */
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->image_name);
	kfree(rbd_dev->pool_name);
	rbd_dev->pool_name = NULL;	/* rbd_add() keys its cleanup on this */

	return ret;
}
2456
59c2be1e
YS
2457static ssize_t rbd_add(struct bus_type *bus,
2458 const char *buf,
2459 size_t count)
602adf40 2460{
cb8627c7
AE
2461 char *options;
2462 struct rbd_device *rbd_dev = NULL;
7ef3214a
AE
2463 const char *mon_addrs = NULL;
2464 size_t mon_addrs_size = 0;
27cc2594
AE
2465 struct ceph_osd_client *osdc;
2466 int rc = -ENOMEM;
602adf40
YS
2467
2468 if (!try_module_get(THIS_MODULE))
2469 return -ENODEV;
2470
60571c7d 2471 options = kmalloc(count, GFP_KERNEL);
602adf40 2472 if (!options)
27cc2594 2473 goto err_nomem;
cb8627c7
AE
2474 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2475 if (!rbd_dev)
2476 goto err_nomem;
602adf40
YS
2477
2478 /* static rbd_device initialization */
2479 spin_lock_init(&rbd_dev->lock);
2480 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2481 INIT_LIST_HEAD(&rbd_dev->snaps);
c666601a 2482 init_rwsem(&rbd_dev->header_rwsem);
602adf40 2483
c666601a 2484 init_rwsem(&rbd_dev->header_rwsem);
0e805a1d 2485
d184f6bf 2486 /* generate unique id: find highest unique id, add one */
499afd5b 2487 rbd_id_get(rbd_dev);
602adf40 2488
a725f65e 2489 /* Fill in the device name, now that we have its id. */
81a89793
AE
2490 BUILD_BUG_ON(DEV_NAME_LEN
2491 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2492 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
a725f65e 2493
602adf40 2494 /* parse add command */
7ef3214a 2495 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
e28fff26 2496 options, count);
a725f65e 2497 if (rc)
f0f8cef5 2498 goto err_put_id;
e124a82f 2499
5214ecc4
AE
2500 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2501 options);
d720bcb0
AE
2502 if (IS_ERR(rbd_dev->rbd_client)) {
2503 rc = PTR_ERR(rbd_dev->rbd_client);
f0f8cef5 2504 goto err_put_id;
d720bcb0 2505 }
602adf40 2506
602adf40 2507 /* pick the pool */
1dbb4399 2508 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2509 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2510 if (rc < 0)
2511 goto err_out_client;
9bb2f334 2512 rbd_dev->pool_id = rc;
602adf40
YS
2513
2514 /* register our block device */
27cc2594
AE
2515 rc = register_blkdev(0, rbd_dev->name);
2516 if (rc < 0)
602adf40 2517 goto err_out_client;
27cc2594 2518 rbd_dev->major = rc;
602adf40 2519
dfc5606d
YS
2520 rc = rbd_bus_add_dev(rbd_dev);
2521 if (rc)
766fc439
YS
2522 goto err_out_blkdev;
2523
32eec68d
AE
2524 /*
2525 * At this point cleanup in the event of an error is the job
2526 * of the sysfs code (initiated by rbd_bus_del_dev()).
2527 *
2528 * Set up and announce blkdev mapping.
2529 */
602adf40
YS
2530 rc = rbd_init_disk(rbd_dev);
2531 if (rc)
766fc439 2532 goto err_out_bus;
602adf40 2533
59c2be1e
YS
2534 rc = rbd_init_watch_dev(rbd_dev);
2535 if (rc)
2536 goto err_out_bus;
2537
602adf40
YS
2538 return count;
2539
766fc439 2540err_out_bus:
766fc439
YS
2541 /* this will also clean up rest of rbd_dev stuff */
2542
2543 rbd_bus_del_dev(rbd_dev);
2544 kfree(options);
766fc439
YS
2545 return rc;
2546
602adf40
YS
2547err_out_blkdev:
2548 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2549err_out_client:
2550 rbd_put_client(rbd_dev);
f0f8cef5 2551err_put_id:
cb8627c7 2552 if (rbd_dev->pool_name) {
820a5f3e 2553 kfree(rbd_dev->snap_name);
0bed54dc
AE
2554 kfree(rbd_dev->header_name);
2555 kfree(rbd_dev->image_name);
cb8627c7
AE
2556 kfree(rbd_dev->pool_name);
2557 }
499afd5b 2558 rbd_id_put(rbd_dev);
27cc2594 2559err_nomem:
27cc2594 2560 kfree(rbd_dev);
cb8627c7 2561 kfree(options);
27cc2594 2562
602adf40
YS
2563 dout("Error adding device %s\n", buf);
2564 module_put(THIS_MODULE);
27cc2594
AE
2565
2566 return (ssize_t) rc;
602adf40
YS
2567}
2568
2569static struct rbd_device *__rbd_get_dev(unsigned long id)
2570{
2571 struct list_head *tmp;
2572 struct rbd_device *rbd_dev;
2573
e124a82f 2574 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2575 list_for_each(tmp, &rbd_dev_list) {
2576 rbd_dev = list_entry(tmp, struct rbd_device, node);
e124a82f
AE
2577 if (rbd_dev->id == id) {
2578 spin_unlock(&rbd_dev_list_lock);
602adf40 2579 return rbd_dev;
e124a82f 2580 }
602adf40 2581 }
e124a82f 2582 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2583 return NULL;
2584}
2585
/*
 * Release callback for the embedded struct device (installed in
 * rbd_bus_add_dev()); runs when the last reference is dropped.
 * Tears everything down in order: the header watch, the ceph
 * client, the block device, and finally the rbd_dev's memory and
 * id.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Stop watching the header object before tearing anything down */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
2616
dfc5606d
YS
/*
 * Handle a write to /sys/bus/rbd/remove: parse the target device
 * id, look the device up, and unregister it (its snapshots first).
 * Returns count on success, -ENOENT if no such device, or the
 * strtoul error / -EINVAL for a malformed id.
 */
static ssize_t rbd_remove(struct bus_type *bus,
			  const char *buf,
			  size_t count)
{
	struct rbd_device *rbd_dev = NULL;
	int target_id, rc;
	unsigned long ul;
	int ret = count;

	rc = strict_strtoul(buf, 10, &ul);
	if (rc)
		return rc;

	/* convert to int; abort if we lost anything in the conversion */
	target_id = (int) ul;
	if (target_id != ul)
		return -EINVAL;

	/* serialize against other control operations */
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbd_dev = __rbd_get_dev(target_id);
	if (!rbd_dev) {
		ret = -ENOENT;
		goto done;
	}

	/* remove snapshots first, then the device itself */
	__rbd_remove_all_snaps(rbd_dev);
	rbd_bus_del_dev(rbd_dev);

done:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2650
dfc5606d
YS
/*
 * sysfs store handler: create a new snapshot with the name written
 * by the user, refresh the cached header, and notify other clients
 * watching the header object.  Returns count on success or a
 * negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * NOTE(review): with size "count", snprintf() copies at most
	 * count - 1 bytes of buf, so the final input byte (typically
	 * the trailing newline from sysfs) is dropped even though
	 * count + 1 bytes were allocated -- confirm this truncation
	 * is intended.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2691
602adf40
YS
2692/*
2693 * create control files in sysfs
dfc5606d 2694 * /sys/bus/rbd/...
602adf40
YS
2695 */
2696static int rbd_sysfs_init(void)
2697{
dfc5606d 2698 int ret;
602adf40 2699
fed4c143 2700 ret = device_register(&rbd_root_dev);
21079786 2701 if (ret < 0)
dfc5606d 2702 return ret;
602adf40 2703
fed4c143
AE
2704 ret = bus_register(&rbd_bus_type);
2705 if (ret < 0)
2706 device_unregister(&rbd_root_dev);
602adf40 2707
602adf40
YS
2708 return ret;
2709}
2710
/* Tear down sysfs state in the reverse order of rbd_sysfs_init() */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2716
2717int __init rbd_init(void)
2718{
2719 int rc;
2720
2721 rc = rbd_sysfs_init();
2722 if (rc)
2723 return rc;
f0f8cef5 2724 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2725 return 0;
2726}
2727
/* Module exit point: undo rbd_sysfs_init(). */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2732
2733module_init(rbd_init);
2734module_exit(rbd_exit);
2735
2736MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2737MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2738MODULE_DESCRIPTION("rados block device");
2739
2740/* following authorship retained from original osdblk.c */
2741MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2742
2743MODULE_LICENSE("GPL");