/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These fields never change for a given rbd image */
	char *object_prefix;
	u64 features;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;
	u64 *snap_sizes;

	u64 stripe_unit;
	u64 stripe_count;
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the id's in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the id's associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	u64			version;
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_snap {
	const char		*name;
	u64			size;
	struct list_head	node;
	u64			id;
	u64			features;
};

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);

static void rbd_dev_device_release(struct device *dev);
static void rbd_snap_destroy(struct rbd_snap *snap);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

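/*
 * Emit a warning tagged with the most specific identity available
 * for the device: the disk name, then the image name, then the
 * image id, falling back to the rbd_dev pointer (or just the driver
 * name when rbd_dev is null).
 */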
static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);

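/*
 * On open, take a reference on the rbd device (dropped again in
 * rbd_release()).  Writers are refused for read-only mappings, and
 * any open is refused once the device has been flagged for removal.
 */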
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

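/*
 * Parse a single token from the rbd options string.  Integer and
 * string arguments are only logged here; the recognized Boolean
 * options update the rbd_options struct passed in via "private".
 */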
static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes rbd_client_list_lock itself to remove the client from
 * rbd_client_list before tearing it down.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	memset(header, 0, sizeof (*header));

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees the
		 * ondisk buffer we're working with has snap_names_len
		 * bytes beyond the end of the snapshot id array, so
		 * this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);

	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct rbd_snap *snap;

	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (snap_id == snap->id)
			return snap->name;

	return NULL;
}

static struct rbd_snap *snap_by_name(struct rbd_device *rbd_dev,
					const char *snap_name)
{
	struct rbd_snap *snap;

	list_for_each_entry(snap, &rbd_dev->snaps, node)
		if (!strcmp(snap_name, snap->name))
			return snap;

	return NULL;
}

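/*
 * Fill in the size and features for the current mapping.  Mapping
 * the image head uses the live header values; mapping a snapshot
 * looks the snapshot up by name and is always read-only.
 */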
static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		rbd_dev->mapping.size = rbd_dev->header.image_size;
		rbd_dev->mapping.features = rbd_dev->header.features;
	} else {
		struct rbd_snap *snap;

		snap = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
		if (!snap)
			return -ENOENT;
		rbd_dev->mapping.size = snap->size;
		rbd_dev->mapping.features = snap->features;
		rbd_dev->mapping.read_only = true;
	}

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

static void rbd_dev_clear_mapping(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
	rbd_dev->mapping.read_only = true;
}

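/*
 * Build the name of the RADOS object backing the image segment that
 * contains the given byte offset: "<object_prefix>.<segment number,
 * zero-padded hex>".  Returns NULL on allocation or formatting
 * failure; the caller must kfree() the result.
 */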
static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		kfree(name);
		name = NULL;
	}

	return name;
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses to two existence
 * checks are separated by the creation of the target object, so
 * that the first ("doesn't exist") response arrives *after* the
 * second ("does exist").  In that case we ignore the later-arriving
 * ("doesn't exist") response.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

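/*
 * Append an object request to its image request's list.  The image
 * request takes over the caller's reference to the object request,
 * and the object's "which" field records its position in the list.
 */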
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

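/*
 * Start an object request's prepared OSD request.  Completion is
 * reported through the osd_req's r_callback, rbd_osd_req_callback().
 */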
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{

	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

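/*
 * Signal completion of an object request: invoke its callback if
 * one was registered, otherwise wake any rbd_obj_request_wait()
 * sleeper via the embedded completion.
 */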
static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

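/*
 * Completion callback for all rbd OSD requests: record the result
 * and transfer count from the reply, then dispatch on the (first)
 * op code to the appropriate per-op callback.
 */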
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;
	obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

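/*
 * Allocate a single-op OSD request for an object request, setting
 * the read or write flags and wiring up the completion callback.
 * Write requests for image data carry the image's snapshot context.
 */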
static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
	if (!obj_request)
		return NULL;

	name = (char *)(obj_request + 1);
	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request);
}

1706/*
1707 * Caller is responsible for filling in the list of object requests
1708 * that comprises the image request, and the Linux request pointer
1709 * (if there is one).
1710 */
cc344fa1
AE
1711static struct rbd_img_request *rbd_img_request_create(
1712 struct rbd_device *rbd_dev,
bf0d5f50 1713 u64 offset, u64 length,
9849e986
AE
1714 bool write_request,
1715 bool child_request)
bf0d5f50
AE
1716{
1717 struct rbd_img_request *img_request;
bf0d5f50
AE
1718
1719 img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1720 if (!img_request)
1721 return NULL;
1722
1723 if (write_request) {
1724 down_read(&rbd_dev->header_rwsem);
812164f8 1725 ceph_get_snap_context(rbd_dev->header.snapc);
bf0d5f50 1726 up_read(&rbd_dev->header_rwsem);
bf0d5f50
AE
1727 }
1728
1729 img_request->rq = NULL;
1730 img_request->rbd_dev = rbd_dev;
1731 img_request->offset = offset;
1732 img_request->length = length;
0c425248
AE
1733 img_request->flags = 0;
1734 if (write_request) {
1735 img_request_write_set(img_request);
468521c1 1736 img_request->snapc = rbd_dev->header.snapc;
0c425248 1737 } else {
bf0d5f50 1738 img_request->snap_id = rbd_dev->spec->snap_id;
0c425248 1739 }
9849e986
AE
1740 if (child_request)
1741 img_request_child_set(img_request);
d0b2e944
AE
1742 if (rbd_dev->parent_spec)
1743 img_request_layered_set(img_request);
bf0d5f50
AE
1744 spin_lock_init(&img_request->completion_lock);
1745 img_request->next_completion = 0;
1746 img_request->callback = NULL;
a5a337d4 1747 img_request->result = 0;
bf0d5f50
AE
1748 img_request->obj_request_count = 0;
1749 INIT_LIST_HEAD(&img_request->obj_requests);
1750 kref_init(&img_request->kref);
1751
1752 rbd_img_request_get(img_request); /* Avoid a warning */
1753 rbd_img_request_put(img_request); /* TEMPORARY */
1754
37206ee5
AE
1755 dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1756 write_request ? "write" : "read", offset, length,
1757 img_request);
1758
bf0d5f50
AE
1759 return img_request;
1760}
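/*
 * Sketch of the usual image-request lifecycle, as driven by
 * rbd_request_fn() further below (illustrative only):
 *
 *	img_request = rbd_img_request_create(rbd_dev, offset, length,
 *						write_request, false);
 *	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
 *						rq->bio);
 *	if (!result)
 *		result = rbd_img_request_submit(img_request);
 *	...each object completion then runs rbd_img_obj_callback()...
 */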
1761
1762static void rbd_img_request_destroy(struct kref *kref)
1763{
1764 struct rbd_img_request *img_request;
1765 struct rbd_obj_request *obj_request;
1766 struct rbd_obj_request *next_obj_request;
1767
1768 img_request = container_of(kref, struct rbd_img_request, kref);
1769
37206ee5
AE
1770 dout("%s: img %p\n", __func__, img_request);
1771
bf0d5f50
AE
1772 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1773 rbd_img_obj_request_del(img_request, obj_request);
25dcf954 1774 rbd_assert(img_request->obj_request_count == 0);
bf0d5f50 1775
0c425248 1776 if (img_request_write_test(img_request))
812164f8 1777 ceph_put_snap_context(img_request->snapc);
bf0d5f50 1778
8b3e1a56
AE
1779 if (img_request_child_test(img_request))
1780 rbd_obj_request_put(img_request->obj_request);
1781
bf0d5f50
AE
1782 kfree(img_request);
1783}
1784
1217857f
AE
1785static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1786{
6365d33a 1787 struct rbd_img_request *img_request;
1217857f
AE
1788 unsigned int xferred;
1789 int result;
8b3e1a56 1790 bool more;
1217857f 1791
6365d33a
AE
1792 rbd_assert(obj_request_img_data_test(obj_request));
1793 img_request = obj_request->img_request;
1794
1217857f
AE
1795 rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1796 xferred = (unsigned int)obj_request->xferred;
1797 result = obj_request->result;
1798 if (result) {
1799 struct rbd_device *rbd_dev = img_request->rbd_dev;
1800
1801 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1802 img_request_write_test(img_request) ? "write" : "read",
1803 obj_request->length, obj_request->img_offset,
1804 obj_request->offset);
1805 rbd_warn(rbd_dev, " result %d xferred %x\n",
1806 result, xferred);
1807 if (!img_request->result)
1808 img_request->result = result;
1809 }
1810
f1a4739f
AE
1811 /* Image object requests don't own their page array */
1812
1813 if (obj_request->type == OBJ_REQUEST_PAGES) {
1814 obj_request->pages = NULL;
1815 obj_request->page_count = 0;
1816 }
1817
8b3e1a56
AE
1818 if (img_request_child_test(img_request)) {
1819 rbd_assert(img_request->obj_request != NULL);
1820 more = obj_request->which < img_request->obj_request_count - 1;
1821 } else {
1822 rbd_assert(img_request->rq != NULL);
1823 more = blk_end_request(img_request->rq, result, xferred);
1824 }
1825
1826 return more;
1217857f
AE
1827}
1828
2169238d
AE
1829static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1830{
1831 struct rbd_img_request *img_request;
1832 u32 which = obj_request->which;
1833 bool more = true;
1834
6365d33a 1835 rbd_assert(obj_request_img_data_test(obj_request));
2169238d
AE
1836 img_request = obj_request->img_request;
1837
1838 dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1839 rbd_assert(img_request != NULL);
2169238d
AE
1840 rbd_assert(img_request->obj_request_count > 0);
1841 rbd_assert(which != BAD_WHICH);
1842 rbd_assert(which < img_request->obj_request_count);
1843 rbd_assert(which >= img_request->next_completion);
1844
1845 spin_lock_irq(&img_request->completion_lock);
1846 if (which != img_request->next_completion)
1847 goto out;
1848
1849 for_each_obj_request_from(img_request, obj_request) {
2169238d
AE
1850 rbd_assert(more);
1851 rbd_assert(which < img_request->obj_request_count);
1852
1853 if (!obj_request_done_test(obj_request))
1854 break;
1217857f 1855 more = rbd_img_obj_end_request(obj_request);
2169238d
AE
1856 which++;
1857 }
1858
1859 rbd_assert(more ^ (which == img_request->obj_request_count));
1860 img_request->next_completion = which;
1861out:
1862 spin_unlock_irq(&img_request->completion_lock);
1863
1864 if (!more)
1865 rbd_img_request_complete(img_request);
1866}
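/*
 * Ordering example (hypothetical): with object requests 0, 1 and 2,
 * suppose 2 completes first. next_completion is still 0, so nothing
 * is ended. When 0 completes, the loop above ends 0, stops at the
 * not-yet-done 1, and sets next_completion to 1. When 1 finally
 * completes, both 1 and the already-done 2 are ended. The effect is
 * that blk_end_request() always sees the image request's byte range
 * finish strictly in order.
 */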
1867
f1a4739f
AE
1868/*
1869 * Split up an image request into one or more object requests, each
1870 * to a different object. The "type" parameter indicates whether
1871 * "data_desc" is the pointer to the head of a list of bio
1872 * structures, or the base of a page array. In either case this
1873 * function assumes data_desc describes memory sufficient to hold
1874 * all data described by the image request.
1875 */
1876static int rbd_img_request_fill(struct rbd_img_request *img_request,
1877 enum obj_request_type type,
1878 void *data_desc)
bf0d5f50
AE
1879{
1880 struct rbd_device *rbd_dev = img_request->rbd_dev;
1881 struct rbd_obj_request *obj_request = NULL;
1882 struct rbd_obj_request *next_obj_request;
0c425248 1883 bool write_request = img_request_write_test(img_request);
f1a4739f
AE
1884 struct bio *bio_list;
1885 unsigned int bio_offset = 0;
1886 struct page **pages;
7da22d29 1887 u64 img_offset;
bf0d5f50
AE
1888 u64 resid;
1889 u16 opcode;
1890
f1a4739f
AE
1891 dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1892 (int)type, data_desc);
37206ee5 1893
430c28c3 1894 opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
7da22d29 1895 img_offset = img_request->offset;
bf0d5f50 1896 resid = img_request->length;
4dda41d3 1897 rbd_assert(resid > 0);
f1a4739f
AE
1898
1899 if (type == OBJ_REQUEST_BIO) {
1900 bio_list = data_desc;
1901 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1902 } else {
1903 rbd_assert(type == OBJ_REQUEST_PAGES);
1904 pages = data_desc;
1905 }
1906
bf0d5f50 1907 while (resid) {
2fa12320 1908 struct ceph_osd_request *osd_req;
bf0d5f50 1909 const char *object_name;
bf0d5f50
AE
1910 u64 offset;
1911 u64 length;
1912
7da22d29 1913 object_name = rbd_segment_name(rbd_dev, img_offset);
bf0d5f50
AE
1914 if (!object_name)
1915 goto out_unwind;
7da22d29
AE
1916 offset = rbd_segment_offset(rbd_dev, img_offset);
1917 length = rbd_segment_length(rbd_dev, img_offset, resid);
bf0d5f50 1918 obj_request = rbd_obj_request_create(object_name,
f1a4739f 1919 offset, length, type);
bf0d5f50
AE
1920 kfree(object_name); /* object request has its own copy */
1921 if (!obj_request)
1922 goto out_unwind;
1923
f1a4739f
AE
1924 if (type == OBJ_REQUEST_BIO) {
1925 unsigned int clone_size;
1926
1927 rbd_assert(length <= (u64)UINT_MAX);
1928 clone_size = (unsigned int)length;
1929 obj_request->bio_list =
1930 bio_chain_clone_range(&bio_list,
1931 &bio_offset,
1932 clone_size,
1933 GFP_ATOMIC);
1934 if (!obj_request->bio_list)
1935 goto out_partial;
1936 } else {
1937 unsigned int page_count;
1938
1939 obj_request->pages = pages;
1940 page_count = (u32)calc_pages_for(offset, length);
1941 obj_request->page_count = page_count;
1942 if ((offset + length) & ~PAGE_MASK)
1943 page_count--; /* more on last page */
1944 pages += page_count;
1945 }
bf0d5f50 1946
2fa12320
AE
1947 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1948 obj_request);
1949 if (!osd_req)
bf0d5f50 1950 goto out_partial;
2fa12320 1951 obj_request->osd_req = osd_req;
2169238d 1952 obj_request->callback = rbd_img_obj_callback;
430c28c3 1953
2fa12320
AE
1954 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1955 0, 0);
f1a4739f
AE
1956 if (type == OBJ_REQUEST_BIO)
1957 osd_req_op_extent_osd_data_bio(osd_req, 0,
1958 obj_request->bio_list, length);
1959 else
1960 osd_req_op_extent_osd_data_pages(osd_req, 0,
1961 obj_request->pages, length,
1962 offset & ~PAGE_MASK, false, false);
9d4df01f
AE
1963
1964 if (write_request)
1965 rbd_osd_req_format_write(obj_request);
1966 else
1967 rbd_osd_req_format_read(obj_request);
430c28c3 1968
7da22d29 1969 obj_request->img_offset = img_offset;
bf0d5f50
AE
1970 rbd_img_obj_request_add(img_request, obj_request);
1971
7da22d29 1972 img_offset += length;
bf0d5f50
AE
1973 resid -= length;
1974 }
1975
1976 return 0;
1977
1978out_partial:
1979 rbd_obj_request_put(obj_request);
1980out_unwind:
1981 for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1982 rbd_obj_request_put(obj_request);
1983
1984 return -ENOMEM;
1985}
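/*
 * Worked example of the segment split above, assuming a hypothetical
 * image with obj_order 22 (4 MiB objects): a 6 MiB request starting
 * at image offset 3 MiB becomes three object requests:
 *
 *	object n:   offset 3 MiB, length 1 MiB (up to the boundary)
 *	object n+1: offset 0,     length 4 MiB (a whole object)
 *	object n+2: offset 0,     length 1 MiB (the remainder)
 *
 * rbd_segment_offset() gives the offset within each object and
 * rbd_segment_length() clamps each length at the object boundary.
 */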
1986
0eefd470
AE
1987static void
1988rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
1989{
1990 struct rbd_img_request *img_request;
1991 struct rbd_device *rbd_dev;
1992 u64 length;
1993 u32 page_count;
1994
1995 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
1996 rbd_assert(obj_request_img_data_test(obj_request));
1997 img_request = obj_request->img_request;
1998 rbd_assert(img_request);
1999
2000 rbd_dev = img_request->rbd_dev;
2001 rbd_assert(rbd_dev);
2002 length = (u64)1 << rbd_dev->header.obj_order;
2003 page_count = (u32)calc_pages_for(0, length);
2004
2005 rbd_assert(obj_request->copyup_pages);
2006 ceph_release_page_vector(obj_request->copyup_pages, page_count);
2007 obj_request->copyup_pages = NULL;
2008
2009 /*
2010 * We want the transfer count to reflect the size of the
2011 * original write request. There is no such thing as a
2012 * successful short write, so if the request was successful
2013 * we can just set it to the originally-requested length.
2014 */
2015 if (!obj_request->result)
2016 obj_request->xferred = obj_request->length;
2017
2018 /* Finish up with the normal image object callback */
2019
2020 rbd_img_obj_callback(obj_request);
2021}
2022
3d7efd18
AE
2023static void
2024rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2025{
2026 struct rbd_obj_request *orig_request;
0eefd470
AE
2027 struct ceph_osd_request *osd_req;
2028 struct ceph_osd_client *osdc;
2029 struct rbd_device *rbd_dev;
3d7efd18 2030 struct page **pages;
3d7efd18
AE
2031 int result;
2032 u64 obj_size;
2033 u64 xferred;
2034
2035 rbd_assert(img_request_child_test(img_request));
2036
2037 /* First get what we need from the image request */
2038
2039 pages = img_request->copyup_pages;
2040 rbd_assert(pages != NULL);
2041 img_request->copyup_pages = NULL;
2042
2043 orig_request = img_request->obj_request;
2044 rbd_assert(orig_request != NULL);
0eefd470 2045 rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
3d7efd18
AE
2046 result = img_request->result;
2047 obj_size = img_request->length;
2048 xferred = img_request->xferred;
2049
0eefd470
AE
2050 rbd_dev = img_request->rbd_dev;
2051 rbd_assert(rbd_dev);
2052 rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2053
3d7efd18
AE
2054 rbd_img_request_put(img_request);
2055
0eefd470
AE
2056 if (result)
2057 goto out_err;
2058
2059 /* Allocate the new copyup osd request for the original request */
2060
2061 result = -ENOMEM;
2062 rbd_assert(!orig_request->osd_req);
2063 osd_req = rbd_osd_req_create_copyup(orig_request);
2064 if (!osd_req)
2065 goto out_err;
2066 orig_request->osd_req = osd_req;
2067 orig_request->copyup_pages = pages;
3d7efd18 2068
0eefd470 2069 /* Initialize the copyup op */
3d7efd18 2070
0eefd470
AE
2071 osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2072 osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2073 false, false);
3d7efd18 2074
0eefd470
AE
2075 /* Then the original write request op */
2076
2077 osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2078 orig_request->offset,
2079 orig_request->length, 0, 0);
2080 osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2081 orig_request->length);
2082
2083 rbd_osd_req_format_write(orig_request);
2084
2085 /* All set, send it off. */
2086
2087 orig_request->callback = rbd_img_obj_copyup_callback;
2088 osdc = &rbd_dev->rbd_client->client->osdc;
2089 result = rbd_obj_request_submit(osdc, orig_request);
2090 if (!result)
2091 return;
2092out_err:
2093 /* Record the error code and complete the request */
2094
2095 orig_request->result = result;
2096 orig_request->xferred = 0;
2097 obj_request_done_set(orig_request);
2098 rbd_obj_request_complete(orig_request);
3d7efd18
AE
2099}
2100
2101/*
2102 * Read from the parent image the range of data that covers the
2103 * entire target of the given object request. This is used for
2104 * satisfying a layered image write request when the target of an
2105 * object request from the image request does not exist.
2106 *
2107 * A page array big enough to hold the returned data is allocated
2108 * and supplied to rbd_img_request_fill() as the "data descriptor."
2109 * When the read completes, this page array will be transferred to
2110 * the original object request for the copyup operation.
2111 *
2112 * If an error occurs, record it as the result of the original
2113 * object request and mark it done so it gets completed.
2114 */
2115static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2116{
2117 struct rbd_img_request *img_request = NULL;
2118 struct rbd_img_request *parent_request = NULL;
2119 struct rbd_device *rbd_dev;
2120 u64 img_offset;
2121 u64 length;
2122 struct page **pages = NULL;
2123 u32 page_count;
2124 int result;
2125
2126 rbd_assert(obj_request_img_data_test(obj_request));
2127 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2128
2129 img_request = obj_request->img_request;
2130 rbd_assert(img_request != NULL);
2131 rbd_dev = img_request->rbd_dev;
2132 rbd_assert(rbd_dev->parent != NULL);
2133
0eefd470
AE
2134 /*
2135 * First things first. The original osd request is of no
2136 * use to us anymore; we'll need a new one that can hold
2137 * the two ops in a copyup request. We'll get that later,
2138 * but for now we can release the old one.
2139 */
2140 rbd_osd_req_destroy(obj_request->osd_req);
2141 obj_request->osd_req = NULL;
2142
3d7efd18
AE
2143 /*
2144 * Determine the byte range covered by the object in the
2145 * child image to which the original request was to be sent.
2146 */
2147 img_offset = obj_request->img_offset - obj_request->offset;
2148 length = (u64)1 << rbd_dev->header.obj_order;
2149
a9e8ba2c
AE
2150 /*
2151 * There is no defined parent data beyond the parent
2152 * overlap, so limit what we read at that boundary if
2153 * necessary.
2154 */
2155 if (img_offset + length > rbd_dev->parent_overlap) {
2156 rbd_assert(img_offset < rbd_dev->parent_overlap);
2157 length = rbd_dev->parent_overlap - img_offset;
2158 }
2159
3d7efd18
AE
2160 /*
2161 * Allocate a page array big enough to receive the data read
2162 * from the parent.
2163 */
2164 page_count = (u32)calc_pages_for(0, length);
2165 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2166 if (IS_ERR(pages)) {
2167 result = PTR_ERR(pages);
2168 pages = NULL;
2169 goto out_err;
2170 }
2171
2172 result = -ENOMEM;
2173 parent_request = rbd_img_request_create(rbd_dev->parent,
2174 img_offset, length,
2175 false, true);
2176 if (!parent_request)
2177 goto out_err;
2178 rbd_obj_request_get(obj_request);
2179 parent_request->obj_request = obj_request;
2180
2181 result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2182 if (result)
2183 goto out_err;
2184 parent_request->copyup_pages = pages;
2185
2186 parent_request->callback = rbd_img_obj_parent_read_full_callback;
2187 result = rbd_img_request_submit(parent_request);
2188 if (!result)
2189 return 0;
2190
2191 parent_request->copyup_pages = NULL;
2192 parent_request->obj_request = NULL;
2193 rbd_obj_request_put(obj_request);
2194out_err:
2195 if (pages)
2196 ceph_release_page_vector(pages, page_count);
2197 if (parent_request)
2198 rbd_img_request_put(parent_request);
2199 obj_request->result = result;
2200 obj_request->xferred = 0;
2201 obj_request_done_set(obj_request);
2202
2203 return result;
2204}
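/*
 * The copyup flow, in summary (see also the callback above):
 *
 *	1. Read the full backing-object range from the parent image
 *	   into a freshly allocated page vector.
 *	2. Build a two-op osd request: op 0 is a CALL to the "rbd"
 *	   class "copyup" method carrying the parent data, op 1 is
 *	   the original WRITE.
 *	3. The class method is expected to populate the target object
 *	   from the parent data only if it is still empty, after
 *	   which the write lands on a fully populated object.
 */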
2205
c5b5ef6c
AE
2206static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2207{
c5b5ef6c
AE
2208 struct rbd_obj_request *orig_request;
2209 int result;
2210
2211 rbd_assert(!obj_request_img_data_test(obj_request));
2212
2213 /*
2214 * All we need from the object request is the original
2215 * request and the result of the STAT op. Grab those, then
2216 * we're done with the request.
2217 */
2218 orig_request = obj_request->obj_request;
2219 obj_request->obj_request = NULL;
2220 rbd_assert(orig_request);
2221 rbd_assert(orig_request->img_request);
2222
2223 result = obj_request->result;
2224 obj_request->result = 0;
2225
2226 dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2227 obj_request, orig_request, result,
2228 obj_request->xferred, obj_request->length);
2229 rbd_obj_request_put(obj_request);
2230
2231 rbd_assert(orig_request);
2232 rbd_assert(orig_request->img_request);
c5b5ef6c
AE
2233
2234 /*
2235 * Our only purpose here is to determine whether the object
2236 * exists, and we don't want to treat the non-existence as
2237 * an error. If something else comes back, transfer the
2238 * error to the original request and complete it now.
2239 */
2240 if (!result) {
2241 obj_request_existence_set(orig_request, true);
2242 } else if (result == -ENOENT) {
2243 obj_request_existence_set(orig_request, false);
2244 } else if (result) {
2245 orig_request->result = result;
3d7efd18 2246 goto out;
c5b5ef6c
AE
2247 }
2248
2249 /*
2250 * Resubmit the original request now that we have recorded
2251 * whether the target object exists.
2252 */
b454e36d 2253 orig_request->result = rbd_img_obj_request_submit(orig_request);
3d7efd18 2254out:
c5b5ef6c
AE
2255 if (orig_request->result)
2256 rbd_obj_request_complete(orig_request);
2257 rbd_obj_request_put(orig_request);
2258}
2259
2260static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2261{
2262 struct rbd_obj_request *stat_request;
2263 struct rbd_device *rbd_dev;
2264 struct ceph_osd_client *osdc;
2265 struct page **pages = NULL;
2266 u32 page_count;
2267 size_t size;
2268 int ret;
2269
2270 /*
2271 * The response data for a STAT call consists of:
2272 * le64 length;
2273 * struct {
2274 * le32 tv_sec;
2275 * le32 tv_nsec;
2276 * } mtime;
2277 */
2278 size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2279 page_count = (u32)calc_pages_for(0, size);
2280 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2281 if (IS_ERR(pages))
2282 return PTR_ERR(pages);
2283
2284 ret = -ENOMEM;
2285 stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2286 OBJ_REQUEST_PAGES);
2287 if (!stat_request)
2288 goto out;
2289
2290 rbd_obj_request_get(obj_request);
2291 stat_request->obj_request = obj_request;
2292 stat_request->pages = pages;
2293 stat_request->page_count = page_count;
2294
2295 rbd_assert(obj_request->img_request);
2296 rbd_dev = obj_request->img_request->rbd_dev;
2297 stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2298 stat_request);
2299 if (!stat_request->osd_req)
2300 goto out;
2301 stat_request->callback = rbd_img_obj_exists_callback;
2302
2303 osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2304 osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2305 false, false);
9d4df01f 2306 rbd_osd_req_format_read(stat_request);
c5b5ef6c
AE
2307
2308 osdc = &rbd_dev->rbd_client->client->osdc;
2309 ret = rbd_obj_request_submit(osdc, stat_request);
2310out:
2311 if (ret)
2312 rbd_obj_request_put(obj_request);
2313
2314 return ret;
2315}
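/*
 * Size arithmetic for the STAT reply above: sizeof (__le64) +
 * 2 * sizeof (__le32) = 16 bytes, so calc_pages_for(0, 16) yields a
 * single page. The page vector exists only to give the osd client
 * somewhere to land the length/mtime payload; only the op's result
 * code is actually consumed, in rbd_img_obj_exists_callback().
 */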
2316
b454e36d
AE
2317static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2318{
2319 struct rbd_img_request *img_request;
a9e8ba2c 2320 struct rbd_device *rbd_dev;
3d7efd18 2321 bool known;
b454e36d
AE
2322
2323 rbd_assert(obj_request_img_data_test(obj_request));
2324
2325 img_request = obj_request->img_request;
2326 rbd_assert(img_request);
a9e8ba2c 2327 rbd_dev = img_request->rbd_dev;
b454e36d 2328
b454e36d 2329 /*
a9e8ba2c
AE
2330 * Only writes to layered images need special handling.
2331 * Reads and non-layered writes are simple object requests.
2332 * Layered writes that start beyond the end of the overlap
2333 * with the parent have no parent data, so they too are
2334 * simple object requests. Finally, if the target object is
2335 * known to already exist, its parent data has already been
2336 * copied, so a write to the object can also be handled as a
2337 * simple object request.
b454e36d
AE
2338 */
2339 if (!img_request_write_test(img_request) ||
2340 !img_request_layered_test(img_request) ||
a9e8ba2c 2341 rbd_dev->parent_overlap <= obj_request->img_offset ||
3d7efd18
AE
2342 ((known = obj_request_known_test(obj_request)) &&
2343 obj_request_exists_test(obj_request))) {
b454e36d
AE
2344
2345 struct rbd_device *rbd_dev;
2346 struct ceph_osd_client *osdc;
2347
2348 rbd_dev = obj_request->img_request->rbd_dev;
2349 osdc = &rbd_dev->rbd_client->client->osdc;
2350
2351 return rbd_obj_request_submit(osdc, obj_request);
2352 }
2353
2354 /*
3d7efd18
AE
2355 * It's a layered write. The target object might exist but
2356 * we may not know that yet. If we know it doesn't exist,
2357 * start by reading the data for the full target object from
2358 * the parent so we can use it for a copyup to the target.
b454e36d 2359 */
3d7efd18
AE
2360 if (known)
2361 return rbd_img_obj_parent_read_full(obj_request);
2362
2363 /* We don't know whether the target exists. Go find out. */
b454e36d
AE
2364
2365 return rbd_img_obj_exists_submit(obj_request);
2366}
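/*
 * The tests above amount to this decision table:
 *
 *	read, non-layered write, or write past the parent overlap
 *					-> plain object request
 *	layered write, existence known, object exists
 *					-> plain object request
 *	layered write, known not to exist
 *					-> parent read + copyup
 *	layered write, existence unknown
 *					-> STAT first, then resubmit
 */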
2367
bf0d5f50
AE
2368static int rbd_img_request_submit(struct rbd_img_request *img_request)
2369{
bf0d5f50 2370 struct rbd_obj_request *obj_request;
46faeed4 2371 struct rbd_obj_request *next_obj_request;
bf0d5f50 2372
37206ee5 2373 dout("%s: img %p\n", __func__, img_request);
46faeed4 2374 for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
bf0d5f50
AE
2375 int ret;
2376
b454e36d 2377 ret = rbd_img_obj_request_submit(obj_request);
bf0d5f50
AE
2378 if (ret)
2379 return ret;
bf0d5f50
AE
2380 }
2381
2382 return 0;
2383}
8b3e1a56
AE
2384
2385static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2386{
2387 struct rbd_obj_request *obj_request;
a9e8ba2c
AE
2388 struct rbd_device *rbd_dev;
2389 u64 obj_end;
8b3e1a56
AE
2390
2391 rbd_assert(img_request_child_test(img_request));
2392
2393 obj_request = img_request->obj_request;
a9e8ba2c
AE
2394 rbd_assert(obj_request);
2395 rbd_assert(obj_request->img_request);
2396
8b3e1a56 2397 obj_request->result = img_request->result;
a9e8ba2c
AE
2398 if (obj_request->result)
2399 goto out;
2400
2401 /*
2402 * We need to zero anything beyond the parent overlap
2403 * boundary. Since rbd_img_obj_request_read_callback()
2404 * will zero anything beyond the end of a short read, an
2405 * easy way to do this is to pretend the data from the
2406 * parent came up short--ending at the overlap boundary.
2407 */
2408 rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2409 obj_end = obj_request->img_offset + obj_request->length;
2410 rbd_dev = obj_request->img_request->rbd_dev;
2411 if (obj_end > rbd_dev->parent_overlap) {
2412 u64 xferred = 0;
2413
2414 if (obj_request->img_offset < rbd_dev->parent_overlap)
2415 xferred = rbd_dev->parent_overlap -
2416 obj_request->img_offset;
8b3e1a56 2417
a9e8ba2c
AE
2418 obj_request->xferred = min(img_request->xferred, xferred);
2419 } else {
2420 obj_request->xferred = img_request->xferred;
2421 }
2422out:
8b3e1a56
AE
2423 rbd_img_obj_request_read_callback(obj_request);
2424 rbd_obj_request_complete(obj_request);
2425}
2426
2427static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2428{
2429 struct rbd_device *rbd_dev;
2430 struct rbd_img_request *img_request;
2431 int result;
2432
2433 rbd_assert(obj_request_img_data_test(obj_request));
2434 rbd_assert(obj_request->img_request != NULL);
2435 rbd_assert(obj_request->result == (s32) -ENOENT);
2436 rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2437
2438 rbd_dev = obj_request->img_request->rbd_dev;
2439 rbd_assert(rbd_dev->parent != NULL);
2440 /* rbd_read_finish(obj_request, obj_request->length); */
2441 img_request = rbd_img_request_create(rbd_dev->parent,
2442 obj_request->img_offset,
2443 obj_request->length,
2444 false, true);
2445 result = -ENOMEM;
2446 if (!img_request)
2447 goto out_err;
2448
2449 rbd_obj_request_get(obj_request);
2450 img_request->obj_request = obj_request;
2451
f1a4739f
AE
2452 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2453 obj_request->bio_list);
8b3e1a56
AE
2454 if (result)
2455 goto out_err;
2456
2457 img_request->callback = rbd_img_parent_read_callback;
2458 result = rbd_img_request_submit(img_request);
2459 if (result)
2460 goto out_err;
2461
2462 return;
2463out_err:
2464 if (img_request)
2465 rbd_img_request_put(img_request);
2466 obj_request->result = result;
2467 obj_request->xferred = 0;
2468 obj_request_done_set(obj_request);
2469}
bf0d5f50 2470
cc4a38bd 2471static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
b8d70035
AE
2472{
2473 struct rbd_obj_request *obj_request;
2169238d 2474 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
b8d70035
AE
2475 int ret;
2476
2477 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2478 OBJ_REQUEST_NODATA);
2479 if (!obj_request)
2480 return -ENOMEM;
2481
2482 ret = -ENOMEM;
430c28c3 2483 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
b8d70035
AE
2484 if (!obj_request->osd_req)
2485 goto out;
2169238d 2486 obj_request->callback = rbd_obj_request_put;
b8d70035 2487
c99d2d4a 2488 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
cc4a38bd 2489 notify_id, 0, 0);
9d4df01f 2490 rbd_osd_req_format_read(obj_request);
430c28c3 2491
b8d70035 2492 ret = rbd_obj_request_submit(osdc, obj_request);
b8d70035 2493out:
cf81b60e
AE
2494 if (ret)
2495 rbd_obj_request_put(obj_request);
b8d70035
AE
2496
2497 return ret;
2498}
2499
2500static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2501{
2502 struct rbd_device *rbd_dev = (struct rbd_device *)data;
b8d70035
AE
2503
2504 if (!rbd_dev)
2505 return;
2506
37206ee5 2507 dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
cc4a38bd
AE
2508 rbd_dev->header_name, (unsigned long long)notify_id,
2509 (unsigned int)opcode);
2510 (void)rbd_dev_refresh(rbd_dev);
b8d70035 2511
cc4a38bd 2512 rbd_obj_notify_ack(rbd_dev, notify_id);
b8d70035
AE
2513}
2514
9969ebc5
AE
2515/*
2516 * Request sync osd watch/unwatch. The value of "start" determines
2517 * whether a watch request is being initiated or torn down.
2518 */
2519static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2520{
2521 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2522 struct rbd_obj_request *obj_request;
9969ebc5
AE
2523 int ret;
2524
2525 rbd_assert(start ^ !!rbd_dev->watch_event);
2526 rbd_assert(start ^ !!rbd_dev->watch_request);
2527
2528 if (start) {
3c663bbd 2529 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
9969ebc5
AE
2530 &rbd_dev->watch_event);
2531 if (ret < 0)
2532 return ret;
8eb87565 2533 rbd_assert(rbd_dev->watch_event != NULL);
9969ebc5
AE
2534 }
2535
2536 ret = -ENOMEM;
2537 obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2538 OBJ_REQUEST_NODATA);
2539 if (!obj_request)
2540 goto out_cancel;
2541
430c28c3
AE
2542 obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2543 if (!obj_request->osd_req)
2544 goto out_cancel;
2545
8eb87565 2546 if (start)
975241af 2547 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
8eb87565 2548 else
6977c3f9 2549 ceph_osdc_unregister_linger_request(osdc,
975241af 2550 rbd_dev->watch_request->osd_req);
2169238d
AE
2551
2552 osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
b21ebddd 2553 rbd_dev->watch_event->cookie, 0, start);
9d4df01f 2554 rbd_osd_req_format_write(obj_request);
2169238d 2555
9969ebc5
AE
2556 ret = rbd_obj_request_submit(osdc, obj_request);
2557 if (ret)
2558 goto out_cancel;
2559 ret = rbd_obj_request_wait(obj_request);
2560 if (ret)
2561 goto out_cancel;
9969ebc5
AE
2562 ret = obj_request->result;
2563 if (ret)
2564 goto out_cancel;
2565
8eb87565
AE
2566 /*
2567 * A watch request is set to linger, so the underlying osd
2568 * request won't go away until we unregister it. We retain
2569 * a pointer to the object request during that time (in
2570 * rbd_dev->watch_request), so we'll keep a reference to
2571 * it. We'll drop that reference (below) after we've
2572 * unregistered it.
2573 */
2574 if (start) {
2575 rbd_dev->watch_request = obj_request;
2576
2577 return 0;
2578 }
2579
2580 /* We have successfully torn down the watch request */
2581
2582 rbd_obj_request_put(rbd_dev->watch_request);
2583 rbd_dev->watch_request = NULL;
9969ebc5
AE
2584out_cancel:
2585 /* Cancel the event if we're tearing down, or on error */
2586 ceph_osdc_cancel_event(rbd_dev->watch_event);
2587 rbd_dev->watch_event = NULL;
9969ebc5
AE
2588 if (obj_request)
2589 rbd_obj_request_put(obj_request);
2590
2591 return ret;
2592}
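/*
 * Usage sketch: rbd_dev_header_watch_sync(rbd_dev, 1) registers a
 * lingering watch on the header object (typically at map time), and
 * rbd_dev_header_watch_sync(rbd_dev, 0) tears it down again.
 * Notifications arriving in between are delivered to rbd_watch_cb(),
 * which refreshes the header and acks the notify.
 */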
2593
36be9a76 2594/*
f40eb349
AE
2595 * Synchronous osd object method call. Returns the number of bytes
2596 * returned in the inbound buffer, or a negative error code.
36be9a76
AE
2597 */
2598static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2599 const char *object_name,
2600 const char *class_name,
2601 const char *method_name,
4157976b 2602 const void *outbound,
36be9a76 2603 size_t outbound_size,
4157976b 2604 void *inbound,
36be9a76
AE
2605 size_t inbound_size,
2606 u64 *version)
2607{
2169238d 2608 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
36be9a76 2609 struct rbd_obj_request *obj_request;
36be9a76
AE
2610 struct page **pages;
2611 u32 page_count;
2612 int ret;
2613
2614 /*
6010a451
AE
2615 * Method calls are ultimately read operations. The result
2616 * should be placed into the inbound buffer provided. They
2617 * also supply outbound data--parameters for the object
2618 * method. Currently, if present, this will be a
2619 * snapshot id.
36be9a76 2620 */
57385b51 2621 page_count = (u32)calc_pages_for(0, inbound_size);
36be9a76
AE
2622 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2623 if (IS_ERR(pages))
2624 return PTR_ERR(pages);
2625
2626 ret = -ENOMEM;
6010a451 2627 obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
36be9a76
AE
2628 OBJ_REQUEST_PAGES);
2629 if (!obj_request)
2630 goto out;
2631
2632 obj_request->pages = pages;
2633 obj_request->page_count = page_count;
2634
430c28c3 2635 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
36be9a76
AE
2636 if (!obj_request->osd_req)
2637 goto out;
2638
c99d2d4a 2639 osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
04017e29
AE
2640 class_name, method_name);
2641 if (outbound_size) {
2642 struct ceph_pagelist *pagelist;
2643
2644 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2645 if (!pagelist)
2646 goto out;
2647
2648 ceph_pagelist_init(pagelist);
2649 ceph_pagelist_append(pagelist, outbound, outbound_size);
2650 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2651 pagelist);
2652 }
a4ce40a9
AE
2653 osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2654 obj_request->pages, inbound_size,
44cd188d 2655 0, false, false);
9d4df01f 2656 rbd_osd_req_format_read(obj_request);
430c28c3 2657
36be9a76
AE
2658 ret = rbd_obj_request_submit(osdc, obj_request);
2659 if (ret)
2660 goto out;
2661 ret = rbd_obj_request_wait(obj_request);
2662 if (ret)
2663 goto out;
2664
2665 ret = obj_request->result;
2666 if (ret < 0)
2667 goto out;
57385b51
AE
2668
2669 rbd_assert(obj_request->xferred < (u64)INT_MAX);
2670 ret = (int)obj_request->xferred;
903bb32e 2671 ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
36be9a76
AE
2672 if (version)
2673 *version = obj_request->version;
2674out:
2675 if (obj_request)
2676 rbd_obj_request_put(obj_request);
2677 else
2678 ceph_release_page_vector(pages, page_count);
2679
2680 return ret;
2681}
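/*
 * For a concrete caller see _rbd_dev_v2_snap_size() below: it passes
 * a snapshot id as the outbound parameter to the "rbd" class
 * "get_size" method and decodes a packed { u8 order; __le64 size; }
 * reply from the inbound buffer.
 */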
2682
bf0d5f50 2683static void rbd_request_fn(struct request_queue *q)
cc344fa1 2684 __releases(q->queue_lock) __acquires(q->queue_lock)
bf0d5f50
AE
2685{
2686 struct rbd_device *rbd_dev = q->queuedata;
2687 bool read_only = rbd_dev->mapping.read_only;
2688 struct request *rq;
2689 int result;
2690
2691 while ((rq = blk_fetch_request(q))) {
2692 bool write_request = rq_data_dir(rq) == WRITE;
2693 struct rbd_img_request *img_request;
2694 u64 offset;
2695 u64 length;
2696
2697 /* Ignore any non-FS requests that filter through. */
2698
2699 if (rq->cmd_type != REQ_TYPE_FS) {
4dda41d3
AE
2700 dout("%s: non-fs request type %d\n", __func__,
2701 (int) rq->cmd_type);
2702 __blk_end_request_all(rq, 0);
2703 continue;
2704 }
2705
2706 /* Ignore/skip any zero-length requests */
2707
2708 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2709 length = (u64) blk_rq_bytes(rq);
2710
2711 if (!length) {
2712 dout("%s: zero-length request\n", __func__);
bf0d5f50
AE
2713 __blk_end_request_all(rq, 0);
2714 continue;
2715 }
2716
2717 spin_unlock_irq(q->queue_lock);
2718
2719 /* Disallow writes to a read-only device */
2720
2721 if (write_request) {
2722 result = -EROFS;
2723 if (read_only)
2724 goto end_request;
2725 rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2726 }
2727
6d292906
AE
2728 /*
2729 * Quit early if the mapped snapshot no longer
2730 * exists. It's still possible the snapshot will
2731 * have disappeared by the time our request arrives
2732 * at the osd, but there's no sense in sending it if
2733 * we already know.
2734 */
2735 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
bf0d5f50
AE
2736 dout("request for non-existent snapshot");
2737 rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2738 result = -ENXIO;
2739 goto end_request;
2740 }
2741
bf0d5f50 2742 result = -EINVAL;
c0cd10db
AE
2743 if (offset && length > U64_MAX - offset + 1) {
2744 rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
2745 offset, length);
bf0d5f50 2746 goto end_request; /* Shouldn't happen */
c0cd10db 2747 }
bf0d5f50
AE
2748
2749 result = -ENOMEM;
2750 img_request = rbd_img_request_create(rbd_dev, offset, length,
9849e986 2751 write_request, false);
bf0d5f50
AE
2752 if (!img_request)
2753 goto end_request;
2754
2755 img_request->rq = rq;
2756
f1a4739f
AE
2757 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2758 rq->bio);
bf0d5f50
AE
2759 if (!result)
2760 result = rbd_img_request_submit(img_request);
2761 if (result)
2762 rbd_img_request_put(img_request);
2763end_request:
2764 spin_lock_irq(q->queue_lock);
2765 if (result < 0) {
7da22d29
AE
2766 rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2767 write_request ? "write" : "read",
2768 length, offset, result);
2769
bf0d5f50
AE
2770 __blk_end_request_all(rq, result);
2771 }
2772 }
2773}
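/*
 * Locking shape of the loop above: the block layer calls in with
 * q->queue_lock held. The lock is dropped while each image request
 * is built and submitted, and re-taken before ending a failed
 * request, matching the __releases/__acquires annotations on the
 * function.
 */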
2774
602adf40
YS
2775/*
2776 * A queue callback. Makes sure that we don't create a bio that spans
2777 * multiple osd objects. One exception is single-page bios,
2778 * which we handle later in bio_chain_clone_range()
602adf40
YS
2779 */
2780static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2781 struct bio_vec *bvec)
2782{
2783 struct rbd_device *rbd_dev = q->queuedata;
e5cfeed2
AE
2784 sector_t sector_offset;
2785 sector_t sectors_per_obj;
2786 sector_t obj_sector_offset;
2787 int ret;
2788
2789 /*
2790 * Find how far into its rbd object the partition-relative
2791 * bio start sector is to offset relative to the enclosing
2792 * device.
2793 */
2794 sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2795 sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2796 obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2797
2798 /*
2799 * Compute the number of bytes from that offset to the end
2800 * of the object. Account for what's already used by the bio.
2801 */
2802 ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2803 if (ret > bmd->bi_size)
2804 ret -= bmd->bi_size;
2805 else
2806 ret = 0;
2807
2808 /*
2809 * Don't send back more than was asked for. And if the bio
2810 * was empty, let the whole thing through because: "Note
2811 * that a block device *must* allow a single page to be
2812 * added to an empty bio."
2813 */
2814 rbd_assert(bvec->bv_len <= PAGE_SIZE);
2815 if (ret > (int) bvec->bv_len || !bmd->bi_size)
2816 ret = (int) bvec->bv_len;
2817
2818 return ret;
602adf40
YS
2819}
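/*
 * Worked example for the arithmetic above, assuming a hypothetical
 * obj_order of 22 (4 MiB objects): sectors_per_obj = 1 << (22 - 9)
 * = 8192. A bio starting at device sector 8000 has an
 * obj_sector_offset of 8000, leaving (8192 - 8000) << 9 = 98304
 * bytes before the object boundary; bvecs are accepted only while
 * the bio stays within that remainder.
 */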
2820
2821static void rbd_free_disk(struct rbd_device *rbd_dev)
2822{
2823 struct gendisk *disk = rbd_dev->disk;
2824
2825 if (!disk)
2826 return;
2827
a0cab924
AE
2828 rbd_dev->disk = NULL;
2829 if (disk->flags & GENHD_FL_UP) {
602adf40 2830 del_gendisk(disk);
a0cab924
AE
2831 if (disk->queue)
2832 blk_cleanup_queue(disk->queue);
2833 }
602adf40
YS
2834 put_disk(disk);
2835}
2836
788e2df3
AE
2837static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2838 const char *object_name,
7097f8df 2839 u64 offset, u64 length, void *buf)
788e2df3
AE
2840
2841{
2169238d 2842 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
788e2df3 2843 struct rbd_obj_request *obj_request;
788e2df3
AE
2844 struct page **pages = NULL;
2845 u32 page_count;
1ceae7ef 2846 size_t size;
788e2df3
AE
2847 int ret;
2848
2849 page_count = (u32) calc_pages_for(offset, length);
2850 pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2851 if (IS_ERR(pages))
2852 return PTR_ERR(pages);
2853
2854 ret = -ENOMEM;
2855 obj_request = rbd_obj_request_create(object_name, offset, length,
36be9a76 2856 OBJ_REQUEST_PAGES);
788e2df3
AE
2857 if (!obj_request)
2858 goto out;
2859
2860 obj_request->pages = pages;
2861 obj_request->page_count = page_count;
2862
430c28c3 2863 obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
788e2df3
AE
2864 if (!obj_request->osd_req)
2865 goto out;
2866
c99d2d4a
AE
2867 osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2868 offset, length, 0, 0);
406e2c9f 2869 osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
a4ce40a9 2870 obj_request->pages,
44cd188d
AE
2871 obj_request->length,
2872 obj_request->offset & ~PAGE_MASK,
2873 false, false);
9d4df01f 2874 rbd_osd_req_format_read(obj_request);
430c28c3 2875
788e2df3
AE
2876 ret = rbd_obj_request_submit(osdc, obj_request);
2877 if (ret)
2878 goto out;
2879 ret = rbd_obj_request_wait(obj_request);
2880 if (ret)
2881 goto out;
2882
2883 ret = obj_request->result;
2884 if (ret < 0)
2885 goto out;
1ceae7ef
AE
2886
2887 rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2888 size = (size_t) obj_request->xferred;
903bb32e 2889 ceph_copy_from_page_vector(pages, buf, 0, size);
7097f8df
AE
2890 rbd_assert(size <= (size_t)INT_MAX);
2891 ret = (int)size;
788e2df3
AE
2892out:
2893 if (obj_request)
2894 rbd_obj_request_put(obj_request);
2895 else
2896 ceph_release_page_vector(pages, page_count);
2897
2898 return ret;
2899}
2900
602adf40 2901/*
4156d998
AE
2902 * Read the complete header for the given rbd device.
2903 *
2904 * Returns a pointer to a dynamically-allocated buffer containing
2905 * the complete and validated header.
2908 *
2909 * Returns a pointer-coded errno if a failure occurs.
602adf40 2910 */
4156d998 2911static struct rbd_image_header_ondisk *
7097f8df 2912rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
602adf40 2913{
4156d998 2914 struct rbd_image_header_ondisk *ondisk = NULL;
50f7c4c9 2915 u32 snap_count = 0;
4156d998
AE
2916 u64 names_size = 0;
2917 u32 want_count;
2918 int ret;
602adf40 2919
00f1f36f 2920 /*
4156d998
AE
2921 * The complete header will include an array of its 64-bit
2922 * snapshot ids, followed by the names of those snapshots as
2923 * a contiguous block of NUL-terminated strings. Note that
2924 * the number of snapshots could change by the time we read
2925 * it in, in which case we re-read it.
00f1f36f 2926 */
4156d998
AE
2927 do {
2928 size_t size;
2929
2930 kfree(ondisk);
2931
2932 size = sizeof (*ondisk);
2933 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2934 size += names_size;
2935 ondisk = kmalloc(size, GFP_KERNEL);
2936 if (!ondisk)
2937 return ERR_PTR(-ENOMEM);
2938
788e2df3 2939 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
7097f8df 2940 0, size, ondisk);
4156d998
AE
2941 if (ret < 0)
2942 goto out_err;
c0cd10db 2943 if ((size_t)ret < size) {
4156d998 2944 ret = -ENXIO;
06ecc6cb
AE
2945 rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2946 size, ret);
4156d998
AE
2947 goto out_err;
2948 }
2949 if (!rbd_dev_ondisk_valid(ondisk)) {
2950 ret = -ENXIO;
06ecc6cb 2951 rbd_warn(rbd_dev, "invalid header");
4156d998 2952 goto out_err;
81e759fb 2953 }
602adf40 2954
4156d998
AE
2955 names_size = le64_to_cpu(ondisk->snap_names_len);
2956 want_count = snap_count;
2957 snap_count = le32_to_cpu(ondisk->snap_count);
2958 } while (snap_count != want_count);
00f1f36f 2959
4156d998 2960 return ondisk;
00f1f36f 2961
4156d998
AE
2962out_err:
2963 kfree(ondisk);
2964
2965 return ERR_PTR(ret);
2966}
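/*
 * Allocation arithmetic for the loop above: the buffer holds the
 * fixed on-disk header, plus snap_count 16-byte id/size pairs
 * (struct rbd_image_snap_ondisk), plus names_size bytes of
 * NUL-terminated snapshot names. Since snap_count and names_size
 * are only learned from the previous read, the loop re-reads until
 * the count it sized the buffer for matches the count it got back.
 */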
2967
2968/*
2969 * Re-read the on-disk header and parse it into the in-memory form.
2970 */
2971static int rbd_read_header(struct rbd_device *rbd_dev,
2972 struct rbd_image_header *header)
2973{
2974 struct rbd_image_header_ondisk *ondisk;
4156d998 2975 int ret;
602adf40 2976
7097f8df 2977 ondisk = rbd_dev_v1_header_read(rbd_dev);
4156d998
AE
2978 if (IS_ERR(ondisk))
2979 return PTR_ERR(ondisk);
2980 ret = rbd_header_from_disk(header, ondisk);
4156d998
AE
2981 kfree(ondisk);
2982
2983 return ret;
602adf40
YS
2984}
2985
41f38c2b 2986static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
dfc5606d
YS
2987{
2988 struct rbd_snap *snap;
a0593290 2989 struct rbd_snap *next;
dfc5606d 2990
6087b51b
AE
2991 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node) {
2992 list_del(&snap->node);
2993 rbd_snap_destroy(snap);
2994 }
dfc5606d
YS
2995}
2996
9478554a
AE
2997static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
2998{
0d7dbfce 2999 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
9478554a
AE
3000 return;
3001
e28626a0
AE
3002 if (rbd_dev->mapping.size != rbd_dev->header.image_size) {
3003 sector_t size;
3004
3005 rbd_dev->mapping.size = rbd_dev->header.image_size;
3006 size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
3007 dout("setting size to %llu sectors", (unsigned long long)size);
3008 set_capacity(rbd_dev->disk, size);
3009 }
9478554a
AE
3010}
3011
602adf40
YS
3012/*
3013 * Only read the first part of the on-disk header, without the snapshot info.
3014 */
cc4a38bd 3015static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
602adf40
YS
3016{
3017 int ret;
3018 struct rbd_image_header h;
602adf40
YS
3019
3020 ret = rbd_read_header(rbd_dev, &h);
3021 if (ret < 0)
3022 return ret;
3023
a51aa0c0
JD
3024 down_write(&rbd_dev->header_rwsem);
3025
9478554a
AE
3026 /* Update image size, and check for resize of mapped image */
3027 rbd_dev->header.image_size = h.image_size;
3028 rbd_update_mapping_size(rbd_dev);
9db4b3e3 3029
849b4260 3030 /* rbd_dev->header.object_prefix shouldn't change */
602adf40 3031 kfree(rbd_dev->header.snap_sizes);
849b4260 3032 kfree(rbd_dev->header.snap_names);
d1d25646 3033 /* osd requests may still refer to snapc */
812164f8 3034 ceph_put_snap_context(rbd_dev->header.snapc);
602adf40 3035
602adf40
YS
3037 rbd_dev->header.snapc = h.snapc;
3038 rbd_dev->header.snap_names = h.snap_names;
3039 rbd_dev->header.snap_sizes = h.snap_sizes;
849b4260 3040 /* Free the extra copy of the object prefix */
c0cd10db
AE
3041 if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
3042 rbd_warn(rbd_dev, "object prefix changed (ignoring)");
849b4260
AE
3043 kfree(h.object_prefix);
3044
304f6808 3045 ret = rbd_dev_snaps_update(rbd_dev);
dfc5606d 3046
c666601a 3047 up_write(&rbd_dev->header_rwsem);
602adf40 3048
dfc5606d 3049 return ret;
602adf40
YS
3050}
3051
cc4a38bd 3052static int rbd_dev_refresh(struct rbd_device *rbd_dev)
1fe5e993 3053{
a3fbe5d4 3054 u64 image_size;
1fe5e993
AE
3055 int ret;
3056
117973fb 3057 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
a3fbe5d4 3058 image_size = rbd_dev->header.image_size;
1fe5e993 3059 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
117973fb 3060 if (rbd_dev->image_format == 1)
cc4a38bd 3061 ret = rbd_dev_v1_refresh(rbd_dev);
117973fb 3062 else
cc4a38bd 3063 ret = rbd_dev_v2_refresh(rbd_dev);
1fe5e993 3064 mutex_unlock(&ctl_mutex);
522a0cc0
AE
3065 if (ret)
3066 rbd_warn(rbd_dev, "got notification but failed to "
3067 " update snaps: %d\n", ret);
a3fbe5d4
AE
3068 if (image_size != rbd_dev->header.image_size)
3069 revalidate_disk(rbd_dev->disk);
1fe5e993
AE
3070
3071 return ret;
3072}
3073
602adf40
YS
3074static int rbd_init_disk(struct rbd_device *rbd_dev)
3075{
3076 struct gendisk *disk;
3077 struct request_queue *q;
593a9e7b 3078 u64 segment_size;
602adf40 3079
602adf40 3080 /* create gendisk info */
602adf40
YS
3081 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3082 if (!disk)
1fcdb8aa 3083 return -ENOMEM;
602adf40 3084
f0f8cef5 3085 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
de71a297 3086 rbd_dev->dev_id);
602adf40
YS
3087 disk->major = rbd_dev->major;
3088 disk->first_minor = 0;
3089 disk->fops = &rbd_bd_ops;
3090 disk->private_data = rbd_dev;
3091
bf0d5f50 3092 q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
602adf40
YS
3093 if (!q)
3094 goto out_disk;
029bcbd8 3095
593a9e7b
AE
3096 /* We use the default size, but let's be explicit about it. */
3097 blk_queue_physical_block_size(q, SECTOR_SIZE);
3098
029bcbd8 3099 /* set io sizes to object size */
593a9e7b
AE
3100 segment_size = rbd_obj_bytes(&rbd_dev->header);
3101 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3102 blk_queue_max_segment_size(q, segment_size);
3103 blk_queue_io_min(q, segment_size);
3104 blk_queue_io_opt(q, segment_size);
029bcbd8 3105
602adf40
YS
3106 blk_queue_merge_bvec(q, rbd_merge_bvec);
3107 disk->queue = q;
3108
3109 q->queuedata = rbd_dev;
3110
3111 rbd_dev->disk = disk;
602adf40 3112
602adf40 3113 return 0;
602adf40
YS
3114out_disk:
3115 put_disk(disk);
1fcdb8aa
AE
3116
3117 return -ENOMEM;
602adf40
YS
3118}
3119
dfc5606d
YS
3120/*
3121 sysfs
3122*/
3123
593a9e7b
AE
3124static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3125{
3126 return container_of(dev, struct rbd_device, dev);
3127}
3128
dfc5606d
YS
3129static ssize_t rbd_size_show(struct device *dev,
3130 struct device_attribute *attr, char *buf)
3131{
593a9e7b 3132 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
a51aa0c0 3133
fc71d833
AE
3134 return sprintf(buf, "%llu\n",
3135 (unsigned long long)rbd_dev->mapping.size);
dfc5606d
YS
3136}
3137
34b13184
AE
3138/*
3139 * Note this shows the features for whatever's mapped, which is not
3140 * necessarily the base image.
3141 */
3142static ssize_t rbd_features_show(struct device *dev,
3143 struct device_attribute *attr, char *buf)
3144{
3145 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3146
3147 return sprintf(buf, "0x%016llx\n",
fc71d833 3148 (unsigned long long)rbd_dev->mapping.features);
34b13184
AE
3149}
3150
dfc5606d
YS
3151static ssize_t rbd_major_show(struct device *dev,
3152 struct device_attribute *attr, char *buf)
3153{
593a9e7b 3154 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 3155
fc71d833
AE
3156 if (rbd_dev->major)
3157 return sprintf(buf, "%d\n", rbd_dev->major);
3158
3159 return sprintf(buf, "(none)\n");
3160
dfc5606d
YS
3161}
3162
3163static ssize_t rbd_client_id_show(struct device *dev,
3164 struct device_attribute *attr, char *buf)
602adf40 3165{
593a9e7b 3166 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3167
1dbb4399
AE
3168 return sprintf(buf, "client%lld\n",
3169 ceph_client_id(rbd_dev->rbd_client->client));
602adf40
YS
3170}
3171
dfc5606d
YS
3172static ssize_t rbd_pool_show(struct device *dev,
3173 struct device_attribute *attr, char *buf)
602adf40 3174{
593a9e7b 3175 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3176
0d7dbfce 3177 return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
dfc5606d
YS
3178}
3179
9bb2f334
AE
3180static ssize_t rbd_pool_id_show(struct device *dev,
3181 struct device_attribute *attr, char *buf)
3182{
3183 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3184
0d7dbfce 3185 return sprintf(buf, "%llu\n",
fc71d833 3186 (unsigned long long) rbd_dev->spec->pool_id);
9bb2f334
AE
3187}
3188
dfc5606d
YS
3189static ssize_t rbd_name_show(struct device *dev,
3190 struct device_attribute *attr, char *buf)
3191{
593a9e7b 3192 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3193
a92ffdf8
AE
3194 if (rbd_dev->spec->image_name)
3195 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3196
3197 return sprintf(buf, "(unknown)\n");
dfc5606d
YS
3198}
3199
589d30e0
AE
3200static ssize_t rbd_image_id_show(struct device *dev,
3201 struct device_attribute *attr, char *buf)
3202{
3203 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3204
0d7dbfce 3205 return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
589d30e0
AE
3206}
3207
34b13184
AE
3208/*
3209 * Shows the name of the currently-mapped snapshot (or
3210 * RBD_SNAP_HEAD_NAME for the base image).
3211 */
dfc5606d
YS
3212static ssize_t rbd_snap_show(struct device *dev,
3213 struct device_attribute *attr,
3214 char *buf)
3215{
593a9e7b 3216 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
dfc5606d 3217
0d7dbfce 3218 return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
dfc5606d
YS
3219}
3220
86b00e0d
AE
3221/*
3222 * For an rbd v2 image, shows the pool id, image id, and snapshot id
3223 * for the parent image. If there is no parent, simply shows
3224 * "(no parent image)".
3225 */
3226static ssize_t rbd_parent_show(struct device *dev,
3227 struct device_attribute *attr,
3228 char *buf)
3229{
3230 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3231 struct rbd_spec *spec = rbd_dev->parent_spec;
3232 int count;
3233 char *bufp = buf;
3234
3235 if (!spec)
3236 return sprintf(buf, "(no parent image)\n");
3237
3238 count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3239 (unsigned long long) spec->pool_id, spec->pool_name);
3240 if (count < 0)
3241 return count;
3242 bufp += count;
3243
3244 count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3245 spec->image_name ? spec->image_name : "(unknown)");
3246 if (count < 0)
3247 return count;
3248 bufp += count;
3249
3250 count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3251 (unsigned long long) spec->snap_id, spec->snap_name);
3252 if (count < 0)
3253 return count;
3254 bufp += count;
3255
3256 count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3257 if (count < 0)
3258 return count;
3259 bufp += count;
3260
3261 return (ssize_t) (bufp - buf);
3262}
3263
dfc5606d
YS
3264static ssize_t rbd_image_refresh(struct device *dev,
3265 struct device_attribute *attr,
3266 const char *buf,
3267 size_t size)
3268{
593a9e7b 3269 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
b813623a 3270 int ret;
602adf40 3271
cc4a38bd 3272 ret = rbd_dev_refresh(rbd_dev);
b813623a
AE
3273
3274 return ret < 0 ? ret : size;
dfc5606d 3275}
602adf40 3276
dfc5606d 3277static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3278static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3279static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3280static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3281static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3282static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3283static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3284static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3285static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3286static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3287static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3288
3289static struct attribute *rbd_attrs[] = {
3290 &dev_attr_size.attr,
34b13184 3291 &dev_attr_features.attr,
dfc5606d
YS
3292 &dev_attr_major.attr,
3293 &dev_attr_client_id.attr,
3294 &dev_attr_pool.attr,
9bb2f334 3295 &dev_attr_pool_id.attr,
dfc5606d 3296 &dev_attr_name.attr,
589d30e0 3297 &dev_attr_image_id.attr,
dfc5606d 3298 &dev_attr_current_snap.attr,
86b00e0d 3299 &dev_attr_parent.attr,
dfc5606d 3300 &dev_attr_refresh.attr,
dfc5606d
YS
3301 NULL
3302};
3303
3304static struct attribute_group rbd_attr_group = {
3305 .attrs = rbd_attrs,
3306};
3307
3308static const struct attribute_group *rbd_attr_groups[] = {
3309 &rbd_attr_group,
3310 NULL
3311};
3312
3313static void rbd_sysfs_dev_release(struct device *dev)
3314{
3315}
3316
3317static struct device_type rbd_device_type = {
3318 .name = "rbd",
3319 .groups = rbd_attr_groups,
3320 .release = rbd_sysfs_dev_release,
3321};
3322
8b8fb99c
AE
3323static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3324{
3325 kref_get(&spec->kref);
3326
3327 return spec;
3328}
3329
3330static void rbd_spec_free(struct kref *kref);
3331static void rbd_spec_put(struct rbd_spec *spec)
3332{
3333 if (spec)
3334 kref_put(&spec->kref, rbd_spec_free);
3335}
3336
3337static struct rbd_spec *rbd_spec_alloc(void)
3338{
3339 struct rbd_spec *spec;
3340
3341 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3342 if (!spec)
3343 return NULL;
3344 kref_init(&spec->kref);
3345
8b8fb99c
AE
3346 return spec;
3347}
3348
3349static void rbd_spec_free(struct kref *kref)
3350{
3351 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3352
3353 kfree(spec->pool_name);
3354 kfree(spec->image_id);
3355 kfree(spec->image_name);
3356 kfree(spec->snap_name);
3357 kfree(spec);
3358}
3359
cc344fa1 3360static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
c53d5893
AE
3361 struct rbd_spec *spec)
3362{
3363 struct rbd_device *rbd_dev;
3364
3365 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3366 if (!rbd_dev)
3367 return NULL;
3368
3369 spin_lock_init(&rbd_dev->lock);
6d292906 3370 rbd_dev->flags = 0;
c53d5893
AE
3371 INIT_LIST_HEAD(&rbd_dev->node);
3372 INIT_LIST_HEAD(&rbd_dev->snaps);
3373 init_rwsem(&rbd_dev->header_rwsem);
3374
3375 rbd_dev->spec = spec;
3376 rbd_dev->rbd_client = rbdc;
3377
0903e875
AE
3378 /* Initialize the layout used for all rbd requests */
3379
3380 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3381 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3382 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3383 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3384
c53d5893
AE
3385 return rbd_dev;
3386}
3387
3388static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3389{
c53d5893
AE
3390 rbd_put_client(rbd_dev->rbd_client);
3391 rbd_spec_put(rbd_dev->spec);
3392 kfree(rbd_dev);
3393}
3394
6087b51b 3395static void rbd_snap_destroy(struct rbd_snap *snap)
dfc5606d 3396{
3e83b65b
AE
3397 kfree(snap->name);
3398 kfree(snap);
dfc5606d
YS
3399}
3400
6087b51b 3401static struct rbd_snap *rbd_snap_create(struct rbd_device *rbd_dev,
c8d18425 3402 const char *snap_name,
34b13184
AE
3403 u64 snap_id, u64 snap_size,
3404 u64 snap_features)
dfc5606d 3405{
4e891e0a 3406 struct rbd_snap *snap;
4e891e0a
AE
3407
3408 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
dfc5606d 3409 if (!snap)
4e891e0a
AE
3410 return ERR_PTR(-ENOMEM);
3411
6e584f52 3412 snap->name = snap_name;
c8d18425
AE
3413 snap->id = snap_id;
3414 snap->size = snap_size;
34b13184 3415 snap->features = snap_features;
4e891e0a
AE
3416
3417 return snap;
dfc5606d
YS
3418}
3419
/*
 * Returns a dynamically-allocated snapshot name if successful, or a
 * pointer-coded error otherwise.
 */
static const char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
                    u64 *snap_size, u64 *snap_features)
{
    const char *snap_name;
    int i;

    rbd_assert(which < rbd_dev->header.snapc->num_snaps);

    /* Skip over names until we find the one we are looking for */

    snap_name = rbd_dev->header.snap_names;
    for (i = 0; i < which; i++)
        snap_name += strlen(snap_name) + 1;

    snap_name = kstrdup(snap_name, GFP_KERNEL);
    if (!snap_name)
        return ERR_PTR(-ENOMEM);

    *snap_size = rbd_dev->header.snap_sizes[which];
    *snap_features = 0;    /* No features for v1 */

    return snap_name;
}

/*
 * Get the size and object order for an image snapshot, or, if
 * snap_id is CEPH_NOSNAP, get this information for the base
 * image.
 */
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                u8 *order, u64 *snap_size)
{
    __le64 snapid = cpu_to_le64(snap_id);
    int ret;
    struct {
        u8 order;
        __le64 size;
    } __attribute__ ((packed)) size_buf = { 0 };

    ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                "rbd", "get_size",
                &snapid, sizeof (snapid),
                &size_buf, sizeof (size_buf), NULL);
    dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
    if (ret < 0)
        return ret;
    if (ret < sizeof (size_buf))
        return -ERANGE;

    if (order)
        *order = size_buf.order;
    *snap_size = le64_to_cpu(size_buf.size);

    /* Print size_buf.order here: order may be a null pointer */
    dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
        (unsigned long long)snap_id, (unsigned int)size_buf.order,
        (unsigned long long)*snap_size);

    return 0;
}

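/*
 * Worked example (illustrative): for a 1 GB image striped over 4 MB
 * objects, the "get_size" reply decodes to order = 22 (since
 * 1 << 22 == 4194304 bytes) and size = 1073741824.
 */
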
static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
{
    return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
                    &rbd_dev->header.obj_order,
                    &rbd_dev->header.image_size);
}

static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
{
    void *reply_buf;
    int ret;
    void *p;

    reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
    if (!reply_buf)
        return -ENOMEM;

    ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                "rbd", "get_object_prefix", NULL, 0,
                reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
    dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
    if (ret < 0)
        goto out;

    p = reply_buf;
    rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
                        p + ret, NULL, GFP_NOIO);
    ret = 0;

    if (IS_ERR(rbd_dev->header.object_prefix)) {
        ret = PTR_ERR(rbd_dev->header.object_prefix);
        rbd_dev->header.object_prefix = NULL;
    } else {
        dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
    }
out:
    kfree(reply_buf);

    return ret;
}

static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
                u64 *snap_features)
{
    __le64 snapid = cpu_to_le64(snap_id);
    struct {
        __le64 features;
        __le64 incompat;
    } __attribute__ ((packed)) features_buf = { 0 };
    u64 incompat;
    int ret;

    ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                "rbd", "get_features",
                &snapid, sizeof (snapid),
                &features_buf, sizeof (features_buf), NULL);
    dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
    if (ret < 0)
        return ret;
    if (ret < sizeof (features_buf))
        return -ERANGE;

    incompat = le64_to_cpu(features_buf.incompat);
    if (incompat & ~RBD_FEATURES_SUPPORTED)
        return -ENXIO;

    *snap_features = le64_to_cpu(features_buf.features);

    dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
        (unsigned long long)snap_id,
        (unsigned long long)*snap_features,
        (unsigned long long)le64_to_cpu(features_buf.incompat));

    return 0;
}

static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
{
    return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
                    &rbd_dev->header.features);
}

static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
{
    struct rbd_spec *parent_spec;
    size_t size;
    void *reply_buf = NULL;
    __le64 snapid;
    void *p;
    void *end;
    char *image_id;
    u64 overlap;
    int ret;

    parent_spec = rbd_spec_alloc();
    if (!parent_spec)
        return -ENOMEM;

    size = sizeof (__le64) +                /* pool_id */
        sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +    /* image_id */
        sizeof (__le64) +                /* snap_id */
        sizeof (__le64);                /* overlap */
    reply_buf = kmalloc(size, GFP_KERNEL);
    if (!reply_buf) {
        ret = -ENOMEM;
        goto out_err;
    }

    snapid = cpu_to_le64(CEPH_NOSNAP);
    ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                "rbd", "get_parent",
                &snapid, sizeof (snapid),
                reply_buf, size, NULL);
    dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
    if (ret < 0)
        goto out_err;

    p = reply_buf;
    end = reply_buf + ret;
    ret = -ERANGE;
    ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
    if (parent_spec->pool_id == CEPH_NOPOOL)
        goto out;    /* No parent?  No problem. */

    /* The ceph file layout needs to fit pool id in 32 bits */

    ret = -EIO;
    if (parent_spec->pool_id > (u64)U32_MAX) {
        rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
            (unsigned long long)parent_spec->pool_id, U32_MAX);
        goto out_err;
    }

    image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
    if (IS_ERR(image_id)) {
        ret = PTR_ERR(image_id);
        goto out_err;
    }
    parent_spec->image_id = image_id;
    ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
    ceph_decode_64_safe(&p, end, overlap, out_err);

    rbd_dev->parent_overlap = overlap;
    rbd_dev->parent_spec = parent_spec;
    parent_spec = NULL;    /* rbd_dev now owns this */
out:
    ret = 0;
out_err:
    kfree(reply_buf);
    rbd_spec_put(parent_spec);

    return ret;
}

static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
{
    struct {
        __le64 stripe_unit;
        __le64 stripe_count;
    } __attribute__ ((packed)) striping_info_buf = { 0 };
    size_t size = sizeof (striping_info_buf);
    void *p;
    u64 obj_size;
    u64 stripe_unit;
    u64 stripe_count;
    int ret;

    ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                "rbd", "get_stripe_unit_count", NULL, 0,
                (char *)&striping_info_buf, size, NULL);
    dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
    if (ret < 0)
        return ret;
    if (ret < size)
        return -ERANGE;

    /*
     * We don't actually support the "fancy striping" feature
     * (STRIPINGV2) yet, but if the striping sizes are the
     * defaults the behavior is the same as before.  So find
     * out, and only fail if the image has non-default values.
     */
    obj_size = (u64)1 << rbd_dev->header.obj_order;
    p = &striping_info_buf;
    stripe_unit = ceph_decode_64(&p);
    if (stripe_unit != obj_size) {
        rbd_warn(rbd_dev, "unsupported stripe unit "
                "(got %llu want %llu)",
                stripe_unit, obj_size);
        return -EINVAL;
    }
    stripe_count = ceph_decode_64(&p);
    if (stripe_count != 1) {
        rbd_warn(rbd_dev, "unsupported stripe count "
                "(got %llu want 1)", stripe_count);
        return -EINVAL;
    }
    rbd_dev->header.stripe_unit = stripe_unit;
    rbd_dev->header.stripe_count = stripe_count;

    return 0;
}

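/*
 * Example (illustrative): with the default object order of 22, the
 * only striping parameters accepted here are stripe_unit == 4194304
 * (one full 4 MB object) and stripe_count == 1, i.e. the non-fancy
 * layout that behaves identically to a non-striped image.
 */
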
static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
{
    size_t image_id_size;
    char *image_id;
    void *p;
    void *end;
    size_t size;
    void *reply_buf = NULL;
    size_t len = 0;
    char *image_name = NULL;
    int ret;

    rbd_assert(!rbd_dev->spec->image_name);

    len = strlen(rbd_dev->spec->image_id);
    image_id_size = sizeof (__le32) + len;
    image_id = kmalloc(image_id_size, GFP_KERNEL);
    if (!image_id)
        return NULL;

    p = image_id;
    end = image_id + image_id_size;
    ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);

    size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
    reply_buf = kmalloc(size, GFP_KERNEL);
    if (!reply_buf)
        goto out;

    ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
                "rbd", "dir_get_name",
                image_id, image_id_size,
                reply_buf, size, NULL);
    if (ret < 0)
        goto out;
    p = reply_buf;
    end = reply_buf + ret;

    image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
    if (IS_ERR(image_name))
        image_name = NULL;
    else
        dout("%s: name is %s len is %zd\n", __func__, image_name, len);
out:
    kfree(reply_buf);
    kfree(image_id);

    return image_name;
}

/*
 * When an rbd image has a parent image, it is identified by the
 * pool, image, and snapshot ids (not names).  This function fills
 * in the names for those ids.  (It's OK if we can't figure out the
 * name for an image id, but the pool and snapshot ids should always
 * exist and have names.)  All names in an rbd spec are dynamically
 * allocated.
 *
 * When an image being mapped (not a parent) is probed, we have the
 * pool name and pool id, image name and image id, and the snapshot
 * name.  The only thing we're missing is the snapshot id.
 *
 * The set of snapshots for an image is not known until they have
 * been read by rbd_dev_snaps_update(), so we can't completely fill
 * in this information until after that has been called.
 */
static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
{
    struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
    struct rbd_spec *spec = rbd_dev->spec;
    const char *pool_name;
    const char *image_name;
    const char *snap_name;
    int ret;

    /*
     * An image being mapped will have the pool name (etc.), but
     * we need to look up the snapshot id.
     */
    if (spec->pool_name) {
        if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
            struct rbd_snap *snap;

            snap = snap_by_name(rbd_dev, spec->snap_name);
            if (!snap)
                return -ENOENT;
            spec->snap_id = snap->id;
        } else {
            spec->snap_id = CEPH_NOSNAP;
        }

        return 0;
    }

    /* Get the pool name; we have to make our own copy of this */

    pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
    if (!pool_name) {
        rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
        return -EIO;
    }
    pool_name = kstrdup(pool_name, GFP_KERNEL);
    if (!pool_name)
        return -ENOMEM;

    /* Fetch the image name; tolerate failure here */

    image_name = rbd_dev_image_name(rbd_dev);
    if (!image_name)
        rbd_warn(rbd_dev, "unable to get image name");

    /* Look up the snapshot name, and make a copy */

    snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
    if (!snap_name) {
        rbd_warn(rbd_dev, "no snapshot with id %llu", spec->snap_id);
        ret = -EIO;
        goto out_err;
    }
    snap_name = kstrdup(snap_name, GFP_KERNEL);
    if (!snap_name) {
        ret = -ENOMEM;
        goto out_err;
    }

    spec->pool_name = pool_name;
    spec->image_name = image_name;
    spec->snap_name = snap_name;

    return 0;
out_err:
    kfree(image_name);
    kfree(pool_name);

    return ret;
}

static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
{
    size_t size;
    int ret;
    void *reply_buf;
    void *p;
    void *end;
    u64 seq;
    u32 snap_count;
    struct ceph_snap_context *snapc;
    u32 i;

    /*
     * We'll need room for the seq value (maximum snapshot id),
     * snapshot count, and array of that many snapshot ids.
     * For now we have a fixed upper limit on the number we're
     * prepared to receive.
     */
    size = sizeof (__le64) + sizeof (__le32) +
            RBD_MAX_SNAP_COUNT * sizeof (__le64);
    reply_buf = kzalloc(size, GFP_KERNEL);
    if (!reply_buf)
        return -ENOMEM;

    ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                "rbd", "get_snapcontext", NULL, 0,
                reply_buf, size, NULL);
    dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
    if (ret < 0)
        goto out;

    p = reply_buf;
    end = reply_buf + ret;
    ret = -ERANGE;
    ceph_decode_64_safe(&p, end, seq, out);
    ceph_decode_32_safe(&p, end, snap_count, out);

    /*
     * Make sure the reported number of snapshot ids wouldn't go
     * beyond the end of our buffer.  But before checking that,
     * make sure the computed size of the snapshot context we
     * allocate is representable in a size_t.
     */
    if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
                / sizeof (u64)) {
        ret = -EINVAL;
        goto out;
    }
    if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
        goto out;
    ret = 0;

    snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
    if (!snapc) {
        ret = -ENOMEM;
        goto out;
    }
    snapc->seq = seq;
    for (i = 0; i < snap_count; i++)
        snapc->snaps[i] = ceph_decode_64(&p);

    rbd_dev->header.snapc = snapc;

    dout(" snap context seq = %llu, snap_count = %u\n",
        (unsigned long long)seq, (unsigned int)snap_count);
out:
    kfree(reply_buf);

    return ret;
}

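/*
 * Sizing note (worked arithmetic): with RBD_MAX_SNAP_COUNT == 510,
 * the reply buffer above is 8 + 4 + 510 * 8 == 4092 bytes, which is
 * how the largest supported snapshot context is kept within 4 KB.
 */
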
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
{
    size_t size;
    void *reply_buf;
    __le64 snap_id;
    int ret;
    void *p;
    void *end;
    char *snap_name;

    size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
    reply_buf = kmalloc(size, GFP_KERNEL);
    if (!reply_buf)
        return ERR_PTR(-ENOMEM);

    rbd_assert(which < rbd_dev->header.snapc->num_snaps);
    snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
    ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                "rbd", "get_snapshot_name",
                &snap_id, sizeof (snap_id),
                reply_buf, size, NULL);
    dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
    if (ret < 0) {
        snap_name = ERR_PTR(ret);
        goto out;
    }

    p = reply_buf;
    end = reply_buf + ret;
    snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
    if (IS_ERR(snap_name))
        goto out;

    dout(" snap_id 0x%016llx snap_name = %s\n",
        (unsigned long long)le64_to_cpu(snap_id), snap_name);
out:
    kfree(reply_buf);

    return snap_name;
}

static const char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
                    u64 *snap_size, u64 *snap_features)
{
    u64 snap_id;
    u64 size;
    u64 features;
    const char *snap_name;
    int ret;

    rbd_assert(which < rbd_dev->header.snapc->num_snaps);
    snap_id = rbd_dev->header.snapc->snaps[which];
    ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
    if (ret)
        goto out_err;

    ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
    if (ret)
        goto out_err;

    snap_name = rbd_dev_v2_snap_name(rbd_dev, which);
    if (!IS_ERR(snap_name)) {
        *snap_size = size;
        *snap_features = features;
    }

    return snap_name;
out_err:
    return ERR_PTR(ret);
}

static const char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
                    u64 *snap_size, u64 *snap_features)
{
    if (rbd_dev->image_format == 1)
        return rbd_dev_v1_snap_info(rbd_dev, which,
                    snap_size, snap_features);
    if (rbd_dev->image_format == 2)
        return rbd_dev_v2_snap_info(rbd_dev, which,
                    snap_size, snap_features);
    return ERR_PTR(-EINVAL);
}

static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
{
    int ret;

    down_write(&rbd_dev->header_rwsem);

    ret = rbd_dev_v2_image_size(rbd_dev);
    if (ret)
        goto out;
    rbd_update_mapping_size(rbd_dev);

    ret = rbd_dev_v2_snap_context(rbd_dev);
    dout("rbd_dev_v2_snap_context returned %d\n", ret);
    if (ret)
        goto out;
    ret = rbd_dev_snaps_update(rbd_dev);
    dout("rbd_dev_snaps_update returned %d\n", ret);
out:
    up_write(&rbd_dev->header_rwsem);

    return ret;
}

/*
 * Scan the rbd device's current snapshot list and compare it to the
 * newly-received snapshot context.  Remove any existing snapshots
 * not present in the new snapshot context.  Add a new snapshot for
 * any snapshots in the snapshot context not in the current list.
 * And verify there are no changes to snapshots we already know
 * about.
 *
 * Assumes the snapshots in the snapshot context are sorted by
 * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
 * are also maintained in that order.)
 *
 * Note that if any error occurs while updating the snapshot list,
 * the update is aborted and the entire list is cleared.  The
 * snapshot list becomes inconsistent at that point anyway, so it
 * might as well be empty.
 */
static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
{
    struct ceph_snap_context *snapc = rbd_dev->header.snapc;
    const u32 snap_count = snapc->num_snaps;
    struct list_head *head = &rbd_dev->snaps;
    struct list_head *links = head->next;
    u32 index = 0;
    int ret = 0;

    dout("%s: snap count is %u\n", __func__, (unsigned int)snap_count);
    while (index < snap_count || links != head) {
        u64 snap_id;
        struct rbd_snap *snap;
        const char *snap_name;
        u64 snap_size = 0;
        u64 snap_features = 0;

        snap_id = index < snap_count ? snapc->snaps[index]
                         : CEPH_NOSNAP;
        snap = links != head ? list_entry(links, struct rbd_snap, node)
                     : NULL;
        rbd_assert(!snap || snap->id != CEPH_NOSNAP);

        if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
            struct list_head *next = links->next;

            /*
             * A previously-existing snapshot is not in
             * the new snap context.
             *
             * If the now-missing snapshot is the one
             * the image represents, clear its existence
             * flag so we can avoid sending any more
             * requests to it.
             */
            if (rbd_dev->spec->snap_id == snap->id)
                clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
            dout("removing %ssnap id %llu\n",
                rbd_dev->spec->snap_id == snap->id ?
                            "mapped " : "",
                (unsigned long long)snap->id);

            list_del(&snap->node);
            rbd_snap_destroy(snap);

            /* Done with this list entry; advance */

            links = next;
            continue;
        }

        snap_name = rbd_dev_snap_info(rbd_dev, index,
                    &snap_size, &snap_features);
        if (IS_ERR(snap_name)) {
            ret = PTR_ERR(snap_name);
            dout("failed to get snap info, error %d\n", ret);
            goto out_err;
        }

        /* Report the entry's index (not the total snap count) */
        dout("entry %u: snap_id = %llu\n", (unsigned int)index,
            (unsigned long long)snap_id);
        if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
            struct rbd_snap *new_snap;

            /* We haven't seen this snapshot before */

            new_snap = rbd_snap_create(rbd_dev, snap_name,
                    snap_id, snap_size, snap_features);
            if (IS_ERR(new_snap)) {
                ret = PTR_ERR(new_snap);
                dout(" failed to add dev, error %d\n", ret);
                goto out_err;
            }

            /* New goes before existing, or at end of list */

            dout(" added dev%s\n", snap ? "" : " at end");
            if (snap)
                list_add_tail(&new_snap->node, &snap->node);
            else
                list_add_tail(&new_snap->node, head);
        } else {
            /* Already have this one */

            dout(" already present\n");

            rbd_assert(snap->size == snap_size);
            rbd_assert(!strcmp(snap->name, snap_name));
            rbd_assert(snap->features == snap_features);

            /* Done with this list entry; advance */

            links = links->next;
        }

        /* Advance to the next entry in the snapshot context */

        index++;
    }
    dout("%s: done\n", __func__);

    return 0;
out_err:
    rbd_remove_all_snaps(rbd_dev);

    return ret;
}

static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
    struct device *dev;
    int ret;

    mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

    dev = &rbd_dev->dev;
    dev->bus = &rbd_bus_type;
    dev->type = &rbd_device_type;
    dev->parent = &rbd_root_dev;
    dev->release = rbd_dev_device_release;
    dev_set_name(dev, "%d", rbd_dev->dev_id);
    ret = device_register(dev);

    mutex_unlock(&ctl_mutex);

    return ret;
}

static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
    device_unregister(&rbd_dev->dev);
}

static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier for the given new rbd_dev, and add
 * the rbd_dev to the global list.  The minimum rbd id is 1.
 */
static void rbd_dev_id_get(struct rbd_device *rbd_dev)
{
    rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);

    spin_lock(&rbd_dev_list_lock);
    list_add_tail(&rbd_dev->node, &rbd_dev_list);
    spin_unlock(&rbd_dev_list_lock);
    dout("rbd_dev %p given dev id %llu\n", rbd_dev,
        (unsigned long long) rbd_dev->dev_id);
}

/*
 * Remove an rbd_dev from the global list, and record that its
 * identifier is no longer in use.
 */
static void rbd_dev_id_put(struct rbd_device *rbd_dev)
{
    struct list_head *tmp;
    int rbd_id = rbd_dev->dev_id;
    int max_id;

    rbd_assert(rbd_id > 0);

    dout("rbd_dev %p released dev id %llu\n", rbd_dev,
        (unsigned long long) rbd_dev->dev_id);
    spin_lock(&rbd_dev_list_lock);
    list_del_init(&rbd_dev->node);

    /*
     * If the id being "put" is not the current maximum, there
     * is nothing special we need to do.
     */
    if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
        spin_unlock(&rbd_dev_list_lock);
        return;
    }

    /*
     * We need to update the current maximum id.  Search the
     * list to find out what it is.  We're more likely to find
     * the maximum at the end, so search the list backward.
     */
    max_id = 0;
    list_for_each_prev(tmp, &rbd_dev_list) {
        struct rbd_device *rbd_dev;

        rbd_dev = list_entry(tmp, struct rbd_device, node);
        if (rbd_dev->dev_id > max_id)
            max_id = rbd_dev->dev_id;
    }
    spin_unlock(&rbd_dev_list_lock);

    /*
     * The max id could have been updated by rbd_dev_id_get(), in
     * which case it now accurately reflects the new maximum.
     * Be careful not to overwrite the maximum value in that
     * case.
     */
    atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
    dout(" max dev id has been reset\n");
}

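/*
 * Illustrative example (not part of the driver): with devices 1, 2
 * and 3 mapped, removing device 2 leaves rbd_dev_id_max at 3, so the
 * next mapping becomes device 4.  Only when the current maximum id
 * itself is released does the cmpxchg above roll the counter back
 * to the highest id still on the list.
 */
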
/*
 * Skips over white space at *buf, and updates *buf to point to the
 * first found non-space character (if any).  Returns the length of
 * the token (string of non-white space characters) found.  Note
 * that *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
    /*
     * These are the characters that produce nonzero for
     * isspace() in the "C" and "POSIX" locales.
     */
    const char *spaces = " \f\n\r\t\v";

    *buf += strspn(*buf, spaces);    /* Find start of token */

    return strcspn(*buf, spaces);    /* Return token length */
}

/*
 * Finds the next token in *buf, and if the provided token buffer is
 * big enough, copies the found token into it.  The result, if
 * copied, is guaranteed to be terminated with '\0'.  Note that *buf
 * must be terminated with '\0' on entry.
 *
 * Returns the length of the token found (not including the '\0').
 * Return value will be 0 if no token is found, and it will be >=
 * token_size if the token would not fit.
 *
 * The *buf pointer will be updated to point beyond the end of the
 * found token.  Note that this occurs even if the token buffer is
 * too small to hold it.
 */
static inline size_t copy_token(const char **buf,
                char *token,
                size_t token_size)
{
    size_t len;

    len = next_token(buf);
    if (len < token_size) {
        memcpy(token, *buf, len);
        *(token + len) = '\0';
    }
    *buf += len;

    return len;
}

/*
 * Finds the next token in *buf, dynamically allocates a buffer big
 * enough to hold a copy of it, and copies the token into the new
 * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
 * that a duplicate buffer is created even for a zero-length token.
 *
 * Returns a pointer to the newly-allocated duplicate, or a null
 * pointer if memory for the duplicate was not available.  If
 * the lenp argument is a non-null pointer, the length of the token
 * (not including the '\0') is returned in *lenp.
 *
 * If successful, the *buf pointer will be updated to point beyond
 * the end of the found token.
 *
 * Note: uses GFP_KERNEL for allocation.
 */
static inline char *dup_token(const char **buf, size_t *lenp)
{
    char *dup;
    size_t len;

    len = next_token(buf);
    dup = kmemdup(*buf, len + 1, GFP_KERNEL);
    if (!dup)
        return NULL;
    *(dup + len) = '\0';
    *buf += len;

    if (lenp)
        *lenp = len;

    return dup;
}

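/*
 * Usage sketch (illustrative only): given buf pointing at
 * "rbd foo snap1", three successive dup_token() calls return the
 * strings "rbd", "foo" and "snap1", advancing buf past each token;
 * a fourth call returns a duplicate of the empty string "".
 */
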
/*
 * Parse the options provided for an "rbd add" (i.e., rbd image
 * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
 * and the data written is passed here via a NUL-terminated buffer.
 * Returns 0 if successful or an error code otherwise.
 *
 * The information extracted from these options is recorded in
 * the other parameters which return dynamically-allocated
 * structures:
 *  ceph_opts
 *      The address of a pointer that will refer to a ceph options
 *      structure.  Caller must release the returned pointer using
 *      ceph_destroy_options() when it is no longer needed.
 *  rbd_opts
 *      Address of an rbd options pointer.  Fully initialized by
 *      this function; caller must release with kfree().
 *  spec
 *      Address of an rbd image specification pointer.  Fully
 *      initialized by this function based on parsed options.
 *      Caller must release with rbd_spec_put().
 *
 * The options passed take this form:
 *  <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
 * where:
 *  <mon_addrs>
 *      A comma-separated list of one or more monitor addresses.
 *      A monitor address is an ip address, optionally followed
 *      by a port number (separated by a colon).
 *        I.e.:  ip1[:port1][,ip2[:port2]...]
 *  <options>
 *      A comma-separated list of ceph and/or rbd options.
 *  <pool_name>
 *      The name of the rados pool containing the rbd image.
 *  <image_name>
 *      The name of the image in that pool to map.
 *  <snap_name>
 *      An optional snapshot name.  If provided, the mapping will
 *      present data from the image at the time that snapshot was
 *      created.  The image head is used if no snapshot name is
 *      provided.  Snapshot mappings are always read-only.
 */
static int rbd_add_parse_args(const char *buf,
                struct ceph_options **ceph_opts,
                struct rbd_options **opts,
                struct rbd_spec **rbd_spec)
{
    size_t len;
    char *options;
    const char *mon_addrs;
    char *snap_name;
    size_t mon_addrs_size;
    struct rbd_spec *spec = NULL;
    struct rbd_options *rbd_opts = NULL;
    struct ceph_options *copts;
    int ret;

    /* The first four tokens are required */

    len = next_token(&buf);
    if (!len) {
        rbd_warn(NULL, "no monitor address(es) provided");
        return -EINVAL;
    }
    mon_addrs = buf;
    mon_addrs_size = len + 1;
    buf += len;

    ret = -EINVAL;
    options = dup_token(&buf, NULL);
    if (!options)
        return -ENOMEM;
    if (!*options) {
        rbd_warn(NULL, "no options provided");
        goto out_err;
    }

    spec = rbd_spec_alloc();
    if (!spec)
        goto out_mem;

    spec->pool_name = dup_token(&buf, NULL);
    if (!spec->pool_name)
        goto out_mem;
    if (!*spec->pool_name) {
        rbd_warn(NULL, "no pool name provided");
        goto out_err;
    }

    spec->image_name = dup_token(&buf, NULL);
    if (!spec->image_name)
        goto out_mem;
    if (!*spec->image_name) {
        rbd_warn(NULL, "no image name provided");
        goto out_err;
    }

    /*
     * Snapshot name is optional; default is to use "-"
     * (indicating the head/no snapshot).
     */
    len = next_token(&buf);
    if (!len) {
        buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
        len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
    } else if (len > RBD_MAX_SNAP_NAME_LEN) {
        ret = -ENAMETOOLONG;
        goto out_err;
    }
    snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
    if (!snap_name)
        goto out_mem;
    *(snap_name + len) = '\0';
    spec->snap_name = snap_name;

    /* Initialize all rbd options to the defaults */

    rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
    if (!rbd_opts)
        goto out_mem;

    rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;

    copts = ceph_parse_options(options, mon_addrs,
                    mon_addrs + mon_addrs_size - 1,
                    parse_rbd_opts_token, rbd_opts);
    if (IS_ERR(copts)) {
        ret = PTR_ERR(copts);
        goto out_err;
    }
    kfree(options);

    *ceph_opts = copts;
    *opts = rbd_opts;
    *rbd_spec = spec;

    return 0;
out_mem:
    ret = -ENOMEM;
out_err:
    kfree(rbd_opts);
    rbd_spec_put(spec);
    kfree(options);

    return ret;
}

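/*
 * Usage sketch (illustrative; the address, credentials and names are
 * made up): mapping image "foo" from pool "rbd" at its head could be
 * requested with
 *
 *   # echo "1.2.3.4:6789 name=admin rbd foo" > /sys/bus/rbd/add
 *
 * rbd_add_parse_args() splits this into the monitor address list
 * "1.2.3.4:6789", the option token "name=admin", the pool name
 * "rbd", the image name "foo", and, since no fifth token is given,
 * the default snapshot name "-" (the image head).
 */
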
/*
 * An rbd format 2 image has a unique identifier, distinct from the
 * name given to it by the user.  Internally, that identifier is
 * what's used to specify the names of objects related to the image.
 *
 * A special "rbd id" object is used to map an rbd image name to its
 * id.  If that object doesn't exist, then there is no v2 rbd image
 * with the supplied name.
 *
 * This function will record the given rbd_dev's image_id field if
 * it can be determined, and in that case will return 0.  If any
 * errors occur a negative errno will be returned and the rbd_dev's
 * image_id field will be unchanged (and should be NULL).
 */
static int rbd_dev_image_id(struct rbd_device *rbd_dev)
{
    int ret;
    size_t size;
    char *object_name;
    void *response;
    char *image_id;

    /*
     * When probing a parent image, the image id is already
     * known (and the image name likely is not).  There's no
     * need to fetch the image id again in this case.  We
     * do still need to set the image format though.
     */
    if (rbd_dev->spec->image_id) {
        rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;

        return 0;
    }

    /*
     * First, see if the format 2 image id file exists, and if
     * so, get the image's persistent id from it.
     */
    size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
    object_name = kmalloc(size, GFP_NOIO);
    if (!object_name)
        return -ENOMEM;
    sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
    dout("rbd id object name is %s\n", object_name);

    /* Response will be an encoded string, which includes a length */

    size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
    response = kzalloc(size, GFP_NOIO);
    if (!response) {
        ret = -ENOMEM;
        goto out;
    }

    /* If it doesn't exist we'll assume it's a format 1 image */

    ret = rbd_obj_method_sync(rbd_dev, object_name,
                "rbd", "get_id", NULL, 0,
                response, RBD_IMAGE_ID_LEN_MAX, NULL);
    dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
    if (ret == -ENOENT) {
        image_id = kstrdup("", GFP_KERNEL);
        ret = image_id ? 0 : -ENOMEM;
        if (!ret)
            rbd_dev->image_format = 1;
    } else if (ret > sizeof (__le32)) {
        void *p = response;

        image_id = ceph_extract_encoded_string(&p, p + ret,
                        NULL, GFP_NOIO);
        ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
        if (!ret)
            rbd_dev->image_format = 2;
    } else {
        ret = -EINVAL;
    }

    if (!ret) {
        rbd_dev->spec->image_id = image_id;
        dout("image_id is %s\n", image_id);
    }
out:
    kfree(response);
    kfree(object_name);

    return ret;
}

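/*
 * Example (illustrative; the id value is made up): for a format 2
 * image named "foo", the id object read here is named
 * RBD_ID_PREFIX "foo" (i.e. "rbd_id.foo" with the usual prefix),
 * and its "get_id" class method returns an encoded id string such
 * as "1014b2ae8944a".  Every other object belonging to the image is
 * then named using that id rather than the user-visible name.
 */
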
/* Undo whatever state changes are made by v1 or v2 image probe */

static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
{
    struct rbd_image_header *header;

    rbd_dev_remove_parent(rbd_dev);
    rbd_spec_put(rbd_dev->parent_spec);
    rbd_dev->parent_spec = NULL;
    rbd_dev->parent_overlap = 0;

    /* Free dynamic fields from the header, then zero it out */

    header = &rbd_dev->header;
    ceph_put_snap_context(header->snapc);
    kfree(header->snap_sizes);
    kfree(header->snap_names);
    kfree(header->object_prefix);
    memset(header, 0, sizeof (*header));
}

static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
{
    int ret;

    /* Populate rbd image metadata */

    ret = rbd_read_header(rbd_dev, &rbd_dev->header);
    if (ret < 0)
        goto out_err;

    /* Version 1 images have no parent (no layering) */

    rbd_dev->parent_spec = NULL;
    rbd_dev->parent_overlap = 0;

    dout("discovered version 1 image, header name is %s\n",
        rbd_dev->header_name);

    return 0;

out_err:
    kfree(rbd_dev->header_name);
    rbd_dev->header_name = NULL;
    kfree(rbd_dev->spec->image_id);
    rbd_dev->spec->image_id = NULL;

    return ret;
}

static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
{
    int ret;

    ret = rbd_dev_v2_image_size(rbd_dev);
    if (ret)
        goto out_err;

    /* Get the object prefix (a.k.a. block_name) for the image */

    ret = rbd_dev_v2_object_prefix(rbd_dev);
    if (ret)
        goto out_err;

    /* Get and check the features for the image */

    ret = rbd_dev_v2_features(rbd_dev);
    if (ret)
        goto out_err;

    /* If the image supports layering, get the parent info */

    if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
        ret = rbd_dev_v2_parent_info(rbd_dev);
        if (ret)
            goto out_err;

        /*
         * Don't print a warning for parent images.  We can
         * tell at this point because we won't know its pool
         * name yet (just its pool id).
         */
        if (rbd_dev->spec->pool_name)
            rbd_warn(rbd_dev, "WARNING: kernel layering "
                    "is EXPERIMENTAL!");
    }

    /* If the image supports fancy striping, get its parameters */

    if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
        ret = rbd_dev_v2_striping_info(rbd_dev);
        if (ret < 0)
            goto out_err;
    }

    /* crypto and compression type aren't (yet) supported for v2 images */

    rbd_dev->header.crypt_type = 0;
    rbd_dev->header.comp_type = 0;

    /* Get the snapshot context, plus the header version */

    ret = rbd_dev_v2_snap_context(rbd_dev);
    if (ret)
        goto out_err;

    dout("discovered version 2 image, header name is %s\n",
        rbd_dev->header_name);

    return 0;
out_err:
    rbd_dev->parent_overlap = 0;
    rbd_spec_put(rbd_dev->parent_spec);
    rbd_dev->parent_spec = NULL;
    kfree(rbd_dev->header_name);
    rbd_dev->header_name = NULL;
    kfree(rbd_dev->header.object_prefix);
    rbd_dev->header.object_prefix = NULL;

    return ret;
}

static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
{
    struct rbd_device *parent = NULL;
    struct rbd_spec *parent_spec;
    struct rbd_client *rbdc;
    int ret;

    if (!rbd_dev->parent_spec)
        return 0;
    /*
     * We need to pass a reference to the client and the parent
     * spec when creating the parent rbd_dev.  Images related by
     * parent/child relationships always share both.
     */
    parent_spec = rbd_spec_get(rbd_dev->parent_spec);
    rbdc = __rbd_get_client(rbd_dev->rbd_client);

    ret = -ENOMEM;
    parent = rbd_dev_create(rbdc, parent_spec);
    if (!parent)
        goto out_err;

    ret = rbd_dev_image_probe(parent);
    if (ret < 0)
        goto out_err;
    rbd_dev->parent = parent;

    return 0;
out_err:
    if (parent) {
        rbd_spec_put(rbd_dev->parent_spec);
        kfree(rbd_dev->header_name);
        rbd_dev_destroy(parent);
    } else {
        rbd_put_client(rbdc);
        rbd_spec_put(parent_spec);
    }

    return ret;
}

static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
{
    int ret;

    ret = rbd_dev_mapping_set(rbd_dev);
    if (ret)
        return ret;

    /* generate unique id: find highest unique id, add one */
    rbd_dev_id_get(rbd_dev);

    /* Fill in the device name, now that we have its id. */
    BUILD_BUG_ON(DEV_NAME_LEN
            < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
    sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);

    /* Get our block major device number. */

    ret = register_blkdev(0, rbd_dev->name);
    if (ret < 0)
        goto err_out_id;
    rbd_dev->major = ret;

    /* Set up the blkdev mapping. */

    ret = rbd_init_disk(rbd_dev);
    if (ret)
        goto err_out_blkdev;

    ret = rbd_bus_add_dev(rbd_dev);
    if (ret)
        goto err_out_disk;

    /* Everything's ready.  Announce the disk to the world. */

    set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
    set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
    add_disk(rbd_dev->disk);

    pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
        (unsigned long long) rbd_dev->mapping.size);

    return ret;

err_out_disk:
    rbd_free_disk(rbd_dev);
err_out_blkdev:
    unregister_blkdev(rbd_dev->major, rbd_dev->name);
err_out_id:
    rbd_dev_id_put(rbd_dev);
    rbd_dev_mapping_clear(rbd_dev);

    return ret;
}

static int rbd_dev_header_name(struct rbd_device *rbd_dev)
{
    struct rbd_spec *spec = rbd_dev->spec;
    size_t size;

    /* Record the header object name for this rbd image. */

    rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

    if (rbd_dev->image_format == 1)
        size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
    else
        size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);

    rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
    if (!rbd_dev->header_name)
        return -ENOMEM;

    if (rbd_dev->image_format == 1)
        sprintf(rbd_dev->header_name, "%s%s",
            spec->image_name, RBD_SUFFIX);
    else
        sprintf(rbd_dev->header_name, "%s%s",
            RBD_HEADER_PREFIX, spec->image_id);
    return 0;
}

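/*
 * Example (illustrative, assuming the usual RBD_SUFFIX ".rbd" and
 * RBD_HEADER_PREFIX "rbd_header."): a format 1 image named "foo"
 * uses header object "foo.rbd", while a format 2 image whose id is
 * "1014b2ae8944a" uses "rbd_header.1014b2ae8944a".
 */
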
static void rbd_dev_image_release(struct rbd_device *rbd_dev)
{
    int ret;

    rbd_remove_all_snaps(rbd_dev);
    rbd_dev_unprobe(rbd_dev);
    ret = rbd_dev_header_watch_sync(rbd_dev, 0);
    if (ret)
        rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
    kfree(rbd_dev->header_name);
    rbd_dev->header_name = NULL;
    rbd_dev->image_format = 0;
    kfree(rbd_dev->spec->image_id);
    rbd_dev->spec->image_id = NULL;

    rbd_dev_destroy(rbd_dev);
}

/*
 * Probe for the existence of the header object for the given rbd
 * device.  For format 2 images this includes determining the image
 * id.
 */
static int rbd_dev_image_probe(struct rbd_device *rbd_dev)
{
    int ret;
    int tmp;

    /*
     * Get the id from the image id object.  If it's not a
     * format 2 image, we'll get ENOENT back, and we'll assume
     * it's a format 1 image.
     */
    ret = rbd_dev_image_id(rbd_dev);
    if (ret)
        return ret;
    rbd_assert(rbd_dev->spec->image_id);
    rbd_assert(rbd_image_format_valid(rbd_dev->image_format));

    ret = rbd_dev_header_name(rbd_dev);
    if (ret)
        goto err_out_format;

    ret = rbd_dev_header_watch_sync(rbd_dev, 1);
    if (ret)
        goto out_header_name;

    if (rbd_dev->image_format == 1)
        ret = rbd_dev_v1_probe(rbd_dev);
    else
        ret = rbd_dev_v2_probe(rbd_dev);
    if (ret)
        goto err_out_watch;

    ret = rbd_dev_snaps_update(rbd_dev);
    if (ret)
        goto err_out_probe;

    ret = rbd_dev_spec_update(rbd_dev);
    if (ret)
        goto err_out_snaps;

    ret = rbd_dev_probe_parent(rbd_dev);
    if (!ret)
        return 0;

err_out_snaps:
    rbd_remove_all_snaps(rbd_dev);
err_out_probe:
    rbd_dev_unprobe(rbd_dev);
err_out_watch:
    tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
    if (tmp)
        rbd_warn(rbd_dev, "unable to tear down watch request\n");
out_header_name:
    kfree(rbd_dev->header_name);
    rbd_dev->header_name = NULL;
err_out_format:
    rbd_dev->image_format = 0;
    kfree(rbd_dev->spec->image_id);
    rbd_dev->spec->image_id = NULL;

    dout("probe failed, returning %d\n", ret);

    return ret;
}

static ssize_t rbd_add(struct bus_type *bus,
            const char *buf,
            size_t count)
{
    struct rbd_device *rbd_dev = NULL;
    struct ceph_options *ceph_opts = NULL;
    struct rbd_options *rbd_opts = NULL;
    struct rbd_spec *spec = NULL;
    struct rbd_client *rbdc;
    struct ceph_osd_client *osdc;
    int rc = -ENOMEM;

    if (!try_module_get(THIS_MODULE))
        return -ENODEV;

    /* parse add command */
    rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
    if (rc < 0)
        goto err_out_module;

    rbdc = rbd_get_client(ceph_opts);
    if (IS_ERR(rbdc)) {
        rc = PTR_ERR(rbdc);
        goto err_out_args;
    }
    ceph_opts = NULL;    /* rbd_dev client now owns this */

    /* pick the pool */
    osdc = &rbdc->client->osdc;
    rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
    if (rc < 0)
        goto err_out_client;
    spec->pool_id = (u64)rc;

    /* The ceph file layout needs to fit pool id in 32 bits */

    if (spec->pool_id > (u64)U32_MAX) {
        rbd_warn(NULL, "pool id too large (%llu > %u)\n",
            (unsigned long long)spec->pool_id, U32_MAX);
        rc = -EIO;
        goto err_out_client;
    }

    rbd_dev = rbd_dev_create(rbdc, spec);
    if (!rbd_dev)
        goto err_out_client;
    rbdc = NULL;        /* rbd_dev now owns this */
    spec = NULL;        /* rbd_dev now owns this */

    rbd_dev->mapping.read_only = rbd_opts->read_only;
    kfree(rbd_opts);
    rbd_opts = NULL;    /* done with this */

    rc = rbd_dev_image_probe(rbd_dev);
    if (rc < 0)
        goto err_out_rbd_dev;

    rc = rbd_dev_device_setup(rbd_dev);
    if (!rc)
        return count;

    rbd_dev_image_release(rbd_dev);
err_out_rbd_dev:
    rbd_dev_destroy(rbd_dev);
err_out_client:
    rbd_put_client(rbdc);
err_out_args:
    if (ceph_opts)
        ceph_destroy_options(ceph_opts);
    kfree(rbd_opts);
    rbd_spec_put(spec);
err_out_module:
    module_put(THIS_MODULE);

    dout("Error adding device %s\n", buf);

    return (ssize_t)rc;
}

static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
{
    struct list_head *tmp;
    struct rbd_device *rbd_dev;

    spin_lock(&rbd_dev_list_lock);
    list_for_each(tmp, &rbd_dev_list) {
        rbd_dev = list_entry(tmp, struct rbd_device, node);
        if (rbd_dev->dev_id == dev_id) {
            spin_unlock(&rbd_dev_list_lock);
            return rbd_dev;
        }
    }
    spin_unlock(&rbd_dev_list_lock);
    return NULL;
}

static void rbd_dev_device_release(struct device *dev)
{
    struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

    rbd_free_disk(rbd_dev);
    clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
    rbd_dev_clear_mapping(rbd_dev);
    unregister_blkdev(rbd_dev->major, rbd_dev->name);
    rbd_dev->major = 0;
    rbd_dev_id_put(rbd_dev);
    rbd_dev_mapping_clear(rbd_dev);
}

static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
{
    while (rbd_dev->parent) {
        struct rbd_device *first = rbd_dev;
        struct rbd_device *second = first->parent;
        struct rbd_device *third;

        /*
         * Follow to the parent with no grandparent and
         * remove it.
         */
        while (second && (third = second->parent)) {
            first = second;
            second = third;
        }
        rbd_assert(second);
        rbd_dev_image_release(second);
        first->parent = NULL;
        first->parent_overlap = 0;

        rbd_assert(first->parent_spec);
        rbd_spec_put(first->parent_spec);
        first->parent_spec = NULL;
    }
}

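/*
 * Walk-through (illustrative): for a layered chain
 * base -> parent1 -> parent2, the inner loop above locates the
 * deepest ancestor first, so the first outer-loop pass releases
 * parent2 and the second pass releases parent1, leaving base with
 * no parent.
 */
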
static ssize_t rbd_remove(struct bus_type *bus,
            const char *buf,
            size_t count)
{
    struct rbd_device *rbd_dev = NULL;
    int target_id;
    unsigned long ul;
    int ret;

    ret = strict_strtoul(buf, 10, &ul);
    if (ret)
        return ret;

    /* convert to int; abort if we lost anything in the conversion */
    target_id = (int) ul;
    if (target_id != ul)
        return -EINVAL;

    mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

    rbd_dev = __rbd_get_dev(target_id);
    if (!rbd_dev) {
        ret = -ENOENT;
        goto done;
    }

    spin_lock_irq(&rbd_dev->lock);
    if (rbd_dev->open_count)
        ret = -EBUSY;
    else
        set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
    spin_unlock_irq(&rbd_dev->lock);
    if (ret < 0)
        goto done;
    ret = count;
    rbd_bus_del_dev(rbd_dev);
    rbd_dev_image_release(rbd_dev);
    module_put(THIS_MODULE);
done:
    mutex_unlock(&ctl_mutex);

    return ret;
}

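/*
 * Example (illustrative): the device mapped as /dev/rbd1 is torn
 * down by writing its id to the bus control file:
 *
 *   # echo 1 > /sys/bus/rbd/remove
 *
 * The write fails with -EBUSY while the block device is still open.
 */
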
/*
 * create control files in sysfs
 * /sys/bus/rbd/...
 */
static int rbd_sysfs_init(void)
{
    int ret;

    ret = device_register(&rbd_root_dev);
    if (ret < 0)
        return ret;

    ret = bus_register(&rbd_bus_type);
    if (ret < 0)
        device_unregister(&rbd_root_dev);

    return ret;
}

static void rbd_sysfs_cleanup(void)
{
    bus_unregister(&rbd_bus_type);
    device_unregister(&rbd_root_dev);
}

static int __init rbd_init(void)
{
    int rc;

    if (!libceph_compatible(NULL)) {
        rbd_warn(NULL, "libceph incompatibility (quitting)");

        return -EINVAL;
    }
    rc = rbd_sysfs_init();
    if (rc)
        return rc;
    pr_info("loaded " RBD_DRV_NAME_LONG "\n");
    return 0;
}

static void __exit rbd_exit(void)
{
    rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");