/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>
#include <linux/idr.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/*
 * Increment the given counter and return its updated value.
 * If the counter is already 0 it will not be incremented.
 * If the counter is already at its maximum value it returns
 * -EINVAL without updating it.
 */
static int atomic_inc_return_safe(atomic_t *v)
{
	unsigned int counter;

	counter = (unsigned int)__atomic_add_unless(v, 1, 0);
	if (counter <= (unsigned int)INT_MAX)
		return (int)counter;

	atomic_dec(v);

	return -EINVAL;
}

/* Decrement the counter.  Return the resulting value, or -EINVAL */
static int atomic_dec_return_safe(atomic_t *v)
{
	int counter;

	counter = atomic_dec_return(v);
	if (counter >= 0)
		return counter;

	atomic_inc(v);

	return -EINVAL;
}
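
/*
 * Illustrative usage sketch for the pair above (the take_ref()/
 * drop_ref()/cleanup() names are hypothetical, not part of this
 * file).  Together they behave as a saturating refcount that pins
 * itself at 0 and refuses to wrap past INT_MAX:
 *
 *	static bool take_ref(atomic_t *v)
 *	{
 *		return atomic_inc_return_safe(v) > 0;
 *	}
 *
 *	static void drop_ref(atomic_t *v)
 *	{
 *		if (atomic_dec_return_safe(v) == 0)
 *			cleanup();
 *	}
 */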

#define RBD_DRV_NAME "rbd"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
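
/*
 * Rough arithmetic behind the 510 limit: each snapshot id is a u64,
 * so 510 ids occupy 510 * 8 = 4080 bytes, which together with the
 * ceph_snap_context header still fits in a single 4 KB allocation.
 */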

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
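
/*
 * Worked example for the width formula above: a byte of an integer
 * contributes at most log10(256) ~ 2.41 decimal digits, so 2.5 digits
 * per byte plus one is always enough.  With a 4-byte int, an id needs
 * at most 10 digits (2^32 ~ 4.3 * 10^9) and (5 * 4) / 2 + 1 = 11.
 */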

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	u32			copyup_page_count;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
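
/*
 * Minimal usage sketch for the iterators above (the handle() callee
 * is hypothetical; real callers appear later in this file):
 *
 *	struct rbd_obj_request *obj_request;
 *
 *	for_each_obj_request(img_request, obj_request)
 *		handle(obj_request);
 *
 * The _safe variant walks the list in reverse and tolerates removal
 * of the current entry, which is why teardown paths use it.
 */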

struct rbd_mapping {
	u64                     size;
	u64                     features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	int			minor;
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event   *watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	atomic_t		parent_ref;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore     header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(client_mutex);	/* Serialize client creation */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static DEFINE_IDA(rbd_dev_id_ida);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping);
static void rbd_spec_put(struct rbd_spec *spec);

static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);

static struct attribute *rbd_bus_attrs[] = {
	&bus_attr_add.attr,
	&bus_attr_remove.attr,
	NULL,
};

static umode_t rbd_bus_is_visible(struct kobject *kobj,
				  struct attribute *attr, int index)
{
	return attr->mode;
}

static const struct attribute_group rbd_bus_group = {
	.attrs = rbd_bus_attrs,
	.is_visible = rbd_bus_is_visible,
};
__ATTRIBUTE_GROUPS(rbd_bus);

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_groups	= rbd_bus_groups,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev);
static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);

	return 0;
}

static void rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	put_device(&rbd_dev->dev);
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.  Success or not, this function
 * consumes ceph_opts.  Caller holds client_mutex.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_rbdc;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_client;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;
out_client:
	ceph_destroy_client(rbdc->client);
out_rbdc:
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};
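
/*
 * How these tokens reach us: rbd-specific options such as "ro" or
 * "read_write" arrive embedded in the string written to
 * /sys/bus/rbd/add (see Documentation/ABI/testing/sysfs-bus-rbd for
 * the exact syntax), and each comma-separated token is handed to
 * parse_rbd_opts_token() below via ceph_parse_options().
 */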

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration; if one does
 * not exist, create it.  Either way, ceph_opts is consumed by this
 * function.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);
	mutex_unlock(&client_mutex);

	return rbdc;
}

/*
 * Destroy ceph client.  Takes rbd_client_list_lock itself to unlink
 * the client from rbd_client_list, so the caller must not hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Fill an rbd image header with information from the given format 1
 * on-disk header.
 */
static int rbd_header_from_disk(struct rbd_device *rbd_dev,
				 struct rbd_image_header_ondisk *ondisk)
{
	struct rbd_image_header *header = &rbd_dev->header;
	bool first_time = header->object_prefix == NULL;
	struct ceph_snap_context *snapc;
	char *object_prefix = NULL;
	char *snap_names = NULL;
	u64 *snap_sizes = NULL;
	u32 snap_count;
	size_t size;
	int ret = -ENOMEM;
	u32 i;

	/* Allocate this now to avoid having to handle failure below */

	if (first_time) {
		size_t len;

		len = strnlen(ondisk->object_prefix,
				sizeof (ondisk->object_prefix));
		object_prefix = kmalloc(len + 1, GFP_KERNEL);
		if (!object_prefix)
			return -ENOMEM;
		memcpy(object_prefix, ondisk->object_prefix, len);
		object_prefix[len] = '\0';
	}

	/* Allocate the snapshot context and fill it in */

	snap_count = le32_to_cpu(ondisk->snap_count);
	snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!snapc)
		goto out_err;
	snapc->seq = le64_to_cpu(ondisk->snap_seq);
	if (snap_count) {
		struct rbd_image_snap_ondisk *snaps;
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* We'll keep a copy of the snapshot names... */

		if (snap_names_len > (u64)SIZE_MAX)
			goto out_2big;
		snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!snap_names)
			goto out_err;

		/* ...as well as the array of their sizes. */

		size = snap_count * sizeof (*header->snap_sizes);
		snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!snap_sizes)
			goto out_err;

		/*
		 * Copy the names, and fill in each snapshot's id
		 * and size.
		 *
		 * Note that rbd_dev_v1_header_info() guarantees the
		 * ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(snap_names, &ondisk->snaps[snap_count], snap_names_len);
		snaps = ondisk->snaps;
		for (i = 0; i < snap_count; i++) {
			snapc->snaps[i] = le64_to_cpu(snaps[i].id);
			snap_sizes[i] = le64_to_cpu(snaps[i].image_size);
		}
	}

	/* We won't fail any more, fill in the header */

	if (first_time) {
		header->object_prefix = object_prefix;
		header->obj_order = ondisk->options.order;
		header->crypt_type = ondisk->options.crypt_type;
		header->comp_type = ondisk->options.comp_type;
		/* The rest aren't used for format 1 images */
		header->stripe_unit = 0;
		header->stripe_count = 0;
		header->features = 0;
	} else {
		ceph_put_snap_context(header->snapc);
		kfree(header->snap_names);
		kfree(header->snap_sizes);
	}

	/* The remaining fields always get updated (when we refresh) */

	header->image_size = le64_to_cpu(ondisk->image_size);
	header->snapc = snapc;
	header->snap_names = snap_names;
	header->snap_sizes = snap_sizes;

	/* Make sure mapping size is consistent with header info */

	if (rbd_dev->spec->snap_id == CEPH_NOSNAP || first_time)
		if (rbd_dev->mapping.size != header->image_size)
			rbd_dev->mapping.size = header->image_size;

	return 0;
out_2big:
	ret = -EIO;
out_err:
	kfree(snap_sizes);
	kfree(snap_names);
	ceph_put_snap_context(snapc);
	kfree(object_prefix);

	return ret;
}

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}
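
/*
 * Worked example: with snapc->snaps holding { 12, 8, 5, 2 } (newest
 * id first, as the osd keeps it), rbd_dev_snap_index() returns 2 for
 * snap_id 5 and BAD_SNAP_INDEX for snap_id 7; bsearch() works here
 * only because snapid_compare_reverse() matches that descending order.
 */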

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;
	const char *snap_name;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return ERR_PTR(-ENOENT);

	snap_name = _rbd_dev_v1_snap_name(rbd_dev, which);
	return snap_name ? snap_name : ERR_PTR(-ENOMEM);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;
	char *name_format;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	name_format = "%s.%012llx";
	if (rbd_dev->image_format == 2)
		name_format = "%s.%016llx";
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, name_format,
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}
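
/*
 * Example of the names generated above, for an assumed object prefix
 * "rb.0.1234" and obj_order 22 (4 MiB objects): image offset
 * 0x1500000 (21 MiB) falls in segment 5, named
 * "rb.0.1234.000000000005" for a format 1 image and
 * "rb.0.1234.0000000000000005" for format 2.
 */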

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
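
/*
 * Worked example for the two helpers above, assuming obj_order 22
 * (4 MiB segments): an image I/O at offset 5 MiB for 6 MiB has
 * rbd_segment_offset() == 1 MiB, and rbd_segment_length() clamps the
 * first piece to 3 MiB (the remainder of segment 1); the leftover
 * 3 MiB then starts segment 2 at offset 0.
 */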

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
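
/* For example, an obj_order of 22 yields 1 << 22 = 4 MiB objects. */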

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				flush_dcache_page(bv->bv_page);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = offset & ~PAGE_MASK;
		length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		flush_dcache_page(*page);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	bio_for_each_segment(bv, bio_src, idx) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
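
/*
 * A sketch of how a caller consumes bio_chain_clone_range() (the
 * original_chain/bytes_left/segment_remaining/submit() names are
 * hypothetical; the real caller is the image request machinery later
 * in this file).  Each object request peels off its own piece, and
 * bio/offset advance so the next clone starts where the last ended:
 *
 *	struct bio *bio = original_chain;
 *	unsigned int offset = 0;
 *
 *	while (bytes_left) {
 *		unsigned int len = min(bytes_left, segment_remaining);
 *		struct bio *clone;
 *
 *		clone = bio_chain_clone_range(&bio, &offset, len, GFP_NOIO);
 *		if (!clone)
 *			break;		// ENOMEM, or source chain exhausted
 *		submit(clone);
 *		bytes_left -= len;
 *	}
 */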

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}
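
/*
 * The KNOWN/EXISTS pair above encodes three states:
 *
 *	KNOWN  EXISTS  meaning
 *	  0      0     existence never checked (EXISTS not yet valid)
 *	  1      0     checked: the target object does not exist
 *	  1      1     checked: the target object exists
 *
 * KNOWN=0/EXISTS=1 never occurs, because obj_request_existence_set()
 * sets EXISTS (if at all) before it sets KNOWN.
 */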
1395
bf0d5f50
AE
1396static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1397{
37206ee5
AE
1398 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1399 atomic_read(&obj_request->kref.refcount));
bf0d5f50
AE
1400 kref_get(&obj_request->kref);
1401}
1402
1403static void rbd_obj_request_destroy(struct kref *kref);
1404static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1405{
1406 rbd_assert(obj_request != NULL);
37206ee5
AE
1407 dout("%s: obj %p (was %d)\n", __func__, obj_request,
1408 atomic_read(&obj_request->kref.refcount));
bf0d5f50
AE
1409 kref_put(&obj_request->kref, rbd_obj_request_destroy);
1410}
1411
e93f3152
AE
1412static bool img_request_child_test(struct rbd_img_request *img_request);
1413static void rbd_parent_request_destroy(struct kref *kref);
bf0d5f50
AE
1414static void rbd_img_request_destroy(struct kref *kref);
1415static void rbd_img_request_put(struct rbd_img_request *img_request)
1416{
1417 rbd_assert(img_request != NULL);
37206ee5
AE
1418 dout("%s: img %p (was %d)\n", __func__, img_request,
1419 atomic_read(&img_request->kref.refcount));
e93f3152
AE
1420 if (img_request_child_test(img_request))
1421 kref_put(&img_request->kref, rbd_parent_request_destroy);
1422 else
1423 kref_put(&img_request->kref, rbd_img_request_destroy);
bf0d5f50
AE
1424}
1425
1426static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1427 struct rbd_obj_request *obj_request)
1428{
25dcf954
AE
1429 rbd_assert(obj_request->img_request == NULL);
1430
b155e86c 1431 /* Image request now owns object's original reference */
bf0d5f50 1432 obj_request->img_request = img_request;
25dcf954 1433 obj_request->which = img_request->obj_request_count;
6365d33a
AE
1434 rbd_assert(!obj_request_img_data_test(obj_request));
1435 obj_request_img_data_set(obj_request);
bf0d5f50 1436 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954
AE
1437 img_request->obj_request_count++;
1438 list_add_tail(&obj_request->links, &img_request->obj_requests);
37206ee5
AE
1439 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1440 obj_request->which);
bf0d5f50
AE
1441}
1442
1443static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1444 struct rbd_obj_request *obj_request)
1445{
1446 rbd_assert(obj_request->which != BAD_WHICH);
25dcf954 1447
37206ee5
AE
1448 dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1449 obj_request->which);
bf0d5f50 1450 list_del(&obj_request->links);
25dcf954
AE
1451 rbd_assert(img_request->obj_request_count > 0);
1452 img_request->obj_request_count--;
1453 rbd_assert(obj_request->which == img_request->obj_request_count);
1454 obj_request->which = BAD_WHICH;
6365d33a 1455 rbd_assert(obj_request_img_data_test(obj_request));
bf0d5f50 1456 rbd_assert(obj_request->img_request == img_request);
bf0d5f50 1457 obj_request->img_request = NULL;
25dcf954 1458 obj_request->callback = NULL;
bf0d5f50
AE
1459 rbd_obj_request_put(obj_request);
1460}
1461
1462static bool obj_request_type_valid(enum obj_request_type type)
1463{
1464 switch (type) {
9969ebc5 1465 case OBJ_REQUEST_NODATA:
bf0d5f50 1466 case OBJ_REQUEST_BIO:
788e2df3 1467 case OBJ_REQUEST_PAGES:
bf0d5f50
AE
1468 return true;
1469 default:
1470 return false;
1471 }
1472}
1473
bf0d5f50
AE
1474static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1475 struct rbd_obj_request *obj_request)
1476{
37206ee5
AE
1477 dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1478
bf0d5f50
AE
1479 return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1480}
1481
1482static void rbd_img_request_complete(struct rbd_img_request *img_request)
1483{
55f27e09 1484
37206ee5 1485 dout("%s: img %p\n", __func__, img_request);
55f27e09
AE
1486
1487 /*
1488 * If no error occurred, compute the aggregate transfer
1489 * count for the image request. We could instead use
1490 * atomic64_cmpxchg() to update it as each object request
1491 * completes; not clear which way is better off hand.
1492 */
1493 if (!img_request->result) {
1494 struct rbd_obj_request *obj_request;
1495 u64 xferred = 0;
1496
1497 for_each_obj_request(img_request, obj_request)
1498 xferred += obj_request->xferred;
1499 img_request->xferred = xferred;
1500 }
1501
bf0d5f50
AE
1502 if (img_request->callback)
1503 img_request->callback(img_request);
1504 else
1505 rbd_img_request_put(img_request);
1506}
1507
788e2df3
AE
1508/* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1509
1510static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1511{
37206ee5
AE
1512 dout("%s: obj %p\n", __func__, obj_request);
1513
788e2df3
AE
1514 return wait_for_completion_interruptible(&obj_request->completion);
1515}
1516
0c425248
AE
1517/*
1518 * The default/initial value for all image request flags is 0. Each
1519 * is conditionally set to 1 at image request initialization time
1520 * and currently never change thereafter.
1521 */
1522static void img_request_write_set(struct rbd_img_request *img_request)
1523{
1524 set_bit(IMG_REQ_WRITE, &img_request->flags);
1525 smp_mb();
1526}
1527
1528static bool img_request_write_test(struct rbd_img_request *img_request)
1529{
1530 smp_mb();
1531 return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1532}
1533
9849e986
AE
1534static void img_request_child_set(struct rbd_img_request *img_request)
1535{
1536 set_bit(IMG_REQ_CHILD, &img_request->flags);
1537 smp_mb();
1538}
1539
e93f3152
AE
1540static void img_request_child_clear(struct rbd_img_request *img_request)
1541{
1542 clear_bit(IMG_REQ_CHILD, &img_request->flags);
1543 smp_mb();
1544}
1545
9849e986
AE
1546static bool img_request_child_test(struct rbd_img_request *img_request)
1547{
1548 smp_mb();
1549 return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1550}
1551
d0b2e944
AE
1552static void img_request_layered_set(struct rbd_img_request *img_request)
1553{
1554 set_bit(IMG_REQ_LAYERED, &img_request->flags);
1555 smp_mb();
1556}
1557
a2acd00e
AE
1558static void img_request_layered_clear(struct rbd_img_request *img_request)
1559{
1560 clear_bit(IMG_REQ_LAYERED, &img_request->flags);
1561 smp_mb();
1562}
1563
d0b2e944
AE
1564static bool img_request_layered_test(struct rbd_img_request *img_request)
1565{
1566 smp_mb();
1567 return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1568}
1569
6e2a4505
AE
1570static void
1571rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1572{
b9434c5b
AE
1573 u64 xferred = obj_request->xferred;
1574 u64 length = obj_request->length;
1575
6e2a4505
AE
1576 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1577 obj_request, obj_request->img_request, obj_request->result,
b9434c5b 1578 xferred, length);
6e2a4505 1579 /*
17c1cc1d
JD
1580 * ENOENT means a hole in the image. We zero-fill the entire
1581 * length of the request. A short read also implies zero-fill
1582 * to the end of the request. An error requires the whole
1583 * length of the request to be reported finished with an error
1584 * to the block layer. In each case we update the xferred
1585 * count to indicate the whole request was satisfied.
6e2a4505 1586 */
b9434c5b 1587 rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
6e2a4505 1588 if (obj_request->result == -ENOENT) {
b9434c5b
AE
1589 if (obj_request->type == OBJ_REQUEST_BIO)
1590 zero_bio_chain(obj_request->bio_list, 0);
1591 else
1592 zero_pages(obj_request->pages, 0, length);
6e2a4505 1593 obj_request->result = 0;
b9434c5b
AE
1594 } else if (xferred < length && !obj_request->result) {
1595 if (obj_request->type == OBJ_REQUEST_BIO)
1596 zero_bio_chain(obj_request->bio_list, xferred);
1597 else
1598 zero_pages(obj_request->pages, xferred, length);
6e2a4505 1599 }
17c1cc1d 1600 obj_request->xferred = length;
6e2a4505
AE
1601 obj_request_done_set(obj_request);
1602}
1603
bf0d5f50
AE
1604static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1605{
37206ee5
AE
1606 dout("%s: obj %p cb %p\n", __func__, obj_request,
1607 obj_request->callback);
bf0d5f50
AE
1608 if (obj_request->callback)
1609 obj_request->callback(obj_request);
788e2df3
AE
1610 else
1611 complete_all(&obj_request->completion);
bf0d5f50
AE
1612}
1613
c47f9371 1614static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
39bf2c5d
AE
1615{
1616 dout("%s: obj %p\n", __func__, obj_request);
1617 obj_request_done_set(obj_request);
1618}
1619
c47f9371 1620static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1621{
57acbaa7 1622 struct rbd_img_request *img_request = NULL;
a9e8ba2c 1623 struct rbd_device *rbd_dev = NULL;
57acbaa7
AE
1624 bool layered = false;
1625
1626 if (obj_request_img_data_test(obj_request)) {
1627 img_request = obj_request->img_request;
1628 layered = img_request && img_request_layered_test(img_request);
a9e8ba2c 1629 rbd_dev = img_request->rbd_dev;
57acbaa7 1630 }
8b3e1a56
AE
1631
1632 dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1633 obj_request, img_request, obj_request->result,
1634 obj_request->xferred, obj_request->length);
a9e8ba2c
AE
1635 if (layered && obj_request->result == -ENOENT &&
1636 obj_request->img_offset < rbd_dev->parent_overlap)
8b3e1a56
AE
1637 rbd_img_parent_read(obj_request);
1638 else if (img_request)
6e2a4505
AE
1639 rbd_img_obj_request_read_callback(obj_request);
1640 else
1641 obj_request_done_set(obj_request);
bf0d5f50
AE
1642}
1643
c47f9371 1644static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
bf0d5f50 1645{
1b83bef2
SW
1646 dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1647 obj_request->result, obj_request->length);
1648 /*
8b3e1a56
AE
1649 * There is no such thing as a successful short write. Set
1650 * it to our originally-requested length.
1b83bef2
SW
1651 */
1652 obj_request->xferred = obj_request->length;
07741308 1653 obj_request_done_set(obj_request);
bf0d5f50
AE
1654}
1655
fbfab539
AE
1656/*
1657 * For a simple stat call there's nothing to do. We'll do more if
1658 * this is part of a write sequence for a layered image.
1659 */
c47f9371 1660static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
fbfab539 1661{
37206ee5 1662 dout("%s: obj %p\n", __func__, obj_request);
fbfab539
AE
1663 obj_request_done_set(obj_request);
1664}
1665
bf0d5f50
AE
1666static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1667 struct ceph_msg *msg)
1668{
1669 struct rbd_obj_request *obj_request = osd_req->r_priv;
bf0d5f50
AE
1670 u16 opcode;
1671
37206ee5 1672 dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
bf0d5f50 1673 rbd_assert(osd_req == obj_request->osd_req);
57acbaa7
AE
1674 if (obj_request_img_data_test(obj_request)) {
1675 rbd_assert(obj_request->img_request);
1676 rbd_assert(obj_request->which != BAD_WHICH);
1677 } else {
1678 rbd_assert(obj_request->which == BAD_WHICH);
1679 }
bf0d5f50 1680
1b83bef2
SW
1681 if (osd_req->r_result < 0)
1682 obj_request->result = osd_req->r_result;
bf0d5f50 1683
0eefd470 1684 BUG_ON(osd_req->r_num_ops > 2);
bf0d5f50 1685
c47f9371
AE
1686 /*
1687 * We support a 64-bit length, but ultimately it has to be
1688 * passed to blk_end_request(), which takes an unsigned int.
1689 */
1b83bef2 1690 obj_request->xferred = osd_req->r_reply_op_len[0];
8b3e1a56 1691 rbd_assert(obj_request->xferred < (u64)UINT_MAX);
79528734 1692 opcode = osd_req->r_ops[0].op;
bf0d5f50
AE
1693 switch (opcode) {
1694 case CEPH_OSD_OP_READ:
c47f9371 1695 rbd_osd_read_callback(obj_request);
bf0d5f50
AE
1696 break;
1697 case CEPH_OSD_OP_WRITE:
c47f9371 1698 rbd_osd_write_callback(obj_request);
bf0d5f50 1699 break;
fbfab539 1700 case CEPH_OSD_OP_STAT:
c47f9371 1701 rbd_osd_stat_callback(obj_request);
fbfab539 1702 break;
36be9a76 1703 case CEPH_OSD_OP_CALL:
b8d70035 1704 case CEPH_OSD_OP_NOTIFY_ACK:
9969ebc5 1705 case CEPH_OSD_OP_WATCH:
c47f9371 1706 rbd_osd_trivial_callback(obj_request);
9969ebc5 1707 break;
bf0d5f50
AE
1708 default:
1709 rbd_warn(NULL, "%s: unsupported op %hu\n",
1710 obj_request->object_name, (unsigned short) opcode);
1711 break;
1712 }
1713
07741308 1714 if (obj_request_done_test(obj_request))
bf0d5f50
AE
1715 rbd_obj_request_complete(obj_request);
1716}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}
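
/*
 * Illustrative sketch (not part of the driver): the two helpers above
 * encode the rule that reads run against a single snapshot id while
 * writes carry the whole snapshot context.  The real dispatch lives in
 * rbd_img_request_fill() further down; a hypothetical caller-side
 * wrapper would look like this.
 */
static inline void rbd_example_format_osd_req(
				struct rbd_obj_request *obj_request,
				bool write_request)
{
	if (write_request)
		rbd_osd_req_format_write(obj_request);	/* snapc + mtime */
	else
		rbd_osd_req_format_read(obj_request);	/* snap_id only */
}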

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1786
0eefd470
AE
1787/*
1788 * Create a copyup osd request based on the information in the
1789 * object request supplied. A copyup request has two osd ops,
1790 * a copyup method call, and a "normal" write request.
1791 */
1792static struct ceph_osd_request *
1793rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1794{
1795 struct rbd_img_request *img_request;
1796 struct ceph_snap_context *snapc;
1797 struct rbd_device *rbd_dev;
1798 struct ceph_osd_client *osdc;
1799 struct ceph_osd_request *osd_req;
1800
1801 rbd_assert(obj_request_img_data_test(obj_request));
1802 img_request = obj_request->img_request;
1803 rbd_assert(img_request);
1804 rbd_assert(img_request_write_test(img_request));
1805
1806 /* Allocate and initialize the request, for the two ops */
1807
1808 snapc = img_request->snapc;
1809 rbd_dev = img_request->rbd_dev;
1810 osdc = &rbd_dev->rbd_client->client->osdc;
1811 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1812 if (!osd_req)
1813 return NULL; /* ENOMEM */
1814
1815 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1816 osd_req->r_callback = rbd_osd_req_callback;
1817 osd_req->r_priv = obj_request;
1818
1819 osd_req->r_oid_len = strlen(obj_request->object_name);
1820 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1821 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1822
1823 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1824
1825 return osd_req;
1826}
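
/*
 * For reference (added annotation), the two ops such a request ends up
 * carrying once rbd_img_obj_parent_read_full_callback() fills it in:
 *
 *	op 0: CEPH_OSD_OP_CALL   "rbd" class, "copyup" method,
 *	                         parent data as the request payload
 *	op 1: CEPH_OSD_OP_WRITE  the original guarded write
 */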


static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/* It's OK to call this for a device with no parent */

static void rbd_spec_put(struct rbd_spec *spec);
static void rbd_dev_unparent(struct rbd_device *rbd_dev)
{
	rbd_dev_remove_parent(rbd_dev);
	rbd_spec_put(rbd_dev->parent_spec);
	rbd_dev->parent_spec = NULL;
	rbd_dev->parent_overlap = 0;
}

/*
 * Parent image reference counting is used to determine when an
 * image's parent fields can be safely torn down--after there are no
 * more in-flight requests to the parent image.  When the last
 * reference is dropped, cleaning them up is safe.
 */
static void rbd_dev_parent_put(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return;

	counter = atomic_dec_return_safe(&rbd_dev->parent_ref);
	if (counter > 0)
		return;

	/* Last reference; clean up parent data structures */

	if (!counter)
		rbd_dev_unparent(rbd_dev);
	else
		rbd_warn(rbd_dev, "parent reference underflow\n");
}

/*
 * If an image has a non-zero parent overlap, get a reference to its
 * parent.
 *
 * We must get the reference before checking for the overlap to
 * coordinate properly with zeroing the parent overlap in
 * rbd_dev_v2_parent_info() when an image gets flattened.  We
 * drop it again if there is no overlap.
 *
 * Returns true if the rbd device has a parent with a non-zero
 * overlap and a reference for it was successfully taken, or
 * false otherwise.
 */
static bool rbd_dev_parent_get(struct rbd_device *rbd_dev)
{
	int counter;

	if (!rbd_dev->parent_spec)
		return false;

	counter = atomic_inc_return_safe(&rbd_dev->parent_ref);
	if (counter > 0 && rbd_dev->parent_overlap)
		return true;

	/* Image was flattened, but parent is not yet torn down */

	if (counter < 0)
		rbd_warn(rbd_dev, "parent reference overflow\n");

	return false;
}
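
/*
 * Usage sketch (hypothetical helper, added for illustration): the two
 * functions above are meant to bracket any work that depends on the
 * parent fields staying stable.  rbd_img_request_create() and
 * rbd_img_request_destroy() below pair them up for real.
 */
static bool rbd_example_parent_io(struct rbd_device *rbd_dev)
{
	if (!rbd_dev_parent_get(rbd_dev))
		return false;	/* no parent, or the image was flattened */

	/* parent_spec and parent_overlap are safe to use here */

	rbd_dev_parent_put(rbd_dev);	/* drop once the I/O completes */
	return true;
}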

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (rbd_dev_parent_get(rbd_dev))
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_layered_test(img_request)) {
		img_request_layered_clear(img_request);
		rbd_dev_parent_put(img_request->rbd_dev);
	}

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static struct rbd_img_request *rbd_parent_request_create(
					struct rbd_obj_request *obj_request,
					u64 img_offset, u64 length)
{
	struct rbd_img_request *parent_request;
	struct rbd_device *rbd_dev;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;

	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length, false);
	if (!parent_request)
		return NULL;

	img_request_child_set(parent_request);
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	return parent_request;
}

static void rbd_parent_request_destroy(struct kref *kref)
{
	struct rbd_img_request *parent_request;
	struct rbd_obj_request *orig_request;

	parent_request = container_of(kref, struct rbd_img_request, kref);
	orig_request = parent_request->obj_request;

	parent_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	img_request_child_clear(parent_request);

	rbd_img_request_destroy(kref);
}
2086
1217857f
AE
2087static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
2088{
6365d33a 2089 struct rbd_img_request *img_request;
1217857f
AE
2090 unsigned int xferred;
2091 int result;
8b3e1a56 2092 bool more;
1217857f 2093
6365d33a
AE
2094 rbd_assert(obj_request_img_data_test(obj_request));
2095 img_request = obj_request->img_request;
2096
1217857f
AE
2097 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
2098 xferred = (unsigned int)obj_request->xferred;
2099 result = obj_request->result;
2100 if (result) {
2101 struct rbd_device *rbd_dev = img_request->rbd_dev;
2102
2103 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
2104 img_request_write_test(img_request) ? "write" : "read",
2105 obj_request->length, obj_request->img_offset,
2106 obj_request->offset);
2107 rbd_warn(rbd_dev, " result %d xferred %x\n",
2108 result, xferred);
2109 if (!img_request->result)
2110 img_request->result = result;
2111 }
2112
f1a4739f
AE
2113 /* Image object requests don't own their page array */
2114
2115 if (obj_request->type == OBJ_REQUEST_PAGES) {
2116 obj_request->pages = NULL;
2117 obj_request->page_count = 0;
2118 }
2119
8b3e1a56
AE
2120 if (img_request_child_test(img_request)) {
2121 rbd_assert(img_request->obj_request != NULL);
2122 more = obj_request->which < img_request->obj_request_count - 1;
2123 } else {
2124 rbd_assert(img_request->rq != NULL);
2125 more = blk_end_request(img_request->rq, result, xferred);
2126 }
2127
2128 return more;
1217857f
AE
2129}

static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}
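
/*
 * Worked example of the ordering the loop above enforces (editorial
 * illustration): with four object requests completing as 2, 0, 3, 1,
 * the callback for 2 defers (which != next_completion), the callback
 * for 0 ends request 0 and stops at the not-yet-done request 1, the
 * callback for 3 defers, and the final callback for 1 sweeps requests
 * 1, 2 and 3 and completes the image request.
 */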

/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list = NULL;
	unsigned int bio_offset = 0;
	struct page **pages = NULL;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;
		/*
		 * set obj_request->img_request before creating the
		 * osd_request so that it gets the right snapc
		 */
		rbd_img_obj_request_add(img_request, obj_request);

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}
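
/*
 * Sketch of the segmentation arithmetic rbd_img_request_fill() leans
 * on (rbd_segment_offset/length are defined earlier in this file; this
 * assumes objects are a power-of-two size, 1 << obj_order).  E.g. with
 * 4 MB objects, an 8 KB request at image offset 4 MB - 4 KB becomes
 * two object requests of 4 KB each, one per object.
 */
static inline u64 rbd_example_segment_length(u8 obj_order, u64 img_offset,
					     u64 resid)
{
	u64 segment_size = 1ULL << obj_order;
	u64 room = segment_size - (img_offset & (segment_size - 1));

	return min(resid, room);	/* never cross an object boundary */
}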
2293
0eefd470
AE
2294static void
2295rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2296{
2297 struct rbd_img_request *img_request;
2298 struct rbd_device *rbd_dev;
ebda6408 2299 struct page **pages;
0eefd470
AE
2300 u32 page_count;
2301
2302 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2303 rbd_assert(obj_request_img_data_test(obj_request));
2304 img_request = obj_request->img_request;
2305 rbd_assert(img_request);
2306
2307 rbd_dev = img_request->rbd_dev;
2308 rbd_assert(rbd_dev);
0eefd470 2309
ebda6408
AE
2310 pages = obj_request->copyup_pages;
2311 rbd_assert(pages != NULL);
0eefd470 2312 obj_request->copyup_pages = NULL;
ebda6408
AE
2313 page_count = obj_request->copyup_page_count;
2314 rbd_assert(page_count);
2315 obj_request->copyup_page_count = 0;
2316 ceph_release_page_vector(pages, page_count);
0eefd470
AE
2317
2318 /*
2319 * We want the transfer count to reflect the size of the
2320 * original write request. There is no such thing as a
2321 * successful short write, so if the request was successful
2322 * we can just set it to the originally-requested length.
2323 */
2324 if (!obj_request->result)
2325 obj_request->xferred = obj_request->length;
2326
2327 /* Finish up with the normal image object callback */
2328
2329 rbd_img_obj_callback(obj_request);
2330}
2331
3d7efd18
AE
2332static void
2333rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2334{
2335 struct rbd_obj_request *orig_request;
0eefd470
AE
2336 struct ceph_osd_request *osd_req;
2337 struct ceph_osd_client *osdc;
2338 struct rbd_device *rbd_dev;
3d7efd18 2339 struct page **pages;
ebda6408 2340 u32 page_count;
bbea1c1a 2341 int img_result;
ebda6408 2342 u64 parent_length;
b91f09f1
AE
2343 u64 offset;
2344 u64 length;
3d7efd18
AE
2345
2346 rbd_assert(img_request_child_test(img_request));
2347
2348 /* First get what we need from the image request */
2349
2350 pages = img_request->copyup_pages;
2351 rbd_assert(pages != NULL);
2352 img_request->copyup_pages = NULL;
ebda6408
AE
2353 page_count = img_request->copyup_page_count;
2354 rbd_assert(page_count);
2355 img_request->copyup_page_count = 0;
3d7efd18
AE
2356
2357 orig_request = img_request->obj_request;
2358 rbd_assert(orig_request != NULL);
b91f09f1 2359 rbd_assert(obj_request_type_valid(orig_request->type));
bbea1c1a 2360 img_result = img_request->result;
ebda6408
AE
2361 parent_length = img_request->length;
2362 rbd_assert(parent_length == img_request->xferred);
91c6febb 2363 rbd_img_request_put(img_request);
3d7efd18 2364
91c6febb
AE
2365 rbd_assert(orig_request->img_request);
2366 rbd_dev = orig_request->img_request->rbd_dev;
0eefd470 2367 rbd_assert(rbd_dev);
0eefd470 2368
bbea1c1a
AE
2369 /*
2370 * If the overlap has become 0 (most likely because the
2371 * image has been flattened) we need to free the pages
2372 * and re-submit the original write request.
2373 */
2374 if (!rbd_dev->parent_overlap) {
2375 struct ceph_osd_client *osdc;
3d7efd18 2376
bbea1c1a
AE
2377 ceph_release_page_vector(pages, page_count);
2378 osdc = &rbd_dev->rbd_client->client->osdc;
2379 img_result = rbd_obj_request_submit(osdc, orig_request);
2380 if (!img_result)
2381 return;
2382 }
0eefd470 2383
bbea1c1a 2384 if (img_result)
0eefd470 2385 goto out_err;
0eefd470 2386
8785b1d4
AE
2387 /*
2388 * The original osd request is of no use to use any more.
2389 * We need a new one that can hold the two ops in a copyup
2390 * request. Allocate the new copyup osd request for the
2391 * original request, and release the old one.
2392 */
bbea1c1a 2393 img_result = -ENOMEM;
0eefd470
AE
2394 osd_req = rbd_osd_req_create_copyup(orig_request);
2395 if (!osd_req)
2396 goto out_err;
8785b1d4 2397 rbd_osd_req_destroy(orig_request->osd_req);
0eefd470
AE
2398 orig_request->osd_req = osd_req;
2399 orig_request->copyup_pages = pages;
ebda6408 2400 orig_request->copyup_page_count = page_count;
3d7efd18 2401
0eefd470 2402 /* Initialize the copyup op */
3d7efd18 2403
0eefd470 2404 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
ebda6408 2405 osd_req_op_cls_request_data_pages(osd_req, 0, pages, parent_length, 0,
0eefd470 2406 false, false);
3d7efd18 2407
0eefd470
AE
2408 /* Then the original write request op */
2409
b91f09f1
AE
2410 offset = orig_request->offset;
2411 length = orig_request->length;
0eefd470 2412 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
b91f09f1
AE
2413 offset, length, 0, 0);
2414 if (orig_request->type == OBJ_REQUEST_BIO)
2415 osd_req_op_extent_osd_data_bio(osd_req, 1,
2416 orig_request->bio_list, length);
2417 else
2418 osd_req_op_extent_osd_data_pages(osd_req, 1,
2419 orig_request->pages, length,
2420 offset & ~PAGE_MASK, false, false);
0eefd470
AE
2421
2422 rbd_osd_req_format_write(orig_request);
2423
2424 /* All set, send it off. */
2425
2426 orig_request->callback = rbd_img_obj_copyup_callback;
2427 osdc = &rbd_dev->rbd_client->client->osdc;
bbea1c1a
AE
2428 img_result = rbd_obj_request_submit(osdc, orig_request);
2429 if (!img_result)
0eefd470
AE
2430 return;
2431out_err:
2432 /* Record the error code and complete the request */
2433
bbea1c1a 2434 orig_request->result = img_result;
0eefd470
AE
2435 orig_request->xferred = 0;
2436 obj_request_done_set(orig_request);
2437 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2438}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request_type_valid(obj_request->type));

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_parent_request_create(obj_request,
						img_offset, length);
	if (!parent_request)
		goto out_err;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;
	parent_request->copyup_page_count = page_count;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->copyup_page_count = 0;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	struct rbd_device *rbd_dev;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_obj_request_put(orig_request);
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original write request.
	 */
	rbd_dev = orig_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		result = rbd_obj_request_submit(osdc, orig_request);
		if (!result)
			return;
	}

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
}

static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
						OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}
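
/*
 * Sketch of the STAT reply layout described above (the driver never
 * declares this struct, it only sizes a buffer for it; shown here for
 * illustration only):
 */
struct rbd_example_stat_reply {
	__le64	length;			/* object size in bytes */
	struct {
		__le32	tv_sec;
		__le32	tv_nsec;
	} mtime;
} __attribute__ ((packed));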

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		rbd_dev->parent_overlap <= obj_request->img_offset ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

		rbd_dev = obj_request->img_request->rbd_dev;
		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}
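
/*
 * Summary of the dispatch above (editorial annotation):
 *
 *	read, non-layered write, write beyond the
 *	overlap, or target known to exist ......... plain object request
 *	layered write, target known missing ....... parent read + copyup
 *	layered write, existence unknown .......... STAT, then resubmit
 */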

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}

static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;
	u64 img_xferred;
	int img_result;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request and release it */

	obj_request = img_request->obj_request;
	img_xferred = img_request->xferred;
	img_result = img_request->result;
	rbd_img_request_put(img_request);

	/*
	 * If the overlap has become 0 (most likely because the
	 * image has been flattened) we need to re-submit the
	 * original request.
	 */
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	if (!rbd_dev->parent_overlap) {
		struct ceph_osd_client *osdc;

		osdc = &rbd_dev->rbd_client->client->osdc;
		img_result = rbd_obj_request_submit(osdc, obj_request);
		if (!img_result)
			return;
	}

	obj_request->result = img_result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_xferred, xferred);
	} else {
		obj_request->xferred = img_xferred;
	}
out:
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}
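
/*
 * Worked example of the clamping above (editorial illustration): with
 * a 1 MB parent overlap and an 8 KB read at image offset 1 MB - 4 KB,
 * xferred is clamped to 4 KB and the read callback zero-fills the
 * remaining 4 KB that lies beyond the overlap boundary.
 */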

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request_type_valid(obj_request->type));

	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_parent_request_create(obj_request,
						obj_request->img_offset,
						obj_request->length);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	if (obj_request->type == OBJ_REQUEST_BIO)
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						obj_request->bio_list);
	else
		result = rbd_img_request_fill(img_request, OBJ_REQUEST_PAGES,
						obj_request->pages);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
out:
	rbd_obj_request_put(obj_request);

	return ret;
}

static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int ret;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long)notify_id,
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);

	rbd_obj_notify_ack_sync(rbd_dev, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, bool start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start ? 1 : 0);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}
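
/*
 * Pairing note (added for clarity; the call sites are elsewhere in
 * this file): rbd_dev_header_watch_sync() is expected to be called
 * once with start == true when an image is set up and once with
 * start == false when it is torn down, which is what the two
 * assertions at the top of the function enforce.
 */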

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				const char *class_name,
				const char *method_name,
				const void *outbound,
				size_t outbound_size,
				void *inbound,
				size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}
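
/*
 * Usage sketch (hypothetical caller, for illustration only): fetching
 * a format 2 image's object prefix through the "rbd" object class.
 * Real callers of rbd_obj_method_sync() appear later in this file.
 */
static int rbd_example_get_object_prefix(struct rbd_device *rbd_dev,
					 void *reply_buf, size_t size)
{
	return rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
				   "rbd", "get_object_prefix",
				   NULL, 0, reply_buf, size);
}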

static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * A queue callback.  Makes sure that we don't create a bio that spans
 * across multiple osd objects.  One exception would be single-page
 * bios, which we handle later at bio_chain_clone_range().
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the bio's start sector
	 * falls, after converting the partition-relative sector to
	 * one relative to the enclosing device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}
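
/*
 * Worked example (editorial illustration): with 4 MB objects,
 * sectors_per_obj is 8192.  For a bio starting two sectors before an
 * object boundary with 1024 bytes already queued, the room left is
 * (2 << 9) - 1024 = 0 bytes, so the new bvec is refused and the bio
 * ends exactly at the object boundary.
 */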

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}

static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int)size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.  On successful
 * return, the rbd_dev->header field will contain up-to-date
 * information about the image.
 */
static int rbd_dev_v1_header_info(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return -ENOMEM;

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
					0, size, ondisk);
		if (ret < 0)
			goto out;
		if ((size_t)ret < size) {
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			ret = -ENXIO;
			goto out;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	ret = rbd_header_from_disk(rbd_dev, ondisk);
out:
	kfree(ondisk);

	return ret;
}

/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

static void rbd_dev_update_size(struct rbd_device *rbd_dev)
{
	sector_t size;
	bool removing;

	/*
	 * Don't hold the lock while doing disk operations,
	 * or lock ordering will conflict with the bdev mutex via:
	 * rbd_add() -> blkdev_get() -> rbd_open()
	 */
	spin_lock_irq(&rbd_dev->lock);
	removing = test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
	spin_unlock_irq(&rbd_dev->lock);
	/*
	 * If the device is being removed, rbd_dev->disk has
	 * been destroyed, so don't try to update its size
	 */
	if (!removing) {
		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}
}

static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	down_write(&rbd_dev->header_rwsem);
	mapping_size = rbd_dev->mapping.size;
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_header_info(rbd_dev);
	else
		ret = rbd_dev_v2_header_info(rbd_dev);

	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	up_write(&rbd_dev->header_rwsem);

	if (mapping_size != rbd_dev->mapping.size)
		rbd_dev_update_size(rbd_dev);

	return ret;
}
3397
602adf40
YS
3398static int rbd_init_disk(struct rbd_device *rbd_dev)
3399{
3400 struct gendisk *disk;
3401 struct request_queue *q;
593a9e7b 3402 u64 segment_size;
602adf40 3403
602adf40 3404 /* create gendisk info */
602adf40
YS
3405 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3406 if (!disk)
1fcdb8aa 3407 return -ENOMEM;
602adf40 3408
f0f8cef5 3409 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3410 rbd_dev->dev_id);
602adf40 3411 disk->major = rbd_dev->major;
dd82fff1 3412 disk->first_minor = rbd_dev->minor;
602adf40
YS
3413 disk->fops = &rbd_bd_ops;
3414 disk->private_data = rbd_dev;
3415
bf0d5f50 3416 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40
YS
3417 if (!q)
3418 goto out_disk;
029bcbd8 3419
593a9e7b
AE
3420 /* We use the default size, but let's be explicit about it. */
3421 blk_queue_physical_block_size(q, SECTOR_SIZE);
3422
029bcbd8 3423 /* set io sizes to object size */
593a9e7b
AE
3424 segment_size = rbd_obj_bytes(&rbd_dev->header);
3425 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3426 blk_queue_max_segment_size(q, segment_size);
3427 blk_queue_io_min(q, segment_size);
3428 blk_queue_io_opt(q, segment_size);
029bcbd8 3429
602adf40
YS
3430 blk_queue_merge_bvec(q, rbd_merge_bvec);
3431 disk->queue = q;
3432
3433 q->queuedata = rbd_dev;
3434
3435 rbd_dev->disk = disk;
602adf40 3436
602adf40 3437 return 0;
602adf40
YS
3438out_disk:
3439 put_disk(disk);
1fcdb8aa
AE
3440
3441 return -ENOMEM;
602adf40
YS
3442}
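/*
 * Worked example of the queue limits above (an assumption -- the
 * object order comes from the image header, but 22, i.e. 4 MiB
 * objects, is the usual rbd default): segment_size is then 4 MiB,
 * max_hw_sectors becomes 4 MiB / 512 = 8192 sectors, and io_min and
 * io_opt are both 4 MiB, so I/O lines up with rados object bounds.
 */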
3443
dfc5606d
YS
3444/*
3445 sysfs
3446*/
3447
593a9e7b
AE
3448static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3449{
3450 return container_of(dev, struct rbd_device, dev);
3451}
3452
dfc5606d
YS
3453static ssize_t rbd_size_show(struct device *dev,
3454 struct device_attribute *attr, char *buf)
3455{
593a9e7b 3456 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3457
fc71d833
AE
3458 return sprintf(buf, "%llu\n",
3459 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3460}
3461
34b13184
AE
3462/*
3463 * Note this shows the features for whatever's mapped, which is not
3464 * necessarily the base image.
3465 */
3466static ssize_t rbd_features_show(struct device *dev,
3467 struct device_attribute *attr, char *buf)
3468{
3469 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3470
3471 return sprintf(buf, "0x%016llx\n",
fc71d833 3472 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3473}
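/*
 * Illustrative output, assuming the feature-bit definitions used by
 * this driver (RBD_FEATURE_LAYERING is bit 0, RBD_FEATURE_STRIPINGV2
 * bit 1): a layered mapping with no other features would read back
 * as "0x0000000000000001".
 */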
3474
dfc5606d
YS
3475static ssize_t rbd_major_show(struct device *dev,
3476 struct device_attribute *attr, char *buf)
3477{
593a9e7b 3478 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3479
fc71d833
AE
3480 if (rbd_dev->major)
3481 return sprintf(buf, "%d\n", rbd_dev->major);
3482
3483 return sprintf(buf, "(none)\n");
dd82fff1
ID
3484}
3485
3486static ssize_t rbd_minor_show(struct device *dev,
3487 struct device_attribute *attr, char *buf)
3488{
3489 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
fc71d833 3490
dd82fff1 3491 return sprintf(buf, "%d\n", rbd_dev->minor);
dfc5606d
YS
3492}
3493
3494static ssize_t rbd_client_id_show(struct device *dev,
3495 struct device_attribute *attr, char *buf)
602adf40 3496{
593a9e7b 3497 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3498
1dbb4399
AE
3499 return sprintf(buf, "client%lld\n",
3500 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3501}
3502
dfc5606d
YS
3503static ssize_t rbd_pool_show(struct device *dev,
3504 struct device_attribute *attr, char *buf)
602adf40 3505{
593a9e7b 3506 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3507
0d7dbfce 3508 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3509}
3510
9bb2f334
AE
3511static ssize_t rbd_pool_id_show(struct device *dev,
3512 struct device_attribute *attr, char *buf)
3513{
3514 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3515
0d7dbfce 3516 return sprintf(buf, "%llu\n",
fc71d833 3517 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3518}
3519
dfc5606d
YS
3520static ssize_t rbd_name_show(struct device *dev,
3521 struct device_attribute *attr, char *buf)
3522{
593a9e7b 3523 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3524
a92ffdf8
AE
3525 if (rbd_dev->spec->image_name)
3526 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3527
3528 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3529}
3530
589d30e0
AE
3531static ssize_t rbd_image_id_show(struct device *dev,
3532 struct device_attribute *attr, char *buf)
3533{
3534 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3535
0d7dbfce 3536 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3537}
3538
34b13184
AE
3539/*
3540 * Shows the name of the currently-mapped snapshot (or
3541 * RBD_SNAP_HEAD_NAME for the base image).
3542 */
dfc5606d
YS
3543static ssize_t rbd_snap_show(struct device *dev,
3544 struct device_attribute *attr,
3545 char *buf)
3546{
593a9e7b 3547 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3548
0d7dbfce 3549 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3550}
3551
86b00e0d
AE
3552/*
3553 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3554 * for the parent image. If there is no parent, simply shows
3555 * "(no parent image)".
3556 */
3557static ssize_t rbd_parent_show(struct device *dev,
3558 struct device_attribute *attr,
3559 char *buf)
3560{
3561 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3562 struct rbd_spec *spec = rbd_dev->parent_spec;
3563 int count;
3564 char *bufp = buf;
3565
3566 if (!spec)
3567 return sprintf(buf, "(no parent image)\n");
3568
3569 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3570 (unsigned long long) spec->pool_id, spec->pool_name);
3571 if (count < 0)
3572 return count;
3573 bufp += count;
3574
3575 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3576 spec->image_name ? spec->image_name : "(unknown)");
3577 if (count < 0)
3578 return count;
3579 bufp += count;
3580
3581 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3582 (unsigned long long) spec->snap_id, spec->snap_name);
3583 if (count < 0)
3584 return count;
3585 bufp += count;
3586
3587 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3588 if (count < 0)
3589 return count;
3590 bufp += count;
3591
3592 return (ssize_t) (bufp - buf);
3593}
3594
dfc5606d
YS
3595static ssize_t rbd_image_refresh(struct device *dev,
3596 struct device_attribute *attr,
3597 const char *buf,
3598 size_t size)
3599{
593a9e7b 3600 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3601 int ret;
602adf40 3602
cc4a38bd 3603 ret = rbd_dev_refresh(rbd_dev);
e627db08
AE
3604 if (ret)
3605 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
b813623a
AE
3606
3607 return ret < 0 ? ret : size;
dfc5606d 3608}
602adf40 3609
dfc5606d 3610static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3611static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d 3612static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
dd82fff1 3613static DEVICE_ATTR(minor, S_IRUGO, rbd_minor_show, NULL);
dfc5606d
YS
3614static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3615static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3616static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3617static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3618static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3619static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3620static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3621static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3622
3623static struct attribute *rbd_attrs[] = {
3624 &dev_attr_size.attr,
34b13184 3625 &dev_attr_features.attr,
dfc5606d 3626 &dev_attr_major.attr,
dd82fff1 3627 &dev_attr_minor.attr,
dfc5606d
YS
3628 &dev_attr_client_id.attr,
3629 &dev_attr_pool.attr,
9bb2f334 3630 &dev_attr_pool_id.attr,
dfc5606d 3631 &dev_attr_name.attr,
589d30e0 3632 &dev_attr_image_id.attr,
dfc5606d 3633 &dev_attr_current_snap.attr,
86b00e0d 3634 &dev_attr_parent.attr,
dfc5606d 3635 &dev_attr_refresh.attr,
dfc5606d
YS
3636 NULL
3637};
3638
3639static struct attribute_group rbd_attr_group = {
3640 .attrs = rbd_attrs,
3641};
3642
3643static const struct attribute_group *rbd_attr_groups[] = {
3644 &rbd_attr_group,
3645 NULL
3646};
3647
3648static void rbd_sysfs_dev_release(struct device *dev)
3649{
3650}
3651
3652static struct device_type rbd_device_type = {
3653 .name = "rbd",
3654 .groups = rbd_attr_groups,
3655 .release = rbd_sysfs_dev_release,
3656};
3657
8b8fb99c
AE
3658static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3659{
3660 kref_get(&spec->kref);
3661
3662 return spec;
3663}
3664
3665static void rbd_spec_free(struct kref *kref);
3666static void rbd_spec_put(struct rbd_spec *spec)
3667{
3668 if (spec)
3669 kref_put(&spec->kref, rbd_spec_free);
3670}
3671
3672static struct rbd_spec *rbd_spec_alloc(void)
3673{
3674 struct rbd_spec *spec;
3675
3676 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3677 if (!spec)
3678 return NULL;
3679 kref_init(&spec->kref);
3680
8b8fb99c
AE
3681 return spec;
3682}
3683
3684static void rbd_spec_free(struct kref *kref)
3685{
3686 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3687
3688 kfree(spec->pool_name);
3689 kfree(spec->image_id);
3690 kfree(spec->image_name);
3691 kfree(spec->snap_name);
3692 kfree(spec);
3693}
3694
cc344fa1 3695static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3696 struct rbd_spec *spec)
3697{
3698 struct rbd_device *rbd_dev;
3699
3700 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3701 if (!rbd_dev)
3702 return NULL;
3703
3704 spin_lock_init(&rbd_dev->lock);
6d292906 3705 rbd_dev->flags = 0;
a2acd00e 3706 atomic_set(&rbd_dev->parent_ref, 0);
c53d5893 3707 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3708 init_rwsem(&rbd_dev->header_rwsem);
3709
3710 rbd_dev->spec = spec;
3711 rbd_dev->rbd_client = rbdc;
3712
0903e875
AE
3713 /* Initialize the layout used for all rbd requests */
3714
3715 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3716 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3717 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3718 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3719
c53d5893
AE
3720 return rbd_dev;
3721}
3722
3723static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3724{
c53d5893
AE
3725 rbd_put_client(rbd_dev->rbd_client);
3726 rbd_spec_put(rbd_dev->spec);
3727 kfree(rbd_dev);
3728}
3729
9d475de5
AE
3730/*
3731 * Get the size and object order for an image snapshot, or if
3732 * snap_id is CEPH_NOSNAP, gets this information for the base
3733 * image.
3734 */
3735static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3736 u8 *order, u64 *snap_size)
3737{
3738 __le64 snapid = cpu_to_le64(snap_id);
3739 int ret;
3740 struct {
3741 u8 order;
3742 __le64 size;
3743 } __attribute__ ((packed)) size_buf = { 0 };
3744
36be9a76 3745 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3746 "rbd", "get_size",
4157976b 3747 &snapid, sizeof (snapid),
e2a58ee5 3748 &size_buf, sizeof (size_buf));
36be9a76 3749 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3750 if (ret < 0)
3751 return ret;
57385b51
AE
3752 if (ret < sizeof (size_buf))
3753 return -ERANGE;
9d475de5 3754
c3545579 3755 if (order) {
c86f86e9 3756 *order = size_buf.order;
c3545579
JD
3757 dout(" order %u", (unsigned int)*order);
3758 }
9d475de5
AE
3759 *snap_size = le64_to_cpu(size_buf.size);
3760
c3545579
JD
3761 dout(" snap_id 0x%016llx snap_size = %llu\n",
3762 (unsigned long long)snap_id,
57385b51 3763 (unsigned long long)*snap_size);
9d475de5
AE
3764
3765 return 0;
3766}
3767
3768static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3769{
3770 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3771 &rbd_dev->header.obj_order,
3772 &rbd_dev->header.image_size);
3773}
3774
1e130199
AE
3775static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3776{
3777 void *reply_buf;
3778 int ret;
3779 void *p;
3780
3781 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3782 if (!reply_buf)
3783 return -ENOMEM;
3784
36be9a76 3785 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3786 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3787 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3788 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3789 if (ret < 0)
3790 goto out;
3791
3792 p = reply_buf;
3793 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3794 p + ret, NULL, GFP_NOIO);
3795 ret = 0;
1e130199
AE
3796
3797 if (IS_ERR(rbd_dev->header.object_prefix)) {
3798 ret = PTR_ERR(rbd_dev->header.object_prefix);
3799 rbd_dev->header.object_prefix = NULL;
3800 } else {
3801 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3802 }
1e130199
AE
3803out:
3804 kfree(reply_buf);
3805
3806 return ret;
3807}
3808
b1b5402a
AE
3809static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3810 u64 *snap_features)
3811{
3812 __le64 snapid = cpu_to_le64(snap_id);
3813 struct {
3814 __le64 features;
3815 __le64 incompat;
4157976b 3816 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3817 u64 incompat;
b1b5402a
AE
3818 int ret;
3819
36be9a76 3820 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3821 "rbd", "get_features",
4157976b 3822 &snapid, sizeof (snapid),
e2a58ee5 3823 &features_buf, sizeof (features_buf));
36be9a76 3824 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3825 if (ret < 0)
3826 return ret;
57385b51
AE
3827 if (ret < sizeof (features_buf))
3828 return -ERANGE;
d889140c
AE
3829
3830 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3831 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3832 return -ENXIO;
d889140c 3833
b1b5402a
AE
3834 *snap_features = le64_to_cpu(features_buf.features);
3835
3836 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3837 (unsigned long long)snap_id,
3838 (unsigned long long)*snap_features,
3839 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3840
3841 return 0;
3842}
3843
3844static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3845{
3846 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3847 &rbd_dev->header.features);
3848}
3849
86b00e0d
AE
3850static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3851{
3852 struct rbd_spec *parent_spec;
3853 size_t size;
3854 void *reply_buf = NULL;
3855 __le64 snapid;
3856 void *p;
3857 void *end;
642a2537 3858 u64 pool_id;
86b00e0d 3859 char *image_id;
3b5cf2a2 3860 u64 snap_id;
86b00e0d 3861 u64 overlap;
86b00e0d
AE
3862 int ret;
3863
3864 parent_spec = rbd_spec_alloc();
3865 if (!parent_spec)
3866 return -ENOMEM;
3867
3868 size = sizeof (__le64) + /* pool_id */
3869 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3870 sizeof (__le64) + /* snap_id */
3871 sizeof (__le64); /* overlap */
3872 reply_buf = kmalloc(size, GFP_KERNEL);
3873 if (!reply_buf) {
3874 ret = -ENOMEM;
3875 goto out_err;
3876 }
3877
3878 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3879 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3880 "rbd", "get_parent",
4157976b 3881 &snapid, sizeof (snapid),
e2a58ee5 3882 reply_buf, size);
36be9a76 3883 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3884 if (ret < 0)
3885 goto out_err;
3886
86b00e0d 3887 p = reply_buf;
57385b51
AE
3888 end = reply_buf + ret;
3889 ret = -ERANGE;
642a2537 3890 ceph_decode_64_safe(&p, end, pool_id, out_err);
392a9dad
AE
3891 if (pool_id == CEPH_NOPOOL) {
3892 /*
3893 * Either the parent never existed, or we have a
3894 * record of it but the image got flattened, so it no
3895 * longer has a parent. When the parent of a
3896 * layered image disappears we immediately set the
3897 * overlap to 0. The effect of this is that all new
3898 * requests will be treated as if the image had no
3899 * parent.
3900 */
3901 if (rbd_dev->parent_overlap) {
3902 rbd_dev->parent_overlap = 0;
3903 smp_mb();
3904 rbd_dev_parent_put(rbd_dev);
3905 pr_info("%s: clone image has been flattened\n",
3906 rbd_dev->disk->disk_name);
3907 }
3908
86b00e0d 3909 goto out; /* No parent? No problem. */
392a9dad 3910 }
86b00e0d 3911
0903e875
AE
3912 /* The ceph file layout needs to fit pool id in 32 bits */
3913
3914 ret = -EIO;
642a2537 3915 if (pool_id > (u64)U32_MAX) {
c0cd10db 3916 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
642a2537 3917 (unsigned long long)pool_id, U32_MAX);
57385b51 3918 goto out_err;
c0cd10db 3919 }
0903e875 3920
979ed480 3921 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3922 if (IS_ERR(image_id)) {
3923 ret = PTR_ERR(image_id);
3924 goto out_err;
3925 }
3b5cf2a2 3926 ceph_decode_64_safe(&p, end, snap_id, out_err);
86b00e0d
AE
3927 ceph_decode_64_safe(&p, end, overlap, out_err);
3928
3b5cf2a2
AE
3929 /*
3930 * The parent won't change (except when the clone is
3931 * flattened, which is handled above). So we only need to
3932 * record the parent spec if we have not already done so.
3933 */
3934 if (!rbd_dev->parent_spec) {
3935 parent_spec->pool_id = pool_id;
3936 parent_spec->image_id = image_id;
3937 parent_spec->snap_id = snap_id;
70cf49cf
AE
3938 rbd_dev->parent_spec = parent_spec;
3939 parent_spec = NULL; /* rbd_dev now owns this */
3b5cf2a2
AE
3940 }
3941
3942 /*
3943 * We always update the parent overlap. If it's zero we
3944 * treat it specially.
3945 */
3946 rbd_dev->parent_overlap = overlap;
3947 smp_mb();
3948 if (!overlap) {
3949
3950 /* A null parent_spec indicates it's the initial probe */
3951
3952 if (parent_spec) {
3953 /*
3954 * The overlap has become zero, so the clone
3955 * must have been resized down to 0 at some
3956 * point. Treat this the same as a flatten.
3957 */
3958 rbd_dev_parent_put(rbd_dev);
3959 pr_info("%s: clone image now standalone\n",
3960 rbd_dev->disk->disk_name);
3961 } else {
3962 /*
3963 * For the initial probe, if we find the
3964 * overlap is zero we just pretend there was
3965 * no parent image.
3966 */
3967 rbd_warn(rbd_dev,
3968 "ignoring parent of clone with overlap 0\n");
3969 }
70cf49cf 3970 }
86b00e0d
AE
3971out:
3972 ret = 0;
3973out_err:
3974 kfree(reply_buf);
3975 rbd_spec_put(parent_spec);
3976
3977 return ret;
3978}
3979
cc070d59
AE
3980static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3981{
3982 struct {
3983 __le64 stripe_unit;
3984 __le64 stripe_count;
3985 } __attribute__ ((packed)) striping_info_buf = { 0 };
3986 size_t size = sizeof (striping_info_buf);
3987 void *p;
3988 u64 obj_size;
3989 u64 stripe_unit;
3990 u64 stripe_count;
3991 int ret;
3992
3993 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3994 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3995 (char *)&striping_info_buf, size);
cc070d59
AE
3996 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3997 if (ret < 0)
3998 return ret;
3999 if (ret < size)
4000 return -ERANGE;
4001
4002 /*
4003 * We don't actually support the "fancy striping" feature
4004 * (STRIPINGV2) yet, but if the striping sizes are the
4005 * defaults the behavior is the same as before. So find
4006 * out, and only fail if the image has non-default values.
4007 */
4008 ret = -EINVAL;
4009 obj_size = (u64)1 << rbd_dev->header.obj_order;
4010 p = &striping_info_buf;
4011 stripe_unit = ceph_decode_64(&p);
4012 if (stripe_unit != obj_size) {
4013 rbd_warn(rbd_dev,
4014 "unsupported stripe unit (got %llu want %llu)",
4015 stripe_unit, obj_size);
4016 return -EINVAL;
4017 }
4018 stripe_count = ceph_decode_64(&p);
4019 if (stripe_count != 1) {
4020 rbd_warn(rbd_dev,
4021 "unsupported stripe count (got %llu want 1)", stripe_count);
4022 return -EINVAL;
4023 }
500d0c0f
AE
4024 rbd_dev->header.stripe_unit = stripe_unit;
4025 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
4026
4027 return 0;
4028}
4029
9e15b77d
AE
4030static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
4031{
4032 size_t image_id_size;
4033 char *image_id;
4034 void *p;
4035 void *end;
4036 size_t size;
4037 void *reply_buf = NULL;
4038 size_t len = 0;
4039 char *image_name = NULL;
4040 int ret;
4041
4042 rbd_assert(!rbd_dev->spec->image_name);
4043
69e7a02f
AE
4044 len = strlen(rbd_dev->spec->image_id);
4045 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
4046 image_id = kmalloc(image_id_size, GFP_KERNEL);
4047 if (!image_id)
4048 return NULL;
4049
4050 p = image_id;
4157976b 4051 end = image_id + image_id_size;
57385b51 4052 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
4053
4054 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
4055 reply_buf = kmalloc(size, GFP_KERNEL);
4056 if (!reply_buf)
4057 goto out;
4058
36be9a76 4059 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
4060 "rbd", "dir_get_name",
4061 image_id, image_id_size,
e2a58ee5 4062 reply_buf, size);
9e15b77d
AE
4063 if (ret < 0)
4064 goto out;
4065 p = reply_buf;
f40eb349
AE
4066 end = reply_buf + ret;
4067
9e15b77d
AE
4068 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
4069 if (IS_ERR(image_name))
4070 image_name = NULL;
4071 else
4072 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
4073out:
4074 kfree(reply_buf);
4075 kfree(image_id);
4076
4077 return image_name;
4078}
4079
2ad3d716
AE
4080static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4081{
4082 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4083 const char *snap_name;
4084 u32 which = 0;
4085
4086 /* Skip over names until we find the one we are looking for */
4087
4088 snap_name = rbd_dev->header.snap_names;
4089 while (which < snapc->num_snaps) {
4090 if (!strcmp(name, snap_name))
4091 return snapc->snaps[which];
4092 snap_name += strlen(snap_name) + 1;
4093 which++;
4094 }
4095 return CEPH_NOSNAP;
4096}
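/*
 * Illustration with made-up data: if header.snap_names holds
 * "snap1\0snap2\0" and snapc->snaps[] is { 12, 9 }, a lookup of
 * "snap2" steps past "snap1" and returns 9; a name that matches
 * nothing falls through and yields CEPH_NOSNAP.
 */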
4097
4098static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4099{
4100 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4101 u32 which;
4102 bool found = false;
4103 u64 snap_id;
4104
4105 for (which = 0; !found && which < snapc->num_snaps; which++) {
4106 const char *snap_name;
4107
4108 snap_id = snapc->snaps[which];
4109 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
efadc98a
JD
4110 if (IS_ERR(snap_name)) {
4111 /* ignore no-longer existing snapshots */
4112 if (PTR_ERR(snap_name) == -ENOENT)
4113 continue;
4114 else
4115 break;
4116 }
2ad3d716
AE
4117 found = !strcmp(name, snap_name);
4118 kfree(snap_name);
4119 }
4120 return found ? snap_id : CEPH_NOSNAP;
4121}
4122
4123/*
4124 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
4125 * no snapshot by that name is found, or if an error occurs.
4126 */
4127static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
4128{
4129 if (rbd_dev->image_format == 1)
4130 return rbd_v1_snap_id_by_name(rbd_dev, name);
4131
4132 return rbd_v2_snap_id_by_name(rbd_dev, name);
4133}
4134
9e15b77d 4135/*
2e9f7f1c
AE
4136 * When an rbd image has a parent image, it is identified by the
4137 * pool, image, and snapshot ids (not names). This function fills
4138 * in the names for those ids. (It's OK if we can't figure out the
4139 * name for an image id, but the pool and snapshot ids should always
4140 * exist and have names.) All names in an rbd spec are dynamically
4141 * allocated.
e1d4213f
AE
4142 *
4143 * When an image being mapped (not a parent) is probed, we have the
4144 * pool name and pool id, image name and image id, and the snapshot
4145 * name. The only thing we're missing is the snapshot id.
9e15b77d 4146 */
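/*
 * Sketch with hypothetical ids: a parent spec arriving here with
 * pool_id 2, image_id "1028b4567b21" and snap_id 4 leaves with
 * pool_name (say "rbd") and snap_name (say "snap1") filled in, and
 * image_name filled in too if the rbd directory lookup succeeds.
 */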
2e9f7f1c 4147static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 4148{
2e9f7f1c
AE
4149 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
4150 struct rbd_spec *spec = rbd_dev->spec;
4151 const char *pool_name;
4152 const char *image_name;
4153 const char *snap_name;
9e15b77d
AE
4154 int ret;
4155
e1d4213f
AE
4156 /*
4157 * An image being mapped will have the pool name (etc.), but
4158 * we need to look up the snapshot id.
4159 */
2e9f7f1c
AE
4160 if (spec->pool_name) {
4161 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 4162 u64 snap_id;
e1d4213f 4163
2ad3d716
AE
4164 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
4165 if (snap_id == CEPH_NOSNAP)
e1d4213f 4166 return -ENOENT;
2ad3d716 4167 spec->snap_id = snap_id;
e1d4213f 4168 } else {
2e9f7f1c 4169 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
4170 }
4171
4172 return 0;
4173 }
9e15b77d 4174
2e9f7f1c 4175 /* Get the pool name; we have to make our own copy of this */
9e15b77d 4176
2e9f7f1c
AE
4177 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
4178 if (!pool_name) {
4179 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
4180 return -EIO;
4181 }
2e9f7f1c
AE
4182 pool_name = kstrdup(pool_name, GFP_KERNEL);
4183 if (!pool_name)
9e15b77d
AE
4184 return -ENOMEM;
4185
4186 /* Fetch the image name; tolerate failure here */
4187
2e9f7f1c
AE
4188 image_name = rbd_dev_image_name(rbd_dev);
4189 if (!image_name)
06ecc6cb 4190 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 4191
2e9f7f1c 4192 /* Look up the snapshot name, and make a copy */
9e15b77d 4193
2e9f7f1c 4194 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
da6a6b63
JD
4195 if (IS_ERR(snap_name)) {
4196 ret = PTR_ERR(snap_name);
9e15b77d 4197 goto out_err;
2e9f7f1c
AE
4198 }
4199
4200 spec->pool_name = pool_name;
4201 spec->image_name = image_name;
4202 spec->snap_name = snap_name;
9e15b77d
AE
4203
4204 return 0;
4205out_err:
2e9f7f1c
AE
4206 kfree(image_name);
4207 kfree(pool_name);
9e15b77d
AE
4208
4209 return ret;
4210}
4211
cc4a38bd 4212static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
4213{
4214 size_t size;
4215 int ret;
4216 void *reply_buf;
4217 void *p;
4218 void *end;
4219 u64 seq;
4220 u32 snap_count;
4221 struct ceph_snap_context *snapc;
4222 u32 i;
4223
4224 /*
4225 * We'll need room for the seq value (maximum snapshot id),
4226 * snapshot count, and array of that many snapshot ids.
4227 * For now we have a fixed upper limit on the number we're
4228 * prepared to receive.
4229 */
4230 size = sizeof (__le64) + sizeof (__le32) +
4231 RBD_MAX_SNAP_COUNT * sizeof (__le64);
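/*
 * With RBD_MAX_SNAP_COUNT of 510 this works out to
 * 8 + 4 + 510 * 8 = 4092 bytes, just under one 4KB page.
 */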
4232 reply_buf = kzalloc(size, GFP_KERNEL);
4233 if (!reply_buf)
4234 return -ENOMEM;
4235
36be9a76 4236 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 4237 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 4238 reply_buf, size);
36be9a76 4239 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
4240 if (ret < 0)
4241 goto out;
4242
35d489f9 4243 p = reply_buf;
57385b51
AE
4244 end = reply_buf + ret;
4245 ret = -ERANGE;
35d489f9
AE
4246 ceph_decode_64_safe(&p, end, seq, out);
4247 ceph_decode_32_safe(&p, end, snap_count, out);
4248
4249 /*
4250 * Make sure the reported number of snapshot ids wouldn't go
4251 * beyond the end of our buffer. But before checking that,
4252 * make sure the computed size of the snapshot context we
4253 * allocate is representable in a size_t.
4254 */
4255 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
4256 / sizeof (u64)) {
4257 ret = -EINVAL;
4258 goto out;
4259 }
4260 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
4261 goto out;
468521c1 4262 ret = 0;
35d489f9 4263
812164f8 4264 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
4265 if (!snapc) {
4266 ret = -ENOMEM;
4267 goto out;
4268 }
35d489f9 4269 snapc->seq = seq;
35d489f9
AE
4270 for (i = 0; i < snap_count; i++)
4271 snapc->snaps[i] = ceph_decode_64(&p);
4272
49ece554 4273 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4274 rbd_dev->header.snapc = snapc;
4275
4276 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4277 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4278out:
4279 kfree(reply_buf);
4280
57385b51 4281 return ret;
35d489f9
AE
4282}
4283
54cac61f
AE
4284static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4285 u64 snap_id)
b8b1e2db
AE
4286{
4287 size_t size;
4288 void *reply_buf;
54cac61f 4289 __le64 snapid;
b8b1e2db
AE
4290 int ret;
4291 void *p;
4292 void *end;
b8b1e2db
AE
4293 char *snap_name;
4294
4295 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4296 reply_buf = kmalloc(size, GFP_KERNEL);
4297 if (!reply_buf)
4298 return ERR_PTR(-ENOMEM);
4299
54cac61f 4300 snapid = cpu_to_le64(snap_id);
36be9a76 4301 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4302 "rbd", "get_snapshot_name",
54cac61f 4303 &snapid, sizeof (snapid),
e2a58ee5 4304 reply_buf, size);
36be9a76 4305 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4306 if (ret < 0) {
4307 snap_name = ERR_PTR(ret);
b8b1e2db 4308 goto out;
f40eb349 4309 }
b8b1e2db
AE
4310
4311 p = reply_buf;
f40eb349 4312 end = reply_buf + ret;
e5c35534 4313 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4314 if (IS_ERR(snap_name))
b8b1e2db 4315 goto out;
b8b1e2db 4316
f40eb349 4317 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4318 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4319out:
4320 kfree(reply_buf);
4321
f40eb349 4322 return snap_name;
b8b1e2db
AE
4323}
4324
2df3fac7 4325static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
117973fb 4326{
2df3fac7 4327 bool first_time = rbd_dev->header.object_prefix == NULL;
117973fb 4328 int ret;
117973fb 4329
1617e40c
JD
4330 ret = rbd_dev_v2_image_size(rbd_dev);
4331 if (ret)
cfbf6377 4332 return ret;
1617e40c 4333
2df3fac7
AE
4334 if (first_time) {
4335 ret = rbd_dev_v2_header_onetime(rbd_dev);
4336 if (ret)
cfbf6377 4337 return ret;
2df3fac7
AE
4338 }
4339
642a2537
AE
4340 /*
4341 * If the image supports layering, get the parent info. We
4342 * need to probe the first time regardless. Thereafter we
4343 * only need to if there's a parent, to see if it has
4344 * disappeared due to the mapped image getting flattened.
4345 */
4346 if (rbd_dev->header.features & RBD_FEATURE_LAYERING &&
4347 (first_time || rbd_dev->parent_spec)) {
4348 bool warn;
4349
4350 ret = rbd_dev_v2_parent_info(rbd_dev);
4351 if (ret)
cfbf6377 4352 return ret;
642a2537
AE
4353
4354 /*
4355 * Print a warning if this is the initial probe and
4356 * the image has a parent. Don't print it if the
4357 * image now being probed is itself a parent. We
4358 * can tell at this point because we won't know its
4359 * pool name yet (just its pool id).
4360 */
4361 warn = rbd_dev->parent_spec && rbd_dev->spec->pool_name;
4362 if (first_time && warn)
4363 rbd_warn(rbd_dev,
4364 "WARNING: kernel layering is EXPERIMENTAL!");
4365 }
4366
29334ba4
AE
4367 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4368 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4369 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4370
cc4a38bd 4371 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb 4372 dout("rbd_dev_v2_snap_context returned %d\n", ret);
117973fb
AE
4373
4374 return ret;
4375}
4376
dfc5606d
YS
4377static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4378{
dfc5606d 4379 struct device *dev;
cd789ab9 4380 int ret;
dfc5606d 4381
cd789ab9 4382 dev = &rbd_dev->dev;
dfc5606d
YS
4383 dev->bus = &rbd_bus_type;
4384 dev->type = &rbd_device_type;
4385 dev->parent = &rbd_root_dev;
200a6a8b 4386 dev->release = rbd_dev_device_release;
de71a297 4387 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4388 ret = device_register(dev);
dfc5606d 4389
dfc5606d 4390 return ret;
602adf40
YS
4391}
4392
dfc5606d
YS
4393static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4394{
4395 device_unregister(&rbd_dev->dev);
4396}
4397
1ddbe94e 4398/*
499afd5b 4399 * Get a unique rbd identifier for the given new rbd_dev, and add
f8a22fc2 4400 * the rbd_dev to the global list.
1ddbe94e 4401 */
f8a22fc2 4402static int rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4403{
f8a22fc2
ID
4404 int new_dev_id;
4405
4406 new_dev_id = ida_simple_get(&rbd_dev_id_ida, 0, 0, GFP_KERNEL);
4407 if (new_dev_id < 0)
4408 return new_dev_id;
4409
4410 rbd_dev->dev_id = new_dev_id;
499afd5b
AE
4411
4412 spin_lock(&rbd_dev_list_lock);
4413 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4414 spin_unlock(&rbd_dev_list_lock);
f8a22fc2 4415
70eebd20 4416 dout("rbd_dev %p given dev id %d\n", rbd_dev, rbd_dev->dev_id);
f8a22fc2
ID
4417
4418 return 0;
1ddbe94e 4419}
b7f23c36 4420
1ddbe94e 4421/*
499afd5b
AE
4422 * Remove an rbd_dev from the global list, and record that its
4423 * identifier is no longer in use.
1ddbe94e 4424 */
e2839308 4425static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4426{
499afd5b
AE
4427 spin_lock(&rbd_dev_list_lock);
4428 list_del_init(&rbd_dev->node);
4429 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4430
f8a22fc2
ID
4431 ida_simple_remove(&rbd_dev_id_ida, rbd_dev->dev_id);
4432
4433 dout("rbd_dev %p released dev id %d\n", rbd_dev, rbd_dev->dev_id);
b7f23c36
AE
4434}
4435
e28fff26
AE
4436/*
4437 * Skips over white space at *buf, and updates *buf to point to the
4438 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4439 * the token (string of non-white space characters) found. Note
4440 * that *buf must be terminated with '\0'.
e28fff26
AE
4441 */
4442static inline size_t next_token(const char **buf)
4443{
4444 /*
4445 * These are the characters that produce nonzero for
4446 * isspace() in the "C" and "POSIX" locales.
4447 */
4448 const char *spaces = " \f\n\r\t\v";
4449
4450 *buf += strspn(*buf, spaces); /* Find start of token */
4451
4452 return strcspn(*buf, spaces); /* Return token length */
4453}
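/*
 * Worked example: with *buf pointing at " rbd foo", next_token()
 * advances *buf past the leading space to "rbd foo" and returns 3,
 * the length of "rbd"; once the caller consumes those 3 bytes, a
 * second call returns 3 again, this time for "foo".
 */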
4454
4455/*
4456 * Finds the next token in *buf, and if the provided token buffer is
4457 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4458 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4459 * must be terminated with '\0' on entry.
e28fff26
AE
4460 *
4461 * Returns the length of the token found (not including the '\0').
4462 * Return value will be 0 if no token is found, and it will be >=
4463 * token_size if the token would not fit.
4464 *
593a9e7b 4465 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4466 * found token. Note that this occurs even if the token buffer is
4467 * too small to hold it.
4468 */
4469static inline size_t copy_token(const char **buf,
4470 char *token,
4471 size_t token_size)
4472{
4473 size_t len;
4474
4475 len = next_token(buf);
4476 if (len < token_size) {
4477 memcpy(token, *buf, len);
4478 *(token + len) = '\0';
4479 }
4480 *buf += len;
4481
4482 return len;
4483}
4484
ea3352f4
AE
4485/*
4486 * Finds the next token in *buf, dynamically allocates a buffer big
4487 * enough to hold a copy of it, and copies the token into the new
4488 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4489 * that a duplicate buffer is created even for a zero-length token.
4490 *
4491 * Returns a pointer to the newly-allocated duplicate, or a null
4492 * pointer if memory for the duplicate was not available. If
4493 * the lenp argument is a non-null pointer, the length of the token
4494 * (not including the '\0') is returned in *lenp.
4495 *
4496 * If successful, the *buf pointer will be updated to point beyond
4497 * the end of the found token.
4498 *
4499 * Note: uses GFP_KERNEL for allocation.
4500 */
4501static inline char *dup_token(const char **buf, size_t *lenp)
4502{
4503 char *dup;
4504 size_t len;
4505
4506 len = next_token(buf);
4caf35f9 4507 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4508 if (!dup)
4509 return NULL;
ea3352f4
AE
4510 *(dup + len) = '\0';
4511 *buf += len;
4512
4513 if (lenp)
4514 *lenp = len;
4515
4516 return dup;
4517}
4518
a725f65e 4519/*
859c31df
AE
4520 * Parse the options provided for an "rbd add" (i.e., rbd image
4521 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4522 * and the data written is passed here via a NUL-terminated buffer.
4523 * Returns 0 if successful or an error code otherwise.
d22f76e7 4524 *
859c31df
AE
4525 * The information extracted from these options is recorded in
4526 * the other parameters which return dynamically-allocated
4527 * structures:
4528 * ceph_opts
4529 * The address of a pointer that will refer to a ceph options
4530 * structure. Caller must release the returned pointer using
4531 * ceph_destroy_options() when it is no longer needed.
4532 * rbd_opts
4533 * Address of an rbd options pointer. Fully initialized by
4534 * this function; caller must release with kfree().
4535 * spec
4536 * Address of an rbd image specification pointer. Fully
4537 * initialized by this function based on parsed options.
4538 * Caller must release with rbd_spec_put().
4539 *
4540 * The options passed take this form:
4541 * <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4542 * where:
4543 * <mon_addrs>
4544 * A comma-separated list of one or more monitor addresses.
4545 * A monitor address is an ip address, optionally followed
4546 * by a port number (separated by a colon).
4547 * I.e.: ip1[:port1][,ip2[:port2]...]
4548 * <options>
4549 * A comma-separated list of ceph and/or rbd options.
4550 * <pool_name>
4551 * The name of the rados pool containing the rbd image.
4552 * <image_name>
4553 * The name of the image in that pool to map.
4554 * <snap_id>
4555 * An optional snapshot id. If provided, the mapping will
4556 * present data from the image at the time that snapshot was
4557 * created. The image head is used if no snapshot id is
4558 * provided. Snapshot mappings are always read-only.
a725f65e 4559 */
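/*
 * For instance (every value below is made up), a request written to
 * /sys/bus/rbd/add might look like:
 *
 * 1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap
 *
 * mapping snapshot "mysnap" of image "myimage" in pool "rbd" via
 * the monitor at 1.2.3.4:6789.
 */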
859c31df 4560static int rbd_add_parse_args(const char *buf,
dc79b113 4561 struct ceph_options **ceph_opts,
859c31df
AE
4562 struct rbd_options **opts,
4563 struct rbd_spec **rbd_spec)
e28fff26 4564{
d22f76e7 4565 size_t len;
859c31df 4566 char *options;
0ddebc0c 4567 const char *mon_addrs;
ecb4dc22 4568 char *snap_name;
0ddebc0c 4569 size_t mon_addrs_size;
859c31df 4570 struct rbd_spec *spec = NULL;
4e9afeba 4571 struct rbd_options *rbd_opts = NULL;
859c31df 4572 struct ceph_options *copts;
dc79b113 4573 int ret;
e28fff26
AE
4574
4575 /* The first four tokens are required */
4576
7ef3214a 4577 len = next_token(&buf);
4fb5d671
AE
4578 if (!len) {
4579 rbd_warn(NULL, "no monitor address(es) provided");
4580 return -EINVAL;
4581 }
0ddebc0c 4582 mon_addrs = buf;
f28e565a 4583 mon_addrs_size = len + 1;
7ef3214a 4584 buf += len;
a725f65e 4585
dc79b113 4586 ret = -EINVAL;
f28e565a
AE
4587 options = dup_token(&buf, NULL);
4588 if (!options)
dc79b113 4589 return -ENOMEM;
4fb5d671
AE
4590 if (!*options) {
4591 rbd_warn(NULL, "no options provided");
4592 goto out_err;
4593 }
e28fff26 4594
859c31df
AE
4595 spec = rbd_spec_alloc();
4596 if (!spec)
f28e565a 4597 goto out_mem;
859c31df
AE
4598
4599 spec->pool_name = dup_token(&buf, NULL);
4600 if (!spec->pool_name)
4601 goto out_mem;
4fb5d671
AE
4602 if (!*spec->pool_name) {
4603 rbd_warn(NULL, "no pool name provided");
4604 goto out_err;
4605 }
e28fff26 4606
69e7a02f 4607 spec->image_name = dup_token(&buf, NULL);
859c31df 4608 if (!spec->image_name)
f28e565a 4609 goto out_mem;
4fb5d671
AE
4610 if (!*spec->image_name) {
4611 rbd_warn(NULL, "no image name provided");
4612 goto out_err;
4613 }
d4b125e9 4614
f28e565a
AE
4615 /*
4616 * Snapshot name is optional; default is to use "-"
4617 * (indicating the head/no snapshot).
4618 */
3feeb894 4619 len = next_token(&buf);
820a5f3e 4620 if (!len) {
3feeb894
AE
4621 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4622 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4623 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4624 ret = -ENAMETOOLONG;
f28e565a 4625 goto out_err;
849b4260 4626 }
ecb4dc22
AE
4627 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4628 if (!snap_name)
f28e565a 4629 goto out_mem;
ecb4dc22
AE
4630 *(snap_name + len) = '\0';
4631 spec->snap_name = snap_name;
e5c35534 4632
0ddebc0c 4633 /* Initialize all rbd options to the defaults */
e28fff26 4634
4e9afeba
AE
4635 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4636 if (!rbd_opts)
4637 goto out_mem;
4638
4639 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4640
859c31df 4641 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4642 mon_addrs + mon_addrs_size - 1,
4e9afeba 4643 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4644 if (IS_ERR(copts)) {
4645 ret = PTR_ERR(copts);
dc79b113
AE
4646 goto out_err;
4647 }
859c31df
AE
4648 kfree(options);
4649
4650 *ceph_opts = copts;
4e9afeba 4651 *opts = rbd_opts;
859c31df 4652 *rbd_spec = spec;
0ddebc0c 4653
dc79b113 4654 return 0;
f28e565a 4655out_mem:
dc79b113 4656 ret = -ENOMEM;
d22f76e7 4657out_err:
859c31df
AE
4658 kfree(rbd_opts);
4659 rbd_spec_put(spec);
f28e565a 4660 kfree(options);
d22f76e7 4661
dc79b113 4662 return ret;
a725f65e
AE
4663}
4664
589d30e0
AE
4665/*
4666 * An rbd format 2 image has a unique identifier, distinct from the
4667 * name given to it by the user. Internally, that identifier is
4668 * what's used to specify the names of objects related to the image.
4669 *
4670 * A special "rbd id" object is used to map an rbd image name to its
4671 * id. If that object doesn't exist, then there is no v2 rbd image
4672 * with the supplied name.
4673 *
4674 * This function will record the given rbd_dev's image_id field if
4675 * it can be determined, and in that case will return 0. If any
4676 * errors occur a negative errno will be returned and the rbd_dev's
4677 * image_id field will be unchanged (and should be NULL).
4678 */
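/*
 * Sketch of the lookup, assuming RBD_ID_PREFIX is "rbd_id." as in
 * rbd_types.h: for an image named "myimage" the id object is
 * "rbd_id.myimage", and its "get_id" class method returns the
 * image id (e.g. "1028b4567b21" -- a made-up value).
 */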
4679static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4680{
4681 int ret;
4682 size_t size;
4683 char *object_name;
4684 void *response;
c0fba368 4685 char *image_id;
2f82ee54 4686
2c0d0a10
AE
4687 /*
4688 * When probing a parent image, the image id is already
4689 * known (and the image name likely is not). There's no
c0fba368
AE
4690 * need to fetch the image id again in this case. We
4691 * do still need to set the image format though.
2c0d0a10 4692 */
c0fba368
AE
4693 if (rbd_dev->spec->image_id) {
4694 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4695
2c0d0a10 4696 return 0;
c0fba368 4697 }
2c0d0a10 4698
589d30e0
AE
4699 /*
4700 * First, see if the format 2 image id file exists, and if
4701 * so, get the image's persistent id from it.
4702 */
69e7a02f 4703 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4704 object_name = kmalloc(size, GFP_NOIO);
4705 if (!object_name)
4706 return -ENOMEM;
0d7dbfce 4707 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4708 dout("rbd id object name is %s\n", object_name);
4709
4710 /* Response will be an encoded string, which includes a length */
4711
4712 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4713 response = kzalloc(size, GFP_NOIO);
4714 if (!response) {
4715 ret = -ENOMEM;
4716 goto out;
4717 }
4718
c0fba368
AE
4719 /* If it doesn't exist we'll assume it's a format 1 image */
4720
36be9a76 4721 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4722 "rbd", "get_id", NULL, 0,
e2a58ee5 4723 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4724 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4725 if (ret == -ENOENT) {
4726 image_id = kstrdup("", GFP_KERNEL);
4727 ret = image_id ? 0 : -ENOMEM;
4728 if (!ret)
4729 rbd_dev->image_format = 1;
4730 } else if (ret > sizeof (__le32)) {
4731 void *p = response;
4732
4733 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4734 NULL, GFP_NOIO);
c0fba368
AE
4735 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4736 if (!ret)
4737 rbd_dev->image_format = 2;
589d30e0 4738 } else {
c0fba368
AE
4739 ret = -EINVAL;
4740 }
4741
4742 if (!ret) {
4743 rbd_dev->spec->image_id = image_id;
4744 dout("image_id is %s\n", image_id);
589d30e0
AE
4745 }
4746out:
4747 kfree(response);
4748 kfree(object_name);
4749
4750 return ret;
4751}
4752
3abef3b3
AE
4753/*
4754 * Undo whatever state changes are made by v1 or v2 header info
4755 * call.
4756 */
6fd48b3b
AE
4757static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4758{
4759 struct rbd_image_header *header;
4760
392a9dad
AE
4761 /* Drop parent reference unless it's already been done (or there was none) */
4762
4763 if (rbd_dev->parent_overlap)
4764 rbd_dev_parent_put(rbd_dev);
6fd48b3b
AE
4765
4766 /* Free dynamic fields from the header, then zero it out */
4767
4768 header = &rbd_dev->header;
812164f8 4769 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4770 kfree(header->snap_sizes);
4771 kfree(header->snap_names);
4772 kfree(header->object_prefix);
4773 memset(header, 0, sizeof (*header));
4774}
4775
2df3fac7 4776static int rbd_dev_v2_header_onetime(struct rbd_device *rbd_dev)
a30b71b9
AE
4777{
4778 int ret;
a30b71b9 4779
1e130199 4780 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4781 if (ret)
b1b5402a
AE
4782 goto out_err;
4783
2df3fac7
AE
4784 /*
4785 * Get and check the features for the image. Currently the
4786 * features are assumed to never change.
4787 */
b1b5402a 4788 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4789 if (ret)
9d475de5 4790 goto out_err;
35d489f9 4791
cc070d59
AE
4792 /* If the image supports fancy striping, get its parameters */
4793
4794 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4795 ret = rbd_dev_v2_striping_info(rbd_dev);
4796 if (ret < 0)
4797 goto out_err;
4798 }
2df3fac7 4799 /* No support for crypto and compression type format 2 images */
a30b71b9 4800
35152979 4801 return 0;
9d475de5 4802out_err:
642a2537 4803 rbd_dev->header.features = 0;
1e130199
AE
4804 kfree(rbd_dev->header.object_prefix);
4805 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4806
4807 return ret;
a30b71b9
AE
4808}
4809
124afba2 4810static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4811{
2f82ee54 4812 struct rbd_device *parent = NULL;
124afba2
AE
4813 struct rbd_spec *parent_spec;
4814 struct rbd_client *rbdc;
4815 int ret;
4816
4817 if (!rbd_dev->parent_spec)
4818 return 0;
4819 /*
4820 * We need to pass a reference to the client and the parent
4821 * spec when creating the parent rbd_dev. Images related by
4822 * parent/child relationships always share both.
4823 */
4824 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4825 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4826
4827 ret = -ENOMEM;
4828 parent = rbd_dev_create(rbdc, parent_spec);
4829 if (!parent)
4830 goto out_err;
4831
1f3ef788 4832 ret = rbd_dev_image_probe(parent, false);
124afba2
AE
4833 if (ret < 0)
4834 goto out_err;
4835 rbd_dev->parent = parent;
a2acd00e 4836 atomic_set(&rbd_dev->parent_ref, 1);
124afba2
AE
4837
4838 return 0;
4839out_err:
4840 if (parent) {
fb65d228 4841 rbd_dev_unparent(rbd_dev);
124afba2
AE
4842 kfree(rbd_dev->header_name);
4843 rbd_dev_destroy(parent);
4844 } else {
4845 rbd_put_client(rbdc);
4846 rbd_spec_put(parent_spec);
4847 }
4848
4849 return ret;
4850}
4851
200a6a8b 4852static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4853{
83a06263 4854 int ret;
d1cf5788 4855
f8a22fc2
ID
4856 /* Get an id and fill in device name. */
4857
4858 ret = rbd_dev_id_get(rbd_dev);
4859 if (ret)
4860 return ret;
83a06263 4861
83a06263
AE
4862 BUILD_BUG_ON(DEV_NAME_LEN
4863 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4864 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4865
4866 /* Get our block major device number. */
4867
4868 ret = register_blkdev(0, rbd_dev->name);
4869 if (ret < 0)
4870 goto err_out_id;
4871 rbd_dev->major = ret;
dd82fff1 4872 rbd_dev->minor = 0;
83a06263
AE
4873
4874 /* Set up the blkdev mapping. */
4875
4876 ret = rbd_init_disk(rbd_dev);
4877 if (ret)
4878 goto err_out_blkdev;
4879
f35a4dee 4880 ret = rbd_dev_mapping_set(rbd_dev);
83a06263
AE
4881 if (ret)
4882 goto err_out_disk;
f35a4dee
AE
4883 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4884
4885 ret = rbd_bus_add_dev(rbd_dev);
4886 if (ret)
4887 goto err_out_mapping;
83a06263 4888
83a06263
AE
4889 /* Everything's ready. Announce the disk to the world. */
4890
129b79d4 4891 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4892 add_disk(rbd_dev->disk);
4893
4894 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4895 (unsigned long long) rbd_dev->mapping.size);
4896
4897 return ret;
2f82ee54 4898
f35a4dee
AE
4899err_out_mapping:
4900 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4901err_out_disk:
4902 rbd_free_disk(rbd_dev);
4903err_out_blkdev:
4904 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4905err_out_id:
4906 rbd_dev_id_put(rbd_dev);
d1cf5788 4907 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4908
4909 return ret;
4910}
4911
332bb12d
AE
4912static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4913{
4914 struct rbd_spec *spec = rbd_dev->spec;
4915 size_t size;
4916
4917 /* Record the header object name for this rbd image. */
4918
4919 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4920
4921 if (rbd_dev->image_format == 1)
4922 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4923 else
4924 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4925
4926 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4927 if (!rbd_dev->header_name)
4928 return -ENOMEM;
4929
4930 if (rbd_dev->image_format == 1)
4931 sprintf(rbd_dev->header_name, "%s%s",
4932 spec->image_name, RBD_SUFFIX);
4933 else
4934 sprintf(rbd_dev->header_name, "%s%s",
4935 RBD_HEADER_PREFIX, spec->image_id);
4936 return 0;
4937}
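/*
 * Example results, assuming RBD_SUFFIX is ".rbd" and
 * RBD_HEADER_PREFIX is "rbd_header." as in rbd_types.h, with
 * made-up names: a format 1 image "myimage" gets header object
 * "myimage.rbd", while a format 2 image with id "1028b4567b21"
 * gets "rbd_header.1028b4567b21".
 */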
4938
200a6a8b
AE
4939static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4940{
6fd48b3b 4941 rbd_dev_unprobe(rbd_dev);
200a6a8b 4942 kfree(rbd_dev->header_name);
6fd48b3b
AE
4943 rbd_dev->header_name = NULL;
4944 rbd_dev->image_format = 0;
4945 kfree(rbd_dev->spec->image_id);
4946 rbd_dev->spec->image_id = NULL;
4947
200a6a8b
AE
4948 rbd_dev_destroy(rbd_dev);
4949}
4950
a30b71b9
AE
4951/*
4952 * Probe for the existence of the header object for the given rbd
1f3ef788
AE
4953 * device. If this image is the one being mapped (i.e., not a
4954 * parent), initiate a watch on its header object before using that
4955 * object to get detailed information about the rbd image.
a30b71b9 4956 */
1f3ef788 4957static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool mapping)
a30b71b9
AE
4958{
4959 int ret;
b644de2b 4960 int tmp;
a30b71b9
AE
4961
4962 /*
3abef3b3
AE
4963 * Get the id from the image id object. Unless there's an
4964 * error, rbd_dev->spec->image_id will be filled in with
4965 * a dynamically-allocated string, and rbd_dev->image_format
4966 * will be set to either 1 or 2.
a30b71b9
AE
4967 */
4968 ret = rbd_dev_image_id(rbd_dev);
4969 if (ret)
c0fba368
AE
4970 return ret;
4971 rbd_assert(rbd_dev->spec->image_id);
4972 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4973
332bb12d
AE
4974 ret = rbd_dev_header_name(rbd_dev);
4975 if (ret)
4976 goto err_out_format;
4977
1f3ef788
AE
4978 if (mapping) {
4979 ret = rbd_dev_header_watch_sync(rbd_dev, true);
4980 if (ret)
4981 goto out_header_name;
4982 }
b644de2b 4983
c0fba368 4984 if (rbd_dev->image_format == 1)
99a41ebc 4985 ret = rbd_dev_v1_header_info(rbd_dev);
a30b71b9 4986 else
2df3fac7 4987 ret = rbd_dev_v2_header_info(rbd_dev);
5655c4d9 4988 if (ret)
b644de2b 4989 goto err_out_watch;
83a06263 4990
9bb81c9b
AE
4991 ret = rbd_dev_spec_update(rbd_dev);
4992 if (ret)
33dca39f 4993 goto err_out_probe;
9bb81c9b
AE
4994
4995 ret = rbd_dev_probe_parent(rbd_dev);
30d60ba2
AE
4996 if (ret)
4997 goto err_out_probe;
4998
4999 dout("discovered format %u image, header name is %s\n",
5000 rbd_dev->image_format, rbd_dev->header_name);
83a06263 5001
30d60ba2 5002 return 0;
6fd48b3b
AE
5003err_out_probe:
5004 rbd_dev_unprobe(rbd_dev);
b644de2b 5005err_out_watch:
1f3ef788
AE
5006 if (mapping) {
5007 tmp = rbd_dev_header_watch_sync(rbd_dev, false);
5008 if (tmp)
5009 rbd_warn(rbd_dev,
5010 "unable to tear down watch request (%d)\n", tmp);
5011 }
332bb12d
AE
5012out_header_name:
5013 kfree(rbd_dev->header_name);
5014 rbd_dev->header_name = NULL;
5015err_out_format:
5016 rbd_dev->image_format = 0;
5655c4d9
AE
5017 kfree(rbd_dev->spec->image_id);
5018 rbd_dev->spec->image_id = NULL;
5019
5020 dout("probe failed, returning %d\n", ret);
5021
a30b71b9
AE
5022 return ret;
5023}
5024
59c2be1e
YS
5025static ssize_t rbd_add(struct bus_type *bus,
5026 const char *buf,
5027 size_t count)
602adf40 5028{
cb8627c7 5029 struct rbd_device *rbd_dev = NULL;
dc79b113 5030 struct ceph_options *ceph_opts = NULL;
4e9afeba 5031 struct rbd_options *rbd_opts = NULL;
859c31df 5032 struct rbd_spec *spec = NULL;
9d3997fd 5033 struct rbd_client *rbdc;
27cc2594 5034 struct ceph_osd_client *osdc;
51344a38 5035 bool read_only;
27cc2594 5036 int rc = -ENOMEM;
602adf40
YS
5037
5038 if (!try_module_get(THIS_MODULE))
5039 return -ENODEV;
5040
602adf40 5041 /* parse add command */
859c31df 5042 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 5043 if (rc < 0)
bd4ba655 5044 goto err_out_module;
51344a38
AE
5045 read_only = rbd_opts->read_only;
5046 kfree(rbd_opts);
5047 rbd_opts = NULL; /* done with this */
78cea76e 5048
9d3997fd
AE
5049 rbdc = rbd_get_client(ceph_opts);
5050 if (IS_ERR(rbdc)) {
5051 rc = PTR_ERR(rbdc);
0ddebc0c 5052 goto err_out_args;
9d3997fd 5053 }
602adf40 5054
602adf40 5055 /* pick the pool */
9d3997fd 5056 osdc = &rbdc->client->osdc;
859c31df 5057 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
5058 if (rc < 0)
5059 goto err_out_client;
c0cd10db 5060 spec->pool_id = (u64)rc;
859c31df 5061
0903e875
AE
5062 /* The ceph file layout needs to fit pool id in 32 bits */
5063
c0cd10db
AE
5064 if (spec->pool_id > (u64)U32_MAX) {
5065 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
5066 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
5067 rc = -EIO;
5068 goto err_out_client;
5069 }
5070
c53d5893 5071 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
5072 if (!rbd_dev)
5073 goto err_out_client;
c53d5893
AE
5074 rbdc = NULL; /* rbd_dev now owns this */
5075 spec = NULL; /* rbd_dev now owns this */
602adf40 5076
1f3ef788 5077 rc = rbd_dev_image_probe(rbd_dev, true);
a30b71b9 5078 if (rc < 0)
c53d5893 5079 goto err_out_rbd_dev;
05fd6f6f 5080
7ce4eef7
AE
5081 /* If we are mapping a snapshot it must be marked read-only */
5082
5083 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
5084 read_only = true;
5085 rbd_dev->mapping.read_only = read_only;
5086
b536f69a 5087 rc = rbd_dev_device_setup(rbd_dev);
3abef3b3
AE
5088 if (rc) {
5089 rbd_dev_image_release(rbd_dev);
5090 goto err_out_module;
5091 }
5092
5093 return count;
b536f69a 5094
c53d5893
AE
5095err_out_rbd_dev:
5096 rbd_dev_destroy(rbd_dev);
bd4ba655 5097err_out_client:
9d3997fd 5098 rbd_put_client(rbdc);
0ddebc0c 5099err_out_args:
859c31df 5100 rbd_spec_put(spec);
bd4ba655
AE
5101err_out_module:
5102 module_put(THIS_MODULE);
27cc2594 5103
602adf40 5104 dout("Error adding device %s\n", buf);
27cc2594 5105
c0cd10db 5106 return (ssize_t)rc;
602adf40
YS
5107}
5108
200a6a8b 5109static void rbd_dev_device_release(struct device *dev)
602adf40 5110{
593a9e7b 5111 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 5112
602adf40 5113 rbd_free_disk(rbd_dev);
200a6a8b 5114 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 5115 rbd_dev_mapping_clear(rbd_dev);
602adf40 5116 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 5117 rbd_dev->major = 0;
e2839308 5118 rbd_dev_id_put(rbd_dev);
d1cf5788 5119 rbd_dev_mapping_clear(rbd_dev);
602adf40
YS
5120}
5121
05a46afd
AE
5122static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
5123{
ad945fc1 5124 while (rbd_dev->parent) {
05a46afd
AE
5125 struct rbd_device *first = rbd_dev;
5126 struct rbd_device *second = first->parent;
5127 struct rbd_device *third;
5128
5129 /*
5130 * Follow to the parent with no grandparent and
5131 * remove it.
5132 */
5133 while (second && (third = second->parent)) {
5134 first = second;
5135 second = third;
5136 }
ad945fc1 5137 rbd_assert(second);
8ad42cd0 5138 rbd_dev_image_release(second);
ad945fc1
AE
5139 first->parent = NULL;
5140 first->parent_overlap = 0;
5141
5142 rbd_assert(first->parent_spec);
05a46afd
AE
5143 rbd_spec_put(first->parent_spec);
5144 first->parent_spec = NULL;
05a46afd
AE
5145 }
5146}
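/*
 * Illustration: for a chain mapped -> parent -> grandparent, the
 * first pass of the loop above releases the grandparent and the
 * second releases the parent, so the deepest image always goes
 * first.
 */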
5147
dfc5606d
YS
5148static ssize_t rbd_remove(struct bus_type *bus,
5149 const char *buf,
5150 size_t count)
602adf40
YS
5151{
5152 struct rbd_device *rbd_dev = NULL;
751cc0e3
AE
5153 struct list_head *tmp;
5154 int dev_id;
602adf40 5155 unsigned long ul;
82a442d2 5156 bool already = false;
0d8189e1 5157 int ret;
602adf40 5158
bb8e0e84 5159 ret = kstrtoul(buf, 10, &ul);
0d8189e1
AE
5160 if (ret)
5161 return ret;
602adf40
YS
5162
5163 /* convert to int; abort if we lost anything in the conversion */
751cc0e3
AE
5164 dev_id = (int)ul;
5165 if (dev_id != ul)
602adf40
YS
5166 return -EINVAL;
5167
751cc0e3
AE
5168 ret = -ENOENT;
5169 spin_lock(&rbd_dev_list_lock);
5170 list_for_each(tmp, &rbd_dev_list) {
5171 rbd_dev = list_entry(tmp, struct rbd_device, node);
5172 if (rbd_dev->dev_id == dev_id) {
5173 ret = 0;
5174 break;
5175 }
42382b70 5176 }
751cc0e3
AE
5177 if (!ret) {
5178 spin_lock_irq(&rbd_dev->lock);
5179 if (rbd_dev->open_count)
5180 ret = -EBUSY;
5181 else
82a442d2
AE
5182 already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
5183 &rbd_dev->flags);
751cc0e3
AE
5184 spin_unlock_irq(&rbd_dev->lock);
5185 }
5186 spin_unlock(&rbd_dev_list_lock);
82a442d2 5187 if (ret < 0 || already)
1ba0f1e7 5188 return ret;
751cc0e3 5189
1f3ef788
AE
5190 ret = rbd_dev_header_watch_sync(rbd_dev, false);
5191 if (ret)
5192 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
9abc5990
JD
5193
5194 /*
 5195 * Flush remaining watch callbacks; these must complete
 5196 * before the osd_client is shut down.
5197 */
 5198 dout("%s: flushing notifies\n", __func__);
5199 ceph_osdc_flush_notifies(&rbd_dev->rbd_client->client->osdc);
9875201e
JD
5200 /*
5201 * Don't free anything from rbd_dev->disk until after all
5202 * notifies are completely processed. Otherwise
5203 * rbd_bus_del_dev() will race with rbd_watch_cb(), resulting
5204 * in a potential use after free of rbd_dev->disk or rbd_dev.
5205 */
5206 rbd_bus_del_dev(rbd_dev);
8ad42cd0 5207 rbd_dev_image_release(rbd_dev);
79ab7558 5208 module_put(THIS_MODULE);
aafb230e 5209
1ba0f1e7 5210 return count;
602adf40
YS
5211}
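/*
 * Editorial sketch, not part of the driver: unmapping writes the
 * numeric device id (the N in /dev/rbdN) to the remove control file:
 *
 *	int fd = open("/sys/bus/rbd/remove", O_WRONLY);
 *	write(fd, "2", 1);
 *	close(fd);
 *
 * The write fails with -EBUSY while the block device is still open,
 * and with -ENOENT when no mapped device has the given id, matching
 * the checks above.
 */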
5212
602adf40
YS
5213/*
5214 * create control files in sysfs
dfc5606d 5215 * /sys/bus/rbd/...
602adf40
YS
5216 */
5217static int rbd_sysfs_init(void)
5218{
dfc5606d 5219 int ret;
602adf40 5220
fed4c143 5221 ret = device_register(&rbd_root_dev);
21079786 5222 if (ret < 0)
dfc5606d 5223 return ret;
602adf40 5224
fed4c143
AE
5225 ret = bus_register(&rbd_bus_type);
5226 if (ret < 0)
5227 device_unregister(&rbd_root_dev);
602adf40 5228
602adf40
YS
5229 return ret;
5230}
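/*
 * Editorial note: a successful return leaves /sys/bus/rbd in place,
 * whose add/remove attributes land in rbd_add() and rbd_remove()
 * above, plus a root device that acts as the common sysfs parent for
 * mapped images.  A bus registration failure unregisters the root
 * device again, so the init is all-or-nothing.
 */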
5231
5232static void rbd_sysfs_cleanup(void)
5233{
dfc5606d 5234 bus_unregister(&rbd_bus_type);
fed4c143 5235 device_unregister(&rbd_root_dev);
602adf40
YS
5236}
5237
1c2a9dfe
AE
5238static int rbd_slab_init(void)
5239{
5240 rbd_assert(!rbd_img_request_cache);
5241 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5242 sizeof (struct rbd_img_request),
5243 __alignof__(struct rbd_img_request),
5244 0, NULL);
868311b1
AE
5245 if (!rbd_img_request_cache)
5246 return -ENOMEM;
5247
5248 rbd_assert(!rbd_obj_request_cache);
5249 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5250 sizeof (struct rbd_obj_request),
5251 __alignof__(struct rbd_obj_request),
5252 0, NULL);
78c2a44a
AE
5253 if (!rbd_obj_request_cache)
5254 goto out_err;
5255
5256 rbd_assert(!rbd_segment_name_cache);
5257 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5258 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5259 if (rbd_segment_name_cache)
1c2a9dfe 5260 return 0;
78c2a44a
AE
5261out_err:
5262 if (rbd_obj_request_cache) {
5263 kmem_cache_destroy(rbd_obj_request_cache);
5264 rbd_obj_request_cache = NULL;
5265 }
1c2a9dfe 5266
868311b1
AE
5267 kmem_cache_destroy(rbd_img_request_cache);
5268 rbd_img_request_cache = NULL;
5269
1c2a9dfe
AE
5270 return -ENOMEM;
5271}
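/*
 * Editorial note: the error path above unwinds in reverse creation
 * order and only destroys caches that were actually created; each
 * pointer is reset to NULL so the assertions in rbd_slab_exit()
 * below can catch an unbalanced teardown.
 */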
5272
5273static void rbd_slab_exit(void)
5274{
78c2a44a
AE
5275 rbd_assert(rbd_segment_name_cache);
5276 kmem_cache_destroy(rbd_segment_name_cache);
5277 rbd_segment_name_cache = NULL;
5278
868311b1
AE
5279 rbd_assert(rbd_obj_request_cache);
5280 kmem_cache_destroy(rbd_obj_request_cache);
5281 rbd_obj_request_cache = NULL;
5282
1c2a9dfe
AE
5283 rbd_assert(rbd_img_request_cache);
5284 kmem_cache_destroy(rbd_img_request_cache);
5285 rbd_img_request_cache = NULL;
5286}
5287
cc344fa1 5288static int __init rbd_init(void)
602adf40
YS
5289{
5290 int rc;
5291
1e32d34c
AE
5292 if (!libceph_compatible(NULL)) {
 5293 rbd_warn(NULL, "libceph incompatibility (quitting)\n");
1e32d34c
AE
5294 return -EINVAL;
5295 }
e1b4d96d 5296
1c2a9dfe 5297 rc = rbd_slab_init();
602adf40
YS
5298 if (rc)
5299 return rc;
e1b4d96d 5300
1c2a9dfe
AE
5301 rc = rbd_sysfs_init();
5302 if (rc)
e1b4d96d 5303 goto err_out_slab;
1c2a9dfe 5304
e1b4d96d
ID
5305 pr_info("loaded\n");
5306 return 0;
5307
5308err_out_slab:
5309 rbd_slab_exit();
1c2a9dfe 5310 return rc;
602adf40
YS
5311}
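/*
 * Editorial note: initialization order is compatibility check, slab
 * caches, then sysfs; rbd_exit() below reverses the last two.  The
 * err_out_slab label keeps a sysfs registration failure from leaking
 * the caches created just before it.
 */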
5312
cc344fa1 5313static void __exit rbd_exit(void)
602adf40
YS
5314{
5315 rbd_sysfs_cleanup();
1c2a9dfe 5316 rbd_slab_exit();
602adf40
YS
5317}
5318
5319module_init(rbd_init);
5320module_exit(rbd_exit);
5321
d552c619 5322MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
602adf40
YS
5323MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5324MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
602adf40
YS
5325/* following authorship retained from original osdblk.c */
5326MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5327
90da258b 5328MODULE_DESCRIPTION("RADOS Block Device (RBD) driver");
602adf40 5329MODULE_LICENSE("GPL");