]> git.proxmox.com Git - mirror_ubuntu-artful-kernel.git/blame - drivers/block/rbd.c
rbd: don't use snapc->seq that way
[mirror_ubuntu-artful-kernel.git] / drivers / block / rbd.c
CommitLineData
602adf40
YS
1/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
dfc5606d 24 For usage instructions, please refer to:
602adf40 25
dfc5606d 26 Documentation/ABI/testing/sysfs-bus-rbd
602adf40
YS
27
28 */
29
30#include <linux/ceph/libceph.h>
31#include <linux/ceph/osd_client.h>
32#include <linux/ceph/mon_client.h>
33#include <linux/ceph/decode.h>
59c2be1e 34#include <linux/parser.h>
602adf40
YS
35
36#include <linux/kernel.h>
37#include <linux/device.h>
38#include <linux/module.h>
39#include <linux/fs.h>
40#include <linux/blkdev.h>
41
42#include "rbd_types.h"
43
593a9e7b
AE
44/*
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
49 */
50#define SECTOR_SHIFT 9
51#define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
52
f0f8cef5
AE
53#define RBD_DRV_NAME "rbd"
54#define RBD_DRV_NAME_LONG "rbd (rados block device)"
602adf40
YS
55
56#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
57
602adf40
YS
58#define RBD_MAX_SNAP_NAME_LEN 32
59#define RBD_MAX_OPT_LEN 1024
60
61#define RBD_SNAP_HEAD_NAME "-"
62
81a89793
AE
63/*
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
68 */
602adf40 69#define DEV_NAME_LEN 32
81a89793 70#define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
602adf40 71
59c2be1e
YS
72#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
602adf40
YS
74/*
75 * block device image metadata (in-memory version)
76 */
77struct rbd_image_header {
78 u64 image_size;
849b4260 79 char *object_prefix;
602adf40
YS
80 __u8 obj_order;
81 __u8 crypt_type;
82 __u8 comp_type;
602adf40
YS
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
85 u64 snap_seq;
86 u32 total_snaps;
87
88 char *snap_names;
89 u64 *snap_sizes;
59c2be1e
YS
90
91 u64 obj_version;
92};
93
94struct rbd_options {
95 int notify_timeout;
602adf40
YS
96};
97
98/*
f0f8cef5 99 * an instance of the client. multiple devices may share an rbd client.
602adf40
YS
100 */
101struct rbd_client {
102 struct ceph_client *client;
59c2be1e 103 struct rbd_options *rbd_opts;
602adf40
YS
104 struct kref kref;
105 struct list_head node;
106};
107
108/*
f0f8cef5 109 * a request completion status
602adf40 110 */
1fec7093
YS
111struct rbd_req_status {
112 int done;
113 int rc;
114 u64 bytes;
115};
116
117/*
118 * a collection of requests
119 */
120struct rbd_req_coll {
121 int total;
122 int num_done;
123 struct kref kref;
124 struct rbd_req_status status[0];
602adf40
YS
125};
126
f0f8cef5
AE
127/*
128 * a single io request
129 */
130struct rbd_request {
131 struct request *rq; /* blk layer request */
132 struct bio *bio; /* cloned bio */
133 struct page **pages; /* list of used pages */
134 u64 len;
135 int coll_index;
136 struct rbd_req_coll *coll;
137};
138
dfc5606d
YS
139struct rbd_snap {
140 struct device dev;
141 const char *name;
3591538f 142 u64 size;
dfc5606d
YS
143 struct list_head node;
144 u64 id;
145};
146
602adf40
YS
147/*
148 * a single device
149 */
150struct rbd_device {
151 int id; /* blkdev unique id */
152
153 int major; /* blkdev assigned major */
154 struct gendisk *disk; /* blkdev's gendisk and rq */
155 struct request_queue *q;
156
602adf40
YS
157 struct rbd_client *rbd_client;
158
159 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161 spinlock_t lock; /* queue lock */
162
163 struct rbd_image_header header;
0bed54dc
AE
164 char *image_name;
165 size_t image_name_len;
166 char *header_name;
d22f76e7 167 char *pool_name;
9bb2f334 168 int pool_id;
602adf40 169
59c2be1e
YS
170 struct ceph_osd_event *watch_event;
171 struct ceph_osd_request *watch_request;
172
c666601a
JD
173 /* protects updating the header */
174 struct rw_semaphore header_rwsem;
e88a36ec 175 /* name of the snapshot this device reads from */
820a5f3e 176 char *snap_name;
e88a36ec 177 /* id of the snapshot this device reads from */
77dfe99f 178 u64 snap_id; /* current snapshot id */
e88a36ec
JD
179 /* whether the snap_id this device reads from still exists */
180 bool snap_exists;
181 int read_only;
602adf40
YS
182
183 struct list_head node;
dfc5606d
YS
184
185 /* list of snapshots */
186 struct list_head snaps;
187
188 /* sysfs related */
189 struct device dev;
190};
191
602adf40 192static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
e124a82f 193
602adf40 194static LIST_HEAD(rbd_dev_list); /* devices */
e124a82f
AE
195static DEFINE_SPINLOCK(rbd_dev_list_lock);
196
432b8587
AE
197static LIST_HEAD(rbd_client_list); /* clients */
198static DEFINE_SPINLOCK(rbd_client_list_lock);
602adf40 199
dfc5606d
YS
200static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
201static void rbd_dev_release(struct device *dev);
dfc5606d
YS
202static ssize_t rbd_snap_add(struct device *dev,
203 struct device_attribute *attr,
204 const char *buf,
205 size_t count);
206static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
69932487 207 struct rbd_snap *snap);
dfc5606d 208
f0f8cef5
AE
209static ssize_t rbd_add(struct bus_type *bus, const char *buf,
210 size_t count);
211static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212 size_t count);
213
214static struct bus_attribute rbd_bus_attrs[] = {
215 __ATTR(add, S_IWUSR, NULL, rbd_add),
216 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
217 __ATTR_NULL
218};
219
220static struct bus_type rbd_bus_type = {
221 .name = "rbd",
222 .bus_attrs = rbd_bus_attrs,
223};
224
225static void rbd_root_dev_release(struct device *dev)
226{
227}
228
229static struct device rbd_root_dev = {
230 .init_name = "rbd",
231 .release = rbd_root_dev_release,
232};
233
dfc5606d 234
dfc5606d
YS
235static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
236{
237 return get_device(&rbd_dev->dev);
238}
239
240static void rbd_put_dev(struct rbd_device *rbd_dev)
241{
242 put_device(&rbd_dev->dev);
243}
602adf40 244
263c6ca0 245static int __rbd_refresh_header(struct rbd_device *rbd_dev);
59c2be1e 246
602adf40
YS
247static int rbd_open(struct block_device *bdev, fmode_t mode)
248{
f0f8cef5 249 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
602adf40 250
dfc5606d
YS
251 rbd_get_dev(rbd_dev);
252
602adf40
YS
253 set_device_ro(bdev, rbd_dev->read_only);
254
255 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
256 return -EROFS;
257
258 return 0;
259}
260
dfc5606d
YS
261static int rbd_release(struct gendisk *disk, fmode_t mode)
262{
263 struct rbd_device *rbd_dev = disk->private_data;
264
265 rbd_put_dev(rbd_dev);
266
267 return 0;
268}
269
602adf40
YS
270static const struct block_device_operations rbd_bd_ops = {
271 .owner = THIS_MODULE,
272 .open = rbd_open,
dfc5606d 273 .release = rbd_release,
602adf40
YS
274};
275
276/*
277 * Initialize an rbd client instance.
43ae4701 278 * We own *ceph_opts.
602adf40 279 */
43ae4701 280static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
59c2be1e 281 struct rbd_options *rbd_opts)
602adf40
YS
282{
283 struct rbd_client *rbdc;
284 int ret = -ENOMEM;
285
286 dout("rbd_client_create\n");
287 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288 if (!rbdc)
289 goto out_opt;
290
291 kref_init(&rbdc->kref);
292 INIT_LIST_HEAD(&rbdc->node);
293
bc534d86
AE
294 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
295
43ae4701 296 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
602adf40 297 if (IS_ERR(rbdc->client))
bc534d86 298 goto out_mutex;
43ae4701 299 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
602adf40
YS
300
301 ret = ceph_open_session(rbdc->client);
302 if (ret < 0)
303 goto out_err;
304
59c2be1e
YS
305 rbdc->rbd_opts = rbd_opts;
306
432b8587 307 spin_lock(&rbd_client_list_lock);
602adf40 308 list_add_tail(&rbdc->node, &rbd_client_list);
432b8587 309 spin_unlock(&rbd_client_list_lock);
602adf40 310
bc534d86
AE
311 mutex_unlock(&ctl_mutex);
312
602adf40
YS
313 dout("rbd_client_create created %p\n", rbdc);
314 return rbdc;
315
316out_err:
317 ceph_destroy_client(rbdc->client);
bc534d86
AE
318out_mutex:
319 mutex_unlock(&ctl_mutex);
602adf40
YS
320 kfree(rbdc);
321out_opt:
43ae4701
AE
322 if (ceph_opts)
323 ceph_destroy_options(ceph_opts);
28f259b7 324 return ERR_PTR(ret);
602adf40
YS
325}
326
327/*
328 * Find a ceph client with specific addr and configuration.
329 */
43ae4701 330static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
602adf40
YS
331{
332 struct rbd_client *client_node;
333
43ae4701 334 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
602adf40
YS
335 return NULL;
336
337 list_for_each_entry(client_node, &rbd_client_list, node)
43ae4701 338 if (!ceph_compare_options(ceph_opts, client_node->client))
602adf40
YS
339 return client_node;
340 return NULL;
341}
342
59c2be1e
YS
343/*
344 * mount options
345 */
346enum {
347 Opt_notify_timeout,
348 Opt_last_int,
349 /* int args above */
350 Opt_last_string,
351 /* string args above */
352};
353
43ae4701 354static match_table_t rbd_opts_tokens = {
59c2be1e
YS
355 {Opt_notify_timeout, "notify_timeout=%d"},
356 /* int args above */
357 /* string args above */
358 {-1, NULL}
359};
360
/*
 * Callback for ceph_parse_options(): handle one rbd-specific mount
 * option token.  @c is the raw option text, @private is the
 * struct rbd_options being filled in.
 *
 * Returns 0 on success, -EINVAL for an unrecognized token, or the
 * match_int() error for a malformed integer argument.
 */
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	/* Tokens below Opt_last_int carry an integer argument. */
	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		/* String-argument tokens (none defined yet). */
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_notify_timeout:
		rbd_opts->notify_timeout = intval;
		break;
	default:
		/* Every token in rbd_opts_tokens must be handled above. */
		BUG_ON(token);
	}
	return 0;
}
395
602adf40
YS
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * Returns a referenced rbd_client on success, an ERR_PTR on failure.
 * rbd_opts ownership: handed to the new client on successful create;
 * freed here on every other path.
 */
static struct rbd_client *rbd_get_client(const char *mon_addr,
					 size_t mon_addr_len,
					 char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *ceph_opts;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return ERR_PTR(-ENOMEM);

	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	/* parse_rbd_opts_token() fills rbd_opts from rbd-specific options */
	ceph_opts = ceph_parse_options(options, mon_addr,
					mon_addr + mon_addr_len,
					parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(ceph_opts)) {
		kfree(rbd_opts);
		return ERR_CAST(ceph_opts);
	}

	spin_lock(&rbd_client_list_lock);
	rbdc = __rbd_client_find(ceph_opts);
	if (rbdc) {
		/* using an existing client */
		kref_get(&rbdc->kref);
		spin_unlock(&rbd_client_list_lock);

		/* the shared client already owns equivalent options */
		ceph_destroy_options(ceph_opts);
		kfree(rbd_opts);

		return rbdc;
	}
	spin_unlock(&rbd_client_list_lock);

	/* rbd_client_create() consumes ceph_opts even on failure */
	rbdc = rbd_client_create(ceph_opts, rbd_opts);

	if (IS_ERR(rbdc))
		kfree(rbd_opts);

	return rbdc;
}
443
/*
 * Destroy ceph client
 *
 * Takes and releases rbd_client_list_lock itself to unlink the client
 * from rbd_client_list; callers must NOT already hold that lock.
 */
/* kref release callback: unlink the client and free all its resources. */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	/* Remove from the global client list under its lock. */
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
462
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	/* rbd_client_release() runs if this was the last reference */
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	rbd_dev->rbd_client = NULL;	/* guard against use after drop */
}
472
1fec7093
YS
473/*
474 * Destroy requests collection
475 */
476static void rbd_coll_release(struct kref *kref)
477{
478 struct rbd_req_coll *coll =
479 container_of(kref, struct rbd_req_coll, kref);
480
481 dout("rbd_coll_release %p\n", coll);
482 kfree(coll);
483}
602adf40
YS
484
/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 *
 * @allocated_snaps is how many snapshot slots the caller sized the
 * on-disk buffer for; snapshot ids/sizes/names are copied only when it
 * matches the header's actual snap_count.
 *
 * Returns 0 on success, -ENXIO on a bad magic text, -EINVAL on an
 * implausible snapshot count, -ENOMEM on allocation failure.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk,
				 u32 allocated_snaps,
				 gfp_t gfp_flags)
{
	u32 i, snap_count;

	/* Verify the on-disk header magic before trusting any field. */
	if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
		return -ENXIO;

	snap_count = le32_to_cpu(ondisk->snap_count);
	/* Bound snap_count so the allocation size below cannot overflow. */
	if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
			 / sizeof (*ondisk))
		return -EINVAL;
	header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
				snap_count * sizeof(u64),
				gfp_flags);
	if (!header->snapc)
		return -ENOMEM;

	/*
	 * NOTE(review): snap_names_len is taken straight from the
	 * on-disk header and used as an allocation/copy size without
	 * further validation -- confirm callers bound it.
	 */
	header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
	if (snap_count) {
		header->snap_names = kmalloc(header->snap_names_len,
					     gfp_flags);
		if (!header->snap_names)
			goto err_snapc;
		header->snap_sizes = kmalloc(snap_count * sizeof(u64),
					     gfp_flags);
		if (!header->snap_sizes)
			goto err_names;
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	/* NUL-terminated copy of the fixed-width on-disk block name. */
	header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
					gfp_flags);
	if (!header->object_prefix)
		goto err_sizes;

	memcpy(header->object_prefix, ondisk->block_name,
	       sizeof(ondisk->block_name));
	header->object_prefix[sizeof (ondisk->block_name)] = '\0';

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	atomic_set(&header->snapc->nref, 1);
	header->snap_seq = le64_to_cpu(ondisk->snap_seq);
	header->snapc->num_snaps = snap_count;
	header->total_snaps = snap_count;

	/* Only copy snapshot data if the caller allocated room for it. */
	if (snap_count && allocated_snaps == snap_count) {
		for (i = 0; i < snap_count; i++) {
			header->snapc->snaps[i] =
				le64_to_cpu(ondisk->snaps[i].id);
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
		}

		/* copy snapshot names (packed after the snaps array) */
		memcpy(header->snap_names, &ondisk->snaps[i],
			header->snap_names_len);
	}

	return 0;

err_sizes:
	kfree(header->snap_sizes);
err_names:
	kfree(header->snap_names);
err_snapc:
	kfree(header->snapc);
	return -ENOMEM;
}
566
602adf40
YS
567static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
568 u64 *seq, u64 *size)
569{
570 int i;
571 char *p = header->snap_names;
572
00f1f36f
AE
573 for (i = 0; i < header->total_snaps; i++) {
574 if (!strcmp(snap_name, p)) {
602adf40 575
00f1f36f 576 /* Found it. Pass back its id and/or size */
602adf40 577
00f1f36f
AE
578 if (seq)
579 *seq = header->snapc->snaps[i];
580 if (size)
581 *size = header->snap_sizes[i];
582 return i;
583 }
584 p += strlen(p) + 1; /* Skip ahead to the next name */
585 }
586 return -ENOENT;
602adf40
YS
587}
588
0ce1a794 589static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
602adf40 590{
0ce1a794 591 struct rbd_image_header *header = &rbd_dev->header;
602adf40
YS
592 struct ceph_snap_context *snapc = header->snapc;
593 int ret = -ENOENT;
594
0ce1a794 595 down_write(&rbd_dev->header_rwsem);
602adf40 596
0ce1a794 597 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
cc9d734c 598 sizeof (RBD_SNAP_HEAD_NAME))) {
602adf40
YS
599 if (header->total_snaps)
600 snapc->seq = header->snap_seq;
601 else
602 snapc->seq = 0;
0ce1a794 603 rbd_dev->snap_id = CEPH_NOSNAP;
e88a36ec 604 rbd_dev->snap_exists = false;
0ce1a794 605 rbd_dev->read_only = 0;
602adf40
YS
606 if (size)
607 *size = header->image_size;
608 } else {
0ce1a794
AE
609 ret = snap_by_name(header, rbd_dev->snap_name,
610 &snapc->seq, size);
602adf40
YS
611 if (ret < 0)
612 goto done;
0ce1a794 613 rbd_dev->snap_id = snapc->seq;
e88a36ec 614 rbd_dev->snap_exists = true;
0ce1a794 615 rbd_dev->read_only = 1;
602adf40
YS
616 }
617
618 ret = 0;
619done:
0ce1a794 620 up_write(&rbd_dev->header_rwsem);
602adf40
YS
621 return ret;
622}
623
/* Free everything rbd_header_from_disk() allocated for a header. */
static void rbd_header_free(struct rbd_image_header *header)
{
	kfree(header->object_prefix);
	kfree(header->snap_sizes);
	kfree(header->snap_names);
	/* snapc is refcounted; drop our reference rather than kfree */
	ceph_put_snap_context(header->snapc);
}
631
632/*
633 * get the actual striped segment name, offset and length
634 */
635static u64 rbd_get_segment(struct rbd_image_header *header,
ca1e49a6 636 const char *object_prefix,
602adf40
YS
637 u64 ofs, u64 len,
638 char *seg_name, u64 *segofs)
639{
640 u64 seg = ofs >> header->obj_order;
641
642 if (seg_name)
643 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
ca1e49a6 644 "%s.%012llx", object_prefix, seg);
602adf40
YS
645
646 ofs = ofs & ((1 << header->obj_order) - 1);
647 len = min_t(u64, len, (1 << header->obj_order) - ofs);
648
649 if (segofs)
650 *segofs = ofs;
651
652 return len;
653}
654
1fec7093
YS
655static int rbd_get_num_segments(struct rbd_image_header *header,
656 u64 ofs, u64 len)
657{
658 u64 start_seg = ofs >> header->obj_order;
659 u64 end_seg = (ofs + len - 1) >> header->obj_order;
660 return end_seg - start_seg + 1;
661}
662
029bcbd8
JD
663/*
664 * returns the size of an object in the image
665 */
666static u64 rbd_obj_bytes(struct rbd_image_header *header)
667{
668 return 1 << header->obj_order;
669}
670
602adf40
YS
671/*
672 * bio helpers
673 */
674
675static void bio_chain_put(struct bio *chain)
676{
677 struct bio *tmp;
678
679 while (chain) {
680 tmp = chain;
681 chain = chain->bi_next;
682 bio_put(tmp);
683 }
684}
685
/*
 * zeros a bio chain, starting at specific offset
 *
 * Every byte at or beyond start_ofs (measured from the start of the
 * chain's data) is cleared; segments entirely before start_ofs are
 * left untouched.  Used to zero-fill the tail of short reads.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* running byte offset within the whole chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* partial segment: skip bytes before start_ofs */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
712
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * On return *old points at the first un-consumed bio, *next at the
 * continuation point (either the second half of a split or the next
 * bio), and the cloned chain of exactly len bytes is returned (NULL
 * on allocation/split failure).
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* Release any bio_pair left over from a previous call. */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			/*
			 * NOTE(review): this local bp shadows the bp
			 * parameter, so the split pair is never stored
			 * back through *bp -- confirm this is intended.
			 */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* only the first allocation may sleep */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		/* append the clone to the new chain */
		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
789
790/*
791 * helpers for osd request op vectors.
792 */
793static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
794 int num_ops,
795 int opcode,
796 u32 payload_len)
797{
798 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
799 GFP_NOIO);
800 if (!*ops)
801 return -ENOMEM;
802 (*ops)[0].op = opcode;
803 /*
804 * op extent offset and length will be set later on
805 * in calc_raw_layout()
806 */
807 (*ops)[0].payload_len = payload_len;
808 return 0;
809}
810
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
815
1fec7093
YS
/*
 * Record completion of request @index in a collection and complete,
 * in order, every leading run of finished requests against the block
 * layer request @rq.  With no collection, the whole request is
 * completed directly.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		/* Single request, no collection bookkeeping needed. */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	/* status[] and num_done are protected by the queue lock. */
	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* Extend over the contiguous run of completed entries. */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i < max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* Each completed request drops its collection reference. */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
853
/* Complete an rbd_request through its collection (if any). */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
859
602adf40
YS
860/*
861 * Send ceph osd request
862 */
863static int rbd_do_request(struct request *rq,
0ce1a794 864 struct rbd_device *rbd_dev,
602adf40
YS
865 struct ceph_snap_context *snapc,
866 u64 snapid,
aded07ea 867 const char *object_name, u64 ofs, u64 len,
602adf40
YS
868 struct bio *bio,
869 struct page **pages,
870 int num_pages,
871 int flags,
872 struct ceph_osd_req_op *ops,
1fec7093
YS
873 struct rbd_req_coll *coll,
874 int coll_index,
602adf40 875 void (*rbd_cb)(struct ceph_osd_request *req,
59c2be1e
YS
876 struct ceph_msg *msg),
877 struct ceph_osd_request **linger_req,
878 u64 *ver)
602adf40
YS
879{
880 struct ceph_osd_request *req;
881 struct ceph_file_layout *layout;
882 int ret;
883 u64 bno;
884 struct timespec mtime = CURRENT_TIME;
885 struct rbd_request *req_data;
886 struct ceph_osd_request_head *reqhead;
1dbb4399 887 struct ceph_osd_client *osdc;
602adf40 888
602adf40 889 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
1fec7093
YS
890 if (!req_data) {
891 if (coll)
892 rbd_coll_end_req_index(rq, coll, coll_index,
893 -ENOMEM, len);
894 return -ENOMEM;
895 }
896
897 if (coll) {
898 req_data->coll = coll;
899 req_data->coll_index = coll_index;
900 }
602adf40 901
aded07ea
AE
902 dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
903 object_name, len, ofs);
602adf40 904
0ce1a794 905 osdc = &rbd_dev->rbd_client->client->osdc;
1dbb4399
AE
906 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
907 false, GFP_NOIO, pages, bio);
4ad12621 908 if (!req) {
4ad12621 909 ret = -ENOMEM;
602adf40
YS
910 goto done_pages;
911 }
912
913 req->r_callback = rbd_cb;
914
915 req_data->rq = rq;
916 req_data->bio = bio;
917 req_data->pages = pages;
918 req_data->len = len;
919
920 req->r_priv = req_data;
921
922 reqhead = req->r_request->front.iov_base;
923 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
924
aded07ea 925 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
602adf40
YS
926 req->r_oid_len = strlen(req->r_oid);
927
928 layout = &req->r_file_layout;
929 memset(layout, 0, sizeof(*layout));
930 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931 layout->fl_stripe_count = cpu_to_le32(1);
932 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
0ce1a794 933 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
1dbb4399
AE
934 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
935 req, ops);
602adf40
YS
936
937 ceph_osdc_build_request(req, ofs, &len,
938 ops,
939 snapc,
940 &mtime,
941 req->r_oid, req->r_oid_len);
602adf40 942
59c2be1e 943 if (linger_req) {
1dbb4399 944 ceph_osdc_set_request_linger(osdc, req);
59c2be1e
YS
945 *linger_req = req;
946 }
947
1dbb4399 948 ret = ceph_osdc_start_request(osdc, req, false);
602adf40
YS
949 if (ret < 0)
950 goto done_err;
951
952 if (!rbd_cb) {
1dbb4399 953 ret = ceph_osdc_wait_request(osdc, req);
59c2be1e
YS
954 if (ver)
955 *ver = le64_to_cpu(req->r_reassert_version.version);
1fec7093
YS
956 dout("reassert_ver=%lld\n",
957 le64_to_cpu(req->r_reassert_version.version));
602adf40
YS
958 ceph_osdc_put_request(req);
959 }
960 return ret;
961
962done_err:
963 bio_chain_put(req_data->bio);
964 ceph_osdc_put_request(req);
965done_pages:
1fec7093 966 rbd_coll_end_req(req_data, ret, len);
602adf40 967 kfree(req_data);
602adf40
YS
968 return ret;
969}
970
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous rbd requests: decodes the
 * reply, zero-fills short or missing-object reads, completes the
 * block-layer request and releases all per-request resources.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* nonexistent object: reads back as all zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero-fill the remainder */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
1009
59c2be1e
YS
/* Minimal async completion: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1014
602adf40
YS
/*
 * Do a synchronous ceph osd operation
 *
 * Allocates a page vector for the data, copies @buf into it for
 * writes (and out of it for reads), and submits via rbd_do_request()
 * with no callback so the call blocks until completion.  If
 * @orig_ops is NULL a single op of @opcode is built (and freed)
 * internally; otherwise the caller's ops are used as-is.
 */
static int rbd_req_sync_op(struct rbd_device *rbd_dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   const char *object_name,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		/* build a single default op of the requested opcode */
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/* NULL rbd_cb makes rbd_do_request() wait for completion */
	ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
			     object_name, ofs, len, NULL,
			     pages, num_pages,
			     flags,
			     ops,
			     NULL, 0,
			     NULL,
			     linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* on success, ret is the number of bytes read */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1075
/*
 * Do an asynchronous ceph osd operation
 *
 * Maps the image extent (ofs, len) to its containing object via
 * rbd_get_segment() and submits one async request for it, completing
 * through rbd_req_cb().  The extent must not span objects (callers
 * split bios on segment boundaries beforehand).
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.object_prefix,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1130
1131/*
1132 * Request async osd write
1133 */
1134static int rbd_req_write(struct request *rq,
1135 struct rbd_device *rbd_dev,
1136 struct ceph_snap_context *snapc,
1137 u64 ofs, u64 len,
1fec7093
YS
1138 struct bio *bio,
1139 struct rbd_req_coll *coll,
1140 int coll_index)
602adf40
YS
1141{
1142 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1143 CEPH_OSD_OP_WRITE,
1144 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1fec7093 1145 ofs, len, bio, coll, coll_index);
602adf40
YS
1146}
1147
1148/*
1149 * Request async osd read
1150 */
1151static int rbd_req_read(struct request *rq,
1152 struct rbd_device *rbd_dev,
1153 u64 snapid,
1154 u64 ofs, u64 len,
1fec7093
YS
1155 struct bio *bio,
1156 struct rbd_req_coll *coll,
1157 int coll_index)
602adf40
YS
1158{
1159 return rbd_do_op(rq, rbd_dev, NULL,
b06e6a6b 1160 snapid,
602adf40
YS
1161 CEPH_OSD_OP_READ,
1162 CEPH_OSD_FLAG_READ,
1fec7093 1163 ofs, len, bio, coll, coll_index);
602adf40
YS
1164}
1165
1166/*
1167 * Request sync osd read
1168 */
0ce1a794 1169static int rbd_req_sync_read(struct rbd_device *rbd_dev,
602adf40
YS
1170 struct ceph_snap_context *snapc,
1171 u64 snapid,
aded07ea 1172 const char *object_name,
602adf40 1173 u64 ofs, u64 len,
59c2be1e
YS
1174 char *buf,
1175 u64 *ver)
602adf40 1176{
0ce1a794 1177 return rbd_req_sync_op(rbd_dev, NULL,
b06e6a6b 1178 snapid,
602adf40
YS
1179 CEPH_OSD_OP_READ,
1180 CEPH_OSD_FLAG_READ,
1181 NULL,
d1f57ea6 1182 object_name, ofs, len, buf, NULL, ver);
602adf40
YS
1183}
1184
/*
 * Request sync osd notify-ack: acknowledge receipt of a watch
 * notification so the notifier's wait can complete.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
				   u64 ver,
				   u64 notify_id,
				   const char *object_name)
{
	struct ceph_osd_req_op *ops;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	/* header version being acknowledged; cookie carries the notify id */
	ops[0].watch.ver = cpu_to_le64(ver);
	/* NOTE(review): cookie is not byte-swapped here, unlike in
	 * rbd_req_sync_watch() — confirm the intended wire format */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
			  object_name, 0, 0, NULL,
			  NULL, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1215
/*
 * Watch event callback: invoked by the osd client when the header
 * object changes.  Refreshes the cached header under ctl_mutex, then
 * acknowledges the notification with the (possibly updated) header
 * version.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	u64 hver;
	int rc;

	if (!rbd_dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
		rbd_dev->header_name, notify_id, (int) opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_refresh_header(rbd_dev);
	/* capture the version while still holding the mutex */
	hver = rbd_dev->header.obj_version;
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
			   " update snaps: %d\n", rbd_dev->major, rc);

	/* ack even if the refresh failed, so the notifier isn't stalled */
	rbd_req_sync_notify_ack(rbd_dev, hver, notify_id, rbd_dev->header_name);
}
1237
/*
 * Request sync osd watch: register a watch on @object_name so that
 * rbd_watch_cb() is invoked when the object is modified.  On success
 * rbd_dev->watch_event and rbd_dev->watch_request are left set up.
 */
static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
			      const char *object_name,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	/* 0 => persistent (not one-shot) event */
	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)rbd_dev, &rbd_dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 => establish the watch */

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL,
			      &rbd_dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1282
/*
 * Request sync osd unwatch: tear down the watch established by
 * rbd_req_sync_watch() and release the associated osd event.
 */
static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
				const char *object_name)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	/* flag = 0 turns the WATCH op into an unwatch */
	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
	ops[0].watch.flag = 0;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      object_name, 0, 0, NULL, NULL, NULL);

	/* cancel the event even if the unwatch op failed */
	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	return ret;
}
1311
/* Context handed to rbd_notify_cb() via ceph_osdc_create_event(). */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;
};
1315
1316static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1317{
0ce1a794
AE
1318 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1319 if (!rbd_dev)
59c2be1e
YS
1320 return;
1321
0ce1a794 1322 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
0bed54dc 1323 rbd_dev->header_name,
59c2be1e
YS
1324 notify_id, (int)opcode);
1325}
1326
/*
 * Request sync osd notify: send a notification on @object_name and
 * wait (bounded by CEPH_OSD_TIMEOUT_DEFAULT) for watchers to ack.
 */
static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
			       const char *object_name)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	/* payload: protocol version + timeout, two 32-bit words */
	int payload_len = sizeof(u32) + sizeof(u32);
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.rbd_dev = rbd_dev;

	/* 1 => one-shot event; presumably auto-removed after delivery,
	 * which is why the success path does not cancel it — verify */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1377
/*
 * Request sync osd class-method call: synchronously execute
 * @class_name.@method_name on @object_name with @data as input
 * (CEPH_OSD_OP_CALL).  The object version is returned through @ver.
 * (The original comment said "Request sync osd read", which was wrong.)
 */
static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int class_name_len = strlen(class_name);
	int method_name_len = strlen(method_name);
	/* payload carries class name, method name and input data */
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    class_name_len + method_name_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = class_name;
	ops[0].cls.class_len = (__u8) class_name_len;
	ops[0].cls.method_name = method_name;
	ops[0].cls.method_len = (__u8) method_name_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(rbd_dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       object_name, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1417
1fec7093
YS
1418static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1419{
1420 struct rbd_req_coll *coll =
1421 kzalloc(sizeof(struct rbd_req_coll) +
1422 sizeof(struct rbd_req_status) * num_reqs,
1423 GFP_ATOMIC);
1424
1425 if (!coll)
1426 return NULL;
1427 coll->total = num_reqs;
1428 kref_init(&coll->kref);
1429 return coll;
1430}
1431
/*
 * block device queue callback
 *
 * Called with q->queue_lock held and interrupts disabled.  Each
 * request is split on rbd object boundaries; one async OSD op is
 * issued per segment, with a shared rbd_req_coll collecting the
 * per-segment completions.  The lock is dropped while issuing I/O
 * and re-taken before fetching the next request.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	while ((rq = blk_fetch_request(q))) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;
		struct ceph_snap_context *snapc;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			continue;
		}

		/* drop the queue lock while we talk to the OSDs */
		spin_unlock_irq(q->queue_lock);

		down_read(&rbd_dev->header_rwsem);

		/* mapped snapshot may have been deleted under us */
		if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
			up_read(&rbd_dev->header_rwsem);
			dout("request for non-existent snapshot");
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENXIO);
			continue;
		}

		/* pin the snap context so a concurrent header refresh
		 * can't free it while our writes are in flight */
		snapc = ceph_get_snap_context(rbd_dev->header.snapc);

		up_read(&rbd_dev->header_rwsem);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * SECTOR_SIZE);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			ceph_put_snap_context(snapc);
			continue;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.object_prefix,
						  ofs, size,
						  NULL, NULL);
			/* one coll reference per segment issued */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     rbd_dev->snap_id,
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the allocation reference; in-flight segments
		 * each hold their own */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		spin_lock_irq(q->queue_lock);

		ceph_put_snap_context(snapc);
	}
}
1550
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	unsigned int chunk_sectors;
	sector_t sector;
	unsigned int bio_sectors;
	int max;

	/* object size in sectors (objects are 1 << obj_order bytes) */
	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;

	/* bytes left in the object after the bio's current end */
	max = (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << SECTOR_SHIFT;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	/* empty bio at an object boundary: allow a single bvec anyway */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;
	return max;
}
1577
/*
 * Tear down the gendisk and request queue for an rbd device and free
 * the cached image header.  Safe to call when no disk was set up.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	/* only unregister if add_disk() actually ran */
	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1593
/*
 * reload the ondisk header
 *
 * The on-disk header is variable-size (it embeds all snapshot records
 * and names), so it is read in two passes; if the snapshot count
 * changes between passes we simply retry.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	u32 snap_count = 0;
	u64 ver;
	size_t len;

	/*
	 * First reads the fixed-size header to determine the number
	 * of snapshots, then re-reads it, along with all snapshot
	 * records as well as their stored names.
	 */
	len = sizeof (*dh);
	while (1) {
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->header_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO)
				pr_warning("unrecognized header format"
					   " for image %s\n",
					   rbd_dev->image_name);
			goto out_dh;
		}

		/* done once the count we sized for matches what we read */
		if (snap_count == header->total_snaps)
			break;

		snap_count = header->total_snaps;
		len = sizeof (*dh) +
			snap_count * sizeof(struct rbd_image_snap_ondisk) +
			header->snap_names_len;

		rbd_header_free(header);
		kfree(dh);
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1651
1652/*
1653 * create a snapshot
1654 */
0ce1a794 1655static int rbd_header_add_snap(struct rbd_device *rbd_dev,
602adf40
YS
1656 const char *snap_name,
1657 gfp_t gfp_flags)
1658{
1659 int name_len = strlen(snap_name);
1660 u64 new_snapid;
1661 int ret;
916d4d67 1662 void *data, *p, *e;
59c2be1e 1663 u64 ver;
1dbb4399 1664 struct ceph_mon_client *monc;
602adf40
YS
1665
1666 /* we should create a snapshot only if we're pointing at the head */
0ce1a794 1667 if (rbd_dev->snap_id != CEPH_NOSNAP)
602adf40
YS
1668 return -EINVAL;
1669
0ce1a794
AE
1670 monc = &rbd_dev->rbd_client->client->monc;
1671 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
602adf40
YS
1672 dout("created snapid=%lld\n", new_snapid);
1673 if (ret < 0)
1674 return ret;
1675
1676 data = kmalloc(name_len + 16, gfp_flags);
1677 if (!data)
1678 return -ENOMEM;
1679
916d4d67
SW
1680 p = data;
1681 e = data + name_len + 16;
602adf40 1682
916d4d67
SW
1683 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1684 ceph_encode_64_safe(&p, e, new_snapid, bad);
602adf40 1685
0bed54dc 1686 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
0ce1a794 1687 "rbd", "snap_add",
916d4d67 1688 data, p - data, &ver);
602adf40 1689
916d4d67 1690 kfree(data);
602adf40
YS
1691
1692 if (ret < 0)
1693 return ret;
1694
0ce1a794
AE
1695 down_write(&rbd_dev->header_rwsem);
1696 rbd_dev->header.snapc->seq = new_snapid;
1697 up_write(&rbd_dev->header_rwsem);
602adf40
YS
1698
1699 return 0;
1700bad:
1701 return -ERANGE;
1702}
1703
/* Unregister and free every snapshot device attached to @rbd_dev.
 * Repeatedly removes the list head until the snaps list is empty. */
static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
{
	struct rbd_snap *snap;

	while (!list_empty(&rbd_dev->snaps)) {
		snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
		__rbd_remove_snap_dev(rbd_dev, snap);
	}
}
1713
/*
 * only read the first part of the ondisk header, without the snaps info
 *
 * Re-reads the header from the OSDs and swaps the freshly parsed
 * fields into rbd_dev->header under header_rwsem, releasing the old
 * snapshot metadata.  Also resizes the disk when mapped at the head.
 */
static int __rbd_refresh_header(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* resized? */
	if (rbd_dev->snap_id == CEPH_NOSNAP) {
		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;

		dout("setting size to %llu sectors", (unsigned long long) size);
		set_capacity(rbd_dev->disk, size);
	}

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	rbd_dev->header.obj_version = h.obj_version;
	rbd_dev->header.image_size = h.image_size;
	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
	kfree(h.object_prefix);

	/* rebuild the snapshot device list to match the new header */
	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}
1759
/*
 * Fetch image metadata from the OSDs and set up the gendisk and
 * request queue for the mapped image, then announce the disk.
 * Returns 0 on success or a negative errno.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 segment_size;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	/* select the mapped snapshot (or head) and get its size */
	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / SECTOR_SIZE);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1832
/*
   sysfs
*/

/* Map a sysfs struct device back to its containing rbd_device. */
static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
1841
dfc5606d
YS
1842static ssize_t rbd_size_show(struct device *dev,
1843 struct device_attribute *attr, char *buf)
1844{
593a9e7b 1845 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0
JD
1846 sector_t size;
1847
1848 down_read(&rbd_dev->header_rwsem);
1849 size = get_capacity(rbd_dev->disk);
1850 up_read(&rbd_dev->header_rwsem);
dfc5606d 1851
a51aa0c0 1852 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
dfc5606d
YS
1853}
1854
1855static ssize_t rbd_major_show(struct device *dev,
1856 struct device_attribute *attr, char *buf)
1857{
593a9e7b 1858 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 1859
dfc5606d
YS
1860 return sprintf(buf, "%d\n", rbd_dev->major);
1861}
1862
1863static ssize_t rbd_client_id_show(struct device *dev,
1864 struct device_attribute *attr, char *buf)
602adf40 1865{
593a9e7b 1866 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1867
1dbb4399
AE
1868 return sprintf(buf, "client%lld\n",
1869 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
1870}
1871
dfc5606d
YS
1872static ssize_t rbd_pool_show(struct device *dev,
1873 struct device_attribute *attr, char *buf)
602adf40 1874{
593a9e7b 1875 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1876
1877 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1878}
1879
9bb2f334
AE
1880static ssize_t rbd_pool_id_show(struct device *dev,
1881 struct device_attribute *attr, char *buf)
1882{
1883 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1884
1885 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1886}
1887
dfc5606d
YS
1888static ssize_t rbd_name_show(struct device *dev,
1889 struct device_attribute *attr, char *buf)
1890{
593a9e7b 1891 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 1892
0bed54dc 1893 return sprintf(buf, "%s\n", rbd_dev->image_name);
dfc5606d
YS
1894}
1895
1896static ssize_t rbd_snap_show(struct device *dev,
1897 struct device_attribute *attr,
1898 char *buf)
1899{
593a9e7b 1900 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d
YS
1901
1902 return sprintf(buf, "%s\n", rbd_dev->snap_name);
1903}
1904
/* sysfs "refresh" store: re-read the image header from the OSDs.
 * Returns @size on success so the write is consumed, or the
 * negative error from the refresh. */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int rc;
	int ret = size;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rc = __rbd_refresh_header(rbd_dev);
	if (rc < 0)
		ret = rc;

	mutex_unlock(&ctl_mutex);
	return ret;
}
602adf40 1923
/* Per-device attributes exposed under /sys/bus/rbd/devices/<id>/ */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

static struct attribute *rbd_attrs[] = {
	&dev_attr_size.attr,
	&dev_attr_major.attr,
	&dev_attr_client_id.attr,
	&dev_attr_pool.attr,
	&dev_attr_pool_id.attr,
	&dev_attr_name.attr,
	&dev_attr_current_snap.attr,
	&dev_attr_refresh.attr,
	&dev_attr_create_snap.attr,
	NULL
};

static struct attribute_group rbd_attr_group = {
	.attrs = rbd_attrs,
};

static const struct attribute_group *rbd_attr_groups[] = {
	&rbd_attr_group,
	NULL
};

/* rbd_device lifetime is managed elsewhere; nothing to free here. */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

static struct device_type rbd_device_type = {
	.name		= "rbd",
	.groups		= rbd_attr_groups,
	.release	= rbd_sysfs_dev_release,
};
1965
1966
1967/*
1968 sysfs - snapshots
1969*/
1970
1971static ssize_t rbd_snap_size_show(struct device *dev,
1972 struct device_attribute *attr,
1973 char *buf)
1974{
1975 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1976
3591538f 1977 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
dfc5606d
YS
1978}
1979
1980static ssize_t rbd_snap_id_show(struct device *dev,
1981 struct device_attribute *attr,
1982 char *buf)
1983{
1984 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1985
3591538f 1986 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
dfc5606d
YS
1987}
1988
/* Per-snapshot attributes exposed under the snap_<name> device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/* Frees the rbd_snap once the last device reference is dropped. */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
2018
/* Unlink @snap from the device's snapshot list and unregister its
 * sysfs device; rbd_snap_dev_release() then frees the memory. */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
2025
/* Register the sysfs device ("snap_<name>") for one snapshot under
 * @parent.  Returns the device_register() result. */
static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap,
				  struct device *parent)
{
	struct device *dev = &snap->dev;
	int ret;

	dev->type = &rbd_snap_device_type;
	dev->parent = parent;
	dev->release = rbd_snap_dev_release;
	dev_set_name(dev, "snap_%s", snap->name);
	ret = device_register(dev);

	return ret;
}
2041
2042static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2043 int i, const char *name,
2044 struct rbd_snap **snapp)
2045{
2046 int ret;
2047 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2048 if (!snap)
2049 return -ENOMEM;
2050 snap->name = kstrdup(name, GFP_KERNEL);
2051 snap->size = rbd_dev->header.snap_sizes[i];
2052 snap->id = rbd_dev->header.snapc->snaps[i];
2053 if (device_is_registered(&rbd_dev->dev)) {
2054 ret = rbd_register_snap_dev(rbd_dev, snap,
2055 &rbd_dev->dev);
2056 if (ret < 0)
2057 goto err;
2058 }
2059 *snapp = snap;
2060 return 0;
2061err:
2062 kfree(snap->name);
2063 kfree(snap);
2064 return ret;
2065}
2066
2067/*
2068 * search for the previous snap in a null delimited string list
2069 */
2070const char *rbd_prev_snap_name(const char *name, const char *start)
2071{
2072 if (name < start + 2)
2073 return NULL;
2074
2075 name -= 2;
2076 while (*name) {
2077 if (name == start)
2078 return start;
2079 name--;
2080 }
2081 return name + 1;
2082}
2083
/*
 * compare the old list of snapshots that we have to what's in the header
 * and update it accordingly. Note that the header holds the snapshots
 * in a reverse order (from newest to oldest) and we need to go from
 * older to new so that we don't get a duplicate snap name when
 * doing the process (e.g., removed snapshot and recreated a new
 * one with the same name.
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* names are stored NUL-delimited; walk them backward in step
	 * with the snap id array */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/*
			 * old_snap->id was skipped, thus was
			 * removed. If this rbd_dev is mapped to
			 * the removed snapshot, record that it no
			 * longer exists, to prevent further I/O.
			 */
			if (rbd_dev->snap_id == old_snap->id)
				rbd_dev->snap_exists = false;
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				WARN_ON(1);
				return -EINVAL;
			}
			/* NOTE(review): this reads snaps[i] while the
			 * check above uses snaps[i - 1] — verify the
			 * intended indexing */
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2164
/* Register the rbd device and all of its snapshot devices with sysfs,
 * serialized by ctl_mutex. */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto out;

	/* snapshots discovered before registration get their sysfs
	 * devices here; stop at the first failure */
	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			break;
	}
out:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2193
/* Remove the rbd device (and, via the driver core, its children)
 * from sysfs. */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2198
/*
 * Establish a watch on the header object.  -ERANGE from the OSD means
 * our cached header version is stale: refresh the header and retry
 * with the new version until registration succeeds or fails otherwise.
 */
static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
{
	int ret, rc;

	do {
		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
					 rbd_dev->header.obj_version);
		if (ret == -ERANGE) {
			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
			rc = __rbd_refresh_header(rbd_dev);
			mutex_unlock(&ctl_mutex);
			if (rc < 0)
				return rc;
		}
	} while (ret == -ERANGE);

	return ret;
}
2217
/* Highest rbd id handed out so far; valid ids start at 1. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list. The minimum rbd id is 1.
 */
static void rbd_id_get(struct rbd_device *rbd_dev)
{
	rbd_dev->id = atomic64_inc_return(&rbd_id_max);

	spin_lock(&rbd_dev_list_lock);
	list_add_tail(&rbd_dev->node, &rbd_dev_list);
	spin_unlock(&rbd_dev_list_lock);
}
b7f23c36 2232
1ddbe94e 2233/*
499afd5b
AE
2234 * Remove an rbd_dev from the global list, and record that its
2235 * identifier is no longer in use.
1ddbe94e 2236 */
499afd5b 2237static void rbd_id_put(struct rbd_device *rbd_dev)
1ddbe94e 2238{
d184f6bf
AE
2239 struct list_head *tmp;
2240 int rbd_id = rbd_dev->id;
2241 int max_id;
2242
2243 BUG_ON(rbd_id < 1);
499afd5b
AE
2244
2245 spin_lock(&rbd_dev_list_lock);
2246 list_del_init(&rbd_dev->node);
d184f6bf
AE
2247
2248 /*
2249 * If the id being "put" is not the current maximum, there
2250 * is nothing special we need to do.
2251 */
2252 if (rbd_id != atomic64_read(&rbd_id_max)) {
2253 spin_unlock(&rbd_dev_list_lock);
2254 return;
2255 }
2256
2257 /*
2258 * We need to update the current maximum id. Search the
2259 * list to find out what it is. We're more likely to find
2260 * the maximum at the end, so search the list backward.
2261 */
2262 max_id = 0;
2263 list_for_each_prev(tmp, &rbd_dev_list) {
2264 struct rbd_device *rbd_dev;
2265
2266 rbd_dev = list_entry(tmp, struct rbd_device, node);
2267 if (rbd_id > max_id)
2268 max_id = rbd_id;
2269 }
499afd5b 2270 spin_unlock(&rbd_dev_list_lock);
b7f23c36 2271
1ddbe94e 2272 /*
d184f6bf
AE
2273 * The max id could have been updated by rbd_id_get(), in
2274 * which case it now accurately reflects the new maximum.
2275 * Be careful not to overwrite the maximum value in that
2276 * case.
1ddbe94e 2277 */
d184f6bf 2278 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
b7f23c36
AE
2279}
2280
e28fff26
AE
/*
 * Advance *buf past any leading white space, leaving it pointing at
 * the first non-space character (if any), and return the length of
 * the token (maximal run of non-space characters) found there.
 * *buf must be '\0'-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/* The characters isspace() accepts in the "C"/"POSIX" locales. */
	static const char spaces[] = " \f\n\r\t\v";
	const char *p = *buf;

	p += strspn(p, spaces);		/* skip leading whitespace */
	*buf = p;

	return strcspn(p, spaces);	/* length of the token at p */
}
2299
/*
 * Find the next token in *buf and, if the supplied token buffer is
 * big enough, copy it there with a terminating '\0'.  *buf must be
 * '\0'-terminated on entry.
 *
 * Returns the length of the token found (excluding the '\0'); 0 if
 * no token is found, and >= token_size if the token would not fit
 * (in which case nothing is copied).
 *
 * *buf is advanced past the token in every case -- even when the
 * token buffer was too small to hold it.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	/* The characters isspace() accepts in the "C"/"POSIX" locales. */
	const char *spaces = " \f\n\r\t\v";
	size_t len;

	/* Locate the token (this inlines next_token()). */
	*buf += strspn(*buf, spaces);
	len = strcspn(*buf, spaces);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2329
ea3352f4
AE
2330/*
2331 * Finds the next token in *buf, dynamically allocates a buffer big
2332 * enough to hold a copy of it, and copies the token into the new
2333 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2334 * that a duplicate buffer is created even for a zero-length token.
2335 *
2336 * Returns a pointer to the newly-allocated duplicate, or a null
2337 * pointer if memory for the duplicate was not available. If
2338 * the lenp argument is a non-null pointer, the length of the token
2339 * (not including the '\0') is returned in *lenp.
2340 *
2341 * If successful, the *buf pointer will be updated to point beyond
2342 * the end of the found token.
2343 *
2344 * Note: uses GFP_KERNEL for allocation.
2345 */
2346static inline char *dup_token(const char **buf, size_t *lenp)
2347{
2348 char *dup;
2349 size_t len;
2350
2351 len = next_token(buf);
2352 dup = kmalloc(len + 1, GFP_KERNEL);
2353 if (!dup)
2354 return NULL;
2355
2356 memcpy(dup, *buf, len);
2357 *(dup + len) = '\0';
2358 *buf += len;
2359
2360 if (lenp)
2361 *lenp = len;
2362
2363 return dup;
2364}
2365
/*
 * Parse the argument string written to /sys/bus/rbd/add.  The
 * expected layout is:
 *
 *	<mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 *
 * This fills in the pool_name, image_name, image_name_len,
 * header_name, and snap_name fields of the given rbd_dev, and
 * returns the monitor address list (a pointer into buf, NOT a copy)
 * and the options string through the out parameters.
 *
 * Returns 0 on success, -EINVAL on malformed input, or -ENOMEM.
 *
 * Note: rbd_dev is assumed to have been initially zero-filled, so
 * the kfree() calls in the error path are safe for fields not yet
 * assigned.
 */
static int rbd_add_parse_args(struct rbd_device *rbd_dev,
			      const char *buf,
			      const char **mon_addrs,
			      size_t *mon_addrs_size,
			      char *options,
			      size_t options_size)
{
	size_t len;
	int ret;

	/* The first four tokens are required */

	len = next_token(&buf);
	if (!len)
		return -EINVAL;
	/* mon_addrs points into buf; size includes room for a '\0'. */
	*mon_addrs_size = len + 1;
	*mon_addrs = buf;

	buf += len;

	len = copy_token(&buf, options, options_size);
	if (!len || len >= options_size)
		return -EINVAL;

	ret = -ENOMEM;
	rbd_dev->pool_name = dup_token(&buf, NULL);
	if (!rbd_dev->pool_name)
		goto out_err;

	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
	if (!rbd_dev->image_name)
		goto out_err;

	/* Create the name of the header object */

	rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
					+ sizeof (RBD_SUFFIX),
					GFP_KERNEL);
	if (!rbd_dev->header_name)
		goto out_err;
	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);

	/*
	 * The snapshot name is optional.  If none is supplied,
	 * we use the default value.
	 */
	rbd_dev->snap_name = dup_token(&buf, &len);
	if (!rbd_dev->snap_name)
		goto out_err;
	if (!len) {
		/* Replace the empty name with the default */
		kfree(rbd_dev->snap_name);
		rbd_dev->snap_name
			= kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
		if (!rbd_dev->snap_name)
			goto out_err;

		memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
			sizeof (RBD_SNAP_HEAD_NAME));
	}

	return 0;

out_err:
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->image_name);
	kfree(rbd_dev->pool_name);
	/*
	 * Only pool_name is reset to NULL: the caller (rbd_add()) uses
	 * "pool_name != NULL" to decide whether these fields still need
	 * freeing, so clearing it prevents a double free there.
	 */
	rbd_dev->pool_name = NULL;

	return ret;
}
2445
59c2be1e
YS
2446static ssize_t rbd_add(struct bus_type *bus,
2447 const char *buf,
2448 size_t count)
602adf40 2449{
cb8627c7
AE
2450 char *options;
2451 struct rbd_device *rbd_dev = NULL;
7ef3214a
AE
2452 const char *mon_addrs = NULL;
2453 size_t mon_addrs_size = 0;
27cc2594
AE
2454 struct ceph_osd_client *osdc;
2455 int rc = -ENOMEM;
602adf40
YS
2456
2457 if (!try_module_get(THIS_MODULE))
2458 return -ENODEV;
2459
60571c7d 2460 options = kmalloc(count, GFP_KERNEL);
602adf40 2461 if (!options)
27cc2594 2462 goto err_nomem;
cb8627c7
AE
2463 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2464 if (!rbd_dev)
2465 goto err_nomem;
602adf40
YS
2466
2467 /* static rbd_device initialization */
2468 spin_lock_init(&rbd_dev->lock);
2469 INIT_LIST_HEAD(&rbd_dev->node);
dfc5606d 2470 INIT_LIST_HEAD(&rbd_dev->snaps);
c666601a 2471 init_rwsem(&rbd_dev->header_rwsem);
602adf40 2472
c666601a 2473 init_rwsem(&rbd_dev->header_rwsem);
0e805a1d 2474
d184f6bf 2475 /* generate unique id: find highest unique id, add one */
499afd5b 2476 rbd_id_get(rbd_dev);
602adf40 2477
a725f65e 2478 /* Fill in the device name, now that we have its id. */
81a89793
AE
2479 BUILD_BUG_ON(DEV_NAME_LEN
2480 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2481 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
a725f65e 2482
602adf40 2483 /* parse add command */
7ef3214a 2484 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
e28fff26 2485 options, count);
a725f65e 2486 if (rc)
f0f8cef5 2487 goto err_put_id;
e124a82f 2488
5214ecc4
AE
2489 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2490 options);
d720bcb0
AE
2491 if (IS_ERR(rbd_dev->rbd_client)) {
2492 rc = PTR_ERR(rbd_dev->rbd_client);
f0f8cef5 2493 goto err_put_id;
d720bcb0 2494 }
602adf40 2495
602adf40 2496 /* pick the pool */
1dbb4399 2497 osdc = &rbd_dev->rbd_client->client->osdc;
602adf40
YS
2498 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2499 if (rc < 0)
2500 goto err_out_client;
9bb2f334 2501 rbd_dev->pool_id = rc;
602adf40
YS
2502
2503 /* register our block device */
27cc2594
AE
2504 rc = register_blkdev(0, rbd_dev->name);
2505 if (rc < 0)
602adf40 2506 goto err_out_client;
27cc2594 2507 rbd_dev->major = rc;
602adf40 2508
dfc5606d
YS
2509 rc = rbd_bus_add_dev(rbd_dev);
2510 if (rc)
766fc439
YS
2511 goto err_out_blkdev;
2512
32eec68d
AE
2513 /*
2514 * At this point cleanup in the event of an error is the job
2515 * of the sysfs code (initiated by rbd_bus_del_dev()).
2516 *
2517 * Set up and announce blkdev mapping.
2518 */
602adf40
YS
2519 rc = rbd_init_disk(rbd_dev);
2520 if (rc)
766fc439 2521 goto err_out_bus;
602adf40 2522
59c2be1e
YS
2523 rc = rbd_init_watch_dev(rbd_dev);
2524 if (rc)
2525 goto err_out_bus;
2526
602adf40
YS
2527 return count;
2528
766fc439 2529err_out_bus:
766fc439
YS
2530 /* this will also clean up rest of rbd_dev stuff */
2531
2532 rbd_bus_del_dev(rbd_dev);
2533 kfree(options);
766fc439
YS
2534 return rc;
2535
602adf40
YS
2536err_out_blkdev:
2537 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2538err_out_client:
2539 rbd_put_client(rbd_dev);
f0f8cef5 2540err_put_id:
cb8627c7 2541 if (rbd_dev->pool_name) {
820a5f3e 2542 kfree(rbd_dev->snap_name);
0bed54dc
AE
2543 kfree(rbd_dev->header_name);
2544 kfree(rbd_dev->image_name);
cb8627c7
AE
2545 kfree(rbd_dev->pool_name);
2546 }
499afd5b 2547 rbd_id_put(rbd_dev);
27cc2594 2548err_nomem:
27cc2594 2549 kfree(rbd_dev);
cb8627c7 2550 kfree(options);
27cc2594 2551
602adf40
YS
2552 dout("Error adding device %s\n", buf);
2553 module_put(THIS_MODULE);
27cc2594
AE
2554
2555 return (ssize_t) rc;
602adf40
YS
2556}
2557
2558static struct rbd_device *__rbd_get_dev(unsigned long id)
2559{
2560 struct list_head *tmp;
2561 struct rbd_device *rbd_dev;
2562
e124a82f 2563 spin_lock(&rbd_dev_list_lock);
602adf40
YS
2564 list_for_each(tmp, &rbd_dev_list) {
2565 rbd_dev = list_entry(tmp, struct rbd_device, node);
e124a82f
AE
2566 if (rbd_dev->id == id) {
2567 spin_unlock(&rbd_dev_list_lock);
602adf40 2568 return rbd_dev;
e124a82f 2569 }
602adf40 2570 }
e124a82f 2571 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
2572 return NULL;
2573}
2574
/*
 * Device-model ->release callback for an rbd device (installed in
 * rbd_bus_add_dev()).  Runs once the last reference to the embedded
 * struct device is dropped, and frees everything the device owns.
 *
 * Teardown order matters: stop the watch machinery first, then drop
 * the client, then the block device, and only then free the strings
 * and the rbd_dev itself.
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	/* Cancel the lingering watch request, if one was set up. */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);

	/* done with the id, and with the rbd_dev */
	kfree(rbd_dev->snap_name);
	kfree(rbd_dev->header_name);
	kfree(rbd_dev->pool_name);
	kfree(rbd_dev->image_name);
	rbd_id_put(rbd_dev);
	kfree(rbd_dev);

	/* release module ref (taken in rbd_add()) */
	module_put(THIS_MODULE);
}
2605
dfc5606d
YS
2606static ssize_t rbd_remove(struct bus_type *bus,
2607 const char *buf,
2608 size_t count)
602adf40
YS
2609{
2610 struct rbd_device *rbd_dev = NULL;
2611 int target_id, rc;
2612 unsigned long ul;
2613 int ret = count;
2614
2615 rc = strict_strtoul(buf, 10, &ul);
2616 if (rc)
2617 return rc;
2618
2619 /* convert to int; abort if we lost anything in the conversion */
2620 target_id = (int) ul;
2621 if (target_id != ul)
2622 return -EINVAL;
2623
2624 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2625
2626 rbd_dev = __rbd_get_dev(target_id);
2627 if (!rbd_dev) {
2628 ret = -ENOENT;
2629 goto done;
2630 }
2631
dfc5606d
YS
2632 __rbd_remove_all_snaps(rbd_dev);
2633 rbd_bus_del_dev(rbd_dev);
602adf40
YS
2634
2635done:
2636 mutex_unlock(&ctl_mutex);
2637 return ret;
2638}
2639
/*
 * Sysfs store handler that creates a new snapshot named by the
 * written string, then refreshes the cached header and notifies
 * watchers.
 *
 * Returns count on success or a negative errno.
 */
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;
	char *name = kmalloc(count + 1, GFP_KERNEL);
	if (!name)
		return -ENOMEM;

	/*
	 * NOTE(review): with size "count" this copies at most count - 1
	 * bytes of buf -- presumably dropping the trailing newline a
	 * sysfs write appends, but it also truncates input without one.
	 * Confirm intent before changing.
	 */
	snprintf(name, count, "%s", buf);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	ret = rbd_header_add_snap(rbd_dev,
				  name, GFP_KERNEL);
	if (ret < 0)
		goto err_unlock;

	ret = __rbd_refresh_header(rbd_dev);
	if (ret < 0)
		goto err_unlock;

	/* shouldn't hold ctl_mutex when notifying.. notify might
	   trigger a watch callback that would need to get that mutex */
	mutex_unlock(&ctl_mutex);

	/* make a best effort, don't error if failed */
	rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);

	ret = count;
	kfree(name);
	return ret;

err_unlock:
	mutex_unlock(&ctl_mutex);
	kfree(name);
	return ret;
}
2680
602adf40
YS
2681/*
2682 * create control files in sysfs
dfc5606d 2683 * /sys/bus/rbd/...
602adf40
YS
2684 */
2685static int rbd_sysfs_init(void)
2686{
dfc5606d 2687 int ret;
602adf40 2688
fed4c143 2689 ret = device_register(&rbd_root_dev);
21079786 2690 if (ret < 0)
dfc5606d 2691 return ret;
602adf40 2692
fed4c143
AE
2693 ret = bus_register(&rbd_bus_type);
2694 if (ret < 0)
2695 device_unregister(&rbd_root_dev);
602adf40 2696
602adf40
YS
2697 return ret;
2698}
2699
/*
 * Tear down the sysfs hierarchy created by rbd_sysfs_init(), in the
 * reverse order of creation: bus first, then the root device.
 */
static void rbd_sysfs_cleanup(void)
{
	bus_unregister(&rbd_bus_type);
	device_unregister(&rbd_root_dev);
}
2705
2706int __init rbd_init(void)
2707{
2708 int rc;
2709
2710 rc = rbd_sysfs_init();
2711 if (rc)
2712 return rc;
f0f8cef5 2713 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
602adf40
YS
2714 return 0;
2715}
2716
/* Module exit: remove the sysfs interface set up in rbd_init(). */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}
2721
2722module_init(rbd_init);
2723module_exit(rbd_exit);
2724
2725MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2726MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2727MODULE_DESCRIPTION("rados block device");
2728
2729/* following authorship retained from original osdblk.c */
2730MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2731
2732MODULE_LICENSE("GPL");