/*
   rbd.c -- Export ceph rados objects as a Linux block device


   based on drivers/block/osdblk.c:

   Copyright 2009 Red Hat, Inc.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.



   For usage instructions, please refer to:

		 Documentation/ABI/testing/sysfs-bus-rbd

 */

#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/decode.h>
#include <linux/parser.h>
#include <linux/bsearch.h>

#include <linux/kernel.h>
#include <linux/device.h>
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/blkdev.h>
#include <linux/slab.h>

#include "rbd_types.h"

#define RBD_DEBUG	/* Activate rbd_assert() calls */

/*
 * The basic unit of block I/O is a sector.  It is interpreted in a
 * number of contexts in Linux (blk, bio, genhd), but the default is
 * universally 512 bytes.  These symbols are just slightly more
 * meaningful than the bare numbers they represent.
 */
#define	SECTOR_SHIFT	9
#define	SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256	/* max minors per blkdev */

#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
#define RBD_MAX_SNAP_NAME_LEN \
			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
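
/*
 * For example: with NAME_MAX at its usual Linux value of 255, and the
 * "snap_" prefix accounting for sizeof ("snap_") - 1 = 5 of those
 * bytes, a snapshot name can be at most 255 - 5 = 250 characters.
 */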

#define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */

#define RBD_SNAP_HEAD_NAME	"-"

#define	BAD_SNAP_INDEX	U32_MAX		/* invalid index into snap array */

/* This allows a single page to hold an image name sent by OSD */
#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX	64

#define RBD_OBJ_PREFIX_LEN_MAX	64

/* Feature bits */

#define RBD_FEATURE_LAYERING	(1<<0)
#define RBD_FEATURE_STRIPINGV2	(1<<1)
#define RBD_FEATURES_ALL \
	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* Features supported by this (client software) implementation. */

#define RBD_FEATURES_SUPPORTED	(RBD_FEATURES_ALL)

/*
 * An RBD device name will be "rbd#", where the "rbd" comes from
 * RBD_DRV_NAME above, and # is a unique integer identifier.
 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
 * enough to hold all possible device names.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)
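
/*
 * Worked example: with a 4-byte int, MAX_INT_FORMAT_WIDTH is
 * (5 * 4) / 2 + 1 = 11, enough for the 10 decimal digits of the
 * largest 32-bit identifier plus a terminating NUL (or sign).  The
 * 5/2 factor over-approximates log10(256) ~= 2.41 digits per byte,
 * so the bound also holds for other int sizes.
 */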

/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	/* These six fields never change for a given rbd image */
	char *object_prefix;
	__u8 obj_order;
	__u8 crypt_type;
	__u8 comp_type;
	u64 stripe_unit;
	u64 stripe_count;
	u64 features;		/* Might be changeable someday? */

	/* The remaining fields need to be updated occasionally */
	u64 image_size;
	struct ceph_snap_context *snapc;
	char *snap_names;	/* format 1 only */
	u64 *snap_sizes;	/* format 1 only */
};

/*
 * An rbd image specification.
 *
 * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
 * identify an image.  Each rbd_dev structure includes a pointer to
 * an rbd_spec structure that encapsulates this identity.
 *
 * Each of the ids in an rbd_spec has an associated name.  For a
 * user-mapped image, the names are supplied and the ids associated
 * with them are looked up.  For a layered image, a parent image is
 * defined by the tuple, and the names are looked up.
 *
 * An rbd_dev structure contains a parent_spec pointer which is
 * non-null if the image it represents is a child in a layered
 * image.  This pointer will refer to the rbd_spec structure used
 * by the parent rbd_dev for its own identity (i.e., the structure
 * is shared between the parent and child).
 *
 * Since these structures are populated once, during the discovery
 * phase of image construction, they are effectively immutable so
 * we make no effort to synchronize access to them.
 *
 * Note that code herein does not assume the image name is known (it
 * could be a null pointer).
 */
struct rbd_spec {
	u64		pool_id;
	const char	*pool_name;

	const char	*image_id;
	const char	*image_name;

	u64		snap_id;
	const char	*snap_name;

	struct kref	kref;
};
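
/*
 * A purely hypothetical example of a fully-populated spec, for a
 * mapping of image "foo" at snapshot "snap1" in pool "rbd":
 *
 *	pool_id = 2,			pool_name = "rbd",
 *	image_id = "1028ae8944a",	image_name = "foo",
 *	snap_id = 4,			snap_name = "snap1"
 *
 * The numeric ids come from the cluster; the names are what the
 * user supplied (or what was looked up for a parent image).
 */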

/*
 * an instance of the client.  multiple devices may share an rbd client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct kref		kref;
	struct list_head	node;
};

struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define	BAD_WHICH	U32_MAX		/* Good which or bad which, which? */

struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

enum obj_request_type {
	OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
};

enum obj_req_flags {
	OBJ_REQ_DONE,		/* completion flag: not done = 0, done = 1 */
	OBJ_REQ_IMG_DATA,	/* object usage: standalone = 0, image = 1 */
	OBJ_REQ_KNOWN,		/* EXISTS flag valid: no = 0, yes = 1 */
	OBJ_REQ_EXISTS,		/* target exists: no = 0, yes = 1 */
};

struct rbd_obj_request {
	const char		*object_name;
	u64			offset;		/* object start byte */
	u64			length;		/* bytes from offset */
	unsigned long		flags;

	/*
	 * An object request associated with an image will have its
	 * img_data flag set; a standalone object request will not.
	 *
	 * A standalone object request will have which == BAD_WHICH
	 * and a null obj_request pointer.
	 *
	 * An object request initiated in support of a layered image
	 * object (to check for its existence before a write) will
	 * have which == BAD_WHICH and a non-null obj_request pointer.
	 *
	 * Finally, an object request for rbd image data will have
	 * which != BAD_WHICH, and will have a non-null img_request
	 * pointer.  The value of which will be in the range
	 * 0..(img_request->obj_request_count-1).
	 */
	union {
		struct rbd_obj_request	*obj_request;	/* STAT op */
		struct {
			struct rbd_img_request	*img_request;
			u64			img_offset;
			/* links for img_request->obj_requests list */
			struct list_head	links;
		};
	};
	u32			which;		/* posn image request list */

	enum obj_request_type	type;
	union {
		struct bio	*bio_list;
		struct {
			struct page	**pages;
			u32		page_count;
		};
	};
	struct page		**copyup_pages;

	struct ceph_osd_request	*osd_req;

	u64			xferred;	/* bytes transferred */
	int			result;

	rbd_obj_callback_t	callback;
	struct completion	completion;

	struct kref		kref;
};

enum img_req_flags {
	IMG_REQ_WRITE,		/* I/O direction: read = 0, write = 1 */
	IMG_REQ_CHILD,		/* initiator: block = 0, child image = 1 */
	IMG_REQ_LAYERED,	/* ENOENT handling: normal = 0, layered = 1 */
};

struct rbd_img_request {
	struct rbd_device	*rbd_dev;
	u64			offset;	/* starting image byte offset */
	u64			length;	/* byte count from offset */
	unsigned long		flags;
	union {
		u64			snap_id;	/* for reads */
		struct ceph_snap_context *snapc;	/* for writes */
	};
	union {
		struct request		*rq;		/* block request */
		struct rbd_obj_request	*obj_request;	/* obj req initiator */
	};
	struct page		**copyup_pages;
	spinlock_t		completion_lock;/* protects next_completion */
	u32			next_completion;
	rbd_img_callback_t	callback;
	u64			xferred;/* aggregate bytes transferred */
	int			result;	/* first nonzero obj_request result */

	u32			obj_request_count;
	struct list_head	obj_requests;	/* rbd_obj_request structs */

	struct kref		kref;
};

#define for_each_obj_request(ireq, oreq) \
	list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
	list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
	list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)

struct rbd_mapping {
	u64			size;
	u64			features;
	bool			read_only;
};

/*
 * a single device
 */
struct rbd_device {
	int			dev_id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */

	u32			image_format;	/* Either 1 or 2 */
	struct rbd_client	*rbd_client;

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue, flags, open_count */

	struct rbd_image_header	header;
	unsigned long		flags;		/* possibly lock protected */
	struct rbd_spec		*spec;

	char			*header_name;

	struct ceph_file_layout	layout;

	struct ceph_osd_event	*watch_event;
	struct rbd_obj_request	*watch_request;

	struct rbd_spec		*parent_spec;
	u64			parent_overlap;
	struct rbd_device	*parent;

	/* protects updating the header */
	struct rw_semaphore	header_rwsem;

	struct rbd_mapping	mapping;

	struct list_head	node;

	/* sysfs related */
	struct device		dev;
	unsigned long		open_count;	/* protected by lock */
};

/*
 * Flag bits for rbd_dev->flags.  If atomicity is required,
 * rbd_dev->lock is used to protect access.
 *
 * Currently, only the "removing" flag (which is coupled with the
 * "open_count" field) requires atomic access.
 */
enum rbd_dev_flags {
	RBD_DEV_FLAG_EXISTS,	/* mapped snapshot has not been deleted */
	RBD_DEV_FLAG_REMOVING,	/* this mapping is being removed */
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);		/* clients */
static DEFINE_SPINLOCK(rbd_client_list_lock);

/* Slab caches for frequently-allocated structures */

static struct kmem_cache	*rbd_img_request_cache;
static struct kmem_cache	*rbd_obj_request_cache;
static struct kmem_cache	*rbd_segment_name_cache;

static int rbd_img_request_submit(struct rbd_img_request *img_request);

static void rbd_dev_device_release(struct device *dev);

static ssize_t rbd_add(struct bus_type *bus, const char *buf,
		       size_t count);
static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
			  size_t count);
static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only);

static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};

static struct bus_type rbd_bus_type = {
	.name		= "rbd",
	.bus_attrs	= rbd_bus_attrs,
};

static void rbd_root_dev_release(struct device *dev)
{
}

static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};

static __printf(2, 3)
void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
{
	struct va_format vaf;
	va_list args;

	va_start(args, fmt);
	vaf.fmt = fmt;
	vaf.va = &args;

	if (!rbd_dev)
		printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
	else if (rbd_dev->disk)
		printk(KERN_WARNING "%s: %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_name)
		printk(KERN_WARNING "%s: image %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
	else if (rbd_dev->spec && rbd_dev->spec->image_id)
		printk(KERN_WARNING "%s: id %s: %pV\n",
			RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
	else	/* punt */
		printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
			RBD_DRV_NAME, rbd_dev, &vaf);
	va_end(args);
}
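
/*
 * A typical (hypothetical) call would be
 *
 *	rbd_warn(rbd_dev, "no snapshot with id %llu", snap_id);
 *
 * which, per the cascade above, prefixes the message with the most
 * specific identity available: disk name, image name, image id, or
 * as a last resort the rbd_dev pointer itself.
 */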

#ifdef RBD_DEBUG
#define rbd_assert(expr)						\
		if (unlikely(!(expr))) {				\
			printk(KERN_ERR "\nAssertion failure in %s() "	\
						"at line %d:\n\n"	\
					"\trbd_assert(%s);\n\n",	\
					__func__, __LINE__, #expr);	\
			BUG();						\
		}
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)	((void) 0)
#endif /* !RBD_DEBUG */

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
static void rbd_dev_remove_parent(struct rbd_device *rbd_dev);

static int rbd_dev_refresh(struct rbd_device *rbd_dev);
static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev);
static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id);
static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
		u8 *order, u64 *snap_size);
static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
		u64 *snap_features);
static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name);

static int rbd_open(struct block_device *bdev, fmode_t mode)
{
	struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
	bool removing = false;

	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
		return -EROFS;

	spin_lock_irq(&rbd_dev->lock);
	if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
		removing = true;
	else
		rbd_dev->open_count++;
	spin_unlock_irq(&rbd_dev->lock);
	if (removing)
		return -ENOENT;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	(void) get_device(&rbd_dev->dev);
	set_device_ro(bdev, rbd_dev->mapping.read_only);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;
	unsigned long open_count_before;

	spin_lock_irq(&rbd_dev->lock);
	open_count_before = rbd_dev->open_count--;
	spin_unlock_irq(&rbd_dev->lock);
	rbd_assert(open_count_before > 0);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	put_device(&rbd_dev->dev);
	mutex_unlock(&ctl_mutex);

	return 0;
}

static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};

/*
 * Initialize an rbd client instance.
 * We own *ceph_opts.
 */
static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;
	int ret = -ENOMEM;

	dout("%s:\n", __func__);
	rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
	if (!rbdc)
		goto out_opt;

	kref_init(&rbdc->kref);
	INIT_LIST_HEAD(&rbdc->node);

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
	if (IS_ERR(rbdc->client))
		goto out_mutex;
	ceph_opts = NULL;	/* Now rbdc->client is responsible for ceph_opts */

	ret = ceph_open_session(rbdc->client);
	if (ret < 0)
		goto out_err;

	spin_lock(&rbd_client_list_lock);
	list_add_tail(&rbdc->node, &rbd_client_list);
	spin_unlock(&rbd_client_list_lock);

	mutex_unlock(&ctl_mutex);
	dout("%s: rbdc %p\n", __func__, rbdc);

	return rbdc;

out_err:
	ceph_destroy_client(rbdc->client);
out_mutex:
	mutex_unlock(&ctl_mutex);
	kfree(rbdc);
out_opt:
	if (ceph_opts)
		ceph_destroy_options(ceph_opts);
	dout("%s: error %d\n", __func__, ret);

	return ERR_PTR(ret);
}

static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
{
	kref_get(&rbdc->kref);

	return rbdc;
}

/*
 * Find a ceph client with specific addr and configuration.  If
 * found, bump its reference count.
 */
static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
{
	struct rbd_client *client_node;
	bool found = false;

	if (ceph_opts->flags & CEPH_OPT_NOSHARE)
		return NULL;

	spin_lock(&rbd_client_list_lock);
	list_for_each_entry(client_node, &rbd_client_list, node) {
		if (!ceph_compare_options(ceph_opts, client_node->client)) {
			__rbd_get_client(client_node);

			found = true;
			break;
		}
	}
	spin_unlock(&rbd_client_list_lock);

	return found ? client_node : NULL;
}

/*
 * mount options
 */
enum {
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
	Opt_read_only,
	Opt_read_write,
	/* Boolean args above */
	Opt_last_bool,
};

static match_table_t rbd_opts_tokens = {
	/* int args above */
	/* string args above */
	{Opt_read_only, "read_only"},
	{Opt_read_only, "ro"},		/* Alternate spelling */
	{Opt_read_write, "read_write"},
	{Opt_read_write, "rw"},		/* Alternate spelling */
	/* Boolean args above */
	{-1, NULL}
};

struct rbd_options {
	bool	read_only;
};

#define RBD_READ_ONLY_DEFAULT	false

static int parse_rbd_opts_token(char *c, void *private)
{
	struct rbd_options *rbd_opts = private;
	substring_t argstr[MAX_OPT_ARGS];
	int token, intval, ret;

	token = match_token(c, rbd_opts_tokens, argstr);
	if (token < 0)
		return -EINVAL;

	if (token < Opt_last_int) {
		ret = match_int(&argstr[0], &intval);
		if (ret < 0) {
			pr_err("bad mount option arg (not int) "
			       "at '%s'\n", c);
			return ret;
		}
		dout("got int token %d val %d\n", token, intval);
	} else if (token > Opt_last_int && token < Opt_last_string) {
		dout("got string token %d val %s\n", token,
		     argstr[0].from);
	} else if (token > Opt_last_string && token < Opt_last_bool) {
		dout("got Boolean token %d\n", token);
	} else {
		dout("got token %d\n", token);
	}

	switch (token) {
	case Opt_read_only:
		rbd_opts->read_only = true;
		break;
	case Opt_read_write:
		rbd_opts->read_only = false;
		break;
	default:
		rbd_assert(false);
		break;
	}
	return 0;
}
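
/*
 * For example, an option string of "ro" (or its long form
 * "read_only") supplied when mapping an image reaches
 * parse_rbd_opts_token() one token at a time and sets
 * rbd_opts->read_only to true; "rw"/"read_write" resets it to its
 * default of false.
 */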

/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
	struct rbd_client *rbdc;

	rbdc = rbd_client_find(ceph_opts);
	if (rbdc)	/* using an existing client */
		ceph_destroy_options(ceph_opts);
	else
		rbdc = rbd_client_create(ceph_opts);

	return rbdc;
}

/*
 * Destroy ceph client
 *
 * Takes and releases rbd_client_list_lock itself, so the caller
 * must not already hold it.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("%s: rbdc %p\n", __func__, rbdc);
	spin_lock(&rbd_client_list_lock);
	list_del(&rbdc->node);
	spin_unlock(&rbd_client_list_lock);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc);
}

/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 */
static void rbd_put_client(struct rbd_client *rbdc)
{
	if (rbdc)
		kref_put(&rbdc->kref, rbd_client_release);
}

static bool rbd_image_format_valid(u32 image_format)
{
	return image_format == 1 || image_format == 2;
}

static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
{
	size_t size;
	u32 snap_count;

	/* The header has to start with the magic rbd header text */
	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
		return false;

	/* The bio layer requires at least sector-sized I/O */

	if (ondisk->options.order < SECTOR_SHIFT)
		return false;

	/* If we use u64 in a few spots we may be able to loosen this */

	if (ondisk->options.order > 8 * sizeof (int) - 1)
		return false;

	/*
	 * The size of a snapshot header has to fit in a size_t, and
	 * that limits the number of snapshots.
	 */
	snap_count = le32_to_cpu(ondisk->snap_count);
	size = SIZE_MAX - sizeof (struct ceph_snap_context);
	if (snap_count > size / sizeof (__le64))
		return false;

	/*
	 * Not only that, but the size of the entire snapshot
	 * header must also be representable in a size_t.
	 */
	size -= snap_count * sizeof (__le64);
	if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
		return false;

	return true;
}

/*
 * Create a new header structure, translate header format from the on-disk
 * header.
 */
static int rbd_header_from_disk(struct rbd_image_header *header,
				 struct rbd_image_header_ondisk *ondisk)
{
	u32 snap_count;
	size_t len;
	size_t size;
	u32 i;

	snap_count = le32_to_cpu(ondisk->snap_count);

	len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
	header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
	if (!header->object_prefix)
		return -ENOMEM;
	memcpy(header->object_prefix, ondisk->object_prefix, len);
	header->object_prefix[len] = '\0';

	if (snap_count) {
		u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);

		/* Save a copy of the snapshot names */

		if (snap_names_len > (u64) SIZE_MAX)
			return -EIO;
		header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
		if (!header->snap_names)
			goto out_err;
		/*
		 * Note that rbd_dev_v1_header_read() guarantees
		 * the ondisk buffer we're working with has
		 * snap_names_len bytes beyond the end of the
		 * snapshot id array, so this memcpy() is safe.
		 */
		memcpy(header->snap_names, &ondisk->snaps[snap_count],
			snap_names_len);

		/* Record each snapshot's size */

		size = snap_count * sizeof (*header->snap_sizes);
		header->snap_sizes = kmalloc(size, GFP_KERNEL);
		if (!header->snap_sizes)
			goto out_err;
		for (i = 0; i < snap_count; i++)
			header->snap_sizes[i] =
				le64_to_cpu(ondisk->snaps[i].image_size);
	} else {
		header->snap_names = NULL;
		header->snap_sizes = NULL;
	}

	header->features = 0;	/* No features support in v1 images */
	header->obj_order = ondisk->options.order;
	header->crypt_type = ondisk->options.crypt_type;
	header->comp_type = ondisk->options.comp_type;

	/* Allocate and fill in the snapshot context */

	header->image_size = le64_to_cpu(ondisk->image_size);

	header->snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
	if (!header->snapc)
		goto out_err;
	header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
	for (i = 0; i < snap_count; i++)
		header->snapc->snaps[i] = le64_to_cpu(ondisk->snaps[i].id);

	return 0;

out_err:
	kfree(header->snap_sizes);
	header->snap_sizes = NULL;
	kfree(header->snap_names);
	header->snap_names = NULL;
	kfree(header->object_prefix);
	header->object_prefix = NULL;

	return -ENOMEM;
}
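
/*
 * Sketch of the format 1 on-disk layout consumed above, as implied
 * by the code (the snapshot names are stored end-to-end,
 * NUL-terminated, directly after the snapshot id/size array):
 *
 *	+--------------+-------------------+----------------------+
 *	| fixed header | snaps[snap_count] | "snap1\0snap2\0 ..." |
 *	+--------------+-------------------+----------------------+
 */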

static const char *_rbd_dev_v1_snap_name(struct rbd_device *rbd_dev, u32 which)
{
	const char *snap_name;

	rbd_assert(which < rbd_dev->header.snapc->num_snaps);

	/* Skip over names until we find the one we are looking for */

	snap_name = rbd_dev->header.snap_names;
	while (which--)
		snap_name += strlen(snap_name) + 1;

	return kstrdup(snap_name, GFP_KERNEL);
}

/*
 * Snapshot id comparison function for use with qsort()/bsearch().
 * Note that result is for snapshots in *descending* order.
 */
static int snapid_compare_reverse(const void *s1, const void *s2)
{
	u64 snap_id1 = *(u64 *)s1;
	u64 snap_id2 = *(u64 *)s2;

	if (snap_id1 < snap_id2)
		return 1;
	return snap_id1 == snap_id2 ? 0 : -1;
}
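
/*
 * For example, in the descending array { 12, 7, 3 }, comparing a
 * key of 7 against element 12 yields 1 (look later in the array),
 * against 3 yields -1 (look earlier), and against 7 itself yields
 * 0, so bsearch() lands on index 1.
 */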

/*
 * Search a snapshot context to see if the given snapshot id is
 * present.
 *
 * Returns the position of the snapshot id in the array if it's found,
 * or BAD_SNAP_INDEX otherwise.
 *
 * Note: The snapshot array is kept sorted (by the osd) in
 * reverse order, highest snapshot id first.
 */
static u32 rbd_dev_snap_index(struct rbd_device *rbd_dev, u64 snap_id)
{
	struct ceph_snap_context *snapc = rbd_dev->header.snapc;
	u64 *found;

	found = bsearch(&snap_id, &snapc->snaps, snapc->num_snaps,
				sizeof (snap_id), snapid_compare_reverse);

	return found ? (u32)(found - &snapc->snaps[0]) : BAD_SNAP_INDEX;
}

static const char *rbd_dev_v1_snap_name(struct rbd_device *rbd_dev,
					u64 snap_id)
{
	u32 which;

	which = rbd_dev_snap_index(rbd_dev, snap_id);
	if (which == BAD_SNAP_INDEX)
		return NULL;

	return _rbd_dev_v1_snap_name(rbd_dev, which);
}

static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
{
	if (snap_id == CEPH_NOSNAP)
		return RBD_SNAP_HEAD_NAME;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (rbd_dev->image_format == 1)
		return rbd_dev_v1_snap_name(rbd_dev, snap_id);

	return rbd_dev_v2_snap_name(rbd_dev, snap_id);
}

static int rbd_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
				u64 *snap_size)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_size = rbd_dev->header.image_size;
	} else if (rbd_dev->image_format == 1) {
		u32 which;

		which = rbd_dev_snap_index(rbd_dev, snap_id);
		if (which == BAD_SNAP_INDEX)
			return -ENOENT;

		*snap_size = rbd_dev->header.snap_sizes[which];
	} else {
		u64 size = 0;
		int ret;

		ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, NULL, &size);
		if (ret)
			return ret;

		*snap_size = size;
	}
	return 0;
}

static int rbd_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
			u64 *snap_features)
{
	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	if (snap_id == CEPH_NOSNAP) {
		*snap_features = rbd_dev->header.features;
	} else if (rbd_dev->image_format == 1) {
		*snap_features = 0;	/* No features for format 1 */
	} else {
		u64 features = 0;
		int ret;

		ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, &features);
		if (ret)
			return ret;

		*snap_features = features;
	}
	return 0;
}

static int rbd_dev_mapping_set(struct rbd_device *rbd_dev)
{
	u64 snap_id = rbd_dev->spec->snap_id;
	u64 size = 0;
	u64 features = 0;
	int ret;

	ret = rbd_snap_size(rbd_dev, snap_id, &size);
	if (ret)
		return ret;
	ret = rbd_snap_features(rbd_dev, snap_id, &features);
	if (ret)
		return ret;

	rbd_dev->mapping.size = size;
	rbd_dev->mapping.features = features;

	return 0;
}

static void rbd_dev_mapping_clear(struct rbd_device *rbd_dev)
{
	rbd_dev->mapping.size = 0;
	rbd_dev->mapping.features = 0;
}

static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
{
	char *name;
	u64 segment;
	int ret;

	name = kmem_cache_alloc(rbd_segment_name_cache, GFP_NOIO);
	if (!name)
		return NULL;
	segment = offset >> rbd_dev->header.obj_order;
	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
			rbd_dev->header.object_prefix, segment);
	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
		pr_err("error formatting segment name for #%llu (%d)\n",
			segment, ret);
		/* name came from the slab cache, so free it there */
		kmem_cache_free(rbd_segment_name_cache, name);
		name = NULL;
	}

	return name;
}

static void rbd_segment_name_free(const char *name)
{
	/* The explicit cast here is needed to drop the const qualifier */

	kmem_cache_free(rbd_segment_name_cache, (void *)name);
}

static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	return offset & (segment_size - 1);
}

static u64 rbd_segment_length(struct rbd_device *rbd_dev,
				u64 offset, u64 length)
{
	u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;

	offset &= segment_size - 1;

	rbd_assert(length <= U64_MAX - offset);
	if (offset + length > segment_size)
		length = segment_size - offset;

	return length;
}
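
/*
 * A worked example (assuming an object order of 22, i.e. 4 MiB
 * objects, and a hypothetical object prefix "rb.0.1028ae8944a"):
 * image offset 9 MiB falls in segment 9 MiB >> 22 = 2, so
 * rbd_segment_name() yields "rb.0.1028ae8944a.000000000002" and
 * rbd_segment_offset() yields 1 MiB; a 4 MiB request starting there
 * is clipped by rbd_segment_length() to the 3 MiB left in the
 * segment.
 */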

/*
 * returns the size of an object in the image
 */
static u64 rbd_obj_bytes(struct rbd_image_header *header)
{
	return 1 << header->obj_order;
}
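
/*
 * With the common default order of 22 this is 1 << 22 = 4 MiB per
 * object.  (For format 1 images rbd_dev_ondisk_valid() bounds the
 * order to the range SECTOR_SHIFT..31.)
 */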

/*
 * bio helpers
 */

static void bio_chain_put(struct bio *chain)
{
	struct bio *tmp;

	while (chain) {
		tmp = chain;
		chain = chain->bi_next;
		bio_put(tmp);
	}
}

/*
 * zeros a bio chain, starting at specific offset
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}

/*
 * similar to zero_bio_chain(), zeros data defined by a page array,
 * starting at the given byte offset from the start of the array and
 * continuing up to the given end offset.  The pages array is
 * assumed to be big enough to hold all bytes up to the end.
 */
static void zero_pages(struct page **pages, u64 offset, u64 end)
{
	struct page **page = &pages[offset >> PAGE_SHIFT];

	rbd_assert(end > offset);
	rbd_assert(end - offset <= (u64)SIZE_MAX);
	while (offset < end) {
		size_t page_offset;
		size_t length;
		unsigned long flags;
		void *kaddr;

		page_offset = (size_t)(offset & ~PAGE_MASK);
		length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
		local_irq_save(flags);
		kaddr = kmap_atomic(*page);
		memset(kaddr + page_offset, 0, length);
		kunmap_atomic(kaddr);
		local_irq_restore(flags);

		offset += length;
		page++;
	}
}

/*
 * Clone a portion of a bio, starting at the given byte offset
 * and continuing for the number of bytes indicated.
 */
static struct bio *bio_clone_range(struct bio *bio_src,
					unsigned int offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio_vec *bv;
	unsigned int resid;
	unsigned short idx;
	unsigned int voff;
	unsigned short end_idx;
	unsigned short vcnt;
	struct bio *bio;

	/* Handle the easy case for the caller */

	if (!offset && len == bio_src->bi_size)
		return bio_clone(bio_src, gfpmask);

	if (WARN_ON_ONCE(!len))
		return NULL;
	if (WARN_ON_ONCE(len > bio_src->bi_size))
		return NULL;
	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
		return NULL;

	/* Find first affected segment... */

	resid = offset;
	__bio_for_each_segment(bv, bio_src, idx, 0) {
		if (resid < bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	voff = resid;

	/* ...and the last affected segment */

	resid += len;
	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
		if (resid <= bv->bv_len)
			break;
		resid -= bv->bv_len;
	}
	vcnt = end_idx - idx + 1;

	/* Build the clone */

	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
	if (!bio)
		return NULL;	/* ENOMEM */

	bio->bi_bdev = bio_src->bi_bdev;
	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
	bio->bi_rw = bio_src->bi_rw;
	bio->bi_flags |= 1 << BIO_CLONED;

	/*
	 * Copy over our part of the bio_vec, then update the first
	 * and last (or only) entries.
	 */
	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
			vcnt * sizeof (struct bio_vec));
	bio->bi_io_vec[0].bv_offset += voff;
	if (vcnt > 1) {
		bio->bi_io_vec[0].bv_len -= voff;
		bio->bi_io_vec[vcnt - 1].bv_len = resid;
	} else {
		bio->bi_io_vec[0].bv_len = len;
	}

	bio->bi_vcnt = vcnt;
	bio->bi_size = len;
	bio->bi_idx = 0;

	return bio;
}

/*
 * Clone a portion of a bio chain, starting at the given byte offset
 * into the first bio in the source chain and continuing for the
 * number of bytes indicated.  The result is another bio chain of
 * exactly the given length, or a null pointer on error.
 *
 * The bio_src and offset parameters are both in-out.  On entry they
 * refer to the first source bio and the offset into that bio where
 * the start of data to be cloned is located.
 *
 * On return, bio_src is updated to refer to the bio in the source
 * chain that contains the first un-cloned byte, and *offset will
 * contain the offset of that byte within that bio.
 */
static struct bio *bio_chain_clone_range(struct bio **bio_src,
					unsigned int *offset,
					unsigned int len,
					gfp_t gfpmask)
{
	struct bio *bi = *bio_src;
	unsigned int off = *offset;
	struct bio *chain = NULL;
	struct bio **end;

	/* Build up a chain of clone bios up to the limit */

	if (!bi || off >= bi->bi_size || !len)
		return NULL;		/* Nothing to clone */

	end = &chain;
	while (len) {
		unsigned int bi_size;
		struct bio *bio;

		if (!bi) {
			rbd_warn(NULL, "bio_chain exhausted with %u left", len);
			goto out_err;	/* EINVAL; ran out of bio's */
		}
		bi_size = min_t(unsigned int, bi->bi_size - off, len);
		bio = bio_clone_range(bi, off, bi_size, gfpmask);
		if (!bio)
			goto out_err;	/* ENOMEM */

		*end = bio;
		end = &bio->bi_next;

		off += bi_size;
		if (off == bi->bi_size) {
			bi = bi->bi_next;
			off = 0;
		}
		len -= bi_size;
	}
	*bio_src = bi;
	*offset = off;

	return chain;
out_err:
	bio_chain_put(chain);

	return NULL;
}
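
/*
 * Hypothetical example: cloning 6 KiB at offset 2 KiB from a chain
 * of two 4 KiB bios clones the last 2 KiB of the first bio plus all
 * of the second.  On return *bio_src is the second bio's bi_next
 * (NULL here) and *offset is 0, ready for a follow-on call to pick
 * up where this one stopped.
 */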

/*
 * The default/initial value for all object request flags is 0.  For
 * each flag, once its value is set to 1 it is never reset to 0
 * again.
 */
static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
		struct rbd_device *rbd_dev;

		rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
			obj_request);
	}
}

static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
}

static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
	if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
		struct rbd_device *rbd_dev = NULL;

		if (obj_request_img_data_test(obj_request))
			rbd_dev = obj_request->img_request->rbd_dev;
		rbd_warn(rbd_dev, "obj_request %p already marked done\n",
			obj_request);
	}
}

static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
}

/*
 * This sets the KNOWN flag after (possibly) setting the EXISTS
 * flag.  The latter is set based on the "exists" value provided.
 *
 * Note that for our purposes once an object exists it never goes
 * away again.  It's possible that the responses from two existence
 * checks are separated by the creation of the target object, and
 * the first ("doesn't exist") response arrives *after* the second
 * ("does exist").  In that case we ignore the second one.
 */
static void obj_request_existence_set(struct rbd_obj_request *obj_request,
				bool exists)
{
	if (exists)
		set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
	set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
	smp_mb();
}

static bool obj_request_known_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
}

static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
{
	smp_mb();
	return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
}

static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_get(&obj_request->kref);
}

static void rbd_obj_request_destroy(struct kref *kref);
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request != NULL);
	dout("%s: obj %p (was %d)\n", __func__, obj_request,
		atomic_read(&obj_request->kref.refcount));
	kref_put(&obj_request->kref, rbd_obj_request_destroy);
}

static void rbd_img_request_get(struct rbd_img_request *img_request)
{
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_get(&img_request->kref);
}

static void rbd_img_request_destroy(struct kref *kref);
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
	rbd_assert(img_request != NULL);
	dout("%s: img %p (was %d)\n", __func__, img_request,
		atomic_read(&img_request->kref.refcount));
	kref_put(&img_request->kref, rbd_img_request_destroy);
}

static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->img_request == NULL);

	/* Image request now owns object's original reference */
	obj_request->img_request = img_request;
	obj_request->which = img_request->obj_request_count;
	rbd_assert(!obj_request_img_data_test(obj_request));
	obj_request_img_data_set(obj_request);
	rbd_assert(obj_request->which != BAD_WHICH);
	img_request->obj_request_count++;
	list_add_tail(&obj_request->links, &img_request->obj_requests);
	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
}

static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
					struct rbd_obj_request *obj_request)
{
	rbd_assert(obj_request->which != BAD_WHICH);

	dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
		obj_request->which);
	list_del(&obj_request->links);
	rbd_assert(img_request->obj_request_count > 0);
	img_request->obj_request_count--;
	rbd_assert(obj_request->which == img_request->obj_request_count);
	obj_request->which = BAD_WHICH;
	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request == img_request);
	obj_request->img_request = NULL;
	obj_request->callback = NULL;
	rbd_obj_request_put(obj_request);
}

static bool obj_request_type_valid(enum obj_request_type type)
{
	switch (type) {
	case OBJ_REQUEST_NODATA:
	case OBJ_REQUEST_BIO:
	case OBJ_REQUEST_PAGES:
		return true;
	default:
		return false;
	}
}

static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
				struct rbd_obj_request *obj_request)
{
	dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);

	return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}

static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
	dout("%s: img %p\n", __func__, img_request);

	/*
	 * If no error occurred, compute the aggregate transfer
	 * count for the image request.  We could instead use
	 * atomic64_cmpxchg() to update it as each object request
	 * completes; not clear which way is better off hand.
	 */
	if (!img_request->result) {
		struct rbd_obj_request *obj_request;
		u64 xferred = 0;

		for_each_obj_request(img_request, obj_request)
			xferred += obj_request->xferred;
		img_request->xferred = xferred;
	}

	if (img_request->callback)
		img_request->callback(img_request);
	else
		rbd_img_request_put(img_request);
}

/* Caller is responsible for rbd_obj_request_destroy(obj_request) */

static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	return wait_for_completion_interruptible(&obj_request->completion);
}

/*
 * The default/initial value for all image request flags is 0.  Each
 * is conditionally set to 1 at image request initialization time
 * and currently never changes thereafter.
 */
static void img_request_write_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_WRITE, &img_request->flags);
	smp_mb();
}

static bool img_request_write_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
}

static void img_request_child_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_CHILD, &img_request->flags);
	smp_mb();
}

static bool img_request_child_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
}

static void img_request_layered_set(struct rbd_img_request *img_request)
{
	set_bit(IMG_REQ_LAYERED, &img_request->flags);
	smp_mb();
}

static bool img_request_layered_test(struct rbd_img_request *img_request)
{
	smp_mb();
	return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
}

static void
rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
{
	u64 xferred = obj_request->xferred;
	u64 length = obj_request->length;

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, obj_request->img_request, obj_request->result,
		xferred, length);
	/*
	 * ENOENT means a hole in the image.  We zero-fill the
	 * entire length of the request.  A short read also implies
	 * zero-fill to the end of the request.  Either way we
	 * update the xferred count to indicate the whole request
	 * was satisfied.
	 */
	rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
	if (obj_request->result == -ENOENT) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, 0);
		else
			zero_pages(obj_request->pages, 0, length);
		obj_request->result = 0;
		obj_request->xferred = length;
	} else if (xferred < length && !obj_request->result) {
		if (obj_request->type == OBJ_REQUEST_BIO)
			zero_bio_chain(obj_request->bio_list, xferred);
		else
			zero_pages(obj_request->pages, xferred, length);
		obj_request->xferred = length;
	}
	obj_request_done_set(obj_request);
}

static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p cb %p\n", __func__, obj_request,
		obj_request->callback);
	if (obj_request->callback)
		obj_request->callback(obj_request);
	else
		complete_all(&obj_request->completion);
}

static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_device *rbd_dev = NULL;
	bool layered = false;

	if (obj_request_img_data_test(obj_request)) {
		img_request = obj_request->img_request;
		layered = img_request && img_request_layered_test(img_request);
		rbd_dev = img_request->rbd_dev;
	}

	dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
		obj_request, img_request, obj_request->result,
		obj_request->xferred, obj_request->length);
	if (layered && obj_request->result == -ENOENT &&
			obj_request->img_offset < rbd_dev->parent_overlap)
		rbd_img_parent_read(obj_request);
	else if (img_request)
		rbd_img_obj_request_read_callback(obj_request);
	else
		obj_request_done_set(obj_request);
}

static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p result %d %llu\n", __func__, obj_request,
		obj_request->result, obj_request->length);
	/*
	 * There is no such thing as a successful short write.  Set
	 * it to our originally-requested length.
	 */
	obj_request->xferred = obj_request->length;
	obj_request_done_set(obj_request);
}

/*
 * For a simple stat call there's nothing to do.  We'll do more if
 * this is part of a write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);
	obj_request_done_set(obj_request);
}

static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
				struct ceph_msg *msg)
{
	struct rbd_obj_request *obj_request = osd_req->r_priv;
	u16 opcode;

	dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
	rbd_assert(osd_req == obj_request->osd_req);
	if (obj_request_img_data_test(obj_request)) {
		rbd_assert(obj_request->img_request);
		rbd_assert(obj_request->which != BAD_WHICH);
	} else {
		rbd_assert(obj_request->which == BAD_WHICH);
	}

	if (osd_req->r_result < 0)
		obj_request->result = osd_req->r_result;

	BUG_ON(osd_req->r_num_ops > 2);

	/*
	 * We support a 64-bit length, but ultimately it has to be
	 * passed to blk_end_request(), which takes an unsigned int.
	 */
	obj_request->xferred = osd_req->r_reply_op_len[0];
	rbd_assert(obj_request->xferred < (u64)UINT_MAX);
	opcode = osd_req->r_ops[0].op;
	switch (opcode) {
	case CEPH_OSD_OP_READ:
		rbd_osd_read_callback(obj_request);
		break;
	case CEPH_OSD_OP_WRITE:
		rbd_osd_write_callback(obj_request);
		break;
	case CEPH_OSD_OP_STAT:
		rbd_osd_stat_callback(obj_request);
		break;
	case CEPH_OSD_OP_CALL:
	case CEPH_OSD_OP_NOTIFY_ACK:
	case CEPH_OSD_OP_WATCH:
		rbd_osd_trivial_callback(obj_request);
		break;
	default:
		rbd_warn(NULL, "%s: unsupported op %hu\n",
			obj_request->object_name, (unsigned short) opcode);
		break;
	}

	if (obj_request_done_test(obj_request))
		rbd_obj_request_complete(obj_request);
}

static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	u64 snap_id;

	rbd_assert(osd_req != NULL);

	snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			NULL, snap_id, NULL);
}

static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = obj_request->img_request;
	struct ceph_osd_request *osd_req = obj_request->osd_req;
	struct ceph_snap_context *snapc;
	struct timespec mtime = CURRENT_TIME;

	rbd_assert(osd_req != NULL);

	snapc = img_request ? img_request->snapc : NULL;
	ceph_osdc_build_request(osd_req, obj_request->offset,
			snapc, CEPH_NOSNAP, &mtime);
}

static struct ceph_osd_request *rbd_osd_req_create(
					struct rbd_device *rbd_dev,
					bool write_request,
					struct rbd_obj_request *obj_request)
{
	struct ceph_snap_context *snapc = NULL;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	if (obj_request_img_data_test(obj_request)) {
		struct rbd_img_request *img_request = obj_request->img_request;

		rbd_assert(write_request ==
				img_request_write_test(img_request));
		if (write_request)
			snapc = img_request->snapc;
	}

	/* Allocate and initialize the request, for the single op */

	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	if (write_request)
		osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	else
		osd_req->r_flags = CEPH_OSD_FLAG_READ;

	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}
1686
/*
 * Create a copyup osd request based on the information in the
 * object request supplied.  A copyup request has two osd ops,
 * a copyup method call, and a "normal" write request.
 */
static struct ceph_osd_request *
rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct ceph_snap_context *snapc;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct ceph_osd_request *osd_req;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_assert(img_request_write_test(img_request));

	/* Allocate and initialize the request, for the two ops */

	snapc = img_request->snapc;
	rbd_dev = img_request->rbd_dev;
	osdc = &rbd_dev->rbd_client->client->osdc;
	osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
	if (!osd_req)
		return NULL;	/* ENOMEM */

	osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
	osd_req->r_callback = rbd_osd_req_callback;
	osd_req->r_priv = obj_request;

	osd_req->r_oid_len = strlen(obj_request->object_name);
	rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
	memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);

	osd_req->r_file_layout = rbd_dev->layout;	/* struct */

	return osd_req;
}

static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}

/* object_name is assumed to be a non-null pointer and NUL-terminated */

static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
						u64 offset, u64 length,
						enum obj_request_type type)
{
	struct rbd_obj_request *obj_request;
	size_t size;
	char *name;

	rbd_assert(obj_request_type_valid(type));

	size = strlen(object_name) + 1;
	name = kmalloc(size, GFP_KERNEL);
	if (!name)
		return NULL;

	obj_request = kmem_cache_zalloc(rbd_obj_request_cache, GFP_KERNEL);
	if (!obj_request) {
		kfree(name);
		return NULL;
	}

	obj_request->object_name = memcpy(name, object_name, size);
	obj_request->offset = offset;
	obj_request->length = length;
	obj_request->flags = 0;
	obj_request->which = BAD_WHICH;
	obj_request->type = type;
	INIT_LIST_HEAD(&obj_request->links);
	init_completion(&obj_request->completion);
	kref_init(&obj_request->kref);

	dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
		offset, length, (int)type, obj_request);

	return obj_request;
}

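/*
 * Drop-side counterpart of rbd_obj_request_create(): called when
 * the last reference is put.  The request must already be detached
 * from any image request, and any data it carries (bio chain or
 * page vector) is released according to its type.
 */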
static void rbd_obj_request_destroy(struct kref *kref)
{
	struct rbd_obj_request *obj_request;

	obj_request = container_of(kref, struct rbd_obj_request, kref);

	dout("%s: obj %p\n", __func__, obj_request);

	rbd_assert(obj_request->img_request == NULL);
	rbd_assert(obj_request->which == BAD_WHICH);

	if (obj_request->osd_req)
		rbd_osd_req_destroy(obj_request->osd_req);

	rbd_assert(obj_request_type_valid(obj_request->type));
	switch (obj_request->type) {
	case OBJ_REQUEST_NODATA:
		break;		/* Nothing to do */
	case OBJ_REQUEST_BIO:
		if (obj_request->bio_list)
			bio_chain_put(obj_request->bio_list);
		break;
	case OBJ_REQUEST_PAGES:
		if (obj_request->pages)
			ceph_release_page_vector(obj_request->pages,
						obj_request->page_count);
		break;
	}

	kfree(obj_request->object_name);
	obj_request->object_name = NULL;
	kmem_cache_free(rbd_obj_request_cache, obj_request);
}

/*
 * Caller is responsible for filling in the list of object requests
 * that comprises the image request, and the Linux request pointer
 * (if there is one).
 */
static struct rbd_img_request *rbd_img_request_create(
					struct rbd_device *rbd_dev,
					u64 offset, u64 length,
					bool write_request,
					bool child_request)
{
	struct rbd_img_request *img_request;

	img_request = kmem_cache_alloc(rbd_img_request_cache, GFP_ATOMIC);
	if (!img_request)
		return NULL;

	if (write_request) {
		down_read(&rbd_dev->header_rwsem);
		ceph_get_snap_context(rbd_dev->header.snapc);
		up_read(&rbd_dev->header_rwsem);
	}

	img_request->rq = NULL;
	img_request->rbd_dev = rbd_dev;
	img_request->offset = offset;
	img_request->length = length;
	img_request->flags = 0;
	if (write_request) {
		img_request_write_set(img_request);
		img_request->snapc = rbd_dev->header.snapc;
	} else {
		img_request->snap_id = rbd_dev->spec->snap_id;
	}
	if (child_request)
		img_request_child_set(img_request);
	if (rbd_dev->parent_spec)
		img_request_layered_set(img_request);
	spin_lock_init(&img_request->completion_lock);
	img_request->next_completion = 0;
	img_request->callback = NULL;
	img_request->result = 0;
	img_request->obj_request_count = 0;
	INIT_LIST_HEAD(&img_request->obj_requests);
	kref_init(&img_request->kref);

	rbd_img_request_get(img_request);	/* Avoid a warning */
	rbd_img_request_put(img_request);	/* TEMPORARY */

	dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
		write_request ? "write" : "read", offset, length,
		img_request);

	return img_request;
}

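/*
 * Final teardown of an image request: delete all of its object
 * requests, drop the snapshot context reference taken for a write,
 * and, for a child (parent read) request, drop the reference held
 * on the originating object request.
 */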
static void rbd_img_request_destroy(struct kref *kref)
{
	struct rbd_img_request *img_request;
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	img_request = container_of(kref, struct rbd_img_request, kref);

	dout("%s: img %p\n", __func__, img_request);

	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_img_obj_request_del(img_request, obj_request);
	rbd_assert(img_request->obj_request_count == 0);

	if (img_request_write_test(img_request))
		ceph_put_snap_context(img_request->snapc);

	if (img_request_child_test(img_request))
		rbd_obj_request_put(img_request->obj_request);

	kmem_cache_free(rbd_img_request_cache, img_request);
}

static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	unsigned int xferred;
	int result;
	bool more;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
	xferred = (unsigned int)obj_request->xferred;
	result = obj_request->result;
	if (result) {
		struct rbd_device *rbd_dev = img_request->rbd_dev;

		rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
			img_request_write_test(img_request) ? "write" : "read",
			obj_request->length, obj_request->img_offset,
			obj_request->offset);
		rbd_warn(rbd_dev, " result %d xferred %x\n",
			result, xferred);
		if (!img_request->result)
			img_request->result = result;
	}

	/* Image object requests don't own their page array */

	if (obj_request->type == OBJ_REQUEST_PAGES) {
		obj_request->pages = NULL;
		obj_request->page_count = 0;
	}

	if (img_request_child_test(img_request)) {
		rbd_assert(img_request->obj_request != NULL);
		more = obj_request->which < img_request->obj_request_count - 1;
	} else {
		rbd_assert(img_request->rq != NULL);
		more = blk_end_request(img_request->rq, result, xferred);
	}

	return more;
}

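/*
 * Per-object completion callback for image requests.  Object
 * requests may complete in any order, but the block layer must see
 * completions in order, so only the contiguous run of completed
 * requests starting at ->next_completion is ended here; an
 * out-of-order completion just records its state and returns.
 */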
static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	u32 which = obj_request->which;
	bool more = true;

	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;

	dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
	rbd_assert(img_request != NULL);
	rbd_assert(img_request->obj_request_count > 0);
	rbd_assert(which != BAD_WHICH);
	rbd_assert(which < img_request->obj_request_count);
	rbd_assert(which >= img_request->next_completion);

	spin_lock_irq(&img_request->completion_lock);
	if (which != img_request->next_completion)
		goto out;

	for_each_obj_request_from(img_request, obj_request) {
		rbd_assert(more);
		rbd_assert(which < img_request->obj_request_count);

		if (!obj_request_done_test(obj_request))
			break;
		more = rbd_img_obj_end_request(obj_request);
		which++;
	}

	rbd_assert(more ^ (which == img_request->obj_request_count));
	img_request->next_completion = which;
out:
	spin_unlock_irq(&img_request->completion_lock);

	if (!more)
		rbd_img_request_complete(img_request);
}

/*
 * Split up an image request into one or more object requests, each
 * to a different object.  The "type" parameter indicates whether
 * "data_desc" is the pointer to the head of a list of bio
 * structures, or the base of a page array.  In either case this
 * function assumes data_desc describes memory sufficient to hold
 * all data described by the image request.
 */
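/*
 * For example, assuming the default 4 MiB object size: a 6 MiB
 * image request starting at image offset 3 MiB is split into three
 * object requests here--the final 1 MiB of the first object, all
 * 4 MiB of the second, and the first 1 MiB of the third.
 */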
static int rbd_img_request_fill(struct rbd_img_request *img_request,
					enum obj_request_type type,
					void *data_desc)
{
	struct rbd_device *rbd_dev = img_request->rbd_dev;
	struct rbd_obj_request *obj_request = NULL;
	struct rbd_obj_request *next_obj_request;
	bool write_request = img_request_write_test(img_request);
	struct bio *bio_list;
	unsigned int bio_offset = 0;
	struct page **pages;
	u64 img_offset;
	u64 resid;
	u16 opcode;

	dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
		(int)type, data_desc);

	opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
	img_offset = img_request->offset;
	resid = img_request->length;
	rbd_assert(resid > 0);

	if (type == OBJ_REQUEST_BIO) {
		bio_list = data_desc;
		rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
	} else {
		rbd_assert(type == OBJ_REQUEST_PAGES);
		pages = data_desc;
	}

	while (resid) {
		struct ceph_osd_request *osd_req;
		const char *object_name;
		u64 offset;
		u64 length;

		object_name = rbd_segment_name(rbd_dev, img_offset);
		if (!object_name)
			goto out_unwind;
		offset = rbd_segment_offset(rbd_dev, img_offset);
		length = rbd_segment_length(rbd_dev, img_offset, resid);
		obj_request = rbd_obj_request_create(object_name,
						offset, length, type);
		/* object request has its own copy of the object name */
		rbd_segment_name_free(object_name);
		if (!obj_request)
			goto out_unwind;

		if (type == OBJ_REQUEST_BIO) {
			unsigned int clone_size;

			rbd_assert(length <= (u64)UINT_MAX);
			clone_size = (unsigned int)length;
			obj_request->bio_list =
					bio_chain_clone_range(&bio_list,
								&bio_offset,
								clone_size,
								GFP_ATOMIC);
			if (!obj_request->bio_list)
				goto out_partial;
		} else {
			unsigned int page_count;

			obj_request->pages = pages;
			page_count = (u32)calc_pages_for(offset, length);
			obj_request->page_count = page_count;
			if ((offset + length) & ~PAGE_MASK)
				page_count--;	/* more on last page */
			pages += page_count;
		}

		osd_req = rbd_osd_req_create(rbd_dev, write_request,
						obj_request);
		if (!osd_req)
			goto out_partial;
		obj_request->osd_req = osd_req;
		obj_request->callback = rbd_img_obj_callback;

		osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
						0, 0);
		if (type == OBJ_REQUEST_BIO)
			osd_req_op_extent_osd_data_bio(osd_req, 0,
					obj_request->bio_list, length);
		else
			osd_req_op_extent_osd_data_pages(osd_req, 0,
					obj_request->pages, length,
					offset & ~PAGE_MASK, false, false);

		if (write_request)
			rbd_osd_req_format_write(obj_request);
		else
			rbd_osd_req_format_read(obj_request);

		obj_request->img_offset = img_offset;
		rbd_img_obj_request_add(img_request, obj_request);

		img_offset += length;
		resid -= length;
	}

	return 0;

out_partial:
	rbd_obj_request_put(obj_request);
out_unwind:
	for_each_obj_request_safe(img_request, obj_request, next_obj_request)
		rbd_obj_request_put(obj_request);

	return -ENOMEM;
}

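/*
 * Completion callback for a copyup request.  The page array that
 * held the parent data is released here; on success the transfer
 * count is forced to the originally-requested write length (there
 * is no such thing as a successful short write) before handing off
 * to the normal image object callback.
 */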
static void
rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	u64 length;
	u32 page_count;

	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
	rbd_assert(obj_request_img_data_test(obj_request));
	img_request = obj_request->img_request;
	rbd_assert(img_request);

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	length = (u64)1 << rbd_dev->header.obj_order;
	page_count = (u32)calc_pages_for(0, length);

	rbd_assert(obj_request->copyup_pages);
	ceph_release_page_vector(obj_request->copyup_pages, page_count);
	obj_request->copyup_pages = NULL;

	/*
	 * We want the transfer count to reflect the size of the
	 * original write request.  There is no such thing as a
	 * successful short write, so if the request was successful
	 * we can just set it to the originally-requested length.
	 */
	if (!obj_request->result)
		obj_request->xferred = obj_request->length;

	/* Finish up with the normal image object callback */

	rbd_img_obj_callback(obj_request);
}

static void
rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *orig_request;
	struct ceph_osd_request *osd_req;
	struct ceph_osd_client *osdc;
	struct rbd_device *rbd_dev;
	struct page **pages;
	int result;
	u64 obj_size;
	u64 xferred;

	rbd_assert(img_request_child_test(img_request));

	/* First get what we need from the image request */

	pages = img_request->copyup_pages;
	rbd_assert(pages != NULL);
	img_request->copyup_pages = NULL;

	orig_request = img_request->obj_request;
	rbd_assert(orig_request != NULL);
	rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
	result = img_request->result;
	obj_size = img_request->length;
	xferred = img_request->xferred;

	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev);
	rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);

	rbd_img_request_put(img_request);

	if (result)
		goto out_err;

	/* Allocate the new copyup osd request for the original request */

	result = -ENOMEM;
	rbd_assert(!orig_request->osd_req);
	osd_req = rbd_osd_req_create_copyup(orig_request);
	if (!osd_req)
		goto out_err;
	orig_request->osd_req = osd_req;
	orig_request->copyup_pages = pages;

	/* Initialize the copyup op */

	osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
	osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
						false, false);

	/* Then the original write request op */

	osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
					orig_request->offset,
					orig_request->length, 0, 0);
	osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
					orig_request->length);

	rbd_osd_req_format_write(orig_request);

	/* All set, send it off. */

	orig_request->callback = rbd_img_obj_copyup_callback;
	osdc = &rbd_dev->rbd_client->client->osdc;
	result = rbd_obj_request_submit(osdc, orig_request);
	if (!result)
		return;
out_err:
	/* Record the error code and complete the request */

	orig_request->result = result;
	orig_request->xferred = 0;
	obj_request_done_set(orig_request);
	rbd_obj_request_complete(orig_request);
}

/*
 * Read from the parent image the range of data that covers the
 * entire target of the given object request.  This is used for
 * satisfying a layered image write request when the target of an
 * object request from the image request does not exist.
 *
 * A page array big enough to hold the returned data is allocated
 * and supplied to rbd_img_request_fill() as the "data descriptor."
 * When the read completes, this page array will be transferred to
 * the original object request for the copyup operation.
 *
 * If an error occurs, record it as the result of the original
 * object request and mark it done so it gets completed.
 */
static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request = NULL;
	struct rbd_img_request *parent_request = NULL;
	struct rbd_device *rbd_dev;
	u64 img_offset;
	u64 length;
	struct page **pages = NULL;
	u32 page_count;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	img_request = obj_request->img_request;
	rbd_assert(img_request != NULL);
	rbd_dev = img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);

	/*
	 * First things first.  The original osd request is of no
	 * use to us any more, we'll need a new one that can hold
	 * the two ops in a copyup request.  We'll get that later,
	 * but for now we can release the old one.
	 */
	rbd_osd_req_destroy(obj_request->osd_req);
	obj_request->osd_req = NULL;

	/*
	 * Determine the byte range covered by the object in the
	 * child image to which the original request was to be sent.
	 */
	img_offset = obj_request->img_offset - obj_request->offset;
	length = (u64)1 << rbd_dev->header.obj_order;

	/*
	 * There is no defined parent data beyond the parent
	 * overlap, so limit what we read at that boundary if
	 * necessary.
	 */
	if (img_offset + length > rbd_dev->parent_overlap) {
		rbd_assert(img_offset < rbd_dev->parent_overlap);
		length = rbd_dev->parent_overlap - img_offset;
	}

	/*
	 * Allocate a page array big enough to receive the data read
	 * from the parent.
	 */
	page_count = (u32)calc_pages_for(0, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages)) {
		result = PTR_ERR(pages);
		pages = NULL;
		goto out_err;
	}

	result = -ENOMEM;
	parent_request = rbd_img_request_create(rbd_dev->parent,
						img_offset, length,
						false, true);
	if (!parent_request)
		goto out_err;
	rbd_obj_request_get(obj_request);
	parent_request->obj_request = obj_request;

	result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
	if (result)
		goto out_err;
	parent_request->copyup_pages = pages;

	parent_request->callback = rbd_img_obj_parent_read_full_callback;
	result = rbd_img_request_submit(parent_request);
	if (!result)
		return 0;

	parent_request->copyup_pages = NULL;
	parent_request->obj_request = NULL;
	rbd_obj_request_put(obj_request);
out_err:
	if (pages)
		ceph_release_page_vector(pages, page_count);
	if (parent_request)
		rbd_img_request_put(parent_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);

	return result;
}

static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *orig_request;
	int result;

	rbd_assert(!obj_request_img_data_test(obj_request));

	/*
	 * All we need from the object request is the original
	 * request and the result of the STAT op.  Grab those, then
	 * we're done with the request.
	 */
	orig_request = obj_request->obj_request;
	obj_request->obj_request = NULL;
	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	result = obj_request->result;
	obj_request->result = 0;

	dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
		obj_request, orig_request, result,
		obj_request->xferred, obj_request->length);
	rbd_obj_request_put(obj_request);

	rbd_assert(orig_request);
	rbd_assert(orig_request->img_request);

	/*
	 * Our only purpose here is to determine whether the object
	 * exists, and we don't want to treat the non-existence as
	 * an error.  If something else comes back, transfer the
	 * error to the original request and complete it now.
	 */
	if (!result) {
		obj_request_existence_set(orig_request, true);
	} else if (result == -ENOENT) {
		obj_request_existence_set(orig_request, false);
	} else if (result) {
		orig_request->result = result;
		goto out;
	}

	/*
	 * Resubmit the original request now that we have recorded
	 * whether the target object exists.
	 */
	orig_request->result = rbd_img_obj_request_submit(orig_request);
out:
	if (orig_request->result)
		rbd_obj_request_complete(orig_request);
	rbd_obj_request_put(orig_request);
}

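/*
 * Issue a STAT op against the target object to learn whether it
 * exists.  Only the result code ultimately matters: 0 means the
 * object exists, -ENOENT means it doesn't; the response payload
 * itself is never examined by the completion callback above.
 */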
static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_obj_request *stat_request;
	struct rbd_device *rbd_dev;
	struct ceph_osd_client *osdc;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	/*
	 * The response data for a STAT call consists of:
	 *     le64 length;
	 *     struct {
	 *         le32 tv_sec;
	 *         le32 tv_nsec;
	 *     } mtime;
	 */
	size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
	page_count = (u32)calc_pages_for(0, size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
							OBJ_REQUEST_PAGES);
	if (!stat_request)
		goto out;

	rbd_obj_request_get(obj_request);
	stat_request->obj_request = obj_request;
	stat_request->pages = pages;
	stat_request->page_count = page_count;

	rbd_assert(obj_request->img_request);
	rbd_dev = obj_request->img_request->rbd_dev;
	stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
						stat_request);
	if (!stat_request->osd_req)
		goto out;
	stat_request->callback = rbd_img_obj_exists_callback;

	osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
	osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
					false, false);
	rbd_osd_req_format_read(stat_request);

	osdc = &rbd_dev->rbd_client->client->osdc;
	ret = rbd_obj_request_submit(osdc, stat_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
{
	struct rbd_img_request *img_request;
	struct rbd_device *rbd_dev;
	bool known;

	rbd_assert(obj_request_img_data_test(obj_request));

	img_request = obj_request->img_request;
	rbd_assert(img_request);
	rbd_dev = img_request->rbd_dev;

	/*
	 * Only writes to layered images need special handling.
	 * Reads and non-layered writes are simple object requests.
	 * Layered writes that start beyond the end of the overlap
	 * with the parent have no parent data, so they too are
	 * simple object requests.  Finally, if the target object is
	 * known to already exist, its parent data has already been
	 * copied, so a write to the object can also be handled as a
	 * simple object request.
	 */
	if (!img_request_write_test(img_request) ||
		!img_request_layered_test(img_request) ||
		rbd_dev->parent_overlap <= obj_request->img_offset ||
		((known = obj_request_known_test(obj_request)) &&
			obj_request_exists_test(obj_request))) {

		struct rbd_device *rbd_dev;
		struct ceph_osd_client *osdc;

		rbd_dev = obj_request->img_request->rbd_dev;
		osdc = &rbd_dev->rbd_client->client->osdc;

		return rbd_obj_request_submit(osdc, obj_request);
	}

	/*
	 * It's a layered write.  The target object might exist but
	 * we may not know that yet.  If we know it doesn't exist,
	 * start by reading the data for the full target object from
	 * the parent so we can use it for a copyup to the target.
	 */
	if (known)
		return rbd_img_obj_parent_read_full(obj_request);

	/* We don't know whether the target exists.  Go find out. */

	return rbd_img_obj_exists_submit(obj_request);
}

static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_obj_request *next_obj_request;

	dout("%s: img %p\n", __func__, img_request);
	for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
		int ret;

		ret = rbd_img_obj_request_submit(obj_request);
		if (ret)
			return ret;
	}

	return 0;
}

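/*
 * Completion callback for a read issued to the parent image on
 * behalf of a child object request (a layered read).  Data beyond
 * the parent overlap is zeroed by pretending the parent read came
 * up short at the overlap boundary.
 */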
static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
{
	struct rbd_obj_request *obj_request;
	struct rbd_device *rbd_dev;
	u64 obj_end;

	rbd_assert(img_request_child_test(img_request));

	obj_request = img_request->obj_request;
	rbd_assert(obj_request);
	rbd_assert(obj_request->img_request);

	obj_request->result = img_request->result;
	if (obj_request->result)
		goto out;

	/*
	 * We need to zero anything beyond the parent overlap
	 * boundary.  Since rbd_img_obj_request_read_callback()
	 * will zero anything beyond the end of a short read, an
	 * easy way to do this is to pretend the data from the
	 * parent came up short--ending at the overlap boundary.
	 */
	rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
	obj_end = obj_request->img_offset + obj_request->length;
	rbd_dev = obj_request->img_request->rbd_dev;
	if (obj_end > rbd_dev->parent_overlap) {
		u64 xferred = 0;

		if (obj_request->img_offset < rbd_dev->parent_overlap)
			xferred = rbd_dev->parent_overlap -
					obj_request->img_offset;

		obj_request->xferred = min(img_request->xferred, xferred);
	} else {
		obj_request->xferred = img_request->xferred;
	}
out:
	rbd_img_request_put(img_request);
	rbd_img_obj_request_read_callback(obj_request);
	rbd_obj_request_complete(obj_request);
}

static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
{
	struct rbd_device *rbd_dev;
	struct rbd_img_request *img_request;
	int result;

	rbd_assert(obj_request_img_data_test(obj_request));
	rbd_assert(obj_request->img_request != NULL);
	rbd_assert(obj_request->result == (s32) -ENOENT);
	rbd_assert(obj_request->type == OBJ_REQUEST_BIO);

	rbd_dev = obj_request->img_request->rbd_dev;
	rbd_assert(rbd_dev->parent != NULL);
	/* rbd_read_finish(obj_request, obj_request->length); */
	img_request = rbd_img_request_create(rbd_dev->parent,
						obj_request->img_offset,
						obj_request->length,
						false, true);
	result = -ENOMEM;
	if (!img_request)
		goto out_err;

	rbd_obj_request_get(obj_request);
	img_request->obj_request = obj_request;

	result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
					obj_request->bio_list);
	if (result)
		goto out_err;

	img_request->callback = rbd_img_parent_read_callback;
	result = rbd_img_request_submit(img_request);
	if (result)
		goto out_err;

	return;
out_err:
	if (img_request)
		rbd_img_request_put(img_request);
	obj_request->result = result;
	obj_request->xferred = 0;
	obj_request_done_set(obj_request);
}

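/*
 * Acknowledge a notification received on the header object.  The
 * ack is fire-and-forget: its completion callback simply drops the
 * object request reference.
 */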
static int rbd_obj_notify_ack(struct rbd_device *rbd_dev, u64 notify_id)
{
	struct rbd_obj_request *obj_request;
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	int ret;

	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		return -ENOMEM;

	ret = -ENOMEM;
	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;
	obj_request->callback = rbd_obj_request_put;

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
					notify_id, 0, 0);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
out:
	if (ret)
		rbd_obj_request_put(obj_request);

	return ret;
}

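/*
 * Callback invoked when the header object receives a notify event,
 * meaning some other client changed the image.  Refresh the in-core
 * header, then acknowledge the notification.
 */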
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *rbd_dev = (struct rbd_device *)data;
	int ret;

	if (!rbd_dev)
		return;

	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
		rbd_dev->header_name, (unsigned long long)notify_id,
		(unsigned int)opcode);
	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, ": header refresh error (%d)\n", ret);

	rbd_obj_notify_ack(rbd_dev, notify_id);
}

/*
 * Request sync osd watch/unwatch.  The value of "start" determines
 * whether a watch request is being initiated or torn down.
 */
static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	int ret;

	rbd_assert(start ^ !!rbd_dev->watch_event);
	rbd_assert(start ^ !!rbd_dev->watch_request);

	if (start) {
		ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
						&rbd_dev->watch_event);
		if (ret < 0)
			return ret;
		rbd_assert(rbd_dev->watch_event != NULL);
	}

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
							OBJ_REQUEST_NODATA);
	if (!obj_request)
		goto out_cancel;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
	if (!obj_request->osd_req)
		goto out_cancel;

	if (start)
		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
	else
		ceph_osdc_unregister_linger_request(osdc,
					rbd_dev->watch_request->osd_req);

	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
				rbd_dev->watch_event->cookie, 0, start);
	rbd_osd_req_format_write(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out_cancel;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out_cancel;
	ret = obj_request->result;
	if (ret)
		goto out_cancel;

	/*
	 * A watch request is set to linger, so the underlying osd
	 * request won't go away until we unregister it.  We retain
	 * a pointer to the object request during that time (in
	 * rbd_dev->watch_request), so we'll keep a reference to
	 * it.  We'll drop that reference (below) after we've
	 * unregistered it.
	 */
	if (start) {
		rbd_dev->watch_request = obj_request;

		return 0;
	}

	/* We have successfully torn down the watch request */

	rbd_obj_request_put(rbd_dev->watch_request);
	rbd_dev->watch_request = NULL;
out_cancel:
	/* Cancel the event if we're tearing down, or on error */
	ceph_osdc_cancel_event(rbd_dev->watch_event);
	rbd_dev->watch_event = NULL;
	if (obj_request)
		rbd_obj_request_put(obj_request);

	return ret;
}

/*
 * Synchronous osd object method call.  Returns the number of bytes
 * returned in the inbound buffer, or a negative error code.
 */
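/*
 * A typical caller is _rbd_dev_v2_snap_size() below, which invokes
 * the "get_size" method of the "rbd" class on the image's header
 * object, sending an encoded snapshot id outbound and receiving
 * the packed order/size result inbound.
 */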
static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
			     const char *object_name,
			     const char *class_name,
			     const char *method_name,
			     const void *outbound,
			     size_t outbound_size,
			     void *inbound,
			     size_t inbound_size)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages;
	u32 page_count;
	int ret;

	/*
	 * Method calls are ultimately read operations.  The result
	 * should be placed into the inbound buffer provided.  They
	 * also supply outbound data--parameters for the object
	 * method.  Currently if this is present it will be a
	 * snapshot id.
	 */
	page_count = (u32)calc_pages_for(0, inbound_size);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
					class_name, method_name);
	if (outbound_size) {
		struct ceph_pagelist *pagelist;

		pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
		if (!pagelist)
			goto out;

		ceph_pagelist_init(pagelist);
		ceph_pagelist_append(pagelist, outbound, outbound_size);
		osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
						pagelist);
	}
	osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
					obj_request->pages, inbound_size,
					0, false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred < (u64)INT_MAX);
	ret = (int)obj_request->xferred;
	ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

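/*
 * The block layer request function.  Pulls requests off the queue,
 * turns each into an image request, and submits it.  The queue lock
 * is dropped while each image request is built and submitted, and
 * reacquired before the next request is fetched.
 */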
static void rbd_request_fn(struct request_queue *q)
		__releases(q->queue_lock) __acquires(q->queue_lock)
{
	struct rbd_device *rbd_dev = q->queuedata;
	bool read_only = rbd_dev->mapping.read_only;
	struct request *rq;
	int result;

	while ((rq = blk_fetch_request(q))) {
		bool write_request = rq_data_dir(rq) == WRITE;
		struct rbd_img_request *img_request;
		u64 offset;
		u64 length;

		/* Ignore any non-FS requests that filter through. */

		if (rq->cmd_type != REQ_TYPE_FS) {
			dout("%s: non-fs request type %d\n", __func__,
				(int) rq->cmd_type);
			__blk_end_request_all(rq, 0);
			continue;
		}

		/* Ignore/skip any zero-length requests */

		offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
		length = (u64) blk_rq_bytes(rq);

		if (!length) {
			dout("%s: zero-length request\n", __func__);
			__blk_end_request_all(rq, 0);
			continue;
		}

		spin_unlock_irq(q->queue_lock);

		/* Disallow writes to a read-only device */

		if (write_request) {
			result = -EROFS;
			if (read_only)
				goto end_request;
			rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
		}

		/*
		 * Quit early if the mapped snapshot no longer
		 * exists.  It's still possible the snapshot will
		 * have disappeared by the time our request arrives
		 * at the osd, but there's no sense in sending it if
		 * we already know.
		 */
		if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
			dout("request for non-existent snapshot");
			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
			result = -ENXIO;
			goto end_request;
		}

		result = -EINVAL;
		if (offset && length > U64_MAX - offset + 1) {
			rbd_warn(rbd_dev, "bad request range (%llu~%llu)\n",
				offset, length);
			goto end_request;	/* Shouldn't happen */
		}

		result = -EIO;
		if (offset + length > rbd_dev->mapping.size) {
			rbd_warn(rbd_dev, "beyond EOD (%llu~%llu > %llu)\n",
				offset, length, rbd_dev->mapping.size);
			goto end_request;
		}

		result = -ENOMEM;
		img_request = rbd_img_request_create(rbd_dev, offset, length,
							write_request, false);
		if (!img_request)
			goto end_request;

		img_request->rq = rq;

		result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
						rq->bio);
		if (!result)
			result = rbd_img_request_submit(img_request);
		if (result)
			rbd_img_request_put(img_request);
end_request:
		spin_lock_irq(q->queue_lock);
		if (result < 0) {
			rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
				write_request ? "write" : "read",
				length, offset, result);

			__blk_end_request_all(rq, result);
		}
	}
}

/*
 * A queue callback.  Makes sure that we don't create a bio that spans across
 * multiple osd objects.  One exception would be with single-page bios,
 * which we handle later at bio_chain_clone_range().
 */
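/*
 * For example, assuming the default object order of 22 (4 MiB
 * objects): sectors_per_obj = 1 << (22 - 9) = 8192, so a bio that
 * starts at device-relative sector 8190 and already holds 512
 * bytes may accept at most (8192 - 8190) * 512 - 512 = 512 more
 * bytes before it would cross into the next object.
 */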
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	sector_t sector_offset;
	sector_t sectors_per_obj;
	sector_t obj_sector_offset;
	int ret;

	/*
	 * Find how far into its rbd object the bio's start sector
	 * falls.  The bio sector is partition-relative, so first
	 * offset it to be relative to the enclosing device.
	 */
	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
	obj_sector_offset = sector_offset & (sectors_per_obj - 1);

	/*
	 * Compute the number of bytes from that offset to the end
	 * of the object.  Account for what's already used by the bio.
	 */
	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
	if (ret > bmd->bi_size)
		ret -= bmd->bi_size;
	else
		ret = 0;

	/*
	 * Don't send back more than was asked for.  And if the bio
	 * was empty, let the whole thing through because:  "Note
	 * that a block device *must* allow a single page to be
	 * added to an empty bio."
	 */
	rbd_assert(bvec->bv_len <= PAGE_SIZE);
	if (ret > (int) bvec->bv_len || !bmd->bi_size)
		ret = (int) bvec->bv_len;

	return ret;
}

static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_dev->disk = NULL;
	if (disk->flags & GENHD_FL_UP) {
		del_gendisk(disk);
		if (disk->queue)
			blk_cleanup_queue(disk->queue);
	}
	put_disk(disk);
}

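/*
 * Synchronously read up to "length" bytes from the named object,
 * starting at "offset", into "buf".  Returns the number of bytes
 * read on success, or a negative errno.
 */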
static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
				const char *object_name,
				u64 offset, u64 length, void *buf)
{
	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
	struct rbd_obj_request *obj_request;
	struct page **pages = NULL;
	u32 page_count;
	size_t size;
	int ret;

	page_count = (u32) calc_pages_for(offset, length);
	pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);	/* don't fall through with an ERR_PTR */

	ret = -ENOMEM;
	obj_request = rbd_obj_request_create(object_name, offset, length,
							OBJ_REQUEST_PAGES);
	if (!obj_request)
		goto out;

	obj_request->pages = pages;
	obj_request->page_count = page_count;

	obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
	if (!obj_request->osd_req)
		goto out;

	osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
					offset, length, 0, 0);
	osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
					obj_request->pages,
					obj_request->length,
					obj_request->offset & ~PAGE_MASK,
					false, false);
	rbd_osd_req_format_read(obj_request);

	ret = rbd_obj_request_submit(osdc, obj_request);
	if (ret)
		goto out;
	ret = rbd_obj_request_wait(obj_request);
	if (ret)
		goto out;

	ret = obj_request->result;
	if (ret < 0)
		goto out;

	rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
	size = (size_t) obj_request->xferred;
	ceph_copy_from_page_vector(pages, buf, 0, size);
	rbd_assert(size <= (size_t)INT_MAX);
	ret = (int)size;
out:
	if (obj_request)
		rbd_obj_request_put(obj_request);
	else
		ceph_release_page_vector(pages, page_count);

	return ret;
}

/*
 * Read the complete header for the given rbd device.
 *
 * Returns a pointer to a dynamically-allocated buffer containing
 * the complete and validated header.
 *
 * Returns a pointer-coded errno if a failure occurs.
 */
static struct rbd_image_header_ondisk *
rbd_dev_v1_header_read(struct rbd_device *rbd_dev)
{
	struct rbd_image_header_ondisk *ondisk = NULL;
	u32 snap_count = 0;
	u64 names_size = 0;
	u32 want_count;
	int ret;

	/*
	 * The complete header will include an array of its 64-bit
	 * snapshot ids, followed by the names of those snapshots as
	 * a contiguous block of NUL-terminated strings.  Note that
	 * the number of snapshots could change by the time we read
	 * it in, in which case we re-read it.
	 */
	do {
		size_t size;

		kfree(ondisk);

		size = sizeof (*ondisk);
		size += snap_count * sizeof (struct rbd_image_snap_ondisk);
		size += names_size;
		ondisk = kmalloc(size, GFP_KERNEL);
		if (!ondisk)
			return ERR_PTR(-ENOMEM);

		ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
				       0, size, ondisk);
		if (ret < 0)
			goto out_err;
		if ((size_t)ret < size) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "short header read (want %zd got %d)",
				size, ret);
			goto out_err;
		}
		if (!rbd_dev_ondisk_valid(ondisk)) {
			ret = -ENXIO;
			rbd_warn(rbd_dev, "invalid header");
			goto out_err;
		}

		names_size = le64_to_cpu(ondisk->snap_names_len);
		want_count = snap_count;
		snap_count = le32_to_cpu(ondisk->snap_count);
	} while (snap_count != want_count);

	return ondisk;

out_err:
	kfree(ondisk);

	return ERR_PTR(ret);
}

/*
 * Re-read the on-disk header.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	struct rbd_image_header_ondisk *ondisk;
	int ret;

	ondisk = rbd_dev_v1_header_read(rbd_dev);
	if (IS_ERR(ondisk))
		return PTR_ERR(ondisk);
	ret = rbd_header_from_disk(header, ondisk);
	kfree(ondisk);

	return ret;
}

/*
 * Only read the first part of the ondisk header, without the snaps info.
 */
static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;

	memset(&h, 0, sizeof (h));
	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	down_write(&rbd_dev->header_rwsem);

	/* Update image size, and check for resize of mapped image */
	rbd_dev->header.image_size = h.image_size;
	if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
		if (rbd_dev->mapping.size != rbd_dev->header.image_size)
			rbd_dev->mapping.size = rbd_dev->header.image_size;

	/* rbd_dev->header.object_prefix shouldn't change */
	kfree(rbd_dev->header.snap_sizes);
	kfree(rbd_dev->header.snap_names);
	/* osd requests may still refer to snapc */
	ceph_put_snap_context(rbd_dev->header.snapc);

	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	/* Free the extra copy of the object prefix */
	if (strcmp(rbd_dev->header.object_prefix, h.object_prefix))
		rbd_warn(rbd_dev, "object prefix changed (ignoring)");
	kfree(h.object_prefix);

	up_write(&rbd_dev->header_rwsem);

	return ret;
}

/*
 * Clear the rbd device's EXISTS flag if the snapshot it's mapped to
 * has disappeared from the (just updated) snapshot context.
 */
static void rbd_exists_validate(struct rbd_device *rbd_dev)
{
	u64 snap_id;

	if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags))
		return;

	snap_id = rbd_dev->spec->snap_id;
	if (snap_id == CEPH_NOSNAP)
		return;

	if (rbd_dev_snap_index(rbd_dev, snap_id) == BAD_SNAP_INDEX)
		clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
}

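/*
 * Re-read the image header (format 1 or 2), revalidate the mapped
 * snapshot's EXISTS flag, and if the mapped size changed propagate
 * the new capacity to the block layer.
 */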
static int rbd_dev_refresh(struct rbd_device *rbd_dev)
{
	u64 mapping_size;
	int ret;

	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
	mapping_size = rbd_dev->mapping.size;
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	if (rbd_dev->image_format == 1)
		ret = rbd_dev_v1_refresh(rbd_dev);
	else
		ret = rbd_dev_v2_refresh(rbd_dev);

	/* If it's a mapped snapshot, validate its EXISTS flag */

	rbd_exists_validate(rbd_dev);
	mutex_unlock(&ctl_mutex);
	if (mapping_size != rbd_dev->mapping.size) {
		sector_t size;

		size = (sector_t)rbd_dev->mapping.size / SECTOR_SIZE;
		dout("setting size to %llu sectors", (unsigned long long)size);
		set_capacity(rbd_dev->disk, size);
		revalidate_disk(rbd_dev->disk);
	}

	return ret;
}

static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	u64 segment_size;

	/* create gendisk info */
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		return -ENOMEM;

	snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
		 rbd_dev->dev_id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* We use the default size, but let's be explicit about it. */
	blk_queue_physical_block_size(q, SECTOR_SIZE);

	/* set io sizes to object size */
	segment_size = rbd_obj_bytes(&rbd_dev->header);
	blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
	blk_queue_max_segment_size(q, segment_size);
	blk_queue_io_min(q, segment_size);
	blk_queue_io_opt(q, segment_size);

	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;

	return 0;
out_disk:
	put_disk(disk);

	return -ENOMEM;
}

/*
  sysfs
*/

static struct rbd_device *dev_to_rbd_dev(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}

static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
		(unsigned long long)rbd_dev->mapping.size);
}

/*
 * Note this shows the features for whatever's mapped, which is not
 * necessarily the base image.
 */
static ssize_t rbd_features_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "0x%016llx\n",
			(unsigned long long)rbd_dev->mapping.features);
}

static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->major)
		return sprintf(buf, "%d\n", rbd_dev->major);

	return sprintf(buf, "(none)\n");
}

static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}

static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
}

static ssize_t rbd_pool_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%llu\n",
			(unsigned long long) rbd_dev->spec->pool_id);
}

static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	if (rbd_dev->spec->image_name)
		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);

	return sprintf(buf, "(unknown)\n");
}

static ssize_t rbd_image_id_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
}

/*
 * Shows the name of the currently-mapped snapshot (or
 * RBD_SNAP_HEAD_NAME for the base image).
 */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);

	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
}

/*
 * For an rbd v2 image, shows the pool id, image id, and snapshot id
 * for the parent image.  If there is no parent, simply shows
 * "(no parent image)".
 */
static ssize_t rbd_parent_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	struct rbd_spec *spec = rbd_dev->parent_spec;
	int count;
	char *bufp = buf;

	if (!spec)
		return sprintf(buf, "(no parent image)\n");

	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
			(unsigned long long) spec->pool_id, spec->pool_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
			spec->image_name ? spec->image_name : "(unknown)");
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
			(unsigned long long) spec->snap_id, spec->snap_name);
	if (count < 0)
		return count;
	bufp += count;

	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
	if (count < 0)
		return count;
	bufp += count;

	return (ssize_t) (bufp - buf);
}

static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
	int ret;

	ret = rbd_dev_refresh(rbd_dev);
	if (ret)
		rbd_warn(rbd_dev, ": manual header refresh error (%d)\n", ret);

	return ret < 0 ? ret : size;
}

dfc5606d 3390static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
34b13184 3391static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
dfc5606d
YS
3392static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
3393static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
3394static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
9bb2f334 3395static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
dfc5606d 3396static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
589d30e0 3397static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
dfc5606d
YS
3398static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3399static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
86b00e0d 3400static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
dfc5606d
YS
3401
3402static struct attribute *rbd_attrs[] = {
3403 &dev_attr_size.attr,
34b13184 3404 &dev_attr_features.attr,
dfc5606d
YS
3405 &dev_attr_major.attr,
3406 &dev_attr_client_id.attr,
3407 &dev_attr_pool.attr,
9bb2f334 3408 &dev_attr_pool_id.attr,
dfc5606d 3409 &dev_attr_name.attr,
589d30e0 3410 &dev_attr_image_id.attr,
dfc5606d 3411 &dev_attr_current_snap.attr,
86b00e0d 3412 &dev_attr_parent.attr,
dfc5606d 3413 &dev_attr_refresh.attr,
3414 NULL
3415};
3416
3417static struct attribute_group rbd_attr_group = {
3418 .attrs = rbd_attrs,
3419};
3420
3421static const struct attribute_group *rbd_attr_groups[] = {
3422 &rbd_attr_group,
3423 NULL
3424};
3425
3426static void rbd_sysfs_dev_release(struct device *dev)
3427{
3428}
3429
3430static struct device_type rbd_device_type = {
3431 .name = "rbd",
3432 .groups = rbd_attr_groups,
3433 .release = rbd_sysfs_dev_release,
3434};
3435
3436static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3437{
3438 kref_get(&spec->kref);
3439
3440 return spec;
3441}
3442
3443static void rbd_spec_free(struct kref *kref);
3444static void rbd_spec_put(struct rbd_spec *spec)
3445{
3446 if (spec)
3447 kref_put(&spec->kref, rbd_spec_free);
3448}
3449
3450static struct rbd_spec *rbd_spec_alloc(void)
3451{
3452 struct rbd_spec *spec;
3453
3454 spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3455 if (!spec)
3456 return NULL;
3457 kref_init(&spec->kref);
3458
3459 return spec;
3460}
3461
3462static void rbd_spec_free(struct kref *kref)
3463{
3464 struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3465
3466 kfree(spec->pool_name);
3467 kfree(spec->image_id);
3468 kfree(spec->image_name);
3469 kfree(spec->snap_name);
3470 kfree(spec);
3471}
3472
cc344fa1 3473static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3474 struct rbd_spec *spec)
3475{
3476 struct rbd_device *rbd_dev;
3477
3478 rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3479 if (!rbd_dev)
3480 return NULL;
3481
3482 spin_lock_init(&rbd_dev->lock);
6d292906 3483 rbd_dev->flags = 0;
c53d5893 3484 INIT_LIST_HEAD(&rbd_dev->node);
3485 init_rwsem(&rbd_dev->header_rwsem);
3486
3487 rbd_dev->spec = spec;
3488 rbd_dev->rbd_client = rbdc;
3489
3490 /* Initialize the layout used for all rbd requests */
3491
3492 rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3493 rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3494 rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3495 rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3496
3497 return rbd_dev;
3498}
3499
3500static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3501{
3502 rbd_put_client(rbd_dev->rbd_client);
3503 rbd_spec_put(rbd_dev->spec);
3504 kfree(rbd_dev);
3505}
3506
3507/*
3508 * Get the size and object order for an image snapshot, or if
3509 * snap_id is CEPH_NOSNAP, gets this information for the base
3510 * image.
3511 */
3512static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3513 u8 *order, u64 *snap_size)
3514{
3515 __le64 snapid = cpu_to_le64(snap_id);
3516 int ret;
3517 struct {
3518 u8 order;
3519 __le64 size;
3520 } __attribute__ ((packed)) size_buf = { 0 };
3521
36be9a76 3522 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
9d475de5 3523 "rbd", "get_size",
4157976b 3524 &snapid, sizeof (snapid),
e2a58ee5 3525 &size_buf, sizeof (size_buf));
36be9a76 3526 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3527 if (ret < 0)
3528 return ret;
3529 if (ret < sizeof (size_buf))
3530 return -ERANGE;
9d475de5 3531
3532 if (order)
3533 *order = size_buf.order;
3534 *snap_size = le64_to_cpu(size_buf.size);
3535
3536 dout(" snap_id 0x%016llx order = %u, snap_size = %llu\n",
3537 (unsigned long long)snap_id, (unsigned int)*order,
3538 (unsigned long long)*snap_size);
3539
3540 return 0;
3541}
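/*
 * Sketch of the "get_size" reply decoded above, matching size_buf:
 *
 *	u8	order	(object size is 1 << order)
 *	__le64	size	(snapshot or image size, in bytes)
 */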
3542
3543static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3544{
3545 return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3546 &rbd_dev->header.obj_order,
3547 &rbd_dev->header.image_size);
3548}
3549
3550static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3551{
3552 void *reply_buf;
3553 int ret;
3554 void *p;
3555
3556 reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3557 if (!reply_buf)
3558 return -ENOMEM;
3559
36be9a76 3560 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3561 "rbd", "get_object_prefix", NULL, 0,
e2a58ee5 3562 reply_buf, RBD_OBJ_PREFIX_LEN_MAX);
36be9a76 3563 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3564 if (ret < 0)
3565 goto out;
3566
3567 p = reply_buf;
3568 rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3569 p + ret, NULL, GFP_NOIO);
3570 ret = 0;
3571
3572 if (IS_ERR(rbd_dev->header.object_prefix)) {
3573 ret = PTR_ERR(rbd_dev->header.object_prefix);
3574 rbd_dev->header.object_prefix = NULL;
3575 } else {
3576 dout(" object_prefix = %s\n", rbd_dev->header.object_prefix);
3577 }
3578out:
3579 kfree(reply_buf);
3580
3581 return ret;
3582}
3583
3584static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3585 u64 *snap_features)
3586{
3587 __le64 snapid = cpu_to_le64(snap_id);
3588 struct {
3589 __le64 features;
3590 __le64 incompat;
4157976b 3591 } __attribute__ ((packed)) features_buf = { 0 };
d889140c 3592 u64 incompat;
3593 int ret;
3594
36be9a76 3595 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b1b5402a 3596 "rbd", "get_features",
4157976b 3597 &snapid, sizeof (snapid),
e2a58ee5 3598 &features_buf, sizeof (features_buf));
36be9a76 3599 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3600 if (ret < 0)
3601 return ret;
3602 if (ret < sizeof (features_buf))
3603 return -ERANGE;
3604
3605 incompat = le64_to_cpu(features_buf.incompat);
5cbf6f12 3606 if (incompat & ~RBD_FEATURES_SUPPORTED)
b8f5c6ed 3607 return -ENXIO;
d889140c 3608
3609 *snap_features = le64_to_cpu(features_buf.features);
3610
3611 dout(" snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3612 (unsigned long long)snap_id,
3613 (unsigned long long)*snap_features,
3614 (unsigned long long)le64_to_cpu(features_buf.incompat));
3615
3616 return 0;
3617}
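/*
 * Sketch of the "get_features" reply decoded above, matching
 * features_buf: two __le64 words, the full feature mask followed by
 * the incompatible subset a client must understand to map the image.
 * Any incompat bit outside RBD_FEATURES_SUPPORTED causes the probe
 * to fail with -ENXIO.
 */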
3618
3619static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3620{
3621 return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3622 &rbd_dev->header.features);
3623}
3624
3625static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3626{
3627 struct rbd_spec *parent_spec;
3628 size_t size;
3629 void *reply_buf = NULL;
3630 __le64 snapid;
3631 void *p;
3632 void *end;
3633 char *image_id;
3634 u64 overlap;
3635 int ret;
3636
3637 parent_spec = rbd_spec_alloc();
3638 if (!parent_spec)
3639 return -ENOMEM;
3640
3641 size = sizeof (__le64) + /* pool_id */
3642 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX + /* image_id */
3643 sizeof (__le64) + /* snap_id */
3644 sizeof (__le64); /* overlap */
3645 reply_buf = kmalloc(size, GFP_KERNEL);
3646 if (!reply_buf) {
3647 ret = -ENOMEM;
3648 goto out_err;
3649 }
3650
3651 snapid = cpu_to_le64(CEPH_NOSNAP);
36be9a76 3652 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
86b00e0d 3653 "rbd", "get_parent",
4157976b 3654 &snapid, sizeof (snapid),
e2a58ee5 3655 reply_buf, size);
36be9a76 3656 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3657 if (ret < 0)
3658 goto out_err;
3659
86b00e0d 3660 p = reply_buf;
3661 end = reply_buf + ret;
3662 ret = -ERANGE;
3663 ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3664 if (parent_spec->pool_id == CEPH_NOPOOL)
3665 goto out; /* No parent? No problem. */
3666
3667 /* The ceph file layout needs to fit pool id in 32 bits */
3668
3669 ret = -EIO;
3670 if (parent_spec->pool_id > (u64)U32_MAX) {
3671 rbd_warn(NULL, "parent pool id too large (%llu > %u)\n",
3672 (unsigned long long)parent_spec->pool_id, U32_MAX);
57385b51 3673 goto out_err;
c0cd10db 3674 }
0903e875 3675
979ed480 3676 image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3677 if (IS_ERR(image_id)) {
3678 ret = PTR_ERR(image_id);
3679 goto out_err;
3680 }
3681 parent_spec->image_id = image_id;
3682 ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3683 ceph_decode_64_safe(&p, end, overlap, out_err);
3684
3685 rbd_dev->parent_overlap = overlap;
3686 rbd_dev->parent_spec = parent_spec;
3687 parent_spec = NULL; /* rbd_dev now owns this */
3688out:
3689 ret = 0;
3690out_err:
3691 kfree(reply_buf);
3692 rbd_spec_put(parent_spec);
3693
3694 return ret;
3695}
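/*
 * Sketch of the "get_parent" reply decoded above, per the buffer
 * sizing in rbd_dev_v2_parent_info():
 *
 *	__le64	pool_id		(CEPH_NOPOOL means no parent)
 *	string	image_id	(__le32 length followed by bytes)
 *	__le64	snap_id
 *	__le64	overlap
 */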
3696
3697static int rbd_dev_v2_striping_info(struct rbd_device *rbd_dev)
3698{
3699 struct {
3700 __le64 stripe_unit;
3701 __le64 stripe_count;
3702 } __attribute__ ((packed)) striping_info_buf = { 0 };
3703 size_t size = sizeof (striping_info_buf);
3704 void *p;
3705 u64 obj_size;
3706 u64 stripe_unit;
3707 u64 stripe_count;
3708 int ret;
3709
3710 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3711 "rbd", "get_stripe_unit_count", NULL, 0,
e2a58ee5 3712 (char *)&striping_info_buf, size);
3713 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3714 if (ret < 0)
3715 return ret;
3716 if (ret < size)
3717 return -ERANGE;
3718
3719 /*
3720 * We don't actually support the "fancy striping" feature
3721 * (STRIPINGV2) yet, but if the striping sizes are the
3722 * defaults the behavior is the same as before. So find
3723 * out, and only fail if the image has non-default values.
3724 */
3725 ret = -EINVAL;
3726 obj_size = (u64)1 << rbd_dev->header.obj_order;
3727 p = &striping_info_buf;
3728 stripe_unit = ceph_decode_64(&p);
3729 if (stripe_unit != obj_size) {
3730 rbd_warn(rbd_dev, "unsupported stripe unit "
3731 "(got %llu want %llu)",
3732 stripe_unit, obj_size);
3733 return -EINVAL;
3734 }
3735 stripe_count = ceph_decode_64(&p);
3736 if (stripe_count != 1) {
3737 rbd_warn(rbd_dev, "unsupported stripe count "
3738 "(got %llu want 1)", stripe_count);
3739 return -EINVAL;
3740 }
3741 rbd_dev->header.stripe_unit = stripe_unit;
3742 rbd_dev->header.stripe_count = stripe_count;
3743
3744 return 0;
3745}
3746
3747static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3748{
3749 size_t image_id_size;
3750 char *image_id;
3751 void *p;
3752 void *end;
3753 size_t size;
3754 void *reply_buf = NULL;
3755 size_t len = 0;
3756 char *image_name = NULL;
3757 int ret;
3758
3759 rbd_assert(!rbd_dev->spec->image_name);
3760
3761 len = strlen(rbd_dev->spec->image_id);
3762 image_id_size = sizeof (__le32) + len;
3763 image_id = kmalloc(image_id_size, GFP_KERNEL);
3764 if (!image_id)
3765 return NULL;
3766
3767 p = image_id;
4157976b 3768 end = image_id + image_id_size;
57385b51 3769 ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3770
3771 size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3772 reply_buf = kmalloc(size, GFP_KERNEL);
3773 if (!reply_buf)
3774 goto out;
3775
36be9a76 3776 ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3777 "rbd", "dir_get_name",
3778 image_id, image_id_size,
e2a58ee5 3779 reply_buf, size);
3780 if (ret < 0)
3781 goto out;
3782 p = reply_buf;
3783 end = reply_buf + ret;
3784
3785 image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3786 if (IS_ERR(image_name))
3787 image_name = NULL;
3788 else
3789 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3790out:
3791 kfree(reply_buf);
3792 kfree(image_id);
3793
3794 return image_name;
3795}
3796
3797static u64 rbd_v1_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3798{
3799 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3800 const char *snap_name;
3801 u32 which = 0;
3802
3803 /* Skip over names until we find the one we are looking for */
3804
3805 snap_name = rbd_dev->header.snap_names;
3806 while (which < snapc->num_snaps) {
3807 if (!strcmp(name, snap_name))
3808 return snapc->snaps[which];
3809 snap_name += strlen(snap_name) + 1;
3810 which++;
3811 }
3812 return CEPH_NOSNAP;
3813}
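/*
 * Illustrative layout of the format 1 snapshot name array walked
 * above: names are packed back to back, each terminated with '\0',
 * in the same order as the ids in snapc->snaps[]:
 *
 *	"snap1\0snap2\0snap3\0"
 */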
3814
3815static u64 rbd_v2_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3816{
3817 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
3818 u32 which;
3819 bool found = false;
3820 u64 snap_id;
3821
3822 for (which = 0; !found && which < snapc->num_snaps; which++) {
3823 const char *snap_name;
3824
3825 snap_id = snapc->snaps[which];
3826 snap_name = rbd_dev_v2_snap_name(rbd_dev, snap_id);
3827 if (IS_ERR(snap_name))
3828 break;
3829 found = !strcmp(name, snap_name);
3830 kfree(snap_name);
3831 }
3832 return found ? snap_id : CEPH_NOSNAP;
3833}
3834
3835/*
3836 * Assumes name is never RBD_SNAP_HEAD_NAME; returns CEPH_NOSNAP if
3837 * no snapshot by that name is found, or if an error occurs.
3838 */
3839static u64 rbd_snap_id_by_name(struct rbd_device *rbd_dev, const char *name)
3840{
3841 if (rbd_dev->image_format == 1)
3842 return rbd_v1_snap_id_by_name(rbd_dev, name);
3843
3844 return rbd_v2_snap_id_by_name(rbd_dev, name);
3845}
3846
9e15b77d 3847/*
3848 * When an rbd image has a parent image, it is identified by the
3849 * pool, image, and snapshot ids (not names). This function fills
3850 * in the names for those ids. (It's OK if we can't figure out the
3851 * name for an image id, but the pool and snapshot ids should always
3852 * exist and have names.) All names in an rbd spec are dynamically
3853 * allocated.
3854 *
3855 * When an image being mapped (not a parent) is probed, we have the
3856 * pool name and pool id, image name and image id, and the snapshot
3857 * name. The only thing we're missing is the snapshot id.
9e15b77d 3858 */
2e9f7f1c 3859static int rbd_dev_spec_update(struct rbd_device *rbd_dev)
9e15b77d 3860{
3861 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
3862 struct rbd_spec *spec = rbd_dev->spec;
3863 const char *pool_name;
3864 const char *image_name;
3865 const char *snap_name;
3866 int ret;
3867
3868 /*
3869 * An image being mapped will have the pool name (etc.), but
3870 * we need to look up the snapshot id.
3871 */
3872 if (spec->pool_name) {
3873 if (strcmp(spec->snap_name, RBD_SNAP_HEAD_NAME)) {
2ad3d716 3874 u64 snap_id;
e1d4213f 3875
3876 snap_id = rbd_snap_id_by_name(rbd_dev, spec->snap_name);
3877 if (snap_id == CEPH_NOSNAP)
e1d4213f 3878 return -ENOENT;
2ad3d716 3879 spec->snap_id = snap_id;
e1d4213f 3880 } else {
2e9f7f1c 3881 spec->snap_id = CEPH_NOSNAP;
3882 }
3883
3884 return 0;
3885 }
9e15b77d 3886
2e9f7f1c 3887 /* Get the pool name; we have to make our own copy of this */
9e15b77d 3888
3889 pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, spec->pool_id);
3890 if (!pool_name) {
3891 rbd_warn(rbd_dev, "no pool with id %llu", spec->pool_id);
3892 return -EIO;
3893 }
3894 pool_name = kstrdup(pool_name, GFP_KERNEL);
3895 if (!pool_name)
3896 return -ENOMEM;
3897
3898 /* Fetch the image name; tolerate failure here */
3899
3900 image_name = rbd_dev_image_name(rbd_dev);
3901 if (!image_name)
06ecc6cb 3902 rbd_warn(rbd_dev, "unable to get image name");
9e15b77d 3903
2e9f7f1c 3904 /* Look up the snapshot name, and make a copy */
9e15b77d 3905
2e9f7f1c 3906 snap_name = rbd_snap_name(rbd_dev, spec->snap_id);
3907 if (!snap_name) {
3908 ret = -ENOMEM;
9e15b77d 3909 goto out_err;
3910 }
3911
3912 spec->pool_name = pool_name;
3913 spec->image_name = image_name;
3914 spec->snap_name = snap_name;
3915
3916 return 0;
3917out_err:
3918 kfree(image_name);
3919 kfree(pool_name);
3920
3921 return ret;
3922}
3923
cc4a38bd 3924static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev)
3925{
3926 size_t size;
3927 int ret;
3928 void *reply_buf;
3929 void *p;
3930 void *end;
3931 u64 seq;
3932 u32 snap_count;
3933 struct ceph_snap_context *snapc;
3934 u32 i;
3935
3936 /*
3937 * We'll need room for the seq value (maximum snapshot id),
3938 * snapshot count, and array of that many snapshot ids.
3939 * For now we have a fixed upper limit on the number we're
3940 * prepared to receive.
3941 */
3942 size = sizeof (__le64) + sizeof (__le32) +
3943 RBD_MAX_SNAP_COUNT * sizeof (__le64);
3944 reply_buf = kzalloc(size, GFP_KERNEL);
3945 if (!reply_buf)
3946 return -ENOMEM;
3947
36be9a76 3948 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
4157976b 3949 "rbd", "get_snapcontext", NULL, 0,
e2a58ee5 3950 reply_buf, size);
36be9a76 3951 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3952 if (ret < 0)
3953 goto out;
3954
35d489f9 3955 p = reply_buf;
3956 end = reply_buf + ret;
3957 ret = -ERANGE;
3958 ceph_decode_64_safe(&p, end, seq, out);
3959 ceph_decode_32_safe(&p, end, snap_count, out);
3960
3961 /*
3962 * Make sure the reported number of snapshot ids wouldn't go
3963 * beyond the end of our buffer. But before checking that,
3964 * make sure the computed size of the snapshot context we
3965 * allocate is representable in a size_t.
3966 */
3967 if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3968 / sizeof (u64)) {
3969 ret = -EINVAL;
3970 goto out;
3971 }
3972 if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3973 goto out;
468521c1 3974 ret = 0;
35d489f9 3975
812164f8 3976 snapc = ceph_create_snap_context(snap_count, GFP_KERNEL);
3977 if (!snapc) {
3978 ret = -ENOMEM;
3979 goto out;
3980 }
35d489f9 3981 snapc->seq = seq;
3982 for (i = 0; i < snap_count; i++)
3983 snapc->snaps[i] = ceph_decode_64(&p);
3984
49ece554 3985 ceph_put_snap_context(rbd_dev->header.snapc);
3986 rbd_dev->header.snapc = snapc;
3987
3988 dout(" snap context seq = %llu, snap_count = %u\n",
57385b51 3989 (unsigned long long)seq, (unsigned int)snap_count);
3990out:
3991 kfree(reply_buf);
3992
57385b51 3993 return ret;
3994}
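/*
 * Sketch of the "get_snapcontext" reply decoded above (the id array
 * is bounded by RBD_MAX_SNAP_COUNT):
 *
 *	__le64	seq			(highest snapshot id so far)
 *	__le32	snap_count
 *	__le64	snaps[snap_count]	(snapshot ids)
 */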
3995
3996static const char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev,
3997 u64 snap_id)
3998{
3999 size_t size;
4000 void *reply_buf;
54cac61f 4001 __le64 snapid;
4002 int ret;
4003 void *p;
4004 void *end;
4005 char *snap_name;
4006
4007 size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
4008 reply_buf = kmalloc(size, GFP_KERNEL);
4009 if (!reply_buf)
4010 return ERR_PTR(-ENOMEM);
4011
54cac61f 4012 snapid = cpu_to_le64(snap_id);
36be9a76 4013 ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
b8b1e2db 4014 "rbd", "get_snapshot_name",
54cac61f 4015 &snapid, sizeof (snapid),
e2a58ee5 4016 reply_buf, size);
36be9a76 4017 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4018 if (ret < 0) {
4019 snap_name = ERR_PTR(ret);
b8b1e2db 4020 goto out;
f40eb349 4021 }
4022
4023 p = reply_buf;
f40eb349 4024 end = reply_buf + ret;
e5c35534 4025 snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
f40eb349 4026 if (IS_ERR(snap_name))
b8b1e2db 4027 goto out;
b8b1e2db 4028
f40eb349 4029 dout(" snap_id 0x%016llx snap_name = %s\n",
54cac61f 4030 (unsigned long long)snap_id, snap_name);
4031out:
4032 kfree(reply_buf);
4033
f40eb349 4034 return snap_name;
4035}
4036
cc4a38bd 4037static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev)
4038{
4039 int ret;
4040
4041 down_write(&rbd_dev->header_rwsem);
4042
4043 ret = rbd_dev_v2_image_size(rbd_dev);
4044 if (ret)
4045 goto out;
4046 if (rbd_dev->spec->snap_id == CEPH_NOSNAP)
4047 if (rbd_dev->mapping.size != rbd_dev->header.image_size)
4048 rbd_dev->mapping.size = rbd_dev->header.image_size;
117973fb 4049
cc4a38bd 4050 ret = rbd_dev_v2_snap_context(rbd_dev);
4051 dout("rbd_dev_v2_snap_context returned %d\n", ret);
4052 if (ret)
4053 goto out;
4054out:
4055 up_write(&rbd_dev->header_rwsem);
4056
4057 return ret;
4058}
4059
4060static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4061{
dfc5606d 4062 struct device *dev;
cd789ab9 4063 int ret;
4064
4065 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
dfc5606d 4066
cd789ab9 4067 dev = &rbd_dev->dev;
4068 dev->bus = &rbd_bus_type;
4069 dev->type = &rbd_device_type;
4070 dev->parent = &rbd_root_dev;
200a6a8b 4071 dev->release = rbd_dev_device_release;
de71a297 4072 dev_set_name(dev, "%d", rbd_dev->dev_id);
dfc5606d 4073 ret = device_register(dev);
dfc5606d 4074
dfc5606d 4075 mutex_unlock(&ctl_mutex);
cd789ab9 4076
dfc5606d 4077 return ret;
4078}
4079
4080static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4081{
4082 device_unregister(&rbd_dev->dev);
4083}
4084
e2839308 4085static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4086
4087/*
4088 * Get a unique rbd identifier for the given new rbd_dev, and add
4089 * the rbd_dev to the global list. The minimum rbd id is 1.
1ddbe94e 4090 */
e2839308 4091static void rbd_dev_id_get(struct rbd_device *rbd_dev)
b7f23c36 4092{
e2839308 4093 rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4094
4095 spin_lock(&rbd_dev_list_lock);
4096 list_add_tail(&rbd_dev->node, &rbd_dev_list);
4097 spin_unlock(&rbd_dev_list_lock);
4098 dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4099 (unsigned long long) rbd_dev->dev_id);
1ddbe94e 4100}
b7f23c36 4101
1ddbe94e 4102/*
4103 * Remove an rbd_dev from the global list, and record that its
4104 * identifier is no longer in use.
1ddbe94e 4105 */
e2839308 4106static void rbd_dev_id_put(struct rbd_device *rbd_dev)
1ddbe94e 4107{
d184f6bf 4108 struct list_head *tmp;
de71a297 4109 int rbd_id = rbd_dev->dev_id;
4110 int max_id;
4111
aafb230e 4112 rbd_assert(rbd_id > 0);
499afd5b 4113
4114 dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4115 (unsigned long long) rbd_dev->dev_id);
4116 spin_lock(&rbd_dev_list_lock);
4117 list_del_init(&rbd_dev->node);
4118
4119 /*
4120 * If the id being "put" is not the current maximum, there
4121 * is nothing special we need to do.
4122 */
e2839308 4123 if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4124 spin_unlock(&rbd_dev_list_lock);
4125 return;
4126 }
4127
4128 /*
4129 * We need to update the current maximum id. Search the
4130 * list to find out what it is. We're more likely to find
4131 * the maximum at the end, so search the list backward.
4132 */
4133 max_id = 0;
4134 list_for_each_prev(tmp, &rbd_dev_list) {
4135 struct rbd_device *rbd_dev;
4136
4137 rbd_dev = list_entry(tmp, struct rbd_device, node);
4138 if (rbd_dev->dev_id > max_id)
4139 max_id = rbd_dev->dev_id;
d184f6bf 4140 }
499afd5b 4141 spin_unlock(&rbd_dev_list_lock);
b7f23c36 4142
1ddbe94e 4143 /*
e2839308 4144 * The max id could have been updated by rbd_dev_id_get(), in
4145 * which case it now accurately reflects the new maximum.
4146 * Be careful not to overwrite the maximum value in that
4147 * case.
1ddbe94e 4148 */
4149 atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4150 dout(" max dev id has been reset\n");
4151}
4152
4153/*
4154 * Skips over white space at *buf, and updates *buf to point to the
4155 * first found non-space character (if any). Returns the length of
4156 * the token (string of non-white space characters) found. Note
4157 * that *buf must be terminated with '\0'.
4158 */
4159static inline size_t next_token(const char **buf)
4160{
4161 /*
4162 * These are the characters that produce nonzero for
4163 * isspace() in the "C" and "POSIX" locales.
4164 */
4165 const char *spaces = " \f\n\r\t\v";
4166
4167 *buf += strspn(*buf, spaces); /* Find start of token */
4168
4169 return strcspn(*buf, spaces); /* Return token length */
4170}
4171
4172/*
4173 * Finds the next token in *buf, and if the provided token buffer is
4174 * big enough, copies the found token into it. The result, if
4175 * copied, is guaranteed to be terminated with '\0'. Note that *buf
4176 * must be terminated with '\0' on entry.
4177 *
4178 * Returns the length of the token found (not including the '\0').
4179 * Return value will be 0 if no token is found, and it will be >=
4180 * token_size if the token would not fit.
4181 *
593a9e7b 4182 * The *buf pointer will be updated to point beyond the end of the
4183 * found token. Note that this occurs even if the token buffer is
4184 * too small to hold it.
4185 */
4186static inline size_t copy_token(const char **buf,
4187 char *token,
4188 size_t token_size)
4189{
4190 size_t len;
4191
4192 len = next_token(buf);
4193 if (len < token_size) {
4194 memcpy(token, *buf, len);
4195 *(token + len) = '\0';
4196 }
4197 *buf += len;
4198
4199 return len;
4200}
4201
4202/*
4203 * Finds the next token in *buf, dynamically allocates a buffer big
4204 * enough to hold a copy of it, and copies the token into the new
4205 * buffer. The copy is guaranteed to be terminated with '\0'. Note
4206 * that a duplicate buffer is created even for a zero-length token.
4207 *
4208 * Returns a pointer to the newly-allocated duplicate, or a null
4209 * pointer if memory for the duplicate was not available. If
4210 * the lenp argument is a non-null pointer, the length of the token
4211 * (not including the '\0') is returned in *lenp.
4212 *
4213 * If successful, the *buf pointer will be updated to point beyond
4214 * the end of the found token.
4215 *
4216 * Note: uses GFP_KERNEL for allocation.
4217 */
4218static inline char *dup_token(const char **buf, size_t *lenp)
4219{
4220 char *dup;
4221 size_t len;
4222
4223 len = next_token(buf);
4caf35f9 4224 dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4225 if (!dup)
4226 return NULL;
4227 *(dup + len) = '\0';
4228 *buf += len;
4229
4230 if (lenp)
4231 *lenp = len;
4232
4233 return dup;
4234}
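/*
 * Usage sketch for the token helpers above (illustrative values,
 * not part of the driver):
 *
 *	const char *buf = "rbd myimage mysnap";
 *	char *pool = dup_token(&buf, NULL);	(-> "rbd")
 *	char *image = dup_token(&buf, NULL);	(-> "myimage")
 *	char *snap = dup_token(&buf, NULL);	(-> "mysnap")
 *
 * Each non-NULL result must eventually be released with kfree().
 */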
4235
a725f65e 4236/*
4237 * Parse the options provided for an "rbd add" (i.e., rbd image
4238 * mapping) request. These arrive via a write to /sys/bus/rbd/add,
4239 * and the data written is passed here via a NUL-terminated buffer.
4240 * Returns 0 if successful or an error code otherwise.
d22f76e7 4241 *
4242 * The information extracted from these options is recorded in
4243 * the other parameters which return dynamically-allocated
4244 * structures:
4245 * ceph_opts
4246 * The address of a pointer that will refer to a ceph options
4247 * structure. Caller must release the returned pointer using
4248 * ceph_destroy_options() when it is no longer needed.
4249 * rbd_opts
4250 * Address of an rbd options pointer. Fully initialized by
4251 * this function; caller must release with kfree().
4252 * spec
4253 * Address of an rbd image specification pointer. Fully
4254 * initialized by this function based on parsed options.
4255 * Caller must release with rbd_spec_put().
4256 *
4257 * The options passed take this form:
 4258 * <mon_addrs> <options> <pool_name> <image_name> [<snap_name>]
4259 * where:
4260 * <mon_addrs>
4261 * A comma-separated list of one or more monitor addresses.
4262 * A monitor address is an ip address, optionally followed
4263 * by a port number (separated by a colon).
4264 * I.e.: ip1[:port1][,ip2[:port2]...]
4265 * <options>
4266 * A comma-separated list of ceph and/or rbd options.
4267 * <pool_name>
4268 * The name of the rados pool containing the rbd image.
4269 * <image_name>
4270 * The name of the image in that pool to map.
 4271 * <snap_name>
 4272 * An optional snapshot name. If provided, the mapping will
 4273 * present data from the image at the time that snapshot was
 4274 * created. The image head is used if no snapshot name is
 4275 * provided. Snapshot mappings are always read-only.
a725f65e 4276 */
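/*
 * Illustrative "rbd add" request built from the grammar above
 * (monitor address, credentials, and names are hypothetical):
 *
 *	$ echo "1.2.3.4:6789 name=admin,secret=<key> rbd myimage mysnap" \
 *		> /sys/bus/rbd/add
 */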
859c31df 4277static int rbd_add_parse_args(const char *buf,
dc79b113 4278 struct ceph_options **ceph_opts,
4279 struct rbd_options **opts,
4280 struct rbd_spec **rbd_spec)
e28fff26 4281{
d22f76e7 4282 size_t len;
859c31df 4283 char *options;
0ddebc0c 4284 const char *mon_addrs;
ecb4dc22 4285 char *snap_name;
0ddebc0c 4286 size_t mon_addrs_size;
859c31df 4287 struct rbd_spec *spec = NULL;
4e9afeba 4288 struct rbd_options *rbd_opts = NULL;
859c31df 4289 struct ceph_options *copts;
dc79b113 4290 int ret;
4291
4292 /* The first four tokens are required */
4293
7ef3214a 4294 len = next_token(&buf);
4295 if (!len) {
4296 rbd_warn(NULL, "no monitor address(es) provided");
4297 return -EINVAL;
4298 }
0ddebc0c 4299 mon_addrs = buf;
f28e565a 4300 mon_addrs_size = len + 1;
7ef3214a 4301 buf += len;
a725f65e 4302
dc79b113 4303 ret = -EINVAL;
4304 options = dup_token(&buf, NULL);
4305 if (!options)
dc79b113 4306 return -ENOMEM;
4307 if (!*options) {
4308 rbd_warn(NULL, "no options provided");
4309 goto out_err;
4310 }
e28fff26 4311
4312 spec = rbd_spec_alloc();
4313 if (!spec)
f28e565a 4314 goto out_mem;
4315
4316 spec->pool_name = dup_token(&buf, NULL);
4317 if (!spec->pool_name)
4318 goto out_mem;
4319 if (!*spec->pool_name) {
4320 rbd_warn(NULL, "no pool name provided");
4321 goto out_err;
4322 }
e28fff26 4323
69e7a02f 4324 spec->image_name = dup_token(&buf, NULL);
859c31df 4325 if (!spec->image_name)
f28e565a 4326 goto out_mem;
4327 if (!*spec->image_name) {
4328 rbd_warn(NULL, "no image name provided");
4329 goto out_err;
4330 }
d4b125e9 4331
4332 /*
4333 * Snapshot name is optional; default is to use "-"
4334 * (indicating the head/no snapshot).
4335 */
3feeb894 4336 len = next_token(&buf);
820a5f3e 4337 if (!len) {
4338 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4339 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
f28e565a 4340 } else if (len > RBD_MAX_SNAP_NAME_LEN) {
dc79b113 4341 ret = -ENAMETOOLONG;
f28e565a 4342 goto out_err;
849b4260 4343 }
4344 snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4345 if (!snap_name)
f28e565a 4346 goto out_mem;
4347 *(snap_name + len) = '\0';
4348 spec->snap_name = snap_name;
e5c35534 4349
0ddebc0c 4350 /* Initialize all rbd options to the defaults */
e28fff26 4351
4352 rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4353 if (!rbd_opts)
4354 goto out_mem;
4355
4356 rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
d22f76e7 4357
859c31df 4358 copts = ceph_parse_options(options, mon_addrs,
0ddebc0c 4359 mon_addrs + mon_addrs_size - 1,
4e9afeba 4360 parse_rbd_opts_token, rbd_opts);
4361 if (IS_ERR(copts)) {
4362 ret = PTR_ERR(copts);
4363 goto out_err;
4364 }
4365 kfree(options);
4366
4367 *ceph_opts = copts;
4e9afeba 4368 *opts = rbd_opts;
859c31df 4369 *rbd_spec = spec;
0ddebc0c 4370
dc79b113 4371 return 0;
f28e565a 4372out_mem:
dc79b113 4373 ret = -ENOMEM;
d22f76e7 4374out_err:
4375 kfree(rbd_opts);
4376 rbd_spec_put(spec);
f28e565a 4377 kfree(options);
d22f76e7 4378
dc79b113 4379 return ret;
4380}
4381
4382/*
4383 * An rbd format 2 image has a unique identifier, distinct from the
4384 * name given to it by the user. Internally, that identifier is
4385 * what's used to specify the names of objects related to the image.
4386 *
4387 * A special "rbd id" object is used to map an rbd image name to its
4388 * id. If that object doesn't exist, then there is no v2 rbd image
4389 * with the supplied name.
4390 *
4391 * This function will record the given rbd_dev's image_id field if
4392 * it can be determined, and in that case will return 0. If any
4393 * errors occur a negative errno will be returned and the rbd_dev's
4394 * image_id field will be unchanged (and should be NULL).
4395 */
4396static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4397{
4398 int ret;
4399 size_t size;
4400 char *object_name;
4401 void *response;
c0fba368 4402 char *image_id;
2f82ee54 4403
4404 /*
4405 * When probing a parent image, the image id is already
4406 * known (and the image name likely is not). There's no
4407 * need to fetch the image id again in this case. We
4408 * do still need to set the image format though.
2c0d0a10 4409 */
4410 if (rbd_dev->spec->image_id) {
4411 rbd_dev->image_format = *rbd_dev->spec->image_id ? 2 : 1;
4412
2c0d0a10 4413 return 0;
c0fba368 4414 }
2c0d0a10 4415
4416 /*
4417 * First, see if the format 2 image id file exists, and if
4418 * so, get the image's persistent id from it.
4419 */
69e7a02f 4420 size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4421 object_name = kmalloc(size, GFP_NOIO);
4422 if (!object_name)
4423 return -ENOMEM;
0d7dbfce 4424 sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4425 dout("rbd id object name is %s\n", object_name);
4426
4427 /* Response will be an encoded string, which includes a length */
4428
4429 size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4430 response = kzalloc(size, GFP_NOIO);
4431 if (!response) {
4432 ret = -ENOMEM;
4433 goto out;
4434 }
4435
4436 /* If it doesn't exist we'll assume it's a format 1 image */
4437
36be9a76 4438 ret = rbd_obj_method_sync(rbd_dev, object_name,
4157976b 4439 "rbd", "get_id", NULL, 0,
e2a58ee5 4440 response, RBD_IMAGE_ID_LEN_MAX);
36be9a76 4441 dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4442 if (ret == -ENOENT) {
4443 image_id = kstrdup("", GFP_KERNEL);
4444 ret = image_id ? 0 : -ENOMEM;
4445 if (!ret)
4446 rbd_dev->image_format = 1;
4447 } else if (ret > sizeof (__le32)) {
4448 void *p = response;
4449
4450 image_id = ceph_extract_encoded_string(&p, p + ret,
979ed480 4451 NULL, GFP_NOIO);
4452 ret = IS_ERR(image_id) ? PTR_ERR(image_id) : 0;
4453 if (!ret)
4454 rbd_dev->image_format = 2;
589d30e0 4455 } else {
4456 ret = -EINVAL;
4457 }
4458
4459 if (!ret) {
4460 rbd_dev->spec->image_id = image_id;
4461 dout("image_id is %s\n", image_id);
4462 }
4463out:
4464 kfree(response);
4465 kfree(object_name);
4466
4467 return ret;
4468}
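/*
 * Illustrative naming, assuming RBD_ID_PREFIX is "rbd_id.": for a
 * format 2 image named "myimage" the id is read from an object
 * called "rbd_id.myimage". A format 1 image has no such object, so
 * the method call above returns -ENOENT and an empty image id is
 * recorded instead.
 */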
4469
4470/* Undo whatever state changes are made by v1 or v2 image probe */
4471
4472static void rbd_dev_unprobe(struct rbd_device *rbd_dev)
4473{
4474 struct rbd_image_header *header;
4475
4476 rbd_dev_remove_parent(rbd_dev);
4477 rbd_spec_put(rbd_dev->parent_spec);
4478 rbd_dev->parent_spec = NULL;
4479 rbd_dev->parent_overlap = 0;
4480
4481 /* Free dynamic fields from the header, then zero it out */
4482
4483 header = &rbd_dev->header;
812164f8 4484 ceph_put_snap_context(header->snapc);
4485 kfree(header->snap_sizes);
4486 kfree(header->snap_names);
4487 kfree(header->object_prefix);
4488 memset(header, 0, sizeof (*header));
4489}
4490
4491static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4492{
4493 int ret;
4494
4495 /* Populate rbd image metadata */
4496
4497 ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4498 if (ret < 0)
4499 goto out_err;
4500
4501 /* Version 1 images have no parent (no layering) */
4502
4503 rbd_dev->parent_spec = NULL;
4504 rbd_dev->parent_overlap = 0;
4505
4506 dout("discovered version 1 image, header name is %s\n",
4507 rbd_dev->header_name);
4508
4509 return 0;
4510
4511out_err:
4512 kfree(rbd_dev->header_name);
4513 rbd_dev->header_name = NULL;
4514 kfree(rbd_dev->spec->image_id);
4515 rbd_dev->spec->image_id = NULL;
4516
4517 return ret;
4518}
4519
4520static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4521{
9d475de5 4522 int ret;
a30b71b9 4523
9d475de5 4524 ret = rbd_dev_v2_image_size(rbd_dev);
57385b51 4525 if (ret)
4526 goto out_err;
4527
4528 /* Get the object prefix (a.k.a. block_name) for the image */
4529
4530 ret = rbd_dev_v2_object_prefix(rbd_dev);
57385b51 4531 if (ret)
4532 goto out_err;
4533
d889140c 4534 /* Get and check the features for the image */
4535
4536 ret = rbd_dev_v2_features(rbd_dev);
57385b51 4537 if (ret)
9d475de5 4538 goto out_err;
35d489f9 4539
4540 /* If the image supports layering, get the parent info */
4541
4542 if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4543 ret = rbd_dev_v2_parent_info(rbd_dev);
57385b51 4544 if (ret)
86b00e0d 4545 goto out_err;
96882f55 4546 /*
4547 * Print a warning if this image has a parent.
4548 * Don't print it if the image now being probed
4549 * is itself a parent. We can tell at this point
4550 * because we won't know its pool name yet (just its
4551 * pool id).
96882f55 4552 */
c734b796 4553 if (rbd_dev->parent_spec && rbd_dev->spec->pool_name)
4554 rbd_warn(rbd_dev, "WARNING: kernel layering "
4555 "is EXPERIMENTAL!");
4556 }
4557
4558 /* If the image supports fancy striping, get its parameters */
4559
4560 if (rbd_dev->header.features & RBD_FEATURE_STRIPINGV2) {
4561 ret = rbd_dev_v2_striping_info(rbd_dev);
4562 if (ret < 0)
4563 goto out_err;
4564 }
4565
4566 /* crypto and compression type aren't (yet) supported for v2 images */
4567
4568 rbd_dev->header.crypt_type = 0;
4569 rbd_dev->header.comp_type = 0;
35d489f9 4570
 4571 /* Get the snapshot context */
4572
cc4a38bd 4573 ret = rbd_dev_v2_snap_context(rbd_dev);
4574 if (ret)
4575 goto out_err;
6e14b1a6 4576
4577 dout("discovered version 2 image, header name is %s\n",
4578 rbd_dev->header_name);
4579
35152979 4580 return 0;
9d475de5 4581out_err:
4582 rbd_dev->parent_overlap = 0;
4583 rbd_spec_put(rbd_dev->parent_spec);
4584 rbd_dev->parent_spec = NULL;
4585 kfree(rbd_dev->header_name);
4586 rbd_dev->header_name = NULL;
4587 kfree(rbd_dev->header.object_prefix);
4588 rbd_dev->header.object_prefix = NULL;
4589
4590 return ret;
4591}
4592
124afba2 4593static int rbd_dev_probe_parent(struct rbd_device *rbd_dev)
83a06263 4594{
2f82ee54 4595 struct rbd_device *parent = NULL;
4596 struct rbd_spec *parent_spec;
4597 struct rbd_client *rbdc;
4598 int ret;
4599
4600 if (!rbd_dev->parent_spec)
4601 return 0;
4602 /*
4603 * We need to pass a reference to the client and the parent
4604 * spec when creating the parent rbd_dev. Images related by
4605 * parent/child relationships always share both.
4606 */
4607 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4608 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4609
4610 ret = -ENOMEM;
4611 parent = rbd_dev_create(rbdc, parent_spec);
4612 if (!parent)
4613 goto out_err;
4614
51344a38 4615 ret = rbd_dev_image_probe(parent, true);
4616 if (ret < 0)
4617 goto out_err;
4618 rbd_dev->parent = parent;
4619
4620 return 0;
4621out_err:
4622 if (parent) {
4623 rbd_spec_put(rbd_dev->parent_spec);
4624 kfree(rbd_dev->header_name);
4625 rbd_dev_destroy(parent);
4626 } else {
4627 rbd_put_client(rbdc);
4628 rbd_spec_put(parent_spec);
4629 }
4630
4631 return ret;
4632}
4633
200a6a8b 4634static int rbd_dev_device_setup(struct rbd_device *rbd_dev)
124afba2 4635{
83a06263 4636 int ret;
d1cf5788 4637
4638 /* generate unique id: find highest unique id, add one */
4639 rbd_dev_id_get(rbd_dev);
4640
4641 /* Fill in the device name, now that we have its id. */
4642 BUILD_BUG_ON(DEV_NAME_LEN
4643 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4644 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4645
4646 /* Get our block major device number. */
4647
4648 ret = register_blkdev(0, rbd_dev->name);
4649 if (ret < 0)
4650 goto err_out_id;
4651 rbd_dev->major = ret;
4652
4653 /* Set up the blkdev mapping. */
4654
4655 ret = rbd_init_disk(rbd_dev);
4656 if (ret)
4657 goto err_out_blkdev;
4658
f35a4dee 4659 ret = rbd_dev_mapping_set(rbd_dev);
4660 if (ret)
4661 goto err_out_disk;
4662 set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
4663
4664 ret = rbd_bus_add_dev(rbd_dev);
4665 if (ret)
4666 goto err_out_mapping;
83a06263 4667
4668 /* Everything's ready. Announce the disk to the world. */
4669
129b79d4 4670 set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
83a06263
AE
4671 add_disk(rbd_dev->disk);
4672
4673 pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4674 (unsigned long long) rbd_dev->mapping.size);
4675
4676 return ret;
2f82ee54 4677
4678err_out_mapping:
4679 rbd_dev_mapping_clear(rbd_dev);
4680err_out_disk:
4681 rbd_free_disk(rbd_dev);
4682err_out_blkdev:
4683 unregister_blkdev(rbd_dev->major, rbd_dev->name);
4684err_out_id:
4685 rbd_dev_id_put(rbd_dev);
d1cf5788 4686 rbd_dev_mapping_clear(rbd_dev);
83a06263
AE
4687
4688 return ret;
4689}
4690
4691static int rbd_dev_header_name(struct rbd_device *rbd_dev)
4692{
4693 struct rbd_spec *spec = rbd_dev->spec;
4694 size_t size;
4695
4696 /* Record the header object name for this rbd image. */
4697
4698 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4699
4700 if (rbd_dev->image_format == 1)
4701 size = strlen(spec->image_name) + sizeof (RBD_SUFFIX);
4702 else
4703 size = sizeof (RBD_HEADER_PREFIX) + strlen(spec->image_id);
4704
4705 rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4706 if (!rbd_dev->header_name)
4707 return -ENOMEM;
4708
4709 if (rbd_dev->image_format == 1)
4710 sprintf(rbd_dev->header_name, "%s%s",
4711 spec->image_name, RBD_SUFFIX);
4712 else
4713 sprintf(rbd_dev->header_name, "%s%s",
4714 RBD_HEADER_PREFIX, spec->image_id);
4715 return 0;
4716}
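/*
 * Illustrative results, assuming RBD_SUFFIX is ".rbd" and
 * RBD_HEADER_PREFIX is "rbd_header.": a format 1 image named
 * "myimage" gets header object "myimage.rbd", while a format 2
 * image with id "1028a66202ae" gets "rbd_header.1028a66202ae".
 */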
4717
4718static void rbd_dev_image_release(struct rbd_device *rbd_dev)
4719{
4720 int ret;
4721
4722 rbd_dev_unprobe(rbd_dev);
4723 ret = rbd_dev_header_watch_sync(rbd_dev, 0);
4724 if (ret)
4725 rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
200a6a8b 4726 kfree(rbd_dev->header_name);
4727 rbd_dev->header_name = NULL;
4728 rbd_dev->image_format = 0;
4729 kfree(rbd_dev->spec->image_id);
4730 rbd_dev->spec->image_id = NULL;
4731
4732 rbd_dev_destroy(rbd_dev);
4733}
4734
4735/*
4736 * Probe for the existence of the header object for the given rbd
4737 * device. For format 2 images this includes determining the image
4738 * id.
4739 */
51344a38 4740static int rbd_dev_image_probe(struct rbd_device *rbd_dev, bool read_only)
4741{
4742 int ret;
b644de2b 4743 int tmp;
4744
4745 /*
4746 * Get the id from the image id object. If it's not a
4747 * format 2 image, we'll get ENOENT back, and we'll assume
4748 * it's a format 1 image.
4749 */
4750 ret = rbd_dev_image_id(rbd_dev);
4751 if (ret)
4752 return ret;
4753 rbd_assert(rbd_dev->spec->image_id);
4754 rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
4755
4756 ret = rbd_dev_header_name(rbd_dev);
4757 if (ret)
4758 goto err_out_format;
4759
4760 ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4761 if (ret)
4762 goto out_header_name;
4763
c0fba368 4764 if (rbd_dev->image_format == 1)
4765 ret = rbd_dev_v1_probe(rbd_dev);
4766 else
4767 ret = rbd_dev_v2_probe(rbd_dev);
5655c4d9 4768 if (ret)
b644de2b 4769 goto err_out_watch;
83a06263 4770
4771 ret = rbd_dev_spec_update(rbd_dev);
4772 if (ret)
33dca39f 4773 goto err_out_probe;
9bb81c9b 4774
4775 /* If we are mapping a snapshot it must be marked read-only */
4776
4777 if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
4778 read_only = true;
4779 rbd_dev->mapping.read_only = read_only;
4780
9bb81c9b 4781 ret = rbd_dev_probe_parent(rbd_dev);
4782 if (!ret)
4783 return 0;
83a06263 4784
4785err_out_probe:
4786 rbd_dev_unprobe(rbd_dev);
4787err_out_watch:
4788 tmp = rbd_dev_header_watch_sync(rbd_dev, 0);
4789 if (tmp)
4790 rbd_warn(rbd_dev, "unable to tear down watch request\n");
4791out_header_name:
4792 kfree(rbd_dev->header_name);
4793 rbd_dev->header_name = NULL;
4794err_out_format:
4795 rbd_dev->image_format = 0;
4796 kfree(rbd_dev->spec->image_id);
4797 rbd_dev->spec->image_id = NULL;
4798
4799 dout("probe failed, returning %d\n", ret);
4800
4801 return ret;
4802}
4803
4804static ssize_t rbd_add(struct bus_type *bus,
4805 const char *buf,
4806 size_t count)
602adf40 4807{
cb8627c7 4808 struct rbd_device *rbd_dev = NULL;
dc79b113 4809 struct ceph_options *ceph_opts = NULL;
4e9afeba 4810 struct rbd_options *rbd_opts = NULL;
859c31df 4811 struct rbd_spec *spec = NULL;
9d3997fd 4812 struct rbd_client *rbdc;
27cc2594 4813 struct ceph_osd_client *osdc;
51344a38 4814 bool read_only;
27cc2594 4815 int rc = -ENOMEM;
4816
4817 if (!try_module_get(THIS_MODULE))
4818 return -ENODEV;
4819
602adf40 4820 /* parse add command */
859c31df 4821 rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
dc79b113 4822 if (rc < 0)
bd4ba655 4823 goto err_out_module;
4824 read_only = rbd_opts->read_only;
4825 kfree(rbd_opts);
4826 rbd_opts = NULL; /* done with this */
78cea76e 4827
4828 rbdc = rbd_get_client(ceph_opts);
4829 if (IS_ERR(rbdc)) {
4830 rc = PTR_ERR(rbdc);
0ddebc0c 4831 goto err_out_args;
9d3997fd 4832 }
c53d5893 4833 ceph_opts = NULL; /* rbd_dev client now owns this */
602adf40 4834
602adf40 4835 /* pick the pool */
9d3997fd 4836 osdc = &rbdc->client->osdc;
859c31df 4837 rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4838 if (rc < 0)
4839 goto err_out_client;
c0cd10db 4840 spec->pool_id = (u64)rc;
859c31df 4841
4842 /* The ceph file layout needs to fit pool id in 32 bits */
4843
4844 if (spec->pool_id > (u64)U32_MAX) {
4845 rbd_warn(NULL, "pool id too large (%llu > %u)\n",
4846 (unsigned long long)spec->pool_id, U32_MAX);
4847 rc = -EIO;
4848 goto err_out_client;
4849 }
4850
c53d5893 4851 rbd_dev = rbd_dev_create(rbdc, spec);
4852 if (!rbd_dev)
4853 goto err_out_client;
4854 rbdc = NULL; /* rbd_dev now owns this */
4855 spec = NULL; /* rbd_dev now owns this */
602adf40 4856
51344a38 4857 rc = rbd_dev_image_probe(rbd_dev, read_only);
a30b71b9 4858 if (rc < 0)
c53d5893 4859 goto err_out_rbd_dev;
05fd6f6f 4860
4861 rc = rbd_dev_device_setup(rbd_dev);
4862 if (!rc)
4863 return count;
4864
4865 rbd_dev_image_release(rbd_dev);
4866err_out_rbd_dev:
4867 rbd_dev_destroy(rbd_dev);
bd4ba655 4868err_out_client:
9d3997fd 4869 rbd_put_client(rbdc);
0ddebc0c 4870err_out_args:
4871 if (ceph_opts)
4872 ceph_destroy_options(ceph_opts);
4e9afeba 4873 kfree(rbd_opts);
859c31df 4874 rbd_spec_put(spec);
4875err_out_module:
4876 module_put(THIS_MODULE);
27cc2594 4877
602adf40 4878 dout("Error adding device %s\n", buf);
27cc2594 4879
c0cd10db 4880 return (ssize_t)rc;
4881}
4882
de71a297 4883static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4884{
4885 struct list_head *tmp;
4886 struct rbd_device *rbd_dev;
4887
e124a82f 4888 spin_lock(&rbd_dev_list_lock);
4889 list_for_each(tmp, &rbd_dev_list) {
4890 rbd_dev = list_entry(tmp, struct rbd_device, node);
de71a297 4891 if (rbd_dev->dev_id == dev_id) {
e124a82f 4892 spin_unlock(&rbd_dev_list_lock);
602adf40 4893 return rbd_dev;
e124a82f 4894 }
602adf40 4895 }
e124a82f 4896 spin_unlock(&rbd_dev_list_lock);
4897 return NULL;
4898}
4899
200a6a8b 4900static void rbd_dev_device_release(struct device *dev)
602adf40 4901{
593a9e7b 4902 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
602adf40 4903
602adf40 4904 rbd_free_disk(rbd_dev);
200a6a8b 4905 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
6d80b130 4906 rbd_dev_mapping_clear(rbd_dev);
602adf40 4907 unregister_blkdev(rbd_dev->major, rbd_dev->name);
200a6a8b 4908 rbd_dev->major = 0;
e2839308 4909 rbd_dev_id_put(rbd_dev);
4911}
4912
4913static void rbd_dev_remove_parent(struct rbd_device *rbd_dev)
4914{
ad945fc1 4915 while (rbd_dev->parent) {
4916 struct rbd_device *first = rbd_dev;
4917 struct rbd_device *second = first->parent;
4918 struct rbd_device *third;
4919
4920 /*
4921 * Follow to the parent with no grandparent and
4922 * remove it.
4923 */
4924 while (second && (third = second->parent)) {
4925 first = second;
4926 second = third;
4927 }
ad945fc1 4928 rbd_assert(second);
8ad42cd0 4929 rbd_dev_image_release(second);
4930 first->parent = NULL;
4931 first->parent_overlap = 0;
4932
4933 rbd_assert(first->parent_spec);
4934 rbd_spec_put(first->parent_spec);
4935 first->parent_spec = NULL;
4936 }
4937}
4938
4939static ssize_t rbd_remove(struct bus_type *bus,
4940 const char *buf,
4941 size_t count)
4942{
4943 struct rbd_device *rbd_dev = NULL;
0d8189e1 4944 int target_id;
602adf40 4945 unsigned long ul;
0d8189e1 4946 int ret;
602adf40 4947
4948 ret = strict_strtoul(buf, 10, &ul);
4949 if (ret)
4950 return ret;
4951
4952 /* convert to int; abort if we lost anything in the conversion */
4953 target_id = (int) ul;
4954 if (target_id != ul)
4955 return -EINVAL;
4956
4957 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4958
4959 rbd_dev = __rbd_get_dev(target_id);
4960 if (!rbd_dev) {
4961 ret = -ENOENT;
4962 goto done;
4963 }
4964
a14ea269 4965 spin_lock_irq(&rbd_dev->lock);
b82d167b 4966 if (rbd_dev->open_count)
42382b70 4967 ret = -EBUSY;
4968 else
4969 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
a14ea269 4970 spin_unlock_irq(&rbd_dev->lock);
b82d167b 4971 if (ret < 0)
42382b70 4972 goto done;
0d8189e1 4973 ret = count;
b480815a 4974 rbd_bus_del_dev(rbd_dev);
8ad42cd0 4975 rbd_dev_image_release(rbd_dev);
79ab7558 4976 module_put(THIS_MODULE);
4977done:
4978 mutex_unlock(&ctl_mutex);
aafb230e 4979
4980 return ret;
4981}
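/*
 * Illustrative only: unmapping device id 0 from user space (the
 * write fails with -EBUSY while the device is still open):
 *
 *	$ echo 0 > /sys/bus/rbd/remove
 */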
4982
4983/*
4984 * create control files in sysfs
dfc5606d 4985 * /sys/bus/rbd/...
602adf40
YS
4986 */
4987static int rbd_sysfs_init(void)
4988{
dfc5606d 4989 int ret;
602adf40 4990
fed4c143 4991 ret = device_register(&rbd_root_dev);
21079786 4992 if (ret < 0)
dfc5606d 4993 return ret;
602adf40 4994
4995 ret = bus_register(&rbd_bus_type);
4996 if (ret < 0)
4997 device_unregister(&rbd_root_dev);
602adf40 4998
4999 return ret;
5000}
5001
5002static void rbd_sysfs_cleanup(void)
5003{
dfc5606d 5004 bus_unregister(&rbd_bus_type);
fed4c143 5005 device_unregister(&rbd_root_dev);
602adf40
YS
5006}
5007
5008static int rbd_slab_init(void)
5009{
5010 rbd_assert(!rbd_img_request_cache);
5011 rbd_img_request_cache = kmem_cache_create("rbd_img_request",
5012 sizeof (struct rbd_img_request),
5013 __alignof__(struct rbd_img_request),
5014 0, NULL);
5015 if (!rbd_img_request_cache)
5016 return -ENOMEM;
5017
5018 rbd_assert(!rbd_obj_request_cache);
5019 rbd_obj_request_cache = kmem_cache_create("rbd_obj_request",
5020 sizeof (struct rbd_obj_request),
5021 __alignof__(struct rbd_obj_request),
5022 0, NULL);
5023 if (!rbd_obj_request_cache)
5024 goto out_err;
5025
5026 rbd_assert(!rbd_segment_name_cache);
5027 rbd_segment_name_cache = kmem_cache_create("rbd_segment_name",
5028 MAX_OBJ_NAME_SIZE + 1, 1, 0, NULL);
5029 if (rbd_segment_name_cache)
1c2a9dfe 5030 return 0;
5031out_err:
5032 if (rbd_obj_request_cache) {
5033 kmem_cache_destroy(rbd_obj_request_cache);
5034 rbd_obj_request_cache = NULL;
5035 }
1c2a9dfe 5036
5037 kmem_cache_destroy(rbd_img_request_cache);
5038 rbd_img_request_cache = NULL;
5039
5040 return -ENOMEM;
5041}
5042
5043static void rbd_slab_exit(void)
5044{
5045 rbd_assert(rbd_segment_name_cache);
5046 kmem_cache_destroy(rbd_segment_name_cache);
5047 rbd_segment_name_cache = NULL;
5048
5049 rbd_assert(rbd_obj_request_cache);
5050 kmem_cache_destroy(rbd_obj_request_cache);
5051 rbd_obj_request_cache = NULL;
5052
5053 rbd_assert(rbd_img_request_cache);
5054 kmem_cache_destroy(rbd_img_request_cache);
5055 rbd_img_request_cache = NULL;
5056}
5057
cc344fa1 5058static int __init rbd_init(void)
5059{
5060 int rc;
5061
5062 if (!libceph_compatible(NULL)) {
5063 rbd_warn(NULL, "libceph incompatibility (quitting)");
5064
5065 return -EINVAL;
5066 }
1c2a9dfe 5067 rc = rbd_slab_init();
5068 if (rc)
5069 return rc;
5070 rc = rbd_sysfs_init();
5071 if (rc)
5072 rbd_slab_exit();
5073 else
5074 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5075
5076 return rc;
5077}
5078
cc344fa1 5079static void __exit rbd_exit(void)
5080{
5081 rbd_sysfs_cleanup();
1c2a9dfe 5082 rbd_slab_exit();
5083}
5084
5085module_init(rbd_init);
5086module_exit(rbd_exit);
5087
5088MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5089MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5090MODULE_DESCRIPTION("rados block device");
5091
5092/* following authorship retained from original osdblk.c */
5093MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5094
5095MODULE_LICENSE("GPL");