rbd: don't look up snapshot id in rbd_dev_mapping_set()
/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by the OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 stripe_unit;
	u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

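/*
 * Illustrative sketch only, assuming get/put helpers for the kref above
 * (no such helpers are visible in this excerpt):
 *
 *	child->parent_spec = rbd_spec_get(parent->spec);
 *	...
 *	rbd_spec_put(child->parent_spec);	(on teardown)
 *
 * The embedded struct kref is what makes that sharing safe without locks.
 */
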
/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;	/* position in image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =    "rbd",
	.release =      rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

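/*
 * Illustrative use, not a call made in this excerpt: the helper takes a
 * printf-style format, so a caller might report a failure as
 *
 *	rbd_warn(rbd_dev, "failed to refresh header: %d", ret);
 *
 * and a null rbd_dev falls back to the bare driver-name prefix.
 */
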
#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

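/*
 * For illustration, the net effect of the token table and the switch
 * above on a single option token:
 *
 *	"read_only" or "ro"	sets rbd_opts->read_only = true
 *	"read_write" or "rw"	sets rbd_opts->read_only = false
 *
 * Any unmatched token hits the table's {-1, NULL} entry and the
 * function returns -EINVAL.
 */
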
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * The list lock is taken here, so callers must not hold
 * rbd_client_list_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);

	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

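/*
 * On-disk layout sketch implied by the parsing above (v1 images): the
 * snapshot names are NUL-terminated strings packed immediately after the
 * snapshot id/size array, so for two snapshots the tail of the header is
 *
 *	ondisk->snaps[0]  ondisk->snaps[1]  "first\0second\0"
 *
 * which is why _rbd_dev_v1_snap_name() below can walk the names in
 * strlen() + 1 hops.
 */
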
static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

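#if 0	/* A userspace sketch (not driver code) of the reverse-order
	 * search above; stdlib bsearch() stands in for linux/bsearch.h. */
#include <stdio.h>
#include <stdlib.h>

static int cmp_desc(const void *a, const void *b)
{
	unsigned long long x = *(const unsigned long long *)a;
	unsigned long long y = *(const unsigned long long *)b;

	if (x < y)
		return 1;	/* inverted: highest id sorts first */
	return x == y ? 0 : -1;
}

int main(void)
{
	unsigned long long snaps[] = { 40, 30, 20, 10 };	/* osd order */
	unsigned long long key = 20;
	unsigned long long *found;

	found = bsearch(&key, snaps, 4, sizeof(key), cmp_desc);
	printf("index %td\n", found ? found - snaps : -1);	/* prints 2 */
	return 0;
}
#endif
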
static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	/* If we are mapping a snapshot it must be marked read-only */

	if (snap_id != CEPH_NOSNAP)
		rbd_dev->mapping.read_only = true;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		/* the name came from the slab cache, so free it there */
		kmem_cache_free(rbd_segment_name_cache, name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

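#if 0	/* Userspace sketch (not driver code) of the segment arithmetic
	 * above, assuming obj_order = 22, i.e. 4 MiB objects. */
#include <stdio.h>

int main(void)
{
	unsigned int obj_order = 22;
	unsigned long long segment_size = 1ULL << obj_order;
	unsigned long long offset = (5ULL << 20) + 123;	/* 5 MiB + 123 */
	unsigned long long length = 8ULL << 20;		/* 8 MiB request */

	unsigned long long segment = offset >> obj_order;	/* 1 */
	unsigned long long within = offset & (segment_size - 1);

	if (within + length > segment_size)	/* clip to segment end */
		length = segment_size - within;

	printf("object %llu, offset %llu, length %llu\n",
	       segment, within, length);
	return 0;
}
#endif
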
/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

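/*
 * Illustrative call pattern (local names assumed): cloning the first
 * 2 MiB of a chain, then resuming from the first un-cloned byte:
 *
 *	struct bio *bio = chain_head;
 *	unsigned int off = 0;
 *	struct bio *clone;
 *
 *	clone = bio_chain_clone_range(&bio, &off, 2 << 20, GFP_NOIO);
 *	(on success, bio and off now locate the next byte to clone)
 */
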
/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

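#if 0	/* Sketch (not driver code): why test_and_set_bit() suits the
	 * one-way flag protocol above.  The first setter atomically sees
	 * the bit clear; any racing or repeated caller sees it set, so
	 * the "already marked" warning fires at most once per request.
	 * first_completion_work() is an assumed placeholder. */
	if (!test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags))
		first_completion_work(obj_request);
	else
		rbd_warn(NULL, "obj_request %p already marked done\n",
			obj_request);
#endif
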
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better offhand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

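/*
 * The asymmetry between the two helpers above mirrors how ceph addresses
 * data: a read is built against a single snapshot id (CEPH_NOSNAP for the
 * head), while a write carries the whole snapshot context so the osds can
 * preserve clone data.  As an illustration, a read of a mapped snapshot
 * passes img_request->snap_id, while a write always passes
 * img_request->snapc taken from the image header.
 */
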
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

0eefd470
AE
1703/*
1704 * Create a copyup osd request based on the information in the
1705 * object request supplied. A copyup request has two osd ops,
1706 * a copyup method call, and a "normal" write request.
1707 */
1708static struct ceph_osd_request *
1709rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1710{
1711 struct rbd_img_request *img_request;
1712 struct ceph_snap_context *snapc;
1713 struct rbd_device *rbd_dev;
1714 struct ceph_osd_client *osdc;
1715 struct ceph_osd_request *osd_req;
1716
1717 rbd_assert(obj_request_img_data_test(obj_request));
1718 img_request = obj_request->img_request;
1719 rbd_assert(img_request);
1720 rbd_assert(img_request_write_test(img_request));
1721
1722 /* Allocate and initialize the request, for the two ops */
1723
1724 snapc = img_request->snapc;
1725 rbd_dev = img_request->rbd_dev;
1726 osdc = &rbd_dev->rbd_client->client->osdc;
1727 osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1728 if (!osd_req)
1729 return NULL; /* ENOMEM */
1730
1731 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1732 osd_req->r_callback = rbd_osd_req_callback;
1733 osd_req->r_priv = obj_request;
1734
1735 osd_req->r_oid_len = strlen(obj_request->object_name);
1736 rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1737 memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1738
1739 osd_req->r_file_layout = rbd_dev->layout; /* struct */
1740
1741 return osd_req;
1742}
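/*
 * The two ops allocated above are filled in later, in
 * rbd_img_obj_parent_read_full_callback(): op 0 becomes the "copyup"
 * class method call and op 1 the original write.
 */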
1743
1744
1745static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
1746{
1747 ceph_osdc_put_request(osd_req);
1748}
1749
1750/* object_name is assumed to be a non-null pointer and NUL-terminated */
1751
1752static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1753 u64 offset, u64 length,
1754 enum obj_request_type type)
1755{
1756 struct rbd_obj_request *obj_request;
1757 size_t size;
1758 char *name;
1759
1760 rbd_assert(obj_request_type_valid(type));
1761
1762 size = strlen(object_name) + 1;
1763 name = kmalloc(size, GFP_KERNEL);
1764 if (!name)
1765 return NULL;
1766
868311b1 1767 obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
1768 if (!obj_request) {
1769 kfree(name);
1770 return NULL;
1771 }
1772
1773 obj_request->object_name = memcpy(name, object_name, size);
1774 obj_request->offset = offset;
1775 obj_request->length = length;
926f9b3f 1776 obj_request->flags = 0;
1777 obj_request->which = BAD_WHICH;
1778 obj_request->type = type;
1779 INIT_LIST_HEAD(&obj_request->links);
788e2df3 1780 init_completion(&obj_request->completion);
1781 kref_init(&obj_request->kref);
1782
1783 dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1784 offset, length, (int)type, obj_request);
1785
1786 return obj_request;
1787}
1788
1789static void rbd_obj_request_destroy(struct kref *kref)
1790{
1791 struct rbd_obj_request *obj_request;
1792
1793 obj_request = container_of(kref, struct rbd_obj_request, kref);
1794
1795 dout("%s: obj %p\n", __func__, obj_request);
1796
1797 rbd_assert(obj_request->img_request == NULL);
1798 rbd_assert(obj_request->which == BAD_WHICH);
1799
1800 if (obj_request->osd_req)
1801 rbd_osd_req_destroy(obj_request->osd_req);
1802
1803 rbd_assert(obj_request_type_valid(obj_request->type));
1804 switch (obj_request->type) {
1805 case OBJ_REQUEST_NODATA:
1806 break; /* Nothing to do */
1807 case OBJ_REQUEST_BIO:
1808 if (obj_request->bio_list)
1809 bio_chain_put(obj_request->bio_list);
1810 break;
1811 case OBJ_REQUEST_PAGES:
1812 if (obj_request->pages)
1813 ceph_release_page_vector(obj_request->pages,
1814 obj_request->page_count);
1815 break;
1816 }
1817
f907ad55 1818 kfree(obj_request->object_name);
1819 obj_request->object_name = NULL;
1820 kmem_cache_free(rbd_obj_request_cache, obj_request);
1821}
1822
1823/*
1824 * Caller is responsible for filling in the list of object requests
1825 * that comprises the image request, and the Linux request pointer
1826 * (if there is one).
1827 */
1828static struct rbd_img_request *rbd_img_request_create(
1829 struct rbd_device *rbd_dev,
bf0d5f50 1830 u64 offset, u64 length,
1831 bool write_request,
1832 bool child_request)
1833{
1834 struct rbd_img_request *img_request;
bf0d5f50 1835
1c2a9dfe 1836 img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
1837 if (!img_request)
1838 return NULL;
1839
1840 if (write_request) {
1841 down_read(&rbd_dev->header_rwsem);
812164f8 1842 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 1843 up_read(&rbd_dev->header_rwsem);
1844 }
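	/*
	 * The snapshot context reference taken above is dropped by
	 * rbd_img_request_destroy() when the image request goes away.
	 */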
1845
1846 img_request->rq = NULL;
1847 img_request->rbd_dev = rbd_dev;
1848 img_request->offset = offset;
1849 img_request->length = length;
1850 img_request->flags = 0;
1851 if (write_request) {
1852 img_request_write_set(img_request);
468521c1 1853 img_request->snapc = rbd_dev->header.snapc;
0c425248 1854 } else {
bf0d5f50 1855 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 1856 }
1857 if (child_request)
1858 img_request_child_set(img_request);
1859 if (rbd_dev->parent_spec)
1860 img_request_layered_set(img_request);
1861 spin_lock_init(&img_request->completion_lock);
1862 img_request->next_completion = 0;
1863 img_request->callback = NULL;
a5a337d4 1864 img_request->result = 0;
1865 img_request->obj_request_count = 0;
1866 INIT_LIST_HEAD(&img_request->obj_requests);
1867 kref_init(&img_request->kref);
1868
1869 rbd_img_request_get(img_request); /* Avoid a warning */
1870 rbd_img_request_put(img_request); /* TEMPORARY */
1871
1872 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1873 write_request ? "write" : "read", offset, length,
1874 img_request);
1875
1876 return img_request;
1877}
1878
1879static void rbd_img_request_destroy(struct kref *kref)
1880{
1881 struct rbd_img_request *img_request;
1882 struct rbd_obj_request *obj_request;
1883 struct rbd_obj_request *next_obj_request;
1884
1885 img_request = container_of(kref, struct rbd_img_request, kref);
1886
1887 dout("%s: img %p\n", __func__, img_request);
1888
1889 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1890 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 1891 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 1892
0c425248 1893 if (img_request_write_test(img_request))
812164f8 1894 ceph_put_snap_context(img_request->snapc);
bf0d5f50 1895
1896 if (img_request_child_test(img_request))
1897 rbd_obj_request_put(img_request->obj_request);
1898
1c2a9dfe 1899 kmem_cache_free(rbd_img_request_cache, img_request);
1900}
1901
1902static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1903{
6365d33a 1904 struct rbd_img_request *img_request;
1905 unsigned int xferred;
1906 int result;
8b3e1a56 1907 bool more;
1217857f 1908
1909 rbd_assert(obj_request_img_data_test(obj_request));
1910 img_request = obj_request->img_request;
1911
1912 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1913 xferred = (unsigned int)obj_request->xferred;
1914 result = obj_request->result;
1915 if (result) {
1916 struct rbd_device *rbd_dev = img_request->rbd_dev;
1917
1918 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1919 img_request_write_test(img_request) ? "write" : "read",
1920 obj_request->length, obj_request->img_offset,
1921 obj_request->offset);
1922 rbd_warn(rbd_dev, " result %d xferred %x\n",
1923 result, xferred);
1924 if (!img_request->result)
1925 img_request->result = result;
1926 }
1927
1928 /* Image object requests don't own their page array */
1929
1930 if (obj_request->type == OBJ_REQUEST_PAGES) {
1931 obj_request->pages = NULL;
1932 obj_request->page_count = 0;
1933 }
1934
1935 if (img_request_child_test(img_request)) {
1936 rbd_assert(img_request->obj_request != NULL);
1937 more = obj_request->which < img_request->obj_request_count - 1;
1938 } else {
1939 rbd_assert(img_request->rq != NULL);
1940 more = blk_end_request(img_request->rq, result, xferred);
1941 }
1942
1943 return more;
1944}
1945
1946static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1947{
1948 struct rbd_img_request *img_request;
1949 u32 which = obj_request->which;
1950 bool more = true;
1951
6365d33a 1952 rbd_assert(obj_request_img_data_test(obj_request));
1953 img_request = obj_request->img_request;
1954
1955 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1956 rbd_assert(img_request != NULL);
1957 rbd_assert(img_request->obj_request_count > 0);
1958 rbd_assert(which != BAD_WHICH);
1959 rbd_assert(which < img_request->obj_request_count);
1960 rbd_assert(which >= img_request->next_completion);
1961
1962 spin_lock_irq(&img_request->completion_lock);
1963 if (which != img_request->next_completion)
1964 goto out;
1965
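	/*
	 * Object requests are completed in order: starting from the
	 * next expected one, sweep forward over every request already
	 * marked done, ending each and advancing next_completion.
	 */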
1966 for_each_obj_request_from(img_request, obj_request) {
1967 rbd_assert(more);
1968 rbd_assert(which < img_request->obj_request_count);
1969
1970 if (!obj_request_done_test(obj_request))
1971 break;
1217857f 1972 more = rbd_img_obj_end_request(obj_request);
1973 which++;
1974 }
1975
1976 rbd_assert(more ^ (which == img_request->obj_request_count));
1977 img_request->next_completion = which;
1978out:
1979 spin_unlock_irq(&img_request->completion_lock);
1980
1981 if (!more)
1982 rbd_img_request_complete(img_request);
1983}
1984
1985/*
1986 * Split up an image request into one or more object requests, each
1987 * to a different object. The "type" parameter indicates whether
1988 * "data_desc" is the pointer to the head of a list of bio
1989 * structures, or the base of a page array. In either case this
1990 * function assumes data_desc describes memory sufficient to hold
1991 * all data described by the image request.
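 *
 * For example, with the default 4 MiB objects, an 8 MiB image
 * request that starts 1 MiB into an object is split into three
 * object requests covering 3 MiB, 4 MiB, and 1 MiB respectively.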
1992 */
1993static int rbd_img_request_fill(struct rbd_img_request *img_request,
1994 enum obj_request_type type,
1995 void *data_desc)
1996{
1997 struct rbd_device *rbd_dev = img_request->rbd_dev;
1998 struct rbd_obj_request *obj_request = NULL;
1999 struct rbd_obj_request *next_obj_request;
0c425248 2000 bool write_request = img_request_write_test(img_request);
2001 struct bio *bio_list;
2002 unsigned int bio_offset = 0;
2003 struct page **pages;
7da22d29 2004 u64 img_offset;
2005 u64 resid;
2006 u16 opcode;
2007
2008 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
2009 (int)type, data_desc);
37206ee5 2010
430c28c3 2011 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 2012 img_offset = img_request->offset;
bf0d5f50 2013 resid = img_request->length;
4dda41d3 2014 rbd_assert(resid > 0);
2015
2016 if (type == OBJ_REQUEST_BIO) {
2017 bio_list = data_desc;
2018 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
2019 } else {
2020 rbd_assert(type == OBJ_REQUEST_PAGES);
2021 pages = data_desc;
2022 }
2023
bf0d5f50 2024 while (resid) {
2fa12320 2025 struct ceph_osd_request *osd_req;
bf0d5f50 2026 const char *object_name;
2027 u64 offset;
2028 u64 length;
2029
7da22d29 2030 object_name = rbd_segment_name(rbd_dev, img_offset);
2031 if (!object_name)
2032 goto out_unwind;
2033 offset = rbd_segment_offset(rbd_dev, img_offset);
2034 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 2035 obj_request = rbd_obj_request_create(object_name,
f1a4739f 2036 offset, length, type);
2037 /* object request has its own copy of the object name */
2038 rbd_segment_name_free(object_name);
2039 if (!obj_request)
2040 goto out_unwind;
2041
2042 if (type == OBJ_REQUEST_BIO) {
2043 unsigned int clone_size;
2044
2045 rbd_assert(length <= (u64)UINT_MAX);
2046 clone_size = (unsigned int)length;
2047 obj_request->bio_list =
2048 bio_chain_clone_range(&bio_list,
2049 &bio_offset,
2050 clone_size,
2051 GFP_ATOMIC);
2052 if (!obj_request->bio_list)
2053 goto out_partial;
2054 } else {
2055 unsigned int page_count;
2056
2057 obj_request->pages = pages;
2058 page_count = (u32)calc_pages_for(offset, length);
2059 obj_request->page_count = page_count;
2060 if ((offset + length) & ~PAGE_MASK)
2061 page_count--; /* more on last page */
2062 pages += page_count;
2063 }
bf0d5f50 2064
2065 osd_req = rbd_osd_req_create(rbd_dev, write_request,
2066 obj_request);
2067 if (!osd_req)
bf0d5f50 2068 goto out_partial;
2fa12320 2069 obj_request->osd_req = osd_req;
2169238d 2070 obj_request->callback = rbd_img_obj_callback;
430c28c3 2071
2072 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
2073 0, 0);
2074 if (type == OBJ_REQUEST_BIO)
2075 osd_req_op_extent_osd_data_bio(osd_req, 0,
2076 obj_request->bio_list, length);
2077 else
2078 osd_req_op_extent_osd_data_pages(osd_req, 0,
2079 obj_request->pages, length,
2080 offset & ~PAGE_MASK, false, false);
2081
2082 if (write_request)
2083 rbd_osd_req_format_write(obj_request);
2084 else
2085 rbd_osd_req_format_read(obj_request);
430c28c3 2086
7da22d29 2087 obj_request->img_offset = img_offset;
2088 rbd_img_obj_request_add(img_request, obj_request);
2089
7da22d29 2090 img_offset += length;
2091 resid -= length;
2092 }
2093
2094 return 0;
2095
2096out_partial:
2097 rbd_obj_request_put(obj_request);
2098out_unwind:
2099 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2100 rbd_obj_request_put(obj_request);
2101
2102 return -ENOMEM;
2103}
2104
2105static void
2106rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2107{
2108 struct rbd_img_request *img_request;
2109 struct rbd_device *rbd_dev;
2110 u64 length;
2111 u32 page_count;
2112
2113 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2114 rbd_assert(obj_request_img_data_test(obj_request));
2115 img_request = obj_request->img_request;
2116 rbd_assert(img_request);
2117
2118 rbd_dev = img_request->rbd_dev;
2119 rbd_assert(rbd_dev);
2120 length = (u64)1 << rbd_dev->header.obj_order;
2121 page_count = (u32)calc_pages_for(0, length);
2122
2123 rbd_assert(obj_request->copyup_pages);
2124 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2125 obj_request->copyup_pages = NULL;
2126
2127 /*
2128 * We want the transfer count to reflect the size of the
2129 * original write request. There is no such thing as a
2130 * successful short write, so if the request was successful
2131 * we can just set it to the originally-requested length.
2132 */
2133 if (!obj_request->result)
2134 obj_request->xferred = obj_request->length;
2135
2136 /* Finish up with the normal image object callback */
2137
2138 rbd_img_obj_callback(obj_request);
2139}
2140
2141static void
2142rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2143{
2144 struct rbd_obj_request *orig_request;
2145 struct ceph_osd_request *osd_req;
2146 struct ceph_osd_client *osdc;
2147 struct rbd_device *rbd_dev;
3d7efd18 2148 struct page **pages;
2149 int result;
2150 u64 obj_size;
2151 u64 xferred;
2152
2153 rbd_assert(img_request_child_test(img_request));
2154
2155 /* First get what we need from the image request */
2156
2157 pages = img_request->copyup_pages;
2158 rbd_assert(pages != NULL);
2159 img_request->copyup_pages = NULL;
2160
2161 orig_request = img_request->obj_request;
2162 rbd_assert(orig_request != NULL);
0eefd470 2163 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2164 result = img_request->result;
2165 obj_size = img_request->length;
2166 xferred = img_request->xferred;
2167
2168 rbd_dev = img_request->rbd_dev;
2169 rbd_assert(rbd_dev);
2170 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2171
2172 rbd_img_request_put(img_request);
2173
2174 if (result)
2175 goto out_err;
2176
2177 /* Allocate the new copyup osd request for the original request */
2178
2179 result = -ENOMEM;
2180 rbd_assert(!orig_request->osd_req);
2181 osd_req = rbd_osd_req_create_copyup(orig_request);
2182 if (!osd_req)
2183 goto out_err;
2184 orig_request->osd_req = osd_req;
2185 orig_request->copyup_pages = pages;
3d7efd18 2186
0eefd470 2187 /* Initialize the copyup op */
3d7efd18 2188
2189 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2190 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2191 false, false);
3d7efd18 2192
2193 /* Then the original write request op */
2194
2195 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2196 orig_request->offset,
2197 orig_request->length, 0, 0);
2198 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2199 orig_request->length);
2200
2201 rbd_osd_req_format_write(orig_request);
2202
2203 /* All set, send it off. */
2204
2205 orig_request->callback = rbd_img_obj_copyup_callback;
2206 osdc = &rbd_dev->rbd_client->client->osdc;
2207 result = rbd_obj_request_submit(osdc, orig_request);
2208 if (!result)
2209 return;
2210out_err:
2211 /* Record the error code and complete the request */
2212
2213 orig_request->result = result;
2214 orig_request->xferred = 0;
2215 obj_request_done_set(orig_request);
2216 rbd_obj_request_complete(orig_request);
2217}
2218
2219/*
2220 * Read from the parent image the range of data that covers the
2221 * entire target of the given object request. This is used for
2222 * satisfying a layered image write request when the target of an
2223 * object request from the image request does not exist.
2224 *
2225 * A page array big enough to hold the returned data is allocated
2226 * and supplied to rbd_img_request_fill() as the "data descriptor."
2227 * When the read completes, this page array will be transferred to
2228 * the original object request for the copyup operation.
2229 *
2230 * If an error occurs, record it as the result of the original
2231 * object request and mark it done so it gets completed.
2232 */
2233static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2234{
2235 struct rbd_img_request *img_request = NULL;
2236 struct rbd_img_request *parent_request = NULL;
2237 struct rbd_device *rbd_dev;
2238 u64 img_offset;
2239 u64 length;
2240 struct page **pages = NULL;
2241 u32 page_count;
2242 int result;
2243
2244 rbd_assert(obj_request_img_data_test(obj_request));
2245 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2246
2247 img_request = obj_request->img_request;
2248 rbd_assert(img_request != NULL);
2249 rbd_dev = img_request->rbd_dev;
2250 rbd_assert(rbd_dev->parent != NULL);
2251
2252 /*
2253 * First things first. The original osd request is of no
 2254 * use to us any more; we'll need a new one that can hold
2255 * the two ops in a copyup request. We'll get that later,
2256 * but for now we can release the old one.
2257 */
2258 rbd_osd_req_destroy(obj_request->osd_req);
2259 obj_request->osd_req = NULL;
2260
2261 /*
2262 * Determine the byte range covered by the object in the
2263 * child image to which the original request was to be sent.
2264 */
2265 img_offset = obj_request->img_offset - obj_request->offset;
2266 length = (u64)1 << rbd_dev->header.obj_order;
2267
2268 /*
2269 * There is no defined parent data beyond the parent
2270 * overlap, so limit what we read at that boundary if
2271 * necessary.
2272 */
2273 if (img_offset + length > rbd_dev->parent_overlap) {
2274 rbd_assert(img_offset < rbd_dev->parent_overlap);
2275 length = rbd_dev->parent_overlap - img_offset;
2276 }
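	/*
	 * For example, with 4 MiB objects and a 6 MiB parent overlap,
	 * a copyup of the second object reads only the 2 MiB of
	 * parent data that actually exists.
	 */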
2277
2278 /*
2279 * Allocate a page array big enough to receive the data read
2280 * from the parent.
2281 */
2282 page_count = (u32)calc_pages_for(0, length);
2283 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2284 if (IS_ERR(pages)) {
2285 result = PTR_ERR(pages);
2286 pages = NULL;
2287 goto out_err;
2288 }
2289
2290 result = -ENOMEM;
2291 parent_request = rbd_img_request_create(rbd_dev->parent,
2292 img_offset, length,
2293 false, true);
2294 if (!parent_request)
2295 goto out_err;
2296 rbd_obj_request_get(obj_request);
2297 parent_request->obj_request = obj_request;
2298
2299 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2300 if (result)
2301 goto out_err;
2302 parent_request->copyup_pages = pages;
2303
2304 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2305 result = rbd_img_request_submit(parent_request);
2306 if (!result)
2307 return 0;
2308
2309 parent_request->copyup_pages = NULL;
2310 parent_request->obj_request = NULL;
2311 rbd_obj_request_put(obj_request);
2312out_err:
2313 if (pages)
2314 ceph_release_page_vector(pages, page_count);
2315 if (parent_request)
2316 rbd_img_request_put(parent_request);
2317 obj_request->result = result;
2318 obj_request->xferred = 0;
2319 obj_request_done_set(obj_request);
2320
2321 return result;
2322}
2323
2324static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2325{
2326 struct rbd_obj_request *orig_request;
2327 int result;
2328
2329 rbd_assert(!obj_request_img_data_test(obj_request));
2330
2331 /*
2332 * All we need from the object request is the original
2333 * request and the result of the STAT op. Grab those, then
2334 * we're done with the request.
2335 */
2336 orig_request = obj_request->obj_request;
2337 obj_request->obj_request = NULL;
2338 rbd_assert(orig_request);
2339 rbd_assert(orig_request->img_request);
2340
2341 result = obj_request->result;
2342 obj_request->result = 0;
2343
2344 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2345 obj_request, orig_request, result,
2346 obj_request->xferred, obj_request->length);
2347 rbd_obj_request_put(obj_request);
2348
2349 rbd_assert(orig_request);
2350 rbd_assert(orig_request->img_request);
2351
2352 /*
2353 * Our only purpose here is to determine whether the object
2354 * exists, and we don't want to treat the non-existence as
2355 * an error. If something else comes back, transfer the
2356 * error to the original request and complete it now.
2357 */
2358 if (!result) {
2359 obj_request_existence_set(orig_request, true);
2360 } else if (result == -ENOENT) {
2361 obj_request_existence_set(orig_request, false);
2362 } else if (result) {
2363 orig_request->result = result;
3d7efd18 2364 goto out;
2365 }
2366
2367 /*
2368 * Resubmit the original request now that we have recorded
2369 * whether the target object exists.
2370 */
b454e36d 2371 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2372out:
2373 if (orig_request->result)
2374 rbd_obj_request_complete(orig_request);
2375 rbd_obj_request_put(orig_request);
2376}
2377
2378static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2379{
2380 struct rbd_obj_request *stat_request;
2381 struct rbd_device *rbd_dev;
2382 struct ceph_osd_client *osdc;
2383 struct page **pages = NULL;
2384 u32 page_count;
2385 size_t size;
2386 int ret;
2387
2388 /*
2389 * The response data for a STAT call consists of:
2390 * le64 length;
2391 * struct {
2392 * le32 tv_sec;
2393 * le32 tv_nsec;
2394 * } mtime;
2395 */
2396 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2397 page_count = (u32)calc_pages_for(0, size);
2398 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2399 if (IS_ERR(pages))
2400 return PTR_ERR(pages);
2401
2402 ret = -ENOMEM;
2403 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2404 OBJ_REQUEST_PAGES);
2405 if (!stat_request)
2406 goto out;
2407
2408 rbd_obj_request_get(obj_request);
2409 stat_request->obj_request = obj_request;
2410 stat_request->pages = pages;
2411 stat_request->page_count = page_count;
2412
2413 rbd_assert(obj_request->img_request);
2414 rbd_dev = obj_request->img_request->rbd_dev;
2415 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2416 stat_request);
2417 if (!stat_request->osd_req)
2418 goto out;
2419 stat_request->callback = rbd_img_obj_exists_callback;
2420
2421 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2422 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2423 false, false);
9d4df01f 2424 rbd_osd_req_format_read(stat_request);
2425
2426 osdc = &rbd_dev->rbd_client->client->osdc;
2427 ret = rbd_obj_request_submit(osdc, stat_request);
2428out:
2429 if (ret)
2430 rbd_obj_request_put(obj_request);
2431
2432 return ret;
2433}
2434
2435static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2436{
2437 struct rbd_img_request *img_request;
a9e8ba2c 2438 struct rbd_device *rbd_dev;
3d7efd18 2439 bool known;
2440
2441 rbd_assert(obj_request_img_data_test(obj_request));
2442
2443 img_request = obj_request->img_request;
2444 rbd_assert(img_request);
a9e8ba2c 2445 rbd_dev = img_request->rbd_dev;
b454e36d 2446
b454e36d 2447 /*
2448 * Only writes to layered images need special handling.
2449 * Reads and non-layered writes are simple object requests.
2450 * Layered writes that start beyond the end of the overlap
2451 * with the parent have no parent data, so they too are
2452 * simple object requests. Finally, if the target object is
2453 * known to already exist, its parent data has already been
2454 * copied, so a write to the object can also be handled as a
2455 * simple object request.
2456 */
2457 if (!img_request_write_test(img_request) ||
2458 !img_request_layered_test(img_request) ||
a9e8ba2c 2459 rbd_dev->parent_overlap <= obj_request->img_offset ||
2460 ((known = obj_request_known_test(obj_request)) &&
2461 obj_request_exists_test(obj_request))) {
2462
2463 struct rbd_device *rbd_dev;
2464 struct ceph_osd_client *osdc;
2465
2466 rbd_dev = obj_request->img_request->rbd_dev;
2467 osdc = &rbd_dev->rbd_client->client->osdc;
2468
2469 return rbd_obj_request_submit(osdc, obj_request);
2470 }
2471
2472 /*
2473 * It's a layered write. The target object might exist but
2474 * we may not know that yet. If we know it doesn't exist,
2475 * start by reading the data for the full target object from
2476 * the parent so we can use it for a copyup to the target.
b454e36d 2477 */
2478 if (known)
2479 return rbd_img_obj_parent_read_full(obj_request);
2480
2481 /* We don't know whether the target exists. Go find out. */
2482
2483 return rbd_img_obj_exists_submit(obj_request);
2484}
2485
2486static int rbd_img_request_submit(struct rbd_img_request *img_request)
2487{
bf0d5f50 2488 struct rbd_obj_request *obj_request;
46faeed4 2489 struct rbd_obj_request *next_obj_request;
bf0d5f50 2490
37206ee5 2491 dout("%s: img %p\n", __func__, img_request);
46faeed4 2492 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2493 int ret;
2494
b454e36d 2495 ret = rbd_img_obj_request_submit(obj_request);
2496 if (ret)
2497 return ret;
2498 }
2499
2500 return 0;
2501}
2502
2503static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2504{
2505 struct rbd_obj_request *obj_request;
2506 struct rbd_device *rbd_dev;
2507 u64 obj_end;
2508
2509 rbd_assert(img_request_child_test(img_request));
2510
2511 obj_request = img_request->obj_request;
2512 rbd_assert(obj_request);
2513 rbd_assert(obj_request->img_request);
2514
8b3e1a56 2515 obj_request->result = img_request->result;
2516 if (obj_request->result)
2517 goto out;
2518
2519 /*
2520 * We need to zero anything beyond the parent overlap
2521 * boundary. Since rbd_img_obj_request_read_callback()
2522 * will zero anything beyond the end of a short read, an
2523 * easy way to do this is to pretend the data from the
2524 * parent came up short--ending at the overlap boundary.
2525 */
2526 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2527 obj_end = obj_request->img_offset + obj_request->length;
2528 rbd_dev = obj_request->img_request->rbd_dev;
2529 if (obj_end > rbd_dev->parent_overlap) {
2530 u64 xferred = 0;
2531
2532 if (obj_request->img_offset < rbd_dev->parent_overlap)
2533 xferred = rbd_dev->parent_overlap -
2534 obj_request->img_offset;
8b3e1a56 2535
2536 obj_request->xferred = min(img_request->xferred, xferred);
2537 } else {
2538 obj_request->xferred = img_request->xferred;
2539 }
2540out:
b5b09be3 2541 rbd_img_request_put(img_request);
2542 rbd_img_obj_request_read_callback(obj_request);
2543 rbd_obj_request_complete(obj_request);
2544}
2545
2546static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2547{
2548 struct rbd_device *rbd_dev;
2549 struct rbd_img_request *img_request;
2550 int result;
2551
2552 rbd_assert(obj_request_img_data_test(obj_request));
2553 rbd_assert(obj_request->img_request != NULL);
2554 rbd_assert(obj_request->result == (s32) -ENOENT);
2555 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2556
2557 rbd_dev = obj_request->img_request->rbd_dev;
2558 rbd_assert(rbd_dev->parent != NULL);
2559 /* rbd_read_finish(obj_request, obj_request->length); */
2560 img_request = rbd_img_request_create(rbd_dev->parent,
2561 obj_request->img_offset,
2562 obj_request->length,
2563 false, true);
2564 result = -ENOMEM;
2565 if (!img_request)
2566 goto out_err;
2567
2568 rbd_obj_request_get(obj_request);
2569 img_request->obj_request = obj_request;
2570
2571 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2572 obj_request->bio_list);
2573 if (result)
2574 goto out_err;
2575
2576 img_request->callback = rbd_img_parent_read_callback;
2577 result = rbd_img_request_submit(img_request);
2578 if (result)
2579 goto out_err;
2580
2581 return;
2582out_err:
2583 if (img_request)
2584 rbd_img_request_put(img_request);
2585 obj_request->result = result;
2586 obj_request->xferred = 0;
2587 obj_request_done_set(obj_request);
2588}
bf0d5f50 2589
cc4a38bd 2590static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
2591{
2592 struct rbd_obj_request *obj_request;
2169238d 2593 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2594 int ret;
2595
2596 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2597 OBJ_REQUEST_NODATA);
2598 if (!obj_request)
2599 return -ENOMEM;
2600
2601 ret = -ENOMEM;
430c28c3 2602 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2603 if (!obj_request->osd_req)
2604 goto out;
2169238d 2605 obj_request->callback = rbd_obj_request_put;
b8d70035 2606
c99d2d4a 2607 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
cc4a38bd 2608 notify_id, 0, 0);
9d4df01f 2609 rbd_osd_req_format_read(obj_request);
430c28c3 2610
b8d70035 2611 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 2612out:
2613 if (ret)
2614 rbd_obj_request_put(obj_request);
2615
2616 return ret;
2617}
2618
2619static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2620{
2621 struct rbd_device *rbd_dev = (struct rbd_device *)data;
e627db08 2622 int ret;
2623
2624 if (!rbd_dev)
2625 return;
2626
37206ee5 2627 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2628 rbd_dev->header_name, (unsigned long long)notify_id,
2629 (unsigned int)opcode);
2630 ret = rbd_dev_refresh(rbd_dev);
2631 if (ret)
2632 rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);
b8d70035 2633
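	/* Ack the notify even if the refresh failed; the OSD expects
	 * an acknowledgement from every watcher. */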
cc4a38bd 2634 rbd_obj_notify_ack(rbd_dev, notify_id);
2635}
2636
2637/*
2638 * Request sync osd watch/unwatch. The value of "start" determines
2639 * whether a watch request is being initiated or torn down.
2640 */
2641static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2642{
2643 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2644 struct rbd_obj_request *obj_request;
2645 int ret;
2646
2647 rbd_assert(start ^ !!rbd_dev->watch_event);
2648 rbd_assert(start ^ !!rbd_dev->watch_request);
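	/*
	 * The asserts above encode the protocol: when starting a
	 * watch, neither the event nor the request may exist yet;
	 * when tearing one down, both must.
	 */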
2649
2650 if (start) {
3c663bbd 2651 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2652 &rbd_dev->watch_event);
2653 if (ret < 0)
2654 return ret;
8eb87565 2655 rbd_assert(rbd_dev->watch_event != NULL);
2656 }
2657
2658 ret = -ENOMEM;
2659 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2660 OBJ_REQUEST_NODATA);
2661 if (!obj_request)
2662 goto out_cancel;
2663
2664 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2665 if (!obj_request->osd_req)
2666 goto out_cancel;
2667
8eb87565 2668 if (start)
975241af 2669 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2670 else
6977c3f9 2671 ceph_osdc_unregister_linger_request(osdc,
975241af 2672 rbd_dev->watch_request->osd_req);
2673
2674 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
b21ebddd 2675 rbd_dev->watch_event->cookie, 0, start);
9d4df01f 2676 rbd_osd_req_format_write(obj_request);
2169238d 2677
2678 ret = rbd_obj_request_submit(osdc, obj_request);
2679 if (ret)
2680 goto out_cancel;
2681 ret = rbd_obj_request_wait(obj_request);
2682 if (ret)
2683 goto out_cancel;
2684 ret = obj_request->result;
2685 if (ret)
2686 goto out_cancel;
2687
2688 /*
2689 * A watch request is set to linger, so the underlying osd
2690 * request won't go away until we unregister it. We retain
2691 * a pointer to the object request during that time (in
2692 * rbd_dev->watch_request), so we'll keep a reference to
2693 * it. We'll drop that reference (below) after we've
2694 * unregistered it.
2695 */
2696 if (start) {
2697 rbd_dev->watch_request = obj_request;
2698
2699 return 0;
2700 }
2701
2702 /* We have successfully torn down the watch request */
2703
2704 rbd_obj_request_put(rbd_dev->watch_request);
2705 rbd_dev->watch_request = NULL;
2706out_cancel:
2707 /* Cancel the event if we're tearing down, or on error */
2708 ceph_osdc_cancel_event(rbd_dev->watch_event);
2709 rbd_dev->watch_event = NULL;
2710 if (obj_request)
2711 rbd_obj_request_put(obj_request);
2712
2713 return ret;
2714}
2715
36be9a76 2716/*
2717 * Synchronous osd object method call. Returns the number of bytes
 2718 * returned in the inbound buffer, or a negative error code.
2719 */
2720static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2721 const char *object_name,
2722 const char *class_name,
2723 const char *method_name,
4157976b 2724 const void *outbound,
36be9a76 2725 size_t outbound_size,
4157976b 2726 void *inbound,
e2a58ee5 2727 size_t inbound_size)
36be9a76 2728{
2169238d 2729 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2730 struct rbd_obj_request *obj_request;
2731 struct page **pages;
2732 u32 page_count;
2733 int ret;
2734
2735 /*
2736 * Method calls are ultimately read operations. The result
 2737 * should be placed into the inbound buffer provided. They
2738 * also supply outbound data--parameters for the object
2739 * method. Currently if this is present it will be a
2740 * snapshot id.
36be9a76 2741 */
57385b51 2742 page_count = (u32)calc_pages_for(0, inbound_size);
2743 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2744 if (IS_ERR(pages))
2745 return PTR_ERR(pages);
2746
2747 ret = -ENOMEM;
6010a451 2748 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2749 OBJ_REQUEST_PAGES);
2750 if (!obj_request)
2751 goto out;
2752
2753 obj_request->pages = pages;
2754 obj_request->page_count = page_count;
2755
430c28c3 2756 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2757 if (!obj_request->osd_req)
2758 goto out;
2759
c99d2d4a 2760 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2761 class_name, method_name);
2762 if (outbound_size) {
2763 struct ceph_pagelist *pagelist;
2764
2765 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2766 if (!pagelist)
2767 goto out;
2768
2769 ceph_pagelist_init(pagelist);
2770 ceph_pagelist_append(pagelist, outbound, outbound_size);
2771 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2772 pagelist);
2773 }
2774 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2775 obj_request->pages, inbound_size,
44cd188d 2776 0, false, false);
9d4df01f 2777 rbd_osd_req_format_read(obj_request);
430c28c3 2778
2779 ret = rbd_obj_request_submit(osdc, obj_request);
2780 if (ret)
2781 goto out;
2782 ret = rbd_obj_request_wait(obj_request);
2783 if (ret)
2784 goto out;
2785
2786 ret = obj_request->result;
2787 if (ret < 0)
2788 goto out;
2789
2790 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2791 ret = (int)obj_request->xferred;
903bb32e 2792 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2793out:
2794 if (obj_request)
2795 rbd_obj_request_put(obj_request);
2796 else
2797 ceph_release_page_vector(pages, page_count);
2798
2799 return ret;
2800}
2801
bf0d5f50 2802static void rbd_request_fn(struct request_queue *q)
cc344fa1 2803 __releases(q->queue_lock) __acquires(q->queue_lock)
2804{
2805 struct rbd_device *rbd_dev = q->queuedata;
2806 bool read_only = rbd_dev->mapping.read_only;
2807 struct request *rq;
2808 int result;
2809
2810 while ((rq = blk_fetch_request(q))) {
2811 bool write_request = rq_data_dir(rq) == WRITE;
2812 struct rbd_img_request *img_request;
2813 u64 offset;
2814 u64 length;
2815
2816 /* Ignore any non-FS requests that filter through. */
2817
2818 if (rq->cmd_type != REQ_TYPE_FS) {
2819 dout("%s: non-fs request type %d\n", __func__,
2820 (int) rq->cmd_type);
2821 __blk_end_request_all(rq, 0);
2822 continue;
2823 }
2824
2825 /* Ignore/skip any zero-length requests */
2826
2827 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2828 length = (u64) blk_rq_bytes(rq);
2829
2830 if (!length) {
2831 dout("%s: zero-length request\n", __func__);
2832 __blk_end_request_all(rq, 0);
2833 continue;
2834 }
2835
2836 spin_unlock_irq(q->queue_lock);
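		/*
		 * The queue lock is dropped while the image request is
		 * built and submitted; it is re-taken at end_request
		 * below, before the request is completed or the next
		 * one fetched.
		 */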
2837
2838 /* Disallow writes to a read-only device */
2839
2840 if (write_request) {
2841 result = -EROFS;
2842 if (read_only)
2843 goto end_request;
2844 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2845 }
2846
2847 /*
2848 * Quit early if the mapped snapshot no longer
2849 * exists. It's still possible the snapshot will
2850 * have disappeared by the time our request arrives
2851 * at the osd, but there's no sense in sending it if
2852 * we already know.
2853 */
2854 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2855 dout("request for non-existent snapshot");
2856 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2857 result = -ENXIO;
2858 goto end_request;
2859 }
2860
bf0d5f50 2861 result = -EINVAL;
2862 if (offset && length > U64_MAX - offset + 1) {
2863 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2864 offset, length);
bf0d5f50 2865 goto end_request; /* Shouldn't happen */
c0cd10db 2866 }
bf0d5f50 2867
2868 result = -EIO;
2869 if (offset + length > rbd_dev->mapping.size) {
2870 rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
2871 offset, length, rbd_dev->mapping.size);
2872 goto end_request;
2873 }
2874
2875 result = -ENOMEM;
2876 img_request = rbd_img_request_create(rbd_dev, offset, length,
9849e986 2877 write_request, false);
bf0d5f50
AE
2878 if (!img_request)
2879 goto end_request;
2880
2881 img_request->rq = rq;
2882
2883 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2884 rq->bio);
2885 if (!result)
2886 result = rbd_img_request_submit(img_request);
2887 if (result)
2888 rbd_img_request_put(img_request);
2889end_request:
2890 spin_lock_irq(q->queue_lock);
2891 if (result < 0) {
2892 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2893 write_request ? "write" : "read",
2894 length, offset, result);
2895
2896 __blk_end_request_all(rq, result);
2897 }
2898 }
2899}
2900
2901/*
 2902 * A queue callback. Makes sure that we don't create a bio that spans across
 2903 * multiple osd objects. One exception would be single-page bios,
f7760dad 2904 * which we handle later at bio_chain_clone_range()
2905 */
2906static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2907 struct bio_vec *bvec)
2908{
2909 struct rbd_device *rbd_dev = q->queuedata;
2910 sector_t sector_offset;
2911 sector_t sectors_per_obj;
2912 sector_t obj_sector_offset;
2913 int ret;
2914
2915 /*
2916 * Find how far into its rbd object the partition-relative
2917 * bio start sector is to offset relative to the enclosing
2918 * device.
2919 */
2920 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2921 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2922 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
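	/*
	 * With the default 4 MiB (order 22) objects this means
	 * sectors_per_obj is 8192, and the offset simply wraps at
	 * each object boundary.
	 */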
2923
2924 /*
2925 * Compute the number of bytes from that offset to the end
2926 * of the object. Account for what's already used by the bio.
2927 */
2928 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2929 if (ret > bmd->bi_size)
2930 ret -= bmd->bi_size;
2931 else
2932 ret = 0;
2933
2934 /*
2935 * Don't send back more than was asked for. And if the bio
2936 * was empty, let the whole thing through because: "Note
2937 * that a block device *must* allow a single page to be
2938 * added to an empty bio."
2939 */
2940 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2941 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2942 ret = (int) bvec->bv_len;
2943
2944 return ret;
2945}
2946
2947static void rbd_free_disk(struct rbd_device *rbd_dev)
2948{
2949 struct gendisk *disk = rbd_dev->disk;
2950
2951 if (!disk)
2952 return;
2953
2954 rbd_dev->disk = NULL;
2955 if (disk->flags & GENHD_FL_UP) {
602adf40 2956 del_gendisk(disk);
2957 if (disk->queue)
2958 blk_cleanup_queue(disk->queue);
2959 }
2960 put_disk(disk);
2961}
2962
2963static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2964 const char *object_name,
7097f8df 2965 u64 offset, u64 length, void *buf)
2966
2967{
2169238d 2968 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 2969 struct rbd_obj_request *obj_request;
2970 struct page **pages = NULL;
2971 u32 page_count;
1ceae7ef 2972 size_t size;
2973 int ret;
2974
2975 page_count = (u32) calc_pages_for(offset, length);
2976 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2977 if (IS_ERR(pages))
 2978 return PTR_ERR(pages);
2979
2980 ret = -ENOMEM;
2981 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 2982 OBJ_REQUEST_PAGES);
2983 if (!obj_request)
2984 goto out;
2985
2986 obj_request->pages = pages;
2987 obj_request->page_count = page_count;
2988
430c28c3 2989 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2990 if (!obj_request->osd_req)
2991 goto out;
2992
2993 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2994 offset, length, 0, 0);
406e2c9f 2995 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 2996 obj_request->pages,
2997 obj_request->length,
2998 obj_request->offset & ~PAGE_MASK,
2999 false, false);
9d4df01f 3000 rbd_osd_req_format_read(obj_request);
430c28c3 3001
3002 ret = rbd_obj_request_submit(osdc, obj_request);
3003 if (ret)
3004 goto out;
3005 ret = rbd_obj_request_wait(obj_request);
3006 if (ret)
3007 goto out;
3008
3009 ret = obj_request->result;
3010 if (ret < 0)
3011 goto out;
3012
3013 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
3014 size = (size_t) obj_request->xferred;
903bb32e 3015 ceph_copy_from_page_vector(pages, buf, 0, size);
3016 rbd_assert(size <= (size_t)INT_MAX);
3017 ret = (int)size;
3018out:
3019 if (obj_request)
3020 rbd_obj_request_put(obj_request);
3021 else
3022 ceph_release_page_vector(pages, page_count);
3023
3024 return ret;
3025}
3026
602adf40 3027/*
3028 * Read the complete header for the given rbd device.
3029 *
3030 * Returns a pointer to a dynamically-allocated buffer containing
 3031 * the complete and validated header. The caller is responsible
 3032 * for freeing the buffer (with kfree()) when it is no longer
 3033 * needed.
3034 *
3035 * Returns a pointer-coded errno if a failure occurs.
602adf40 3036 */
4156d998 3037static struct rbd_image_header_ondisk *
7097f8df 3038rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
602adf40 3039{
4156d998 3040 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 3041 u32 snap_count = 0;
3042 u64 names_size = 0;
3043 u32 want_count;
3044 int ret;
602adf40 3045
00f1f36f 3046 /*
3047 * The complete header will include an array of its 64-bit
3048 * snapshot ids, followed by the names of those snapshots as
3049 * a contiguous block of NUL-terminated strings. Note that
3050 * the number of snapshots could change by the time we read
3051 * it in, in which case we re-read it.
00f1f36f 3052 */
3053 do {
3054 size_t size;
3055
3056 kfree(ondisk);
3057
3058 size = sizeof (*ondisk);
3059 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
3060 size += names_size;
3061 ondisk = kmalloc(size, GFP_KERNEL);
3062 if (!ondisk)
3063 return ERR_PTR(-ENOMEM);
3064
788e2df3 3065 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
7097f8df 3066 0, size, ondisk);
3067 if (ret < 0)
3068 goto out_err;
c0cd10db 3069 if ((size_t)ret < size) {
4156d998 3070 ret = -ENXIO;
3071 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
3072 size, ret);
3073 goto out_err;
3074 }
3075 if (!rbd_dev_ondisk_valid(ondisk)) {
3076 ret = -ENXIO;
06ecc6cb 3077 rbd_warn(rbd_dev, "invalid header");
4156d998 3078 goto out_err;
81e759fb 3079 }
602adf40 3080
3081 names_size = le64_to_cpu(ondisk->snap_names_len);
3082 want_count = snap_count;
3083 snap_count = le32_to_cpu(ondisk->snap_count);
3084 } while (snap_count != want_count);
00f1f36f 3085
4156d998 3086 return ondisk;
00f1f36f 3087
3088out_err:
3089 kfree(ondisk);
3090
3091 return ERR_PTR(ret);
3092}
3093
3094/*
 3095 * Re-read the on-disk header and convert it to the in-core format.
3096 */
3097static int rbd_read_header(struct rbd_device *rbd_dev,
3098 struct rbd_image_header *header)
3099{
3100 struct rbd_image_header_ondisk *ondisk;
4156d998 3101 int ret;
602adf40 3102
7097f8df 3103 ondisk = rbd_dev_v1_header_read(rbd_dev);
3104 if (IS_ERR(ondisk))
3105 return PTR_ERR(ondisk);
3106 ret = rbd_header_from_disk(header, ondisk);
3107 kfree(ondisk);
3108
3109 return ret;
3110}
3111
3112/*
 3113 * Re-read the header for a format 1 image and update the in-core copy.
3114 */
cc4a38bd 3115static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
3116{
3117 int ret;
3118 struct rbd_image_header h;
3119
3120 ret = rbd_read_header(rbd_dev, &h);
3121 if (ret < 0)
3122 return ret;
3123
3124 down_write(&rbd_dev->header_rwsem);
3125
3126 /* Update image size, and check for resize of mapped image */
3127 rbd_dev->header.image_size = h.image_size;
3128 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
3129 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
3130 rbd_dev->mapping.size = rbd_dev->header.image_size;
9db4b3e3 3131
849b4260 3132 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 3133 kfree(rbd_dev->header.snap_sizes);
849b4260 3134 kfree(rbd_dev->header.snap_names);
d1d25646 3135 /* osd requests may still refer to snapc */
812164f8 3136 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 3137
93a24e08 3138 rbd_dev->header.image_size = h.image_size;
3139 rbd_dev->header.snapc = h.snapc;
3140 rbd_dev->header.snap_names = h.snap_names;
3141 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260 3142 /* Free the extra copy of the object prefix */
3143 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3144 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
3145 kfree(h.object_prefix);
3146
c666601a 3147 up_write(&rbd_dev->header_rwsem);
602adf40 3148
dfc5606d 3149 return ret;
3150}
3151
3152/*
3153 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
3154 * has disappeared from the (just updated) snapshot context.
3155 */
3156static void rbd_exists_validate(struct rbd_device *rbd_dev)
3157{
3158 u64 snap_id;
3159
3160 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
3161 return;
3162
3163 snap_id = rbd_dev->spec->snap_id;
3164 if (snap_id == CEPH_NOSNAP)
3165 return;
3166
3167 if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
3168 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
3169}
3170
cc4a38bd 3171static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3172{
e627db08 3173 u64 mapping_size;
3174 int ret;
3175
117973fb 3176 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
e627db08 3177 mapping_size = rbd_dev->mapping.size;
1fe5e993 3178 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb 3179 if (rbd_dev->image_format == 1)
cc4a38bd 3180 ret = rbd_dev_v1_refresh(rbd_dev);
117973fb 3181 else
cc4a38bd 3182 ret = rbd_dev_v2_refresh(rbd_dev);
3183
3184 /* If it's a mapped snapshot, validate its EXISTS flag */
3185
3186 rbd_exists_validate(rbd_dev);
1fe5e993 3187 mutex_unlock(&ctl_mutex);
3188 if (mapping_size != rbd_dev->mapping.size) {
3189 sector_t size;
3190
3191 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3192 dout("setting size to %llu sectors", (unsigned long long)size);
3193 set_capacity(rbd_dev->disk, size);
a3fbe5d4 3194 revalidate_disk(rbd_dev->disk);
00a653e2 3195 }
3196
3197 return ret;
3198}
3199
3200static int rbd_init_disk(struct rbd_device *rbd_dev)
3201{
3202 struct gendisk *disk;
3203 struct request_queue *q;
593a9e7b 3204 u64 segment_size;
602adf40 3205
602adf40 3206 /* create gendisk info */
3207 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3208 if (!disk)
1fcdb8aa 3209 return -ENOMEM;
602adf40 3210
f0f8cef5 3211 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3212 rbd_dev->dev_id);
3213 disk->major = rbd_dev->major;
3214 disk->first_minor = 0;
3215 disk->fops = &rbd_bd_ops;
3216 disk->private_data = rbd_dev;
3217
bf0d5f50 3218 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3219 if (!q)
3220 goto out_disk;
029bcbd8 3221
3222 /* We use the default size, but let's be explicit about it. */
3223 blk_queue_physical_block_size(q, SECTOR_SIZE);
3224
029bcbd8 3225 /* set io sizes to object size */
3226 segment_size = rbd_obj_bytes(&rbd_dev->header);
3227 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3228 blk_queue_max_segment_size(q, segment_size);
3229 blk_queue_io_min(q, segment_size);
3230 blk_queue_io_opt(q, segment_size);
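	/*
	 * segment_size is the object size (4 MiB by default), so no
	 * single request exceeds one object's worth of data;
	 * rbd_merge_bvec() keeps bios from straddling object
	 * boundaries.
	 */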
029bcbd8 3231
3232 blk_queue_merge_bvec(q, rbd_merge_bvec);
3233 disk->queue = q;
3234
3235 q->queuedata = rbd_dev;
3236
3237 rbd_dev->disk = disk;
602adf40 3238
602adf40 3239 return 0;
3240out_disk:
3241 put_disk(disk);
3242
3243 return -ENOMEM;
3244}
3245
3246/*
3247 sysfs
3248*/
3249
3250static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3251{
3252 return container_of(dev, struct rbd_device, dev);
3253}
3254
3255static ssize_t rbd_size_show(struct device *dev,
3256 struct device_attribute *attr, char *buf)
3257{
593a9e7b 3258 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3259
3260 return sprintf(buf, "%llu\n",
3261 (unsigned long long)rbd_dev->mapping.size);
3262}
3263
3264/*
3265 * Note this shows the features for whatever's mapped, which is not
3266 * necessarily the base image.
3267 */
3268static ssize_t rbd_features_show(struct device *dev,
3269 struct device_attribute *attr, char *buf)
3270{
3271 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3272
3273 return sprintf(buf, "0x%016llx\n",
fc71d833 3274 (unsigned long long)rbd_dev->mapping.features);
3275}
3276
3277static ssize_t rbd_major_show(struct device *dev,
3278 struct device_attribute *attr, char *buf)
3279{
593a9e7b 3280 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3281
3282 if (rbd_dev->major)
3283 return sprintf(buf, "%d\n", rbd_dev->major);
3284
3285 return sprintf(buf, "(none)\n");
3286
3287}
3288
3289static ssize_t rbd_client_id_show(struct device *dev,
3290 struct device_attribute *attr, char *buf)
602adf40 3291{
593a9e7b 3292 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3293
3294 return sprintf(buf, "client%lld\n",
3295 ceph_client_id(rbd_dev->rbd_client->client));
3296}
3297
3298static ssize_t rbd_pool_show(struct device *dev,
3299 struct device_attribute *attr, char *buf)
602adf40 3300{
593a9e7b 3301 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3302
0d7dbfce 3303 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3304}
3305
3306static ssize_t rbd_pool_id_show(struct device *dev,
3307 struct device_attribute *attr, char *buf)
3308{
3309 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3310
0d7dbfce 3311 return sprintf(buf, "%llu\n",
fc71d833 3312 (unsigned long long) rbd_dev->spec->pool_id);
3313}
3314
3315static ssize_t rbd_name_show(struct device *dev,
3316 struct device_attribute *attr, char *buf)
3317{
593a9e7b 3318 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3319
3320 if (rbd_dev->spec->image_name)
3321 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3322
3323 return sprintf(buf, "(unknown)\n");
3324}
3325
3326static ssize_t rbd_image_id_show(struct device *dev,
3327 struct device_attribute *attr, char *buf)
3328{
3329 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3330
0d7dbfce 3331 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3332}
3333
3334/*
3335 * Shows the name of the currently-mapped snapshot (or
3336 * RBD_SNAP_HEAD_NAME for the base image).
3337 */
3338static ssize_t rbd_snap_show(struct device *dev,
3339 struct device_attribute *attr,
3340 char *buf)
3341{
593a9e7b 3342 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3343
0d7dbfce 3344 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3345}
3346
3347/*
3348 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3349 * for the parent image. If there is no parent, simply shows
3350 * "(no parent image)".
3351 */
3352static ssize_t rbd_parent_show(struct device *dev,
3353 struct device_attribute *attr,
3354 char *buf)
3355{
3356 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3357 struct rbd_spec *spec = rbd_dev->parent_spec;
3358 int count;
3359 char *bufp = buf;
3360
3361 if (!spec)
3362 return sprintf(buf, "(no parent image)\n");
3363
3364 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3365 (unsigned long long) spec->pool_id, spec->pool_name);
3366 if (count < 0)
3367 return count;
3368 bufp += count;
3369
3370 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3371 spec->image_name ? spec->image_name : "(unknown)");
3372 if (count < 0)
3373 return count;
3374 bufp += count;
3375
3376 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3377 (unsigned long long) spec->snap_id, spec->snap_name);
3378 if (count < 0)
3379 return count;
3380 bufp += count;
3381
3382 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3383 if (count < 0)
3384 return count;
3385 bufp += count;
3386
3387 return (ssize_t) (bufp - buf);
3388}
3389
3390static ssize_t rbd_image_refresh(struct device *dev,
3391 struct device_attribute *attr,
3392 const char *buf,
3393 size_t size)
3394{
593a9e7b 3395 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3396 int ret;
602adf40 3397
cc4a38bd 3398 ret = rbd_dev_refresh(rbd_dev);
3399 if (ret)
3400 rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);
3401
3402 return ret < 0 ? ret : size;
dfc5606d 3403}
602adf40 3404
dfc5606d 3405static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3406static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
3407static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3408static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3409static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3410static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3411static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3412static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3413static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3414static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3415static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3416
3417static struct attribute *rbd_attrs[] = {
3418 &dev_attr_size.attr,
34b13184 3419 &dev_attr_features.attr,
dfc5606d
YS
3420 &dev_attr_major.attr,
3421 &dev_attr_client_id.attr,
3422 &dev_attr_pool.attr,
9bb2f334 3423 &dev_attr_pool_id.attr,
dfc5606d 3424 &dev_attr_name.attr,
589d30e0 3425 &dev_attr_image_id.attr,
dfc5606d 3426 &dev_attr_current_snap.attr,
86b00e0d 3427 &dev_attr_parent.attr,
dfc5606d 3428 &dev_attr_refresh.attr,
dfc5606d
YS
3429 NULL
3430};
3431
3432static struct attribute_group rbd_attr_group = {
3433 .attrs = rbd_attrs,
3434};
3435
3436static const struct attribute_group *rbd_attr_groups[] = {
3437 &rbd_attr_group,
3438 NULL
3439};
3440
3441static void rbd_sysfs_dev_release(struct device *dev)
3442{
3443}
3444
3445static struct device_type rbd_device_type = {
3446 .name = "rbd",
3447 .groups = rbd_attr_groups,
3448 .release = rbd_sysfs_dev_release,
3449};
3450
8b8fb99c
AE
3451static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3452{
3453 kref_get(&spec->kref);
3454
3455 return spec;
3456}
3457
3458static void rbd_spec_free(struct kref *kref);
3459static void rbd_spec_put(struct rbd_spec *spec)
3460{
3461 if (spec)
3462 kref_put(&spec->kref, rbd_spec_free);
3463}
3464
3465static struct rbd_spec *rbd_spec_alloc(void)
3466{
3467 struct rbd_spec *spec;
3468
3469 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3470 if (!spec)
3471 return NULL;
3472 kref_init(&spec->kref);
3473
8b8fb99c
AE
3474 return spec;
3475}
3476
3477static void rbd_spec_free(struct kref *kref)
3478{
3479 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3480
3481 kfree(spec->pool_name);
3482 kfree(spec->image_id);
3483 kfree(spec->image_name);
3484 kfree(spec->snap_name);
3485 kfree(spec);
3486}
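
The spec functions above follow the usual kref discipline; the sketch below (illustrative only, never called by the driver) shows the intended pairing:

	/* Illustrative sketch: every rbd_spec_get() is balanced by rbd_spec_put(). */
	static void __maybe_unused rbd_spec_lifecycle_sketch(void)
	{
		struct rbd_spec *spec = rbd_spec_alloc();	/* kref_init(): count 1 */

		if (!spec)
			return;
		rbd_spec_get(spec);	/* count 2 */
		rbd_spec_put(spec);	/* count 1 */
		rbd_spec_put(spec);	/* count 0: rbd_spec_free() releases all names */
	}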
3487
cc344fa1 3488static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3489 struct rbd_spec *spec)
3490{
3491 struct rbd_device *rbd_dev;
3492
3493 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3494 if (!rbd_dev)
3495 return NULL;
3496
3497 spin_lock_init(&rbd_dev->lock);
6d292906 3498 rbd_dev->flags = 0;
c53d5893 3499 INIT_LIST_HEAD(&rbd_dev->node);
c53d5893
AE
3500 init_rwsem(&rbd_dev->header_rwsem);
3501
3502 rbd_dev->spec = spec;
3503 rbd_dev->rbd_client = rbdc;
3504
0903e875
AE
3505 /* Initialize the layout used for all rbd requests */
3506
3507 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3508 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3509 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3510 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3511
c53d5893
AE
3512 return rbd_dev;
3513}
3514
3515static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3516{
c53d5893
AE
3517 rbd_put_client(rbd_dev->rbd_client);
3518 rbd_spec_put(rbd_dev->spec);
3519 kfree(rbd_dev);
3520}
3521
9d475de5
AE
3522/*
3523 * Get the size and object order for an image snapshot, or if
3524 * snap_id is CEPH_NOSNAP, gets this information for the base
3525 * image.
3526 */
3527static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3528 u8 *order, u64 *snap_size)
3529{
3530 __le64 snapid = cpu_to_le64(snap_id);
3531 int ret;
3532 struct {
3533 u8 order;
3534 __le64 size;
3535 } __attribute__ ((packed)) size_buf = { 0 };
3536
36be9a76 3537 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3538 "rbd", "get_size",
4157976b 3539 &snapid, sizeof (snapid),
e2a58ee5 3540 &size_buf, sizeof (size_buf));
36be9a76 3541 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
9d475de5
AE
3542 if (ret < 0)
3543 return ret;
57385b51
AE
3544 if (ret < sizeof (size_buf))
3545 return -ERANGE;
9d475de5 3546
c86f86e9
AE
3547 if (order)
3548 *order = size_buf.order;
9d475de5
AE
3549 *snap_size = le64_to_cpu(size_buf.size);
3550
3551 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
57385b51
AE
3552 (unsigned long long)snap_id, (unsigned int)*order,
3553 (unsigned long long)*snap_size);
9d475de5
AE
3554
3555 return 0;
3556}
3557
3558static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3559{
3560 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3561 &rbd_dev->header.obj_order,
3562 &rbd_dev->header.image_size);
3563}
3564
1e130199
AE
3565static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3566{
3567 void *reply_buf;
3568 int ret;
3569 void *p;
3570
3571 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3572 if (!reply_buf)
3573 return -ENOMEM;
3574
36be9a76 3575 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3576 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3577 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3578 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
1e130199
AE
3579 if (ret < 0)
3580 goto out;
3581
3582 p = reply_buf;
3583 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
57385b51
AE
3584 p + ret, NULL, GFP_NOIO);
3585 ret = 0;
1e130199
AE
3586
3587 if (IS_ERR(rbd_dev->header.object_prefix)) {
3588 ret = PTR_ERR(rbd_dev->header.object_prefix);
3589 rbd_dev->header.object_prefix = NULL;
3590 } else {
3591 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3592 }
1e130199
AE
3593out:
3594 kfree(reply_buf);
3595
3596 return ret;
3597}
3598
b1b5402a
AE
3599static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3600 u64 *snap_features)
3601{
3602 __le64 snapid = cpu_to_le64(snap_id);
3603 struct {
3604 __le64 features;
3605 __le64 incompat;
4157976b 3606 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3607 u64 incompat;
b1b5402a
AE
3608 int ret;
3609
36be9a76 3610 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3611 "rbd", "get_features",
4157976b 3612 &snapid, sizeof (snapid),
e2a58ee5 3613 &features_buf, sizeof (features_buf));
36be9a76 3614 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
b1b5402a
AE
3615 if (ret < 0)
3616 return ret;
57385b51
AE
3617 if (ret < sizeof (features_buf))
3618 return -ERANGE;
d889140c
AE
3619
3620 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3621 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3622 return -ENXIO;
d889140c 3623
b1b5402a
AE
3624 *snap_features = le64_to_cpu(features_buf.features);
3625
3626 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
57385b51
AE
3627 (unsigned long long)snap_id,
3628 (unsigned long long)*snap_features,
3629 (unsigned long long)le64_to_cpu(features_buf.incompat));
b1b5402a
AE
3630
3631 return 0;
3632}
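
A worked example of the incompatible-feature check above (bit values taken from the RBD_FEATURE_* definitions; the unknown bit is hypothetical):

	/*
	 * features 0x3, incompat 0x1: incompat & ~RBD_FEATURES_SUPPORTED == 0,
	 * so the image maps and *snap_features is set to 0x3.
	 *
	 * features 0x7, incompat 0x4: bit 2 is unknown to this client, the
	 * masked value is nonzero, and the probe fails with -ENXIO.
	 */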
3633
3634static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3635{
3636 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3637 &rbd_dev->header.features);
3638}
3639
86b00e0d
AE
3640static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3641{
3642 struct rbd_spec *parent_spec;
3643 size_t size;
3644 void *reply_buf = NULL;
3645 __le64 snapid;
3646 void *p;
3647 void *end;
3648 char *image_id;
3649 u64 overlap;
86b00e0d
AE
3650 int ret;
3651
3652 parent_spec = rbd_spec_alloc();
3653 if (!parent_spec)
3654 return -ENOMEM;
3655
3656 size = sizeof (__le64) + /* pool_id */
3657 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3658 sizeof (__le64) + /* snap_id */
3659 sizeof (__le64); /* overlap */
3660 reply_buf = kmalloc(size, GFP_KERNEL);
3661 if (!reply_buf) {
3662 ret = -ENOMEM;
3663 goto out_err;
3664 }
3665
3666 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3667 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3668 "rbd", "get_parent",
4157976b 3669 &snapid, sizeof (snapid),
e2a58ee5 3670 reply_buf, size);
36be9a76 3671 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
86b00e0d
AE
3672 if (ret < 0)
3673 goto out_err;
3674
86b00e0d 3675 p = reply_buf;
57385b51
AE
3676 end = reply_buf + ret;
3677 ret = -ERANGE;
86b00e0d
AE
3678 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3679 if (parent_spec->pool_id == CEPH_NOPOOL)
3680 goto out; /* No parent? No problem. */
3681
0903e875
AE
3682 /* The ceph file layout needs to fit pool id in 32 bits */
3683
3684 ret = -EIO;
c0cd10db
AE
3685 if (parent_spec->pool_id > (u64)U32_MAX) {
3686 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3687 (unsigned long long)parent_spec->pool_id, U32_MAX);
57385b51 3688 goto out_err;
c0cd10db 3689 }
0903e875 3690
979ed480 3691 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
86b00e0d
AE
3692 if (IS_ERR(image_id)) {
3693 ret = PTR_ERR(image_id);
3694 goto out_err;
3695 }
3696 parent_spec->image_id = image_id;
3697 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3698 ceph_decode_64_safe(&p, end, overlap, out_err);
3699
3700 rbd_dev->parent_overlap = overlap;
3701 rbd_dev->parent_spec = parent_spec;
3702 parent_spec = NULL; /* rbd_dev now owns this */
3703out:
3704 ret = 0;
3705out_err:
3706 kfree(reply_buf);
3707 rbd_spec_put(parent_spec);
3708
3709 return ret;
3710}
3711
cc070d59
AE
3712static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3713{
3714 struct {
3715 __le64 stripe_unit;
3716 __le64 stripe_count;
3717 } __attribute__ ((packed)) striping_info_buf = { 0 };
3718 size_t size = sizeof (striping_info_buf);
3719 void *p;
3720 u64 obj_size;
3721 u64 stripe_unit;
3722 u64 stripe_count;
3723 int ret;
3724
3725 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3726 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3727 (char *)&striping_info_buf, size);
cc070d59
AE
3728 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3729 if (ret < 0)
3730 return ret;
3731 if (ret < size)
3732 return -ERANGE;
3733
3734 /*
3735 * We don't actually support the "fancy striping" feature
3736 * (STRIPINGV2) yet, but if the striping sizes are the
3737 * defaults the behavior is the same as before. So find
3738 * out, and only fail if the image has non-default values.
3739 */
3740 ret = -EINVAL;
3741 obj_size = (u64)1 << rbd_dev->header.obj_order;
3742 p = &striping_info_buf;
3743 stripe_unit = ceph_decode_64(&p);
3744 if (stripe_unit != obj_size) {
3745 rbd_warn(rbd_dev, "unsupported stripe unit "
3746 "(got %llu want %llu)",
3747 stripe_unit, obj_size);
3748 return -EINVAL;
3749 }
3750 stripe_count = ceph_decode_64(&p);
3751 if (stripe_count != 1) {
3752 rbd_warn(rbd_dev, "unsupported stripe count "
3753 "(got %llu want 1)", stripe_count);
3754 return -EINVAL;
3755 }
500d0c0f
AE
3756 rbd_dev->header.stripe_unit = stripe_unit;
3757 rbd_dev->header.stripe_count = stripe_count;
cc070d59
AE
3758
3759 return 0;
3760}
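
Concretely, with an object order of 22 (a common default, assumed here for illustration), the only striping parameters this check accepts are the trivial ones:

	/*
	 * obj_order == 22  ->  obj_size == 1 << 22 == 4194304 bytes (4 MiB)
	 *
	 * accepted: stripe_unit == 4194304 && stripe_count == 1
	 * rejected: any other reply -> rbd_warn() and -EINVAL
	 */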
3761
9e15b77d
AE
3762static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3763{
3764 size_t image_id_size;
3765 char *image_id;
3766 void *p;
3767 void *end;
3768 size_t size;
3769 void *reply_buf = NULL;
3770 size_t len = 0;
3771 char *image_name = NULL;
3772 int ret;
3773
3774 rbd_assert(!rbd_dev->spec->image_name);
3775
69e7a02f
AE
3776 len = strlen(rbd_dev->spec->image_id);
3777 image_id_size = sizeof (__le32) + len;
9e15b77d
AE
3778 image_id = kmalloc(image_id_size, GFP_KERNEL);
3779 if (!image_id)
3780 return NULL;
3781
3782 p = image_id;
4157976b 3783 end = image_id + image_id_size;
57385b51 3784 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
9e15b77d
AE
3785
3786 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3787 reply_buf = kmalloc(size, GFP_KERNEL);
3788 if (!reply_buf)
3789 goto out;
3790
36be9a76 3791 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
9e15b77d
AE
3792 "rbd", "dir_get_name",
3793 image_id, image_id_size,
e2a58ee5 3794 reply_buf, size);
9e15b77d
AE
3795 if (ret < 0)
3796 goto out;
3797 p = reply_buf;
f40eb349
AE
3798 end = reply_buf + ret;
3799
9e15b77d
AE
3800 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3801 if (IS_ERR(image_name))
3802 image_name = NULL;
3803 else
3804 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3805out:
3806 kfree(reply_buf);
3807 kfree(image_id);
3808
3809 return image_name;
3810}
3811
2ad3d716
AE
3812static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3813{
3814 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3815 const char *snap_name;
3816 u32 which = 0;
3817
3818 /* Skip over names until we find the one we are looking for */
3819
3820 snap_name = rbd_dev->header.snap_names;
3821 while (which < snapc->num_snaps) {
3822 if (!strcmp(name, snap_name))
3823 return snapc->snaps[which];
3824 snap_name += strlen(snap_name) + 1;
3825 which++;
3826 }
3827 return CEPH_NOSNAP;
3828}
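
The v1 loop above relies on snap_names being consecutive NUL-terminated strings, index-aligned with snapc->snaps[]; a worked example with hypothetical names and ids:

	/*
	 * snapc->snaps[]:     { 12, 15, 19 }
	 * header.snap_names:  "alpha\0beta\0gamma\0"
	 *
	 * Looking up "beta": skip "alpha" (strlen("alpha") + 1 bytes),
	 * match at which == 1, return snapc->snaps[1] == 15.
	 */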
3829
3830static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3831{
3832 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3833 u32 which;
3834 bool found = false;
3835 u64 snap_id;
3836
3837 for (which = 0; !found && which < snapc->num_snaps; which++) {
3838 const char *snap_name;
3839
3840 snap_id = snapc->snaps[which];
3841 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3842 if (IS_ERR(snap_name))
3843 break;
3844 found = !strcmp(name, snap_name);
3845 kfree(snap_name);
3846 }
3847 return found ? snap_id : CEPH_NOSNAP;
3848}
3849
3850/*
3851 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3852 * no snapshot by that name is found, or if an error occurs.
3853 */
3854static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3855{
3856 if (rbd_dev->image_format == 1)
3857 return rbd_v1_snap_id_by_name(rbd_dev, name);
3858
3859 return rbd_v2_snap_id_by_name(rbd_dev, name);
3860}
3861
9e15b77d 3862/*
2e9f7f1c
AE
3863 * When an rbd image has a parent image, it is identified by the
3864 * pool, image, and snapshot ids (not names). This function fills
3865 * in the names for those ids. (It's OK if we can't figure out the
3866 * name for an image id, but the pool and snapshot ids should always
3867 * exist and have names.) All names in an rbd spec are dynamically
3868 * allocated.
e1d4213f
AE
3869 *
3870 * When an image being mapped (not a parent) is probed, we have the
3871 * pool name and pool id, image name and image id, and the snapshot
3872 * name. The only thing we're missing is the snapshot id.
9e15b77d 3873 */
2e9f7f1c 3874static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 3875{
2e9f7f1c
AE
3876 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3877 struct rbd_spec *spec = rbd_dev->spec;
3878 const char *pool_name;
3879 const char *image_name;
3880 const char *snap_name;
9e15b77d
AE
3881 int ret;
3882
e1d4213f
AE
3883 /*
3884 * An image being mapped will have the pool name (etc.), but
3885 * we need to look up the snapshot id.
3886 */
2e9f7f1c
AE
3887 if (spec->pool_name) {
3888 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 3889 u64 snap_id;
e1d4213f 3890
2ad3d716
AE
3891 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3892 if (snap_id == CEPH_NOSNAP)
e1d4213f 3893 return -ENOENT;
2ad3d716 3894 spec->snap_id = snap_id;
e1d4213f 3895 } else {
2e9f7f1c 3896 spec->snap_id = CEPH_NOSNAP;
e1d4213f
AE
3897 }
3898
3899 return 0;
3900 }
9e15b77d 3901
2e9f7f1c 3902 /* Get the pool name; we have to make our own copy of this */
9e15b77d 3903
2e9f7f1c
AE
3904 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3905 if (!pool_name) {
3906 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
935dc89f
AE
3907 return -EIO;
3908 }
2e9f7f1c
AE
3909 pool_name = kstrdup(pool_name, GFP_KERNEL);
3910 if (!pool_name)
9e15b77d
AE
3911 return -ENOMEM;
3912
3913 /* Fetch the image name; tolerate failure here */
3914
2e9f7f1c
AE
3915 image_name = rbd_dev_image_name(rbd_dev);
3916 if (!image_name)
06ecc6cb 3917 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 3918
2e9f7f1c 3919 /* Look up the snapshot name, and make a copy */
9e15b77d 3920
2e9f7f1c 3921 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
2e9f7f1c
AE
3922 if (!snap_name) {
3923 ret = -ENOMEM;
9e15b77d 3924 goto out_err;
2e9f7f1c
AE
3925 }
3926
3927 spec->pool_name = pool_name;
3928 spec->image_name = image_name;
3929 spec->snap_name = snap_name;
9e15b77d
AE
3930
3931 return 0;
3932out_err:
2e9f7f1c
AE
3933 kfree(image_name);
3934 kfree(pool_name);
9e15b77d
AE
3935
3936 return ret;
3937}
3938
cc4a38bd 3939static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
35d489f9
AE
3940{
3941 size_t size;
3942 int ret;
3943 void *reply_buf;
3944 void *p;
3945 void *end;
3946 u64 seq;
3947 u32 snap_count;
3948 struct ceph_snap_context *snapc;
3949 u32 i;
3950
3951 /*
3952 * We'll need room for the seq value (maximum snapshot id),
3953 * snapshot count, and array of that many snapshot ids.
3954 * For now we have a fixed upper limit on the number we're
3955 * prepared to receive.
3956 */
3957 size = sizeof (__le64) + sizeof (__le32) +
3958 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3959 reply_buf = kzalloc(size, GFP_KERNEL);
3960 if (!reply_buf)
3961 return -ENOMEM;
3962
36be9a76 3963 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3964 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 3965 reply_buf, size);
36be9a76 3966 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
35d489f9
AE
3967 if (ret < 0)
3968 goto out;
3969
35d489f9 3970 p = reply_buf;
57385b51
AE
3971 end = reply_buf + ret;
3972 ret = -ERANGE;
35d489f9
AE
3973 ceph_decode_64_safe(&p, end, seq, out);
3974 ceph_decode_32_safe(&p, end, snap_count, out);
3975
3976 /*
3977 * Make sure the reported number of snapshot ids wouldn't go
3978 * beyond the end of our buffer. But before checking that,
3979 * make sure the computed size of the snapshot context we
3980 * allocate is representable in a size_t.
3981 */
3982 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3983 / sizeof (u64)) {
3984 ret = -EINVAL;
3985 goto out;
3986 }
3987 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3988 goto out;
468521c1 3989 ret = 0;
35d489f9 3990
812164f8 3991 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
35d489f9
AE
3992 if (!snapc) {
3993 ret = -ENOMEM;
3994 goto out;
3995 }
35d489f9 3996 snapc->seq = seq;
35d489f9
AE
3997 for (i = 0; i < snap_count; i++)
3998 snapc->snaps[i] = ceph_decode_64(&p);
3999
49ece554 4000 ceph_put_snap_context(rbd_dev->header.snapc);
35d489f9
AE
4001 rbd_dev->header.snapc = snapc;
4002
4003 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 4004 (unsigned long long)seq, (unsigned int)snap_count);
35d489f9
AE
4005out:
4006 kfree(reply_buf);
4007
57385b51 4008 return ret;
35d489f9
AE
4009}
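
The buffer sizing above is easy to verify: with the RBD_MAX_SNAP_COUNT limit of 510, the largest accepted reply is 8 + 4 + 510 * 8 = 4092 bytes, just inside one 4 KiB allocation. A compile-time check one could drop into any function (rbd_init(), say) to keep that invariant honest:

	BUILD_BUG_ON(sizeof (__le64) + sizeof (__le32) +
		     RBD_MAX_SNAP_COUNT * sizeof (__le64) > 4096);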
4010
54cac61f
AE
4011static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
4012 u64 snap_id)
b8b1e2db
AE
4013{
4014 size_t size;
4015 void *reply_buf;
54cac61f 4016 __le64 snapid;
b8b1e2db
AE
4017 int ret;
4018 void *p;
4019 void *end;
b8b1e2db
AE
4020 char *snap_name;
4021
4022 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4023 reply_buf = kmalloc(size, GFP_KERNEL);
4024 if (!reply_buf)
4025 return ERR_PTR(-ENOMEM);
4026
54cac61f 4027 snapid = cpu_to_le64(snap_id);
36be9a76 4028 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4029 "rbd", "get_snapshot_name",
54cac61f 4030 &snapid, sizeof (snapid),
e2a58ee5 4031 reply_buf, size);
36be9a76 4032 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
f40eb349
AE
4033 if (ret < 0) {
4034 snap_name = ERR_PTR(ret);
b8b1e2db 4035 goto out;
f40eb349 4036 }
b8b1e2db
AE
4037
4038 p = reply_buf;
f40eb349 4039 end = reply_buf + ret;
e5c35534 4040 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4041 if (IS_ERR(snap_name))
b8b1e2db 4042 goto out;
b8b1e2db 4043
f40eb349 4044 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4045 (unsigned long long)snap_id, snap_name);
b8b1e2db
AE
4046out:
4047 kfree(reply_buf);
4048
f40eb349 4049 return snap_name;
b8b1e2db
AE
4050}
4051
cc4a38bd 4052static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
117973fb
AE
4053{
4054 int ret;
117973fb
AE
4055
4056 down_write(&rbd_dev->header_rwsem);
4057
117973fb
AE
4058 ret = rbd_dev_v2_image_size(rbd_dev);
4059 if (ret)
4060 goto out;
29334ba4
AE
4061 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4062 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4063 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4064
cc4a38bd 4065 ret = rbd_dev_v2_snap_context(rbd_dev);
117973fb
AE
4066 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4067 if (ret)
4068 goto out;
117973fb
AE
4069out:
4070 up_write(&rbd_dev->header_rwsem);
4071
4072 return ret;
4073}
4074
dfc5606d
YS
4075static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4076{
dfc5606d 4077 struct device *dev;
cd789ab9 4078 int ret;
dfc5606d
YS
4079
4080 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4081
cd789ab9 4082 dev = &rbd_dev->dev;
dfc5606d
YS
4083 dev->bus = &rbd_bus_type;
4084 dev->type = &rbd_device_type;
4085 dev->parent = &rbd_root_dev;
200a6a8b 4086 dev->release = rbd_dev_device_release;
de71a297 4087 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4088 ret = device_register(dev);
dfc5606d 4089
dfc5606d 4090 mutex_unlock(&ctl_mutex);
cd789ab9 4091
dfc5606d 4092 return ret;
602adf40
YS
4093}
4094
dfc5606d
YS
4095static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4096{
4097 device_unregister(&rbd_dev->dev);
4098}
4099
e2839308 4100static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
1ddbe94e
AE
4101
4102/*
499afd5b
AE
4103 * Get a unique rbd identifier for the given new rbd_dev, and add
4104 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4105 */
e2839308 4106static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4107{
e2839308 4108 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
499afd5b
AE
4109
4110 spin_lock(&rbd_dev_list_lock);
4111 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4112 spin_unlock(&rbd_dev_list_lock);
e2839308
AE
4113 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4114 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4115}
b7f23c36 4116
1ddbe94e 4117/*
499afd5b
AE
4118 * Remove an rbd_dev from the global list, and record that its
4119 * identifier is no longer in use.
1ddbe94e 4120 */
e2839308 4121static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4122{
d184f6bf 4123 struct list_head *tmp;
de71a297 4124 int rbd_id = rbd_dev->dev_id;
d184f6bf
AE
4125 int max_id;
4126
aafb230e 4127 rbd_assert(rbd_id > 0);
499afd5b 4128
e2839308
AE
4129 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4130 (unsigned long long) rbd_dev->dev_id);
499afd5b
AE
4131 spin_lock(&rbd_dev_list_lock);
4132 list_del_init(&rbd_dev->node);
d184f6bf
AE
4133
4134 /*
4135 * If the id being "put" is not the current maximum, there
4136 * is nothing special we need to do.
4137 */
e2839308 4138 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
d184f6bf
AE
4139 spin_unlock(&rbd_dev_list_lock);
4140 return;
4141 }
4142
4143 /*
4144 * We need to update the current maximum id. Search the
4145 * list to find out what it is. We're more likely to find
4146 * the maximum at the end, so search the list backward.
4147 */
4148 max_id = 0;
4149 list_for_each_prev(tmp, &rbd_dev_list) {
4150 struct rbd_device *rbd_dev;
4151
4152 rbd_dev = list_entry(tmp, struct rbd_device, node);
b213e0b1
AE
4153 if (rbd_dev->dev_id > max_id)
4154 max_id = rbd_dev->dev_id;
d184f6bf 4155 }
499afd5b 4156 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4157
1ddbe94e 4158 /*
e2839308 4159 * The max id could have been updated by rbd_dev_id_get(), in
d184f6bf
AE
4160 * which case it now accurately reflects the new maximum.
4161 * Be careful not to overwrite the maximum value in that
4162 * case.
1ddbe94e 4163 */
e2839308
AE
4164 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4165 dout(" max dev id has been reset\n");
b7f23c36
AE
4166}
4167
e28fff26
AE
4168/*
4169 * Skips over white space at *buf, and updates *buf to point to the
4170 * first found non-space character (if any). Returns the length of
593a9e7b
AE
4171 * the token (string of non-white space characters) found. Note
4172 * that *buf must be terminated with '\0'.
e28fff26
AE
4173 */
4174static inline size_t next_token(const char **buf)
4175{
4176 /*
4177 * These are the characters that produce nonzero for
4178 * isspace() in the "C" and "POSIX" locales.
4179 */
4180 const char *spaces = " \f\n\r\t\v";
4181
4182 *buf += strspn(*buf, spaces); /* Find start of token */
4183
4184 return strcspn(*buf, spaces); /* Return token length */
4185}
4186
4187/*
4188 * Finds the next token in *buf, and if the provided token buffer is
4189 * big enough, copies the found token into it. The result, if
593a9e7b
AE
4190 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4191 * must be terminated with '\0' on entry.
e28fff26
AE
4192 *
4193 * Returns the length of the token found (not including the '\0').
4194 * Return value will be 0 if no token is found, and it will be >=
4195 * token_size if the token would not fit.
4196 *
593a9e7b 4197 * The *buf pointer will be updated to point beyond the end of the
e28fff26
AE
4198 * found token. Note that this occurs even if the token buffer is
4199 * too small to hold it.
4200 */
4201static inline size_t copy_token(const char **buf,
4202 char *token,
4203 size_t token_size)
4204{
4205 size_t len;
4206
4207 len = next_token(buf);
4208 if (len < token_size) {
4209 memcpy(token, *buf, len);
4210 *(token + len) = '\0';
4211 }
4212 *buf += len;
4213
4214 return len;
4215}
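
A worked example of how the token helpers advance through a buffer (the buffer contents are hypothetical):

	/*
	 * const char *buf = "  mon1:6789 opts";
	 *
	 * next_token(&buf)          returns 9; buf points at "mon1:6789 opts"
	 * copy_token(&buf, tok, 16) copies "mon1:6789"; buf points at " opts"
	 * dup_token(&buf, &len)     kmemdup()s "opts", len == 4; buf is at ""
	 */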
4216
ea3352f4
AE
4217/*
4218 * Finds the next token in *buf, dynamically allocates a buffer big
4219 * enough to hold a copy of it, and copies the token into the new
4220 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4221 * that a duplicate buffer is created even for a zero-length token.
4222 *
4223 * Returns a pointer to the newly-allocated duplicate, or a null
4224 * pointer if memory for the duplicate was not available. If
4225 * the lenp argument is a non-null pointer, the length of the token
4226 * (not including the '\0') is returned in *lenp.
4227 *
4228 * If successful, the *buf pointer will be updated to point beyond
4229 * the end of the found token.
4230 *
4231 * Note: uses GFP_KERNEL for allocation.
4232 */
4233static inline char *dup_token(const char **buf, size_t *lenp)
4234{
4235 char *dup;
4236 size_t len;
4237
4238 len = next_token(buf);
4caf35f9 4239 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
ea3352f4
AE
4240 if (!dup)
4241 return NULL;
ea3352f4
AE
4242 *(dup + len) = '\0';
4243 *buf += len;
4244
4245 if (lenp)
4246 *lenp = len;
4247
4248 return dup;
4249}
4250
a725f65e 4251/*
859c31df
AE
4252 * Parse the options provided for an "rbd add" (i.e., rbd image
4253 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4254 * and the data written is passed here via a NUL-terminated buffer.
4255 * Returns 0 if successful or an error code otherwise.
d22f76e7 4256 *
859c31df
AE
4257 * The information extracted from these options is recorded in
4258 * the other parameters which return dynamically-allocated
4259 * structures:
4260 * ceph_opts
4261 * The address of a pointer that will refer to a ceph options
4262 * structure. Caller must release the returned pointer using
4263 * ceph_destroy_options() when it is no longer needed.
4264 * rbd_opts
4265 * Address of an rbd options pointer. Fully initialized by
4266 * this function; caller must release with kfree().
4267 * spec
4268 * Address of an rbd image specification pointer. Fully
4269 * initialized by this function based on parsed options.
4270 * Caller must release with rbd_spec_put().
4271 *
4272 * The options passed take this form:
4273 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4274 * where:
4275 * <mon_addrs>
4276 * A comma-separated list of one or more monitor addresses.
4277 * A monitor address is an ip address, optionally followed
4278 * by a port number (separated by a colon).
4279 * I.e.: ip1[:port1][,ip2[:port2]...]
4280 * <options>
4281 * A comma-separated list of ceph and/or rbd options.
4282 * <pool_name>
4283 * The name of the rados pool containing the rbd image.
4284 * <image_name>
4285 * The name of the image in that pool to map.
4286 * <snap_name>
4287 * An optional snapshot name. If provided, the mapping will
4288 * present data from the image at the time that snapshot was
4289 * created. The image head is used if no snapshot name is
4290 * provided. Snapshot mappings are always read-only.
a725f65e 4291 */
859c31df 4292static int rbd_add_parse_args(const char *buf,
dc79b113 4293 struct ceph_options **ceph_opts,
859c31df
AE
4294 struct rbd_options **opts,
4295 struct rbd_spec **rbd_spec)
e28fff26 4296{
d22f76e7 4297 size_t len;
859c31df 4298 char *options;
0ddebc0c 4299 const char *mon_addrs;
ecb4dc22 4300 char *snap_name;
0ddebc0c 4301 size_t mon_addrs_size;
859c31df 4302 struct rbd_spec *spec = NULL;
4e9afeba 4303 struct rbd_options *rbd_opts = NULL;
859c31df 4304 struct ceph_options *copts;
dc79b113 4305 int ret;
e28fff26
AE
4306
4307 /* The first four tokens are required */
4308
7ef3214a 4309 len = next_token(&buf);
4fb5d671
AE
4310 if (!len) {
4311 rbd_warn(NULL, "no monitor address(es) provided");
4312 return -EINVAL;
4313 }
0ddebc0c 4314 mon_addrs = buf;
f28e565a 4315 mon_addrs_size = len + 1;
7ef3214a 4316 buf += len;
a725f65e 4317
dc79b113 4318 ret = -EINVAL;
f28e565a
AE
4319 options = dup_token(&buf, NULL);
4320 if (!options)
dc79b113 4321 return -ENOMEM;
4fb5d671
AE
4322 if (!*options) {
4323 rbd_warn(NULL, "no options provided");
4324 goto out_err;
4325 }
e28fff26 4326
859c31df
AE
4327 spec = rbd_spec_alloc();
4328 if (!spec)
f28e565a 4329 goto out_mem;
859c31df
AE
4330
4331 spec->pool_name = dup_token(&buf, NULL);
4332 if (!spec->pool_name)
4333 goto out_mem;
4fb5d671
AE
4334 if (!*spec->pool_name) {
4335 rbd_warn(NULL, "no pool name provided");
4336 goto out_err;
4337 }
e28fff26 4338
69e7a02f 4339 spec->image_name = dup_token(&buf, NULL);
859c31df 4340 if (!spec->image_name)
f28e565a 4341 goto out_mem;
4fb5d671
AE
4342 if (!*spec->image_name) {
4343 rbd_warn(NULL, "no image name provided");
4344 goto out_err;
4345 }
d4b125e9 4346
f28e565a
AE
4347 /*
4348 * Snapshot name is optional; default is to use "-"
4349 * (indicating the head/no snapshot).
4350 */
3feeb894 4351 len = next_token(&buf);
820a5f3e 4352 if (!len) {
3feeb894
AE
4353 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4354 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4355 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4356 ret = -ENAMETOOLONG;
f28e565a 4357 goto out_err;
849b4260 4358 }
ecb4dc22
AE
4359 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4360 if (!snap_name)
f28e565a 4361 goto out_mem;
ecb4dc22
AE
4362 *(snap_name + len) = '\0';
4363 spec->snap_name = snap_name;
e5c35534 4364
0ddebc0c 4365 /* Initialize all rbd options to the defaults */
e28fff26 4366
4e9afeba
AE
4367 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4368 if (!rbd_opts)
4369 goto out_mem;
4370
4371 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4372
859c31df 4373 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4374 mon_addrs + mon_addrs_size - 1,
4e9afeba 4375 parse_rbd_opts_token, rbd_opts);
859c31df
AE
4376 if (IS_ERR(copts)) {
4377 ret = PTR_ERR(copts);
dc79b113
AE
4378 goto out_err;
4379 }
859c31df
AE
4380 kfree(options);
4381
4382 *ceph_opts = copts;
4e9afeba 4383 *opts = rbd_opts;
859c31df 4384 *rbd_spec = spec;
0ddebc0c 4385
dc79b113 4386 return 0;
f28e565a 4387out_mem:
dc79b113 4388 ret = -ENOMEM;
d22f76e7 4389out_err:
859c31df
AE
4390 kfree(rbd_opts);
4391 rbd_spec_put(spec);
f28e565a 4392 kfree(options);
d22f76e7 4393
dc79b113 4394 return ret;
a725f65e
AE
4395}
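
Putting that grammar together, a hedged userspace sketch of issuing an add request; the monitor address, option, pool, and image names are all placeholders:

	#include <fcntl.h>
	#include <string.h>
	#include <unistd.h>

	int main(void)
	{
		/* <mon_addrs> <options> <pool_name> <image_name>, snapshot omitted */
		const char *spec = "1.2.3.4:6789 name=admin rbd myimage";
		int fd = open("/sys/bus/rbd/add", O_WRONLY);

		if (fd < 0)
			return 1;
		if (write(fd, spec, strlen(spec)) < 0) {
			close(fd);
			return 1;
		}
		close(fd);
		return 0;
	}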
4396
589d30e0
AE
4397/*
4398 * An rbd format 2 image has a unique identifier, distinct from the
4399 * name given to it by the user. Internally, that identifier is
4400 * what's used to specify the names of objects related to the image.
4401 *
4402 * A special "rbd id" object is used to map an rbd image name to its
4403 * id. If that object doesn't exist, then there is no v2 rbd image
4404 * with the supplied name.
4405 *
4406 * This function will record the given rbd_dev's image_id field if
4407 * it can be determined, and in that case will return 0. If any
4408 * errors occur a negative errno will be returned and the rbd_dev's
4409 * image_id field will be unchanged (and should be NULL).
4410 */
4411static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4412{
4413 int ret;
4414 size_t size;
4415 char *object_name;
4416 void *response;
c0fba368 4417 char *image_id;
2f82ee54 4418
2c0d0a10
AE
4419 /*
4420 * When probing a parent image, the image id is already
4421 * known (and the image name likely is not). There's no
c0fba368
AE
4422 * need to fetch the image id again in this case. We
4423 * do still need to set the image format though.
2c0d0a10 4424 */
c0fba368
AE
4425 if (rbd_dev->spec->image_id) {
4426 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4427
2c0d0a10 4428 return 0;
c0fba368 4429 }
2c0d0a10 4430
589d30e0
AE
4431 /*
4432 * First, see if the format 2 image id file exists, and if
4433 * so, get the image's persistent id from it.
4434 */
69e7a02f 4435 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
589d30e0
AE
4436 object_name = kmalloc(size, GFP_NOIO);
4437 if (!object_name)
4438 return -ENOMEM;
0d7dbfce 4439 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
589d30e0
AE
4440 dout("rbd id object name is %s\n", object_name);
4441
4442 /* Response will be an encoded string, which includes a length */
4443
4444 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4445 response = kzalloc(size, GFP_NOIO);
4446 if (!response) {
4447 ret = -ENOMEM;
4448 goto out;
4449 }
4450
c0fba368
AE
4451 /* If it doesn't exist we'll assume it's a format 1 image */
4452
36be9a76 4453 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4454 "rbd", "get_id", NULL, 0,
e2a58ee5 4455 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4456 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
c0fba368
AE
4457 if (ret == -ENOENT) {
4458 image_id = kstrdup("", GFP_KERNEL);
4459 ret = image_id ? 0 : -ENOMEM;
4460 if (!ret)
4461 rbd_dev->image_format = 1;
4462 } else if (ret > sizeof (__le32)) {
4463 void *p = response;
4464
4465 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4466 NULL, GFP_NOIO);
c0fba368
AE
4467 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4468 if (!ret)
4469 rbd_dev->image_format = 2;
589d30e0 4470 } else {
c0fba368
AE
4471 ret = -EINVAL;
4472 }
4473
4474 if (!ret) {
4475 rbd_dev->spec->image_id = image_id;
4476 dout("image_id is %s\n", image_id);
589d30e0
AE
4477 }
4478out:
4479 kfree(response);
4480 kfree(object_name);
4481
4482 return ret;
4483}
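
For example (image name hypothetical, and assuming the conventional RBD_ID_PREFIX value "rbd_id." from rbd_types.h):

	/*
	 * image_name "foo"  ->  id object "rbd_id.foo"
	 *
	 * get_id succeeds   ->  image_id is the encoded id, format 2
	 * get_id -> -ENOENT ->  image_id is "", format 1
	 */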
4484
6fd48b3b
AE
4485/* Undo whatever state changes are made by v1 or v2 image probe */
4486
4487static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4488{
4489 struct rbd_image_header *header;
4490
4491 rbd_dev_remove_parent(rbd_dev);
4492 rbd_spec_put(rbd_dev->parent_spec);
4493 rbd_dev->parent_spec = NULL;
4494 rbd_dev->parent_overlap = 0;
4495
4496 /* Free dynamic fields from the header, then zero it out */
4497
4498 header = &rbd_dev->header;
812164f8 4499 ceph_put_snap_context(header->snapc);
6fd48b3b
AE
4500 kfree(header->snap_sizes);
4501 kfree(header->snap_names);
4502 kfree(header->object_prefix);
4503 memset(header, 0, sizeof (*header));
4504}
4505
a30b71b9
AE
4506static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4507{
4508 int ret;
a30b71b9
AE
4509
4510 /* Populate rbd image metadata */
4511
4512 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4513 if (ret < 0)
4514 goto out_err;
86b00e0d
AE
4515
4516 /* Version 1 images have no parent (no layering) */
4517
4518 rbd_dev->parent_spec = NULL;
4519 rbd_dev->parent_overlap = 0;
4520
a30b71b9
AE
4521 dout("discovered version 1 image, header name is %s\n",
4522 rbd_dev->header_name);
4523
4524 return 0;
4525
4526out_err:
4527 kfree(rbd_dev->header_name);
4528 rbd_dev->header_name = NULL;
0d7dbfce
AE
4529 kfree(rbd_dev->spec->image_id);
4530 rbd_dev->spec->image_id = NULL;
a30b71b9
AE
4531
4532 return ret;
4533}
4534
4535static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4536{
9d475de5 4537 int ret;
a30b71b9 4538
9d475de5 4539 ret = rbd_dev_v2_image_size(rbd_dev);
57385b51 4540 if (ret)
1e130199
AE
4541 goto out_err;
4542
4543 /* Get the object prefix (a.k.a. block_name) for the image */
4544
4545 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4546 if (ret)
b1b5402a
AE
4547 goto out_err;
4548
d889140c 4549 /* Get and check the features for the image */
b1b5402a
AE
4550
4551 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4552 if (ret)
9d475de5 4553 goto out_err;
35d489f9 4554
86b00e0d
AE
4555 /* If the image supports layering, get the parent info */
4556
4557 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4558 ret = rbd_dev_v2_parent_info(rbd_dev);
57385b51 4559 if (ret)
86b00e0d 4560 goto out_err;
96882f55 4561 /*
c734b796
AE
4562 * Print a warning if this image has a parent.
4563 * Don't print it if the image now being probed
4564 * is itself a parent. We can tell at this point
4565 * because we won't know its pool name yet (just its
4566 * pool id).
96882f55 4567 */
c734b796 4568 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
96882f55
AE
4569 rbd_warn(rbd_dev, "WARNING: kernel layering "
4570 "is EXPERIMENTAL!");
86b00e0d
AE
4571 }
4572
cc070d59
AE
4573 /* If the image supports fancy striping, get its parameters */
4574
4575 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4576 ret = rbd_dev_v2_striping_info(rbd_dev);
4577 if (ret < 0)
4578 goto out_err;
4579 }
4580
6e14b1a6
AE
4581 /* crypto and compression type aren't (yet) supported for v2 images */
4582
4583 rbd_dev->header.crypt_type = 0;
4584 rbd_dev->header.comp_type = 0;
35d489f9 4585
6e14b1a6
AE
 4586 /* Get the snapshot context */
4587
cc4a38bd 4588 ret = rbd_dev_v2_snap_context(rbd_dev);
35d489f9
AE
4589 if (ret)
4590 goto out_err;
6e14b1a6 4591
a30b71b9
AE
4592 dout("discovered version 2 image, header name is %s\n",
4593 rbd_dev->header_name);
4594
35152979 4595 return 0;
9d475de5 4596out_err:
86b00e0d
AE
4597 rbd_dev->parent_overlap = 0;
4598 rbd_spec_put(rbd_dev->parent_spec);
4599 rbd_dev->parent_spec = NULL;
9d475de5
AE
4600 kfree(rbd_dev->header_name);
4601 rbd_dev->header_name = NULL;
1e130199
AE
4602 kfree(rbd_dev->header.object_prefix);
4603 rbd_dev->header.object_prefix = NULL;
9d475de5
AE
4604
4605 return ret;
a30b71b9
AE
4606}
4607
124afba2 4608static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4609{
2f82ee54 4610 struct rbd_device *parent = NULL;
124afba2
AE
4611 struct rbd_spec *parent_spec;
4612 struct rbd_client *rbdc;
4613 int ret;
4614
4615 if (!rbd_dev->parent_spec)
4616 return 0;
4617 /*
4618 * We need to pass a reference to the client and the parent
4619 * spec when creating the parent rbd_dev. Images related by
4620 * parent/child relationships always share both.
4621 */
4622 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4623 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4624
4625 ret = -ENOMEM;
4626 parent = rbd_dev_create(rbdc, parent_spec);
4627 if (!parent)
4628 goto out_err;
4629
4630 ret = rbd_dev_image_probe(parent);
4631 if (ret < 0)
4632 goto out_err;
4633 rbd_dev->parent = parent;
4634
4635 return 0;
4636out_err:
4637 if (parent) {
4638 rbd_spec_put(rbd_dev->parent_spec);
4639 kfree(rbd_dev->header_name);
4640 rbd_dev_destroy(parent);
4641 } else {
4642 rbd_put_client(rbdc);
4643 rbd_spec_put(parent_spec);
4644 }
4645
4646 return ret;
4647}
4648
200a6a8b 4649static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4650{
83a06263 4651 int ret;
d1cf5788
AE
4652
4653 ret = rbd_dev_mapping_set(rbd_dev);
83a06263 4654 if (ret)
9bb81c9b 4655 return ret;
5de10f3b 4656
83a06263
AE
4657 /* generate unique id: find highest unique id, add one */
4658 rbd_dev_id_get(rbd_dev);
4659
4660 /* Fill in the device name, now that we have its id. */
4661 BUILD_BUG_ON(DEV_NAME_LEN
4662 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4663 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4664
4665 /* Get our block major device number. */
4666
4667 ret = register_blkdev(0, rbd_dev->name);
4668 if (ret < 0)
4669 goto err_out_id;
4670 rbd_dev->major = ret;
4671
4672 /* Set up the blkdev mapping. */
4673
4674 ret = rbd_init_disk(rbd_dev);
4675 if (ret)
4676 goto err_out_blkdev;
4677
4678 ret = rbd_bus_add_dev(rbd_dev);
4679 if (ret)
4680 goto err_out_disk;
4681
83a06263
AE
4682 /* Everything's ready. Announce the disk to the world. */
4683
b5156e76 4684 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
129b79d4 4685 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4686 add_disk(rbd_dev->disk);
4687
4688 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4689 (unsigned long long) rbd_dev->mapping.size);
4690
4691 return ret;
2f82ee54 4692
83a06263
AE
4693err_out_disk:
4694 rbd_free_disk(rbd_dev);
4695err_out_blkdev:
4696 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4697err_out_id:
4698 rbd_dev_id_put(rbd_dev);
d1cf5788 4699 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4700
4701 return ret;
4702}
4703
332bb12d
AE
4704static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4705{
4706 struct rbd_spec *spec = rbd_dev->spec;
4707 size_t size;
4708
4709 /* Record the header object name for this rbd image. */
4710
4711 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4712
4713 if (rbd_dev->image_format == 1)
4714 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4715 else
4716 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4717
4718 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4719 if (!rbd_dev->header_name)
4720 return -ENOMEM;
4721
4722 if (rbd_dev->image_format == 1)
4723 sprintf(rbd_dev->header_name, "%s%s",
4724 spec->image_name, RBD_SUFFIX);
4725 else
4726 sprintf(rbd_dev->header_name, "%s%s",
4727 RBD_HEADER_PREFIX, spec->image_id);
4728 return 0;
4729}
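
Concretely, assuming the conventional rbd_types.h values RBD_SUFFIX ".rbd" and RBD_HEADER_PREFIX "rbd_header.":

	/*
	 * format 1, image_name "foo"   ->  header object "foo.rbd"
	 * format 2, image_id "abc123"  ->  header object "rbd_header.abc123"
	 */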
4730
200a6a8b
AE
4731static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4732{
6fd48b3b
AE
4733 int ret;
4734
6fd48b3b
AE
4735 rbd_dev_unprobe(rbd_dev);
4736 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4737 if (ret)
4738 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
200a6a8b 4739 kfree(rbd_dev->header_name);
6fd48b3b
AE
4740 rbd_dev->header_name = NULL;
4741 rbd_dev->image_format = 0;
4742 kfree(rbd_dev->spec->image_id);
4743 rbd_dev->spec->image_id = NULL;
4744
200a6a8b
AE
4745 rbd_dev_destroy(rbd_dev);
4746}
4747
a30b71b9
AE
4748/*
4749 * Probe for the existence of the header object for the given rbd
4750 * device. For format 2 images this includes determining the image
4751 * id.
4752 */
71f293e2 4753static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
a30b71b9
AE
4754{
4755 int ret;
b644de2b 4756 int tmp;
a30b71b9
AE
4757
4758 /*
4759 * Get the id from the image id object. If it's not a
4760 * format 2 image, we'll get ENOENT back, and we'll assume
4761 * it's a format 1 image.
4762 */
4763 ret = rbd_dev_image_id(rbd_dev);
4764 if (ret)
c0fba368
AE
4765 return ret;
4766 rbd_assert(rbd_dev->spec->image_id);
4767 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4768
332bb12d
AE
4769 ret = rbd_dev_header_name(rbd_dev);
4770 if (ret)
4771 goto err_out_format;
4772
b644de2b
AE
4773 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4774 if (ret)
4775 goto out_header_name;
4776
c0fba368 4777 if (rbd_dev->image_format == 1)
a30b71b9
AE
4778 ret = rbd_dev_v1_probe(rbd_dev);
4779 else
4780 ret = rbd_dev_v2_probe(rbd_dev);
5655c4d9 4781 if (ret)
b644de2b 4782 goto err_out_watch;
83a06263 4783
9bb81c9b
AE
4784 ret = rbd_dev_spec_update(rbd_dev);
4785 if (ret)
33dca39f 4786 goto err_out_probe;
9bb81c9b
AE
4787
4788 ret = rbd_dev_probe_parent(rbd_dev);
6fd48b3b
AE
4789 if (!ret)
4790 return 0;
83a06263 4791
6fd48b3b
AE
4792err_out_probe:
4793 rbd_dev_unprobe(rbd_dev);
b644de2b
AE
4794err_out_watch:
4795 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4796 if (tmp)
4797 rbd_warn(rbd_dev, "unable to tear down watch request\n");
332bb12d
AE
4798out_header_name:
4799 kfree(rbd_dev->header_name);
4800 rbd_dev->header_name = NULL;
4801err_out_format:
4802 rbd_dev->image_format = 0;
5655c4d9
AE
4803 kfree(rbd_dev->spec->image_id);
4804 rbd_dev->spec->image_id = NULL;
4805
4806 dout("probe failed, returning %d\n", ret);
4807
a30b71b9
AE
4808 return ret;
4809}
4810
59c2be1e
YS
4811static ssize_t rbd_add(struct bus_type *bus,
4812 const char *buf,
4813 size_t count)
602adf40 4814{
cb8627c7 4815 struct rbd_device *rbd_dev = NULL;
dc79b113 4816 struct ceph_options *ceph_opts = NULL;
4e9afeba 4817 struct rbd_options *rbd_opts = NULL;
859c31df 4818 struct rbd_spec *spec = NULL;
9d3997fd 4819 struct rbd_client *rbdc;
27cc2594
AE
4820 struct ceph_osd_client *osdc;
4821 int rc = -ENOMEM;
602adf40
YS
4822
4823 if (!try_module_get(THIS_MODULE))
4824 return -ENODEV;
4825
602adf40 4826 /* parse add command */
859c31df 4827 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4828 if (rc < 0)
bd4ba655 4829 goto err_out_module;
78cea76e 4830
9d3997fd
AE
4831 rbdc = rbd_get_client(ceph_opts);
4832 if (IS_ERR(rbdc)) {
4833 rc = PTR_ERR(rbdc);
0ddebc0c 4834 goto err_out_args;
9d3997fd 4835 }
c53d5893 4836 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4837
602adf40 4838 /* pick the pool */
9d3997fd 4839 osdc = &rbdc->client->osdc;
859c31df 4840 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
602adf40
YS
4841 if (rc < 0)
4842 goto err_out_client;
c0cd10db 4843 spec->pool_id = (u64)rc;
859c31df 4844
0903e875
AE
4845 /* The ceph file layout needs to fit pool id in 32 bits */
4846
c0cd10db
AE
4847 if (spec->pool_id > (u64)U32_MAX) {
4848 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4849 (unsigned long long)spec->pool_id, U32_MAX);
0903e875
AE
4850 rc = -EIO;
4851 goto err_out_client;
4852 }
4853
c53d5893 4854 rbd_dev = rbd_dev_create(rbdc, spec);
bd4ba655
AE
4855 if (!rbd_dev)
4856 goto err_out_client;
c53d5893
AE
4857 rbdc = NULL; /* rbd_dev now owns this */
4858 spec = NULL; /* rbd_dev now owns this */
602adf40 4859
bd4ba655 4860 rbd_dev->mapping.read_only = rbd_opts->read_only;
c53d5893
AE
4861 kfree(rbd_opts);
4862 rbd_opts = NULL; /* done with this */
bd4ba655 4863
71f293e2 4864 rc = rbd_dev_image_probe(rbd_dev);
a30b71b9 4865 if (rc < 0)
c53d5893 4866 goto err_out_rbd_dev;
05fd6f6f 4867
b536f69a
AE
4868 rc = rbd_dev_device_setup(rbd_dev);
4869 if (!rc)
4870 return count;
4871
4872 rbd_dev_image_release(rbd_dev);
c53d5893
AE
4873err_out_rbd_dev:
4874 rbd_dev_destroy(rbd_dev);
bd4ba655 4875err_out_client:
9d3997fd 4876 rbd_put_client(rbdc);
0ddebc0c 4877err_out_args:
78cea76e
AE
4878 if (ceph_opts)
4879 ceph_destroy_options(ceph_opts);
4e9afeba 4880 kfree(rbd_opts);
859c31df 4881 rbd_spec_put(spec);
bd4ba655
AE
4882err_out_module:
4883 module_put(THIS_MODULE);
27cc2594 4884
602adf40 4885 dout("Error adding device %s\n", buf);
27cc2594 4886
c0cd10db 4887 return (ssize_t)rc;
602adf40
YS
4888}
4889
de71a297 4890static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
602adf40
YS
4891{
4892 struct list_head *tmp;
4893 struct rbd_device *rbd_dev;
4894
e124a82f 4895 spin_lock(&rbd_dev_list_lock);
602adf40
YS
4896 list_for_each(tmp, &rbd_dev_list) {
4897 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4898 if (rbd_dev->dev_id == dev_id) {
e124a82f 4899 spin_unlock(&rbd_dev_list_lock);
602adf40 4900 return rbd_dev;
e124a82f 4901 }
602adf40 4902 }
e124a82f 4903 spin_unlock(&rbd_dev_list_lock);
602adf40
YS
4904 return NULL;
4905}
4906
200a6a8b 4907static void rbd_dev_device_release(struct device *dev)
602adf40 4908{
593a9e7b 4909 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4910
602adf40 4911 rbd_free_disk(rbd_dev);
200a6a8b
AE
4912 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4913 rbd_dev_clear_mapping(rbd_dev);
602adf40 4914 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 4915 rbd_dev->major = 0;
e2839308 4916 rbd_dev_id_put(rbd_dev);
d1cf5788 4917 rbd_dev_mapping_clear(rbd_dev);
602adf40
YS
4918}
4919
05a46afd
AE
4920static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4921{
ad945fc1 4922 while (rbd_dev->parent) {
05a46afd
AE
4923 struct rbd_device *first = rbd_dev;
4924 struct rbd_device *second = first->parent;
4925 struct rbd_device *third;
4926
4927 /*
4928 * Follow to the parent with no grandparent and
4929 * remove it.
4930 */
4931 while (second && (third = second->parent)) {
4932 first = second;
4933 second = third;
4934 }
ad945fc1 4935 rbd_assert(second);
8ad42cd0 4936 rbd_dev_image_release(second);
ad945fc1
AE
4937 first->parent = NULL;
4938 first->parent_overlap = 0;
4939
4940 rbd_assert(first->parent_spec);
05a46afd
AE
4941 rbd_spec_put(first->parent_spec);
4942 first->parent_spec = NULL;
05a46afd
AE
4943 }
4944}
4945
dfc5606d
YS
4946static ssize_t rbd_remove(struct bus_type *bus,
4947 const char *buf,
4948 size_t count)
602adf40
YS
4949{
4950 struct rbd_device *rbd_dev = NULL;
0d8189e1 4951 int target_id;
602adf40 4952 unsigned long ul;
0d8189e1 4953 int ret;
602adf40 4954
0d8189e1
AE
4955 ret = strict_strtoul(buf, 10, &ul);
4956 if (ret)
4957 return ret;
602adf40
YS
4958
4959 /* convert to int; abort if we lost anything in the conversion */
4960 target_id = (int) ul;
4961 if (target_id != ul)
4962 return -EINVAL;
4963
4964 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4965
4966 rbd_dev = __rbd_get_dev(target_id);
4967 if (!rbd_dev) {
4968 ret = -ENOENT;
4969 goto done;
42382b70
AE
4970 }
4971
a14ea269 4972 spin_lock_irq(&rbd_dev->lock);
b82d167b 4973 if (rbd_dev->open_count)
42382b70 4974 ret = -EBUSY;
b82d167b
AE
4975 else
4976 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 4977 spin_unlock_irq(&rbd_dev->lock);
b82d167b 4978 if (ret < 0)
42382b70 4979 goto done;
0d8189e1 4980 ret = count;
b480815a 4981 rbd_bus_del_dev(rbd_dev);
8ad42cd0 4982 rbd_dev_image_release(rbd_dev);
79ab7558 4983 module_put(THIS_MODULE);
602adf40
YS
4984done:
4985 mutex_unlock(&ctl_mutex);
aafb230e 4986
602adf40
YS
4987 return ret;
4988}
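
The inverse of the add sketch above: unmapping writes the device id to the bus remove file (id 0 assumed; the write fails with -EBUSY while the device is held open):

	#include <fcntl.h>
	#include <unistd.h>

	int main(void)
	{
		int fd = open("/sys/bus/rbd/remove", O_WRONLY);

		if (fd < 0)
			return 1;
		(void)write(fd, "0", 1);	/* hypothetical device id 0 */
		close(fd);
		return 0;
	}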
4989
602adf40
YS
4990/*
4991 * create control files in sysfs
dfc5606d 4992 * /sys/bus/rbd/...
602adf40
YS
4993 */
4994static int rbd_sysfs_init(void)
4995{
dfc5606d 4996 int ret;
602adf40 4997
fed4c143 4998 ret = device_register(&rbd_root_dev);
21079786 4999 if (ret < 0)
dfc5606d 5000 return ret;
602adf40 5001
fed4c143
AE
5002 ret = bus_register(&rbd_bus_type);
5003 if (ret < 0)
5004 device_unregister(&rbd_root_dev);
602adf40 5005
602adf40
YS
5006 return ret;
5007}
5008
5009static void rbd_sysfs_cleanup(void)
5010{
dfc5606d 5011 bus_unregister(&rbd_bus_type);
fed4c143 5012 device_unregister(&rbd_root_dev);
602adf40
YS
5013}
5014
1c2a9dfe
AE
5015static int rbd_slab_init(void)
5016{
5017 rbd_assert(!rbd_img_request_cache);
5018 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5019 sizeof (struct rbd_img_request),
5020 __alignof__(struct rbd_img_request),
5021 0, NULL);
868311b1
AE
5022 if (!rbd_img_request_cache)
5023 return -ENOMEM;
5024
5025 rbd_assert(!rbd_obj_request_cache);
5026 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5027 sizeof (struct rbd_obj_request),
5028 __alignof__(struct rbd_obj_request),
5029 0, NULL);
78c2a44a
AE
5030 if (!rbd_obj_request_cache)
5031 goto out_err;
5032
5033 rbd_assert(!rbd_segment_name_cache);
5034 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5035 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5036 if (rbd_segment_name_cache)
1c2a9dfe 5037 return 0;
78c2a44a
AE
5038out_err:
5039 if (rbd_obj_request_cache) {
5040 kmem_cache_destroy(rbd_obj_request_cache);
5041 rbd_obj_request_cache = NULL;
5042 }
1c2a9dfe 5043
868311b1
AE
5044 kmem_cache_destroy(rbd_img_request_cache);
5045 rbd_img_request_cache = NULL;
5046
1c2a9dfe
AE
5047 return -ENOMEM;
5048}
5049
5050static void rbd_slab_exit(void)
5051{
78c2a44a
AE
5052 rbd_assert(rbd_segment_name_cache);
5053 kmem_cache_destroy(rbd_segment_name_cache);
5054 rbd_segment_name_cache = NULL;
5055
868311b1
AE
5056 rbd_assert(rbd_obj_request_cache);
5057 kmem_cache_destroy(rbd_obj_request_cache);
5058 rbd_obj_request_cache = NULL;
5059
1c2a9dfe
AE
5060 rbd_assert(rbd_img_request_cache);
5061 kmem_cache_destroy(rbd_img_request_cache);
5062 rbd_img_request_cache = NULL;
5063}
5064
cc344fa1 5065static int __init rbd_init(void)
602adf40
YS
5066{
5067 int rc;
5068
1e32d34c
AE
5069 if (!libceph_compatible(NULL)) {
5070 rbd_warn(NULL, "libceph incompatibility (quitting)");
5071
5072 return -EINVAL;
5073 }
1c2a9dfe 5074 rc = rbd_slab_init();
602adf40
YS
5075 if (rc)
5076 return rc;
1c2a9dfe
AE
5077 rc = rbd_sysfs_init();
5078 if (rc)
5079 rbd_slab_exit();
5080 else
5081 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5082
5083 return rc;
602adf40
YS
5084}
5085
cc344fa1 5086static void __exit rbd_exit(void)
602adf40
YS
5087{
5088 rbd_sysfs_cleanup();
1c2a9dfe 5089 rbd_slab_exit();
602adf40
YS
5090}
5091
5092module_init(rbd_init);
5093module_exit(rbd_exit);
5094
5095MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5096MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5097MODULE_DESCRIPTION("rados block device");
5098
5099/* following authorship retained from original osdblk.c */
5100MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5101
5102MODULE_LICENSE("GPL");